diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java index 6f45e04..fde5474 100644 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java @@ -25,4 +25,11 @@ public class TableColumnDTO { * 列描述 */ private String columnDesc; + + public TableColumnDTO() { + } + public TableColumnDTO(String columnName, String columnType, String columnDesc) { + this.columnName = columnName; + this.columnType = columnType; + } } diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java index 2f943ef..bfbeebf 100644 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java @@ -18,6 +18,10 @@ import java.util.List; public class TableDTO { /** + * 数据库名 + */ + private String databaseName; + /** * 表名 */ private String tableName; @@ -26,4 +30,12 @@ public class TableDTO { */ private List tableColumnList; + public String handleTableColumnName(List tableColumnList) { + + return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); + } + + public String handleTableColumnType(List tableColumnList) { + return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); + } } diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java index 922e81d..6990963 100644 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java @@ -1,5 +1,9 @@ package com.huaxing.data.tablemanagement.service; +import com.huaxing.data.tablemanagement.domain.TableDTO; + +import java.util.List; + /** * 表创建模板 * @author 孟剑 @@ -8,14 +12,16 @@ package com.huaxing.data.tablemanagement.service; public interface ITableTemplateService { // 创建dfs表模板 - String createDfsTableTemplate(String tableName, String columnName, String columnDefinition); + String createDfsTableTemplate(TableDTO tableDTO); // 创建流表模板 - String createStreamTableTemplate(String tableName, String columnName, String columnDefinition); + String createStreamTableTemplate(TableDTO tableDTO); // 取消订阅流表模板 - String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); + String unsubscribeStreamTableTemplate(String databaseName, String tableName); // 订阅流表模板 - String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); + String subscribeStreamTableTemplate(String databaseName, String tableName); + + String createDfsAndStreamTableSubscribe(TableDTO tableDTO); } diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java index 4c1e0f0..747757b 100644 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java @@ -1,6 +1,9 @@ package com.huaxing.data.tablemanagement.service.impl; +import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.tablemanagement.service.ITableTemplateService; +import com.huaxing.dolphindb.base.CommonService; +import org.springframework.stereotype.Service; /** * @ProjectName: data-bridge @@ -11,8 +14,8 @@ import com.huaxing.data.tablemanagement.service.ITableTemplateService; * @Date: 2025/1/20 10:38 * @Version: 1.0 */ - -public class TableTemplateServiceImpl implements ITableTemplateService { +@Service +public class TableTemplateServiceImpl extends CommonService implements ITableTemplateService { /** * @Description 创建 DFS 表模板 @@ -21,9 +24,29 @@ public class TableTemplateServiceImpl implements ITableTemplateService { * @Version v1.0 **/ @Override - public String createDfsTableTemplate(String tableName, String columnName, String columnDefinition) { - - return null; + public String createDfsTableTemplate(TableDTO tableDTO) { + String sql = ""; + String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); + String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" + + "def createPT(dbPath,ptName){\n" + + " db = database(dbPath)\n" + + " if (existsTable(dbPath, ptName)){\n" + + " pt = loadTable(db,ptName)\n" + + " return pt\n" + + " }\n" + + " else{\n" + + " colNames = `time`projectId`deviceId`" + colNames + "\n" + + " colTypes = [DATETIME,STRING,STRING," + colTypes + "]\n" + + " t = table(1:0, colNames, colTypes)\n" + + " pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST)\n" + + " return pt\n" + + " }\n" + + "}\n" + + "createPT(dbPath,ptName);"; + exec(sql); + return sql; } /** @@ -33,8 +56,27 @@ public class TableTemplateServiceImpl implements ITableTemplateService { * @Version v1.0 **/ @Override - public String createStreamTableTemplate(String tableName, String columnName, String columnDefinition) { - return null; + public String createStreamTableTemplate(TableDTO tableDTO) { + String sql = ""; + String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); + String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "stName = \"" + tableDTO.getTableName() + "Stream\"\n" + + "def createST(stName){\n" + + " if(not existsStreamTable(stName)){\n" + + " colNames = `time`projectId`deviceId`" + colNames + "\n" + + " colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" + + " st = streamTable(150000:0, colNames, colTypes)\n" + + " enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" + + " return st\n" + + " }\n" + + " else{\n" + + " return objByName(stName, true)\n" + + " }\n" + + "}\n" + + "createST(stName);"; + exec(sql); + return sql; } /** @@ -44,8 +86,14 @@ public class TableTemplateServiceImpl implements ITableTemplateService { * @Version v1.0 **/ @Override - public String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { - return null; + public String unsubscribeStreamTableTemplate(String databaseName, String tableName) { + String sql = ""; + sql = "dbPath = \"dfs://" + databaseName + "\"\n" + + "stName = \"" + tableName + "Stream\"\n" + + "ptName = \"" + tableName + "Dfs\"\n" + + "unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);"; + exec(sql); + return sql; } /** @@ -55,7 +103,26 @@ public class TableTemplateServiceImpl implements ITableTemplateService { * @Version v1.0 **/ @Override - public String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { + public String subscribeStreamTableTemplate(String databaseName, String tableName) { + String sql = ""; + sql = "dbPath = \"dfs://" + databaseName + "\"\n" + + "stName = \"" + tableName + "Stream\"\n" + + "ptName = \"" + tableName + "Dfs\"\n" + + "subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; + exec(sql); + return sql; + } + + @Override + public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) { + // 执行创建 DFS 表模板 + createDfsTableTemplate(tableDTO); + // 执行创建流表模板 + createStreamTableTemplate(tableDTO); + // 取消订阅流表模板 + unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName()); + // 订阅流表模板 + subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName()); return null; } } diff --git a/data-storage/src/main/java/com/huaxing/test/TestController.java b/data-storage/src/main/java/com/huaxing/test/TestController.java index 318b9c8..6125a4a 100644 --- a/data-storage/src/main/java/com/huaxing/test/TestController.java +++ b/data-storage/src/main/java/com/huaxing/test/TestController.java @@ -4,12 +4,15 @@ import com.huaxing.data.storage.domain.DataAnalysisDTO; import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.tablemanagement.domain.TableColumnDTO; +import com.huaxing.data.tablemanagement.domain.TableDTO; import com.huaxing.data.tablemanagement.service.ITableStructureService; import com.huaxing.common.util.JacksonUtil; +import com.huaxing.data.tablemanagement.service.ITableTemplateService; import com.huaxing.mqtt.processor.MqttMessageSender; import com.huaxing.common.result.ResultVo; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -32,31 +35,43 @@ import java.util.*; @SuppressWarnings("all") public class TestController { - @Resource + @Autowired private MqttMessageSender messageSender; final IDeviceDataStoredService dataStoredService; final IDeviceDataQueryDfsService dataQueryDfsService; final IDeviceDataQueryStreamService dataQueryStreamService; final ITableStructureService tableStructureService; -// // 依赖注入 - public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) { + final ITableTemplateService tableTemplateService; + + // =============================================== 测试构造器 ====================================== + public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ITableTemplateService tableTemplateService) { this.dataStoredService = dataStoredService; this.dataQueryDfsService = dataQueryDfsService; this.dataQueryStreamService = dataQueryStreamService; this.tableStructureService = tableStructureService; + this.tableTemplateService = tableTemplateService; } // =============================================== 测试插入数据 ====================================== - @GetMapping(value = "/testInsert") // 成功 + + /** + * 测试数据插入 + */ + @GetMapping(value = "/testInsert") public void testInsert() { - String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')"; + String tableName = "Test3Stream"; + String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test, test1, test2) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal', 'test1Val', 'test2Val')"; dataStoredService.execute(sql); log.info("SUCCESS"); } // =============================================== 测试Dfs表查询 ====================================== + + /** + * 测试Dfs表查询 + */ @GetMapping(value = "/testSelectDfs") public ResultVo>> testSelectDfs() { String dbPath = "dfs://ZbDB"; @@ -65,6 +80,10 @@ public class TestController { } // =============================================== 测试订阅流表查询 ====================================== + + /** + * 测试订阅流表查询 + */ @GetMapping(value = "/testSelectStream") public ResultVo>> testSelectStream() { String sql = "select * from WaterMeterTset1Stream"; @@ -72,6 +91,10 @@ public class TestController { } // =============================================== 给指定的流表增加列字段 ====================================== + + /** + * 测试给指定的流表增加列字段 + */ @GetMapping(value = "/testStreamAddColumn") public void testStreamAddColumn() { String tableName = "WaterMeterTsetStream"; @@ -87,6 +110,9 @@ public class TestController { } // =============================================== 给指定的Dfs表增加列字段 ====================================== + /** + * 测试给指定的Dfs表增加列字段 + */ @GetMapping(value = "/testDfsAddColumn") public void testDfsAddColumn() { String tableName = "WaterMeterTsetDfs"; @@ -103,6 +129,10 @@ public class TestController { // ===================================== mqtt消息发送-测试数据入库 START ============================== + + /** + * 测试发送mqtt消息 + */ @GetMapping(value = "/testSendMessage") public void testSendMessage() { for (int i = 0; i < 2; i++) { @@ -113,6 +143,11 @@ public class TestController { } } + /** + * 根据索引生成测试数据 + * @param index 索引 + * @return + */ private Map handleMapByIndex(int index) { Map map = new HashMap<>(); map.put("time", new Date()); @@ -124,9 +159,75 @@ public class TestController { return map; } - // =============================================== TODO 创建Dfs表 Stream流表 创建 ====================================== + // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== + + /** + * 测试创建Dfs表 + */ + @GetMapping(value = "/testCreateDfsTable") + public void testCreateDfsTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + System.out.println(tableTemplateService.createDfsTableTemplate(tableDTO)); + } + /** + * 测试创建流表 + */ + @GetMapping(value = "/testCreateStreamTable") + public void testCreateStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + System.out.println(tableTemplateService.createStreamTableTemplate(tableDTO)); + } + + /** + * 取消订阅流表 + */ + @GetMapping(value = "/testUnsubscribeStreamTable") + public void testUnsubscribeStreamTable() { + String databaseName = "ZbDB"; + String tableName = "Test"; + System.out.println(tableTemplateService.unsubscribeStreamTableTemplate(databaseName, tableName)); + } + /** + * 订阅流表 + */ + @GetMapping(value = "/testSubscribeStreamTable") + public void testSubscribeStreamTable() { + String databaseName = "ZbDB"; + String tableName = "Test"; + System.out.println(tableTemplateService.subscribeStreamTableTemplate(databaseName, tableName)); + } + + /** + * 创建Dfs和Stream表并添加订阅 + */ + @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") + public void testCreateDfsAndStreamTableSubscribe() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test3"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + System.out.println(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO)); + } }