From f8942b3127d52847b98f6aeaa1d07d1f70323b90 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Mon, 20 Jan 2025 11:58:55 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=90=8D=E7=A7=B0=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data-common/pom.xml | 36 ------ .../com/huaxing/common/constant/AppConstant.java | 15 --- .../com/huaxing/common/constant/MqttConstant.java | 17 --- .../java/com/huaxing/common/util/JacksonUtil.java | 77 ----------- data-common/src/main/java/com/huaxing/mqtt/ReadMe | 0 .../com/huaxing/mqtt/config/MqttConfiguration.java | 143 --------------------- .../mqtt/config/MqttConsumerConfiguration.java | 96 -------------- .../mqtt/config/MqttProducerConfiguration.java | 65 ---------- .../com/huaxing/mqtt/processor/MqttGateway.java | 46 ------- .../mqtt/processor/MqttMessageReceiver.java | 61 --------- .../huaxing/mqtt/processor/MqttMessageSender.java | 52 -------- data-framework/pom.xml | 36 ++++++ .../com/huaxing/common/constant/AppConstant.java | 15 +++ .../com/huaxing/common/constant/MqttConstant.java | 17 +++ .../java/com/huaxing/common/util/JacksonUtil.java | 77 +++++++++++ .../src/main/java/com/huaxing/mqtt/ReadMe | 0 .../com/huaxing/mqtt/config/MqttConfiguration.java | 143 +++++++++++++++++++++ .../mqtt/config/MqttConsumerConfiguration.java | 96 ++++++++++++++ .../mqtt/config/MqttProducerConfiguration.java | 65 ++++++++++ .../com/huaxing/mqtt/processor/MqttGateway.java | 46 +++++++ .../mqtt/processor/MqttMessageReceiver.java | 61 +++++++++ .../huaxing/mqtt/processor/MqttMessageSender.java | 52 ++++++++ data-storage/pom.xml | 2 +- pom.xml | 2 +- 24 files changed, 610 insertions(+), 610 deletions(-) delete mode 100644 data-common/pom.xml delete mode 100644 data-common/src/main/java/com/huaxing/common/constant/AppConstant.java delete mode 100644 data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java delete mode 100644 data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/ReadMe delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java delete mode 100644 data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java create mode 100644 data-framework/pom.xml create mode 100644 data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java create mode 100644 data-framework/src/main/java/com/huaxing/common/constant/MqttConstant.java create mode 100644 data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/ReadMe create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java create mode 100644 data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java diff --git a/data-common/pom.xml b/data-common/pom.xml deleted file mode 100644 index a136dff..0000000 --- a/data-common/pom.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - 4.0.0 - - com.huaxing - data-bridge - 0.0.1-SNAPSHOT - - - data-common - - - - - - org.springframework.integration - spring-integration-mqtt - 6.4.1 - - - org.json - json - 20240303 - - - - org.springframework.boot - spring-boot-starter-web - 3.4.1 - - - - - \ No newline at end of file diff --git a/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java b/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java deleted file mode 100644 index 594ebf6..0000000 --- a/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.huaxing.common.constant; - -/** - * @ProjectName: iot-data-bridge - * @Package: com.huaxing.base.constant - * @ClassName: AppConstant - * @Author: swordmeng8@163.com - * @Description: 系统常量 - * @Date: 2025/1/10 10:53 - * @Version: 1.0 - */ - -public class AppConstant { - public static final String APP_NAME = "iot-data-bridge"; -} diff --git a/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java b/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java deleted file mode 100644 index eba93f5..0000000 --- a/data-common/src/main/java/com/huaxing/common/constant/MqttConstant.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.huaxing.common.constant; -/** - * 常量 - */ -public class MqttConstant { - - /** - * 客户端id消费者后缀 - */ - public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers"; - /** - * 客户端id生产者后缀 - */ - public static final String CLIENT_SUFFIX_PRODUCERS = "_producers"; - -} - diff --git a/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java b/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java deleted file mode 100644 index e589f41..0000000 --- a/data-common/src/main/java/com/huaxing/common/util/JacksonUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.huaxing.common.util; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author kwl99 - * @since 2024-08-08 14:31 - */ -public class JacksonUtil { - - /** - * 将对象转为json字符串 - */ - public static String objectStr(Object object) { - ObjectMapper mapper = new ObjectMapper(); - try { - - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - - return mapper.writeValueAsString(object); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** - * 将json字符串转为对象 - */ - public static T strToObject(String str, Class clazz) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, clazz); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - - - /** - * 将json字符串转为对象列表 - */ - public static List strToObjectList(String str, Class clazz) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz)); - } catch (Exception e) { - throw new RuntimeException(e); - - } - } - - /** - * 将json字符串转为对象列表 - */ - public static List> strToMapList(String str) { - ObjectMapper mapper = new ObjectMapper(); - try { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class)); - } catch (Exception e) { - throw new RuntimeException(e); - - } - } - -} diff --git a/data-common/src/main/java/com/huaxing/mqtt/ReadMe b/data-common/src/main/java/com/huaxing/mqtt/ReadMe deleted file mode 100644 index e69de29..0000000 diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java deleted file mode 100644 index fb12312..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java +++ /dev/null @@ -1,143 +0,0 @@ -package com.huaxing.mqtt.config; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.util.CollectionUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * mqtt全局相关配置信息 - */ -@Slf4j -@Setter -@Getter -@Configuration -@ConfigurationProperties("mqtt") -@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*") -public class MqttConfiguration { - - /** - * 用户名 - */ - private String username; - - /** - * 密码 - */ - private String password; - - /** - * 连接地址 - */ - private String hostUrl; - - /** - * 客户Id - */ - private String clientId; - - /** - * 默认连接话题 - */ - private String defaultTopic; - - /** - * 超时时间 - */ - private int timeout; - - /** - * qos - */ - private int qos; - - /** - * 订阅超时时间 - */ - private int completionTimeout; - - /** - * 保持连接数 - */ - private int keepalive; - - /** - * 2024-12-06新增 - * 要订阅的其他主题 - */ - private List topics; - - /** - * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 - */ - private static final byte[] WILL_DATA = "offline".getBytes(); - - - /** - * 获取所有订阅的主题 - * @return 所有订阅的主题 - */ - public String[] getAllTopics() { - // 校验配置文件是否配置 - if (CollectionUtils.isEmpty(topics)) { - this.topics = new ArrayList<>(); - } - // 将默认主题条件到其他主题里 - this.topics.add(defaultTopic); - // 返回主题数组 - return topics.toArray(new String[0]); - } - - /** - * 注册MQTT客户端工厂 - * @return MqttPahoClientFactory - */ - @Bean - public MqttPahoClientFactory mqttClientFactory() { - // 客户端工厂 - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - - MqttConnectOptions options = new MqttConnectOptions(); - // 设置连接的用户名 - options.setUserName(username); - // 设置连接的密码 - options.setPassword(password.toCharArray()); - // 设置连接的地址 - options.setServerURIs(new String[]{hostUrl}); - - // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: - // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 - // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 - options.setCleanSession(true); - - // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 - // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 - options.setConnectionTimeout(10); - - // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 - // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 - // 但这个方法并没有重连的机制 - options.setKeepAliveInterval(20); - - // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 - options.setWill("willTopic", WILL_DATA, 2, false); - - //自动重新连接 - options.setAutomaticReconnect(true); - factory.setConnectionOptions(options); - - log.info("【初始化 MQTT 配置】"); - - return factory; - } -} diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java deleted file mode 100644 index 65311e6..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.huaxing.mqtt.config; - -import com.huaxing.common.constant.MqttConstant; -import com.huaxing.mqtt.processor.MqttMessageReceiver; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -import java.util.UUID; - - -/** - * MQTT消费者配置 - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttConsumerConfiguration { - - final MqttConfiguration mqttConfiguration; - final MqttMessageReceiver mqttMessageReceiver; - - public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) { - this.mqttConfiguration = mqttConfiguration; - this.mqttMessageReceiver = mqttMessageReceiver; - } - - - /** - * 此处可以使用其他消息通道 - * MQTT信息通道(消费者) - * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 - */ - @Bean - public MessageChannel mqttInBoundChannel() { - return new DirectChannel(); - } - - /** - * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 - */ - @Bean - @ServiceActivator(inputChannel = "mqttInBoundChannel") - public MessageHandler mqttMessageHandler() { - return this.mqttMessageReceiver; - } - - /** - * MQTT消息订阅绑定(消费者) - * 适配器, 两个topic共用一个adapter - * 客户端作为消费者,订阅主题,消费消息 - */ - @Bean - public MessageProducerSupport mqttInbound() { - // 获取客户端id - String clientId = mqttConfiguration.getClientId(); - // 获取默认主题 -// String defaultTopic = mqttConfiguration.getDefaultTopic(); - - // 获取所有配置的主题 - String[] topics = mqttConfiguration.getAllTopics(); - - // 获取客户端工厂 - MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); - - // Paho客户端消息驱动通道适配器,主要用来订阅主题 - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS, - mqttPahoClientFactory, - // 所有需要订阅的topic - topics - ); - adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout()); - // Paho消息转换器 - DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); - // 按字节接收消息 - // defaultPahoMessageConverter.setPayloadAsBytes(true); - adapter.setConverter(defaultPahoMessageConverter); - // 设置QoS - adapter.setQos(mqttConfiguration.getQos()); - // 设置订阅通道 - adapter.setOutputChannel(mqttInBoundChannel()); - return adapter; - } - -} - diff --git a/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java deleted file mode 100644 index 37de71b..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.huaxing.mqtt.config; - -import com.huaxing.common.constant.MqttConstant; -import jakarta.annotation.Resource; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - - - -/** - * MQTT生产者配置 - */ -@Slf4j -@Configuration -@AllArgsConstructor -public class MqttProducerConfiguration { - - @Resource - private MqttConfiguration mqttConfiguration; - - /** - * MQTT信息通道(生产者) - */ - @Bean - public MessageChannel mqttOutboundChannel() { - return new DirectChannel(); - } - - /** - * MQTT消息处理器(生产者) - */ - @Bean - @ServiceActivator(inputChannel = "mqttOutboundChannel") - public MessageHandler mqttOutbound() { - // 客户端id - String clientId = mqttConfiguration.getClientId(); - // 默认主题 - String defaultTopic = mqttConfiguration.getDefaultTopic(); - MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); - - // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory); - // true,异步,发送消息时将不会阻塞。 - messageHandler.setAsync(true); - messageHandler.setDefaultTopic(defaultTopic); - // 默认QoS - messageHandler.setDefaultQos(1); - // Paho消息转换器 - DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); - // defaultPahoMessageConverter.setPayloadAsBytes(true); - // 发送默认按字节类型发送消息 - messageHandler.setConverter(defaultPahoMessageConverter); - return messageHandler; - } - -} diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java deleted file mode 100644 index 89c4fa2..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.huaxing.mqtt.processor; - -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -/** - * 生产者处理器 - */ -@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") -public interface MqttGateway { - - /** - * 发送mqtt消息 - * - * @param topic 主题 - * @param payload 内容 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 对消息处理的几种机制。 - * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 - * @param payload 消息体 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 对消息处理的几种机制。 - * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
- * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
- * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 - * @param payload 消息体 - */ - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); -} diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java deleted file mode 100644 index 27472d0..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.huaxing.mqtt.processor; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.messaging.MessageHandler; -import org.springframework.stereotype.Component; -//import org.springframework.integration.mqtt.support.MqttHeaders; -//import org.springframework.messaging.Message; -//import org.springframework.messaging.MessageHeaders; -//import org.springframework.messaging.MessagingException; -//import org.springframework.scheduling.annotation.Async; -//import java.util.concurrent.CompletableFuture; - -/** - * 消费者处理器 - */ -@Slf4j -@Component -@SuppressWarnings("all") -public abstract class MqttMessageReceiver implements MessageHandler { - - // 获取消息接口 -// public abstract String getMessage(String payload); - -// /** -// * 消息处理 -// * -// * @param message 消息 -// * @throws MessagingException 消息异常 -// */ -// @Override -// @Async("handleMessage") -// public void handleMessage(Message message) throws MessagingException { -// try { -// // 获取消息Topic -// MessageHeaders headers = message.getHeaders(); -// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); -// log.info("[获取到的消息的topic]:{} ", topic); -// // 获取消息体 -// String payload = (String) message.getPayload(); -// log.info("[获取到的消息的payload]:{} ", payload); -// // 数据库入库消息 -// if (topic.contains("iot/test1/in-storage")) { -// // 模拟1000000条数据入库 -// log.info("接收到iot/test1/in-storage的消息啦,快去处理"); -// getMessage(payload); -//// long l = System.currentTimeMillis(); -//// System.currentTimeMillis(); -//// CompletableFuture future = CompletableFuture.runAsync(() -> { -////// analysisService.parseStoreData(payload); -//// }); -//// long l1 = System.currentTimeMillis(); -//// log.info("入库完成,耗时:{}", l1 - l); -//// analysisService.parseStoreData(payload); -// } else if (topic.contains("table-update/")){} // TODO 表更新topic -// } catch (Exception e) { -// log.error(e.toString()); -// } -// } - -} - diff --git a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java deleted file mode 100644 index 27cc2f8..0000000 --- a/data-common/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.huaxing.mqtt.processor; - -import org.json.JSONObject; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - - -/** - * 生产者处理器 - */ -@Component -public class MqttMessageSender { - - @Autowired - private MqttGateway mqttGateway; - - - /** - * 发送mqtt消息 - * - * @param topic 主题 - * @param message 内容 - * @return void - */ - public void send(String topic, String message) { - mqttGateway.sendToMqtt(topic, message); - } - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 质量 - * @param messageBody 消息体 - * @return void - */ - public void send(String topic, int qos, JSONObject messageBody) { - mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); - } - - /** - * 发送包含qos的消息 - * - * @param topic 主题 - * @param qos 质量 - * @param message 消息体 - * @return void - */ - public void send(String topic, int qos, byte[] message) { - mqttGateway.sendToMqtt(topic, qos, message); - } -} diff --git a/data-framework/pom.xml b/data-framework/pom.xml new file mode 100644 index 0000000..930436f --- /dev/null +++ b/data-framework/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + com.huaxing + data-bridge + 0.0.1-SNAPSHOT + + + data-framework + + + + + + org.springframework.integration + spring-integration-mqtt + 6.4.1 + + + org.json + json + 20240303 + + + + org.springframework.boot + spring-boot-starter-web + 3.4.1 + + + + + \ No newline at end of file diff --git a/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java b/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java new file mode 100644 index 0000000..594ebf6 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/constant/AppConstant.java @@ -0,0 +1,15 @@ +package com.huaxing.common.constant; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.base.constant + * @ClassName: AppConstant + * @Author: swordmeng8@163.com + * @Description: 系统常量 + * @Date: 2025/1/10 10:53 + * @Version: 1.0 + */ + +public class AppConstant { + public static final String APP_NAME = "iot-data-bridge"; +} diff --git a/data-framework/src/main/java/com/huaxing/common/constant/MqttConstant.java b/data-framework/src/main/java/com/huaxing/common/constant/MqttConstant.java new file mode 100644 index 0000000..eba93f5 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/constant/MqttConstant.java @@ -0,0 +1,17 @@ +package com.huaxing.common.constant; +/** + * 常量 + */ +public class MqttConstant { + + /** + * 客户端id消费者后缀 + */ + public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers"; + /** + * 客户端id生产者后缀 + */ + public static final String CLIENT_SUFFIX_PRODUCERS = "_producers"; + +} + diff --git a/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java b/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java new file mode 100644 index 0000000..e589f41 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/common/util/JacksonUtil.java @@ -0,0 +1,77 @@ +package com.huaxing.common.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author kwl99 + * @since 2024-08-08 14:31 + */ +public class JacksonUtil { + + /** + * 将对象转为json字符串 + */ + public static String objectStr(Object object) { + ObjectMapper mapper = new ObjectMapper(); + try { + + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * 将json字符串转为对象 + */ + public static T strToObject(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, clazz); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + + /** + * 将json字符串转为对象列表 + */ + public static List strToObjectList(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + + /** + * 将json字符串转为对象列表 + */ + public static List> strToMapList(String str) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + +} diff --git a/data-framework/src/main/java/com/huaxing/mqtt/ReadMe b/data-framework/src/main/java/com/huaxing/mqtt/ReadMe new file mode 100644 index 0000000..e69de29 diff --git a/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..fb12312 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java @@ -0,0 +1,143 @@ +package com.huaxing.mqtt.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt全局相关配置信息 + */ +@Slf4j +@Setter +@Getter +@Configuration +@ConfigurationProperties("mqtt") +@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*") +public class MqttConfiguration { + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 连接地址 + */ + private String hostUrl; + + /** + * 客户Id + */ + private String clientId; + + /** + * 默认连接话题 + */ + private String defaultTopic; + + /** + * 超时时间 + */ + private int timeout; + + /** + * qos + */ + private int qos; + + /** + * 订阅超时时间 + */ + private int completionTimeout; + + /** + * 保持连接数 + */ + private int keepalive; + + /** + * 2024-12-06新增 + * 要订阅的其他主题 + */ + private List topics; + + /** + * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 + */ + private static final byte[] WILL_DATA = "offline".getBytes(); + + + /** + * 获取所有订阅的主题 + * @return 所有订阅的主题 + */ + public String[] getAllTopics() { + // 校验配置文件是否配置 + if (CollectionUtils.isEmpty(topics)) { + this.topics = new ArrayList<>(); + } + // 将默认主题条件到其他主题里 + this.topics.add(defaultTopic); + // 返回主题数组 + return topics.toArray(new String[0]); + } + + /** + * 注册MQTT客户端工厂 + * @return MqttPahoClientFactory + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + // 客户端工厂 + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + + MqttConnectOptions options = new MqttConnectOptions(); + // 设置连接的用户名 + options.setUserName(username); + // 设置连接的密码 + options.setPassword(password.toCharArray()); + // 设置连接的地址 + options.setServerURIs(new String[]{hostUrl}); + + // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + + // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(10); + + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 + // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + // 但这个方法并没有重连的机制 + options.setKeepAliveInterval(20); + + // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 + options.setWill("willTopic", WILL_DATA, 2, false); + + //自动重新连接 + options.setAutomaticReconnect(true); + factory.setConnectionOptions(options); + + log.info("【初始化 MQTT 配置】"); + + return factory; + } +} diff --git a/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java new file mode 100644 index 0000000..65311e6 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java @@ -0,0 +1,96 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.common.constant.MqttConstant; +import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.util.UUID; + + +/** + * MQTT消费者配置 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttConsumerConfiguration { + + final MqttConfiguration mqttConfiguration; + final MqttMessageReceiver mqttMessageReceiver; + + public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) { + this.mqttConfiguration = mqttConfiguration; + this.mqttMessageReceiver = mqttMessageReceiver; + } + + + /** + * 此处可以使用其他消息通道 + * MQTT信息通道(消费者) + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } + + /** + * MQTT消息订阅绑定(消费者) + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + */ + @Bean + public MessageProducerSupport mqttInbound() { + // 获取客户端id + String clientId = mqttConfiguration.getClientId(); + // 获取默认主题 +// String defaultTopic = mqttConfiguration.getDefaultTopic(); + + // 获取所有配置的主题 + String[] topics = mqttConfiguration.getAllTopics(); + + // 获取客户端工厂 + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // Paho客户端消息驱动通道适配器,主要用来订阅主题 + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS, + mqttPahoClientFactory, + // 所有需要订阅的topic + topics + ); + adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout()); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // 按字节接收消息 + // defaultPahoMessageConverter.setPayloadAsBytes(true); + adapter.setConverter(defaultPahoMessageConverter); + // 设置QoS + adapter.setQos(mqttConfiguration.getQos()); + // 设置订阅通道 + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + +} + diff --git a/data-framework/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java new file mode 100644 index 0000000..37de71b --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java @@ -0,0 +1,65 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.common.constant.MqttConstant; +import jakarta.annotation.Resource; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + + + +/** + * MQTT生产者配置 + */ +@Slf4j +@Configuration +@AllArgsConstructor +public class MqttProducerConfiguration { + + @Resource + private MqttConfiguration mqttConfiguration; + + /** + * MQTT信息通道(生产者) + */ + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * MQTT消息处理器(生产者) + */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + // 客户端id + String clientId = mqttConfiguration.getClientId(); + // 默认主题 + String defaultTopic = mqttConfiguration.getDefaultTopic(); + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory); + // true,异步,发送消息时将不会阻塞。 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(defaultTopic); + // 默认QoS + messageHandler.setDefaultQos(1); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // defaultPahoMessageConverter.setPayloadAsBytes(true); + // 发送默认按字节类型发送消息 + messageHandler.setConverter(defaultPahoMessageConverter); + return messageHandler; + } + +} diff --git a/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java new file mode 100644 index 0000000..89c4fa2 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java @@ -0,0 +1,46 @@ +package com.huaxing.mqtt.processor; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +/** + * 生产者处理器 + */ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param payload 内容 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java new file mode 100644 index 0000000..27472d0 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java @@ -0,0 +1,61 @@ +package com.huaxing.mqtt.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.MessageHandler; +import org.springframework.stereotype.Component; +//import org.springframework.integration.mqtt.support.MqttHeaders; +//import org.springframework.messaging.Message; +//import org.springframework.messaging.MessageHeaders; +//import org.springframework.messaging.MessagingException; +//import org.springframework.scheduling.annotation.Async; +//import java.util.concurrent.CompletableFuture; + +/** + * 消费者处理器 + */ +@Slf4j +@Component +@SuppressWarnings("all") +public abstract class MqttMessageReceiver implements MessageHandler { + + // 获取消息接口 +// public abstract String getMessage(String payload); + +// /** +// * 消息处理 +// * +// * @param message 消息 +// * @throws MessagingException 消息异常 +// */ +// @Override +// @Async("handleMessage") +// public void handleMessage(Message message) throws MessagingException { +// try { +// // 获取消息Topic +// MessageHeaders headers = message.getHeaders(); +// String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); +// log.info("[获取到的消息的topic]:{} ", topic); +// // 获取消息体 +// String payload = (String) message.getPayload(); +// log.info("[获取到的消息的payload]:{} ", payload); +// // 数据库入库消息 +// if (topic.contains("iot/test1/in-storage")) { +// // 模拟1000000条数据入库 +// log.info("接收到iot/test1/in-storage的消息啦,快去处理"); +// getMessage(payload); +//// long l = System.currentTimeMillis(); +//// System.currentTimeMillis(); +//// CompletableFuture future = CompletableFuture.runAsync(() -> { +////// analysisService.parseStoreData(payload); +//// }); +//// long l1 = System.currentTimeMillis(); +//// log.info("入库完成,耗时:{}", l1 - l); +//// analysisService.parseStoreData(payload); +// } else if (topic.contains("table-update/")){} // TODO 表更新topic +// } catch (Exception e) { +// log.error(e.toString()); +// } +// } + +} + diff --git a/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java new file mode 100644 index 0000000..27cc2f8 --- /dev/null +++ b/data-framework/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java @@ -0,0 +1,52 @@ +package com.huaxing.mqtt.processor; + +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * 生产者处理器 + */ +@Component +public class MqttMessageSender { + + @Autowired + private MqttGateway mqttGateway; + + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody) { + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message) { + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/data-storage/pom.xml b/data-storage/pom.xml index 62ea8f2..3aab973 100644 --- a/data-storage/pom.xml +++ b/data-storage/pom.xml @@ -16,7 +16,7 @@ com.huaxing - data-common + data-framework 0.0.1-SNAPSHOT diff --git a/pom.xml b/pom.xml index 058fa30..552afd4 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ data-bridge - data-common + data-framework data-storage