From 0b911d2642e00b2aeaff4f74df7890db0cf368af Mon Sep 17 00:00:00 2001 From: swordmeng Date: Thu, 13 Feb 2025 16:05:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=BB=BA=E5=BA=93?= =?UTF-8?q?=E3=80=81=E5=BB=BA=E8=A1=A8=E3=80=81=E8=AE=A2=E9=98=85=E3=80=81?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E5=A4=84=E7=90=86=E7=AD=89=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E7=BC=96=E5=86=99=EF=BC=8C=E9=80=82=E9=85=8D?= =?UTF-8?q?feign=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/huaxing/common/result/ResultVo.java | 3 + .../com/huaxing/dolphindb/base/CommonService.java | 25 ++- data-storage-api/pom.xml | 5 + .../com/huaxing/feign/IDatabaseClientFeign.java | 4 +- .../fallback/DatabaseFeignFallbackFactory.java | 4 +- .../java/com/huaxing/pojo/dto/DatabaseDTO.java | 18 ++ .../java/com/huaxing/pojo/dto/TableColumnDTO.java | 42 ++++ .../main/java/com/huaxing/pojo/dto/TableDTO.java | 93 ++++++++ .../java/com/huaxing/pojo/entity/DatabaseDTO.java | 18 -- .../com/huaxing/pojo/entity/TableColumnDTO.java | 36 ---- .../java/com/huaxing/pojo/entity/TableDTO.java | 16 -- data-storage/pom.xml | 5 + .../database/controller/DatabaseController.java | 39 ++-- .../controller/DatabaseTestController.java | 240 --------------------- .../com/huaxing/data/database/domain/TableDTO.java | 66 ------ .../data/database/service/IDatabaseService.java | 5 +- .../database/service/ITableStructureService.java | 25 ++- .../database/service/impl/DatabaseServiceImpl.java | 33 ++- .../service/impl/TableStructureService.java | 78 +++++-- .../database/template/ISqlTemplateService.java | 19 +- .../database/template/SqlTemplateServiceImpl.java | 83 +++++-- 21 files changed, 387 insertions(+), 470 deletions(-) create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java delete mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java delete mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java delete mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java diff --git a/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java index 582ac8f..2c7f8b3 100644 --- a/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java +++ b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java @@ -54,4 +54,7 @@ public class ResultVo implements Serializable { public static ResultVo status(boolean flag) { return flag ? ResultVo.ok() : ResultVo.fail("操作失败!"); } + public static ResultVo status(boolean flag,String successMsg,String failMsg) { + return flag ? ResultVo.ok(successMsg, null) : ResultVo.fail(failMsg); + } } diff --git a/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java b/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java index 395971e..3306798 100644 --- a/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java +++ b/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java @@ -1,7 +1,10 @@ package com.huaxing.dolphindb.base; +import com.huaxing.common.result.ResultVo; import com.huaxing.dolphindb.connection.AbstractDbConnector; import com.xxdb.DBConnection; +import com.xxdb.data.BasicBoolean; +import com.xxdb.data.Entity; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -45,19 +48,18 @@ public class CommonService extends SqlConverterStatementHandle { * * @param sql */ - public boolean exec(String sql) { - boolean isSuccess = false; + public Entity exec(String sql) { + Entity result = null; DBConnection connection = dbPool.getConnection(); try { - connection.run(sql); - isSuccess = true; + result = connection.run(sql); } catch (IOException e) { log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage()); throw new RuntimeException(e); } finally { connection.close(); } - return isSuccess; + return result; } /** @@ -77,4 +79,17 @@ public class CommonService extends SqlConverterStatementHandle { connection.close(); } + /** + * 解析bool返回值 + * @param result + * @return + */ + public boolean parseBoolResult(Entity result) { + boolean dbExists = false; + if (result instanceof BasicBoolean) { + dbExists = ((BasicBoolean) result).getBoolean(); + } + return dbExists; + } + } diff --git a/data-storage-api/pom.xml b/data-storage-api/pom.xml index a7c9196..5a5ff70 100644 --- a/data-storage-api/pom.xml +++ b/data-storage-api/pom.xml @@ -36,6 +36,11 @@ spring-cloud-context 3.1.4 + + jakarta.validation + jakarta.validation-api + 2.0.2 + \ No newline at end of file diff --git a/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java index 70401ed..b825ee4 100644 --- a/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java +++ b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java @@ -3,8 +3,8 @@ package com.huaxing.feign; import com.huaxing.common.constant.AppConstant; import com.huaxing.common.result.ResultVo; import com.huaxing.feign.fallback.DatabaseFeignFallbackFactory; -import com.huaxing.pojo.entity.DatabaseDTO; -import com.huaxing.pojo.entity.TableDTO; +import com.huaxing.pojo.dto.DatabaseDTO; +import com.huaxing.pojo.dto.TableDTO; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; diff --git a/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java b/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java index 2264bca..65f6da6 100644 --- a/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java +++ b/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java @@ -2,8 +2,8 @@ package com.huaxing.feign.fallback; import com.huaxing.common.result.ResultVo; import com.huaxing.feign.IDatabaseClientFeign; -import com.huaxing.pojo.entity.DatabaseDTO; -import com.huaxing.pojo.entity.TableDTO; +import com.huaxing.pojo.dto.DatabaseDTO; +import com.huaxing.pojo.dto.TableDTO; import feign.hystrix.FallbackFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java new file mode 100644 index 0000000..dfe5ca6 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java @@ -0,0 +1,18 @@ +package com.huaxing.pojo.dto; + +import lombok.Data; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.pojo.entity + * @ClassName: DatabaseDTO + * @Author: swordmeng8@163.com + * @Description: 数据库建库接参对象 + * @Date: 2025/2/12 17:25 + * @Version: 1.0 + */ +@Data +public class DatabaseDTO { + private String databaseName; + private String databaseComment; +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java new file mode 100644 index 0000000..8d60919 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java @@ -0,0 +1,42 @@ +package com.huaxing.pojo.dto; + +import lombok.Data; + +import javax.validation.constraints.NotEmpty; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.database.domain + * @ClassName: TableColumnDTO + * @Author: swordmeng8@163.com + * @Description: 表列DTO + * @Date: 2025/1/15 16:05 + * @Version: 1.0 + */ +@Data +public class TableColumnDTO { + /** + * 列名 + */ + @NotEmpty(message = "列名不能为空") + private String columnName; + + /** + * 列类型 + */ + @NotEmpty(message = "列类型") + private String columnType; + + /** + * 列描述 + */ + private String columnDesc; + + public TableColumnDTO() { + } + public TableColumnDTO(String columnName, String columnType, String columnDesc) { + this.columnName = columnName; + this.columnType = columnType; + this.columnDesc = columnDesc; + } +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java new file mode 100644 index 0000000..20e2d06 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java @@ -0,0 +1,93 @@ +package com.huaxing.pojo.dto; + +import lombok.Data; + +import javax.validation.constraints.NotEmpty; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.pojo.entity + * @ClassName: TableDTO + * @Author: swordmeng8@163.com + * @Description: 表结构接参类 + * @Date: 2025/2/12 17:40 + * @Version: 1.0 + */ +@Data +public class TableDTO { + /** + * 数据库名 + */ + @NotEmpty(message = "数据库名不能为空") + private String databaseName; + + /** + * dfs表名 + */ + @NotEmpty(message = "Dfs表名不能为空") + private String dfsTableName; + + /** + * stream表名 + */ + @NotEmpty(message = "Stream表名不能为空") + private String streamTableName; + + /** + * 表描述 + */ + private String tableDesc; + + /** + * 表列信息 + */ + private List tableColumnList; + + /** + * 获取列名的拼接字符串并返回 + * + * @param tableColumnList + * @return String + */ + public String handleTableColumnName(List tableColumnList) { + return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); + } + + /** + * 获取列类型的拼接字符串并返回 + * + * @param tableColumnList + * @return String + */ + public String handleTableColumnType(List tableColumnList) { + return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); + } + + /** + * 获取列与描述的拼接字符串并返回 + * + * @param tableColumnList + * @return String + */ + public String handleTableColumnDesc(List tableColumnList) { + StringBuilder columnDesc = new StringBuilder(); + // 获取列与描述的拼接字符串并返回 + Iterator iterator = tableColumnList.iterator(); + while (iterator.hasNext()) { + TableColumnDTO tableColumn = iterator.next(); + if (Objects.isNull(tableColumn.getColumnDesc())) { + columnDesc.append(tableColumn.getColumnName()).append(":").append("\"-\""); + } else { + columnDesc.append(tableColumn.getColumnName()).append(":\"").append(tableColumn.getColumnDesc()).append("\""); + } + if (iterator.hasNext()) { + columnDesc.append(","); + } + } + return columnDesc.toString(); + } + +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java deleted file mode 100644 index 962c2a7..0000000 --- a/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.huaxing.pojo.entity; - -import lombok.Data; - -/** - * @ProjectName: data-bridge - * @Package: com.huaxing.pojo.entity - * @ClassName: DatabaseDTO - * @Author: swordmeng8@163.com - * @Description: 数据库建库接参对象 - * @Date: 2025/2/12 17:25 - * @Version: 1.0 - */ -@Data -public class DatabaseDTO { - private String databaseName; - private String databaseComment; -} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java deleted file mode 100644 index 18ac12e..0000000 --- a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.huaxing.pojo.entity; - -import lombok.Data; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.database.domain - * @ClassName: TableColumnDTO - * @Author: swordmeng8@163.com - * @Description: 表列DTO - * @Date: 2025/1/15 16:05 - * @Version: 1.0 - */ -@Data -public class TableColumnDTO { - /** - * 列名 - */ - private String columnName; - /** - * 列类型 - */ - private String columnType; - /** - * 列描述 - */ - private String columnDesc; - - public TableColumnDTO() { - } - public TableColumnDTO(String columnName, String columnType, String columnDesc) { - this.columnName = columnName; - this.columnType = columnType; - this.columnDesc = columnDesc; - } -} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java deleted file mode 100644 index 5e780b9..0000000 --- a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.huaxing.pojo.entity; - -import lombok.Data; - -/** - * @ProjectName: data-bridge - * @Package: com.huaxing.pojo.entity - * @ClassName: TableDTO - * @Author: swordmeng8@163.com - * @Description: 表结构接参类 - * @Date: 2025/2/12 17:40 - * @Version: 1.0 - */ -@Data -public class TableDTO { -} diff --git a/data-storage/pom.xml b/data-storage/pom.xml index 1b94c8c..0f143f7 100644 --- a/data-storage/pom.xml +++ b/data-storage/pom.xml @@ -59,6 +59,11 @@ 0.0.1-SNAPSHOT compile + + jakarta.validation + jakarta.validation-api + 2.0.2 + diff --git a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java index a0790d0..4bc9837 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java +++ b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java @@ -1,8 +1,11 @@ package com.huaxing.data.database.controller; import com.huaxing.common.result.ResultVo; -import com.huaxing.pojo.entity.DatabaseDTO; -import com.huaxing.pojo.entity.TableDTO; +import com.huaxing.data.database.service.IDatabaseService; +import com.huaxing.data.database.service.ITableStructureService; +import com.huaxing.pojo.dto.DatabaseDTO; +import com.huaxing.pojo.dto.TableDTO; +import lombok.AllArgsConstructor; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -15,8 +18,13 @@ import org.springframework.web.bind.annotation.RestController; * @author YourName */ @RestController +@AllArgsConstructor @RequestMapping("/api/database") public class DatabaseController { + /** 表结构 */ + private final ITableStructureService tableStructureService; + /** 数据库 */ + private final IDatabaseService databaseService; /** * 创建数据库 @@ -25,8 +33,7 @@ public class DatabaseController { */ @PostMapping("/create") public ResultVo createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO) { - // 这里添加创建数据库的具体逻辑 - return ResultVo.ok(); + return databaseService.createDatabase(databaseDTO); } /** @@ -36,8 +43,7 @@ public class DatabaseController { */ @PostMapping("/table/create/dfs") public ResultVo createDfsTable(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加创建 dfs 表的具体逻辑 - return ResultVo.ok(); + return tableStructureService.createDfsTable(tableDTO); } /** @@ -47,8 +53,7 @@ public class DatabaseController { */ @PostMapping("/table/create/stream") public ResultVo createStreamTable(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加创建 stream 表的具体逻辑 - return ResultVo.ok(); + return tableStructureService.createStreamTable(tableDTO); } /** @@ -58,8 +63,7 @@ public class DatabaseController { */ @PostMapping("/table/create/dfs/stream/subscribe") public ResultVo createDfsAndStreamTableSubscribe(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加创建 dfs 和 stream 表并订阅的具体逻辑 - return ResultVo.ok(); + return tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); } /** @@ -69,8 +73,7 @@ public class DatabaseController { */ @PostMapping("/table/create/dfs/stream/unsubscribe") public ResultVo createDfsAndStreamTableUnsubscribe(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加创建 dfs 和 stream 表不订阅的具体逻辑 - return ResultVo.ok(); + return tableStructureService.createDfsAndStreamTableUnsubscribe(tableDTO); } /** @@ -80,8 +83,7 @@ public class DatabaseController { */ @PostMapping("/table/subscribe/stream") public ResultVo subscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加订阅流表的具体逻辑 - return ResultVo.ok(); + return tableStructureService.subscribeStreamTable(tableDTO); } /** @@ -91,8 +93,7 @@ public class DatabaseController { */ @PostMapping("/table/unsubscribe/stream") public ResultVo unsubscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加取消订阅流表的具体逻辑 - return ResultVo.ok(); + return tableStructureService.unsubscribeStreamTable(tableDTO); } /** @@ -102,8 +103,7 @@ public class DatabaseController { */ @PostMapping("/table/add/dfs/columns") public ResultVo addDfsColumns(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加新增 dfs 表字段的具体逻辑 - return ResultVo.ok(); + return tableStructureService.addDfsColumns(tableDTO); } /** @@ -113,7 +113,6 @@ public class DatabaseController { */ @PostMapping("/table/add/stream/columns") public ResultVo addStreamColumns(@RequestBody @Validated TableDTO tableDTO) { - // 这里添加新增 stream 表字段的具体逻辑 - return ResultVo.ok(); + return tableStructureService.addStreamColumns(tableDTO); } } \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java deleted file mode 100644 index 53be75f..0000000 --- a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java +++ /dev/null @@ -1,240 +0,0 @@ -package com.huaxing.data.database.controller; - -import com.huaxing.data.storage.domain.DataAnalysisDTO; -import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; -import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; -import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.database.domain.TableDTO; -import com.huaxing.data.database.service.ITableStructureService; -import com.huaxing.common.util.JacksonUtil; -import com.huaxing.data.database.template.ISqlTemplateService; -import com.huaxing.mqtt.processor.MqttMessageSender; -import com.huaxing.common.result.ResultVo; -import com.huaxing.pojo.entity.TableColumnDTO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.*; - - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.controller - * @ClassName: TestController - * @Author: swordmeng8@163.com - * @Description: 测试controller - * @Date: 2025/1/15 17:08 - * @Version: 1.0 - */ -@Slf4j -@RestController -@RequestMapping("/api/database") -@SuppressWarnings("all") -public class DatabaseTestController { - - @Autowired - private MqttMessageSender messageSender; - final IDeviceDataStoredService dataStoredService; - final IDeviceDataQueryDfsService dataQueryDfsService; - final IDeviceDataQueryStreamService dataQueryStreamService; - final ITableStructureService tableStructureService; - - - // =============================================== 测试构造器 ====================================== - public DatabaseTestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) { - this.dataStoredService = dataStoredService; - this.dataQueryDfsService = dataQueryDfsService; - this.dataQueryStreamService = dataQueryStreamService; - this.tableStructureService = tableStructureService; - } - - - // =============================================== 测试插入数据 ====================================== - - /** - * 测试数据插入 - */ - @GetMapping(value = "/testInsert") - public void testInsert() { - String tableName = "MyTable2Stream"; - String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; - dataStoredService.execute(sql); - log.info("SUCCESS"); - } - - // =============================================== 测试Dfs表查询 ====================================== - - /** - * 测试Dfs表查询 - */ - @GetMapping(value = "/testSelectDfs") - public ResultVo>> testSelectDfs() { - String dbPath = "dfs://ZbDB"; - String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); - return ResultVo.ok(dataQueryDfsService.selectList(sql)); - } - - // =============================================== 测试订阅流表查询 ====================================== - - /** - * 测试订阅流表查询 - */ - @GetMapping(value = "/testSelectStream") - public ResultVo>> testSelectStream() { - String sql = "select * from WaterMeterTset1Stream"; - return ResultVo.ok(dataQueryStreamService.selectList(sql)); - } - - // =============================================== 给指定的流表增加列字段 ====================================== - - /** - * 测试给指定的流表增加列字段 - */ - @GetMapping(value = "/testStreamAddColumn") - public void testStreamAddColumn() { - String tableName = "WaterMeterTsetStream"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addStreamColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - // =============================================== 给指定的Dfs表增加列字段 ====================================== - /** - * 测试给指定的Dfs表增加列字段 - */ - @GetMapping(value = "/testDfsAddColumn") - public void testDfsAddColumn() { - String tableName = "WaterMeterTsetDfs"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test描述"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addDfsColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - - // ===================================== mqtt消息发送-测试数据入库 START ============================== - - /** - * 测试发送mqtt消息 - */ - @GetMapping(value = "/testSendMessage") - public void testSendMessage() { - for (int i = 0; i < 2; i++) { - List> dataList = new ArrayList<>(); - dataList.add(handleMapByIndex(i)); - DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); - messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); - } - } - - /** - * 根据索引生成测试数据 - * @param index 索引 - * @return - */ - private Map handleMapByIndex(int index) { - Map map = new HashMap<>(); - map.put("time", new Date()); - map.put("projectId", "0jZU2102"); - map.put("deviceId", "0jZU2102_0806_0011"); - map.put("WM_WFA", 124.656 + index); - map.put("WM_WFA_Unit", "m³"); - map.put("test", "test"); - return map; - } - - // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== - - /** - * 测试创建Dfs表 - */ - @GetMapping(value = "/testCreateDfsTable") - public void testCreateDfsTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsTable(tableDTO); - } - - /** - * 测试创建流表 - */ - @GetMapping(value = "/testCreateStreamTable") - public void testCreateStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createStreamTable(tableDTO); - } - - /** - * 取消订阅流表 - */ - @GetMapping(value = "/testUnsubscribeStreamTable") - public void testUnsubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.unsubscribeStreamTable(tableDTO); - } - - /** - * 订阅流表 - */ - @GetMapping(value = "/testSubscribeStreamTable") - public void testSubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.subscribeStreamTable(tableDTO); - } - - /** - * 创建Dfs和Stream表并添加订阅 - */ - @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") - public void testCreateDfsAndStreamTableSubscribe() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("MyTe2"); - tableDTO.setTableDesc("MyTe2测试表"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); - } - - -} diff --git a/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java b/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java deleted file mode 100644 index 81cee86..0000000 --- a/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.huaxing.data.database.domain; - -import com.huaxing.pojo.entity.TableColumnDTO; -import lombok.Data; - -import java.util.Iterator; -import java.util.List; -import java.util.Objects; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.database.domain - * @ClassName: TableDTO - * @Author: swordmeng8@163.com - * @Description: 表操作类 - * @Date: 2025/1/15 16:03 - * @Version: 1.0 - */ - -@Data -public class TableDTO { - - /** - * 数据库名 - */ - private String databaseName; - /** - * 表名 - */ - private String tableName; - /** - * 表描述 - */ - private String tableDesc; - /** - * 表列信息 - */ - private List tableColumnList; - - public String handleTableColumnName(List tableColumnList) { - return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); - } - - public String handleTableColumnType(List tableColumnList) { - return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); - } - - public String handleTableColumnDesc(List tableColumnList) { - String columnDesc = ""; - // 获取列与描述的拼接字符串并返回 - Iterator iterator = tableColumnList.iterator(); - while (iterator.hasNext()) { - TableColumnDTO tableColumn = iterator.next(); - if (Objects.isNull(tableColumn.getColumnDesc())) { - columnDesc += tableColumn.getColumnName() + ":" + "\"-\""; - } else { - columnDesc += tableColumn.getColumnName() + ":\"" + tableColumn.getColumnDesc()+"\""; - } - if (iterator.hasNext()) { - columnDesc += ","; - } - } - return columnDesc; - } - -} diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java b/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java index 4c2bacd..2564102 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java +++ b/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java @@ -1,6 +1,7 @@ package com.huaxing.data.database.service; import com.huaxing.common.result.ResultVo; +import com.huaxing.pojo.dto.DatabaseDTO; /** * 数据库操作 @@ -10,8 +11,8 @@ import com.huaxing.common.result.ResultVo; public interface IDatabaseService { // 检查数据库是否存在 - ResultVo checkExistsDatabase(String databaseName); + boolean checkExistsDatabase(String databaseName); // 创建数据库 - ResultVo createDatabase(String databaseName); + ResultVo createDatabase(DatabaseDTO databaseDTO); } diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java b/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java index 3c411f5..af8486c 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java +++ b/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java @@ -1,6 +1,8 @@ package com.huaxing.data.database.service; -import com.huaxing.data.database.domain.TableDTO; + +import com.huaxing.common.result.ResultVo; +import com.huaxing.pojo.dto.TableDTO; /** * @ProjectName: iot-data-bridge @@ -14,27 +16,32 @@ import com.huaxing.data.database.domain.TableDTO; public interface ITableStructureService { // 添加dfs表多列 - void addDfsColumns(TableDTO tableDTO); + ResultVo addDfsColumns(TableDTO tableDTO); // 添加Stream流表多列 - void addStreamColumns(TableDTO tableDTO); + ResultVo addStreamColumns(TableDTO tableDTO); // 创建dfs表 - void createDfsTable(TableDTO tableDTO); + ResultVo createDfsTable(TableDTO tableDTO); // 创建Stream流表 - void createStreamTable(TableDTO tableDTO); + ResultVo createStreamTable(TableDTO tableDTO); // 取消订阅 - public void unsubscribeStreamTable(TableDTO tableDTO); + ResultVo unsubscribeStreamTable(TableDTO tableDTO); // 订阅流表 - void subscribeStreamTable(TableDTO tableDTO); + ResultVo subscribeStreamTable(TableDTO tableDTO); // 判断表是否存在 - boolean isTableExist(String tableName); + boolean dfsTableExistStatus(String databaseName, String dfsTableName); + // 判断表是否存在 + boolean streamTableExistStatus(String streamTableName); - void createDfsAndStreamTableSubscribe(TableDTO tableDTO); + // 创建dfs表和Stream表并订阅 + ResultVo createDfsAndStreamTableSubscribe(TableDTO tableDTO); + // 创建dfs表和Stream表不订阅 + ResultVo createDfsAndStreamTableUnsubscribe(TableDTO tableDTO); } diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java index 2606599..3530932 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java +++ b/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java @@ -1,11 +1,14 @@ package com.huaxing.data.database.service.impl; import com.huaxing.common.result.ResultVo; -import com.huaxing.common.util.StrUtils; import com.huaxing.data.database.service.IDatabaseService; import com.huaxing.data.database.template.ISqlTemplateService; import com.huaxing.dolphindb.base.CommonService; +import com.huaxing.pojo.dto.DatabaseDTO; +import com.xxdb.data.BasicBoolean; +import com.xxdb.data.Entity; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** @@ -17,11 +20,12 @@ import org.springframework.stereotype.Service; * @Date: 2025/2/12 13:48 * @Version: 1.0 */ +@Slf4j @Service @AllArgsConstructor public class DatabaseServiceImpl extends CommonService implements IDatabaseService { - final ISqlTemplateService tableTemplateService; + private final ISqlTemplateService tableTemplateService; /** * @description: 检查数据库是否存在 @@ -31,12 +35,9 @@ public class DatabaseServiceImpl extends CommonService implements IDatabaseServi * @date 2025/2/12 13:48 */ @Override - public ResultVo checkExistsDatabase(String databaseName) { - if (!StrUtils.checkStringStartsWithExample(databaseName, "dfs://")) { - return ResultVo.fail("数据库名必须以dfs://开头"); - } - ; - return ResultVo.status(exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName))); + public boolean checkExistsDatabase(String databaseName) { + Entity result = exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName)); + return parseBoolResult(result); } /** @@ -47,16 +48,12 @@ public class DatabaseServiceImpl extends CommonService implements IDatabaseServi * @date 2025/2/12 13:48 */ @Override - public ResultVo createDatabase(String databaseName) { + public ResultVo createDatabase(DatabaseDTO databaseDTO) { + if (this.checkExistsDatabase(databaseDTO.getDatabaseName())) { + return ResultVo.fail("数据库已存在"); + } // 创建数据库操作 -// StringBuilder script = new StringBuilder(); -// script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); -// tableDTO.getTableColumnList().forEach(columnItem -> { -// script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); -// }); -// exec(script.toString()); - - return ResultVo.status(true); - + exec(tableTemplateService.createDatabaseTemplate(databaseDTO.getDatabaseName())); + return ResultVo.status(this.checkExistsDatabase(databaseDTO.getDatabaseName()), "创建数据库成功","创建数据库失败"); } } diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java b/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java index 45d8f63..38d1669 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java +++ b/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java @@ -1,9 +1,12 @@ package com.huaxing.data.database.service.impl; -import com.huaxing.data.database.domain.TableDTO; +import com.huaxing.common.result.ResultVo; import com.huaxing.data.database.template.ISqlTemplateService; import com.huaxing.dolphindb.base.CommonService; import com.huaxing.data.database.service.ITableStructureService; +import com.huaxing.pojo.dto.TableDTO; +import com.xxdb.data.BasicBoolean; +import com.xxdb.data.Entity; import lombok.AllArgsConstructor; import org.springframework.stereotype.Service; @@ -18,6 +21,7 @@ import org.springframework.stereotype.Service; */ @Service @AllArgsConstructor +@SuppressWarnings("all") public class TableStructureService extends CommonService implements ITableStructureService { final ISqlTemplateService tableTemplateService; @@ -27,13 +31,14 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void addDfsColumns(TableDTO tableDTO) { + public ResultVo addDfsColumns(TableDTO tableDTO) { StringBuilder script = new StringBuilder(); - script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); + script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getDfsTableName()).append(");"); tableDTO.getTableColumnList().forEach(columnItem -> { script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); }); exec(script.toString()); + return ResultVo.status(true); } /** @@ -41,12 +46,13 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void addStreamColumns(TableDTO tableDTO) { + public ResultVo addStreamColumns(TableDTO tableDTO) { StringBuilder streamScript = new StringBuilder(); tableDTO.getTableColumnList().forEach(columnItem -> { - streamScript.append("addColumn(").append(tableDTO.getTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");"); + streamScript.append("addColumn(").append(tableDTO.getStreamTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");"); }); exec(streamScript.toString()); + return ResultVo.status(true); } /** @@ -54,8 +60,10 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void createDfsTable(TableDTO tableDTO) { + public ResultVo createDfsTable(TableDTO tableDTO) { exec(tableTemplateService.createDfsTableTemplate(tableDTO)); + Entity result = exec(tableTemplateService.checkExistsDfsTableTemplate(tableDTO.getDatabaseName(), tableDTO.getDfsTableName())); + return ResultVo.status(parseBoolResult(result), "创建dfs表成功","创建dfs表失败"); } /** @@ -63,8 +71,10 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void createStreamTable(TableDTO tableDTO) { + public ResultVo createStreamTable(TableDTO tableDTO) { exec(tableTemplateService.createStreamTableTemplate(tableDTO)); + Entity result = exec(tableTemplateService.checkExistsStreamTableTemplate(tableDTO.getStreamTableName())); + return ResultVo.status(parseBoolResult(result), "创建Stream表成功","创建Stream表失败"); } /** @@ -72,8 +82,9 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void unsubscribeStreamTable(TableDTO tableDTO) { - exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + public ResultVo unsubscribeStreamTable(TableDTO tableDTO) { + exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO)); + return ResultVo.ok("取消流表订阅成功",null); } /** @@ -81,17 +92,37 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void subscribeStreamTable(TableDTO tableDTO) { - exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + public ResultVo subscribeStreamTable(TableDTO tableDTO) { + exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO)); + return ResultVo.ok("流表订阅成功",null); } /** - * 判断表是否存在 - * @param tableName + * 判断DFS表是否存在 + * @param databaseName,tableName */ @Override - public boolean isTableExist(String tableName) { - return false; + public boolean dfsTableExistStatus(String databaseName, String tableName) { + boolean tableExist = false; + Entity result = exec(tableTemplateService.checkExistsDfsTableTemplate(databaseName, tableName)); + if (result instanceof BasicBoolean) { + tableExist = ((BasicBoolean) result).getBoolean(); + } + return tableExist; + } + + /** + * 判断Stream表是否存在 + * @param streamTableName + */ + @Override + public boolean streamTableExistStatus(String streamTableName) { + boolean tableExist = false; + Entity result = exec(tableTemplateService.checkExistsStreamTableTemplate(streamTableName)); + if (result instanceof BasicBoolean) { + tableExist = ((BasicBoolean) result).getBoolean(); + } + return tableExist; } /** @@ -99,7 +130,22 @@ public class TableStructureService extends CommonService implements ITableStruct * @param tableDTO */ @Override - public void createDfsAndStreamTableSubscribe(TableDTO tableDTO) { + public ResultVo createDfsAndStreamTableSubscribe(TableDTO tableDTO) { exec(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO)); + boolean dfsStatus = dfsTableExistStatus(tableDTO.getDatabaseName(), tableDTO.getDfsTableName()); + boolean streamStatus = streamTableExistStatus(tableDTO.getStreamTableName()); + return ResultVo.status(dfsStatus && streamStatus, "创建dfs表和流表并订阅成功","创建dfs表和流表并订阅失败"); + } + + /** + * 创建dfs表和流表并取消订阅 + * @param tableDTO + */ + @Override + public ResultVo createDfsAndStreamTableUnsubscribe(TableDTO tableDTO) { + exec(tableTemplateService.createDfsAndStreamTableUnsubscribe(tableDTO)); + boolean dfsStatus = dfsTableExistStatus(tableDTO.getDatabaseName(), tableDTO.getDfsTableName()); + boolean streamStatus = streamTableExistStatus(tableDTO.getStreamTableName()); + return ResultVo.status(dfsStatus && streamStatus, "创建dfs表和流表成功","创建dfs表和流表失败"); } } diff --git a/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java b/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java index 309331d..ac7898e 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java +++ b/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java @@ -1,6 +1,7 @@ package com.huaxing.data.database.template; -import com.huaxing.data.database.domain.TableDTO; + +import com.huaxing.pojo.dto.TableDTO; /** * 表创建模板 @@ -22,10 +23,22 @@ public interface ISqlTemplateService { String createStreamTableTemplate(TableDTO tableDTO); // 取消订阅流表模板 - String unsubscribeStreamTableTemplate(String databaseName, String tableName); + String unsubscribeStreamTableTemplate(TableDTO tableDTO); // 订阅流表模板 - String subscribeStreamTableTemplate(String databaseName, String tableName); + String subscribeStreamTableTemplate(TableDTO tableDTO); + // 创建dfs和流表订阅模板 String createDfsAndStreamTableSubscribe(TableDTO tableDTO); + + // 创建dfs和流表取消订阅模板 + String createDfsAndStreamTableUnsubscribe(TableDTO tableDTO); + + // 检查表是否存在 + String checkExistsDfsTableTemplate(String databaseName, String tableName); + + // 检查流表是否存在 + String checkExistsStreamTableTemplate(String streamTableName); + + } diff --git a/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java index 66f35e8..593d7d3 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java +++ b/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java @@ -1,7 +1,7 @@ package com.huaxing.data.database.template; -import com.huaxing.data.database.domain.TableDTO; import com.huaxing.dolphindb.base.CommonService; +import com.huaxing.pojo.dto.TableDTO; import org.springframework.stereotype.Service; /** @@ -24,7 +24,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat **/ @Override public String checkExistsDatabaseTemplate(String databaseName) { - return null; + return "existsDatabase(\"dfs://" + databaseName + "\")"; } /** @@ -35,7 +35,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat **/ @Override public String createDatabaseTemplate(String databaseName) { - return null; + return "database(directory=\"dfs://" + databaseName + "\", partitionType=VALUE, partitionScheme=2025.01M..2050.12M, engine='TSDB')"; } /** @@ -51,7 +51,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + - "ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" + + "ptName = \"" + tableDTO.getDfsTableName() + "Dfs\"\n" + "def createPT(dbPath,ptName){\n" + " db = database(dbPath)\n" + " if (existsTable(dbPath, ptName)){\n" + @@ -85,7 +85,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); // String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + - "stName = \"" + tableDTO.getTableName() + "Stream\"\n" + + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" + "def createST(stName){\n" + " if(not existsStreamTable(stName)){\n" + " colNames = `time`projectId`deviceId`" + colNames + "\n" + @@ -109,12 +109,12 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat * @Version v1.0 **/ @Override - public String unsubscribeStreamTableTemplate(String databaseName, String tableName) { + public String unsubscribeStreamTableTemplate(TableDTO tableDTO) { String sql = ""; - sql = "dbPath = \"dfs://" + databaseName + "\"\n" + - "stName = \"" + tableName + "Stream\"\n" + - "ptName = \"" + tableName + "Dfs\"\n" + - "unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);"; + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" + + "ptName = \"" + tableDTO.getStreamTableName() + "Dfs\"\n" + + "unsubscribeTable(tableName=stName, actionName=`" + tableDTO.getDfsTableName() + "ChgTime);"; return sql; } @@ -125,15 +125,21 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat * @Version v1.0 **/ @Override - public String subscribeStreamTableTemplate(String databaseName, String tableName) { + public String subscribeStreamTableTemplate(TableDTO tableDTO) { String sql = ""; - sql = "dbPath = \"dfs://" + databaseName + "\"\n" + - "stName = \"" + tableName + "Stream\"\n" + - "ptName = \"" + tableName + "Dfs\"\n" + - "subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" + + "ptName = \"" + tableDTO.getStreamTableName() + "Dfs\"\n" + + "subscribeTable(tableName=stName, actionName=`" + tableDTO.getDfsTableName() + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; return sql; } + /** + * @Description 创建 DFS 和流表订阅模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ @Override public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) { StringBuilder script = new StringBuilder(); @@ -142,9 +148,52 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat // 执行创建流表模板 script.append(createStreamTableTemplate(tableDTO)); // 取消订阅流表模板 - script.append(unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + script.append(unsubscribeStreamTableTemplate(tableDTO)); // 订阅流表模板 - script.append(subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + script.append(subscribeStreamTableTemplate(tableDTO)); return script.toString(); } + + /** + * @Description 创建 DFS 和流表取消订阅模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createDfsAndStreamTableUnsubscribe(TableDTO tableDTO) { + StringBuilder script = new StringBuilder(); + // 执行创建 DFS 表模板 + script.append(createDfsTableTemplate(tableDTO)); + // 执行创建流表模板 + script.append(createStreamTableTemplate(tableDTO)); + // 取消订阅流表模板 + script.append(unsubscribeStreamTableTemplate(tableDTO)); + return script.toString(); + } + + /** + * @Description 检查表是否存在模板 + * @Author swordmeng8@163.com + * @param databaseName + * @param tableName + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String checkExistsDfsTableTemplate(String databaseName, String tableName) { + return String.format("existsTable(\"dfs://%s\", \"%s\")", databaseName, tableName); + } + + /** + * @Description 检查流表是否存在模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @param streamTableName + * @return + */ + @Override + public String checkExistsStreamTableTemplate(String streamTableName) { + return String.format("existsStreamTable(\"%s\")", streamTableName); + } }