Browse Source

新增feign等接口及数据库创建方法;

dev
swordmeng 3 weeks ago
parent
commit
b271ba7a24
  1. 2
      data-framework/src/main/java/com/huaxing/common/exception/BaseException.java
  2. 2
      data-framework/src/main/java/com/huaxing/common/exception/BizException.java
  3. 5
      data-framework/src/main/java/com/huaxing/common/result/ResultVo.java
  4. 2
      data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java
  5. 26
      data-framework/src/main/java/com/huaxing/common/util/StrUtils.java
  6. 9
      data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java
  7. 41
      data-storage-api/pom.xml
  8. 7
      data-storage-api/src/main/java/com/huaxing/Main.java
  9. 73
      data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java
  10. 41
      data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java
  11. 18
      data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java
  12. 4
      data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java
  13. 16
      data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java
  14. 24
      data-storage/pom.xml
  15. 2
      data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java
  16. 240
      data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java
  17. 5
      data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java
  18. 17
      data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java
  19. 7
      data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java
  20. 62
      data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java
  21. 18
      data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java
  22. 14
      data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java
  23. 35
      data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java
  24. 415
      data-storage/src/main/java/com/huaxing/test/TestController.java
  25. 25
      pom.xml

2
data-framework/src/main/java/com/huaxing/common/result/BaseException.java → data-framework/src/main/java/com/huaxing/common/exception/BaseException.java

@ -1,4 +1,4 @@
package com.huaxing.common.result; package com.huaxing.common.exception;
import com.huaxing.common.constant.AppConstant; import com.huaxing.common.constant.AppConstant;
import lombok.Getter; import lombok.Getter;

2
data-framework/src/main/java/com/huaxing/common/result/BizException.java → data-framework/src/main/java/com/huaxing/common/exception/BizException.java

@ -1,4 +1,4 @@
package com.huaxing.common.result; package com.huaxing.common.exception;
import com.huaxing.common.constant.AppConstant; import com.huaxing.common.constant.AppConstant;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;

5
data-framework/src/main/java/com/huaxing/common/result/ResultVo.java

@ -1,6 +1,7 @@
package com.huaxing.common.result; package com.huaxing.common.result;
import com.huaxing.common.constant.AppConstant; import com.huaxing.common.constant.AppConstant;
import com.huaxing.common.exception.BaseException;
import lombok.Getter; import lombok.Getter;
import java.io.Serializable; import java.io.Serializable;
@ -49,4 +50,8 @@ public class ResultVo<T> implements Serializable {
public static <D> ResultVo<D> fail(BaseException exception) { public static <D> ResultVo<D> fail(BaseException exception) {
return new ResultVo<>(exception); return new ResultVo<>(exception);
} }
public static ResultVo<?> status(boolean flag) {
return flag ? ResultVo.ok() : ResultVo.fail("操作失败!");
}
} }

2
data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java

@ -21,10 +21,8 @@ public class JacksonUtil {
public static String objectStr(Object object) { public static String objectStr(Object object) {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
try { try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return mapper.writeValueAsString(object); return mapper.writeValueAsString(object);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

26
data-framework/src/main/java/com/huaxing/common/util/StrUtils.java

@ -0,0 +1,26 @@
package com.huaxing.common.util;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.common.util
* @ClassName: StrUtils
* @Author: swordmeng8@163.com
* @Description: 字符串工具类
* @Date: 2025/2/12 15:23
* @Version: 1.0
*/
public class StrUtils {
/**
* 判断字符串是否以指定前缀开头
* @param str
* @param prefix
* @return
*/
public static boolean checkStringStartsWithExample(String str, String prefix) {
// 使用 startsWith 方法进行判断
return str.startsWith(prefix);
}
}

9
data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java

@ -23,8 +23,6 @@ public class CommonService extends SqlConverterStatementHandle {
@Autowired @Autowired
private AbstractDbConnector dbPool; private AbstractDbConnector dbPool;
/** /**
* 执行单条sql * 执行单条sql
* *
@ -43,20 +41,23 @@ public class CommonService extends SqlConverterStatementHandle {
} }
/** /**
* 执行单条sql * 执行单条sql bool返回值
* *
* @param sql * @param sql
*/ */
public void exec(String sql) { public boolean exec(String sql) {
boolean isSuccess = false;
DBConnection connection = dbPool.getConnection(); DBConnection connection = dbPool.getConnection();
try { try {
connection.run(sql); connection.run(sql);
isSuccess = true;
} catch (IOException e) { } catch (IOException e) {
log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage()); log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage());
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
connection.close(); connection.close();
} }
return isSuccess;
} }
/** /**

41
data-storage-api/pom.xml

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.huaxing</groupId>
<artifactId>data-bridge</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>data-storage-api</artifactId>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.huaxing</groupId>
<artifactId>data-framework</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
</project>

7
data-storage-api/src/main/java/com/huaxing/Main.java

@ -0,0 +1,7 @@
package com.huaxing;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
}
}

73
data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java

@ -0,0 +1,73 @@
package com.huaxing.feign;
import com.huaxing.common.constant.AppConstant;
import com.huaxing.common.result.ResultVo;
import com.huaxing.feign.fallback.DatabaseFeignFallbackFactory;
import com.huaxing.pojo.entity.DatabaseDTO;
import com.huaxing.pojo.entity.TableDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* Notice Feign接口类
*
* @author Chill
*/
@FeignClient(name = AppConstant.APP_NAME, fallbackFactory = DatabaseFeignFallbackFactory.class)
public interface IDatabaseClientFeign {
// 请求前缀
String API_PREFIX = "/api";
// 创建数据库
String CREATE_DATABASE = API_PREFIX + "/database/create";
// 创建dfs表
String CREATE_DFS_TABLE = API_PREFIX + "/table/create/dfs";
// 创建stream表
String CREATE_STREAM_TABLE = API_PREFIX + "/table/create/stream";
// 创建dfs和stream表并订阅
String CREATE_DFS_AND_STREAM_TABLE_SUBSCRIBE = API_PREFIX + "/table/create/dfs/stream/subscribe";
// 创建dfs和stream表不订阅
String CREATE_DFS_AND_STREAM_TABLE_UNSUBSCRIBE = API_PREFIX + "/table/create/dfs/stream/unsubscribe";
// 订阅流表
String SUBSCRIBE_STREAM_TABLE = API_PREFIX + "/table/subscribe/stream";
// 取消订阅流表
String UNSUBSCRIBE_STREAM_TABLE = API_PREFIX + "/table/unsubscribe/stream";
// 新增dfs表字段
String ADD_DFS_COLUMNS = API_PREFIX + "/table/add/dfs/columns";
// 新增stream表字段
String ADD_STREAM_COLUMNS = API_PREFIX + "/table/add/stream/columns";
/**
* 创建数据库
* @param databaseDTO
* @return ResultVo
*/
@PostMapping(CREATE_DATABASE)
ResultVo<?> createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO);
/**
* 创建dfs表
* @param tableDTO
* @return ResultVo
*/
@PostMapping(CREATE_DFS_TABLE)
ResultVo<?> createDfsTable(@RequestBody @Validated TableDTO tableDTO);
}

41
data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java

@ -0,0 +1,41 @@
package com.huaxing.feign.fallback;
import com.huaxing.common.result.ResultVo;
import com.huaxing.feign.IDatabaseClientFeign;
import com.huaxing.pojo.entity.DatabaseDTO;
import com.huaxing.pojo.entity.TableDTO;
import feign.hystrix.FallbackFactory;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.fallback
* @ClassName: DataStorageFeignFallbackFactory
* @Author: swordmeng8@163.com
* @Description: 1
* @Date: 2025/2/12 10:29
* @Version: 1.0
*/
public class DatabaseFeignFallbackFactory implements FallbackFactory<IDatabaseClientFeign> {
@Override
public IDatabaseClientFeign create(Throwable throwable) {
return new IDatabaseClientFeign() {
/**
* 创建数据库
* @param query
* @return
*/
@Override
public ResultVo<?> createDatabase(DatabaseDTO query) {
ResultVo.fail(throwable.getMessage());
return null;
}
@Override
public ResultVo<?> createDfsTable(TableDTO tableDTO) {
return null;
}
};
}
}

18
data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java

@ -0,0 +1,18 @@
package com.huaxing.pojo.entity;
import lombok.Data;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.pojo.entity
* @ClassName: DatabaseDTO
* @Author: swordmeng8@163.com
* @Description: 数据库建库接参对象
* @Date: 2025/2/12 17:25
* @Version: 1.0
*/
@Data
public class DatabaseDTO {
private String databaseName;
private String databaseComment;
}

4
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java → data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java

@ -1,10 +1,10 @@
package com.huaxing.data.tablemanagement.domain; package com.huaxing.pojo.entity;
import lombok.Data; import lombok.Data;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.tablemanagement.domain * @Package: com.huaxing.data.database.domain
* @ClassName: TableColumnDTO * @ClassName: TableColumnDTO
* @Author: swordmeng8@163.com * @Author: swordmeng8@163.com
* @Description: 表列DTO * @Description: 表列DTO

16
data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java

@ -0,0 +1,16 @@
package com.huaxing.pojo.entity;
import lombok.Data;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.pojo.entity
* @ClassName: TableDTO
* @Author: swordmeng8@163.com
* @Description: 表结构接参类
* @Date: 2025/2/12 17:40
* @Version: 1.0
*/
@Data
public class TableDTO {
}

24
data-storage/pom.xml

@ -48,11 +48,33 @@
<artifactId>spring-web</artifactId> <artifactId>spring-web</artifactId>
<version>6.2.1</version> <version>6.2.1</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>com.huaxing</groupId>
<artifactId>data-storage-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project> </project>

2
data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java

@ -2,7 +2,9 @@ package com.huaxing;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableFeignClients
@SpringBootApplication @SpringBootApplication
public class IotDataBridgeApplication { public class IotDataBridgeApplication {

240
data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java

@ -0,0 +1,240 @@
package com.huaxing.data.database.controller;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.data.database.service.ITableStructureService;
import com.huaxing.common.util.JacksonUtil;
import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.mqtt.processor.MqttMessageSender;
import com.huaxing.common.result.ResultVo;
import com.huaxing.pojo.entity.TableColumnDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.controller
* @ClassName: TestController
* @Author: swordmeng8@163.com
* @Description: 测试controller
* @Date: 2025/1/15 17:08
* @Version: 1.0
*/
@Slf4j
@RestController
@RequestMapping("/api/database")
@SuppressWarnings("all")
public class DatabaseController {
@Autowired
private MqttMessageSender messageSender;
final IDeviceDataStoredService dataStoredService;
final IDeviceDataQueryDfsService dataQueryDfsService;
final IDeviceDataQueryStreamService dataQueryStreamService;
final ITableStructureService tableStructureService;
// =============================================== 测试构造器 ======================================
public DatabaseController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) {
this.dataStoredService = dataStoredService;
this.dataQueryDfsService = dataQueryDfsService;
this.dataQueryStreamService = dataQueryStreamService;
this.tableStructureService = tableStructureService;
}
// =============================================== 测试插入数据 ======================================
/**
* 测试数据插入
*/
@GetMapping(value = "/testInsert")
public void testInsert() {
String tableName = "MyTable2Stream";
String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')";
dataStoredService.execute(sql);
log.info("SUCCESS");
}
// =============================================== 测试Dfs表查询 ======================================
/**
* 测试Dfs表查询
*/
@GetMapping(value = "/testSelectDfs")
public ResultVo<List<Map<String, Object>>> testSelectDfs() {
String dbPath = "dfs://ZbDB";
String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs");
return ResultVo.ok(dataQueryDfsService.selectList(sql));
}
// =============================================== 测试订阅流表查询 ======================================
/**
* 测试订阅流表查询
*/
@GetMapping(value = "/testSelectStream")
public ResultVo<List<Map<String, Object>>> testSelectStream() {
String sql = "select * from WaterMeterTset1Stream";
return ResultVo.ok(dataQueryStreamService.selectList(sql));
}
// =============================================== 给指定的流表增加列字段 ======================================
/**
* 测试给指定的流表增加列字段
*/
@GetMapping(value = "/testStreamAddColumn")
public void testStreamAddColumn() {
String tableName = "WaterMeterTsetStream";
String columnName = "test";
String columnType = "STRING";
String columnDesc = "test";
TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try {
tableStructureService.addStreamColumns(tableDTO);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
// =============================================== 给指定的Dfs表增加列字段 ======================================
/**
* 测试给指定的Dfs表增加列字段
*/
@GetMapping(value = "/testDfsAddColumn")
public void testDfsAddColumn() {
String tableName = "WaterMeterTsetDfs";
String columnName = "test";
String columnType = "STRING";
String columnDesc = "test描述";
TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try {
tableStructureService.addDfsColumns(tableDTO);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
// ===================================== mqtt消息发送-测试数据入库 START ==============================
/**
* 测试发送mqtt消息
*/
@GetMapping(value = "/testSendMessage")
public void testSendMessage() {
for (int i = 0; i < 2; i++) {
List<Map<String, Object>> dataList = new ArrayList<>();
dataList.add(handleMapByIndex(i));
DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build();
messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO));
}
}
/**
* 根据索引生成测试数据
* @param index 索引
* @return
*/
private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>();
map.put("time", new Date());
map.put("projectId", "0jZU2102");
map.put("deviceId", "0jZU2102_0806_0011");
map.put("WM_WFA", 124.656 + index);
map.put("WM_WFA_Unit", "m³");
map.put("test", "test");
return map;
}
// =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ======================================
/**
* 测试创建Dfs表
*/
@GetMapping(value = "/testCreateDfsTable")
public void testCreateDfsTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsTable(tableDTO);
}
/**
* 测试创建流表
*/
@GetMapping(value = "/testCreateStreamTable")
public void testCreateStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createStreamTable(tableDTO);
}
/**
* 取消订阅流表
*/
@GetMapping(value = "/testUnsubscribeStreamTable")
public void testUnsubscribeStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
tableStructureService.unsubscribeStreamTable(tableDTO);
}
/**
* 订阅流表
*/
@GetMapping(value = "/testSubscribeStreamTable")
public void testSubscribeStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
tableStructureService.subscribeStreamTable(tableDTO);
}
/**
* 创建Dfs和Stream表并添加订阅
*/
@GetMapping(value = "/testCreateDfsAndStreamTableSubscribe")
public void testCreateDfsAndStreamTableSubscribe() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("MyTe2");
tableDTO.setTableDesc("MyTe2测试表");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsAndStreamTableSubscribe(tableDTO);
}
}

5
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java → data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java

@ -1,5 +1,6 @@
package com.huaxing.data.tablemanagement.domain; package com.huaxing.data.database.domain;
import com.huaxing.pojo.entity.TableColumnDTO;
import lombok.Data; import lombok.Data;
import java.util.Iterator; import java.util.Iterator;
@ -8,7 +9,7 @@ import java.util.Objects;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.tablemanagement.domain * @Package: com.huaxing.data.database.domain
* @ClassName: TableDTO * @ClassName: TableDTO
* @Author: swordmeng8@163.com * @Author: swordmeng8@163.com
* @Description: 表操作类 * @Description: 表操作类

17
data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java

@ -0,0 +1,17 @@
package com.huaxing.data.database.service;
import com.huaxing.common.result.ResultVo;
/**
* 数据库操作
* @author 孟剑
* @date 2025-02-12 13:47
*/
public interface IDatabaseService {
// 检查数据库是否存在
ResultVo<?> checkExistsDatabase(String databaseName);
// 创建数据库
ResultVo<?> createDatabase(String databaseName);
}

7
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java → data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java

@ -1,9 +1,6 @@
package com.huaxing.data.tablemanagement.service; package com.huaxing.data.database.service;
import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.database.domain.TableDTO;
import java.sql.SQLException;
import java.util.List;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge

62
data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java

@ -0,0 +1,62 @@
package com.huaxing.data.database.service.impl;
import com.huaxing.common.result.ResultVo;
import com.huaxing.common.util.StrUtils;
import com.huaxing.data.database.service.IDatabaseService;
import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.dolphindb.base.CommonService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.data.database.service.impl
* @ClassName: DatabaseServiceImpl
* @Author: swordmeng8@163.com
* @Description: 数据库操作实现类
* @Date: 2025/2/12 13:48
* @Version: 1.0
*/
@Service
@AllArgsConstructor
public class DatabaseServiceImpl extends CommonService implements IDatabaseService {
final ISqlTemplateService tableTemplateService;
/**
* @description: 检查数据库是否存在
* @param databaseName 数据库名
* @return com.huaxing.common.result.ResultVo<?>
* @author swordmeng8@163.com
* @date 2025/2/12 13:48
*/
@Override
public ResultVo<?> checkExistsDatabase(String databaseName) {
if (!StrUtils.checkStringStartsWithExample(databaseName, "dfs://")) {
return ResultVo.fail("数据库名必须以dfs://开头");
}
;
return ResultVo.status(exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName)));
}
/**
* @description: 创建数据库
* @param databaseName 数据库名
* @return void
* @author swordmeng8@163.com
* @date 2025/2/12 13:48
*/
@Override
public ResultVo<?> createDatabase(String databaseName) {
// 创建数据库操作
// StringBuilder script = new StringBuilder();
// script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");");
// tableDTO.getTableColumnList().forEach(columnItem -> {
// script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";");
// });
// exec(script.toString());
return ResultVo.status(true);
}
}

18
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java → data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java

@ -1,14 +1,12 @@
package com.huaxing.data.tablemanagement.service.impl; package com.huaxing.data.database.service.impl;
import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.data.tablemanagement.service.ITableTemplateService; import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.dolphindb.base.CommonService; import com.huaxing.dolphindb.base.CommonService;
import com.huaxing.data.tablemanagement.service.ITableStructureService; import com.huaxing.data.database.service.ITableStructureService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.SQLException;
import java.util.List;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.impl * @Package: com.huaxing.data.storage.service.impl
@ -19,12 +17,10 @@ import java.util.List;
* @Version: 1.0 * @Version: 1.0
*/ */
@Service @Service
@AllArgsConstructor
public class TableStructureService extends CommonService implements ITableStructureService { public class TableStructureService extends CommonService implements ITableStructureService {
final ITableTemplateService tableTemplateService; final ISqlTemplateService tableTemplateService;
public TableStructureService(ITableTemplateService tableTemplateService) {
this.tableTemplateService = tableTemplateService;
}
/** /**
* 添加dfs表列 * 添加dfs表列

14
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java → data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java

@ -1,15 +1,19 @@
package com.huaxing.data.tablemanagement.service; package com.huaxing.data.database.template;
import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.database.domain.TableDTO;
import java.util.List;
/** /**
* 表创建模板 * 表创建模板
* @author 孟剑 * @author 孟剑
* @date 2025-01-20 10:32 * @date 2025-01-20 10:32
*/ */
public interface ITableTemplateService { public interface ISqlTemplateService {
// 检查数据库是否存在SQL模板
String checkExistsDatabaseTemplate(String databaseName);
// 创建数据库SQL模板
String createDatabaseTemplate(String databaseName);
// 创建dfs表模板 // 创建dfs表模板
String createDfsTableTemplate(TableDTO tableDTO); String createDfsTableTemplate(TableDTO tableDTO);

35
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java → data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java

@ -1,13 +1,12 @@
package com.huaxing.data.tablemanagement.service.impl; package com.huaxing.data.database.template;
import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.data.tablemanagement.service.ITableTemplateService;
import com.huaxing.dolphindb.base.CommonService; import com.huaxing.dolphindb.base.CommonService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* @ProjectName: data-bridge * @ProjectName: data-bridge
* @Package: com.huaxing.data.tablemanagement.service.impl * @Package: com.huaxing.data.database.service.impl
* @ClassName: TableTemplateServiceImpl * @ClassName: TableTemplateServiceImpl
* @Author: swordmeng8@163.com * @Author: swordmeng8@163.com
* @Description: 表操作模板 * @Description: 表操作模板
@ -15,7 +14,29 @@ import org.springframework.stereotype.Service;
* @Version: 1.0 * @Version: 1.0
*/ */
@Service @Service
public class TableTemplateServiceImpl extends CommonService implements ITableTemplateService { public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplateService {
/**
* @Description 检查数据库是否存在
* @Author swordmeng8@163.com
* @Date 2025/1/20 10:46
* @Version v1.0
**/
@Override
public String checkExistsDatabaseTemplate(String databaseName) {
return null;
}
/**
* @Description 创建数据库模板
* @Author swordmeng8@163.com
* @Date 2025/1/20 10:46
* @Version v1.0
**/
@Override
public String createDatabaseTemplate(String databaseName) {
return null;
}
/** /**
* @Description 创建 DFS 表模板 * @Description 创建 DFS 表模板
@ -62,7 +83,7 @@ public class TableTemplateServiceImpl extends CommonService implements ITableTem
String sql = ""; String sql = "";
String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList());
String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList());
String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); // String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList());
sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"stName = \"" + tableDTO.getTableName() + "Stream\"\n" + "stName = \"" + tableDTO.getTableName() + "Stream\"\n" +
"def createST(stName){\n" + "def createST(stName){\n" +
@ -71,8 +92,6 @@ public class TableTemplateServiceImpl extends CommonService implements ITableTem
" colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" + " colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" +
" st = streamTable(150000:0, colNames, colTypes)\n" + " st = streamTable(150000:0, colNames, colTypes)\n" +
" enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" + " enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" +
// " setColumnComment(st,{time:\"时间\",projectId:\"项目ID\",deviceId:\"设备编号\"," + colDesc + "})\n" +
// " setTableComment(table=st, comment=\"" + tableDTO.getTableDesc() + "\")\n" +
" return st\n" + " return st\n" +
" }\n" + " }\n" +
" else{\n" + " else{\n" +

415
data-storage/src/main/java/com/huaxing/test/TestController.java

@ -1,24 +1,9 @@
package com.huaxing.test; package com.huaxing.test;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.tablemanagement.domain.TableColumnDTO;
import com.huaxing.data.tablemanagement.domain.TableDTO;
import com.huaxing.data.tablemanagement.service.ITableStructureService;
import com.huaxing.common.util.JacksonUtil;
import com.huaxing.data.tablemanagement.service.ITableTemplateService;
import com.huaxing.mqtt.processor.MqttMessageSender;
import com.huaxing.common.result.ResultVo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.*;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
@ -35,206 +20,206 @@ import java.util.*;
@SuppressWarnings("all") @SuppressWarnings("all")
public class TestController { public class TestController {
@Autowired // @Autowired
private MqttMessageSender messageSender; // private MqttMessageSender messageSender;
final IDeviceDataStoredService dataStoredService; // final IDeviceDataStoredService dataStoredService;
final IDeviceDataQueryDfsService dataQueryDfsService; // final IDeviceDataQueryDfsService dataQueryDfsService;
final IDeviceDataQueryStreamService dataQueryStreamService; // final IDeviceDataQueryStreamService dataQueryStreamService;
final ITableStructureService tableStructureService; // final ITableStructureService tableStructureService;
//
//
// =============================================== 测试构造器 ====================================== // // =============================================== 测试构造器 ======================================
public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ITableTemplateService tableTemplateService) { // public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) {
this.dataStoredService = dataStoredService; // this.dataStoredService = dataStoredService;
this.dataQueryDfsService = dataQueryDfsService; // this.dataQueryDfsService = dataQueryDfsService;
this.dataQueryStreamService = dataQueryStreamService; // this.dataQueryStreamService = dataQueryStreamService;
this.tableStructureService = tableStructureService; // this.tableStructureService = tableStructureService;
} // }
//
//
// =============================================== 测试插入数据 ====================================== // // =============================================== 测试插入数据 ======================================
//
/** // /**
* 测试数据插入 // * 测试数据插入
*/ // */
@GetMapping(value = "/testInsert") // @GetMapping(value = "/testInsert")
public void testInsert() { // public void testInsert() {
String tableName = "MyTable2Stream"; // String tableName = "MyTable2Stream";
String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; // String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')";
dataStoredService.execute(sql); // dataStoredService.execute(sql);
log.info("SUCCESS"); // log.info("SUCCESS");
} // }
//
// =============================================== 测试Dfs表查询 ====================================== // // =============================================== 测试Dfs表查询 ======================================
//
/** // /**
* 测试Dfs表查询 // * 测试Dfs表查询
*/ // */
@GetMapping(value = "/testSelectDfs") // @GetMapping(value = "/testSelectDfs")
public ResultVo<List<Map<String, Object>>> testSelectDfs() { // public ResultVo<List<Map<String, Object>>> testSelectDfs() {
String dbPath = "dfs://ZbDB"; // String dbPath = "dfs://ZbDB";
String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); // String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs");
return ResultVo.ok(dataQueryDfsService.selectList(sql)); // return ResultVo.ok(dataQueryDfsService.selectList(sql));
} // }
//
// =============================================== 测试订阅流表查询 ====================================== // // =============================================== 测试订阅流表查询 ======================================
//
/** // /**
* 测试订阅流表查询 // * 测试订阅流表查询
*/ // */
@GetMapping(value = "/testSelectStream") // @GetMapping(value = "/testSelectStream")
public ResultVo<List<Map<String, Object>>> testSelectStream() { // public ResultVo<List<Map<String, Object>>> testSelectStream() {
String sql = "select * from WaterMeterTset1Stream"; // String sql = "select * from WaterMeterTset1Stream";
return ResultVo.ok(dataQueryStreamService.selectList(sql)); // return ResultVo.ok(dataQueryStreamService.selectList(sql));
} // }
//
// =============================================== 给指定的流表增加列字段 ====================================== // // =============================================== 给指定的流表增加列字段 ======================================
//
/** // /**
* 测试给指定的流表增加列字段 // * 测试给指定的流表增加列字段
*/ // */
@GetMapping(value = "/testStreamAddColumn") // @GetMapping(value = "/testStreamAddColumn")
public void testStreamAddColumn() { // public void testStreamAddColumn() {
String tableName = "WaterMeterTsetStream"; // String tableName = "WaterMeterTsetStream";
String columnName = "test"; // String columnName = "test";
String columnType = "STRING"; // String columnType = "STRING";
String columnDesc = "test"; // String columnDesc = "test";
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName); // tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); // tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try { // try {
tableStructureService.addStreamColumns(tableDTO); // tableStructureService.addStreamColumns(tableDTO);
log.info("SUCCESS"); // log.info("SUCCESS");
} catch (Exception e) { // } catch (Exception e) {
log.error("FAIL"); // log.error("FAIL");
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
//
// =============================================== 给指定的Dfs表增加列字段 ====================================== // // =============================================== 给指定的Dfs表增加列字段 ======================================
/** // /**
* 测试给指定的Dfs表增加列字段 // * 测试给指定的Dfs表增加列字段
*/ // */
@GetMapping(value = "/testDfsAddColumn") // @GetMapping(value = "/testDfsAddColumn")
public void testDfsAddColumn() { // public void testDfsAddColumn() {
String tableName = "WaterMeterTsetDfs"; // String tableName = "WaterMeterTsetDfs";
String columnName = "test"; // String columnName = "test";
String columnType = "STRING"; // String columnType = "STRING";
String columnDesc = "test描述"; // String columnDesc = "test描述";
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName); // tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); // tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try { // try {
tableStructureService.addDfsColumns(tableDTO); // tableStructureService.addDfsColumns(tableDTO);
log.info("SUCCESS"); // log.info("SUCCESS");
} catch (Exception e) { // } catch (Exception e) {
log.error("FAIL"); // log.error("FAIL");
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
//
//
// ===================================== mqtt消息发送-测试数据入库 START ============================== // // ===================================== mqtt消息发送-测试数据入库 START ==============================
//
/** // /**
* 测试发送mqtt消息 // * 测试发送mqtt消息
*/ // */
@GetMapping(value = "/testSendMessage") // @GetMapping(value = "/testSendMessage")
public void testSendMessage() { // public void testSendMessage() {
for (int i = 0; i < 2; i++) { // for (int i = 0; i < 2; i++) {
List<Map<String, Object>> dataList = new ArrayList<>(); // List<Map<String, Object>> dataList = new ArrayList<>();
dataList.add(handleMapByIndex(i)); // dataList.add(handleMapByIndex(i));
DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); // DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build();
messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); // messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO));
} // }
} // }
//
/** // /**
* 根据索引生成测试数据 // * 根据索引生成测试数据
* @param index 索引 // * @param index 索引
* @return // * @return
*/ // */
private Map<String, Object> handleMapByIndex(int index) { // private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>(); // Map<String, Object> map = new HashMap<>();
map.put("time", new Date()); // map.put("time", new Date());
map.put("projectId", "0jZU2102"); // map.put("projectId", "0jZU2102");
map.put("deviceId", "0jZU2102_0806_0011"); // map.put("deviceId", "0jZU2102_0806_0011");
map.put("WM_WFA", 124.656 + index); // map.put("WM_WFA", 124.656 + index);
map.put("WM_WFA_Unit", "m³"); // map.put("WM_WFA_Unit", "m³");
map.put("test", "test"); // map.put("test", "test");
return map; // return map;
} // }
//
// =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== // // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ======================================
//
/** // /**
* 测试创建Dfs表 // * 测试创建Dfs表
*/ // */
@GetMapping(value = "/testCreateDfsTable") // @GetMapping(value = "/testCreateDfsTable")
public void testCreateDfsTable() { // public void testCreateDfsTable() {
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB"); // tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1"); // tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>(); // List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); // tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); // tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); // tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList); // tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsTable(tableDTO); // tableStructureService.createDfsTable(tableDTO);
} // }
//
/** // /**
* 测试创建流表 // * 测试创建流表
*/ // */
@GetMapping(value = "/testCreateStreamTable") // @GetMapping(value = "/testCreateStreamTable")
public void testCreateStreamTable() { // public void testCreateStreamTable() {
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB"); // tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1"); // tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>(); // List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); // tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); // tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); // tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList); // tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createStreamTable(tableDTO); // tableStructureService.createStreamTable(tableDTO);
} // }
//
/** // /**
* 取消订阅流表 // * 取消订阅流表
*/ // */
@GetMapping(value = "/testUnsubscribeStreamTable") // @GetMapping(value = "/testUnsubscribeStreamTable")
public void testUnsubscribeStreamTable() { // public void testUnsubscribeStreamTable() {
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB"); // tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1"); // tableDTO.setTableName("Test1");
tableStructureService.unsubscribeStreamTable(tableDTO); // tableStructureService.unsubscribeStreamTable(tableDTO);
} // }
//
/** // /**
* 订阅流表 // * 订阅流表
*/ // */
@GetMapping(value = "/testSubscribeStreamTable") // @GetMapping(value = "/testSubscribeStreamTable")
public void testSubscribeStreamTable() { // public void testSubscribeStreamTable() {
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB"); // tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1"); // tableDTO.setTableName("Test1");
tableStructureService.subscribeStreamTable(tableDTO); // tableStructureService.subscribeStreamTable(tableDTO);
} // }
//
/** // /**
* 创建Dfs和Stream表并添加订阅 // * 创建Dfs和Stream表并添加订阅
*/ // */
@GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") // @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe")
public void testCreateDfsAndStreamTableSubscribe() { // public void testCreateDfsAndStreamTableSubscribe() {
TableDTO tableDTO = new TableDTO(); // TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB"); // tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("MyTe2"); // tableDTO.setTableName("MyTe2");
tableDTO.setTableDesc("MyTe2测试表"); // tableDTO.setTableDesc("MyTe2测试表");
List<TableColumnDTO> tableColumnList = new ArrayList<>(); // List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); // tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableDTO.setTableColumnList(tableColumnList); // tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); // tableStructureService.createDfsAndStreamTableSubscribe(tableDTO);
} // }
} }

25
pom.xml

@ -13,6 +13,7 @@
<modules> <modules>
<module>data-framework</module> <module>data-framework</module>
<module>data-storage</module> <module>data-storage</module>
<module>data-storage-api</module>
</modules> </modules>
<dependencies> <dependencies>
@ -115,22 +116,16 @@
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>aliyunmaven</id>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
<releases>
<!-- 使用稳定版本 -->
<enabled>true</enabled>
</releases>
<snapshots>
<!-- 使用快照版本 -->
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project> </project>

Loading…
Cancel
Save