Browse Source

项目初始化提交

tags/v2.0
swordmeng 1 month ago
parent
commit
824c5f5e42
  1. 2
      .gitattributes
  2. 33
      .gitignore
  3. 19
      .mvn/wrapper/maven-wrapper.properties
  4. 14
      data-common/pom.xml
  5. 15
      data-common/src/main/java/com/huaxing/common/constant/AppConstant.java
  6. 111
      data-storage/pom.xml
  7. 16
      data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java
  8. 25
      data-storage/src/main/java/com/huaxing/TestDBConnection.java
  9. 31
      data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java
  10. 75
      data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java
  11. 69
      data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java
  12. 29
      data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java
  13. 30
      data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java
  14. 41
      data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java
  15. 130
      data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java
  16. 34
      data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java
  17. 27
      data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java
  18. 21
      data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java
  19. 21
      data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java
  20. 20
      data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java
  21. 13
      data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java
  22. 18
      data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java
  23. 18
      data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java
  24. 19
      data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java
  25. 25
      data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java
  26. 19
      data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java
  27. 71
      data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java
  28. 46
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java
  29. 39
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java
  30. 39
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java
  31. 52
      data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java
  32. 28
      data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java
  33. 29
      data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java
  34. 40
      data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java
  35. 65
      data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java
  36. 77
      data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java
  37. 0
      data-storage/src/main/java/com/huaxing/mqtt/ReadMe
  38. 143
      data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java
  39. 96
      data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java
  40. 65
      data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java
  41. 17
      data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java
  42. 46
      data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java
  43. 66
      data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java
  44. 52
      data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java
  45. 48
      data-storage/src/main/resources/application.yml
  46. 79
      data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml
  47. 9
      data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml
  48. 3
      data-storage/src/main/resources/sqlScripts.properties
  49. 71
      data-storage/src/main/resources/补水箱.script
  50. 259
      mvnw
  51. 149
      mvnw.cmd
  52. 37
      pom.xml

2
.gitattributes

@ -0,0 +1,2 @@
/mvnw text eol=lf
*.cmd text eol=crlf

33
.gitignore

@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

19
.mvn/wrapper/maven-wrapper.properties

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip

14
data-common/pom.xml

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.huaxing</groupId>
<artifactId>data-bridge</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>data-common</artifactId>
</project>

15
data-common/src/main/java/com/huaxing/common/constant/AppConstant.java

@ -0,0 +1,15 @@
package com.huaxing.common.constant;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.base.constant
* @ClassName: AppConstant
* @Author: swordmeng8@163.com
* @Description: 系统常量
* @Date: 2025/1/10 10:53
* @Version: 1.0
*/
public class AppConstant {
public static final String APP_NAME = "iot-data-bridge";
}

111
data-storage/pom.xml

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.huaxing</groupId>
<artifactId>data-bridge</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>data-storage</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-core</artifactId>
<version>3.5.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13-beta-3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.22.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.2</version>
</dependency>
<!--dolphindb 相关-->
<dependency>
<groupId>com.dolphindb</groupId>
<artifactId>jdbc</artifactId>
<version>3.00.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.dolphindb/dolphindb-javaapi -->
<dependency>
<groupId>com.dolphindb</groupId>
<artifactId>dolphindb-javaapi</artifactId>
<version>3.00.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>6.2.1</version>
</dependency>
<!--mybatis的开发包-->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.5</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
</project>

16
data-storage/src/main/java/com/huaxing/IotDataBridgeApplication.java

@ -0,0 +1,16 @@
package com.huaxing;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
public class IotDataBridgeApplication {
public static void main(String[] args) {
SpringApplication.run(IotDataBridgeApplication.class, args);
System.out.println("================= iot-data-bridge started! =================");
}
}

25
data-storage/src/main/java/com/huaxing/TestDBConnection.java

@ -0,0 +1,25 @@
//package com.huaxing;
//
//import com.xxdb.DBConnection;
//import com.xxdb.data.Entity;
//import org.junit.Test;
//
//import java.io.IOException;
//
//public class TestDBConnection {
// // host
// private static final String HOST = "localhost";
// // port
// private static final int PORT = 8848;
//
// DBConnection dbConnection = new DBConnection();
//
// @Test
// public void testDBConnectAndRun() throws IOException {
// dbConnection.connect(HOST, PORT);
// dbConnection.login("admin", "123456", true);
// Entity entity = dbConnection.run("INSERT INTO ZbWaterMeterStream (WM_WFA_Unit, temperature, humidity, time, projectId, deviceId) VALUES ('m3', 5.6, 4.5, 1736388176712, '48', '0jZU2102_0806_0000')");
// Entity entity2 = dbConnection.run("addColumn(loadTable(\"dfs://ZbDB\", \"ZbWaterMeterDfs\"),`test3,STRING)");
// dbConnection.close();
// }
//}

31
data-storage/src/main/java/com/huaxing/data/dolphindb/MyBatisConfig.java

@ -0,0 +1,31 @@
package com.huaxing.data.dolphindb;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
@Configuration
public class MyBatisConfig {
@Bean
public SqlSessionFactory sqlSessionFactory(@Qualifier(value = "dataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
// 设置 Mapper 的 XML 文件位置
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources("classpath:com/huaxing/data/storage/mapper/*.xml"));
return bean.getObject();
}
@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

75
data-storage/src/main/java/com/huaxing/data/dolphindb/base/CommonService.java

@ -0,0 +1,75 @@
package com.huaxing.data.dolphindb.base;
import com.huaxing.data.dolphindb.connection.AbstractDbConnector;
import com.huaxing.data.storage.service.base.SqlConverterStatementHandle;
import com.xxdb.DBConnection;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.base
* @ClassName: CommonService
* @Author: swordmeng8@163.com
* @Description: 全局通用
* @Date: 2025/1/15 15:19
* @Version: 1.0
*/
@Slf4j
public class CommonService extends SqlConverterStatementHandle {
@Resource
AbstractDbConnector dbPool;
/**
* 执行单条sql
*
* @param sql
*/
public void executeOnce(String sql) {
DBConnection connection = dbPool.getConnection();
try {
connection.run(sql);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
connection.close();
}
}
/**
* 执行单条sql
*
* @param sql
*/
public void exec(String sql) {
DBConnection connection = dbPool.getConnection();
try {
connection.run(sql);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
connection.close();
}
}
/**
* 执行批量sql
* @param sqlList
*/
public void executeBatch(List<String> sqlList) {
DBConnection connection = dbPool.getConnection();
sqlList.forEach(sql -> {
try {
connection.run(sql);
} catch (IOException e) {
log.error("AbstractDbConnector.executeBatch() Method执行异常:{}", e.getMessage());
}
});
connection.close();
}
}

69
data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbConfiguration.java

@ -0,0 +1,69 @@
package com.huaxing.data.dolphindb.config;
import com.xxdb.*;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* mqtt全局相关配置信息
*/
@Slf4j
@Setter
@Getter
@Configuration
@ConfigurationProperties("dolphindb")
@SuppressWarnings("all")
public class DolphinDbConfiguration extends SimpleDBConnectionPoolConfig {
/**
* 连接地址 dfs://ZBdb
*/
private String dbPath;
/**
* 主机名
*/
private String host;
/**
* 端口号
*/
private int port;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接池初始化大小
*/
private int initPoolSize;
/**
* 表示连接池中最小连接数正整数默认值为 5
*/
private boolean minimumPoolSize;
/**
* 是否开启高可用
*/
private boolean enableHighAvailability;
/**
* 创建连接池配置初始化方法
* @return SimpleDBConnectionPoolConfig
*/
public SimpleDBConnectionPoolConfig loadConfig() {
SimpleDBConnectionPoolConfig config = new SimpleDBConnectionPoolConfig();
config.setHostName(host);
config.setPort(port);
config.setUserId(username);
config.setPassword(password);
config.setInitialPoolSize(initPoolSize);
config.setEnableHighAvailability(enableHighAvailability);
return config;
}
}

29
data-storage/src/main/java/com/huaxing/data/dolphindb/config/DolphinDbPoolConfiguration.java

@ -0,0 +1,29 @@
package com.huaxing.data.dolphindb.config;
import com.xxdb.*;
import org.springframework.context.annotation.Configuration;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.dolphindb.config
* @ClassName: DbClient
* @Author: swordmeng8@163.com
* @Description: 数据库客户端
* @Date: 2025/1/15 9:58
* @Version: 1.0
*/
@Configuration
public class DolphinDbPoolConfiguration extends SimpleDBConnectionPool {
public DBConnection dbConn = null;
/**
* 构造方法
* @param dbConfiguration
*/
public DolphinDbPoolConfiguration(DolphinDbConfiguration dbConfiguration) {
super(dbConfiguration.loadConfig());
this.dbConn = super.getConnection();
}
}

30
data-storage/src/main/java/com/huaxing/data/dolphindb/connection/AbstractDbConnector.java

@ -0,0 +1,30 @@
package com.huaxing.data.dolphindb.connection;
import com.xxdb.DBConnection;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.dolphindb.connection
* @ClassName: DbConnector
* @Author: swordmeng8@163.com
* @Description: dolphindb 连接器
* @Date: 2025/1/13 14:29
* @Version: 1.0
*/
@Accessors(chain = true)
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractDbConnector {
/**
* 连接 dolphindb
*/
public abstract DBConnection getConnection();
}

41
data-storage/src/main/java/com/huaxing/data/dolphindb/connection/DbConnectorHelper.java

@ -0,0 +1,41 @@
package com.huaxing.data.dolphindb.connection;
import com.huaxing.data.dolphindb.config.DolphinDbConfiguration;
import com.huaxing.data.dolphindb.config.DolphinDbPoolConfiguration;
import com.xxdb.DBConnection;
import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.dolphindb.connection
* @ClassName: DbConnectorHelper
* @Author: swordmeng8@163.com
* @Description: 数据库处理器
* @Date: 2025/1/13 17:53
* @Version: 1.0
*/
@Component
@SuppressWarnings("all")
public class DbConnectorHelper extends AbstractDbConnector {
final DolphinDbPoolConfiguration dbPool;
final DolphinDbConfiguration dbConfiguration;
public DbConnectorHelper(DolphinDbConfiguration dbConfiguration, DolphinDbPoolConfiguration dbPool) {
this.dbConfiguration = dbConfiguration;
this.dbPool = dbPool;
}
/**
* 获取数据库连接
* @return DBConnection
*/
@PostConstruct
@Override
public DBConnection getConnection() {
return dbPool.dbConn;
}
}

130
data-storage/src/main/java/com/huaxing/data/storage/controller/TestController.java

@ -0,0 +1,130 @@
package com.huaxing.data.storage.controller;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.tablemanagement.service.ITableStructureService;
import com.huaxing.data.util.JacksonUtil;
import com.huaxing.mqtt.processor.MqttMessageSender;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.controller
* @ClassName: TestController
* @Author: swordmeng8@163.com
* @Description: 测试controller
* @Date: 2025/1/15 17:08
* @Version: 1.0
*/
@Slf4j
@RestController
@RequestMapping("/test")
@SuppressWarnings("all")
public class TestController {
@Resource
private MqttMessageSender messageSender;
final IDeviceDataStoredService dataStoredService;
final IDeviceDataQueryDfsService dataQueryDfsService;
final IDeviceDataQueryStreamService dataQueryStreamService;
final ITableStructureService tableStructureService;
// // 依赖注入
public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) {
this.dataStoredService = dataStoredService;
this.dataQueryDfsService = dataQueryDfsService;
this.dataQueryStreamService = dataQueryStreamService;
this.tableStructureService = tableStructureService;
}
// 测试插入数据
@GetMapping(value = "/testInsert") // 成功
public void testInsert() {
String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')";
dataStoredService.execute(sql);
log.info("SUCCESS");
}
// 测试Dfs表查询
@GetMapping(value = "/testSelectDfs")
public List<Map<String, Object>> testSelectDfs() {
String dbPath = "dfs://ZbDB";
String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs");
return dataQueryDfsService.selectList(sql);
}
// 测试订阅流表查询
@GetMapping(value = "/testSelectStream")
public List<Map<String, Object>> testSelectStream() {
String sql = "select * from ZbWaterMeter1Stream";
return dataQueryStreamService.selectList(sql);
}
// 给指定的流表增加列字段
@GetMapping(value = "/testStreamAddColumn")
public void testStreamAddColumn() {
String tableName = "ZbWaterMeter1Stream";
String columnName = "WM_WFA_Unit14";
String columnDefinition = "STRING";
try {
tableStructureService.addStreamColumn(tableName, columnName, columnDefinition);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
@GetMapping(value = "/testDfsAddColumn")
public void testDfsAddColumn() {
String tableName = "ZbWaterMeter1Dfs";
String columnName = "test2";
String columnDefinition = "STRING";
try {
tableStructureService.addDfsColumn(tableName, columnName, columnDefinition);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
// 向消息队列中发送100W条数据
@GetMapping(value = "/testSendMessage")
public void testSendMessage() {
for (int i = 0; i < 2; i++) {
List<Map<String, Object>> dataList = new ArrayList<>();
dataList.add(handleMapByIndex(i));
DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build();
messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO));
}
}
private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>();
map.put("time", "2025.01.01 00:00:00");
map.put("projectId", "0jZU2102");
map.put("deviceId", "0jZU2102_0806_0011");
map.put("WM_WFA", 124.656 + index);
map.put("WM_WFA_Unit", "m³");
return map;
}
}

34
data-storage/src/main/java/com/huaxing/data/storage/domain/DataAnalysisDTO.java

@ -0,0 +1,34 @@
package com.huaxing.data.storage.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.domain
* @ClassName: CommonDeviceControl
* @Author: swordmeng8@163.com
* @Description: 设备入库数据接收实体
* @Date: 2025/1/10 18:11
* @Version: 1.0
*/
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Data
public class DataAnalysisDTO {
// 表名
private String tableName;
// 设备数据集
private List<Map<String, Object>> dataList;
}

27
data-storage/src/main/java/com/huaxing/data/storage/domain/DataQueryDTO.java

@ -0,0 +1,27 @@
package com.huaxing.data.storage.domain;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.domain
* @ClassName: CommonDeviceControl
* @Author: swordmeng8@163.com
* @Description: 数据查询DTO
* @Date: 2025/1/10 18:11
* @Version: 1.0
*/
@Accessors(chain = true)
@Data
public class DataQueryDTO {
// 表名
private String tableName;
// 设备数据集
private List<String> tableColumns;
}

21
data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryDfsMapper.java

@ -0,0 +1,21 @@
package com.huaxing.data.storage.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
/**
* ClassName: IDeviceDataQuery
* Description: 查询Dfs表数据
* @author 孟剑
* @date 2025-01-16 16:49
*/
@Mapper
@SuppressWarnings("all")
public interface IDeviceDataQueryDfsMapper {
@Select("${sql}")
List<Map<String, Object>> selectList(String sql);
}

21
data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataQueryStreamMapper.java

@ -0,0 +1,21 @@
package com.huaxing.data.storage.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
/**
* ClassName: IDeviceDataQuery
* Description: 查询Stream表数据
* @author 孟剑
* @date 2025-01-16 16:49
*/
@Mapper
@SuppressWarnings("all")
public interface IDeviceDataQueryStreamMapper {
@Select("${sql}")
List<Map<String, Object>> selectList(String sql);
}

20
data-storage/src/main/java/com/huaxing/data/storage/mapper/IDeviceDataStoredMapper.java

@ -0,0 +1,20 @@
package com.huaxing.data.storage.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
/**
* @author 孟剑
* @date 2025-01-16 15:23
*/
@Mapper
@SuppressWarnings("all")
public interface IDeviceDataStoredMapper {
@Select("${sql}")
List<Map<String, Object>> selectList (String sql);
}

13
data-storage/src/main/java/com/huaxing/data/storage/service/IDataAnalysisService.java

@ -0,0 +1,13 @@
package com.huaxing.data.storage.service;
/**
* 入库数据解析服务
* @author 孟剑
* @date 2025-01-13 11:16
*/
public interface IDataAnalysisService {
// 解析入库数据
void parseStoreData (String jsonData);
}

18
data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryDfsService.java

@ -0,0 +1,18 @@
package com.huaxing.data.storage.service;
import java.util.List;
import java.util.Map;
/**
* @author 孟剑
* @date 2025-01-16 16:45
*/
public interface IDeviceDataQueryDfsService {
/**
* 查询Dfs表 list数据
* @param sql
* @return
*/
List<Map<String, Object>> selectList(String sql);
}

18
data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataQueryStreamService.java

@ -0,0 +1,18 @@
package com.huaxing.data.storage.service;
import java.util.List;
import java.util.Map;
/**
* @author 孟剑
* @date 2025-01-16 16:45
*/
public interface IDeviceDataQueryStreamService {
/**
* 查询Dfs表 list数据
* @param sql
* @return
*/
List<Map<String, Object>> selectList(String sql);
}

19
data-storage/src/main/java/com/huaxing/data/storage/service/IDeviceDataStoredService.java

@ -0,0 +1,19 @@
package com.huaxing.data.storage.service;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
/**
* ClassName: IDeviceDataStoredService
* Description: 设备采集入库与查询通用业务接口
*/
public interface IDeviceDataStoredService {
/**
* 设备采集入库
* @param analysisDTO
*/
void insert(DataAnalysisDTO analysisDTO);
void execute(String sql);
}

25
data-storage/src/main/java/com/huaxing/data/storage/service/abstracts/SqlConverterStatement.java

@ -0,0 +1,25 @@
package com.huaxing.data.storage.service.abstracts;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.base
* @ClassName: SqlConverterStatement
* @Author: swordmeng8@163.com
* @Description: SQL转换语句
* @Date: 2025/1/14 14:01
* @Version: 1.0
*/
abstract public class SqlConverterStatement {
/**
* 生成插入语句
* @param tableName
* @param map
* @return
*/
public abstract String generateInsertStreamStatement(String tableName, Map<String, Object> map);
}

19
data-storage/src/main/java/com/huaxing/data/storage/service/base/IDbSqlFactory.java

@ -0,0 +1,19 @@
package com.huaxing.data.storage.service.base;
import java.util.Map;
/**
* @author 孟剑
* @date 2025-01-16 16:54
*/
public interface IDbSqlFactory {
// 生成插入流式语句
String generateInsertStreamStatement(String tableName, Map<String, Object> map);
// 生成Stream表查询语句
String generateSelectStreamStatement(String tableName, Map<String, Object> map);
// 生成Dfs表查询语句
String generateSelectDfsStatement(String tableName, Map<String, Object> map);
}

71
data-storage/src/main/java/com/huaxing/data/storage/service/base/SqlConverterStatementHandle.java

@ -0,0 +1,71 @@
package com.huaxing.data.storage.service.base;
import com.huaxing.data.storage.service.abstracts.SqlConverterStatement;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.base
* @ClassName: SqlConverterStatementHandle
* @Author: swordmeng8@163.com
* @Description: sql处理类
* @Date: 2025/1/14 14:05
* @Version: 1.0
*/
public class SqlConverterStatementHandle extends SqlConverterStatement {
/**
* 生成插入语句
*
* @param tableName 表名
* @param map 键值对
* @return 插入语句
*/
@Override
public String generateInsertStreamStatement(String tableName, Map<String, Object> map) {
StringBuilder columnBuilder = new StringBuilder();
StringBuilder valueBuilder = new StringBuilder();
int index = 0;
for (Map.Entry<String, Object> entry : map.entrySet()) {
// 处理列名
columnBuilder.append(entry.getKey());
// 处理值
if (entry.getValue() instanceof String) {
valueBuilder.append("'").append(entry.getValue()).append("'");
} else if (entry.getValue() == null) {
valueBuilder.append("NULL");
} else {
valueBuilder.append(entry.getValue());
}
if (index < map.size() - 1) {
columnBuilder.append(", ");
valueBuilder.append(", ");
}
index++;
}
return "INSERT INTO " + tableName + " (" + columnBuilder + ") VALUES (" + valueBuilder + ")";
}
/**
* 生成流表Stream查询语句
* @param tableName
* @param columnNameList
* @return
*/
public String generateSelectStreamStatement(String tableName, List<String> columnNameList) {
StringBuilder columnBuilder = new StringBuilder();
if (columnNameList.isEmpty()) {
columnBuilder.append(" * ");
} else {
columnNameList.forEach(columnName -> columnBuilder.append(columnName).append(", "));
}
return "SELECT " + columnBuilder.toString() + " FROM " + tableName;
}
}

46
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DataAnalysisService.java

@ -0,0 +1,46 @@
package com.huaxing.data.storage.service.impl;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDataAnalysisService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.impl
* @ClassName: DataAnalysisService
* @Author: swordmeng8@163.com
* @Description: 入库数据解析服务
* @Date: 2025/1/13 11:16
* @Version: 1.0
*/
@Slf4j
@Service
public class DataAnalysisService implements IDataAnalysisService {
final IDeviceDataStoredService dataStoredService;
public DataAnalysisService(IDeviceDataStoredService dataStoredService) {
this.dataStoredService = dataStoredService;
}
int row = 0;
/**
* @author: swordmeng8@163.com
* @date: 2025/1/13 11:16
* @desc: 解析入库数据
* @param jsonData
* @return void
*/
@Override
public void parseStoreData(String jsonData) {
DataAnalysisDTO dataAnalysisDTO = JacksonUtil.strToObject(jsonData, DataAnalysisDTO.class);
log.info("入库数据解析完成");
dataStoredService.insert(dataAnalysisDTO);
log.info("入库数据入库完成");
row++;
log.info("入库数据入库完成,入库总数:{}", row);
}
}

39
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryDfsServiceImpl.java

@ -0,0 +1,39 @@
package com.huaxing.data.storage.service.impl;
import com.huaxing.data.storage.mapper.IDeviceDataQueryDfsMapper;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.impl
* @ClassName: DeviceDataQueryDfsServiceImpl
* @Author: swordmeng8@163.com
* @Description: 设备数据查询业务层
* @Date: 2025/1/16 16:45
* @Version: 1.0
*/
@Service
public class DeviceDataQueryDfsServiceImpl implements IDeviceDataQueryDfsService {
// 构造器注入
private final IDeviceDataQueryDfsMapper deviceDataQueryDfsMapper;
public DeviceDataQueryDfsServiceImpl(IDeviceDataQueryDfsMapper deviceDataQueryDfsMapper) {
this.deviceDataQueryDfsMapper = deviceDataQueryDfsMapper;
}
/**
* @author: swordmeng8@163.com
* @date: 2025/1/16 16:45
* @desc: 查询数据
* @param sql
* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
*/
@Override
public List<Map<String, Object>> selectList(String sql) {
return deviceDataQueryDfsMapper.selectList(sql);
}
}

39
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataQueryStreamServiceImpl.java

@ -0,0 +1,39 @@
package com.huaxing.data.storage.service.impl;
import com.huaxing.data.storage.mapper.IDeviceDataQueryStreamMapper;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.impl
* @ClassName: DeviceDataQueryStreamServiceImpl
* @Author: swordmeng8@163.com
* @Description: 设备数据查询业务层
* @Date: 2025/1/16 16:45
* @Version: 1.0
*/
@Service
public class DeviceDataQueryStreamServiceImpl implements IDeviceDataQueryStreamService {
// 构造器注入
private final IDeviceDataQueryStreamMapper dataQueryStreamMapper;
public DeviceDataQueryStreamServiceImpl(IDeviceDataQueryStreamMapper dataQueryStreamMapper) {
this.dataQueryStreamMapper = dataQueryStreamMapper;
}
/**
* @author: swordmeng8@163.com
* @date: 2025/1/16 16:45
* @desc: 查询数据
* @param sql
* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
*/
@Override
public List<Map<String, Object>> selectList(String sql) {
return dataQueryStreamMapper.selectList(sql);
}
}

52
data-storage/src/main/java/com/huaxing/data/storage/service/impl/DeviceDataStoredServiceImpl.java

@ -0,0 +1,52 @@
package com.huaxing.data.storage.service.impl;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.mapper.IDeviceDataStoredMapper;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.dolphindb.base.CommonService;
import com.huaxing.data.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @ClassName DeviceDataStoredServiceImpl
* @Description 设备采集入库与查询服务实现类
* @Author swordmeng8@163.com
* @Date 2024/12/2 18:59
* @Version 1.0
**/
@Slf4j
@Service
@SuppressWarnings("all")
public class DeviceDataStoredServiceImpl extends CommonService implements IDeviceDataStoredService {
final IDeviceDataStoredMapper deviceDataStoredMapper;
public DeviceDataStoredServiceImpl(IDeviceDataStoredMapper deviceDataStoredMapper) {
this.deviceDataStoredMapper = deviceDataStoredMapper;
}
/**
* 插入数据
* @param dataAnalysis
*/
@Override
public void insert(DataAnalysisDTO dataAnalysis) {
String tableName = dataAnalysis.getTableName();
dataAnalysis.getDataList().forEach(map -> {
// CompletableFuture.runAsync(() -> {
//
// });
log.info("入库数据:{}", JacksonUtil.objectStr(map));
executeOnce(generateInsertStreamStatement(tableName, map));
});
}
@Override
public void execute(String sql) {
executeOnce(sql);
}
}

28
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java

@ -0,0 +1,28 @@
package com.huaxing.data.tablemanagement.domain;
import lombok.Data;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.tablemanagement.domain
* @ClassName: TableColumnDTO
* @Author: swordmeng8@163.com
* @Description: 表列DTO
* @Date: 2025/1/15 16:05
* @Version: 1.0
*/
@Data
public class TableColumnDTO {
/**
* 列名
*/
private String columnName;
/**
* 列类型
*/
private String columnType;
/**
* 列描述
*/
private String columnDesc;
}

29
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java

@ -0,0 +1,29 @@
package com.huaxing.data.tablemanagement.domain;
import lombok.Data;
import java.util.List;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.tablemanagement.domain
* @ClassName: TableDTO
* @Author: swordmeng8@163.com
* @Description: 表操作类
* @Date: 2025/1/15 16:03
* @Version: 1.0
*/
@Data
public class TableDTO {
/**
* 表名
*/
private String tableName;
/**
* 表列信息
*/
private List<TableColumnDTO> tableColumnList;
}

40
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableStructureService.java

@ -0,0 +1,40 @@
package com.huaxing.data.tablemanagement.service;
import java.sql.SQLException;
import java.util.List;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service
* @ClassName: ITableStructureService
* @Author: swordmeng8@163.com
* @Description: 表结构操作service
* @Date: 2025/1/13 11:02
* @Version: 1.0
*/
public interface ITableStructureService {
// 添加dfs表单列
void addDfsColumn(String tableName, String columnName, String columnDefinition) throws SQLException;
// 添加dfs表多列
void addDfsColumns(String tableName, List<String> columnName, String columnDefinition) throws SQLException;
// 添加Stream流表单列
void addStreamColumn(String tableName, String columnName, String columnDefinition) throws SQLException;
// 添加Stream流表多列
void addStreamColumns(String tableName, String columnName, String columnDefinition) throws SQLException;
// 创建dfs表
void createDfsTable(String tableName, String columnName, String columnDefinition) throws SQLException;
// 创建Stream流表
void createStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException;
// 订阅流表
void subscribeStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException;
// 判断表是否存在
boolean isTableExist(String tableName) throws SQLException;
}

65
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableStructureService.java

@ -0,0 +1,65 @@
package com.huaxing.data.tablemanagement.service.impl;
import com.huaxing.data.dolphindb.base.CommonService;
import com.huaxing.data.tablemanagement.service.ITableStructureService;
import org.springframework.stereotype.Service;
import java.sql.SQLException;
import java.util.List;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.service.impl
* @ClassName: TableStructureService
* @Author: swordmeng8@163.com
* @Description: 表结构操作service
* @Date: 2025/1/13 11:02
* @Version: 1.0
*/
@Service
public class TableStructureService extends CommonService implements ITableStructureService {
@Override
public void addDfsColumn(String tableName, String columnName, String columnDefinition) throws SQLException {
String script =
"pt = loadTable(\"dfs://ZbDB\", `" + tableName + ");\n" +
"alter table pt add " + columnName + " " + columnDefinition + ";";
exec(script);
}
@Override
public void addDfsColumns(String tableName, List<String> columnName, String columnDefinition) throws SQLException {
exec("");
}
@Override
public void addStreamColumn(String tableName, String columnName, String columnDefinition) throws SQLException {
exec("addColumn(" + tableName + ", " + columnName + "," + columnDefinition + ")");
}
@Override
public void addStreamColumns(String tableName, String columnName, String columnDefinition) throws SQLException {
exec("");
}
@Override
public void createDfsTable(String tableName, String columnName, String columnDefinition) throws SQLException {
exec("");
}
@Override
public void createStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException {
exec("");
}
// 订阅流表
@Override
public void subscribeStreamTable(String tableName, String columnName, String columnDefinition) throws SQLException {
exec("");
}
@Override
public boolean isTableExist(String tableName) throws SQLException {
return false;
}
}

77
data-storage/src/main/java/com/huaxing/data/util/JacksonUtil.java

@ -0,0 +1,77 @@
package com.huaxing.data.util;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author kwl99
* @since 2024-08-08 14:31
*/
public class JacksonUtil {
/**
* 将对象转为json字符串
*/
public static String objectStr(Object object) {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return mapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
/**
* 将json字符串转为对象
*/
public static <T> T strToObject(String str, Class<T> clazz) {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper.readValue(str, clazz);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 将json字符串转为对象列表
*/
public static <T> List<T> strToObjectList(String str, Class<T> clazz) {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, clazz));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 将json字符串转为对象列表
*/
public static List<Map<String, Object>> strToMapList(String str) {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper.readValue(str, mapper.getTypeFactory().constructCollectionType(List.class, HashMap.class));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0
data-storage/src/main/java/com/huaxing/mqtt/ReadMe

143
data-storage/src/main/java/com/huaxing/mqtt/config/MqttConfiguration.java

@ -0,0 +1,143 @@
package com.huaxing.mqtt.config;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt全局相关配置信息
*/
@Slf4j
@Setter
@Getter
@Configuration
@ConfigurationProperties("mqtt")
@IntegrationComponentScan(basePackages = "com.huaxing.mqtt.*")
public class MqttConfiguration {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户Id
*/
private String clientId;
/**
* 默认连接话题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* qos
*/
private int qos;
/**
* 订阅超时时间
*/
private int completionTimeout;
/**
* 保持连接数
*/
private int keepalive;
/**
* 2024-12-06新增
* 要订阅的其他主题
*/
private List<String> topics;
/**
* 客户端与服务器之间的连接意外中断服务器将发布客户端的遗嘱消息
*/
private static final byte[] WILL_DATA = "offline".getBytes();
/**
* 获取所有订阅的主题
* @return 所有订阅的主题
*/
public String[] getAllTopics() {
// 校验配置文件是否配置
if (CollectionUtils.isEmpty(topics)) {
this.topics = new ArrayList<>();
}
// 将默认主题条件到其他主题里
this.topics.add(defaultTopic);
// 返回主题数组
return topics.toArray(new String[0]);
}
/**
* 注册MQTT客户端工厂
* @return MqttPahoClientFactory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
// 客户端工厂
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(new String[]{hostUrl});
// 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持:
// 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。
// 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。
options.setCleanSession(true);
// 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。
// 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
//自动重新连接
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
log.info("初始化 MQTT 配置");
return factory;
}
}

96
data-storage/src/main/java/com/huaxing/mqtt/config/MqttConsumerConfiguration.java

@ -0,0 +1,96 @@
package com.huaxing.mqtt.config;
import com.huaxing.mqtt.constant.MqttConstant;
import com.huaxing.mqtt.processor.MqttMessageReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.UUID;
/**
* MQTT消费者配置
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConsumerConfiguration {
final MqttConfiguration mqttConfiguration;
final MqttMessageReceiver mqttMessageReceiver;
public MqttConsumerConfiguration(MqttConfiguration mqttConfiguration, MqttMessageReceiver mqttMessageReceiver) {
this.mqttConfiguration = mqttConfiguration;
this.mqttMessageReceiver = mqttMessageReceiver;
}
/**
* 此处可以使用其他消息通道
* MQTT信息通道消费者
* Spring Integration默认的消息通道它允许将消息发送给一个订阅者然后阻碍发送直到消息被接收
*/
@Bean
public MessageChannel mqttInBoundChannel() {
return new DirectChannel();
}
/**
* mqtt入站消息处理工具对于指定消息入站通道接收到生产者生产的消息后处理消息的工具
*/
@Bean
@ServiceActivator(inputChannel = "mqttInBoundChannel")
public MessageHandler mqttMessageHandler() {
return this.mqttMessageReceiver;
}
/**
* MQTT消息订阅绑定消费者
* 适配器, 两个topic共用一个adapter
* 客户端作为消费者订阅主题消费消息
*/
@Bean
public MessageProducerSupport mqttInbound() {
// 获取客户端id
String clientId = mqttConfiguration.getClientId();
// 获取默认主题
// String defaultTopic = mqttConfiguration.getDefaultTopic();
// 获取所有配置的主题
String[] topics = mqttConfiguration.getAllTopics();
// 获取客户端工厂
MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory();
// Paho客户端消息驱动通道适配器,主要用来订阅主题
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
UUID.randomUUID() + clientId + MqttConstant.CLIENT_SUFFIX_CONSUMERS,
mqttPahoClientFactory,
// 所有需要订阅的topic
topics
);
adapter.setCompletionTimeout(mqttConfiguration.getCompletionTimeout());
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字节接收消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
adapter.setConverter(defaultPahoMessageConverter);
// 设置QoS
adapter.setQos(mqttConfiguration.getQos());
// 设置订阅通道
adapter.setOutputChannel(mqttInBoundChannel());
return adapter;
}
}

65
data-storage/src/main/java/com/huaxing/mqtt/config/MqttProducerConfiguration.java

@ -0,0 +1,65 @@
package com.huaxing.mqtt.config;
import com.huaxing.mqtt.constant.MqttConstant;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT生产者配置
*/
@Slf4j
@Configuration
@AllArgsConstructor
public class MqttProducerConfiguration {
@Resource
private MqttConfiguration mqttConfiguration;
/**
* MQTT信息通道生产者
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器生产者
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
// 客户端id
String clientId = mqttConfiguration.getClientId();
// 默认主题
String defaultTopic = mqttConfiguration.getDefaultTopic();
MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory();
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + MqttConstant.CLIENT_SUFFIX_PRODUCERS, mqttPahoClientFactory);
// true,异步,发送消息时将不会阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
// 默认QoS
messageHandler.setDefaultQos(1);
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// defaultPahoMessageConverter.setPayloadAsBytes(true);
// 发送默认按字节类型发送消息
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}

17
data-storage/src/main/java/com/huaxing/mqtt/constant/MqttConstant.java

@ -0,0 +1,17 @@
package com.huaxing.mqtt.constant;
/**
* 常量
*/
public class MqttConstant {
/**
* 客户端id消费者后缀
*/
public static final String CLIENT_SUFFIX_CONSUMERS = "_consumers";
/**
* 客户端id生产者后缀
*/
public static final String CLIENT_SUFFIX_PRODUCERS = "_producers";
}

46
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttGateway.java

@ -0,0 +1,46 @@
package com.huaxing.mqtt.processor;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* 生产者处理器
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送mqtt消息
*
* @param topic 主题
* @param payload 内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送包含qos的消息
*
* @param topic 主题
* @param qos 对消息处理的几种机制
* * 0 表示的是订阅者没收到消息不会再次发送消息会丢失<br>
* * 1 表示的是会尝试重试一直到接收到消息但这种情况可能导致订阅者收到多次重复消息<br>
* * 2 多了一次去重的动作确保订阅者收到的消息有一次
* @param payload 消息体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
/**
* 发送包含qos的消息
*
* @param topic 主题
* @param qos 对消息处理的几种机制
* * 0 表示的是订阅者没收到消息不会再次发送消息会丢失<br>
* * 1 表示的是会尝试重试一直到接收到消息但这种情况可能导致订阅者收到多次重复消息<br>
* * 2 多了一次去重的动作确保订阅者收到的消息有一次
* @param payload 消息体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

66
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageReceiver.java

@ -0,0 +1,66 @@
package com.huaxing.mqtt.processor;
import com.huaxing.data.storage.service.IDataAnalysisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
/**
* 消费者处理器
*/
@Slf4j
@Component
@SuppressWarnings("all")
public class MqttMessageReceiver implements MessageHandler {
/**
* 数据处理服务
*/
final IDataAnalysisService analysisService;
public MqttMessageReceiver(IDataAnalysisService analysisService) {
this.analysisService = analysisService;
}
/**
* 消息处理
*
* @param message 消息
* @throws MessagingException 消息异常
*/
@Override
@Async("handleMessage")
public void handleMessage(Message<?> message) throws MessagingException {
try {
// 获取消息Topic
MessageHeaders headers = message.getHeaders();
String topic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
log.info("[获取到的消息的topic]:{} ", topic);
// 获取消息体
String payload = (String) message.getPayload();
log.info("[获取到的消息的payload]:{} ", payload);
// 数据库入库消息
if (topic.contains("iot/test1/in-storage")) {
// 模拟1000000条数据入库
log.info("接收到iot/test1/in-storage的消息啦,快去处理");
long l = System.currentTimeMillis();
System.currentTimeMillis();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
analysisService.parseStoreData(payload);
});
long l1 = System.currentTimeMillis();
log.info("入库完成,耗时:{}", l1 - l);
// analysisService.parseStoreData(payload);
} else if (topic.contains("table-update/")){} // TODO 表更新topic
} catch (Exception e) {
log.error(e.toString());
}
}
}

52
data-storage/src/main/java/com/huaxing/mqtt/processor/MqttMessageSender.java

@ -0,0 +1,52 @@
package com.huaxing.mqtt.processor;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者处理器
*/
@Component
public class MqttMessageSender {
@Autowired
private MqttGateway mqttGateway;
/**
* 发送mqtt消息
*
* @param topic 主题
* @param message 内容
* @return void
*/
public void send(String topic, String message) {
mqttGateway.sendToMqtt(topic, message);
}
/**
* 发送包含qos的消息
*
* @param topic 主题
* @param qos 质量
* @param messageBody 消息体
* @return void
*/
public void send(String topic, int qos, JSONObject messageBody) {
mqttGateway.sendToMqtt(topic, qos, messageBody.toString());
}
/**
* 发送包含qos的消息
*
* @param topic 主题
* @param qos 质量
* @param message 消息体
* @return void
*/
public void send(String topic, int qos, byte[] message) {
mqttGateway.sendToMqtt(topic, qos, message);
}
}

48
data-storage/src/main/resources/application.yml

@ -0,0 +1,48 @@
server:
port: 8088
spring:
application:
name: iot-data-bridge
datasource:
url: jdbc:dolphindb://localhost:8848?databasePath=dfs://ZbDB
username: admin
password: 123456
driver-class-name: com.dolphindb.jdbc.Driver
main:
lazy-initialization: false
mybatis:
mapper-locations: classpath:com/huaxing/data/storage/mapper/*.xml
type-aliases-package: com.huaxing.data.storage.entity
configuration:
map-underscore-to-camel-case: true
sql-session-factory:
data-source: ${spring.datasource}
sql-session-template:
executor-type: BATCH
sql-session-factory-ref: sqlSessionFactory
mqtt:
username: admin
password: 123456
host-url: tcp://8.130.65.74:1883
client-id: iot
timeout: 100
keepalive: 100
completion-timeout: 5000
qos: 1
default-topic: iot/data/#
topics:
- iot/test1/#
- iot/test2/#
dolphindb:
db-path: dfs://ZbDB
host: 127.0.0.1
port: 8848
username: admin
password: 123456
init-pool-size: 10
minimum-pool-size: 5
enable-high-availability: false

79
data-storage/src/main/resources/mapper/DeviceDataStoredDfs.xml

@ -0,0 +1,79 @@
<!--<?xml version="1.0" encoding="UTF-8" ?>-->
<!--<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">-->
<!--<mapper namespace="com.huaxing.iot.mqtt.mapper.AirConditionerDfsMapper">-->
<!-- <sql id="conditionSql">-->
<!-- <where>-->
<!-- projectId= #{projectId}-->
<!-- <if test="deviceIds !=null">-->
<!-- and deviceId in-->
<!-- <foreach collection="deviceIds" item="item" index="index"-->
<!-- separator="," open="(" close=")">-->
<!-- #{item}-->
<!-- </foreach>-->
<!-- </if>-->
<!-- <if test="startDate !=null and endDate !=null">-->
<!-- and <![CDATA[ time >=#{startDate} and time <= #{endDate}]]>-->
<!-- </if>-->
<!-- </where>-->
<!-- </sql>-->
<!-- <select id="queryByTimespan" resultType="com.huaxing.iot.mqtt.entity.AirConditioner">-->
<!-- select time,projectId,deviceId,AstRu, AdSw, ArtemRAr, ArtemRArUnit, ArvSAr, ArvSArUnit, ArstMa, AtemRAr,-->
<!-- AtemRArUnit, AvSAr, AvSArUnit, AstMa, AmSw, Asw, Apl, ArmSw-->
<!-- from AirConditionerStream-->
<!-- <include refid="conditionSql"/>-->
<!-- order by time desc-->
<!-- </select>-->
<!-- <select id="queryByTimespan_COUNT" resultType="Long">-->
<!-- select count(*)-->
<!-- from airConditioningBoxDfs-->
<!-- <include refid="conditionSql"/>-->
<!-- </select>-->
<!-- <select id="queryLastPointInfo" resultType="com.huaxing.iot.mqtt.entity.AirConditioner">-->
<!-- SELECT t1.time, t1.projectId, t1.deviceId,-->
<!-- <choose>-->
<!-- <when test="devicePointCodes != null and devicePointCodes.size() > 0">-->
<!-- <foreach item="field" collection="devicePointCodes" separator=",">-->
<!-- t1.${field}-->
<!-- </foreach>-->
<!-- </when>-->
<!-- <otherwise>-->
<!-- t1.AstRu, t1.AdSw, t1.ArtemRAr, t1.ArtemRArUnit, t1.ArvSAr, t1.ArvSArUnit, t1.ArstMa, t1.AtemRAr,-->
<!-- t1.AtemRArUnit, t1.AvSAr, t1.AvSArUnit, t1.AstMa, t1.AmSw, t1.Asw, t1.Apl, t1.ArmSw-->
<!-- </otherwise>-->
<!-- </choose>-->
<!-- FROM AirConditionerStream t1-->
<!-- INNER JOIN (-->
<!-- SELECT deviceId, max(time) AS max_time-->
<!-- FROM AirConditionerStream-->
<!-- WHERE deviceId IN-->
<!-- <foreach collection="deviceIds" item="item" index="index" separator="," open="(" close=")">-->
<!-- #{item}-->
<!-- </foreach>-->
<!-- GROUP BY deviceId-->
<!-- ) t2 ON t1.deviceId = t2.deviceId AND t1.time = t2.max_time-->
<!-- <where>-->
<!-- <if test="projectId != null and projectId != ''">-->
<!-- and t1.projectId = #{projectId}-->
<!-- </if>-->
<!-- </where>-->
<!-- </select>-->
<!-- <delete id="delete">-->
<!-- delete from AirConditionerStream-->
<!-- where projectId= #{projectId} and-->
<!-- deviceId in-->
<!-- <foreach collection="deviceIds" item="item" index="index"-->
<!-- separator="," open="(" close=")">-->
<!-- #{item}-->
<!-- </foreach>-->
<!-- </delete>-->
<!-- &lt;!&ndash;<insert id="insert" parameterType="com.huaxing.iot.mqtt.entity.Elevator">-->
<!-- insert into ElevatorDfs values(#{time},#{projectId},#{deviceId},#{onOffState},#{runMode})-->
<!-- </insert>&ndash;&gt;-->
<!--</mapper>-->

9
data-storage/src/main/resources/mapper/DeviceDataStoredStream.xml

@ -0,0 +1,9 @@
<!--<?xml version="1.0" encoding="UTF-8" ?>-->
<!--<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">-->
<!--<mapper namespace="com.huaxing.data.storage.mapper.IDeviceDataStoredMapper">-->
<!-- <select id="selectList" resultType="map">-->
<!-- ${selectSql}-->
<!-- </select>-->
<!--</mapper>-->

3
data-storage/src/main/resources/sqlScripts.properties

@ -0,0 +1,3 @@
create.table.script=CREATE TABLE {0} (id INT PRIMARY KEY, name VARCHAR(255))
add.column.script=ALTER TABLE {0} ADD COLUMN age INT
drop.column.script=ALTER TABLE {0} DROP COLUMN name

71
data-storage/src/main/resources/补水箱.script

@ -0,0 +1,71 @@
//分布式库名
dbPath = "dfs://ZbDB"
//流表名
stName = "WaterMeterTsetStream"
//分区表名
ptName = "WaterMeterTsetDfs"
//建库函数
def createDB(dbPath){
print('正在初始化数据库' + dbPath)
if (existsDatabase(dbPath)) {
print('当前数据库已存在,加载数据库:' + dbPath)
db = database(dbPath)
print('已加载数据库:' + dbPath)
return db
}
else {
print('当前数据库不存在,创建数据库:' + dbPath)
value = 2025.01M..2040.12M
db = database(dbPath, VALUE, value,,engine='OLAP')
print('已完成创建数据库:'+ dbPath)
return db
}
}
//建流表函数
def createST(stName){
if(not existsStreamTable(stName)){
print('共享变量未定义,创建共享表:' + stName)
print(`正在创建Demo流表)
colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit
colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING]
st = streamTable(150000:0, colNames, colTypes)
print(`完成创建Demo流表)
enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )
print('完成创建共享表')
return st
}
else{
print('共享变量已存在,加载共享变量:' + stName)
return objByName(stName, true)
}
}
//建分区表函数
def createPT(dbPath,ptName){
db = createDB(dbPath)
if (existsTable(dbPath, ptName)){
print('分区表已存在,加载分区表:' + ptName)
pt = loadTable(db,ptName)
return pt
}
else{
print('分区表不存在,创建分区表:' + ptName)
print(`正在创建Demo分区表)
colNames = `time`projectId`deviceId`WM_WFA`WM_WFA_Unit
colTypes = [DATETIME,STRING,STRING,DECIMAL64(4),STRING]
t = table(1:0, colNames, colTypes)
pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST)
print('完成创建分区表:' + ptName)
return pt
}
}
//创建库
//createDB(dbPath)
//创建分区表
createPT(dbPath,ptName)
//创建流表
unsubscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime)
// dropStreamTable(stName)
createST(stName)
//订阅流表
subscribeTable(tableName=stName, actionName=`WaterMeterTsetChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true)

259
mvnw

@ -0,0 +1,259 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Apache Maven Wrapper startup batch script, version 3.3.2
#
# Optional ENV vars
# -----------------
# JAVA_HOME - location of a JDK home dir, required when download maven via java source
# MVNW_REPOURL - repo url base for downloading maven distribution
# MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven
# MVNW_VERBOSE - true: enable verbose log; debug: trace the mvnw script; others: silence the output
# ----------------------------------------------------------------------------
set -euf
[ "${MVNW_VERBOSE-}" != debug ] || set -x
# OS specific support.
native_path() { printf %s\\n "$1"; }
case "$(uname)" in
CYGWIN* | MINGW*)
[ -z "${JAVA_HOME-}" ] || JAVA_HOME="$(cygpath --unix "$JAVA_HOME")"
native_path() { cygpath --path --windows "$1"; }
;;
esac
# set JAVACMD and JAVACCMD
set_java_home() {
# For Cygwin and MinGW, ensure paths are in Unix format before anything is touched
if [ -n "${JAVA_HOME-}" ]; then
if [ -x "$JAVA_HOME/jre/sh/java" ]; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
JAVACCMD="$JAVA_HOME/jre/sh/javac"
else
JAVACMD="$JAVA_HOME/bin/java"
JAVACCMD="$JAVA_HOME/bin/javac"
if [ ! -x "$JAVACMD" ] || [ ! -x "$JAVACCMD" ]; then
echo "The JAVA_HOME environment variable is not defined correctly, so mvnw cannot run." >&2
echo "JAVA_HOME is set to \"$JAVA_HOME\", but \"\$JAVA_HOME/bin/java\" or \"\$JAVA_HOME/bin/javac\" does not exist." >&2
return 1
fi
fi
else
JAVACMD="$(
'set' +e
'unset' -f command 2>/dev/null
'command' -v java
)" || :
JAVACCMD="$(
'set' +e
'unset' -f command 2>/dev/null
'command' -v javac
)" || :
if [ ! -x "${JAVACMD-}" ] || [ ! -x "${JAVACCMD-}" ]; then
echo "The java/javac command does not exist in PATH nor is JAVA_HOME set, so mvnw cannot run." >&2
return 1
fi
fi
}
# hash string like Java String::hashCode
hash_string() {
str="${1:-}" h=0
while [ -n "$str" ]; do
char="${str%"${str#?}"}"
h=$(((h * 31 + $(LC_CTYPE=C printf %d "'$char")) % 4294967296))
str="${str#?}"
done
printf %x\\n $h
}
verbose() { :; }
[ "${MVNW_VERBOSE-}" != true ] || verbose() { printf %s\\n "${1-}"; }
die() {
printf %s\\n "$1" >&2
exit 1
}
trim() {
# MWRAPPER-139:
# Trims trailing and leading whitespace, carriage returns, tabs, and linefeeds.
# Needed for removing poorly interpreted newline sequences when running in more
# exotic environments such as mingw bash on Windows.
printf "%s" "${1}" | tr -d '[:space:]'
}
# parse distributionUrl and optional distributionSha256Sum, requires .mvn/wrapper/maven-wrapper.properties
while IFS="=" read -r key value; do
case "${key-}" in
distributionUrl) distributionUrl=$(trim "${value-}") ;;
distributionSha256Sum) distributionSha256Sum=$(trim "${value-}") ;;
esac
done <"${0%/*}/.mvn/wrapper/maven-wrapper.properties"
[ -n "${distributionUrl-}" ] || die "cannot read distributionUrl property in ${0%/*}/.mvn/wrapper/maven-wrapper.properties"
case "${distributionUrl##*/}" in
maven-mvnd-*bin.*)
MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/
case "${PROCESSOR_ARCHITECTURE-}${PROCESSOR_ARCHITEW6432-}:$(uname -a)" in
*AMD64:CYGWIN* | *AMD64:MINGW*) distributionPlatform=windows-amd64 ;;
:Darwin*x86_64) distributionPlatform=darwin-amd64 ;;
:Darwin*arm64) distributionPlatform=darwin-aarch64 ;;
:Linux*x86_64*) distributionPlatform=linux-amd64 ;;
*)
echo "Cannot detect native platform for mvnd on $(uname)-$(uname -m), use pure java version" >&2
distributionPlatform=linux-amd64
;;
esac
distributionUrl="${distributionUrl%-bin.*}-$distributionPlatform.zip"
;;
maven-mvnd-*) MVN_CMD=mvnd.sh _MVNW_REPO_PATTERN=/maven/mvnd/ ;;
*) MVN_CMD="mvn${0##*/mvnw}" _MVNW_REPO_PATTERN=/org/apache/maven/ ;;
esac
# apply MVNW_REPOURL and calculate MAVEN_HOME
# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-<version>,maven-mvnd-<version>-<platform>}/<hash>
[ -z "${MVNW_REPOURL-}" ] || distributionUrl="$MVNW_REPOURL$_MVNW_REPO_PATTERN${distributionUrl#*"$_MVNW_REPO_PATTERN"}"
distributionUrlName="${distributionUrl##*/}"
distributionUrlNameMain="${distributionUrlName%.*}"
distributionUrlNameMain="${distributionUrlNameMain%-bin}"
MAVEN_USER_HOME="${MAVEN_USER_HOME:-${HOME}/.m2}"
MAVEN_HOME="${MAVEN_USER_HOME}/wrapper/dists/${distributionUrlNameMain-}/$(hash_string "$distributionUrl")"
exec_maven() {
unset MVNW_VERBOSE MVNW_USERNAME MVNW_PASSWORD MVNW_REPOURL || :
exec "$MAVEN_HOME/bin/$MVN_CMD" "$@" || die "cannot exec $MAVEN_HOME/bin/$MVN_CMD"
}
if [ -d "$MAVEN_HOME" ]; then
verbose "found existing MAVEN_HOME at $MAVEN_HOME"
exec_maven "$@"
fi
case "${distributionUrl-}" in
*?-bin.zip | *?maven-mvnd-?*-?*.zip) ;;
*) die "distributionUrl is not valid, must match *-bin.zip or maven-mvnd-*.zip, but found '${distributionUrl-}'" ;;
esac
# prepare tmp dir
if TMP_DOWNLOAD_DIR="$(mktemp -d)" && [ -d "$TMP_DOWNLOAD_DIR" ]; then
clean() { rm -rf -- "$TMP_DOWNLOAD_DIR"; }
trap clean HUP INT TERM EXIT
else
die "cannot create temp dir"
fi
mkdir -p -- "${MAVEN_HOME%/*}"
# Download and Install Apache Maven
verbose "Couldn't find MAVEN_HOME, downloading and installing it ..."
verbose "Downloading from: $distributionUrl"
verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName"
# select .zip or .tar.gz
if ! command -v unzip >/dev/null; then
distributionUrl="${distributionUrl%.zip}.tar.gz"
distributionUrlName="${distributionUrl##*/}"
fi
# verbose opt
__MVNW_QUIET_WGET=--quiet __MVNW_QUIET_CURL=--silent __MVNW_QUIET_UNZIP=-q __MVNW_QUIET_TAR=''
[ "${MVNW_VERBOSE-}" != true ] || __MVNW_QUIET_WGET='' __MVNW_QUIET_CURL='' __MVNW_QUIET_UNZIP='' __MVNW_QUIET_TAR=v
# normalize http auth
case "${MVNW_PASSWORD:+has-password}" in
'') MVNW_USERNAME='' MVNW_PASSWORD='' ;;
has-password) [ -n "${MVNW_USERNAME-}" ] || MVNW_USERNAME='' MVNW_PASSWORD='' ;;
esac
if [ -z "${MVNW_USERNAME-}" ] && command -v wget >/dev/null; then
verbose "Found wget ... using wget"
wget ${__MVNW_QUIET_WGET:+"$__MVNW_QUIET_WGET"} "$distributionUrl" -O "$TMP_DOWNLOAD_DIR/$distributionUrlName" || die "wget: Failed to fetch $distributionUrl"
elif [ -z "${MVNW_USERNAME-}" ] && command -v curl >/dev/null; then
verbose "Found curl ... using curl"
curl ${__MVNW_QUIET_CURL:+"$__MVNW_QUIET_CURL"} -f -L -o "$TMP_DOWNLOAD_DIR/$distributionUrlName" "$distributionUrl" || die "curl: Failed to fetch $distributionUrl"
elif set_java_home; then
verbose "Falling back to use Java to download"
javaSource="$TMP_DOWNLOAD_DIR/Downloader.java"
targetZip="$TMP_DOWNLOAD_DIR/$distributionUrlName"
cat >"$javaSource" <<-END
public class Downloader extends java.net.Authenticator
{
protected java.net.PasswordAuthentication getPasswordAuthentication()
{
return new java.net.PasswordAuthentication( System.getenv( "MVNW_USERNAME" ), System.getenv( "MVNW_PASSWORD" ).toCharArray() );
}
public static void main( String[] args ) throws Exception
{
setDefault( new Downloader() );
java.nio.file.Files.copy( java.net.URI.create( args[0] ).toURL().openStream(), java.nio.file.Paths.get( args[1] ).toAbsolutePath().normalize() );
}
}
END
# For Cygwin/MinGW, switch paths to Windows format before running javac and java
verbose " - Compiling Downloader.java ..."
"$(native_path "$JAVACCMD")" "$(native_path "$javaSource")" || die "Failed to compile Downloader.java"
verbose " - Running Downloader.java ..."
"$(native_path "$JAVACMD")" -cp "$(native_path "$TMP_DOWNLOAD_DIR")" Downloader "$distributionUrl" "$(native_path "$targetZip")"
fi
# If specified, validate the SHA-256 sum of the Maven distribution zip file
if [ -n "${distributionSha256Sum-}" ]; then
distributionSha256Result=false
if [ "$MVN_CMD" = mvnd.sh ]; then
echo "Checksum validation is not supported for maven-mvnd." >&2
echo "Please disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2
exit 1
elif command -v sha256sum >/dev/null; then
if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | sha256sum -c >/dev/null 2>&1; then
distributionSha256Result=true
fi
elif command -v shasum >/dev/null; then
if echo "$distributionSha256Sum $TMP_DOWNLOAD_DIR/$distributionUrlName" | shasum -a 256 -c >/dev/null 2>&1; then
distributionSha256Result=true
fi
else
echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." >&2
echo "Please install either command, or disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties." >&2
exit 1
fi
if [ $distributionSha256Result = false ]; then
echo "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised." >&2
echo "If you updated your Maven version, you need to update the specified distributionSha256Sum property." >&2
exit 1
fi
fi
# unzip and move
if command -v unzip >/dev/null; then
unzip ${__MVNW_QUIET_UNZIP:+"$__MVNW_QUIET_UNZIP"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -d "$TMP_DOWNLOAD_DIR" || die "failed to unzip"
else
tar xzf${__MVNW_QUIET_TAR:+"$__MVNW_QUIET_TAR"} "$TMP_DOWNLOAD_DIR/$distributionUrlName" -C "$TMP_DOWNLOAD_DIR" || die "failed to untar"
fi
printf %s\\n "$distributionUrl" >"$TMP_DOWNLOAD_DIR/$distributionUrlNameMain/mvnw.url"
mv -- "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" "$MAVEN_HOME" || [ -d "$MAVEN_HOME" ] || die "fail to move MAVEN_HOME"
clean || :
exec_maven "$@"

149
mvnw.cmd

@ -0,0 +1,149 @@
<# : batch portion
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Apache Maven Wrapper startup batch script, version 3.3.2
@REM
@REM Optional ENV vars
@REM MVNW_REPOURL - repo url base for downloading maven distribution
@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven
@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output
@REM ----------------------------------------------------------------------------
@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0)
@SET __MVNW_CMD__=
@SET __MVNW_ERROR__=
@SET __MVNW_PSMODULEP_SAVE=%PSModulePath%
@SET PSModulePath=
@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @(
IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B)
)
@SET PSModulePath=%__MVNW_PSMODULEP_SAVE%
@SET __MVNW_PSMODULEP_SAVE=
@SET __MVNW_ARG0_NAME__=
@SET MVNW_USERNAME=
@SET MVNW_PASSWORD=
@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*)
@echo Cannot start maven from wrapper >&2 && exit /b 1
@GOTO :EOF
: end batch / begin powershell #>
$ErrorActionPreference = "Stop"
if ($env:MVNW_VERBOSE -eq "true") {
$VerbosePreference = "Continue"
}
# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties
$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl
if (!$distributionUrl) {
Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties"
}
switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) {
"maven-mvnd-*" {
$USE_MVND = $true
$distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip"
$MVN_CMD = "mvnd.cmd"
break
}
default {
$USE_MVND = $false
$MVN_CMD = $script -replace '^mvnw','mvn'
break
}
}
# apply MVNW_REPOURL and calculate MAVEN_HOME
# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-<version>,maven-mvnd-<version>-<platform>}/<hash>
if ($env:MVNW_REPOURL) {
$MVNW_REPO_PATTERN = if ($USE_MVND) { "/org/apache/maven/" } else { "/maven/mvnd/" }
$distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')"
}
$distributionUrlName = $distributionUrl -replace '^.*/',''
$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$',''
$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain"
if ($env:MAVEN_USER_HOME) {
$MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain"
}
$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join ''
$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME"
if (Test-Path -Path "$MAVEN_HOME" -PathType Container) {
Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME"
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
exit $?
}
if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) {
Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl"
}
# prepare tmp dir
$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile
$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir"
$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null
trap {
if ($TMP_DOWNLOAD_DIR.Exists) {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
}
New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null
# Download and Install Apache Maven
Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..."
Write-Verbose "Downloading from: $distributionUrl"
Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName"
$webclient = New-Object System.Net.WebClient
if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) {
$webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD)
}
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null
# If specified, validate the SHA-256 sum of the Maven distribution zip file
$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum
if ($distributionSha256Sum) {
if ($USE_MVND) {
Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties."
}
Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash
if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) {
Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property."
}
}
# unzip and move
Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null
Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null
try {
Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null
} catch {
if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) {
Write-Error "fail to move MAVEN_HOME"
}
} finally {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"

37
pom.xml

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huaxing</groupId>
<artifactId>data-bridge</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>data-bridge</name>
<description>data-bridge</description>
<modules>
<module>data-common</module>
<module>data-storage</module>
</modules>
<repositories>
<repository>
<id>aliyunmaven</id>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
<releases>
<!-- 使用稳定版本 -->
<enabled>true</enabled>
</releases>
<snapshots>
<!-- 使用快照版本 -->
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
Loading…
Cancel
Save