From e78d98225fe17e9285287203da1b717d189a0fef Mon Sep 17 00:00:00 2001 From: swordmeng Date: Mon, 20 Jan 2025 11:06:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/huaxing/IotDataBridgeApplication.java | 11 +- .../data/storage/controller/TestController.java | 130 -------------------- .../service/ITableTemplateService.java | 21 ++++ .../service/impl/TableTemplateServiceImpl.java | 61 ++++++++++ .../data/test/controller/TestController.java | 133 +++++++++++++++++++++ data-storage/src/main/resources/banner.txt | 8 ++ data-storage/src/main/resources/补水箱.script | 4 +- 7 files changed, 234 insertions(+), 134 deletions(-) delete mode 100644 data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java create mode 100644 data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java create mode 100644 data-storage/src/main/resources/banner.txt diff --git a/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java index 1aaf980..ebebd9e 100644 --- a/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java +++ b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java @@ -1,16 +1,21 @@ package com.huaxing; -import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.ComponentScan; @SpringBootApplication public class IotDataBridgeApplication { public static void main(String[] args) { SpringApplication.run(IotDataBridgeApplication.class, args); - System.out.println("================= iot-data-bridge started! ================="); + String banner = + "===========================================\n" + + " 中联创信低碳科技有限公司 \n" + + " http://localhost:8088 \n" + + " iot-data-bridge started! \n" + + "===========================================\n"; + System.out.println(banner); + } } diff --git a/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java b/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java deleted file mode 100644 index 82a9e3e..0000000 --- a/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java +++ /dev/null @@ -1,130 +0,0 @@ -package com.huaxing.data.storage.controller; - -import com.huaxing.data.storage.domain.DataAnalysisDTO; -import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; -import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; -import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.tablemanagement.service.ITableStructureService; -import com.huaxing.data.util.JacksonUtil; -import com.huaxing.mqtt.processor.MqttMessageSender; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.data.storage.controller - * @ClassName: TestController - * @Author: swordmeng8@163.com - * @Description: 测试controller - * @Date: 2025/1/15 17:08 - * @Version: 1.0 - */ -@Slf4j -@RestController -@RequestMapping("/test") -@SuppressWarnings("all") -public class TestController { - - @Resource - private MqttMessageSender messageSender; - final IDeviceDataStoredService dataStoredService; - final IDeviceDataQueryDfsService dataQueryDfsService; - final IDeviceDataQueryStreamService dataQueryStreamService; - final ITableStructureService tableStructureService; - -// // 依赖注入 - public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) { - this.dataStoredService = dataStoredService; - this.dataQueryDfsService = dataQueryDfsService; - this.dataQueryStreamService = dataQueryStreamService; - this.tableStructureService = tableStructureService; - } - - // 测试插入数据 - @GetMapping(value = "/testInsert") // 成功 - public void testInsert() { - String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')"; - dataStoredService.execute(sql); - log.info("SUCCESS"); - } - - // 测试Dfs表查询 - @GetMapping(value = "/testSelectDfs") - public List> testSelectDfs() { - String dbPath = "dfs://ZbDB"; - String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); - return dataQueryDfsService.selectList(sql); - } - - // 测试订阅流表查询 - @GetMapping(value = "/testSelectStream") - public List> testSelectStream() { - String sql = "select * from ZbWaterMeter1Stream"; - return dataQueryStreamService.selectList(sql); - } - - - // 给指定的流表增加列字段 - @GetMapping(value = "/testStreamAddColumn") - public void testStreamAddColumn() { - String tableName = "ZbWaterMeter1Stream"; - String columnName = "WM_WFA_Unit14"; - String columnDefinition = "STRING"; - try { - tableStructureService.addStreamColumn(tableName, columnName, columnDefinition); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - @GetMapping(value = "/testDfsAddColumn") - public void testDfsAddColumn() { - String tableName = "ZbWaterMeter1Dfs"; - String columnName = "test2"; - String columnDefinition = "STRING"; - try { - tableStructureService.addDfsColumn(tableName, columnName, columnDefinition); - log.info("SUCCESS"); - } catch (Exception e) { - log.error("FAIL"); - e.printStackTrace(); - } - } - - - // 向消息队列中发送100W条数据 - @GetMapping(value = "/testSendMessage") - public void testSendMessage() { - for (int i = 0; i < 2; i++) { - List> dataList = new ArrayList<>(); - dataList.add(handleMapByIndex(i)); - DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); - messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); - } - } - - private Map handleMapByIndex(int index) { - Map map = new HashMap<>(); - map.put("time", "2025.01.01 00:00:00"); - map.put("projectId", "0jZU2102"); - map.put("deviceId", "0jZU2102_0806_0011"); - map.put("WM_WFA", 124.656 + index); - map.put("WM_WFA_Unit", "m³"); - return map; - } - - - - -} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java new file mode 100644 index 0000000..922e81d --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java @@ -0,0 +1,21 @@ +package com.huaxing.data.tablemanagement.service; + +/** + * 表创建模板 + * @author 孟剑 + * @date 2025-01-20 10:32 + */ +public interface ITableTemplateService { + + // 创建dfs表模板 + String createDfsTableTemplate(String tableName, String columnName, String columnDefinition); + + // 创建流表模板 + String createStreamTableTemplate(String tableName, String columnName, String columnDefinition); + + // 取消订阅流表模板 + String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); + + // 订阅流表模板 + String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java new file mode 100644 index 0000000..4c1e0f0 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java @@ -0,0 +1,61 @@ +package com.huaxing.data.tablemanagement.service.impl; + +import com.huaxing.data.tablemanagement.service.ITableTemplateService; + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.tablemanagement.service.impl + * @ClassName: TableTemplateServiceImpl + * @Author: swordmeng8@163.com + * @Description: 表操作模板 + * @Date: 2025/1/20 10:38 + * @Version: 1.0 + */ + +public class TableTemplateServiceImpl implements ITableTemplateService { + + /** + * @Description 创建 DFS 表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createDfsTableTemplate(String tableName, String columnName, String columnDefinition) { + + return null; + } + + /** + * @Description 创建流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String createStreamTableTemplate(String tableName, String columnName, String columnDefinition) { + return null; + } + + /** + * @Description 取消订阅流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { + return null; + } + + /** + * @Description 订阅流表模板 + * @Author swordmeng8@163.com + * @Date 2025/1/20 10:46 + * @Version v1.0 + **/ + @Override + public String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { + return null; + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java new file mode 100644 index 0000000..cb3f316 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java @@ -0,0 +1,133 @@ +package com.huaxing.data.test.controller; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; +import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.tablemanagement.service.ITableStructureService; +import com.huaxing.data.util.JacksonUtil; +import com.huaxing.mqtt.processor.MqttMessageSender; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.controller + * @ClassName: TestController + * @Author: swordmeng8@163.com + * @Description: 测试controller + * @Date: 2025/1/15 17:08 + * @Version: 1.0 + */ +@Slf4j +@RestController +@RequestMapping("/test") +@SuppressWarnings("all") +public class TestController { + + @Resource + private MqttMessageSender messageSender; + final IDeviceDataStoredService dataStoredService; + final IDeviceDataQueryDfsService dataQueryDfsService; + final IDeviceDataQueryStreamService dataQueryStreamService; + final ITableStructureService tableStructureService; + +// // 依赖注入 + public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) { + this.dataStoredService = dataStoredService; + this.dataQueryDfsService = dataQueryDfsService; + this.dataQueryStreamService = dataQueryStreamService; + this.tableStructureService = tableStructureService; + } + + + // =============================================== 测试插入数据 ====================================== + @GetMapping(value = "/testInsert") // 成功 + public void testInsert() { + String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')"; + dataStoredService.execute(sql); + log.info("SUCCESS"); + } + + // =============================================== 测试Dfs表查询 ====================================== + @GetMapping(value = "/testSelectDfs") + public List> testSelectDfs() { + String dbPath = "dfs://ZbDB"; + String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); + return dataQueryDfsService.selectList(sql); + } + + // =============================================== 测试订阅流表查询 ====================================== + @GetMapping(value = "/testSelectStream") + public List> testSelectStream() { + String sql = "select * from ZbWaterMeter1Stream"; + return dataQueryStreamService.selectList(sql); + } + + // =============================================== 给指定的流表增加列字段 ====================================== + @GetMapping(value = "/testStreamAddColumn") + public void testStreamAddColumn() { + String tableName = "ZbWaterMeter1Stream"; + String columnName = "WM_WFA_Unit14"; + String columnDefinition = "STRING"; + try { + tableStructureService.addStreamColumn(tableName, columnName, columnDefinition); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + // =============================================== 给指定的Dfs表增加列字段 ====================================== + @GetMapping(value = "/testDfsAddColumn") + public void testDfsAddColumn() { + String tableName = "ZbWaterMeter1Dfs"; + String columnName = "test2"; + String columnDefinition = "STRING"; + try { + tableStructureService.addDfsColumn(tableName, columnName, columnDefinition); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + + // ===================================== mqtt消息发送-测试数据入库 START ============================== + @GetMapping(value = "/testSendMessage") + public void testSendMessage() { + for (int i = 0; i < 2; i++) { + List> dataList = new ArrayList<>(); + dataList.add(handleMapByIndex(i)); + DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); + messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); + } + } + + private Map handleMapByIndex(int index) { + Map map = new HashMap<>(); + map.put("time", "2025.01.01 00:00:00"); + map.put("projectId", "0jZU2102"); + map.put("deviceId", "0jZU2102_0806_0011"); + map.put("WM_WFA", 124.656 + index); + map.put("WM_WFA_Unit", "m³"); + return map; + } + + // =============================================== 创建Dfs表 Stream流表 创建 ====================================== + + + + +} diff --git a/data-storage/src/main/resources/banner.txt b/data-storage/src/main/resources/banner.txt new file mode 100644 index 0000000..6b056d1 --- /dev/null +++ b/data-storage/src/main/resources/banner.txt @@ -0,0 +1,8 @@ + ________ ________ _________ ________ ________ _________ ________ ________ ________ ________ _______ +|\ ___ \|\ __ \|\___ ___\\ __ \ |\ ____\|\___ ___\\ __ \|\ __ \|\ __ \|\ ____\|\ ___ \ +\ \ \_|\ \ \ \|\ \|___ \ \_\ \ \|\ \ ____________\ \ \___|\|___ \ \_\ \ \|\ \ \ \|\ \ \ \|\ \ \ \___|\ \ __/| + \ \ \ \\ \ \ __ \ \ \ \ \ \ __ \|\____________\ \_____ \ \ \ \ \ \ \\\ \ \ _ _\ \ __ \ \ \ __\ \ \_|/__ + \ \ \_\\ \ \ \ \ \ \ \ \ \ \ \ \ \|____________|\|____|\ \ \ \ \ \ \ \\\ \ \ \\ \\ \ \ \ \ \ \|\ \ \ \_|\ \ + \ \_______\ \__\ \__\ \ \__\ \ \__\ \__\ ____\_\ \ \ \__\ \ \_______\ \__\\ _\\ \__\ \__\ \_______\ \_______\ + \|_______|\|__|\|__| \|__| \|__|\|__| |\_________\ \|__| \|_______|\|__|\|__|\|__|\|__|\|_______|\|_______| + \|_________| diff --git a/data-storage/src/main/resources/补水箱.script b/data-storage/src/main/resources/补水箱.script index 7f27abe..c4d9d98 100644 --- a/data-storage/src/main/resources/补水箱.script +++ b/data-storage/src/main/resources/补水箱.script @@ -63,9 +63,11 @@ def createPT(dbPath,ptName){ //createDB(dbPath) //创建分区表 createPT(dbPath,ptName) -//创建流表 +// 取消订阅流表 unsubscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime) +// 删除流表 // dropStreamTable(stName) +//创建流表 createST(stName) //订阅流表 subscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true) \ No newline at end of file