From 34d235aa947e6235b17999ed0a06ba18e9a12940 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Mon, 20 Jan 2025 11:56:14 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E6=9E=B6=E6=9E=84=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data-common/pom.xml | 22 ++++ .../com/huaxing/common/constant/MqttConstant.java | 17 +++ .../java/com/huaxing/common/util/JacksonUtil.java | 77 +++++++++++ data-common/src/main/java/com/huaxing/mqtt/ReadMe | 0 .../com/huaxing/mqtt/config/MqttConfiguration.java | 143 +++++++++++++++++++++ .../mqtt/config/MqttConsumerConfiguration.java | 96 ++++++++++++++ .../mqtt/config/MqttProducerConfiguration.java | 65 ++++++++++ .../com/huaxing/mqtt/processor/MqttGateway.java | 46 +++++++ .../mqtt/processor/MqttMessageReceiver.java | 61 +++++++++ .../huaxing/mqtt/processor/MqttMessageSender.java | 52 ++++++++ data-storage/pom.xml | 30 +---- .../config/DolphinDbPoolConfiguration.java | 3 + .../com/huaxing/data/mqtt/MqttMessageConsumer.java | 68 ++++++++++ .../storage/service/impl/DataAnalysisService.java | 2 +- .../service/impl/DeviceDataStoredServiceImpl.java | 2 +- .../data/test/controller/TestController.java | 9 +- .../java/com/huaxing/data/util/JacksonUtil.java | 77 ----------- data-storage/src/main/java/com/huaxing/mqtt/ReadMe | 0 .../com/huaxing/mqtt/config/MqttConfiguration.java | 143 --------------------- .../mqtt/config/MqttConsumerConfiguration.java | 96 -------------- .../mqtt/config/MqttProducerConfiguration.java | 65 ---------- .../com/huaxing/mqtt/constant/MqttConstant.java | 17 --- .../com/huaxing/mqtt/processor/MqttGateway.java | 46 ------- .../mqtt/processor/MqttMessageReceiver.java | 66 ---------- .../huaxing/mqtt/processor/MqttMessageSender.java | 52 -------- data-storage/src/main/resources/banner.txt | 8 -- pom.xml | 9 ++ 27 files changed, 669 insertions(+), 603 deletions(-) create mode 100644 data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java create mode 100644 data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/ReadMe create mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java create mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java create mode 100644 data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java delete mode 100644 data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/ReadMe delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java delete mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java delete mode 100644 data-storage/src/main/resources/banner.txt diff --git a/data-common/pom.xml b/data-common/pom.xml index d5526da..a136dff 100644 --- a/data-common/pom.xml +++ b/data-common/pom.xml @@ -11,4 +11,26 @@ data-common + + + + + org.springframework.integration + spring-integration-mqtt + 6.4.1 + + + org.json + json + 20240303 + + + + org.springframework.boot + spring-boot-starter-web + 3.4.1 + + + + \ No newline at end of file diff --git a/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java b/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java new file mode 100644 index 0000000..eba93f5 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java @@ -0,0 +1,17 @@ +package com.huaxing.common.constant; +/** + * 常量 + */ +public class MqttConstant { + + /** + * 客户端id消费者后缀 + */ + public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers"; + /** + * 客户端id生产者后缀 + */ + public static final String CLIENT_SUFFIX_PRODUCERS = "_producers"; + +} + diff --git a/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java b/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java new file mode 100644 index 0000000..e589f41 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java @@ -0,0 +1,77 @@ +package com.huaxing.common.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author kwl99 + * @since 2024-08-08 14:31 + */ +public class JacksonUtil { + + /** + * 将对象转为json字符串 + */ + public static String objectStr(Object object) { + ObjectMapper mapper = new ObjectMapper(); + try { + + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * 将json字符串转为对象 + */ + public static T strToObject(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, clazz); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + + /** + * 将json字符串转为对象列表 + */ + public static List strToObjectList(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + + /** + * 将json字符串转为对象列表 + */ + public static List> strToMapList(String str) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + +} diff --git a/data-common/src/main/java/com/huaxing/mqtt/ReadMe b/data-common/src/main/java/com/huaxing/mqtt/ReadMe new file mode 100644 index 0000000..e69de29 diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..fb12312 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java @@ -0,0 +1,143 @@ +package com.huaxing.mqtt.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt全局相关配置信息 + */ +@Slf4j +@Setter +@Getter +@Configuration +@ConfigurationProperties("mqtt") +@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*") +public class MqttConfiguration { + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 连接地址 + */ + private String hostUrl; + + /** + * 客户Id + */ + private String clientId; + + /** + * 默认连接话题 + */ + private String defaultTopic; + + /** + * 超时时间 + */ + private int timeout; + + /** + * qos + */ + private int qos; + + /** + * 订阅超时时间 + */ + private int completionTimeout; + + /** + * 保持连接数 + */ + private int keepalive; + + /** + * 2024-12-06新增 + * 要订阅的其他主题 + */ + private List topics; + + /** + * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 + */ + private static final byte[] WILL_DATA = "offline".getBytes(); + + + /** + * 获取所有订阅的主题 + * @return 所有订阅的主题 + */ + public String[] getAllTopics() { + // 校验配置文件是否配置 + if (CollectionUtils.isEmpty(topics)) { + this.topics = new ArrayList<>(); + } + // 将默认主题条件到其他主题里 + this.topics.add(defaultTopic); + // 返回主题数组 + return topics.toArray(new String[0]); + } + + /** + * 注册MQTT客户端工厂 + * @return MqttPahoClientFactory + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + // 客户端工厂 + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + + MqttConnectOptions options = new MqttConnectOptions(); + // 设置连接的用户名 + options.setUserName(username); + // 设置连接的密码 + options.setPassword(password.toCharArray()); + // 设置连接的地址 + options.setServerURIs(new String[]{hostUrl}); + + // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + + // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(10); + + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 + // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + // 但这个方法并没有重连的机制 + options.setKeepAliveInterval(20); + + // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 + options.setWill("willTopic", WILL_DATA, 2, false); + + //自动重新连接 + options.setAutomaticReconnect(true); + factory.setConnectionOptions(options); + + log.info("【初始化 MQTT 配置】"); + + return factory; + } +} diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java new file mode 100644 index 0000000..65311e6 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java @@ -0,0 +1,96 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.common.constant.MqttConstant; +import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.util.UUID; + + +/** + * MQTT消费者配置 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttConsumerConfiguration { + + final MqttConfiguration mqttConfiguration; + final MqttMessageReceiver mqttMessageReceiver; + + public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) { + this.mqttConfiguration = mqttConfiguration; + this.mqttMessageReceiver = mqttMessageReceiver; + } + + + /** + * 此处可以使用其他消息通道 + * MQTT信息通道(消费者) + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } + + /** + * MQTT消息订阅绑定(消费者) + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + */ + @Bean + public MessageProducerSupport mqttInbound() { + // 获取客户端id + String clientId = mqttConfiguration.getClientId(); + // 获取默认主题 +// String defaultTopic = mqttConfiguration.getDefaultTopic(); + + // 获取所有配置的主题 + String[] topics = mqttConfiguration.getAllTopics(); + + // 获取客户端工厂 + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // Paho客户端消息驱动通道适配器,主要用来订阅主题 + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS, + mqttPahoClientFactory, + // 所有需要订阅的topic + topics + ); + adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout()); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // 按字节接收消息 + // defaultPahoMessageConverter.setPayloadAsBytes(true); + adapter.setConverter(defaultPahoMessageConverter); + // 设置QoS + adapter.setQos(mqttConfiguration.getQos()); + // 设置订阅通道 + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + +} + diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java new file mode 100644 index 0000000..37de71b --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java @@ -0,0 +1,65 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.common.constant.MqttConstant; +import jakarta.annotation.Resource; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + + + +/** + * MQTT生产者配置 + */ +@Slf4j +@Configuration +@AllArgsConstructor +public class MqttProducerConfiguration { + + @Resource + private MqttConfiguration mqttConfiguration; + + /** + * MQTT信息通道(生产者) + */ + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * MQTT消息处理器(生产者) + */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + // 客户端id + String clientId = mqttConfiguration.getClientId(); + // 默认主题 + String defaultTopic = mqttConfiguration.getDefaultTopic(); + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory); + // true,异步,发送消息时将不会阻塞。 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(defaultTopic); + // 默认QoS + messageHandler.setDefaultQos(1); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // defaultPahoMessageConverter.setPayloadAsBytes(true); + // 发送默认按字节类型发送消息 + messageHandler.setConverter(defaultPahoMessageConverter); + return messageHandler; + } + +} diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java new file mode 100644 index 0000000..89c4fa2 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java @@ -0,0 +1,46 @@ +package com.huaxing.mqtt.processor; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +/** + * 生产者处理器 + */ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param payload 内容 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java new file mode 100644 index 0000000..27472d0 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java @@ -0,0 +1,61 @@ +package com.huaxing.mqtt.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.MessageHandler; +import org.springframework.stereotype.Component; +//import org.springframework.integration.mqtt.support.MqttHeaders; +//import org.springframework.messaging.Message; +//import org.springframework.messaging.MessageHeaders; +//import org.springframework.messaging.MessagingException; +//import org.springframework.scheduling.annotation.Async; +//import java.util.concurrent.CompletableFuture; + +/** + * 消费者处理器 + */ +@Slf4j +@Component +@SuppressWarnings("all") +public abstract class MqttMessageReceiver implements MessageHandler { + + // 获取消息接口 +// public abstract String getMessage(String payload); + +// /** +// * 消息处理 +// * +// * @param message 消息 +// * @throws MessagingException 消息异常 +// */ +// @Override +// @Async("handleMessage") +// public void handleMessage(Message message) throws MessagingException { +// try { +// // 获取消息Topic +// MessageHeaders headers = message.getHeaders(); +// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); +// log.info("[获取到的消息的topic]:{} ", topic); +// // 获取消息体 +// String payload = (String) message.getPayload(); +// log.info("[获取到的消息的payload]:{} ", payload); +// // 数据库入库消息 +// if (topic.contains("iot/test1/in-storage")) { +// // 模拟1000000条数据入库 +// log.info("接收到iot/test1/in-storage的消息啦,快去处理"); +// getMessage(payload); +//// long l = System.currentTimeMillis(); +//// System.currentTimeMillis(); +//// CompletableFuture future = CompletableFuture.runAsync(() -> { +////// analysisService.parseStoreData(payload); +//// }); +//// long l1 = System.currentTimeMillis(); +//// log.info("入库完成,耗时:{}", l1 - l); +//// analysisService.parseStoreData(payload); +// } else if (topic.contains("table-update/")){} // TODO 表更新topic +// } catch (Exception e) { +// log.error(e.toString()); +// } +// } + +} + diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java new file mode 100644 index 0000000..27cc2f8 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java @@ -0,0 +1,52 @@ +package com.huaxing.mqtt.processor; + +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * 生产者处理器 + */ +@Component +public class MqttMessageSender { + + @Autowired + private MqttGateway mqttGateway; + + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody) { + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message) { + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/data-storage/pom.xml b/data-storage/pom.xml index 0d33494..62ea8f2 100644 --- a/data-storage/pom.xml +++ b/data-storage/pom.xml @@ -13,21 +13,11 @@ jar + - org.projectlombok - lombok - RELEASE - provided - - - org.springframework.integration - spring-integration-mqtt - 6.4.1 - - - org.json - json - 20240303 + com.huaxing + data-common + 0.0.1-SNAPSHOT @@ -35,11 +25,7 @@ spring-boot-starter 3.4.1 - - org.springframework.boot - spring-boot-starter-web - 3.4.1 - + com.baomidou @@ -47,12 +33,6 @@ 3.5.8 - junit - junit - 4.13-beta-3 - compile - - org.aspectj aspectjweaver 1.9.22.1 diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java index a2684a3..4591b75 100644 --- a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java @@ -1,6 +1,7 @@ package com.huaxing.data.dolphindb.config; import com.xxdb.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; /** @@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration; * @Date: 2025/1/15 9:58 * @Version: 1.0 */ +@Slf4j @Configuration public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool { @@ -25,5 +27,6 @@ public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool { public DolphinDbPoolConfiguration(DolphinDbConfiguration dbConfiguration) { super(dbConfiguration.loadConfig()); this.dbConn = super.getConnection(); + log.info("【初始化 DolphinDbPool 连接池】"); } } diff --git a/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java b/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java new file mode 100644 index 0000000..941cf2d --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/mqtt/MqttMessageConsumer.java @@ -0,0 +1,68 @@ +package com.huaxing.data.mqtt; + +import com.huaxing.data.storage.service.IDataAnalysisService; +import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +/** + * @ProjectName: data-bridge + * @Package: com.huaxing.data.mqtt + * @ClassName: MqttMessageConsumer + * @Author: swordmeng8@163.com + * @Description: 消费者 + * @Date: 2025/1/20 11:23 + * @Version: 1.0 + */ +@Slf4j +@Component +public class MqttMessageConsumer extends MqttMessageReceiver { + + /** + * 数据处理服务 + */ + final IDataAnalysisService analysisService; + public MqttMessageConsumer(IDataAnalysisService analysisService) { + this.analysisService = analysisService; + } + + + + @Override + @Async("handleMessage") + public void handleMessage(Message message) throws MessagingException { + try { + // 获取消息Topic + MessageHeaders headers = message.getHeaders(); + String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); + log.info("[获取到的消息的topic]:{} ", topic); + // 获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload]:{} ", payload); + // 数据库入库消息 + if (topic.contains("iot/test1/in-storage")) { + // 模拟1000000条数据入库 + log.info("接收到iot/test1/in-storage的消息啦,快去处理"); + long l = System.currentTimeMillis(); + System.currentTimeMillis(); + CompletableFuture future = CompletableFuture.runAsync(() -> { + analysisService.parseStoreData(payload); + }); +// future.get(); + long l1 = System.currentTimeMillis(); + log.info("入库完成,耗时:{}", l1 - l); + analysisService.parseStoreData(payload); + } else if (topic.contains("table-update/")){} // TODO 表更新topic + } catch (Exception e) { + log.error(e.toString()); + } + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java index 4b76ea4..7c9c15d 100644 --- a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java @@ -3,7 +3,7 @@ package com.huaxing.data.storage.service.impl; import com.huaxing.data.storage.domain.DataAnalysisDTO; import com.huaxing.data.storage.service.IDataAnalysisService; import com.huaxing.data.storage.service.IDeviceDataStoredService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java index 67c7f37..b01ae55 100644 --- a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java @@ -4,7 +4,7 @@ import com.huaxing.data.storage.domain.DataAnalysisDTO; import com.huaxing.data.storage.mapper.IDeviceDataStoredMapper; import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.dolphindb.base.CommonService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; diff --git a/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java index cb3f316..aa372dc 100644 --- a/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java +++ b/data-storage/src/main/java/com/huaxing/data/test/controller/TestController.java @@ -5,7 +5,7 @@ import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.tablemanagement.service.ITableStructureService; -import com.huaxing.data.util.JacksonUtil; +import com.huaxing.common.util.JacksonUtil; import com.huaxing.mqtt.processor.MqttMessageSender; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -13,10 +13,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** @@ -117,7 +114,7 @@ public class TestController { private Map handleMapByIndex(int index) { Map map = new HashMap<>(); - map.put("time", "2025.01.01 00:00:00"); + map.put("time", new Date()); map.put("projectId", "0jZU2102"); map.put("deviceId", "0jZU2102_0806_0011"); map.put("WM_WFA", 124.656 + index); diff --git a/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java b/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java deleted file mode 100644 index 3753c0a..0000000 --- a/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.huaxing.data.util; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author kwl99 - * @since 2024-08-08 14:31 - */ -public class JacksonUtil { - - /** - * 将对象转为json字符串 - */ - public static String objectStr(Object object) { - ObjectMapper mapper = new ObjectMapper(); - try { - - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - - return mapper.writeValueAsString(object); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** - * 将json字符串转为对象 - */ - public static T strToObject(String str, Class clazz) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, clazz); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - - - /** - * 将json字符串转为对象列表 - */ - public static List strToObjectList(String str, Class clazz) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz)); - } catch (Exception e) { - throw new RuntimeException(e); - - } - } - - /** - * 将json字符串转为对象列表 - */ - public static List> strToMapList(String str) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class)); - } catch (Exception e) { - throw new RuntimeException(e); - - } - } - -} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/ReadMe b/data-storage/src/main/java/com/huaxing/mqtt/ReadMe deleted file mode 100644 index e69de29..0000000 diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java deleted file mode 100644 index af9bf76..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java +++ /dev/null @@ -1,143 +0,0 @@ -package com.huaxing.mqtt.config; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.util.CollectionUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * mqtt全局相关配置信息 - */ -@Slf4j -@Setter -@Getter -@Configuration -@ConfigurationProperties("mqtt") -@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*") -public class MqttConfiguration { - - /** - * 用户名 - */ - private String username; - - /** - * 密码 - */ - private String password; - - /** - * 连接地址 - */ - private String hostUrl; - - /** - * 客户Id - */ - private String clientId; - - /** - * 默认连接话题 - */ - private String defaultTopic; - - /** - * 超时时间 - */ - private int timeout; - - /** - * qos - */ - private int qos; - - /** - * 订阅超时时间 - */ - private int completionTimeout; - - /** - * 保持连接数 - */ - private int keepalive; - - /** - * 2024-12-06新增 - * 要订阅的其他主题 - */ - private List topics; - - /** - * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 - */ - private static final byte[] WILL_DATA = "offline".getBytes(); - - - /** - * 获取所有订阅的主题 - * @return 所有订阅的主题 - */ - public String[] getAllTopics() { - // 校验配置文件是否配置 - if (CollectionUtils.isEmpty(topics)) { - this.topics = new ArrayList<>(); - } - // 将默认主题条件到其他主题里 - this.topics.add(defaultTopic); - // 返回主题数组 - return topics.toArray(new String[0]); - } - - /** - * 注册MQTT客户端工厂 - * @return MqttPahoClientFactory - */ - @Bean - public MqttPahoClientFactory mqttClientFactory() { - // 客户端工厂 - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - - MqttConnectOptions options = new MqttConnectOptions(); - // 设置连接的用户名 - options.setUserName(username); - // 设置连接的密码 - options.setPassword(password.toCharArray()); - // 设置连接的地址 - options.setServerURIs(new String[]{hostUrl}); - - // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: - // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 - // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 - options.setCleanSession(true); - - // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 - // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 - options.setConnectionTimeout(10); - - // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 - // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 - // 但这个方法并没有重连的机制 - options.setKeepAliveInterval(20); - - // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 - options.setWill("willTopic", WILL_DATA, 2, false); - - //自动重新连接 - options.setAutomaticReconnect(true); - factory.setConnectionOptions(options); - - log.info("初始化 MQTT 配置"); - - return factory; - } -} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java deleted file mode 100644 index ac525f3..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.huaxing.mqtt.config; - -import com.huaxing.mqtt.constant.MqttConstant; -import com.huaxing.mqtt.processor.MqttMessageReceiver; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -import java.util.UUID; - - -/** - * MQTT消费者配置 - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttConsumerConfiguration { - - final MqttConfiguration mqttConfiguration; - final MqttMessageReceiver mqttMessageReceiver; - - public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) { - this.mqttConfiguration = mqttConfiguration; - this.mqttMessageReceiver = mqttMessageReceiver; - } - - - /** - * 此处可以使用其他消息通道 - * MQTT信息通道(消费者) - * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 - */ - @Bean - public MessageChannel mqttInBoundChannel() { - return new DirectChannel(); - } - - /** - * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 - */ - @Bean - @ServiceActivator(inputChannel = "mqttInBoundChannel") - public MessageHandler mqttMessageHandler() { - return this.mqttMessageReceiver; - } - - /** - * MQTT消息订阅绑定(消费者) - * 适配器, 两个topic共用一个adapter - * 客户端作为消费者,订阅主题,消费消息 - */ - @Bean - public MessageProducerSupport mqttInbound() { - // 获取客户端id - String clientId = mqttConfiguration.getClientId(); - // 获取默认主题 -// String defaultTopic = mqttConfiguration.getDefaultTopic(); - - // 获取所有配置的主题 - String[] topics = mqttConfiguration.getAllTopics(); - - // 获取客户端工厂 - MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); - - // Paho客户端消息驱动通道适配器,主要用来订阅主题 - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS, - mqttPahoClientFactory, - // 所有需要订阅的topic - topics - ); - adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout()); - // Paho消息转换器 - DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); - // 按字节接收消息 - // defaultPahoMessageConverter.setPayloadAsBytes(true); - adapter.setConverter(defaultPahoMessageConverter); - // 设置QoS - adapter.setQos(mqttConfiguration.getQos()); - // 设置订阅通道 - adapter.setOutputChannel(mqttInBoundChannel()); - return adapter; - } - -} - diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java deleted file mode 100644 index 15a9b55..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.huaxing.mqtt.config; - -import com.huaxing.mqtt.constant.MqttConstant; -import jakarta.annotation.Resource; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - - - -/** - * MQTT生产者配置 - */ -@Slf4j -@Configuration -@AllArgsConstructor -public class MqttProducerConfiguration { - - @Resource - private MqttConfiguration mqttConfiguration; - - /** - * MQTT信息通道(生产者) - */ - @Bean - public MessageChannel mqttOutboundChannel() { - return new DirectChannel(); - } - - /** - * MQTT消息处理器(生产者) - */ - @Bean - @ServiceActivator(inputChannel = "mqttOutboundChannel") - public MessageHandler mqttOutbound() { - // 客户端id - String clientId = mqttConfiguration.getClientId(); - // 默认主题 - String defaultTopic = mqttConfiguration.getDefaultTopic(); - MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); - - // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory); - // true,异步,发送消息时将不会阻塞。 - messageHandler.setAsync(true); - messageHandler.setDefaultTopic(defaultTopic); - // 默认QoS - messageHandler.setDefaultQos(1); - // Paho消息转换器 - DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); - // defaultPahoMessageConverter.setPayloadAsBytes(true); - // 发送默认按字节类型发送消息 - messageHandler.setConverter(defaultPahoMessageConverter); - return messageHandler; - } - -} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java b/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java deleted file mode 100644 index 8266076..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.huaxing.mqtt.constant; -/** - * 常量 - */ -public class MqttConstant { - - /** - * 客户端id消费者后缀 - */ - public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers"; - /** - * 客户端id生产者后缀 - */ - public static final String CLIENT_SUFFIX_PRODUCERS = "_producers"; - -} - diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java deleted file mode 100644 index 89c4fa2..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.huaxing.mqtt.processor; - -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** - * 生产者处理器 - */ -@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") -public interface MqttGateway { - - /** - * 发送mqtt消息 - * - * @param topic 主题 - * @param payload 内容 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 对消息处理的几种机制。 - * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 - * @param payload 消息体 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 对消息处理的几种机制。 - * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 - * @param payload 消息体 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); -} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java deleted file mode 100644 index 6154d4b..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.huaxing.mqtt.processor; - -import com.huaxing.data.storage.service.IDataAnalysisService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; - -import java.util.concurrent.CompletableFuture; - -/** - * 消费者处理器 - */ -@Slf4j -@Component -@SuppressWarnings("all") -public class MqttMessageReceiver implements MessageHandler { - - /** - * 数据处理服务 - */ - final IDataAnalysisService analysisService; - public MqttMessageReceiver(IDataAnalysisService analysisService) { - this.analysisService = analysisService; - } - - /** - * 消息处理 - * - * @param message 消息 - * @throws MessagingException 消息异常 - */ - @Override - @Async("handleMessage") - public void handleMessage(Message message) throws MessagingException { - try { - // 获取消息Topic - MessageHeaders headers = message.getHeaders(); - String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); - log.info("[获取到的消息的topic]:{} ", topic); - // 获取消息体 - String payload = (String) message.getPayload(); - log.info("[获取到的消息的payload]:{} ", payload); - // 数据库入库消息 - if (topic.contains("iot/test1/in-storage")) { - // 模拟1000000条数据入库 - log.info("接收到iot/test1/in-storage的消息啦,快去处理"); - long l = System.currentTimeMillis(); - System.currentTimeMillis(); - CompletableFuture future = CompletableFuture.runAsync(() -> { - analysisService.parseStoreData(payload); - }); - long l1 = System.currentTimeMillis(); - log.info("入库完成,耗时:{}", l1 - l); -// analysisService.parseStoreData(payload); - } else if (topic.contains("table-update/")){} // TODO 表更新topic - } catch (Exception e) { - log.error(e.toString()); - } - } -} - diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java deleted file mode 100644 index 27cc2f8..0000000 --- a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.huaxing.mqtt.processor; - -import org.json.JSONObject; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - - -/** - * 生产者处理器 - */ -@Component -public class MqttMessageSender { - - @Autowired - private MqttGateway mqttGateway; - - - /** - * 发送mqtt消息 - * - * @param topic 主题 - * @param message 内容 - * @return void - */ - public void send(String topic, String message) { - mqttGateway.sendToMqtt(topic, message); - } - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 质量 - * @param messageBody 消息体 - * @return void - */ - public void send(String topic, int qos, JSONObject messageBody) { - mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); - } - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 质量 - * @param message 消息体 - * @return void - */ - public void send(String topic, int qos, byte[] message) { - mqttGateway.sendToMqtt(topic, qos, message); - } -} diff --git a/data-storage/src/main/resources/banner.txt b/data-storage/src/main/resources/banner.txt deleted file mode 100644 index 6b056d1..0000000 --- a/data-storage/src/main/resources/banner.txt +++ /dev/null @@ -1,8 +0,0 @@ - ________ ________ _________ ________ ________ _________ ________ ________ ________ ________ _______ -|\ ___ \|\ __ \|\___ ___\\ __ \ |\ ____\|\___ ___\\ __ \|\ __ \|\ __ \|\ ____\|\ ___ \ -\ \ \_|\ \ \ \|\ \|___ \ \_\ \ \|\ \ ____________\ \ \___|\|___ \ \_\ \ \|\ \ \ \|\ \ \ \|\ \ \ \___|\ \ __/| - \ \ \ \\ \ \ __ \ \ \ \ \ \ __ \|\____________\ \_____ \ \ \ \ \ \ \\\ \ \ _ _\ \ __ \ \ \ __\ \ \_|/__ - \ \ \_\\ \ \ \ \ \ \ \ \ \ \ \ \ \|____________|\|____|\ \ \ \ \ \ \ \\\ \ \ \\ \\ \ \ \ \ \ \|\ \ \ \_|\ \ - \ \_______\ \__\ \__\ \ \__\ \ \__\ \__\ ____\_\ \ \ \__\ \ \_______\ \__\\ _\\ \__\ \__\ \_______\ \_______\ - \|_______|\|__|\|__| \|__| \|__|\|__| |\_________\ \|__| \|_______|\|__|\|__|\|__|\|__|\|_______|\|_______| - \|_________| diff --git a/pom.xml b/pom.xml index fe572b1..058fa30 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,15 @@ data-storage + + + org.projectlombok + lombok + RELEASE + provided + + +