From b271ba7a24d29a1085c1282973a203cfc603b902 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Wed, 12 Feb 2025 17:44:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Efeign=E7=AD=89=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E5=8F=8A=E6=95=B0=E6=8D=AE=E5=BA=93=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../huaxing/common/exception/BaseException.java | 50 +++ .../com/huaxing/common/exception/BizException.java | 23 ++ .../com/huaxing/common/result/BaseException.java | 50 --- .../com/huaxing/common/result/BizException.java | 23 -- .../java/com/huaxing/common/result/ResultVo.java | 5 + .../java/com/huaxing/common/util/JacksonUtil.java | 2 - .../java/com/huaxing/common/util/StrUtils.java | 26 ++ .../com/huaxing/dolphindb/base/CommonService.java | 9 +- data-storage-api/pom.xml | 41 ++ .../src/main/java/com/huaxing/Main.java | 7 + .../com/huaxing/feign/IDatabaseClientFeign.java | 73 ++++ .../fallback/DatabaseFeignFallbackFactory.java | 41 ++ .../java/com/huaxing/pojo/entity/DatabaseDTO.java | 18 + .../com/huaxing/pojo/entity/TableColumnDTO.java | 36 ++ .../java/com/huaxing/pojo/entity/TableDTO.java | 16 + data-storage/pom.xml | 24 +- .../java/com/huaxing/IotDataBridgeApplication.java | 2 + .../database/controller/DatabaseController.java | 240 ++++++++++++ .../com/huaxing/data/database/domain/TableDTO.java | 66 ++++ .../data/database/service/IDatabaseService.java | 17 + .../database/service/ITableStructureService.java | 40 ++ .../database/service/impl/DatabaseServiceImpl.java | 62 +++ .../service/impl/TableStructureService.java | 105 ++++++ .../database/template/ISqlTemplateService.java | 31 ++ .../database/template/SqlTemplateServiceImpl.java | 150 ++++++++ .../tablemanagement/domain/TableColumnDTO.java | 36 -- .../data/tablemanagement/domain/TableDTO.java | 65 ---- .../service/ITableStructureService.java | 43 --- .../service/ITableTemplateService.java | 27 -- .../service/impl/TableStructureService.java | 109 ------ .../service/impl/TableTemplateServiceImpl.java | 131 ------- .../main/java/com/huaxing/test/TestController.java | 415 ++++++++++----------- pom.xml | 25 +- 33 files changed, 1287 insertions(+), 721 deletions(-) create mode 100644 data-framework/src/main/java/com/huaxing/common/exception/BaseException.java create mode 100644 data-framework/src/main/java/com/huaxing/common/exception/BizException.java delete mode 100644 data-framework/src/main/java/com/huaxing/common/result/BaseException.java delete mode 100644 data-framework/src/main/java/com/huaxing/common/result/BizException.java create mode 100644 data-framework/src/main/java/com/huaxing/common/util/StrUtils.java create mode 100644 data-storage-api/pom.xml create mode 100644 data-storage-api/src/main/java/com/huaxing/Main.java create mode 100644 data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java create mode 100644 data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java create mode 100644 data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java diff --git a/data-framework/src/main/java/com/huaxing/common/exception/BaseException.java b/data-framework/src/main/java/com/huaxing/common/exception/BaseException.java new file mode 100644 index 0000000..f970671 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/exception/BaseException.java @@ -0,0 +1,50 @@ +package com.huaxing.common.exception; + +import com.huaxing.common.constant.AppConstant; +import lombok.Getter; +import lombok.ToString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Getter +@ToString +public abstract class BaseException extends RuntimeException { + + private static final Logger Logger = LoggerFactory.getLogger(BizException.class); + + protected final String code; + + protected Object data; + + protected BaseException(String code, String message) { + super(message); + if (code == null){ + this.code = AppConstant.CODE_BIZ; + }else { + this.code = code; + } + } + + public final void print(){ + Logger.error("exception info print: start"); + Logger.error(this.getMessage(),this); + Logger.error("data:{}", data); + Logger.error("exception info print: end"); + } + + public final BaseException setData(Object data){ + this.data = data; + return this; + } + + public final D getData(){ + return (D)data; + } + + protected static String buildMsg(String msg,String hint){ + if (hint != null){ + msg = msg + ":" + hint; + } + return msg; + } +} diff --git a/data-framework/src/main/java/com/huaxing/common/exception/BizException.java b/data-framework/src/main/java/com/huaxing/common/exception/BizException.java new file mode 100644 index 0000000..07af54f --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/exception/BizException.java @@ -0,0 +1,23 @@ +package com.huaxing.common.exception; + +import com.huaxing.common.constant.AppConstant; +import org.springframework.util.StringUtils; + +public class BizException extends BaseException { + + private BizException(String msg) { + super(AppConstant.CODE_BIZ, msg); + } + + /** tag!(msg:hint) */ + public static BizException custom(String tag, String msg, String hint){ + return BizException.custom(tag,BaseException.buildMsg(msg,hint)); + } + /** tag!(msg) */ + public static BizException custom(String tag, String msg){ + if (!StringUtils.isEmpty(msg)){ + tag += "!(" + msg + ")"; + } + return new BizException(tag); + } +} diff --git a/data-framework/src/main/java/com/huaxing/common/result/BaseException.java b/data-framework/src/main/java/com/huaxing/common/result/BaseException.java deleted file mode 100644 index 30d8d90..0000000 --- a/data-framework/src/main/java/com/huaxing/common/result/BaseException.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.huaxing.common.result; - -import com.huaxing.common.constant.AppConstant; -import lombok.Getter; -import lombok.ToString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Getter -@ToString -public abstract class BaseException extends RuntimeException { - - private static final Logger Logger = LoggerFactory.getLogger(BizException.class); - - protected final String code; - - protected Object data; - - protected BaseException(String code, String message) { - super(message); - if (code == null){ - this.code = AppConstant.CODE_BIZ; - }else { - this.code = code; - } - } - - public final void print(){ - Logger.error("exception info print: start"); - Logger.error(this.getMessage(),this); - Logger.error("data:{}", data); - Logger.error("exception info print: end"); - } - - public final BaseException setData(Object data){ - this.data = data; - return this; - } - - public final D getData(){ - return (D)data; - } - - protected static String buildMsg(String msg,String hint){ - if (hint != null){ - msg = msg + ":" + hint; - } - return msg; - } -} diff --git a/data-framework/src/main/java/com/huaxing/common/result/BizException.java b/data-framework/src/main/java/com/huaxing/common/result/BizException.java deleted file mode 100644 index 9446aa5..0000000 --- a/data-framework/src/main/java/com/huaxing/common/result/BizException.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.huaxing.common.result; - -import com.huaxing.common.constant.AppConstant; -import org.springframework.util.StringUtils; - -public class BizException extends BaseException { - - private BizException(String msg) { - super(AppConstant.CODE_BIZ, msg); - } - - /** tag!(msg:hint) */ - public static BizException custom(String tag, String msg, String hint){ - return BizException.custom(tag,BaseException.buildMsg(msg,hint)); - } - /** tag!(msg) */ - public static BizException custom(String tag, String msg){ - if (!StringUtils.isEmpty(msg)){ - tag += "!(" + msg + ")"; - } - return new BizException(tag); - } -} diff --git a/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java index 88e359e..582ac8f 100644 --- a/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java +++ b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java @@ -1,6 +1,7 @@ package com.huaxing.common.result; import com.huaxing.common.constant.AppConstant; +import com.huaxing.common.exception.BaseException; import lombok.Getter; import java.io.Serializable; @@ -49,4 +50,8 @@ public class ResultVo implements Serializable { public static ResultVo fail(BaseException exception) { return new ResultVo<>(exception); } + + public static ResultVo status(boolean flag) { + return flag ? ResultVo.ok() : ResultVo.fail("操作失败!"); + } } diff --git a/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java b/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java index e589f41..39f8742 100644 --- a/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java +++ b/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java @@ -21,10 +21,8 @@ public class JacksonUtil { public static String objectStr(Object object) { ObjectMapper mapper = new ObjectMapper(); try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - return mapper.writeValueAsString(object); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/data-framework/src/main/java/com/huaxing/common/util/StrUtils.java b/data-framework/src/main/java/com/huaxing/common/util/StrUtils.java new file mode 100644 index 0000000..5738737 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/util/StrUtils.java @@ -0,0 +1,26 @@ +package com.huaxing.common.util; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.common.util + * @ClassName: StrUtils + * @Author: swordmeng8@163.com + * @Description: 字符串工具类 + * @Date: 2025/2/12 15:23 + * @Version: 1.0 + */ + +public class StrUtils { + + /** + * 判断字符串是否以指定前缀开头 + * @param str + * @param prefix + * @return + */ + public static boolean checkStringStartsWithExample(String str, String prefix) { + // 使用 startsWith 方法进行判断 + return str.startsWith(prefix); + } + +} diff --git a/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java b/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java index bd3fbdb..395971e 100644 --- a/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java +++ b/data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java @@ -23,8 +23,6 @@ public class CommonService extends SqlConverterStatementHandle { @Autowired private AbstractDbConnector dbPool; - - /** * 执行单条sql * @@ -43,20 +41,23 @@ public class CommonService extends SqlConverterStatementHandle { } /** - * 执行单条sql + * 执行单条sql bool返回值 * * @param sql */ - public void exec(String sql) { + public boolean exec(String sql) { + boolean isSuccess = false; DBConnection connection = dbPool.getConnection(); try { connection.run(sql); + isSuccess = true; } catch (IOException e) { log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage()); throw new RuntimeException(e); } finally { connection.close(); } + return isSuccess; } /** diff --git a/data-storage-api/pom.xml b/data-storage-api/pom.xml new file mode 100644 index 0000000..a7c9196 --- /dev/null +++ b/data-storage-api/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + com.huaxing + data-bridge + 0.0.1-SNAPSHOT + + + data-storage-api + jar + + + 17 + 17 + UTF-8 + + + + com.huaxing + data-framework + 0.0.1-SNAPSHOT + compile + + + + org.springframework.cloud + spring-cloud-starter-openfeign + 2.2.9.RELEASE + + + + org.springframework.cloud + spring-cloud-context + 3.1.4 + + + + \ No newline at end of file diff --git a/data-storage-api/src/main/java/com/huaxing/Main.java b/data-storage-api/src/main/java/com/huaxing/Main.java new file mode 100644 index 0000000..b310726 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/Main.java @@ -0,0 +1,7 @@ +package com.huaxing; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java new file mode 100644 index 0000000..5ff2902 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java @@ -0,0 +1,73 @@ +package com.huaxing.feign; + +import com.huaxing.common.constant.AppConstant; +import com.huaxing.common.result.ResultVo; +import com.huaxing.feign.fallback.DatabaseFeignFallbackFactory; +import com.huaxing.pojo.entity.DatabaseDTO; +import com.huaxing.pojo.entity.TableDTO; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +/** + * Notice Feign接口类 + * + * @author Chill + */ +@FeignClient(name = AppConstant.APP_NAME, fallbackFactory = DatabaseFeignFallbackFactory.class) +public interface IDatabaseClientFeign { + + // 请求前缀 + String API_PREFIX = "/api"; + + // 创建数据库 + String CREATE_DATABASE = API_PREFIX + "/database/create"; + + // 创建dfs表 + String CREATE_DFS_TABLE = API_PREFIX + "/table/create/dfs"; + + // 创建stream表 + String CREATE_STREAM_TABLE = API_PREFIX + "/table/create/stream"; + + // 创建dfs和stream表并订阅 + String CREATE_DFS_AND_STREAM_TABLE_SUBSCRIBE = API_PREFIX + "/table/create/dfs/stream/subscribe"; + + // 创建dfs和stream表不订阅 + String CREATE_DFS_AND_STREAM_TABLE_UNSUBSCRIBE = API_PREFIX + "/table/create/dfs/stream/unsubscribe"; + + // 订阅流表 + String SUBSCRIBE_STREAM_TABLE = API_PREFIX + "/table/subscribe/stream"; + + // 取消订阅流表 + String UNSUBSCRIBE_STREAM_TABLE = API_PREFIX + "/table/unsubscribe/stream"; + + + + // 新增dfs表字段 + String ADD_DFS_COLUMNS = API_PREFIX + "/table/add/dfs/columns"; + + // 新增stream表字段 + String ADD_STREAM_COLUMNS = API_PREFIX + "/table/add/stream/columns"; + + + /** + * 创建数据库 + * @param databaseDTO + * @return ResultVo + */ + @PostMapping(CREATE_DATABASE) + ResultVo createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO); + + + /** + * 创建dfs表 + * @param tableDTO + * @return ResultVo + */ + @PostMapping(CREATE_DFS_TABLE) + ResultVo createDfsTable(@RequestBody @Validated TableDTO tableDTO); + + + +} diff --git a/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java b/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java new file mode 100644 index 0000000..994c0fe --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java @@ -0,0 +1,41 @@ +package com.huaxing.feign.fallback; + +import com.huaxing.common.result.ResultVo; +import com.huaxing.feign.IDatabaseClientFeign; +import com.huaxing.pojo.entity.DatabaseDTO; +import com.huaxing.pojo.entity.TableDTO; +import feign.hystrix.FallbackFactory; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.fallback + * @ClassName: DataStorageFeignFallbackFactory + * @Author: swordmeng8@163.com + * @Description: 1 + * @Date: 2025/2/12 10:29 + * @Version: 1.0 + */ + +public class DatabaseFeignFallbackFactory implements FallbackFactory { + @Override + public IDatabaseClientFeign create(Throwable throwable) { + return new IDatabaseClientFeign() { + /** + * 创建数据库 + * @param query + * @return + */ + @Override + public ResultVo createDatabase(DatabaseDTO query) { + + ResultVo.fail(throwable.getMessage()); + return null; + } + + @Override + public ResultVo createDfsTable(TableDTO tableDTO) { + return null; + } + }; + } +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java new file mode 100644 index 0000000..962c2a7 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java @@ -0,0 +1,18 @@ +package com.huaxing.pojo.entity; + +import lombok.Data; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.pojo.entity + * @ClassName: DatabaseDTO + * @Author: swordmeng8@163.com + * @Description: 数据库建库接参对象 + * @Date: 2025/2/12 17:25 + * @Version: 1.0 + */ +@Data +public class DatabaseDTO { + private String databaseName; + private String databaseComment; +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java new file mode 100644 index 0000000..18ac12e --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java @@ -0,0 +1,36 @@ +package com.huaxing.pojo.entity; + +import lombok.Data; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.database.domain + * @ClassName: TableColumnDTO + * @Author: swordmeng8@163.com + * @Description: 表列DTO + * @Date: 2025/1/15 16:05 + * @Version: 1.0 + */ +@Data +public class TableColumnDTO { + /** + * 列名 + */ + private String columnName; + /** + * 列类型 + */ + private String columnType; + /** + * 列描述 + */ + private String columnDesc; + + public TableColumnDTO() { + } + public TableColumnDTO(String columnName, String columnType, String columnDesc) { + this.columnName = columnName; + this.columnType = columnType; + this.columnDesc = columnDesc; + } +} diff --git a/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java new file mode 100644 index 0000000..5e780b9 --- /dev/null +++ b/data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java @@ -0,0 +1,16 @@ +package com.huaxing.pojo.entity; + +import lombok.Data; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.pojo.entity + * @ClassName: TableDTO + * @Author: swordmeng8@163.com + * @Description: 表结构接参类 + * @Date: 2025/2/12 17:40 + * @Version: 1.0 + */ +@Data +public class TableDTO { +} diff --git a/data-storage/pom.xml b/data-storage/pom.xml index 2550459..1b94c8c 100644 --- a/data-storage/pom.xml +++ b/data-storage/pom.xml @@ -48,11 +48,33 @@ spring-web 6.2.1 - + + org.springframework.cloud + spring-cloud-starter-openfeign + 2.2.9.RELEASE + + + com.huaxing + data-storage-api + 0.0.1-SNAPSHOT + compile + + + + + org.springframework.cloud + spring-cloud-dependencies + 2021.0.5 + pom + import + + + + \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java index ef82823..0c56ad7 100644 --- a/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java +++ b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java @@ -2,7 +2,9 @@ package com.huaxing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.openfeign.EnableFeignClients; +@EnableFeignClients @SpringBootApplication public class IotDataBridgeApplication { diff --git a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java new file mode 100644 index 0000000..f64c521 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java @@ -0,0 +1,240 @@ +package com.huaxing.data.database.controller; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; +import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.database.domain.TableDTO; +import com.huaxing.data.database.service.ITableStructureService; +import com.huaxing.common.util.JacksonUtil; +import com.huaxing.data.database.template.ISqlTemplateService; +import com.huaxing.mqtt.processor.MqttMessageSender; +import com.huaxing.common.result.ResultVo; +import com.huaxing.pojo.entity.TableColumnDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.*; + + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.controller + * @ClassName: TestController + * @Author: swordmeng8@163.com + * @Description: 测试controller + * @Date: 2025/1/15 17:08 + * @Version: 1.0 + */ +@Slf4j +@RestController +@RequestMapping("/api/database") +@SuppressWarnings("all") +public class DatabaseController { + + @Autowired + private MqttMessageSender messageSender; + final IDeviceDataStoredService dataStoredService; + final IDeviceDataQueryDfsService dataQueryDfsService; + final IDeviceDataQueryStreamService dataQueryStreamService; + final ITableStructureService tableStructureService; + + + // =============================================== 测试构造器 ====================================== + public DatabaseController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) { + this.dataStoredService = dataStoredService; + this.dataQueryDfsService = dataQueryDfsService; + this.dataQueryStreamService = dataQueryStreamService; + this.tableStructureService = tableStructureService; + } + + + // =============================================== 测试插入数据 ====================================== + + /** + * 测试数据插入 + */ + @GetMapping(value = "/testInsert") + public void testInsert() { + String tableName = "MyTable2Stream"; + String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; + dataStoredService.execute(sql); + log.info("SUCCESS"); + } + + // =============================================== 测试Dfs表查询 ====================================== + + /** + * 测试Dfs表查询 + */ + @GetMapping(value = "/testSelectDfs") + public ResultVo>> testSelectDfs() { + String dbPath = "dfs://ZbDB"; + String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); + return ResultVo.ok(dataQueryDfsService.selectList(sql)); + } + + // =============================================== 测试订阅流表查询 ====================================== + + /** + * 测试订阅流表查询 + */ + @GetMapping(value = "/testSelectStream") + public ResultVo>> testSelectStream() { + String sql = "select * from WaterMeterTset1Stream"; + return ResultVo.ok(dataQueryStreamService.selectList(sql)); + } + + // =============================================== 给指定的流表增加列字段 ====================================== + + /** + * 测试给指定的流表增加列字段 + */ + @GetMapping(value = "/testStreamAddColumn") + public void testStreamAddColumn() { + String tableName = "WaterMeterTsetStream"; + String columnName = "test"; + String columnType = "STRING"; + String columnDesc = "test"; + TableDTO tableDTO = new TableDTO(); + tableDTO.setTableName(tableName); + tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); + try { + tableStructureService.addStreamColumns(tableDTO); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + // =============================================== 给指定的Dfs表增加列字段 ====================================== + /** + * 测试给指定的Dfs表增加列字段 + */ + @GetMapping(value = "/testDfsAddColumn") + public void testDfsAddColumn() { + String tableName = "WaterMeterTsetDfs"; + String columnName = "test"; + String columnType = "STRING"; + String columnDesc = "test描述"; + TableDTO tableDTO = new TableDTO(); + tableDTO.setTableName(tableName); + tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); + try { + tableStructureService.addDfsColumns(tableDTO); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + + // ===================================== mqtt消息发送-测试数据入库 START ============================== + + /** + * 测试发送mqtt消息 + */ + @GetMapping(value = "/testSendMessage") + public void testSendMessage() { + for (int i = 0; i < 2; i++) { + List> dataList = new ArrayList<>(); + dataList.add(handleMapByIndex(i)); + DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); + messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); + } + } + + /** + * 根据索引生成测试数据 + * @param index 索引 + * @return + */ + private Map handleMapByIndex(int index) { + Map map = new HashMap<>(); + map.put("time", new Date()); + map.put("projectId", "0jZU2102"); + map.put("deviceId", "0jZU2102_0806_0011"); + map.put("WM_WFA", 124.656 + index); + map.put("WM_WFA_Unit", "m³"); + map.put("test", "test"); + return map; + } + + // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== + + /** + * 测试创建Dfs表 + */ + @GetMapping(value = "/testCreateDfsTable") + public void testCreateDfsTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createDfsTable(tableDTO); + } + + /** + * 测试创建流表 + */ + @GetMapping(value = "/testCreateStreamTable") + public void testCreateStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createStreamTable(tableDTO); + } + + /** + * 取消订阅流表 + */ + @GetMapping(value = "/testUnsubscribeStreamTable") + public void testUnsubscribeStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + tableStructureService.unsubscribeStreamTable(tableDTO); + } + + /** + * 订阅流表 + */ + @GetMapping(value = "/testSubscribeStreamTable") + public void testSubscribeStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + tableStructureService.subscribeStreamTable(tableDTO); + } + + /** + * 创建Dfs和Stream表并添加订阅 + */ + @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") + public void testCreateDfsAndStreamTableSubscribe() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("MyTe2"); + tableDTO.setTableDesc("MyTe2测试表"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); + } + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java b/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java new file mode 100644 index 0000000..81cee86 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java @@ -0,0 +1,66 @@ +package com.huaxing.data.database.domain; + +import com.huaxing.pojo.entity.TableColumnDTO; +import lombok.Data; + +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.database.domain + * @ClassName: TableDTO + * @Author: swordmeng8@163.com + * @Description: 表操作类 + * @Date: 2025/1/15 16:03 + * @Version: 1.0 + */ + +@Data +public class TableDTO { + + /** + * 数据库名 + */ + private String databaseName; + /** + * 表名 + */ + private String tableName; + /** + * 表描述 + */ + private String tableDesc; + /** + * 表列信息 + */ + private List tableColumnList; + + public String handleTableColumnName(List tableColumnList) { + return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); + } + + public String handleTableColumnType(List tableColumnList) { + return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); + } + + public String handleTableColumnDesc(List tableColumnList) { + String columnDesc = ""; + // 获取列与描述的拼接字符串并返回 + Iterator iterator = tableColumnList.iterator(); + while (iterator.hasNext()) { + TableColumnDTO tableColumn = iterator.next(); + if (Objects.isNull(tableColumn.getColumnDesc())) { + columnDesc += tableColumn.getColumnName() + ":" + "\"-\""; + } else { + columnDesc += tableColumn.getColumnName() + ":\"" + tableColumn.getColumnDesc()+"\""; + } + if (iterator.hasNext()) { + columnDesc += ","; + } + } + return columnDesc; + } + +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java b/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java new file mode 100644 index 0000000..4c2bacd --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java @@ -0,0 +1,17 @@ +package com.huaxing.data.database.service; + +import com.huaxing.common.result.ResultVo; + +/** + * 数据库操作 + * @author 孟剑 + * @date 2025-02-12 13:47 + */ +public interface IDatabaseService { + + // 检查数据库是否存在 + ResultVo checkExistsDatabase(String databaseName); + + // 创建数据库 + ResultVo createDatabase(String databaseName); +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java b/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java new file mode 100644 index 0000000..3c411f5 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java @@ -0,0 +1,40 @@ +package com.huaxing.data.database.service; + +import com.huaxing.data.database.domain.TableDTO; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service + * @ClassName: ITableStructureService + * @Author: swordmeng8@163.com + * @Description: 表结构操作service + * @Date: 2025/1/13 11:02 + * @Version: 1.0 + */ +public interface ITableStructureService { + + // 添加dfs表多列 + void addDfsColumns(TableDTO tableDTO); + + // 添加Stream流表多列 + void addStreamColumns(TableDTO tableDTO); + + // 创建dfs表 + void createDfsTable(TableDTO tableDTO); + + // 创建Stream流表 + void createStreamTable(TableDTO tableDTO); + + // 取消订阅 + public void unsubscribeStreamTable(TableDTO tableDTO); + + // 订阅流表 + void subscribeStreamTable(TableDTO tableDTO); + + // 判断表是否存在 + boolean isTableExist(String tableName); + + + void createDfsAndStreamTableSubscribe(TableDTO tableDTO); + +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java new file mode 100644 index 0000000..2606599 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java @@ -0,0 +1,62 @@ +package com.huaxing.data.database.service.impl; + +import com.huaxing.common.result.ResultVo; +import com.huaxing.common.util.StrUtils; +import com.huaxing.data.database.service.IDatabaseService; +import com.huaxing.data.database.template.ISqlTemplateService; +import com.huaxing.dolphindb.base.CommonService; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Service; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.database.service.impl + * @ClassName: DatabaseServiceImpl + * @Author: swordmeng8@163.com + * @Description: 数据库操作实现类 + * @Date: 2025/2/12 13:48 + * @Version: 1.0 + */ +@Service +@AllArgsConstructor +public class DatabaseServiceImpl extends CommonService implements IDatabaseService { + + final ISqlTemplateService tableTemplateService; + + /** + * @description: 检查数据库是否存在 + * @param databaseName 数据库名 + * @return com.huaxing.common.result.ResultVo + * @author swordmeng8@163.com + * @date 2025/2/12 13:48 + */ + @Override + public ResultVo checkExistsDatabase(String databaseName) { + if (!StrUtils.checkStringStartsWithExample(databaseName, "dfs://")) { + return ResultVo.fail("数据库名必须以dfs://开头"); + } + ; + return ResultVo.status(exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName))); + } + + /** + * @description: 创建数据库 + * @param databaseName 数据库名 + * @return void + * @author swordmeng8@163.com + * @date 2025/2/12 13:48 + */ + @Override + public ResultVo createDatabase(String databaseName) { + // 创建数据库操作 +// StringBuilder script = new StringBuilder(); +// script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); +// tableDTO.getTableColumnList().forEach(columnItem -> { +// script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); +// }); +// exec(script.toString()); + + return ResultVo.status(true); + + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java b/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java new file mode 100644 index 0000000..45d8f63 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java @@ -0,0 +1,105 @@ +package com.huaxing.data.database.service.impl; + +import com.huaxing.data.database.domain.TableDTO; +import com.huaxing.data.database.template.ISqlTemplateService; +import com.huaxing.dolphindb.base.CommonService; +import com.huaxing.data.database.service.ITableStructureService; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Service; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.impl + * @ClassName: TableStructureService + * @Author: swordmeng8@163.com + * @Description: 表结构操作service + * @Date: 2025/1/13 11:02 + * @Version: 1.0 + */ +@Service +@AllArgsConstructor +public class TableStructureService extends CommonService implements ITableStructureService { + + final ISqlTemplateService tableTemplateService; + + /** + * 添加dfs表列 + * @param tableDTO + */ + @Override + public void addDfsColumns(TableDTO tableDTO) { + StringBuilder script = new StringBuilder(); + script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); + tableDTO.getTableColumnList().forEach(columnItem -> { + script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); + }); + exec(script.toString()); + } + + /** + * 添加流表列 + * @param tableDTO + */ + @Override + public void addStreamColumns(TableDTO tableDTO) { + StringBuilder streamScript = new StringBuilder(); + tableDTO.getTableColumnList().forEach(columnItem -> { + streamScript.append("addColumn(").append(tableDTO.getTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");"); + }); + exec(streamScript.toString()); + } + + /** + * 创建dfs表 + * @param tableDTO + */ + @Override + public void createDfsTable(TableDTO tableDTO) { + exec(tableTemplateService.createDfsTableTemplate(tableDTO)); + } + + /** + * 创建流表 + * @param tableDTO + */ + @Override + public void createStreamTable(TableDTO tableDTO) { + exec(tableTemplateService.createStreamTableTemplate(tableDTO)); + } + + /** + * 取消流表订阅 + * @param tableDTO + */ + @Override + public void unsubscribeStreamTable(TableDTO tableDTO) { + exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + } + + /** + * 订阅流表 + * @param tableDTO + */ + @Override + public void subscribeStreamTable(TableDTO tableDTO) { + exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + } + + /** + * 判断表是否存在 + * @param tableName + */ + @Override + public boolean isTableExist(String tableName) { + return false; + } + + /** + * 创建dfs表和流表并订阅 + * @param tableDTO + */ + @Override + public void createDfsAndStreamTableSubscribe(TableDTO tableDTO) { + exec(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO)); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java b/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java new file mode 100644 index 0000000..309331d --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java @@ -0,0 +1,31 @@ +package com.huaxing.data.database.template; + +import com.huaxing.data.database.domain.TableDTO; + +/** + * 表创建模板 + * @author 孟剑 + * @date 2025-01-20 10:32 + */ +public interface ISqlTemplateService { + + // 检查数据库是否存在SQL模板 + String checkExistsDatabaseTemplate(String databaseName); + + // 创建数据库SQL模板 + String createDatabaseTemplate(String databaseName); + + // 创建dfs表模板 + String createDfsTableTemplate(TableDTO tableDTO); + + // 创建流表模板 + String createStreamTableTemplate(TableDTO tableDTO); + + // 取消订阅流表模板 + String unsubscribeStreamTableTemplate(String databaseName, String tableName); + + // 订阅流表模板 + String subscribeStreamTableTemplate(String databaseName, String tableName); + + String createDfsAndStreamTableSubscribe(TableDTO tableDTO); +} diff --git a/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java new file mode 100644 index 0000000..66f35e8 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java @@ -0,0 +1,150 @@ +package com.huaxing.data.database.template; + +import com.huaxing.data.database.domain.TableDTO; +import com.huaxing.dolphindb.base.CommonService; +import org.springframework.stereotype.Service; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.database.service.impl + * @ClassName: TableTemplateServiceImpl + * @Author: swordmeng8@163.com + * @Description: 表操作模板 + * @Date: 2025/1/20 10:38 + * @Version: 1.0 + */ +@Service +public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplateService { + + /** + * @Description 检查数据库是否存在 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String checkExistsDatabaseTemplate(String databaseName) { + return null; + } + + /** + * @Description 创建数据库模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createDatabaseTemplate(String databaseName) { + return null; + } + + /** + * @Description 创建 DFS 表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createDfsTableTemplate(TableDTO tableDTO) { + String sql = ""; + String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); + String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); + String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" + + "def createPT(dbPath,ptName){\n" + + " db = database(dbPath)\n" + + " if (existsTable(dbPath, ptName)){\n" + + " pt = loadTable(db,ptName)\n" + + " return pt\n" + + " }\n" + + " else{\n" + + " colNames = `time`projectId`deviceId`" + colNames + "\n" + + " colTypes = [DATETIME,STRING,STRING," + colTypes + "]\n" + + " t = table(1:0, colNames, colTypes)\n" + + " pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST)\n" + + " setColumnComment(pt,{time:\"时间\",projectId:\"项目ID\",deviceId:\"设备编号\"," + colDesc + "})\n" + + " setTableComment(table=pt, comment=\"" + tableDTO.getTableDesc() + "\")\n" + + " return pt\n" + + " }\n" + + "}\n" + + "createPT(dbPath,ptName);"; + return sql; + } + + /** + * @Description 创建流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createStreamTableTemplate(TableDTO tableDTO) { + String sql = ""; + String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); + String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); +// String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + + "stName = \"" + tableDTO.getTableName() + "Stream\"\n" + + "def createST(stName){\n" + + " if(not existsStreamTable(stName)){\n" + + " colNames = `time`projectId`deviceId`" + colNames + "\n" + + " colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" + + " st = streamTable(150000:0, colNames, colTypes)\n" + + " enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" + + " return st\n" + + " }\n" + + " else{\n" + + " return objByName(stName, true)\n" + + " }\n" + + "}\n" + + "createST(stName);"; + return sql; + } + + /** + * @Description 取消订阅流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String unsubscribeStreamTableTemplate(String databaseName, String tableName) { + String sql = ""; + sql = "dbPath = \"dfs://" + databaseName + "\"\n" + + "stName = \"" + tableName + "Stream\"\n" + + "ptName = \"" + tableName + "Dfs\"\n" + + "unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);"; + return sql; + } + + /** + * @Description 订阅流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String subscribeStreamTableTemplate(String databaseName, String tableName) { + String sql = ""; + sql = "dbPath = \"dfs://" + databaseName + "\"\n" + + "stName = \"" + tableName + "Stream\"\n" + + "ptName = \"" + tableName + "Dfs\"\n" + + "subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; + return sql; + } + + @Override + public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) { + StringBuilder script = new StringBuilder(); + // 执行创建 DFS 表模板 + script.append(createDfsTableTemplate(tableDTO)); + // 执行创建流表模板 + script.append(createStreamTableTemplate(tableDTO)); + // 取消订阅流表模板 + script.append(unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + // 订阅流表模板 + script.append(subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); + return script.toString(); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java deleted file mode 100644 index 96ba9dc..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.huaxing.data.tablemanagement.domain; - -import lombok.Data; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.tablemanagement.domain - * @ClassName: TableColumnDTO - * @Author: swordmeng8@163.com - * @Description: 表列DTO - * @Date: 2025/1/15 16:05 - * @Version: 1.0 - */ -@Data -public class TableColumnDTO { - /** - * 列名 - */ - private String columnName; - /** - * 列类型 - */ - private String columnType; - /** - * 列描述 - */ - private String columnDesc; - - public TableColumnDTO() { - } - public TableColumnDTO(String columnName, String columnType, String columnDesc) { - this.columnName = columnName; - this.columnType = columnType; - this.columnDesc = columnDesc; - } -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java deleted file mode 100644 index 14c8903..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.huaxing.data.tablemanagement.domain; - -import lombok.Data; - -import java.util.Iterator; -import java.util.List; -import java.util.Objects; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.tablemanagement.domain - * @ClassName: TableDTO - * @Author: swordmeng8@163.com - * @Description: 表操作类 - * @Date: 2025/1/15 16:03 - * @Version: 1.0 - */ - -@Data -public class TableDTO { - - /** - * 数据库名 - */ - private String databaseName; - /** - * 表名 - */ - private String tableName; - /** - * 表描述 - */ - private String tableDesc; - /** - * 表列信息 - */ - private List tableColumnList; - - public String handleTableColumnName(List tableColumnList) { - return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); - } - - public String handleTableColumnType(List tableColumnList) { - return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); - } - - public String handleTableColumnDesc(List tableColumnList) { - String columnDesc = ""; - // 获取列与描述的拼接字符串并返回 - Iterator iterator = tableColumnList.iterator(); - while (iterator.hasNext()) { - TableColumnDTO tableColumn = iterator.next(); - if (Objects.isNull(tableColumn.getColumnDesc())) { - columnDesc += tableColumn.getColumnName() + ":" + "\"-\""; - } else { - columnDesc += tableColumn.getColumnName() + ":\"" + tableColumn.getColumnDesc()+"\""; - } - if (iterator.hasNext()) { - columnDesc += ","; - } - } - return columnDesc; - } - -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java deleted file mode 100644 index 8ccffff..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.huaxing.data.tablemanagement.service; - -import com.huaxing.data.tablemanagement.domain.TableDTO; - -import java.sql.SQLException; -import java.util.List; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.service - * @ClassName: ITableStructureService - * @Author: swordmeng8@163.com - * @Description: 表结构操作service - * @Date: 2025/1/13 11:02 - * @Version: 1.0 - */ -public interface ITableStructureService { - - // 添加dfs表多列 - void addDfsColumns(TableDTO tableDTO); - - // 添加Stream流表多列 - void addStreamColumns(TableDTO tableDTO); - - // 创建dfs表 - void createDfsTable(TableDTO tableDTO); - - // 创建Stream流表 - void createStreamTable(TableDTO tableDTO); - - // 取消订阅 - public void unsubscribeStreamTable(TableDTO tableDTO); - - // 订阅流表 - void subscribeStreamTable(TableDTO tableDTO); - - // 判断表是否存在 - boolean isTableExist(String tableName); - - - void createDfsAndStreamTableSubscribe(TableDTO tableDTO); - -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java deleted file mode 100644 index 6990963..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.huaxing.data.tablemanagement.service; - -import com.huaxing.data.tablemanagement.domain.TableDTO; - -import java.util.List; - -/** - * 表创建模板 - * @author 孟剑 - * @date 2025-01-20 10:32 - */ -public interface ITableTemplateService { - - // 创建dfs表模板 - String createDfsTableTemplate(TableDTO tableDTO); - - // 创建流表模板 - String createStreamTableTemplate(TableDTO tableDTO); - - // 取消订阅流表模板 - String unsubscribeStreamTableTemplate(String databaseName, String tableName); - - // 订阅流表模板 - String subscribeStreamTableTemplate(String databaseName, String tableName); - - String createDfsAndStreamTableSubscribe(TableDTO tableDTO); -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java deleted file mode 100644 index 77d7a71..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.huaxing.data.tablemanagement.service.impl; - -import com.huaxing.data.tablemanagement.domain.TableDTO; -import com.huaxing.data.tablemanagement.service.ITableTemplateService; -import com.huaxing.dolphindb.base.CommonService; -import com.huaxing.data.tablemanagement.service.ITableStructureService; -import org.springframework.stereotype.Service; - -import java.sql.SQLException; -import java.util.List; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.service.impl - * @ClassName: TableStructureService - * @Author: swordmeng8@163.com - * @Description: 表结构操作service - * @Date: 2025/1/13 11:02 - * @Version: 1.0 - */ -@Service -public class TableStructureService extends CommonService implements ITableStructureService { - - final ITableTemplateService tableTemplateService; - public TableStructureService(ITableTemplateService tableTemplateService) { - this.tableTemplateService = tableTemplateService; - } - - /** - * 添加dfs表列 - * @param tableDTO - */ - @Override - public void addDfsColumns(TableDTO tableDTO) { - StringBuilder script = new StringBuilder(); - script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); - tableDTO.getTableColumnList().forEach(columnItem -> { - script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); - }); - exec(script.toString()); - } - - /** - * 添加流表列 - * @param tableDTO - */ - @Override - public void addStreamColumns(TableDTO tableDTO) { - StringBuilder streamScript = new StringBuilder(); - tableDTO.getTableColumnList().forEach(columnItem -> { - streamScript.append("addColumn(").append(tableDTO.getTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");"); - }); - exec(streamScript.toString()); - } - - /** - * 创建dfs表 - * @param tableDTO - */ - @Override - public void createDfsTable(TableDTO tableDTO) { - exec(tableTemplateService.createDfsTableTemplate(tableDTO)); - } - - /** - * 创建流表 - * @param tableDTO - */ - @Override - public void createStreamTable(TableDTO tableDTO) { - exec(tableTemplateService.createStreamTableTemplate(tableDTO)); - } - - /** - * 取消流表订阅 - * @param tableDTO - */ - @Override - public void unsubscribeStreamTable(TableDTO tableDTO) { - exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); - } - - /** - * 订阅流表 - * @param tableDTO - */ - @Override - public void subscribeStreamTable(TableDTO tableDTO) { - exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); - } - - /** - * 判断表是否存在 - * @param tableName - */ - @Override - public boolean isTableExist(String tableName) { - return false; - } - - /** - * 创建dfs表和流表并订阅 - * @param tableDTO - */ - @Override - public void createDfsAndStreamTableSubscribe(TableDTO tableDTO) { - exec(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO)); - } -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java deleted file mode 100644 index b056218..0000000 --- a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.huaxing.data.tablemanagement.service.impl; - -import com.huaxing.data.tablemanagement.domain.TableDTO; -import com.huaxing.data.tablemanagement.service.ITableTemplateService; -import com.huaxing.dolphindb.base.CommonService; -import org.springframework.stereotype.Service; - -/** - * @ProjectName: data-bridge - * @Package: com.huaxing.data.tablemanagement.service.impl - * @ClassName: TableTemplateServiceImpl - * @Author: swordmeng8@163.com - * @Description: 表操作模板 - * @Date: 2025/1/20 10:38 - * @Version: 1.0 - */ -@Service -public class TableTemplateServiceImpl extends CommonService implements ITableTemplateService { - - /** - * @Description 创建 DFS 表模板 - * @Author swordmeng8@163.com - * @Date 2025/1/20 10:46 - * @Version v1.0 - **/ - @Override - public String createDfsTableTemplate(TableDTO tableDTO) { - String sql = ""; - String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); - String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); - String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); - sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + - "ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" + - "def createPT(dbPath,ptName){\n" + - " db = database(dbPath)\n" + - " if (existsTable(dbPath, ptName)){\n" + - " pt = loadTable(db,ptName)\n" + - " return pt\n" + - " }\n" + - " else{\n" + - " colNames = `time`projectId`deviceId`" + colNames + "\n" + - " colTypes = [DATETIME,STRING,STRING," + colTypes + "]\n" + - " t = table(1:0, colNames, colTypes)\n" + - " pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST)\n" + - " setColumnComment(pt,{time:\"时间\",projectId:\"项目ID\",deviceId:\"设备编号\"," + colDesc + "})\n" + - " setTableComment(table=pt, comment=\"" + tableDTO.getTableDesc() + "\")\n" + - " return pt\n" + - " }\n" + - "}\n" + - "createPT(dbPath,ptName);"; - return sql; - } - - /** - * @Description 创建流表模板 - * @Author swordmeng8@163.com - * @Date 2025/1/20 10:46 - * @Version v1.0 - **/ - @Override - public String createStreamTableTemplate(TableDTO tableDTO) { - String sql = ""; - String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList()); - String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); - String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); - sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + - "stName = \"" + tableDTO.getTableName() + "Stream\"\n" + - "def createST(stName){\n" + - " if(not existsStreamTable(stName)){\n" + - " colNames = `time`projectId`deviceId`" + colNames + "\n" + - " colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" + - " st = streamTable(150000:0, colNames, colTypes)\n" + - " enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" + -// " setColumnComment(st,{time:\"时间\",projectId:\"项目ID\",deviceId:\"设备编号\"," + colDesc + "})\n" + -// " setTableComment(table=st, comment=\"" + tableDTO.getTableDesc() + "\")\n" + - " return st\n" + - " }\n" + - " else{\n" + - " return objByName(stName, true)\n" + - " }\n" + - "}\n" + - "createST(stName);"; - return sql; - } - - /** - * @Description 取消订阅流表模板 - * @Author swordmeng8@163.com - * @Date 2025/1/20 10:46 - * @Version v1.0 - **/ - @Override - public String unsubscribeStreamTableTemplate(String databaseName, String tableName) { - String sql = ""; - sql = "dbPath = \"dfs://" + databaseName + "\"\n" + - "stName = \"" + tableName + "Stream\"\n" + - "ptName = \"" + tableName + "Dfs\"\n" + - "unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);"; - return sql; - } - - /** - * @Description 订阅流表模板 - * @Author swordmeng8@163.com - * @Date 2025/1/20 10:46 - * @Version v1.0 - **/ - @Override - public String subscribeStreamTableTemplate(String databaseName, String tableName) { - String sql = ""; - sql = "dbPath = \"dfs://" + databaseName + "\"\n" + - "stName = \"" + tableName + "Stream\"\n" + - "ptName = \"" + tableName + "Dfs\"\n" + - "subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; - return sql; - } - - @Override - public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) { - StringBuilder script = new StringBuilder(); - // 执行创建 DFS 表模板 - script.append(createDfsTableTemplate(tableDTO)); - // 执行创建流表模板 - script.append(createStreamTableTemplate(tableDTO)); - // 取消订阅流表模板 - script.append(unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); - // 订阅流表模板 - script.append(subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); - return script.toString(); - } -} diff --git a/data-storage/src/main/java/com/huaxing/test/TestController.java b/data-storage/src/main/java/com/huaxing/test/TestController.java index 8d3c12e..9555f7d 100644 --- a/data-storage/src/main/java/com/huaxing/test/TestController.java +++ b/data-storage/src/main/java/com/huaxing/test/TestController.java @@ -1,24 +1,9 @@ package com.huaxing.test; -import com.huaxing.data.storage.domain.DataAnalysisDTO; -import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; -import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; -import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.tablemanagement.domain.TableColumnDTO; -import com.huaxing.data.tablemanagement.domain.TableDTO; -import com.huaxing.data.tablemanagement.service.ITableStructureService; -import com.huaxing.common.util.JacksonUtil; -import com.huaxing.data.tablemanagement.service.ITableTemplateService; -import com.huaxing.mqtt.processor.MqttMessageSender; -import com.huaxing.common.result.ResultVo; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.*; - /** * @ProjectName: iot-data-bridge @@ -35,206 +20,206 @@ import java.util.*; @SuppressWarnings("all") public class TestController { - @Autowired - private MqttMessageSender messageSender; - final IDeviceDataStoredService dataStoredService; - final IDeviceDataQueryDfsService dataQueryDfsService; - final IDeviceDataQueryStreamService dataQueryStreamService; - final ITableStructureService tableStructureService; - - - // =============================================== 测试构造器 ====================================== - public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ITableTemplateService tableTemplateService) { - this.dataStoredService = dataStoredService; - this.dataQueryDfsService = dataQueryDfsService; - this.dataQueryStreamService = dataQueryStreamService; - this.tableStructureService = tableStructureService; - } - - - // =============================================== 测试插入数据 ====================================== - - /** - * 测试数据插入 - */ - @GetMapping(value = "/testInsert") - public void testInsert() { - String tableName = "MyTable2Stream"; - String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; - dataStoredService.execute(sql); - log.info("SUCCESS"); - } - - // =============================================== 测试Dfs表查询 ====================================== - - /** - * 测试Dfs表查询 - */ - @GetMapping(value = "/testSelectDfs") - public ResultVo>> testSelectDfs() { - String dbPath = "dfs://ZbDB"; - String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); - return ResultVo.ok(dataQueryDfsService.selectList(sql)); - } - - // =============================================== 测试订阅流表查询 ====================================== - - /** - * 测试订阅流表查询 - */ - @GetMapping(value = "/testSelectStream") - public ResultVo>> testSelectStream() { - String sql = "select * from WaterMeterTset1Stream"; - return ResultVo.ok(dataQueryStreamService.selectList(sql)); - } - - // =============================================== 给指定的流表增加列字段 ====================================== - - /** - * 测试给指定的流表增加列字段 - */ - @GetMapping(value = "/testStreamAddColumn") - public void testStreamAddColumn() { - String tableName = "WaterMeterTsetStream"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addStreamColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - // =============================================== 给指定的Dfs表增加列字段 ====================================== - /** - * 测试给指定的Dfs表增加列字段 - */ - @GetMapping(value = "/testDfsAddColumn") - public void testDfsAddColumn() { - String tableName = "WaterMeterTsetDfs"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test描述"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addDfsColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - - // ===================================== mqtt消息发送-测试数据入库 START ============================== - - /** - * 测试发送mqtt消息 - */ - @GetMapping(value = "/testSendMessage") - public void testSendMessage() { - for (int i = 0; i < 2; i++) { - List> dataList = new ArrayList<>(); - dataList.add(handleMapByIndex(i)); - DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); - messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); - } - } - - /** - * 根据索引生成测试数据 - * @param index 索引 - * @return - */ - private Map handleMapByIndex(int index) { - Map map = new HashMap<>(); - map.put("time", new Date()); - map.put("projectId", "0jZU2102"); - map.put("deviceId", "0jZU2102_0806_0011"); - map.put("WM_WFA", 124.656 + index); - map.put("WM_WFA_Unit", "m³"); - map.put("test", "test"); - return map; - } - - // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== - - /** - * 测试创建Dfs表 - */ - @GetMapping(value = "/testCreateDfsTable") - public void testCreateDfsTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsTable(tableDTO); - } - - /** - * 测试创建流表 - */ - @GetMapping(value = "/testCreateStreamTable") - public void testCreateStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createStreamTable(tableDTO); - } - - /** - * 取消订阅流表 - */ - @GetMapping(value = "/testUnsubscribeStreamTable") - public void testUnsubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.unsubscribeStreamTable(tableDTO); - } - - /** - * 订阅流表 - */ - @GetMapping(value = "/testSubscribeStreamTable") - public void testSubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.subscribeStreamTable(tableDTO); - } - - /** - * 创建Dfs和Stream表并添加订阅 - */ - @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") - public void testCreateDfsAndStreamTableSubscribe() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("MyTe2"); - tableDTO.setTableDesc("MyTe2测试表"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); - } +// @Autowired +// private MqttMessageSender messageSender; +// final IDeviceDataStoredService dataStoredService; +// final IDeviceDataQueryDfsService dataQueryDfsService; +// final IDeviceDataQueryStreamService dataQueryStreamService; +// final ITableStructureService tableStructureService; +// +// +// // =============================================== 测试构造器 ====================================== +// public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) { +// this.dataStoredService = dataStoredService; +// this.dataQueryDfsService = dataQueryDfsService; +// this.dataQueryStreamService = dataQueryStreamService; +// this.tableStructureService = tableStructureService; +// } +// +// +// // =============================================== 测试插入数据 ====================================== +// +// /** +// * 测试数据插入 +// */ +// @GetMapping(value = "/testInsert") +// public void testInsert() { +// String tableName = "MyTable2Stream"; +// String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; +// dataStoredService.execute(sql); +// log.info("SUCCESS"); +// } +// +// // =============================================== 测试Dfs表查询 ====================================== +// +// /** +// * 测试Dfs表查询 +// */ +// @GetMapping(value = "/testSelectDfs") +// public ResultVo>> testSelectDfs() { +// String dbPath = "dfs://ZbDB"; +// String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); +// return ResultVo.ok(dataQueryDfsService.selectList(sql)); +// } +// +// // =============================================== 测试订阅流表查询 ====================================== +// +// /** +// * 测试订阅流表查询 +// */ +// @GetMapping(value = "/testSelectStream") +// public ResultVo>> testSelectStream() { +// String sql = "select * from WaterMeterTset1Stream"; +// return ResultVo.ok(dataQueryStreamService.selectList(sql)); +// } +// +// // =============================================== 给指定的流表增加列字段 ====================================== +// +// /** +// * 测试给指定的流表增加列字段 +// */ +// @GetMapping(value = "/testStreamAddColumn") +// public void testStreamAddColumn() { +// String tableName = "WaterMeterTsetStream"; +// String columnName = "test"; +// String columnType = "STRING"; +// String columnDesc = "test"; +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setTableName(tableName); +// tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); +// try { +// tableStructureService.addStreamColumns(tableDTO); +// log.info("SUCCESS"); +// } catch (Exception e) { +// log.error("FAIL"); +// e.printStackTrace(); +// } +// } +// +// // =============================================== 给指定的Dfs表增加列字段 ====================================== +// /** +// * 测试给指定的Dfs表增加列字段 +// */ +// @GetMapping(value = "/testDfsAddColumn") +// public void testDfsAddColumn() { +// String tableName = "WaterMeterTsetDfs"; +// String columnName = "test"; +// String columnType = "STRING"; +// String columnDesc = "test描述"; +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setTableName(tableName); +// tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); +// try { +// tableStructureService.addDfsColumns(tableDTO); +// log.info("SUCCESS"); +// } catch (Exception e) { +// log.error("FAIL"); +// e.printStackTrace(); +// } +// } +// +// +// // ===================================== mqtt消息发送-测试数据入库 START ============================== +// +// /** +// * 测试发送mqtt消息 +// */ +// @GetMapping(value = "/testSendMessage") +// public void testSendMessage() { +// for (int i = 0; i < 2; i++) { +// List> dataList = new ArrayList<>(); +// dataList.add(handleMapByIndex(i)); +// DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); +// messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); +// } +// } +// +// /** +// * 根据索引生成测试数据 +// * @param index 索引 +// * @return +// */ +// private Map handleMapByIndex(int index) { +// Map map = new HashMap<>(); +// map.put("time", new Date()); +// map.put("projectId", "0jZU2102"); +// map.put("deviceId", "0jZU2102_0806_0011"); +// map.put("WM_WFA", 124.656 + index); +// map.put("WM_WFA_Unit", "m³"); +// map.put("test", "test"); +// return map; +// } +// +// // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== +// +// /** +// * 测试创建Dfs表 +// */ +// @GetMapping(value = "/testCreateDfsTable") +// public void testCreateDfsTable() { +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setDatabaseName("ZbDB"); +// tableDTO.setTableName("Test1"); +// List tableColumnList = new ArrayList<>(); +// tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); +// tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); +// tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); +// tableDTO.setTableColumnList(tableColumnList); +// tableStructureService.createDfsTable(tableDTO); +// } +// +// /** +// * 测试创建流表 +// */ +// @GetMapping(value = "/testCreateStreamTable") +// public void testCreateStreamTable() { +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setDatabaseName("ZbDB"); +// tableDTO.setTableName("Test1"); +// List tableColumnList = new ArrayList<>(); +// tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); +// tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); +// tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); +// tableDTO.setTableColumnList(tableColumnList); +// tableStructureService.createStreamTable(tableDTO); +// } +// +// /** +// * 取消订阅流表 +// */ +// @GetMapping(value = "/testUnsubscribeStreamTable") +// public void testUnsubscribeStreamTable() { +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setDatabaseName("ZbDB"); +// tableDTO.setTableName("Test1"); +// tableStructureService.unsubscribeStreamTable(tableDTO); +// } +// +// /** +// * 订阅流表 +// */ +// @GetMapping(value = "/testSubscribeStreamTable") +// public void testSubscribeStreamTable() { +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setDatabaseName("ZbDB"); +// tableDTO.setTableName("Test1"); +// tableStructureService.subscribeStreamTable(tableDTO); +// } +// +// /** +// * 创建Dfs和Stream表并添加订阅 +// */ +// @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") +// public void testCreateDfsAndStreamTableSubscribe() { +// TableDTO tableDTO = new TableDTO(); +// tableDTO.setDatabaseName("ZbDB"); +// tableDTO.setTableName("MyTe2"); +// tableDTO.setTableDesc("MyTe2测试表"); +// List tableColumnList = new ArrayList<>(); +// tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); +// tableDTO.setTableColumnList(tableColumnList); +// tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); +// } } diff --git a/pom.xml b/pom.xml index e0a1a19..053b8d6 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ data-framework data-storage + data-storage-api @@ -115,22 +116,16 @@ + + + + org.apache.maven.plugins + maven-antrun-plugin + 3.0.0 + + + - - - aliyunmaven - 阿里云公共仓库 - https://maven.aliyun.com/repository/public - - - true - - - - false - - -