From de3e0e95175943e5af2338a4597cf1f7d9769257 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Mon, 20 Jan 2025 16:43:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BB=A3=E7=A0=81=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E7=B1=BB=E5=B0=81=E8=A3=85=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data-framework/pom.xml | 5 ++ .../com/huaxing/common/constant/AppConstant.java | 8 +++ .../com/huaxing/common/result/BaseException.java | 50 +++++++++++++++ .../com/huaxing/common/result/BizException.java | 23 +++++++ .../java/com/huaxing/common/result/ResultVo.java | 52 +++++++++++++++ .../src/main/java/com/huaxing/mqtt/ReadMe | 0 .../mqtt/processor/MqttMessageReceiver.java | 49 +-------------- .../data/api/controller/DataQueryController.java | 31 +++++++++ .../com/huaxing/data/api/domain/DataQueryDTO.java | 26 ++++++++ .../com/huaxing/data/mqtt/MqttMessageConsumer.java | 61 ------------------ .../huaxing/data/storage/domain/DataQueryDTO.java | 27 -------- .../java/com/huaxing/mqtt/MqttMessageConsumer.java | 58 +++++++++++++++++ .../main/java/com/huaxing/test/TestController.java | 9 +-- .../src/main/resources/application-dev.yaml | 1 + data-storage/src/main/resources/application.yaml | 2 + data-storage/src/main/resources/banner.txt | 16 ++--- .../src/main/resources/script/补水箱.script | 73 ++++++++++++++++++++++ data-storage/src/main/resources/补水箱.script | 73 ---------------------- pom.xml | 32 +++++----- 19 files changed, 360 insertions(+), 236 deletions(-) create mode 100644 data-framework/src/main/java/com/huaxing/common/result/BaseException.java create mode 100644 data-framework/src/main/java/com/huaxing/common/result/BizException.java create mode 100644 data-framework/src/main/java/com/huaxing/common/result/ResultVo.java delete mode 100644 data-framework/src/main/java/com/huaxing/mqtt/ReadMe create mode 100644 data-storage/src/main/java/com/huaxing/data/api/controller/DataQueryController.java create mode 100644 data-storage/src/main/java/com/huaxing/data/api/domain/DataQueryDTO.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java create mode 100644 data-storage/src/main/resources/script/补水箱.script delete mode 100644 data-storage/src/main/resources/补水箱.script diff --git a/data-framework/pom.xml b/data-framework/pom.xml index fb86669..0321883 100644 --- a/data-framework/pom.xml +++ b/data-framework/pom.xml @@ -17,6 +17,11 @@ spring-beans 6.2.1 + + io.swagger.core.v3 + swagger-annotations + 2.2.19 + \ No newline at end of file diff --git a/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java b/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java index 594ebf6..ecbfde2 100644 --- a/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java +++ b/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java @@ -12,4 +12,12 @@ package com.huaxing.common.constant; public class AppConstant { public static final String APP_NAME = "iot-data-bridge"; + + /** 网络请求 - 成功 */ + public static final String REQUEST_SUCCESS_CODE = "0"; + /** 网络请求 - 失败(默认) */ + public static final String REQUEST_CODE_FAIL = "-1"; + + /** 异常代码 - 业务类型 */ + public static final String CODE_BIZ = "-1"; } diff --git a/data-framework/src/main/java/com/huaxing/common/result/BaseException.java b/data-framework/src/main/java/com/huaxing/common/result/BaseException.java new file mode 100644 index 0000000..30d8d90 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/result/BaseException.java @@ -0,0 +1,50 @@ +package com.huaxing.common.result; + +import com.huaxing.common.constant.AppConstant; +import lombok.Getter; +import lombok.ToString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Getter +@ToString +public abstract class BaseException extends RuntimeException { + + private static final Logger Logger = LoggerFactory.getLogger(BizException.class); + + protected final String code; + + protected Object data; + + protected BaseException(String code, String message) { + super(message); + if (code == null){ + this.code = AppConstant.CODE_BIZ; + }else { + this.code = code; + } + } + + public final void print(){ + Logger.error("exception info print: start"); + Logger.error(this.getMessage(),this); + Logger.error("data:{}", data); + Logger.error("exception info print: end"); + } + + public final BaseException setData(Object data){ + this.data = data; + return this; + } + + public final D getData(){ + return (D)data; + } + + protected static String buildMsg(String msg,String hint){ + if (hint != null){ + msg = msg + ":" + hint; + } + return msg; + } +} diff --git a/data-framework/src/main/java/com/huaxing/common/result/BizException.java b/data-framework/src/main/java/com/huaxing/common/result/BizException.java new file mode 100644 index 0000000..9446aa5 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/result/BizException.java @@ -0,0 +1,23 @@ +package com.huaxing.common.result; + +import com.huaxing.common.constant.AppConstant; +import org.springframework.util.StringUtils; + +public class BizException extends BaseException { + + private BizException(String msg) { + super(AppConstant.CODE_BIZ, msg); + } + + /** tag!(msg:hint) */ + public static BizException custom(String tag, String msg, String hint){ + return BizException.custom(tag,BaseException.buildMsg(msg,hint)); + } + /** tag!(msg) */ + public static BizException custom(String tag, String msg){ + if (!StringUtils.isEmpty(msg)){ + tag += "!(" + msg + ")"; + } + return new BizException(tag); + } +} diff --git a/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java new file mode 100644 index 0000000..88e359e --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/result/ResultVo.java @@ -0,0 +1,52 @@ +package com.huaxing.common.result; + +import com.huaxing.common.constant.AppConstant; +import lombok.Getter; + +import java.io.Serializable; + +@Getter +public class ResultVo implements Serializable { + + final String code; + + final String msg; + + final T data; + + private ResultVo(String code, String msg, T data) { + this.code = code; + this.msg = msg; + this.data = data; + } + + private ResultVo(BaseException exception) { + this.code = exception.getCode(); + this.msg = exception.getMessage(); + this.data = exception.getData(); + } + + public static ResultVo ok() { + return ResultVo.ok(null); + } + + public static ResultVo ok(D data) { + return new ResultVo<>(AppConstant.REQUEST_SUCCESS_CODE,"操作成功!",data); + } + + public static ResultVo ok(String msg,D data) { + return new ResultVo<>(AppConstant.REQUEST_SUCCESS_CODE,msg,data); + } + + public static ResultVo fail(String msg) { + return ResultVo.fail(msg,null); + } + + public static ResultVo fail(String msg,D data) { + return new ResultVo<>(AppConstant.REQUEST_CODE_FAIL,msg,data); + } + + public static ResultVo fail(BaseException exception) { + return new ResultVo<>(exception); + } +} diff --git a/data-framework/src/main/java/com/huaxing/mqtt/ReadMe b/data-framework/src/main/java/com/huaxing/mqtt/ReadMe deleted file mode 100644 index e69de29..0000000 diff --git a/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java index 27472d0..192febf 100644 --- a/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java +++ b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java @@ -3,12 +3,6 @@ package com.huaxing.mqtt.processor; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.MessageHandler; import org.springframework.stereotype.Component; -//import org.springframework.integration.mqtt.support.MqttHeaders; -//import org.springframework.messaging.Message; -//import org.springframework.messaging.MessageHeaders; -//import org.springframework.messaging.MessagingException; -//import org.springframework.scheduling.annotation.Async; -//import java.util.concurrent.CompletableFuture; /** * 消费者处理器 @@ -16,46 +10,5 @@ import org.springframework.stereotype.Component; @Slf4j @Component @SuppressWarnings("all") -public abstract class MqttMessageReceiver implements MessageHandler { - - // 获取消息接口 -// public abstract String getMessage(String payload); - -// /** -// * 消息处理 -// * -// * @param message 消息 -// * @throws MessagingException 消息异常 -// */ -// @Override -// @Async("handleMessage") -// public void handleMessage(Message message) throws MessagingException { -// try { -// // 获取消息Topic -// MessageHeaders headers = message.getHeaders(); -// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); -// log.info("[获取到的消息的topic]:{} ", topic); -// // 获取消息体 -// String payload = (String) message.getPayload(); -// log.info("[获取到的消息的payload]:{} ", payload); -// // 数据库入库消息 -// if (topic.contains("iot/test1/in-storage")) { -// // 模拟1000000条数据入库 -// log.info("接收到iot/test1/in-storage的消息啦,快去处理"); -// getMessage(payload); -//// long l = System.currentTimeMillis(); -//// System.currentTimeMillis(); -//// CompletableFuture future = CompletableFuture.runAsync(() -> { -////// analysisService.parseStoreData(payload); -//// }); -//// long l1 = System.currentTimeMillis(); -//// log.info("入库完成,耗时:{}", l1 - l); -//// analysisService.parseStoreData(payload); -// } else if (topic.contains("table-update/")){} // TODO 表更新topic -// } catch (Exception e) { -// log.error(e.toString()); -// } -// } - -} +public abstract class MqttMessageReceiver implements MessageHandler { } diff --git a/data-storage/src/main/java/com/huaxing/data/api/controller/DataQueryController.java b/data-storage/src/main/java/com/huaxing/data/api/controller/DataQueryController.java new file mode 100644 index 0000000..cfad902 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/api/controller/DataQueryController.java @@ -0,0 +1,31 @@ +package com.huaxing.data.api.controller; + +import com.huaxing.data.api.domain.DataQueryDTO; +import com.huaxing.common.result.ResultVo; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.api.controller + * @ClassName: DataQueryController + * @Author: swordmeng8@163.com + * @Description: 数据查询控制器 + * @Date: 2025/1/20 16:02 + * @Version: 1.0 + */ +@RestController +@RequestMapping("/data") +public class DataQueryController { + + // TODO 查询方法 + @PostMapping("/query") + public ResultVo> query(DataQueryDTO queryDTO) { + return ResultVo.ok(null); + } + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/api/domain/DataQueryDTO.java b/data-storage/src/main/java/com/huaxing/data/api/domain/DataQueryDTO.java new file mode 100644 index 0000000..f4e5283 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/api/domain/DataQueryDTO.java @@ -0,0 +1,26 @@ +package com.huaxing.data.api.domain; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.domain + * @ClassName: CommonDeviceControl + * @Author: swordmeng8@163.com + * @Description: 数据查询DTO + * @Date: 2025/1/10 18:11 + * @Version: 1.0 + */ + +@Accessors(chain = true) +@Data +public class DataQueryDTO { + // 表名 + private String tableName; + // 设备数据集 + private List tableColumns; + +} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java b/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java deleted file mode 100644 index d949cdd..0000000 --- a/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.huaxing.data.mqtt; - -import com.huaxing.data.storage.service.IDataAnalysisService; -import com.huaxing.mqtt.processor.MqttMessageReceiver; -import lombok.extern.slf4j.Slf4j; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; - -import java.util.concurrent.CompletableFuture; - - -/** - * @ProjectName: data-bridge - * @Package: com.huaxing.data.mqtt - * @ClassName: MqttMessageConsumer - * @Author: swordmeng8@163.com - * @Description: 消费者 - * @Date: 2025/1/20 11:23 - * @Version: 1.0 - */ -@Slf4j -@Component -public class MqttMessageConsumer extends MqttMessageReceiver { - - /** - * 数据处理服务 - */ - final IDataAnalysisService analysisService; - public MqttMessageConsumer(IDataAnalysisService analysisService) { - this.analysisService = analysisService; - } - - - @Override - @Async("handleMessage") - public void handleMessage(Message message) throws MessagingException { - try { - // 获取消息Topic - MessageHeaders headers = message.getHeaders(); - String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); - log.info("[获取到的消息的topic]:{} ", topic); - // 获取消息体 - String payload = (String) message.getPayload(); - log.info("[获取到的消息的payload]:{} ", payload); - // 数据库入库消息 - if (topic.contains("iot/test1/in-storage")) { - // 模拟1000000条数据入库 - log.info("接收到iot/test1/in-storage的消息啦,快去处理"); - CompletableFuture.runAsync(() -> { - analysisService.parseStoreData(payload); - }); - } else if (topic.contains("table-update/")){} // TODO 表更新topic - } catch (Exception e) { - log.error(e.toString()); - } - } -} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java b/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java deleted file mode 100644 index eb0a098..0000000 --- a/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.huaxing.data.storage.domain; - -import lombok.Data; -import lombok.experimental.Accessors; - -import java.util.List; -import java.util.Map; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.domain - * @ClassName: CommonDeviceControl - * @Author: swordmeng8@163.com - * @Description: 数据查询DTO - * @Date: 2025/1/10 18:11 - * @Version: 1.0 - */ - -@Accessors(chain = true) -@Data -public class DataQueryDTO { - // 表名 - private String tableName; - // 设备数据集 - private List tableColumns; - -} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java b/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java new file mode 100644 index 0000000..297d4b1 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java @@ -0,0 +1,58 @@ +package com.huaxing.mqtt; + +import com.huaxing.data.storage.service.IDataAnalysisService; +import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.mqtt + * @ClassName: MqttMessageConsumer + * @Author: swordmeng8@163.com + * @Description: 消费者 + * @Date: 2025/1/20 11:23 + * @Version: 1.0 + */ +@Slf4j +@Component +public class MqttMessageConsumer extends MqttMessageReceiver { + + /** + * 数据处理服务 + */ + final IDataAnalysisService analysisService; + public MqttMessageConsumer(IDataAnalysisService analysisService) { + this.analysisService = analysisService; + } + + + @Override + @Async("handleMessage") + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); + log.info("[获取到的消息的topic]:{} ", topic); + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload]:{} ", payload); + // 数据库入库消息 + if (topic.contains("iot/test1/in-storage")) { + log.info("接收到iot/test1/in-storage的消息啦,快去处理"); + CompletableFuture.runAsync(() -> { + analysisService.parseStoreData(payload); + }); + } else if (topic.contains("table-update/")){} // TODO 表更新topic + } catch (Exception e) { + log.error(e.toString()); + } + } +} diff --git a/data-storage/src/main/java/com/huaxing/test/TestController.java b/data-storage/src/main/java/com/huaxing/test/TestController.java index 7acc59b..318b9c8 100644 --- a/data-storage/src/main/java/com/huaxing/test/TestController.java +++ b/data-storage/src/main/java/com/huaxing/test/TestController.java @@ -7,6 +7,7 @@ import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.tablemanagement.service.ITableStructureService; import com.huaxing.common.util.JacksonUtil; import com.huaxing.mqtt.processor.MqttMessageSender; +import com.huaxing.common.result.ResultVo; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; @@ -57,17 +58,17 @@ public class TestController { // =============================================== 测试Dfs表查询 ====================================== @GetMapping(value = "/testSelectDfs") - public List> testSelectDfs() { + public ResultVo>> testSelectDfs() { String dbPath = "dfs://ZbDB"; String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); - return dataQueryDfsService.selectList(sql); + return ResultVo.ok(dataQueryDfsService.selectList(sql)); } // =============================================== 测试订阅流表查询 ====================================== @GetMapping(value = "/testSelectStream") - public List> testSelectStream() { + public ResultVo>> testSelectStream() { String sql = "select * from WaterMeterTset1Stream"; - return dataQueryStreamService.selectList(sql); + return ResultVo.ok(dataQueryStreamService.selectList(sql)); } // =============================================== 给指定的流表增加列字段 ====================================== diff --git a/data-storage/src/main/resources/application-dev.yaml b/data-storage/src/main/resources/application-dev.yaml index 9a7071a..272876f 100644 --- a/data-storage/src/main/resources/application-dev.yaml +++ b/data-storage/src/main/resources/application-dev.yaml @@ -1,6 +1,7 @@ spring: application: name: iot-data-bridge + version: 1.0.0 datasource: url: jdbc:dolphindb://localhost:8848?databasePath=dfs://ZbDB username: admin diff --git a/data-storage/src/main/resources/application.yaml b/data-storage/src/main/resources/application.yaml index 6213e80..10af456 100644 --- a/data-storage/src/main/resources/application.yaml +++ b/data-storage/src/main/resources/application.yaml @@ -1,5 +1,7 @@ server: port: 8088 + servlet: + context-path: /api spring: profiles: active: dev \ No newline at end of file diff --git a/data-storage/src/main/resources/banner.txt b/data-storage/src/main/resources/banner.txt index a10a24b..d193f37 100644 --- a/data-storage/src/main/resources/banner.txt +++ b/data-storage/src/main/resources/banner.txt @@ -1,7 +1,9 @@ -######## ### ######## ### ###### ######## ####### ######## ### ###### ######## -## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -## ## ## ## ## ## ## ####### ###### ## ## ## ######## ## ## ## #### ###### -## ## ######### ## ######### ## ## ## ## ## ## ######### ## ## ## -## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -######## ## ## ## ## ## ###### ## ####### ## ## ## ## ###### ######## +${AnsiColor.BLUE} ######## ### ######## ### ###### ######## ####### ######## ### ###### ######## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ####### ###### ## ## ## ######## ## ## ## #### ###### +${AnsiColor.BLUE} ## ## ######### ## ######### ## ## ## ## ## ## ######### ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ######## ## ## ## ## ## ###### ## ####### ## ## ## ## ###### ######## +${AnsiColor.BRIGHT_GREEN}Spring Boot Version: ${spring-boot.version} +${AnsiColor.BRIGHT_YELLOW}Application Version: ${application.version} \ No newline at end of file diff --git a/data-storage/src/main/resources/script/补水箱.script b/data-storage/src/main/resources/script/补水箱.script new file mode 100644 index 0000000..c4d9d98 --- /dev/null +++ b/data-storage/src/main/resources/script/补水箱.script @@ -0,0 +1,73 @@ +//分布式库名 +dbPath = "dfs://ZbDB" +//流表名 +stName = "WaterMeterTsetStream" +//分区表名 +ptName = "WaterMeterTsetDfs" + +//建库函数 +def createDB(dbPath){ + print('正在初始化数据库' + dbPath) + if (existsDatabase(dbPath)) { + print('当前数据库已存在,加载数据库:' + dbPath) + db = database(dbPath) + print('已加载数据库:' + dbPath) + return db + } + else { + print('当前数据库不存在,创建数据库:' + dbPath) + value = 2025.01M..2040.12M + db = database(dbPath, VALUE, value,,engine='OLAP') + print('已完成创建数据库:'+ dbPath) + return db + } +} +//建流表函数 +def createST(stName){ + if(not existsStreamTable(stName)){ + print('共享变量未定义,创建共享表:' + stName) + print(`正在创建Demo流表) + colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit + colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] + st = streamTable(150000:0, colNames, colTypes) + print(`完成创建Demo流表) + enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 ) + print('完成创建共享表') + return st + } + else{ + print('共享变量已存在,加载共享变量:' + stName) + return objByName(stName, true) + } +} +//建分区表函数 +def createPT(dbPath,ptName){ + db = createDB(dbPath) + if (existsTable(dbPath, ptName)){ + print('分区表已存在,加载分区表:' + ptName) + pt = loadTable(db,ptName) + return pt + } + else{ + print('分区表不存在,创建分区表:' + ptName) + print(`正在创建Demo分区表) + colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit + colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] + t = table(1:0, colNames, colTypes) + pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST) + print('完成创建分区表:' + ptName) + return pt + } +} +//创建库 +//createDB(dbPath) +//创建分区表 +createPT(dbPath,ptName) +// 取消订阅流表 +unsubscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime) +// 删除流表 +// dropStreamTable(stName) +//创建流表 +createST(stName) +//订阅流表 +subscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true) \ No newline at end of file diff --git a/data-storage/src/main/resources/补水箱.script b/data-storage/src/main/resources/补水箱.script deleted file mode 100644 index c4d9d98..0000000 --- a/data-storage/src/main/resources/补水箱.script +++ /dev/null @@ -1,73 +0,0 @@ -//分布式库名 -dbPath = "dfs://ZbDB" -//流表名 -stName = "WaterMeterTsetStream" -//分区表名 -ptName = "WaterMeterTsetDfs" - -//建库函数 -def createDB(dbPath){ - print('正在初始化数据库' + dbPath) - if (existsDatabase(dbPath)) { - print('当前数据库已存在,加载数据库:' + dbPath) - db = database(dbPath) - print('已加载数据库:' + dbPath) - return db - } - else { - print('当前数据库不存在,创建数据库:' + dbPath) - value = 2025.01M..2040.12M - db = database(dbPath, VALUE, value,,engine='OLAP') - print('已完成创建数据库:'+ dbPath) - return db - } -} -//建流表函数 -def createST(stName){ - if(not existsStreamTable(stName)){ - print('共享变量未定义,创建共享表:' + stName) - print(`正在创建Demo流表) - colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit - colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] - st = streamTable(150000:0, colNames, colTypes) - print(`完成创建Demo流表) - enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 ) - print('完成创建共享表') - return st - } - else{ - print('共享变量已存在,加载共享变量:' + stName) - return objByName(stName, true) - } -} -//建分区表函数 -def createPT(dbPath,ptName){ - db = createDB(dbPath) - if (existsTable(dbPath, ptName)){ - print('分区表已存在,加载分区表:' + ptName) - pt = loadTable(db,ptName) - return pt - } - else{ - print('分区表不存在,创建分区表:' + ptName) - print(`正在创建Demo分区表) - colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit - colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] - t = table(1:0, colNames, colTypes) - pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST) - print('完成创建分区表:' + ptName) - return pt - } -} -//创建库 -//createDB(dbPath) -//创建分区表 -createPT(dbPath,ptName) -// 取消订阅流表 -unsubscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime) -// 删除流表 -// dropStreamTable(stName) -//创建流表 -createST(stName) -//订阅流表 -subscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true) \ No newline at end of file diff --git a/pom.xml b/pom.xml index 297ba4a..63d5dc5 100644 --- a/pom.xml +++ b/pom.xml @@ -84,22 +84,22 @@ - - org.apache.cassandra - cassandra-all - 0.8.1 - - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - + + + + + + + + + + + + + + + + org.springframework.integration