|
@ -35,7 +35,6 @@ public class MqttMessageConsumer extends MqttMessageReceiver { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
@Async("handleMessage") |
|
|
@Async("handleMessage") |
|
|
public void handleMessage(Message<?> message) throws MessagingException { |
|
|
public void handleMessage(Message<?> message) throws MessagingException { |
|
@ -51,15 +50,9 @@ public class MqttMessageConsumer extends MqttMessageReceiver { |
|
|
if (topic.contains("iot/test1/in-storage")) { |
|
|
if (topic.contains("iot/test1/in-storage")) { |
|
|
// 模拟1000000条数据入库
|
|
|
// 模拟1000000条数据入库
|
|
|
log.info("接收到iot/test1/in-storage的消息啦,快去处理"); |
|
|
log.info("接收到iot/test1/in-storage的消息啦,快去处理"); |
|
|
long l = System.currentTimeMillis(); |
|
|
CompletableFuture.runAsync(() -> { |
|
|
System.currentTimeMillis(); |
|
|
|
|
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { |
|
|
|
|
|
analysisService.parseStoreData(payload); |
|
|
analysisService.parseStoreData(payload); |
|
|
}); |
|
|
}); |
|
|
// future.get();
|
|
|
|
|
|
long l1 = System.currentTimeMillis(); |
|
|
|
|
|
log.info("入库完成,耗时:{}", l1 - l); |
|
|
|
|
|
analysisService.parseStoreData(payload); |
|
|
|
|
|
} else if (topic.contains("table-update/")){} // TODO 表更新topic
|
|
|
} else if (topic.contains("table-update/")){} // TODO 表更新topic
|
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
log.error(e.toString()); |
|
|
log.error(e.toString()); |
|
|