Browse Source

项目架构调整

tags/v2.0
swordmeng 1 month ago
parent
commit
34d235aa94
  1. 22
      data-common/pom.xml
  2. 2
      data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java
  3. 2
      data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java
  4. 0
      data-common/src/main/java/com/huaxing/mqtt/ReadMe
  5. 2
      data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java
  6. 2
      data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java
  7. 2
      data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java
  8. 0
      data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java
  9. 61
      data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java
  10. 0
      data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java
  11. 30
      data-storage/pom.xml
  12. 3
      data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java
  13. 30
      data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java
  14. 2
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java
  15. 2
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java
  16. 9
      data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java
  17. 8
      data-storage/src/main/resources/banner.txt
  18. 9
      pom.xml

22
data-common/pom.xml

@ -11,4 +11,26 @@
<artifactId>data-common</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
</project>

2
data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java → data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java

@ -1,4 +1,4 @@
package com.huaxing.mqtt.constant;
package com.huaxing.common.constant;
/**
* 常量
*/

2
data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java → data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java

@ -1,4 +1,4 @@
package com.huaxing.data.util;
package com.huaxing.common.util;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;

0
data-storage/src/main/java/com/huaxing/mqtt/ReadMe → data-common/src/main/java/com/huaxing/mqtt/ReadMe

2
data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java → data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java

@ -136,7 +136,7 @@ public class MqttConfiguration {
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
log.info("初始化 MQTT 配置");
log.info("初始化 MQTT 配置");
return factory;
}

2
data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java → data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java

@ -1,6 +1,6 @@
package com.huaxing.mqtt.config;
import com.huaxing.mqtt.constant.MqttConstant;
import com.huaxing.common.constant.MqttConstant;
import com.huaxing.mqtt.processor.MqttMessageReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;

2
data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java → data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java

@ -1,6 +1,6 @@
package com.huaxing.mqtt.config;
import com.huaxing.mqtt.constant.MqttConstant;
import com.huaxing.common.constant.MqttConstant;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

0
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java → data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java

61
data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java

@ -0,0 +1,61 @@
package com.huaxing.mqtt.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
//import org.springframework.integration.mqtt.support.MqttHeaders;
//import org.springframework.messaging.Message;
//import org.springframework.messaging.MessageHeaders;
//import org.springframework.messaging.MessagingException;
//import org.springframework.scheduling.annotation.Async;
//import java.util.concurrent.CompletableFuture;
/**
* 消费者处理器
*/
@Slf4j
@Component
@SuppressWarnings("all")
public abstract class MqttMessageReceiver implements MessageHandler {
// 获取消息接口
// public abstract String getMessage(String payload);
// /**
// * 消息处理
// *
// * @param message 消息
// * @throws MessagingException 消息异常
// */
// @Override
// @Async("handleMessage")
// public void handleMessage(Message<?> message) throws MessagingException {
// try {
// // 获取消息Topic
// MessageHeaders headers = message.getHeaders();
// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
// log.info("[获取到的消息的topic]:{} ", topic);
// // 获取消息体
// String payload = (String) message.getPayload();
// log.info("[获取到的消息的payload]:{} ", payload);
// // 数据库入库消息
// if (topic.contains("iot/test1/in-storage")) {
// // 模拟1000000条数据入库
// log.info("接收到iot/test1/in-storage的消息啦,快去处理");
// getMessage(payload);
//// long l = System.currentTimeMillis();
//// System.currentTimeMillis();
//// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
////// analysisService.parseStoreData(payload);
//// });
//// long l1 = System.currentTimeMillis();
//// log.info("入库完成,耗时:{}", l1 - l);
//// analysisService.parseStoreData(payload);
// } else if (topic.contains("table-update/")){} // TODO 表更新topic
// } catch (Exception e) {
// log.error(e.toString());
// }
// }
}

0
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java → data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java

30
data-storage/pom.xml

@ -13,21 +13,11 @@
<packaging>jar</packaging>
<dependencies>
<!-- 引入data-common-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20240303</version>
<groupId>com.huaxing</groupId>
<artifactId>data-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
@ -35,11 +25,7 @@
<artifactId>spring-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
@ -47,12 +33,6 @@
<version>3.5.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13-beta-3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.22.1</version>

3
data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java

@ -1,6 +1,7 @@
package com.huaxing.data.dolphindb.config;
import com.xxdb.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
/**
@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration;
* @Date: 2025/1/15 9:58
* @Version: 1.0
*/
@Slf4j
@Configuration
public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool {
@ -25,5 +27,6 @@ public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool {
public DolphinDbPoolConfiguration(DolphinDbConfiguration dbConfiguration) {
super(dbConfiguration.loadConfig());
this.dbConn = super.getConnection();
log.info("【初始化 DolphinDbPool 连接池】");
}
}

30
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java → data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java

@ -1,10 +1,10 @@
package com.huaxing.mqtt.processor;
package com.huaxing.data.mqtt;
import com.huaxing.data.storage.service.IDataAnalysisService;
import com.huaxing.mqtt.processor.MqttMessageReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.annotation.Async;
@ -12,28 +12,30 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
/**
* 消费者处理器
* @ProjectName: data-bridge
* @Package: com.huaxing.data.mqtt
* @ClassName: MqttMessageConsumer
* @Author: swordmeng8@163.com
* @Description: 消费者
* @Date: 2025/1/20 11:23
* @Version: 1.0
*/
@Slf4j
@Component
@SuppressWarnings("all")
public class MqttMessageReceiver implements MessageHandler {
public class MqttMessageConsumer extends MqttMessageReceiver {
/**
* 数据处理服务
*/
final IDataAnalysisService analysisService;
public MqttMessageReceiver(IDataAnalysisService analysisService) {
public MqttMessageConsumer(IDataAnalysisService analysisService) {
this.analysisService = analysisService;
}
/**
* 消息处理
*
* @param message 消息
* @throws MessagingException 消息异常
*/
@Override
@Async("handleMessage")
public void handleMessage(Message<?> message) throws MessagingException {
@ -54,13 +56,13 @@ public class MqttMessageReceiver implements MessageHandler {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
analysisService.parseStoreData(payload);
});
// future.get();
long l1 = System.currentTimeMillis();
log.info("入库完成,耗时:{}", l1 - l);
// analysisService.parseStoreData(payload);
analysisService.parseStoreData(payload);
} else if (topic.contains("table-update/")){} // TODO 表更新topic
} catch (Exception e) {
log.error(e.toString());
}
}
}

2
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java

@ -3,7 +3,7 @@ package com.huaxing.data.storage.service.impl;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDataAnalysisService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.util.JacksonUtil;
import com.huaxing.common.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

2
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java

@ -4,7 +4,7 @@ import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.mapper.IDeviceDataStoredMapper;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.dolphindb.base.CommonService;
import com.huaxing.data.util.JacksonUtil;
import com.huaxing.common.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

9
data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java

@ -5,7 +5,7 @@ import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.tablemanagement.service.ITableStructureService;
import com.huaxing.data.util.JacksonUtil;
import com.huaxing.common.util.JacksonUtil;
import com.huaxing.mqtt.processor.MqttMessageSender;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@ -13,10 +13,7 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
@ -117,7 +114,7 @@ public class TestController {
private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>();
map.put("time", "2025.01.01 00:00:00");
map.put("time", new Date());
map.put("projectId", "0jZU2102");
map.put("deviceId", "0jZU2102_0806_0011");
map.put("WM_WFA", 124.656 + index);

8
data-storage/src/main/resources/banner.txt

@ -1,8 +0,0 @@
________ ________ _________ ________ ________ _________ ________ ________ ________ ________ _______
|\ ___ \|\ __ \|\___ ___\\ __ \ |\ ____\|\___ ___\\ __ \|\ __ \|\ __ \|\ ____\|\ ___ \
\ \ \_|\ \ \ \|\ \|___ \ \_\ \ \|\ \ ____________\ \ \___|\|___ \ \_\ \ \|\ \ \ \|\ \ \ \|\ \ \ \___|\ \ __/|
\ \ \ \\ \ \ __ \ \ \ \ \ \ __ \|\____________\ \_____ \ \ \ \ \ \ \\\ \ \ _ _\ \ __ \ \ \ __\ \ \_|/__
\ \ \_\\ \ \ \ \ \ \ \ \ \ \ \ \ \|____________|\|____|\ \ \ \ \ \ \ \\\ \ \ \\ \\ \ \ \ \ \ \|\ \ \ \_|\ \
\ \_______\ \__\ \__\ \ \__\ \ \__\ \__\ ____\_\ \ \ \__\ \ \_______\ \__\\ _\\ \__\ \__\ \_______\ \_______\
\|_______|\|__|\|__| \|__| \|__|\|__| |\_________\ \|__| \|_______|\|__|\|__|\|__|\|__|\|_______|\|_______|
\|_________|

9
pom.xml

@ -15,6 +15,15 @@
<module>data-storage</module>
</modules>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>provided</scope>
</dependency>
</dependencies>
<repositories>

Loading…
Cancel
Save