From 3b7dd13925f7d2a345b1d580a5ce739a66aab6f5 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Thu, 13 Feb 2025 10:21:57 +0800 Subject: [PATCH] 1 --- data-framework/src/main/resources/banner.txt | 9 + .../src/main/java/com/huaxing/Main.java | 14 +- .../com/huaxing/feign/IDatabaseClientFeign.java | 9 +- .../database/controller/DatabaseController.java | 263 ++++++--------------- .../controller/DatabaseTestController.java | 240 +++++++++++++++++++ .../java/com/huaxing/mqtt/MqttMessageConsumer.java | 8 +- .../src/main/resources/application-dev.yaml | 1 - data-storage/src/main/resources/banner.txt | 9 - 8 files changed, 334 insertions(+), 219 deletions(-) create mode 100644 data-framework/src/main/resources/banner.txt create mode 100644 data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java delete mode 100644 data-storage/src/main/resources/banner.txt diff --git a/data-framework/src/main/resources/banner.txt b/data-framework/src/main/resources/banner.txt new file mode 100644 index 0000000..a06ee77 --- /dev/null +++ b/data-framework/src/main/resources/banner.txt @@ -0,0 +1,9 @@ +${AnsiColor.BLUE} ######## ### ######## ### ######## ######## #### ######## ###### ######## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ####### ######## ######## ## ## ## ## #### ###### +${AnsiColor.BLUE} ## ## ######### ## ######### ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## +${AnsiColor.BLUE} ######## ## ## ## ## ## ######## ## ## #### ######## ###### ######## +${AnsiColor.BRIGHT_GREEN}Spring Boot Version: ${spring-boot.version} +${AnsiColor.BRIGHT_YELLOW}Application Version: ${application.version} \ No newline at end of file diff --git a/data-storage-api/src/main/java/com/huaxing/Main.java b/data-storage-api/src/main/java/com/huaxing/Main.java index b310726..e0890f6 100644 --- a/data-storage-api/src/main/java/com/huaxing/Main.java +++ b/data-storage-api/src/main/java/com/huaxing/Main.java @@ -1,7 +1,7 @@ -package com.huaxing; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file +//package com.huaxing; +// +//public class Main { +// public static void main(String[] args) { +// System.out.println("Hello world!"); +// } +//} \ No newline at end of file diff --git a/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java index 04186d4..70401ed 100644 --- a/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java +++ b/data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java @@ -19,10 +19,10 @@ import org.springframework.web.bind.annotation.RequestBody; public interface IDatabaseClientFeign { // 请求前缀 - String API_PREFIX = "/api"; + String API_PREFIX = "/api/database"; // 创建数据库 - String CREATE_DATABASE = API_PREFIX + "/database/create"; + String CREATE_DATABASE = API_PREFIX + "/create"; // 创建dfs表 String CREATE_DFS_TABLE = API_PREFIX + "/table/create/dfs"; @@ -42,8 +42,6 @@ public interface IDatabaseClientFeign { // 取消订阅流表 String UNSUBSCRIBE_STREAM_TABLE = API_PREFIX + "/table/unsubscribe/stream"; - - // 新增dfs表字段 String ADD_DFS_COLUMNS = API_PREFIX + "/table/add/dfs/columns"; @@ -125,7 +123,4 @@ public interface IDatabaseClientFeign { @PostMapping(ADD_STREAM_COLUMNS) ResultVo addStreamColumns(@RequestBody @Validated TableDTO tableDTO); - - - } diff --git a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java index f64c521..a0790d0 100644 --- a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java +++ b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java @@ -1,240 +1,119 @@ package com.huaxing.data.database.controller; -import com.huaxing.data.storage.domain.DataAnalysisDTO; -import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; -import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; -import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.database.domain.TableDTO; -import com.huaxing.data.database.service.ITableStructureService; -import com.huaxing.common.util.JacksonUtil; -import com.huaxing.data.database.template.ISqlTemplateService; -import com.huaxing.mqtt.processor.MqttMessageSender; import com.huaxing.common.result.ResultVo; -import com.huaxing.pojo.entity.TableColumnDTO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; +import com.huaxing.pojo.entity.DatabaseDTO; +import com.huaxing.pojo.entity.TableDTO; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.*; - - /** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.controller - * @ClassName: TestController - * @Author: swordmeng8@163.com - * @Description: 测试controller - * @Date: 2025/1/15 17:08 - * @Version: 1.0 + * 与 IDatabaseClientFeign 对应的 Controller + * + * @author YourName */ -@Slf4j @RestController @RequestMapping("/api/database") -@SuppressWarnings("all") public class DatabaseController { - @Autowired - private MqttMessageSender messageSender; - final IDeviceDataStoredService dataStoredService; - final IDeviceDataQueryDfsService dataQueryDfsService; - final IDeviceDataQueryStreamService dataQueryStreamService; - final ITableStructureService tableStructureService; - - - // =============================================== 测试构造器 ====================================== - public DatabaseController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) { - this.dataStoredService = dataStoredService; - this.dataQueryDfsService = dataQueryDfsService; - this.dataQueryStreamService = dataQueryStreamService; - this.tableStructureService = tableStructureService; - } - - - // =============================================== 测试插入数据 ====================================== - - /** - * 测试数据插入 - */ - @GetMapping(value = "/testInsert") - public void testInsert() { - String tableName = "MyTable2Stream"; - String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; - dataStoredService.execute(sql); - log.info("SUCCESS"); - } - - // =============================================== 测试Dfs表查询 ====================================== - - /** - * 测试Dfs表查询 - */ - @GetMapping(value = "/testSelectDfs") - public ResultVo>> testSelectDfs() { - String dbPath = "dfs://ZbDB"; - String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); - return ResultVo.ok(dataQueryDfsService.selectList(sql)); - } - - // =============================================== 测试订阅流表查询 ====================================== - /** - * 测试订阅流表查询 + * 创建数据库 + * @param databaseDTO 数据库信息 + * @return ResultVo */ - @GetMapping(value = "/testSelectStream") - public ResultVo>> testSelectStream() { - String sql = "select * from WaterMeterTset1Stream"; - return ResultVo.ok(dataQueryStreamService.selectList(sql)); + @PostMapping("/create") + public ResultVo createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO) { + // 这里添加创建数据库的具体逻辑 + return ResultVo.ok(); } - // =============================================== 给指定的流表增加列字段 ====================================== - /** - * 测试给指定的流表增加列字段 + * 创建 dfs 表 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testStreamAddColumn") - public void testStreamAddColumn() { - String tableName = "WaterMeterTsetStream"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addStreamColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } + @PostMapping("/table/create/dfs") + public ResultVo createDfsTable(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加创建 dfs 表的具体逻辑 + return ResultVo.ok(); } - // =============================================== 给指定的Dfs表增加列字段 ====================================== /** - * 测试给指定的Dfs表增加列字段 + * 创建 stream 表 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testDfsAddColumn") - public void testDfsAddColumn() { - String tableName = "WaterMeterTsetDfs"; - String columnName = "test"; - String columnType = "STRING"; - String columnDesc = "test描述"; - TableDTO tableDTO = new TableDTO(); - tableDTO.setTableName(tableName); - tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); - try { - tableStructureService.addDfsColumns(tableDTO); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } + @PostMapping("/table/create/stream") + public ResultVo createStreamTable(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加创建 stream 表的具体逻辑 + return ResultVo.ok(); } - - // ===================================== mqtt消息发送-测试数据入库 START ============================== - /** - * 测试发送mqtt消息 + * 创建 dfs 和 stream 表并订阅 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testSendMessage") - public void testSendMessage() { - for (int i = 0; i < 2; i++) { - List> dataList = new ArrayList<>(); - dataList.add(handleMapByIndex(i)); - DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); - messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); - } + @PostMapping("/table/create/dfs/stream/subscribe") + public ResultVo createDfsAndStreamTableSubscribe(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加创建 dfs 和 stream 表并订阅的具体逻辑 + return ResultVo.ok(); } /** - * 根据索引生成测试数据 - * @param index 索引 - * @return + * 创建 dfs 和 stream 表不订阅 + * @param tableDTO 表信息 + * @return ResultVo */ - private Map handleMapByIndex(int index) { - Map map = new HashMap<>(); - map.put("time", new Date()); - map.put("projectId", "0jZU2102"); - map.put("deviceId", "0jZU2102_0806_0011"); - map.put("WM_WFA", 124.656 + index); - map.put("WM_WFA_Unit", "m³"); - map.put("test", "test"); - return map; + @PostMapping("/table/create/dfs/stream/unsubscribe") + public ResultVo createDfsAndStreamTableUnsubscribe(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加创建 dfs 和 stream 表不订阅的具体逻辑 + return ResultVo.ok(); } - // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== - /** - * 测试创建Dfs表 - */ - @GetMapping(value = "/testCreateDfsTable") - public void testCreateDfsTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsTable(tableDTO); - } - - /** - * 测试创建流表 + * 订阅流表 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testCreateStreamTable") - public void testCreateStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); - tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createStreamTable(tableDTO); + @PostMapping("/table/subscribe/stream") + public ResultVo subscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加订阅流表的具体逻辑 + return ResultVo.ok(); } /** * 取消订阅流表 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testUnsubscribeStreamTable") - public void testUnsubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.unsubscribeStreamTable(tableDTO); + @PostMapping("/table/unsubscribe/stream") + public ResultVo unsubscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加取消订阅流表的具体逻辑 + return ResultVo.ok(); } /** - * 订阅流表 + * 新增 dfs 表字段 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testSubscribeStreamTable") - public void testSubscribeStreamTable() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("Test1"); - tableStructureService.subscribeStreamTable(tableDTO); + @PostMapping("/table/add/dfs/columns") + public ResultVo addDfsColumns(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加新增 dfs 表字段的具体逻辑 + return ResultVo.ok(); } /** - * 创建Dfs和Stream表并添加订阅 + * 新增 stream 表字段 + * @param tableDTO 表信息 + * @return ResultVo */ - @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") - public void testCreateDfsAndStreamTableSubscribe() { - TableDTO tableDTO = new TableDTO(); - tableDTO.setDatabaseName("ZbDB"); - tableDTO.setTableName("MyTe2"); - tableDTO.setTableDesc("MyTe2测试表"); - List tableColumnList = new ArrayList<>(); - tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); - tableDTO.setTableColumnList(tableColumnList); - tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); + @PostMapping("/table/add/stream/columns") + public ResultVo addStreamColumns(@RequestBody @Validated TableDTO tableDTO) { + // 这里添加新增 stream 表字段的具体逻辑 + return ResultVo.ok(); } - - -} +} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java new file mode 100644 index 0000000..53be75f --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java @@ -0,0 +1,240 @@ +package com.huaxing.data.database.controller; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; +import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.database.domain.TableDTO; +import com.huaxing.data.database.service.ITableStructureService; +import com.huaxing.common.util.JacksonUtil; +import com.huaxing.data.database.template.ISqlTemplateService; +import com.huaxing.mqtt.processor.MqttMessageSender; +import com.huaxing.common.result.ResultVo; +import com.huaxing.pojo.entity.TableColumnDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.*; + + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.controller + * @ClassName: TestController + * @Author: swordmeng8@163.com + * @Description: 测试controller + * @Date: 2025/1/15 17:08 + * @Version: 1.0 + */ +@Slf4j +@RestController +@RequestMapping("/api/database") +@SuppressWarnings("all") +public class DatabaseTestController { + + @Autowired + private MqttMessageSender messageSender; + final IDeviceDataStoredService dataStoredService; + final IDeviceDataQueryDfsService dataQueryDfsService; + final IDeviceDataQueryStreamService dataQueryStreamService; + final ITableStructureService tableStructureService; + + + // =============================================== 测试构造器 ====================================== + public DatabaseTestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) { + this.dataStoredService = dataStoredService; + this.dataQueryDfsService = dataQueryDfsService; + this.dataQueryStreamService = dataQueryStreamService; + this.tableStructureService = tableStructureService; + } + + + // =============================================== 测试插入数据 ====================================== + + /** + * 测试数据插入 + */ + @GetMapping(value = "/testInsert") + public void testInsert() { + String tableName = "MyTable2Stream"; + String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')"; + dataStoredService.execute(sql); + log.info("SUCCESS"); + } + + // =============================================== 测试Dfs表查询 ====================================== + + /** + * 测试Dfs表查询 + */ + @GetMapping(value = "/testSelectDfs") + public ResultVo>> testSelectDfs() { + String dbPath = "dfs://ZbDB"; + String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); + return ResultVo.ok(dataQueryDfsService.selectList(sql)); + } + + // =============================================== 测试订阅流表查询 ====================================== + + /** + * 测试订阅流表查询 + */ + @GetMapping(value = "/testSelectStream") + public ResultVo>> testSelectStream() { + String sql = "select * from WaterMeterTset1Stream"; + return ResultVo.ok(dataQueryStreamService.selectList(sql)); + } + + // =============================================== 给指定的流表增加列字段 ====================================== + + /** + * 测试给指定的流表增加列字段 + */ + @GetMapping(value = "/testStreamAddColumn") + public void testStreamAddColumn() { + String tableName = "WaterMeterTsetStream"; + String columnName = "test"; + String columnType = "STRING"; + String columnDesc = "test"; + TableDTO tableDTO = new TableDTO(); + tableDTO.setTableName(tableName); + tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); + try { + tableStructureService.addStreamColumns(tableDTO); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + // =============================================== 给指定的Dfs表增加列字段 ====================================== + /** + * 测试给指定的Dfs表增加列字段 + */ + @GetMapping(value = "/testDfsAddColumn") + public void testDfsAddColumn() { + String tableName = "WaterMeterTsetDfs"; + String columnName = "test"; + String columnType = "STRING"; + String columnDesc = "test描述"; + TableDTO tableDTO = new TableDTO(); + tableDTO.setTableName(tableName); + tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc))); + try { + tableStructureService.addDfsColumns(tableDTO); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + + // ===================================== mqtt消息发送-测试数据入库 START ============================== + + /** + * 测试发送mqtt消息 + */ + @GetMapping(value = "/testSendMessage") + public void testSendMessage() { + for (int i = 0; i < 2; i++) { + List> dataList = new ArrayList<>(); + dataList.add(handleMapByIndex(i)); + DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); + messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); + } + } + + /** + * 根据索引生成测试数据 + * @param index 索引 + * @return + */ + private Map handleMapByIndex(int index) { + Map map = new HashMap<>(); + map.put("time", new Date()); + map.put("projectId", "0jZU2102"); + map.put("deviceId", "0jZU2102_0806_0011"); + map.put("WM_WFA", 124.656 + index); + map.put("WM_WFA_Unit", "m³"); + map.put("test", "test"); + return map; + } + + // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ====================================== + + /** + * 测试创建Dfs表 + */ + @GetMapping(value = "/testCreateDfsTable") + public void testCreateDfsTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createDfsTable(tableDTO); + } + + /** + * 测试创建流表 + */ + @GetMapping(value = "/testCreateStreamTable") + public void testCreateStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableColumnList.add(new TableColumnDTO("test1", "STRING","test1")); + tableColumnList.add(new TableColumnDTO("test2", "STRING","test2")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createStreamTable(tableDTO); + } + + /** + * 取消订阅流表 + */ + @GetMapping(value = "/testUnsubscribeStreamTable") + public void testUnsubscribeStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + tableStructureService.unsubscribeStreamTable(tableDTO); + } + + /** + * 订阅流表 + */ + @GetMapping(value = "/testSubscribeStreamTable") + public void testSubscribeStreamTable() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("Test1"); + tableStructureService.subscribeStreamTable(tableDTO); + } + + /** + * 创建Dfs和Stream表并添加订阅 + */ + @GetMapping(value = "/testCreateDfsAndStreamTableSubscribe") + public void testCreateDfsAndStreamTableSubscribe() { + TableDTO tableDTO = new TableDTO(); + tableDTO.setDatabaseName("ZbDB"); + tableDTO.setTableName("MyTe2"); + tableDTO.setTableDesc("MyTe2测试表"); + List tableColumnList = new ArrayList<>(); + tableColumnList.add(new TableColumnDTO("test", "STRING", "test")); + tableDTO.setTableColumnList(tableColumnList); + tableStructureService.createDfsAndStreamTableSubscribe(tableDTO); + } + + +} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java b/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java index 7d0b4c9..a6dcd46 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java +++ b/data-storage/src/main/java/com/huaxing/mqtt/MqttMessageConsumer.java @@ -2,6 +2,7 @@ package com.huaxing.mqtt; import com.huaxing.data.storage.service.IDataAnalysisService; import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; @@ -24,15 +25,16 @@ import java.util.concurrent.CompletableFuture; */ @Slf4j @Component +@AllArgsConstructor public class MqttMessageConsumer extends MqttMessageReceiver { /** * 数据处理服务 */ final IDataAnalysisService analysisService; - public MqttMessageConsumer(IDataAnalysisService analysisService) { - this.analysisService = analysisService; - } +// public MqttMessageConsumer(IDataAnalysisService analysisService) { +// this.analysisService = analysisService; +// } @Override diff --git a/data-storage/src/main/resources/application-dev.yaml b/data-storage/src/main/resources/application-dev.yaml index 272876f..afcb30d 100644 --- a/data-storage/src/main/resources/application-dev.yaml +++ b/data-storage/src/main/resources/application-dev.yaml @@ -36,7 +36,6 @@ mqtt: - iot/test2/# dolphindb: - db-path: dfs://ZbDB host: 127.0.0.1 port: 8848 username: admin diff --git a/data-storage/src/main/resources/banner.txt b/data-storage/src/main/resources/banner.txt deleted file mode 100644 index d193f37..0000000 --- a/data-storage/src/main/resources/banner.txt +++ /dev/null @@ -1,9 +0,0 @@ -${AnsiColor.BLUE} ######## ### ######## ### ###### ######## ####### ######## ### ###### ######## -${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -${AnsiColor.BLUE} ## ## ## ## ## ## ## ####### ###### ## ## ## ######## ## ## ## #### ###### -${AnsiColor.BLUE} ## ## ######### ## ######### ## ## ## ## ## ## ######### ## ## ## -${AnsiColor.BLUE} ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## -${AnsiColor.BLUE} ######## ## ## ## ## ## ###### ## ####### ## ## ## ## ###### ######## -${AnsiColor.BRIGHT_GREEN}Spring Boot Version: ${spring-boot.version} -${AnsiColor.BRIGHT_YELLOW}Application Version: ${application.version} \ No newline at end of file