From 824c5f5e42ca980e8e3ffb2d6218d0c990b64490 Mon Sep 17 00:00:00 2001 From: swordmeng Date: Mon, 20 Jan 2025 10:21:05 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitattributes | 2 + .gitignore | 33 +++ .mvn/wrapper/maven-wrapper.properties | 19 ++ data-common/pom.xml | 14 ++ .../com/huaxing/common/constant/AppConstant.java | 15 ++ data-storage/pom.xml | 111 +++++++++ .../java/com/huaxing/IotDataBridgeApplication.java | 16 ++ .../main/java/com/huaxing/TestDBConnection.java | 25 ++ .../com/huaxing/data/dolphindb/MyBatisConfig.java | 31 +++ .../huaxing/data/dolphindb/base/CommonService.java | 75 ++++++ .../dolphindb/config/DolphinDbConfiguration.java | 69 ++++++ .../config/DolphinDbPoolConfiguration.java | 29 +++ .../dolphindb/connection/AbstractDbConnector.java | 30 +++ .../dolphindb/connection/DbConnectorHelper.java | 41 ++++ .../data/storage/controller/TestController.java | 130 +++++++++++ .../data/storage/domain/DataAnalysisDTO.java | 34 +++ .../huaxing/data/storage/domain/DataQueryDTO.java | 27 +++ .../storage/mapper/IDeviceDataQueryDfsMapper.java | 21 ++ .../mapper/IDeviceDataQueryStreamMapper.java | 21 ++ .../storage/mapper/IDeviceDataStoredMapper.java | 20 ++ .../data/storage/service/IDataAnalysisService.java | 13 ++ .../service/IDeviceDataQueryDfsService.java | 18 ++ .../service/IDeviceDataQueryStreamService.java | 18 ++ .../storage/service/IDeviceDataStoredService.java | 19 ++ .../service/abstracts/SqlConverterStatement.java | 25 ++ .../data/storage/service/base/IDbSqlFactory.java | 19 ++ .../service/base/SqlConverterStatementHandle.java | 71 ++++++ .../storage/service/impl/DataAnalysisService.java | 46 ++++ .../impl/DeviceDataQueryDfsServiceImpl.java | 39 ++++ .../impl/DeviceDataQueryStreamServiceImpl.java | 39 ++++ .../service/impl/DeviceDataStoredServiceImpl.java | 52 +++++ .../tablemanagement/domain/TableColumnDTO.java | 28 +++ .../data/tablemanagement/domain/TableDTO.java | 29 +++ .../service/ITableStructureService.java | 40 ++++ .../service/impl/TableStructureService.java | 65 ++++++ .../java/com/huaxing/data/util/JacksonUtil.java | 77 ++++++ data-storage/src/main/java/com/huaxing/mqtt/ReadMe | 0 .../com/huaxing/mqtt/config/MqttConfiguration.java | 143 ++++++++++++ .../mqtt/config/MqttConsumerConfiguration.java | 96 ++++++++ .../mqtt/config/MqttProducerConfiguration.java | 65 ++++++ .../com/huaxing/mqtt/constant/MqttConstant.java | 17 ++ .../com/huaxing/mqtt/processor/MqttGateway.java | 46 ++++ .../mqtt/processor/MqttMessageReceiver.java | 66 ++++++ .../huaxing/mqtt/processor/MqttMessageSender.java | 52 +++++ data-storage/src/main/resources/application.yml | 48 ++++ .../main/resources/mapper/DeviceDataStoredDfs.xml | 79 +++++++ .../resources/mapper/DeviceDataStoredStream.xml | 9 + .../src/main/resources/sqlScripts.properties | 3 + data-storage/src/main/resources/补水箱.script | 71 ++++++ mvnw | 259 +++++++++++++++++++++ mvnw.cmd | 149 ++++++++++++ pom.xml | 37 +++ 52 files changed, 2501 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 .mvn/wrapper/maven-wrapper.properties create mode 100644 data-common/pom.xml create mode 100644 data-common/src/main/java/com/huaxing/common/constant/AppConstant.java create mode 100644 data-storage/pom.xml create mode 100644 data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java create mode 100644 data-storage/src/main/java/com/huaxing/TestDBConnection.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java create mode 100644 data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java create mode 100644 data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java create mode 100644 data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/ReadMe create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java create mode 100644 data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java create mode 100644 data-storage/src/main/resources/application.yml create mode 100644 data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml create mode 100644 data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml create mode 100644 data-storage/src/main/resources/sqlScripts.properties create mode 100644 data-storage/src/main/resources/补水箱.script create mode 100644 mvnw create mode 100644 mvnw.cmd create mode 100644 pom.xml diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..3b41682 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/mvnw text eol=lf +*.cmd text eol=crlf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..d58dfb7 --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +wrapperVersion=3.3.2 +distributionType=only-script +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip diff --git a/data-common/pom.xml b/data-common/pom.xml new file mode 100644 index 0000000..d5526da --- /dev/null +++ b/data-common/pom.xml @@ -0,0 +1,14 @@ + + + 4.0.0 + + com.huaxing + data-bridge + 0.0.1-SNAPSHOT + + + data-common + + \ No newline at end of file diff --git a/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java b/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java new file mode 100644 index 0000000..594ebf6 --- /dev/null +++ b/data-common/src/main/java/com/huaxing/common/constant/AppConstant.java @@ -0,0 +1,15 @@ +package com.huaxing.common.constant; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.base.constant + * @ClassName: AppConstant + * @Author: swordmeng8@163.com + * @Description: 系统常量 + * @Date: 2025/1/10 10:53 + * @Version: 1.0 + */ + +public class AppConstant { + public static final String APP_NAME = "iot-data-bridge"; +} diff --git a/data-storage/pom.xml b/data-storage/pom.xml new file mode 100644 index 0000000..0d33494 --- /dev/null +++ b/data-storage/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.huaxing + data-bridge + 0.0.1-SNAPSHOT + + + data-storage + jar + + + + org.projectlombok + lombok + RELEASE + provided + + + org.springframework.integration + spring-integration-mqtt + 6.4.1 + + + org.json + json + 20240303 + + + + org.springframework.boot + spring-boot-starter + 3.4.1 + + + org.springframework.boot + spring-boot-starter-web + 3.4.1 + + + + com.baomidou + mybatis-plus-core + 3.5.8 + + + junit + junit + 4.13-beta-3 + compile + + + org.aspectj + aspectjweaver + 1.9.22.1 + + + com.fasterxml.jackson.core + jackson-databind + 2.18.2 + + + + com.dolphindb + jdbc + 3.00.0.1 + + + + com.dolphindb + dolphindb-javaapi + 3.00.2.3 + + + org.springframework + spring-web + 6.2.1 + + + + + org.mybatis + mybatis + 3.5.5 + + + org.mybatis + mybatis-spring + 3.0.4 + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.3.2 + + + + org.springframework.boot + spring-boot-starter-jdbc + 3.4.1 + + + + + + + \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java new file mode 100644 index 0000000..1aaf980 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java @@ -0,0 +1,16 @@ +package com.huaxing; + +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +@SpringBootApplication +public class IotDataBridgeApplication { + + public static void main(String[] args) { + SpringApplication.run(IotDataBridgeApplication.class, args); + System.out.println("================= iot-data-bridge started! ================="); + } + +} diff --git a/data-storage/src/main/java/com/huaxing/TestDBConnection.java b/data-storage/src/main/java/com/huaxing/TestDBConnection.java new file mode 100644 index 0000000..89730fd --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/TestDBConnection.java @@ -0,0 +1,25 @@ +//package com.huaxing; +// +//import com.xxdb.DBConnection; +//import com.xxdb.data.Entity; +//import org.junit.Test; +// +//import java.io.IOException; +// +//public class TestDBConnection { +// // host +// private static final String HOST = "localhost"; +// // port +// private static final int PORT = 8848; +// +// DBConnection dbConnection = new DBConnection(); +// +// @Test +// public void testDBConnectAndRun() throws IOException { +// dbConnection.connect(HOST, PORT); +// dbConnection.login("admin", "123456", true); +// Entity entity = dbConnection.run("INSERT INTO ZbWaterMeterStream (WM_WFA_Unit, temperature, humidity, time, projectId, deviceId) VALUES ('m3', 5.6, 4.5, 1736388176712, '48', '0jZU2102_0806_0000')"); +// Entity entity2 = dbConnection.run("addColumn(loadTable(\"dfs://ZbDB\", \"ZbWaterMeterDfs\"),`test3,STRING)"); +// dbConnection.close(); +// } +//} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java new file mode 100644 index 0000000..a459535 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java @@ -0,0 +1,31 @@ +package com.huaxing.data.dolphindb; + +import org.apache.ibatis.session.SqlSessionFactory; +import org.mybatis.spring.SqlSessionFactoryBean; +import org.mybatis.spring.SqlSessionTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.support.ResourcePatternResolver; +import javax.sql.DataSource; + + +@Configuration +public class MyBatisConfig { + @Bean + public SqlSessionFactory sqlSessionFactory(@Qualifier(value = "dataSource") DataSource dataSource) throws Exception { + SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); + bean.setDataSource(dataSource); + // 设置 Mapper 的 XML 文件位置 + ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + bean.setMapperLocations(resolver.getResources("classpath:com/huaxing/data/storage/mapper/*.xml")); + return bean.getObject(); + } + + + @Bean + public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { + return new SqlSessionTemplate(sqlSessionFactory); + } +} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java new file mode 100644 index 0000000..b66d585 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java @@ -0,0 +1,75 @@ +package com.huaxing.data.dolphindb.base; + +import com.huaxing.data.dolphindb.connection.AbstractDbConnector; +import com.huaxing.data.storage.service.base.SqlConverterStatementHandle; +import com.xxdb.DBConnection; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.base + * @ClassName: CommonService + * @Author: swordmeng8@163.com + * @Description: 全局通用 + * @Date: 2025/1/15 15:19 + * @Version: 1.0 + */ +@Slf4j +public class CommonService extends SqlConverterStatementHandle { + + @Resource + AbstractDbConnector dbPool; + + /** + * 执行单条sql + * + * @param sql + */ + public void executeOnce(String sql) { + DBConnection connection = dbPool.getConnection(); + try { + connection.run(sql); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + connection.close(); + } + } + + /** + * 执行单条sql + * + * @param sql + */ + public void exec(String sql) { + DBConnection connection = dbPool.getConnection(); + try { + connection.run(sql); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + connection.close(); + } + } + + /** + * 执行批量sql + * @param sqlList + */ + public void executeBatch(List sqlList) { + DBConnection connection = dbPool.getConnection(); + sqlList.forEach(sql -> { + try { + connection.run(sql); + } catch (IOException e) { + log.error("AbstractDbConnector.executeBatch() Method执行异常:{}", e.getMessage()); + } + }); + connection.close(); + } + +} diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java new file mode 100644 index 0000000..217f824 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java @@ -0,0 +1,69 @@ +package com.huaxing.data.dolphindb.config; + +import com.xxdb.*; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + + +/** + * mqtt全局相关配置信息 + */ +@Slf4j +@Setter +@Getter +@Configuration +@ConfigurationProperties("dolphindb") +@SuppressWarnings("all") +public class DolphinDbConfiguration extends SimpleDBConnectionPoolConfig { + + /** + * 连接地址 dfs://ZBdb + */ + private String dbPath; + /** + * 主机名 + */ + private String host; + /** + * 端口号 + */ + private int port; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 连接池初始化大小 + */ + private int initPoolSize; + /** + * 表示连接池中最小连接数,正整数,默认值为 5 + */ + private boolean minimumPoolSize; + /** + * 是否开启高可用 + */ + private boolean enableHighAvailability; + + /** + * 创建连接池配置初始化方法 + * @return SimpleDBConnectionPoolConfig + */ + public SimpleDBConnectionPoolConfig loadConfig() { + SimpleDBConnectionPoolConfig config = new SimpleDBConnectionPoolConfig(); + config.setHostName(host); + config.setPort(port); + config.setUserId(username); + config.setPassword(password); + config.setInitialPoolSize(initPoolSize); + config.setEnableHighAvailability(enableHighAvailability); + return config; + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java new file mode 100644 index 0000000..a2684a3 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java @@ -0,0 +1,29 @@ +package com.huaxing.data.dolphindb.config; + +import com.xxdb.*; +import org.springframework.context.annotation.Configuration; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.dolphindb.config + * @ClassName: DbClient + * @Author: swordmeng8@163.com + * @Description: 数据库客户端 + * @Date: 2025/1/15 9:58 + * @Version: 1.0 + */ +@Configuration +public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool { + + public DBConnection dbConn = null; + + /** + * 构造方法 + * @param dbConfiguration + */ + + public DolphinDbPoolConfiguration(DolphinDbConfiguration dbConfiguration) { + super(dbConfiguration.loadConfig()); + this.dbConn = super.getConnection(); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java new file mode 100644 index 0000000..db5effc --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java @@ -0,0 +1,30 @@ +package com.huaxing.data.dolphindb.connection; + +import com.xxdb.DBConnection; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.dolphindb.connection + * @ClassName: DbConnector + * @Author: swordmeng8@163.com + * @Description: dolphindb 连接器 + * @Date: 2025/1/13 14:29 + * @Version: 1.0 + */ +@Accessors(chain = true) +@Slf4j +@SuppressWarnings("all") +public abstract class AbstractDbConnector { + + /** + * 连接 dolphindb + */ + public abstract DBConnection getConnection(); + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java b/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java new file mode 100644 index 0000000..04e9f29 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java @@ -0,0 +1,41 @@ +package com.huaxing.data.dolphindb.connection; + +import com.huaxing.data.dolphindb.config.DolphinDbConfiguration; +import com.huaxing.data.dolphindb.config.DolphinDbPoolConfiguration; +import com.xxdb.DBConnection; +import jakarta.annotation.PostConstruct; +import org.springframework.stereotype.Component; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.dolphindb.connection + * @ClassName: DbConnectorHelper + * @Author: swordmeng8@163.com + * @Description: 数据库处理器 + * @Date: 2025/1/13 17:53 + * @Version: 1.0 + */ + +@Component +@SuppressWarnings("all") +public class DbConnectorHelper extends AbstractDbConnector { + + final DolphinDbPoolConfiguration dbPool; + final DolphinDbConfiguration dbConfiguration; + public DbConnectorHelper(DolphinDbConfiguration dbConfiguration, DolphinDbPoolConfiguration dbPool) { + this.dbConfiguration = dbConfiguration; + this.dbPool = dbPool; + } + + + + /** + * 获取数据库连接 + * @return DBConnection + */ + @PostConstruct + @Override + public DBConnection getConnection() { + return dbPool.dbConn; + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java b/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java new file mode 100644 index 0000000..82a9e3e --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java @@ -0,0 +1,130 @@ +package com.huaxing.data.storage.controller; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; +import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.tablemanagement.service.ITableStructureService; +import com.huaxing.data.util.JacksonUtil; +import com.huaxing.mqtt.processor.MqttMessageSender; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.controller + * @ClassName: TestController + * @Author: swordmeng8@163.com + * @Description: 测试controller + * @Date: 2025/1/15 17:08 + * @Version: 1.0 + */ +@Slf4j +@RestController +@RequestMapping("/test") +@SuppressWarnings("all") +public class TestController { + + @Resource + private MqttMessageSender messageSender; + final IDeviceDataStoredService dataStoredService; + final IDeviceDataQueryDfsService dataQueryDfsService; + final IDeviceDataQueryStreamService dataQueryStreamService; + final ITableStructureService tableStructureService; + +// // 依赖注入 + public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) { + this.dataStoredService = dataStoredService; + this.dataQueryDfsService = dataQueryDfsService; + this.dataQueryStreamService = dataQueryStreamService; + this.tableStructureService = tableStructureService; + } + + // 测试插入数据 + @GetMapping(value = "/testInsert") // 成功 + public void testInsert() { + String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')"; + dataStoredService.execute(sql); + log.info("SUCCESS"); + } + + // 测试Dfs表查询 + @GetMapping(value = "/testSelectDfs") + public List> testSelectDfs() { + String dbPath = "dfs://ZbDB"; + String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs"); + return dataQueryDfsService.selectList(sql); + } + + // 测试订阅流表查询 + @GetMapping(value = "/testSelectStream") + public List> testSelectStream() { + String sql = "select * from ZbWaterMeter1Stream"; + return dataQueryStreamService.selectList(sql); + } + + + // 给指定的流表增加列字段 + @GetMapping(value = "/testStreamAddColumn") + public void testStreamAddColumn() { + String tableName = "ZbWaterMeter1Stream"; + String columnName = "WM_WFA_Unit14"; + String columnDefinition = "STRING"; + try { + tableStructureService.addStreamColumn(tableName, columnName, columnDefinition); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + @GetMapping(value = "/testDfsAddColumn") + public void testDfsAddColumn() { + String tableName = "ZbWaterMeter1Dfs"; + String columnName = "test2"; + String columnDefinition = "STRING"; + try { + tableStructureService.addDfsColumn(tableName, columnName, columnDefinition); + log.info("SUCCESS"); + } catch (Exception e) { + log.error("FAIL"); + e.printStackTrace(); + } + } + + + // 向消息队列中发送100W条数据 + @GetMapping(value = "/testSendMessage") + public void testSendMessage() { + for (int i = 0; i < 2; i++) { + List> dataList = new ArrayList<>(); + dataList.add(handleMapByIndex(i)); + DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build(); + messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO)); + } + } + + private Map handleMapByIndex(int index) { + Map map = new HashMap<>(); + map.put("time", "2025.01.01 00:00:00"); + map.put("projectId", "0jZU2102"); + map.put("deviceId", "0jZU2102_0806_0011"); + map.put("WM_WFA", 124.656 + index); + map.put("WM_WFA_Unit", "m³"); + return map; + } + + + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java b/data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java new file mode 100644 index 0000000..59cf0a5 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java @@ -0,0 +1,34 @@ +package com.huaxing.data.storage.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.domain + * @ClassName: CommonDeviceControl + * @Author: swordmeng8@163.com + * @Description: 设备入库数据接收实体 + * @Date: 2025/1/10 18:11 + * @Version: 1.0 + */ +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Accessors(chain = true) +@Data +public class DataAnalysisDTO { + + // 表名 + private String tableName; + + // 设备数据集 + private List> dataList; + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java b/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java new file mode 100644 index 0000000..eb0a098 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java @@ -0,0 +1,27 @@ +package com.huaxing.data.storage.domain; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.domain + * @ClassName: CommonDeviceControl + * @Author: swordmeng8@163.com + * @Description: 数据查询DTO + * @Date: 2025/1/10 18:11 + * @Version: 1.0 + */ + +@Accessors(chain = true) +@Data +public class DataQueryDTO { + // 表名 + private String tableName; + // 设备数据集 + private List tableColumns; + +} \ No newline at end of file diff --git a/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java new file mode 100644 index 0000000..b150bca --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java @@ -0,0 +1,21 @@ +package com.huaxing.data.storage.mapper; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; +import java.util.Map; + +/** + * ClassName: IDeviceDataQuery + * Description: 查询Dfs表数据 + * @author 孟剑 + * @date 2025-01-16 16:49 + */ +@Mapper +@SuppressWarnings("all") +public interface IDeviceDataQueryDfsMapper { + + @Select("${sql}") + List> selectList(String sql); +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java new file mode 100644 index 0000000..8cba124 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java @@ -0,0 +1,21 @@ +package com.huaxing.data.storage.mapper; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; +import java.util.Map; + +/** + * ClassName: IDeviceDataQuery + * Description: 查询Stream表数据 + * @author 孟剑 + * @date 2025-01-16 16:49 + */ +@Mapper +@SuppressWarnings("all") +public interface IDeviceDataQueryStreamMapper { + + @Select("${sql}") + List> selectList(String sql); +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java new file mode 100644 index 0000000..f1f7b82 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java @@ -0,0 +1,20 @@ +package com.huaxing.data.storage.mapper; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; +import java.util.Map; + +/** + * @author 孟剑 + * @date 2025-01-16 15:23 + */ +@Mapper +@SuppressWarnings("all") +public interface IDeviceDataStoredMapper { + + @Select("${sql}") + List> selectList (String sql); + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java new file mode 100644 index 0000000..bd099b2 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java @@ -0,0 +1,13 @@ +package com.huaxing.data.storage.service; + +/** + * 入库数据解析服务 + * @author 孟剑 + * @date 2025-01-13 11:16 + */ +public interface IDataAnalysisService { + + // 解析入库数据 + void parseStoreData (String jsonData); + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java new file mode 100644 index 0000000..20dbe11 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java @@ -0,0 +1,18 @@ +package com.huaxing.data.storage.service; + +import java.util.List; +import java.util.Map; + +/** + * @author 孟剑 + * @date 2025-01-16 16:45 + */ +public interface IDeviceDataQueryDfsService { + + /** + * 查询Dfs表 list数据 + * @param sql + * @return + */ + List> selectList(String sql); +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java new file mode 100644 index 0000000..232bf24 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java @@ -0,0 +1,18 @@ +package com.huaxing.data.storage.service; + +import java.util.List; +import java.util.Map; + +/** + * @author 孟剑 + * @date 2025-01-16 16:45 + */ +public interface IDeviceDataQueryStreamService { + + /** + * 查询Dfs表 list数据 + * @param sql + * @return + */ + List> selectList(String sql); +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java new file mode 100644 index 0000000..387056e --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java @@ -0,0 +1,19 @@ +package com.huaxing.data.storage.service; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; + +/** + * ClassName: IDeviceDataStoredService + * Description: 设备采集入库与查询通用业务接口 + */ +public interface IDeviceDataStoredService { + + /** + * 设备采集入库 + * @param analysisDTO + */ + void insert(DataAnalysisDTO analysisDTO); + + void execute(String sql); + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java b/data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java new file mode 100644 index 0000000..930ffc2 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java @@ -0,0 +1,25 @@ +package com.huaxing.data.storage.service.abstracts; + +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.base + * @ClassName: SqlConverterStatement + * @Author: swordmeng8@163.com + * @Description: SQL转换语句 + * @Date: 2025/1/14 14:01 + * @Version: 1.0 + */ + +abstract public class SqlConverterStatement { + + /** + * 生成插入语句 + * @param tableName + * @param map + * @return + */ + public abstract String generateInsertStreamStatement(String tableName, Map map); + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java b/data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java new file mode 100644 index 0000000..8ebc938 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java @@ -0,0 +1,19 @@ +package com.huaxing.data.storage.service.base; + +import java.util.Map; + +/** + * @author 孟剑 + * @date 2025-01-16 16:54 + */ +public interface IDbSqlFactory { + + // 生成插入流式语句 + String generateInsertStreamStatement(String tableName, Map map); + + // 生成Stream表查询语句 + String generateSelectStreamStatement(String tableName, Map map); + + // 生成Dfs表查询语句 + String generateSelectDfsStatement(String tableName, Map map); +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java b/data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java new file mode 100644 index 0000000..f8db948 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java @@ -0,0 +1,71 @@ +package com.huaxing.data.storage.service.base; + +import com.huaxing.data.storage.service.abstracts.SqlConverterStatement; + +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.base + * @ClassName: SqlConverterStatementHandle + * @Author: swordmeng8@163.com + * @Description: sql处理类 + * @Date: 2025/1/14 14:05 + * @Version: 1.0 + */ + +public class SqlConverterStatementHandle extends SqlConverterStatement { + + /** + * 生成插入语句 + * + * @param tableName 表名 + * @param map 键值对 + * @return 插入语句 + */ + @Override + public String generateInsertStreamStatement(String tableName, Map map) { + StringBuilder columnBuilder = new StringBuilder(); + StringBuilder valueBuilder = new StringBuilder(); + int index = 0; + for (Map.Entry entry : map.entrySet()) { + // 处理列名 + columnBuilder.append(entry.getKey()); + // 处理值 + if (entry.getValue() instanceof String) { + valueBuilder.append("'").append(entry.getValue()).append("'"); + } else if (entry.getValue() == null) { + valueBuilder.append("NULL"); + } else { + valueBuilder.append(entry.getValue()); + } + if (index < map.size() - 1) { + columnBuilder.append(", "); + valueBuilder.append(", "); + } + index++; + } + return "INSERT INTO " + tableName + " (" + columnBuilder + ") VALUES (" + valueBuilder + ")"; + } + + + /** + * 生成流表Stream查询语句 + * @param tableName + * @param columnNameList + * @return + */ + public String generateSelectStreamStatement(String tableName, List columnNameList) { + StringBuilder columnBuilder = new StringBuilder(); + if (columnNameList.isEmpty()) { + columnBuilder.append(" * "); + } else { + columnNameList.forEach(columnName -> columnBuilder.append(columnName).append(", ")); + } + return "SELECT " + columnBuilder.toString() + " FROM " + tableName; + } + + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java new file mode 100644 index 0000000..4b76ea4 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java @@ -0,0 +1,46 @@ +package com.huaxing.data.storage.service.impl; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.service.IDataAnalysisService; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.util.JacksonUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.impl + * @ClassName: DataAnalysisService + * @Author: swordmeng8@163.com + * @Description: 入库数据解析服务 + * @Date: 2025/1/13 11:16 + * @Version: 1.0 + */ +@Slf4j +@Service +public class DataAnalysisService implements IDataAnalysisService { + + final IDeviceDataStoredService dataStoredService; + + public DataAnalysisService(IDeviceDataStoredService dataStoredService) { + this.dataStoredService = dataStoredService; + } + + int row = 0; + /** + * @author: swordmeng8@163.com + * @date: 2025/1/13 11:16 + * @desc: 解析入库数据 + * @param jsonData + * @return void + */ + @Override + public void parseStoreData(String jsonData) { + DataAnalysisDTO dataAnalysisDTO = JacksonUtil.strToObject(jsonData, DataAnalysisDTO.class); + log.info("入库数据解析完成"); + dataStoredService.insert(dataAnalysisDTO); + log.info("入库数据入库完成"); + row++; + log.info("入库数据入库完成,入库总数:{}", row); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java new file mode 100644 index 0000000..0df8687 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java @@ -0,0 +1,39 @@ +package com.huaxing.data.storage.service.impl; + +import com.huaxing.data.storage.mapper.IDeviceDataQueryDfsMapper; +import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.impl + * @ClassName: DeviceDataQueryDfsServiceImpl + * @Author: swordmeng8@163.com + * @Description: 设备数据查询业务层 + * @Date: 2025/1/16 16:45 + * @Version: 1.0 + */ +@Service +public class DeviceDataQueryDfsServiceImpl implements IDeviceDataQueryDfsService { + // 构造器注入 + private final IDeviceDataQueryDfsMapper deviceDataQueryDfsMapper; + public DeviceDataQueryDfsServiceImpl(IDeviceDataQueryDfsMapper deviceDataQueryDfsMapper) { + this.deviceDataQueryDfsMapper = deviceDataQueryDfsMapper; + } + + + /** + * @author: swordmeng8@163.com + * @date: 2025/1/16 16:45 + * @desc: 查询数据 + * @param sql + * @return java.util.List> + */ + @Override + public List> selectList(String sql) { + return deviceDataQueryDfsMapper.selectList(sql); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java new file mode 100644 index 0000000..93cb09c --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java @@ -0,0 +1,39 @@ +package com.huaxing.data.storage.service.impl; + +import com.huaxing.data.storage.mapper.IDeviceDataQueryStreamMapper; +import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.impl + * @ClassName: DeviceDataQueryStreamServiceImpl + * @Author: swordmeng8@163.com + * @Description: 设备数据查询业务层 + * @Date: 2025/1/16 16:45 + * @Version: 1.0 + */ +@Service +public class DeviceDataQueryStreamServiceImpl implements IDeviceDataQueryStreamService { + // 构造器注入 + private final IDeviceDataQueryStreamMapper dataQueryStreamMapper; + public DeviceDataQueryStreamServiceImpl(IDeviceDataQueryStreamMapper dataQueryStreamMapper) { + this.dataQueryStreamMapper = dataQueryStreamMapper; + } + + + /** + * @author: swordmeng8@163.com + * @date: 2025/1/16 16:45 + * @desc: 查询数据 + * @param sql + * @return java.util.List> + */ + @Override + public List> selectList(String sql) { + return dataQueryStreamMapper.selectList(sql); + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java new file mode 100644 index 0000000..67c7f37 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java @@ -0,0 +1,52 @@ +package com.huaxing.data.storage.service.impl; + +import com.huaxing.data.storage.domain.DataAnalysisDTO; +import com.huaxing.data.storage.mapper.IDeviceDataStoredMapper; +import com.huaxing.data.storage.service.IDeviceDataStoredService; +import com.huaxing.data.dolphindb.base.CommonService; +import com.huaxing.data.util.JacksonUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + + +/** + * @ClassName DeviceDataStoredServiceImpl + * @Description 设备采集入库与查询服务实现类 + * @Author swordmeng8@163.com + * @Date 2024/12/2 18:59 + * @Version 1.0 + **/ +@Slf4j +@Service +@SuppressWarnings("all") +public class DeviceDataStoredServiceImpl extends CommonService implements IDeviceDataStoredService { + + final IDeviceDataStoredMapper deviceDataStoredMapper; + + public DeviceDataStoredServiceImpl(IDeviceDataStoredMapper deviceDataStoredMapper) { + this.deviceDataStoredMapper = deviceDataStoredMapper; + } + + /** + * 插入数据 + * @param dataAnalysis + */ + @Override + public void insert(DataAnalysisDTO dataAnalysis) { + String tableName = dataAnalysis.getTableName(); + dataAnalysis.getDataList().forEach(map -> { +// CompletableFuture.runAsync(() -> { +// +// }); + log.info("入库数据:{}", JacksonUtil.objectStr(map)); + executeOnce(generateInsertStreamStatement(tableName, map)); + }); + } + + @Override + public void execute(String sql) { + executeOnce(sql); + } + + +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java new file mode 100644 index 0000000..6f45e04 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java @@ -0,0 +1,28 @@ +package com.huaxing.data.tablemanagement.domain; + +import lombok.Data; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.tablemanagement.domain + * @ClassName: TableColumnDTO + * @Author: swordmeng8@163.com + * @Description: 表列DTO + * @Date: 2025/1/15 16:05 + * @Version: 1.0 + */ +@Data +public class TableColumnDTO { + /** + * 列名 + */ + private String columnName; + /** + * 列类型 + */ + private String columnType; + /** + * 列描述 + */ + private String columnDesc; +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java new file mode 100644 index 0000000..2f943ef --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java @@ -0,0 +1,29 @@ +package com.huaxing.data.tablemanagement.domain; + +import lombok.Data; + +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.tablemanagement.domain + * @ClassName: TableDTO + * @Author: swordmeng8@163.com + * @Description: 表操作类 + * @Date: 2025/1/15 16:03 + * @Version: 1.0 + */ + +@Data +public class TableDTO { + + /** + * 表名 + */ + private String tableName; + /** + * 表列信息 + */ + private List tableColumnList; + +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java new file mode 100644 index 0000000..46d9c38 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java @@ -0,0 +1,40 @@ +package com.huaxing.data.tablemanagement.service; + +import java.sql.SQLException; +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service + * @ClassName: ITableStructureService + * @Author: swordmeng8@163.com + * @Description: 表结构操作service + * @Date: 2025/1/13 11:02 + * @Version: 1.0 + */ +public interface ITableStructureService { + + // 添加dfs表单列 + void addDfsColumn(String tableName, String columnName, String columnDefinition) throws SQLException; + // 添加dfs表多列 + void addDfsColumns(String tableName, List columnName, String columnDefinition) throws SQLException; + + // 添加Stream流表单列 + void addStreamColumn(String tableName, String columnName, String columnDefinition) throws SQLException; + + // 添加Stream流表多列 + void addStreamColumns(String tableName, String columnName, String columnDefinition) throws SQLException; + + // 创建dfs表 + void createDfsTable(String tableName, String columnName, String columnDefinition) throws SQLException; + + // 创建Stream流表 + void createStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException; + + // 订阅流表 + void subscribeStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException; + + // 判断表是否存在 + boolean isTableExist(String tableName) throws SQLException; + +} diff --git a/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java new file mode 100644 index 0000000..63b02cf --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java @@ -0,0 +1,65 @@ +package com.huaxing.data.tablemanagement.service.impl; + +import com.huaxing.data.dolphindb.base.CommonService; +import com.huaxing.data.tablemanagement.service.ITableStructureService; +import org.springframework.stereotype.Service; + +import java.sql.SQLException; +import java.util.List; + +/** + * @ProjectName: iot-data-bridge + * @Package: com.huaxing.data.storage.service.impl + * @ClassName: TableStructureService + * @Author: swordmeng8@163.com + * @Description: 表结构操作service + * @Date: 2025/1/13 11:02 + * @Version: 1.0 + */ +@Service +public class TableStructureService extends CommonService implements ITableStructureService { + + @Override + public void addDfsColumn(String tableName, String columnName, String columnDefinition) throws SQLException { + String script = + "pt = loadTable(\"dfs://ZbDB\", `" + tableName + ");\n" + + "alter table pt add " + columnName + " " + columnDefinition + ";"; + exec(script); + } + + @Override + public void addDfsColumns(String tableName, List columnName, String columnDefinition) throws SQLException { + exec(""); + } + + @Override + public void addStreamColumn(String tableName, String columnName, String columnDefinition) throws SQLException { + exec("addColumn(" + tableName + ", " + columnName + "," + columnDefinition + ")"); + } + + @Override + public void addStreamColumns(String tableName, String columnName, String columnDefinition) throws SQLException { + exec(""); + } + + @Override + public void createDfsTable(String tableName, String columnName, String columnDefinition) throws SQLException { + exec(""); + } + + @Override + public void createStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException { + exec(""); + } + + // 订阅流表 + @Override + public void subscribeStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException { + exec(""); + } + + @Override + public boolean isTableExist(String tableName) throws SQLException { + return false; + } +} diff --git a/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java b/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java new file mode 100644 index 0000000..3753c0a --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java @@ -0,0 +1,77 @@ +package com.huaxing.data.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author kwl99 + * @since 2024-08-08 14:31 + */ +public class JacksonUtil { + + /** + * 将对象转为json字符串 + */ + public static String objectStr(Object object) { + ObjectMapper mapper = new ObjectMapper(); + try { + + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * 将json字符串转为对象 + */ + public static T strToObject(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, clazz); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + + /** + * 将json字符串转为对象列表 + */ + public static List strToObjectList(String str, Class clazz) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + + /** + * 将json字符串转为对象列表 + */ + public static List> strToMapList(String str) { + ObjectMapper mapper = new ObjectMapper(); + try { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class)); + } catch (Exception e) { + throw new RuntimeException(e); + + } + } + +} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/ReadMe b/data-storage/src/main/java/com/huaxing/mqtt/ReadMe new file mode 100644 index 0000000..e69de29 diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..af9bf76 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java @@ -0,0 +1,143 @@ +package com.huaxing.mqtt.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt全局相关配置信息 + */ +@Slf4j +@Setter +@Getter +@Configuration +@ConfigurationProperties("mqtt") +@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*") +public class MqttConfiguration { + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 连接地址 + */ + private String hostUrl; + + /** + * 客户Id + */ + private String clientId; + + /** + * 默认连接话题 + */ + private String defaultTopic; + + /** + * 超时时间 + */ + private int timeout; + + /** + * qos + */ + private int qos; + + /** + * 订阅超时时间 + */ + private int completionTimeout; + + /** + * 保持连接数 + */ + private int keepalive; + + /** + * 2024-12-06新增 + * 要订阅的其他主题 + */ + private List topics; + + /** + * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 + */ + private static final byte[] WILL_DATA = "offline".getBytes(); + + + /** + * 获取所有订阅的主题 + * @return 所有订阅的主题 + */ + public String[] getAllTopics() { + // 校验配置文件是否配置 + if (CollectionUtils.isEmpty(topics)) { + this.topics = new ArrayList<>(); + } + // 将默认主题条件到其他主题里 + this.topics.add(defaultTopic); + // 返回主题数组 + return topics.toArray(new String[0]); + } + + /** + * 注册MQTT客户端工厂 + * @return MqttPahoClientFactory + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + // 客户端工厂 + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + + MqttConnectOptions options = new MqttConnectOptions(); + // 设置连接的用户名 + options.setUserName(username); + // 设置连接的密码 + options.setPassword(password.toCharArray()); + // 设置连接的地址 + options.setServerURIs(new String[]{hostUrl}); + + // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + + // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(10); + + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 + // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + // 但这个方法并没有重连的机制 + options.setKeepAliveInterval(20); + + // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 + options.setWill("willTopic", WILL_DATA, 2, false); + + //自动重新连接 + options.setAutomaticReconnect(true); + factory.setConnectionOptions(options); + + log.info("初始化 MQTT 配置"); + + return factory; + } +} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java new file mode 100644 index 0000000..ac525f3 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java @@ -0,0 +1,96 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.mqtt.constant.MqttConstant; +import com.huaxing.mqtt.processor.MqttMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.util.UUID; + + +/** + * MQTT消费者配置 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttConsumerConfiguration { + + final MqttConfiguration mqttConfiguration; + final MqttMessageReceiver mqttMessageReceiver; + + public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) { + this.mqttConfiguration = mqttConfiguration; + this.mqttMessageReceiver = mqttMessageReceiver; + } + + + /** + * 此处可以使用其他消息通道 + * MQTT信息通道(消费者) + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } + + /** + * MQTT消息订阅绑定(消费者) + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + */ + @Bean + public MessageProducerSupport mqttInbound() { + // 获取客户端id + String clientId = mqttConfiguration.getClientId(); + // 获取默认主题 +// String defaultTopic = mqttConfiguration.getDefaultTopic(); + + // 获取所有配置的主题 + String[] topics = mqttConfiguration.getAllTopics(); + + // 获取客户端工厂 + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // Paho客户端消息驱动通道适配器,主要用来订阅主题 + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS, + mqttPahoClientFactory, + // 所有需要订阅的topic + topics + ); + adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout()); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // 按字节接收消息 + // defaultPahoMessageConverter.setPayloadAsBytes(true); + adapter.setConverter(defaultPahoMessageConverter); + // 设置QoS + adapter.setQos(mqttConfiguration.getQos()); + // 设置订阅通道 + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + +} + diff --git a/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java new file mode 100644 index 0000000..15a9b55 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java @@ -0,0 +1,65 @@ +package com.huaxing.mqtt.config; + +import com.huaxing.mqtt.constant.MqttConstant; +import jakarta.annotation.Resource; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + + + +/** + * MQTT生产者配置 + */ +@Slf4j +@Configuration +@AllArgsConstructor +public class MqttProducerConfiguration { + + @Resource + private MqttConfiguration mqttConfiguration; + + /** + * MQTT信息通道(生产者) + */ + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * MQTT消息处理器(生产者) + */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + // 客户端id + String clientId = mqttConfiguration.getClientId(); + // 默认主题 + String defaultTopic = mqttConfiguration.getDefaultTopic(); + MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory(); + + // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory); + // true,异步,发送消息时将不会阻塞。 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(defaultTopic); + // 默认QoS + messageHandler.setDefaultQos(1); + // Paho消息转换器 + DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); + // defaultPahoMessageConverter.setPayloadAsBytes(true); + // 发送默认按字节类型发送消息 + messageHandler.setConverter(defaultPahoMessageConverter); + return messageHandler; + } + +} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java b/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java new file mode 100644 index 0000000..8266076 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java @@ -0,0 +1,17 @@ +package com.huaxing.mqtt.constant; +/** + * 常量 + */ +public class MqttConstant { + + /** + * 客户端id消费者后缀 + */ + public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers"; + /** + * 客户端id生产者后缀 + */ + public static final String CLIENT_SUFFIX_PRODUCERS = "_producers"; + +} + diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java new file mode 100644 index 0000000..89c4fa2 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java @@ -0,0 +1,46 @@ +package com.huaxing.mqtt.processor; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +/** + * 生产者处理器 + */ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param payload 内容 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java new file mode 100644 index 0000000..6154d4b --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java @@ -0,0 +1,66 @@ +package com.huaxing.mqtt.processor; + +import com.huaxing.data.storage.service.IDataAnalysisService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + +/** + * 消费者处理器 + */ +@Slf4j +@Component +@SuppressWarnings("all") +public class MqttMessageReceiver implements MessageHandler { + + /** + * 数据处理服务 + */ + final IDataAnalysisService analysisService; + public MqttMessageReceiver(IDataAnalysisService analysisService) { + this.analysisService = analysisService; + } + + /** + * 消息处理 + * + * @param message 消息 + * @throws MessagingException 消息异常 + */ + @Override + @Async("handleMessage") + public void handleMessage(Message message) throws MessagingException { + try { + // 获取消息Topic + MessageHeaders headers = message.getHeaders(); + String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); + log.info("[获取到的消息的topic]:{} ", topic); + // 获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload]:{} ", payload); + // 数据库入库消息 + if (topic.contains("iot/test1/in-storage")) { + // 模拟1000000条数据入库 + log.info("接收到iot/test1/in-storage的消息啦,快去处理"); + long l = System.currentTimeMillis(); + System.currentTimeMillis(); + CompletableFuture future = CompletableFuture.runAsync(() -> { + analysisService.parseStoreData(payload); + }); + long l1 = System.currentTimeMillis(); + log.info("入库完成,耗时:{}", l1 - l); +// analysisService.parseStoreData(payload); + } else if (topic.contains("table-update/")){} // TODO 表更新topic + } catch (Exception e) { + log.error(e.toString()); + } + } +} + diff --git a/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java new file mode 100644 index 0000000..27cc2f8 --- /dev/null +++ b/data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java @@ -0,0 +1,52 @@ +package com.huaxing.mqtt.processor; + +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * 生产者处理器 + */ +@Component +public class MqttMessageSender { + + @Autowired + private MqttGateway mqttGateway; + + + /** + * 发送mqtt消息 + * + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody) { + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message) { + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/data-storage/src/main/resources/application.yml b/data-storage/src/main/resources/application.yml new file mode 100644 index 0000000..c7edbd7 --- /dev/null +++ b/data-storage/src/main/resources/application.yml @@ -0,0 +1,48 @@ + +server: + port: 8088 +spring: + application: + name: iot-data-bridge + datasource: + url: jdbc:dolphindb://localhost:8848?databasePath=dfs://ZbDB + username: admin + password: 123456 + driver-class-name: com.dolphindb.jdbc.Driver + main: + lazy-initialization: false + +mybatis: + mapper-locations: classpath:com/huaxing/data/storage/mapper/*.xml + type-aliases-package: com.huaxing.data.storage.entity + configuration: + map-underscore-to-camel-case: true + sql-session-factory: + data-source: ${spring.datasource} + sql-session-template: + executor-type: BATCH + sql-session-factory-ref: sqlSessionFactory + +mqtt: + username: admin + password: 123456 + host-url: tcp://8.130.65.74:1883 + client-id: iot + timeout: 100 + keepalive: 100 + completion-timeout: 5000 + qos: 1 + default-topic: iot/data/# + topics: + - iot/test1/# + - iot/test2/# + +dolphindb: + db-path: dfs://ZbDB + host: 127.0.0.1 + port: 8848 + username: admin + password: 123456 + init-pool-size: 10 + minimum-pool-size: 5 + enable-high-availability: false \ No newline at end of file diff --git a/data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml b/data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml new file mode 100644 index 0000000..8368f50 --- /dev/null +++ b/data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml @@ -0,0 +1,79 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml b/data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml new file mode 100644 index 0000000..1d6785a --- /dev/null +++ b/data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/data-storage/src/main/resources/sqlScripts.properties b/data-storage/src/main/resources/sqlScripts.properties new file mode 100644 index 0000000..0688f2e --- /dev/null +++ b/data-storage/src/main/resources/sqlScripts.properties @@ -0,0 +1,3 @@ +create.table.script=CREATE TABLE {0} (id INT PRIMARY KEY, name VARCHAR(255)) +add.column.script=ALTER TABLE {0} ADD COLUMN age INT +drop.column.script=ALTER TABLE {0} DROP COLUMN name \ No newline at end of file diff --git a/data-storage/src/main/resources/补水箱.script b/data-storage/src/main/resources/补水箱.script new file mode 100644 index 0000000..7f27abe --- /dev/null +++ b/data-storage/src/main/resources/补水箱.script @@ -0,0 +1,71 @@ +//分布式库名 +dbPath = "dfs://ZbDB" +//流表名 +stName = "WaterMeterTsetStream" +//分区表名 +ptName = "WaterMeterTsetDfs" + +//建库函数 +def createDB(dbPath){ + print('正在初始化数据库' + dbPath) + if (existsDatabase(dbPath)) { + print('当前数据库已存在,加载数据库:' + dbPath) + db = database(dbPath) + print('已加载数据库:' + dbPath) + return db + } + else { + print('当前数据库不存在,创建数据库:' + dbPath) + value = 2025.01M..2040.12M + db = database(dbPath, VALUE, value,,engine='OLAP') + print('已完成创建数据库:'+ dbPath) + return db + } +} +//建流表函数 +def createST(stName){ + if(not existsStreamTable(stName)){ + print('共享变量未定义,创建共享表:' + stName) + print(`正在创建Demo流表) + colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit + colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] + st = streamTable(150000:0, colNames, colTypes) + print(`完成创建Demo流表) + enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 ) + print('完成创建共享表') + return st + } + else{ + print('共享变量已存在,加载共享变量:' + stName) + return objByName(stName, true) + } +} +//建分区表函数 +def createPT(dbPath,ptName){ + db = createDB(dbPath) + if (existsTable(dbPath, ptName)){ + print('分区表已存在,加载分区表:' + ptName) + pt = loadTable(db,ptName) + return pt + } + else{ + print('分区表不存在,创建分区表:' + ptName) + print(`正在创建Demo分区表) + colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit + colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING] + t = table(1:0, colNames, colTypes) + pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST) + print('完成创建分区表:' + ptName) + return pt + } +} +//创建库 +//createDB(dbPath) +//创建分区表 +createPT(dbPath,ptName) +//创建流表 +unsubscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime) +// dropStreamTable(stName) +createST(stName) +//订阅流表 +subscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true) \ No newline at end of file diff --git a/mvnw b/mvnw new file mode 100644 index 0000000..19529dd --- /dev/null +++ b/mvnw @@ -0,0 +1,259 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Apache Maven Wrapper startup batch script, version 3.3.2 +# +# Optional ENV vars +# ----------------- +# JAVA_HOME - location of a JDK home dir, required when download maven via java source +# MVNW_REPOURL - repo url base for downloading maven distribution +# MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven +# MVNW_VERBOSE - true: enable verbose log; debug: trace the mvnw script; others: silence the output +# ---------------------------------------------------------------------------- + +set -euf +[ "${MVNW_VERBOSE-}" != debug ] || set -x + +# OS specific support. +native_path() { printf %s\\n "$1"; } +case "$(uname)" in +CYGWIN* | MINGW*) + [ -z "${JAVA_HOME-}" ] || JAVA_HOME="$(cygpath --unix "$JAVA_HOME")" + native_path() { cygpath --path --windows "$1"; } + ;; +esac + +# set JAVACMD and JAVACCMD +set_java_home() { + # For Cygwin and MinGW, ensure paths are in Unix format before anything is touched + if [ -n "${JAVA_HOME-}" ]; then + if [ -x "$JAVA_HOME/jre/sh/java" ]; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACCMD="$JAVA_HOME/jre/sh/javac" + else + JAVACMD="$JAVA_HOME/bin/java" + JAVACCMD="$JAVA_HOME/bin/javac" + + if [ ! -x "$JAVACMD" ] || [ ! -x "$JAVACCMD" ]; then + echo "The JAVA_HOME environment variable is not defined correctly, so mvnw cannot run." >&2 + echo "JAVA_HOME is set to \"$JAVA_HOME\", but \"\$JAVA_HOME/bin/java\" or \"\$JAVA_HOME/bin/javac\" does not exist." >&2 + return 1 + fi + fi + else + JAVACMD="$( + 'set' +e + 'unset' -f command 2>/dev/null + 'command' -v java + )" || : + JAVACCMD="$( + 'set' +e + 'unset' -f command 2>/dev/null + 'command' -v javac + )" || : + + if [ ! -x "${JAVACMD-}" ] || [ ! -x "${JAVACCMD-}" ]; then + echo "The java/javac command does not exist in PATH nor is JAVA_HOME set, so mvnw cannot run." >&2 + return 1 + fi + fi +} + +# hash string like Java String::hashCode +hash_string() { + str="${1:-}" h=0 + while [ -n "$str" ]; do + char="${str%"${str#?}"}" + h=$(((h * 31 + $(LC_CTYPE=C printf %d "'$char")) % 4294967296)) + str="${str#?}" + done + printf %x\\n $h +} + +verbose() { :; } +[ "${MVNW_VERBOSE-}" != true ] || verbose() { printf %s\\n "${1-}"; } + +die() { + printf %s\\n "$1" >&2 + exit 1 +} + +trim() { + # MWRAPPER-139: + # Trims trailing and leading whitespace, carriage returns, tabs, and linefeeds. + # Needed for removing poorly interpreted newline sequences when running in more + # exotic environments such as mingw bash on Windows. + printf "%s" "${1}" | tr -d '[:space:]' +} + +# parse distributionUrl and optional distributionSha256Sum, requires .mvn/wrapper/maven-wrapper.properties +while IFS="=" read -r key value; do + case "${key-}" in + distributionUrl) distributionUrl=$(trim "${value-}") ;; + distributionSha256Sum) distributionSha256Sum=$(trim "${value-}") ;; + esac +done <"${0%/*}/.mvn/wrapper/maven-wrapper.properties" +[ -n "${distributionUrl-}" ] || die "cannot read distributionUrl property in ${0%/*}/.mvn/wrapper/maven-wrapper.properties" + +case "${distributionUrl##*/}" in +maven-mvnd-*bin.*) + MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/ + case "${PROCESSOR_ARCHITECTURE-}${PROCESSOR_ARCHITEW6432-}:$(uname -a)" in + *AMD64:CYGWIN* | *AMD64:MINGW*) distributionPlatform=windows-amd64 ;; + :Darwin*x86_64) distributionPlatform=darwin-amd64 ;; + :Darwin*arm64) distributionPlatform=darwin-aarch64 ;; + :Linux*x86_64*) distributionPlatform=linux-amd64 ;; + *) + echo "Cannot detect native platform for mvnd on $(uname)-$(uname -m), use pure java version" >&2 + distributionPlatform=linux-amd64 + ;; + esac + distributionUrl="${distributionUrl%-bin.*}-$distributionPlatform.zip" + ;; +maven-mvnd-*) MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/ ;; +*) MVN_CMD="mvn${0##*/mvnw}" _MVNW_REPO_PATTERN=/org/apache/maven/ ;; +esac + +# apply MVNW_REPOURL and calculate MAVEN_HOME +# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-,maven-mvnd--}/ +[ -z "${MVNW_REPOURL-}" ] || distributionUrl="$MVNW_REPOURL$_MVNW_REPO_PATTERN${distributionUrl#*"$_MVNW_REPO_PATTERN"}" +distributionUrlName="${distributionUrl##*/}" +distributionUrlNameMain="${distributionUrlName%.*}" +distributionUrlNameMain="${distributionUrlNameMain%-bin}" +MAVEN_USER_HOME="${MAVEN_USER_HOME:-${HOME}/.m2}" +MAVEN_HOME="${MAVEN_USER_HOME}/wrapper/dists/${distributionUrlNameMain-}/$(hash_string "$distributionUrl")" + +exec_maven() { + unset MVNW_VERBOSE MVNW_USERNAME MVNW_PASSWORD MVNW_REPOURL || : + exec "$MAVEN_HOME/bin/$MVN_CMD" "$@" || die "cannot exec $MAVEN_HOME/bin/$MVN_CMD" +} + +if [ -d "$MAVEN_HOME" ]; then + verbose "found existing MAVEN_HOME at $MAVEN_HOME" + exec_maven "$@" +fi + +case "${distributionUrl-}" in +*?-bin.zip | *?maven-mvnd-?*-?*.zip) ;; +*) die "distributionUrl is not valid, must match *-bin.zip or maven-mvnd-*.zip, but found '${distributionUrl-}'" ;; +esac + +# prepare tmp dir +if TMP_DOWNLOAD_DIR="$(mktemp -d)" && [ -d "$TMP_DOWNLOAD_DIR" ]; then + clean() { rm -rf -- "$TMP_DOWNLOAD_DIR"; } + trap clean HUP INT TERM EXIT +else + die "cannot create temp dir" +fi + +mkdir -p -- "${MAVEN_HOME%/*}" + +# Download and Install Apache Maven +verbose "Couldn't find MAVEN_HOME, downloading and installing it ..." +verbose "Downloading from: $distributionUrl" +verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName" + +# select .zip or .tar.gz +if ! command -v unzip >/dev/null; then + distributionUrl="${distributionUrl%.zip}.tar.gz" + distributionUrlName="${distributionUrl##*/}" +fi + +# verbose opt +__MVNW_QUIET_WGET=--quiet __MVNW_QUIET_CURL=--silent __MVNW_QUIET_UNZIP=-q __MVNW_QUIET_TAR='' +[ "${MVNW_VERBOSE-}" != true ] || __MVNW_QUIET_WGET='' __MVNW_QUIET_CURL='' __MVNW_QUIET_UNZIP='' __MVNW_QUIET_TAR=v + +# normalize http auth +case "${MVNW_PASSWORD:+has-password}" in +'') MVNW_USERNAME='' MVNW_PASSWORD='' ;; +has-password) [ -n "${MVNW_USERNAME-}" ] || MVNW_USERNAME='' MVNW_PASSWORD='' ;; +esac + +if [ -z "${MVNW_USERNAME-}" ] && command -v wget >/dev/null; then + verbose "Found wget ... using wget" + wget ${__MVNW_QUIET_WGET:+"$__MVNW_QUIET_WGET"} "$distributionUrl" -O "$TMP_DOWNLOAD_DIR/$distributionUrlName" || die "wget: Failed to fetch $distributionUrl" +elif [ -z "${MVNW_USERNAME-}" ] && command -v curl >/dev/null; then + verbose "Found curl ... using curl" + curl ${__MVNW_QUIET_CURL:+"$__MVNW_QUIET_CURL"} -f -L -o "$TMP_DOWNLOAD_DIR/$distributionUrlName" "$distributionUrl" || die "curl: Failed to fetch $distributionUrl" +elif set_java_home; then + verbose "Falling back to use Java to download" + javaSource="$TMP_DOWNLOAD_DIR/Downloader.java" + targetZip="$TMP_DOWNLOAD_DIR/$distributionUrlName" + cat >"$javaSource" <<-END + public class Downloader extends java.net.Authenticator + { + protected java.net.PasswordAuthentication getPasswordAuthentication() + { + return new java.net.PasswordAuthentication( System.getenv( "MVNW_USERNAME" ), System.getenv( "MVNW_PASSWORD" ).toCharArray() ); + } + public static void main( String[] args ) throws Exception + { + setDefault( new Downloader() ); + java.nio.file.Files.copy( java.net.URI.create( args[0] ).toURL().openStream(), java.nio.file.Paths.get( args[1] ).toAbsolutePath().normalize() ); + } + } + END + # For Cygwin/MinGW, switch paths to Windows format before running javac and java + verbose " - Compiling Downloader.java ..." + "$(native_path "$JAVACCMD")" "$(native_path "$javaSource")" || die "Failed to compile Downloader.java" + verbose " - Running Downloader.java ..." + "$(native_path "$JAVACMD")" -cp "$(native_path "$TMP_DOWNLOAD_DIR")" Downloader "$distributionUrl" "$(native_path "$targetZip")" +fi + +# If specified, validate the SHA-256 sum of the Maven distribution zip file +if [ -n "${distributionSha256Sum-}" ]; then + distributionSha256Result=false + if [ "$MVN_CMD" = mvnd.sh ]; then + echo "Checksum validation is not supported for maven-mvnd." >&2 + echo "Please disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2 + exit 1 + elif command -v sha256sum >/dev/null; then + if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | sha256sum -c >/dev/null 2>&1; then + distributionSha256Result=true + fi + elif command -v shasum >/dev/null; then + if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | shasum -a 256 -c >/dev/null 2>&1; then + distributionSha256Result=true + fi + else + echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." >&2 + echo "Please install either command, or disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2 + exit 1 + fi + if [ $distributionSha256Result = false ]; then + echo "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised." >&2 + echo "If you updated your Maven version, you need to update the specified distributionSha256Sum property." >&2 + exit 1 + fi +fi + +# unzip and move +if command -v unzip >/dev/null; then + unzip ${__MVNW_QUIET_UNZIP:+"$__MVNW_QUIET_UNZIP"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -d "$TMP_DOWNLOAD_DIR" || die "failed to unzip" +else + tar xzf${__MVNW_QUIET_TAR:+"$__MVNW_QUIET_TAR"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -C "$TMP_DOWNLOAD_DIR" || die "failed to untar" +fi +printf %s\\n "$distributionUrl" >"$TMP_DOWNLOAD_DIR/$distributionUrlNameMain/mvnw.url" +mv -- "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" "$MAVEN_HOME" || [ -d "$MAVEN_HOME" ] || die "fail to move MAVEN_HOME" + +clean || : +exec_maven "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..249bdf3 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,149 @@ +<# : batch portion +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Apache Maven Wrapper startup batch script, version 3.3.2 +@REM +@REM Optional ENV vars +@REM MVNW_REPOURL - repo url base for downloading maven distribution +@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven +@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output +@REM ---------------------------------------------------------------------------- + +@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0) +@SET __MVNW_CMD__= +@SET __MVNW_ERROR__= +@SET __MVNW_PSMODULEP_SAVE=%PSModulePath% +@SET PSModulePath= +@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @( + IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B) +) +@SET PSModulePath=%__MVNW_PSMODULEP_SAVE% +@SET __MVNW_PSMODULEP_SAVE= +@SET __MVNW_ARG0_NAME__= +@SET MVNW_USERNAME= +@SET MVNW_PASSWORD= +@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*) +@echo Cannot start maven from wrapper >&2 && exit /b 1 +@GOTO :EOF +: end batch / begin powershell #> + +$ErrorActionPreference = "Stop" +if ($env:MVNW_VERBOSE -eq "true") { + $VerbosePreference = "Continue" +} + +# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties +$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl +if (!$distributionUrl) { + Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties" +} + +switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) { + "maven-mvnd-*" { + $USE_MVND = $true + $distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip" + $MVN_CMD = "mvnd.cmd" + break + } + default { + $USE_MVND = $false + $MVN_CMD = $script -replace '^mvnw','mvn' + break + } +} + +# apply MVNW_REPOURL and calculate MAVEN_HOME +# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-,maven-mvnd--}/ +if ($env:MVNW_REPOURL) { + $MVNW_REPO_PATTERN = if ($USE_MVND) { "/org/apache/maven/" } else { "/maven/mvnd/" } + $distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')" +} +$distributionUrlName = $distributionUrl -replace '^.*/','' +$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$','' +$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain" +if ($env:MAVEN_USER_HOME) { + $MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain" +} +$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join '' +$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME" + +if (Test-Path -Path "$MAVEN_HOME" -PathType Container) { + Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME" + Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD" + exit $? +} + +if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) { + Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl" +} + +# prepare tmp dir +$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile +$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir" +$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null +trap { + if ($TMP_DOWNLOAD_DIR.Exists) { + try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null } + catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" } + } +} + +New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null + +# Download and Install Apache Maven +Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..." +Write-Verbose "Downloading from: $distributionUrl" +Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName" + +$webclient = New-Object System.Net.WebClient +if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) { + $webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD) +} +[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12 +$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null + +# If specified, validate the SHA-256 sum of the Maven distribution zip file +$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum +if ($distributionSha256Sum) { + if ($USE_MVND) { + Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." + } + Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash + if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) { + Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property." + } +} + +# unzip and move +Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null +Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null +try { + Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null +} catch { + if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) { + Write-Error "fail to move MAVEN_HOME" + } +} finally { + try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null } + catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" } +} + +Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD" diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..fe572b1 --- /dev/null +++ b/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.huaxing + data-bridge + 0.0.1-SNAPSHOT + pom + data-bridge + data-bridge + + + data-common + data-storage + + + + + + + aliyunmaven + 阿里云公共仓库 + https://maven.aliyun.com/repository/public + + + true + + + + false + + + + + +