diff --git a/data-common/pom.xml b/data-common/pom.xml index d5526da..a136dff 100644 --- a/data-common/pom.xml +++ b/data-common/pom.xml @@ -11,4 +11,26 @@ data-common + + + + + org.springframework.integration + spring-integration-mqtt + 6.4.1 + + + org.json + json + 20240303 + + + + org.springframework.boot + spring-boot-starter-web + 3.4.1 + + + + \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java b/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java similarity index 88% rename from data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java rename to data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java index 8266076..eba93f5 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java +++ b/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java @@ -1,4 +1,4 @@ -package com.huaxing.mqtt.constant; +package com.huaxing.common.constant; /** * 常量 */ diff --git a/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java b/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java similarity index 98% rename from data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java rename to data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java index 3753c0a..e589f41 100644 --- a/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java +++ b/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java @@ -1,4 +1,4 @@ -package com.huaxing.data.util; +package com.huaxing.common.util; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/data-storage/src/main/java/com/huaxing/mqtt/ReadMe b/data-common/src/main/java/com/huaxing/mqtt/ReadMe similarity index 100% rename from data-storage/src/main/java/com/huaxing/mqtt/ReadMe rename to data-common/src/main/java/com/huaxing/mqtt/ReadMe diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java similarity index 98% rename from data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java rename to data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java index af9bf76..fb12312 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java @@ -136,7 +136,7 @@ public class MqttConfiguration { options.setAutomaticReconnect(true); factory.setConnectionOptions(options); - log.info("初始化 MQTT 配置"); + log.info("【初始化 MQTT 配置】"); return factory; } diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java similarity index 98% rename from data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java rename to data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java index ac525f3..65311e6 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java @@ -1,6 +1,6 @@ package com.huaxing.mqtt.config; -import com.huaxing.mqtt.constant.MqttConstant; +import com.huaxing.common.constant.MqttConstant; import com.huaxing.mqtt.processor.MqttMessageReceiver; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java similarity index 97% rename from data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java rename to data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java index 15a9b55..37de71b 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java @@ -1,6 +1,6 @@ package com.huaxing.mqtt.config; -import com.huaxing.mqtt.constant.MqttConstant; +import com.huaxing.common.constant.MqttConstant; import jakarta.annotation.Resource; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java similarity index 100% rename from data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java rename to data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java new file mode 100644 index 0000000..27472d0 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java @@ -0,0 +1,61 @@ +package com.huaxing.mqtt.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.MessageHandler; +import org.springframework.stereotype.Component; +//import org.springframework.integration.mqtt.support.MqttHeaders; +//import org.springframework.messaging.Message; +//import org.springframework.messaging.MessageHeaders; +//import org.springframework.messaging.MessagingException; +//import org.springframework.scheduling.annotation.Async; +//import java.util.concurrent.CompletableFuture; + +/** + * 消费者处理器 + */ +@Slf4j +@Component +@SuppressWarnings("all") +public abstract class MqttMessageReceiver implements MessageHandler { + + // 获取消息接口 +// public abstract String getMessage(String payload); + +// /** +// * 消息处理 +// * +// * @param message 消息 +// * @throws MessagingException 消息异常 +// */ +// @Override +// @Async("handleMessage") +// public void handleMessage(Message message) throws MessagingException { +// try { +// // 获取消息Topic +// MessageHeaders headers = message.getHeaders(); +// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); +// log.info("[获取到的消息的topic]:{} ", topic); +// // 获取消息体 +// String payload = (String) message.getPayload(); +// log.info("[获取到的消息的payload]:{} ", payload); +// // 数据库入库消息 +// if (topic.contains("iot/test1/in-storage")) { +// // 模拟1000000条数据入库 +// log.info("接收到iot/test1/in-storage的消息啦,快去处理"); +// getMessage(payload); +//// long l = System.currentTimeMillis(); +//// System.currentTimeMillis(); +//// CompletableFuture future = CompletableFuture.runAsync(() -> { +////// analysisService.parseStoreData(payload); +//// }); +//// long l1 = System.currentTimeMillis(); +//// log.info("入库完成,耗时:{}", l1 - l); +//// analysisService.parseStoreData(payload); +// } else if (topic.contains("table-update/")){} // TODO 表更新topic +// } catch (Exception e) { +// log.error(e.toString()); +// } +// } + +} + diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java similarity index 100% rename from data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java rename to data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java diff --git a/data-storage/pom.xml b/data-storage/pom.xml index 0d33494..62ea8f2 100644 --- a/data-storage/pom.xml +++ b/data-storage/pom.xml @@ -13,21 +13,11 @@ jar + - org.projectlombok - lombok - RELEASE - provided - - - org.springframework.integration - spring-integration-mqtt - 6.4.1 - - - org.json - json - 20240303 + com.huaxing + data-common + 0.0.1-SNAPSHOT @@ -35,11 +25,7 @@ spring-boot-starter 3.4.1 - - org.springframework.boot - spring-boot-starter-web - 3.4.1 - + com.baomidou @@ -47,12 +33,6 @@ 3.5.8 - junit - junit - 4.13-beta-3 - compile - - org.aspectj aspectjweaver 1.9.22.1 diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java index a2684a3..4591b75 100644 --- a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java @@ -1,6 +1,7 @@ package com.huaxing.data.dolphindb.config; import com.xxdb.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; /** @@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration; * @Date: 2025/1/15 9:58 * @Version: 1.0 */ +@Slf4j @Configuration public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool { @@ -25,5 +27,6 @@ public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool { public DolphinDbPoolConfiguration(DolphinDbConfiguration dbConfiguration) { super(dbConfiguration.loadConfig()); this.dbConn = super.getConnection(); + log.info("【初始化 DolphinDbPool 连接池】"); } } diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java similarity index 79% rename from data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java rename to data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java index 6154d4b..941cf2d 100644 --- a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java +++ b/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java @@ -1,10 +1,10 @@ -package com.huaxing.mqtt.processor; +package com.huaxing.data.mqtt; import com.huaxing.data.storage.service.IDataAnalysisService; +import com.huaxing.mqtt.processor.MqttMessageReceiver; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.scheduling.annotation.Async; @@ -12,28 +12,30 @@ import org.springframework.stereotype.Component; import java.util.concurrent.CompletableFuture; + /** - * 消费者处理器 + * @ProjectName: data-bridge + * @Package: com.huaxing.data.mqtt + * @ClassName: MqttMessageConsumer + * @Author: swordmeng8@163.com + * @Description: 消费者 + * @Date: 2025/1/20 11:23 + * @Version: 1.0 */ @Slf4j @Component -@SuppressWarnings("all") -public class MqttMessageReceiver implements MessageHandler { +public class MqttMessageConsumer extends MqttMessageReceiver { /** * 数据处理服务 */ final IDataAnalysisService analysisService; - public MqttMessageReceiver(IDataAnalysisService analysisService) { + public MqttMessageConsumer(IDataAnalysisService analysisService) { this.analysisService = analysisService; } - /** - * 消息处理 - * - * @param message 消息 - * @throws MessagingException 消息异常 - */ + + @Override @Async("handleMessage") public void handleMessage(Message message) throws MessagingException { @@ -54,13 +56,13 @@ public class MqttMessageReceiver implements MessageHandler { CompletableFuture future = CompletableFuture.runAsync(() -> { analysisService.parseStoreData(payload); }); +// future.get(); long l1 = System.currentTimeMillis(); log.info("入库完成,耗时:{}", l1 - l); -// analysisService.parseStoreData(payload); + analysisService.parseStoreData(payload); } else if (topic.contains("table-update/")){} // TODO 表更新topic } catch (Exception e) { log.error(e.toString()); } } } - diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java index 4b76ea4..7c9c15d 100644 --- a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java @@ -3,7 +3,7 @@ package com.huaxing.data.storage.service.impl; import com.huaxing.data.storage.domain.DataAnalysisDTO; import com.huaxing.data.storage.service.IDataAnalysisService; import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java index 67c7f37..b01ae55 100644 --- a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java @@ -4,7 +4,7 @@ import com.huaxing.data.storage.domain.DataAnalysisDTO; import com.huaxing.data.storage.mapper.IDeviceDataStoredMapper; import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.dolphindb.base.CommonService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; diff --git a/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java index cb3f316..aa372dc 100644 --- a/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java +++ b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java @@ -5,7 +5,7 @@ import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.tablemanagement.service.ITableStructureService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import com.huaxing.mqtt.processor.MqttMessageSender; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -13,10 +13,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** @@ -117,7 +114,7 @@ public class TestController { private Map handleMapByIndex(int index) { Map map = new HashMap<>(); - map.put("time", "2025.01.01 00:00:00"); + map.put("time", new Date()); map.put("projectId", "0jZU2102"); map.put("deviceId", "0jZU2102_0806_0011"); map.put("WM_WFA", 124.656 + index); diff --git a/data-storage/src/main/resources/banner.txt b/data-storage/src/main/resources/banner.txt deleted file mode 100644 index 6b056d1..0000000 --- a/data-storage/src/main/resources/banner.txt +++ /dev/null @@ -1,8 +0,0 @@ - ________ ________ _________ ________ ________ _________ ________ ________ ________ ________ _______ -|\ ___ \|\ __ \|\___ ___\\ __ \ |\ ____\|\___ ___\\ __ \|\ __ \|\ __ \|\ ____\|\ ___ \ -\ \ \_|\ \ \ \|\ \|___ \ \_\ \ \|\ \ ____________\ \ \___|\|___ \ \_\ \ \|\ \ \ \|\ \ \ \|\ \ \ \___|\ \ __/| - \ \ \ \\ \ \ __ \ \ \ \ \ \ __ \|\____________\ \_____ \ \ \ \ \ \ \\\ \ \ _ _\ \ __ \ \ \ __\ \ \_|/__ - \ \ \_\\ \ \ \ \ \ \ \ \ \ \ \ \ \|____________|\|____|\ \ \ \ \ \ \ \\\ \ \ \\ \\ \ \ \ \ \ \|\ \ \ \_|\ \ - \ \_______\ \__\ \__\ \ \__\ \ \__\ \__\ ____\_\ \ \ \__\ \ \_______\ \__\\ _\\ \__\ \__\ \_______\ \_______\ - \|_______|\|__|\|__| \|__| \|__|\|__| |\_________\ \|__| \|_______|\|__|\|__|\|__|\|__|\|_______|\|_______| - \|_________| diff --git a/pom.xml b/pom.xml index fe572b1..058fa30 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,15 @@ data-storage + + + org.projectlombok + lombok + RELEASE + provided + + +