Browse Source

数据库建库、建表、订阅、字段处理等相关接口编写,适配feign接口

dev
swordmeng 3 weeks ago
parent
commit
0b911d2642
  1. 3
      data-framework/src/main/java/com/huaxing/common/result/ResultVo.java
  2. 25
      data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java
  3. 5
      data-storage-api/pom.xml
  4. 4
      data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java
  5. 4
      data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java
  6. 2
      data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java
  7. 8
      data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java
  8. 57
      data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java
  9. 16
      data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java
  10. 5
      data-storage/pom.xml
  11. 39
      data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java
  12. 240
      data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java
  13. 5
      data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java
  14. 25
      data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java
  15. 33
      data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java
  16. 78
      data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java
  17. 19
      data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java
  18. 83
      data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java

3
data-framework/src/main/java/com/huaxing/common/result/ResultVo.java

@ -54,4 +54,7 @@ public class ResultVo<T> implements Serializable {
public static ResultVo<?> status(boolean flag) { public static ResultVo<?> status(boolean flag) {
return flag ? ResultVo.ok() : ResultVo.fail("操作失败!"); return flag ? ResultVo.ok() : ResultVo.fail("操作失败!");
} }
public static ResultVo<?> status(boolean flag,String successMsg,String failMsg) {
return flag ? ResultVo.ok(successMsg, null) : ResultVo.fail(failMsg);
}
} }

25
data-framework/src/main/java/com/huaxing/dolphindb/base/CommonService.java

@ -1,7 +1,10 @@
package com.huaxing.dolphindb.base; package com.huaxing.dolphindb.base;
import com.huaxing.common.result.ResultVo;
import com.huaxing.dolphindb.connection.AbstractDbConnector; import com.huaxing.dolphindb.connection.AbstractDbConnector;
import com.xxdb.DBConnection; import com.xxdb.DBConnection;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.Entity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -45,19 +48,18 @@ public class CommonService extends SqlConverterStatementHandle {
* *
* @param sql * @param sql
*/ */
public boolean exec(String sql) { public Entity exec(String sql) {
boolean isSuccess = false; Entity result = null;
DBConnection connection = dbPool.getConnection(); DBConnection connection = dbPool.getConnection();
try { try {
connection.run(sql); result = connection.run(sql);
isSuccess = true;
} catch (IOException e) { } catch (IOException e) {
log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage()); log.error("AbstractDbConnector.exec() Method执行异常:{}", e.getMessage());
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
connection.close(); connection.close();
} }
return isSuccess; return result;
} }
/** /**
@ -77,4 +79,17 @@ public class CommonService extends SqlConverterStatementHandle {
connection.close(); connection.close();
} }
/**
* 解析bool返回值
* @param result
* @return
*/
public boolean parseBoolResult(Entity result) {
boolean dbExists = false;
if (result instanceof BasicBoolean) {
dbExists = ((BasicBoolean) result).getBoolean();
}
return dbExists;
}
} }

5
data-storage-api/pom.xml

@ -36,6 +36,11 @@
<artifactId>spring-cloud-context</artifactId> <artifactId>spring-cloud-context</artifactId>
<version>3.1.4</version> <version>3.1.4</version>
</dependency> </dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

4
data-storage-api/src/main/java/com/huaxing/feign/IDatabaseClientFeign.java

@ -3,8 +3,8 @@ package com.huaxing.feign;
import com.huaxing.common.constant.AppConstant; import com.huaxing.common.constant.AppConstant;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import com.huaxing.feign.fallback.DatabaseFeignFallbackFactory; import com.huaxing.feign.fallback.DatabaseFeignFallbackFactory;
import com.huaxing.pojo.entity.DatabaseDTO; import com.huaxing.pojo.dto.DatabaseDTO;
import com.huaxing.pojo.entity.TableDTO; import com.huaxing.pojo.dto.TableDTO;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;

4
data-storage-api/src/main/java/com/huaxing/feign/fallback/DatabaseFeignFallbackFactory.java

@ -2,8 +2,8 @@ package com.huaxing.feign.fallback;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import com.huaxing.feign.IDatabaseClientFeign; import com.huaxing.feign.IDatabaseClientFeign;
import com.huaxing.pojo.entity.DatabaseDTO; import com.huaxing.pojo.dto.DatabaseDTO;
import com.huaxing.pojo.entity.TableDTO; import com.huaxing.pojo.dto.TableDTO;
import feign.hystrix.FallbackFactory; import feign.hystrix.FallbackFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

2
data-storage-api/src/main/java/com/huaxing/pojo/entity/DatabaseDTO.java → data-storage-api/src/main/java/com/huaxing/pojo/dto/DatabaseDTO.java

@ -1,4 +1,4 @@
package com.huaxing.pojo.entity; package com.huaxing.pojo.dto;
import lombok.Data; import lombok.Data;

8
data-storage-api/src/main/java/com/huaxing/pojo/entity/TableColumnDTO.java → data-storage-api/src/main/java/com/huaxing/pojo/dto/TableColumnDTO.java

@ -1,7 +1,9 @@
package com.huaxing.pojo.entity; package com.huaxing.pojo.dto;
import lombok.Data; import lombok.Data;
import javax.validation.constraints.NotEmpty;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.database.domain * @Package: com.huaxing.data.database.domain
@ -16,11 +18,15 @@ public class TableColumnDTO {
/** /**
* 列名 * 列名
*/ */
@NotEmpty(message = "列名不能为空")
private String columnName; private String columnName;
/** /**
* 列类型 * 列类型
*/ */
@NotEmpty(message = "列类型")
private String columnType; private String columnType;
/** /**
* 列描述 * 列描述
*/ */

57
data-storage/src/main/java/com/huaxing/data/database/domain/TableDTO.java → data-storage-api/src/main/java/com/huaxing/pojo/dto/TableDTO.java

@ -1,66 +1,93 @@
package com.huaxing.data.database.domain; package com.huaxing.pojo.dto;
import com.huaxing.pojo.entity.TableColumnDTO;
import lombok.Data; import lombok.Data;
import javax.validation.constraints.NotEmpty;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: data-bridge
* @Package: com.huaxing.data.database.domain * @Package: com.huaxing.pojo.entity
* @ClassName: TableDTO * @ClassName: TableDTO
* @Author: swordmeng8@163.com * @Author: swordmeng8@163.com
* @Description: 操作 * @Description: 结构接参
* @Date: 2025/1/15 16:03 * @Date: 2025/2/12 17:40
* @Version: 1.0 * @Version: 1.0
*/ */
@Data @Data
public class TableDTO { public class TableDTO {
/** /**
* 数据库名 * 数据库名
*/ */
@NotEmpty(message = "数据库名不能为空")
private String databaseName; private String databaseName;
/** /**
* 表名 * dfs表名
*/ */
private String tableName; @NotEmpty(message = "Dfs表名不能为空")
private String dfsTableName;
/**
* stream表名
*/
@NotEmpty(message = "Stream表名不能为空")
private String streamTableName;
/** /**
* 表描述 * 表描述
*/ */
private String tableDesc; private String tableDesc;
/** /**
* 表列信息 * 表列信息
*/ */
private List<TableColumnDTO> tableColumnList; private List<TableColumnDTO> tableColumnList;
/**
* 获取列名的拼接字符串并返回
*
* @param tableColumnList
* @return String
*/
public String handleTableColumnName(List<TableColumnDTO> tableColumnList) { public String handleTableColumnName(List<TableColumnDTO> tableColumnList) {
return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get(); return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get();
} }
/**
* 获取列类型的拼接字符串并返回
*
* @param tableColumnList
* @return String
*/
public String handleTableColumnType(List<TableColumnDTO> tableColumnList) { public String handleTableColumnType(List<TableColumnDTO> tableColumnList) {
return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get(); return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get();
} }
/**
* 获取列与描述的拼接字符串并返回
*
* @param tableColumnList
* @return String
*/
public String handleTableColumnDesc(List<TableColumnDTO> tableColumnList) { public String handleTableColumnDesc(List<TableColumnDTO> tableColumnList) {
String columnDesc = ""; StringBuilder columnDesc = new StringBuilder();
// 获取列与描述的拼接字符串并返回 // 获取列与描述的拼接字符串并返回
Iterator<TableColumnDTO> iterator = tableColumnList.iterator(); Iterator<TableColumnDTO> iterator = tableColumnList.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
TableColumnDTO tableColumn = iterator.next(); TableColumnDTO tableColumn = iterator.next();
if (Objects.isNull(tableColumn.getColumnDesc())) { if (Objects.isNull(tableColumn.getColumnDesc())) {
columnDesc += tableColumn.getColumnName() + ":" + "\"-\""; columnDesc.append(tableColumn.getColumnName()).append(":").append("\"-\"");
} else { } else {
columnDesc += tableColumn.getColumnName() + ":\"" + tableColumn.getColumnDesc()+"\""; columnDesc.append(tableColumn.getColumnName()).append(":\"").append(tableColumn.getColumnDesc()).append("\"");
} }
if (iterator.hasNext()) { if (iterator.hasNext()) {
columnDesc += ","; columnDesc.append(",");
} }
} }
return columnDesc; return columnDesc.toString();
} }
} }

16
data-storage-api/src/main/java/com/huaxing/pojo/entity/TableDTO.java

@ -1,16 +0,0 @@
package com.huaxing.pojo.entity;
import lombok.Data;
/**
* @ProjectName: data-bridge
* @Package: com.huaxing.pojo.entity
* @ClassName: TableDTO
* @Author: swordmeng8@163.com
* @Description: 表结构接参类
* @Date: 2025/2/12 17:40
* @Version: 1.0
*/
@Data
public class TableDTO {
}

5
data-storage/pom.xml

@ -59,6 +59,11 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies> </dependencies>

39
data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseController.java

@ -1,8 +1,11 @@
package com.huaxing.data.database.controller; package com.huaxing.data.database.controller;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import com.huaxing.pojo.entity.DatabaseDTO; import com.huaxing.data.database.service.IDatabaseService;
import com.huaxing.pojo.entity.TableDTO; import com.huaxing.data.database.service.ITableStructureService;
import com.huaxing.pojo.dto.DatabaseDTO;
import com.huaxing.pojo.dto.TableDTO;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
@ -15,8 +18,13 @@ import org.springframework.web.bind.annotation.RestController;
* @author YourName * @author YourName
*/ */
@RestController @RestController
@AllArgsConstructor
@RequestMapping("/api/database") @RequestMapping("/api/database")
public class DatabaseController { public class DatabaseController {
/** 表结构 */
private final ITableStructureService tableStructureService;
/** 数据库 */
private final IDatabaseService databaseService;
/** /**
* 创建数据库 * 创建数据库
@ -25,8 +33,7 @@ public class DatabaseController {
*/ */
@PostMapping("/create") @PostMapping("/create")
public ResultVo<?> createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO) { public ResultVo<?> createDatabase(@RequestBody @Validated DatabaseDTO databaseDTO) {
// 这里添加创建数据库的具体逻辑 return databaseService.createDatabase(databaseDTO);
return ResultVo.ok();
} }
/** /**
@ -36,8 +43,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/create/dfs") @PostMapping("/table/create/dfs")
public ResultVo<?> createDfsTable(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> createDfsTable(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加创建 dfs 表的具体逻辑 return tableStructureService.createDfsTable(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -47,8 +53,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/create/stream") @PostMapping("/table/create/stream")
public ResultVo<?> createStreamTable(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> createStreamTable(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加创建 stream 表的具体逻辑 return tableStructureService.createStreamTable(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -58,8 +63,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/create/dfs/stream/subscribe") @PostMapping("/table/create/dfs/stream/subscribe")
public ResultVo<?> createDfsAndStreamTableSubscribe(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> createDfsAndStreamTableSubscribe(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加创建 dfs 和 stream 表并订阅的具体逻辑 return tableStructureService.createDfsAndStreamTableSubscribe(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -69,8 +73,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/create/dfs/stream/unsubscribe") @PostMapping("/table/create/dfs/stream/unsubscribe")
public ResultVo<?> createDfsAndStreamTableUnsubscribe(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> createDfsAndStreamTableUnsubscribe(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加创建 dfs 和 stream 表不订阅的具体逻辑 return tableStructureService.createDfsAndStreamTableUnsubscribe(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -80,8 +83,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/subscribe/stream") @PostMapping("/table/subscribe/stream")
public ResultVo<?> subscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> subscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加订阅流表的具体逻辑 return tableStructureService.subscribeStreamTable(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -91,8 +93,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/unsubscribe/stream") @PostMapping("/table/unsubscribe/stream")
public ResultVo<?> unsubscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> unsubscribeStreamTable(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加取消订阅流表的具体逻辑 return tableStructureService.unsubscribeStreamTable(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -102,8 +103,7 @@ public class DatabaseController {
*/ */
@PostMapping("/table/add/dfs/columns") @PostMapping("/table/add/dfs/columns")
public ResultVo<?> addDfsColumns(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> addDfsColumns(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加新增 dfs 表字段的具体逻辑 return tableStructureService.addDfsColumns(tableDTO);
return ResultVo.ok();
} }
/** /**
@ -113,7 +113,6 @@ public class DatabaseController {
*/ */
@PostMapping("/table/add/stream/columns") @PostMapping("/table/add/stream/columns")
public ResultVo<?> addStreamColumns(@RequestBody @Validated TableDTO tableDTO) { public ResultVo<?> addStreamColumns(@RequestBody @Validated TableDTO tableDTO) {
// 这里添加新增 stream 表字段的具体逻辑 return tableStructureService.addStreamColumns(tableDTO);
return ResultVo.ok();
} }
} }

240
data-storage/src/main/java/com/huaxing/data/database/controller/DatabaseTestController.java

@ -1,240 +0,0 @@
package com.huaxing.data.database.controller;
import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.data.database.service.ITableStructureService;
import com.huaxing.common.util.JacksonUtil;
import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.mqtt.processor.MqttMessageSender;
import com.huaxing.common.result.ResultVo;
import com.huaxing.pojo.entity.TableColumnDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
/**
* @ProjectName: iot-data-bridge
* @Package: com.huaxing.data.storage.controller
* @ClassName: TestController
* @Author: swordmeng8@163.com
* @Description: 测试controller
* @Date: 2025/1/15 17:08
* @Version: 1.0
*/
@Slf4j
@RestController
@RequestMapping("/api/database")
@SuppressWarnings("all")
public class DatabaseTestController {
@Autowired
private MqttMessageSender messageSender;
final IDeviceDataStoredService dataStoredService;
final IDeviceDataQueryDfsService dataQueryDfsService;
final IDeviceDataQueryStreamService dataQueryStreamService;
final ITableStructureService tableStructureService;
// =============================================== 测试构造器 ======================================
public DatabaseTestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ISqlTemplateService tableTemplateService) {
this.dataStoredService = dataStoredService;
this.dataQueryDfsService = dataQueryDfsService;
this.dataQueryStreamService = dataQueryStreamService;
this.tableStructureService = tableStructureService;
}
// =============================================== 测试插入数据 ======================================
/**
* 测试数据插入
*/
@GetMapping(value = "/testInsert")
public void testInsert() {
String tableName = "MyTable2Stream";
String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal')";
dataStoredService.execute(sql);
log.info("SUCCESS");
}
// =============================================== 测试Dfs表查询 ======================================
/**
* 测试Dfs表查询
*/
@GetMapping(value = "/testSelectDfs")
public ResultVo<List<Map<String, Object>>> testSelectDfs() {
String dbPath = "dfs://ZbDB";
String sql = String.format("select * from loadTable('%s','%s')", dbPath, "ZbWaterMeter1Dfs");
return ResultVo.ok(dataQueryDfsService.selectList(sql));
}
// =============================================== 测试订阅流表查询 ======================================
/**
* 测试订阅流表查询
*/
@GetMapping(value = "/testSelectStream")
public ResultVo<List<Map<String, Object>>> testSelectStream() {
String sql = "select * from WaterMeterTset1Stream";
return ResultVo.ok(dataQueryStreamService.selectList(sql));
}
// =============================================== 给指定的流表增加列字段 ======================================
/**
* 测试给指定的流表增加列字段
*/
@GetMapping(value = "/testStreamAddColumn")
public void testStreamAddColumn() {
String tableName = "WaterMeterTsetStream";
String columnName = "test";
String columnType = "STRING";
String columnDesc = "test";
TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try {
tableStructureService.addStreamColumns(tableDTO);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
// =============================================== 给指定的Dfs表增加列字段 ======================================
/**
* 测试给指定的Dfs表增加列字段
*/
@GetMapping(value = "/testDfsAddColumn")
public void testDfsAddColumn() {
String tableName = "WaterMeterTsetDfs";
String columnName = "test";
String columnType = "STRING";
String columnDesc = "test描述";
TableDTO tableDTO = new TableDTO();
tableDTO.setTableName(tableName);
tableDTO.setTableColumnList(Arrays.asList(new TableColumnDTO(columnName, columnType, columnDesc)));
try {
tableStructureService.addDfsColumns(tableDTO);
log.info("SUCCESS");
} catch (Exception e) {
log.error("FAIL");
e.printStackTrace();
}
}
// ===================================== mqtt消息发送-测试数据入库 START ==============================
/**
* 测试发送mqtt消息
*/
@GetMapping(value = "/testSendMessage")
public void testSendMessage() {
for (int i = 0; i < 2; i++) {
List<Map<String, Object>> dataList = new ArrayList<>();
dataList.add(handleMapByIndex(i));
DataAnalysisDTO dataAnalysisDTO = DataAnalysisDTO.builder().tableName( "WaterMeterTsetStream").dataList(dataList).build();
messageSender.send("iot/test1/in-storage", JacksonUtil.objectStr(dataAnalysisDTO));
}
}
/**
* 根据索引生成测试数据
* @param index 索引
* @return
*/
private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>();
map.put("time", new Date());
map.put("projectId", "0jZU2102");
map.put("deviceId", "0jZU2102_0806_0011");
map.put("WM_WFA", 124.656 + index);
map.put("WM_WFA_Unit", "m³");
map.put("test", "test");
return map;
}
// =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ======================================
/**
* 测试创建Dfs表
*/
@GetMapping(value = "/testCreateDfsTable")
public void testCreateDfsTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsTable(tableDTO);
}
/**
* 测试创建流表
*/
@GetMapping(value = "/testCreateStreamTable")
public void testCreateStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createStreamTable(tableDTO);
}
/**
* 取消订阅流表
*/
@GetMapping(value = "/testUnsubscribeStreamTable")
public void testUnsubscribeStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
tableStructureService.unsubscribeStreamTable(tableDTO);
}
/**
* 订阅流表
*/
@GetMapping(value = "/testSubscribeStreamTable")
public void testSubscribeStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
tableStructureService.subscribeStreamTable(tableDTO);
}
/**
* 创建Dfs和Stream表并添加订阅
*/
@GetMapping(value = "/testCreateDfsAndStreamTableSubscribe")
public void testCreateDfsAndStreamTableSubscribe() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("MyTe2");
tableDTO.setTableDesc("MyTe2测试表");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableDTO.setTableColumnList(tableColumnList);
tableStructureService.createDfsAndStreamTableSubscribe(tableDTO);
}
}

5
data-storage/src/main/java/com/huaxing/data/database/service/IDatabaseService.java

@ -1,6 +1,7 @@
package com.huaxing.data.database.service; package com.huaxing.data.database.service;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import com.huaxing.pojo.dto.DatabaseDTO;
/** /**
* 数据库操作 * 数据库操作
@ -10,8 +11,8 @@ import com.huaxing.common.result.ResultVo;
public interface IDatabaseService { public interface IDatabaseService {
// 检查数据库是否存在 // 检查数据库是否存在
ResultVo<?> checkExistsDatabase(String databaseName); boolean checkExistsDatabase(String databaseName);
// 创建数据库 // 创建数据库
ResultVo<?> createDatabase(String databaseName); ResultVo<?> createDatabase(DatabaseDTO databaseDTO);
} }

25
data-storage/src/main/java/com/huaxing/data/database/service/ITableStructureService.java

@ -1,6 +1,8 @@
package com.huaxing.data.database.service; package com.huaxing.data.database.service;
import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.common.result.ResultVo;
import com.huaxing.pojo.dto.TableDTO;
/** /**
* @ProjectName: iot-data-bridge * @ProjectName: iot-data-bridge
@ -14,27 +16,32 @@ import com.huaxing.data.database.domain.TableDTO;
public interface ITableStructureService { public interface ITableStructureService {
// 添加dfs表多列 // 添加dfs表多列
void addDfsColumns(TableDTO tableDTO); ResultVo<?> addDfsColumns(TableDTO tableDTO);
// 添加Stream流表多列 // 添加Stream流表多列
void addStreamColumns(TableDTO tableDTO); ResultVo<?> addStreamColumns(TableDTO tableDTO);
// 创建dfs表 // 创建dfs表
void createDfsTable(TableDTO tableDTO); ResultVo<?> createDfsTable(TableDTO tableDTO);
// 创建Stream流表 // 创建Stream流表
void createStreamTable(TableDTO tableDTO); ResultVo<?> createStreamTable(TableDTO tableDTO);
// 取消订阅 // 取消订阅
public void unsubscribeStreamTable(TableDTO tableDTO); ResultVo<?> unsubscribeStreamTable(TableDTO tableDTO);
// 订阅流表 // 订阅流表
void subscribeStreamTable(TableDTO tableDTO); ResultVo<?> subscribeStreamTable(TableDTO tableDTO);
// 判断表是否存在 // 判断表是否存在
boolean isTableExist(String tableName); boolean dfsTableExistStatus(String databaseName, String dfsTableName);
// 判断表是否存在
boolean streamTableExistStatus(String streamTableName);
void createDfsAndStreamTableSubscribe(TableDTO tableDTO); // 创建dfs表和Stream表并订阅
ResultVo<?> createDfsAndStreamTableSubscribe(TableDTO tableDTO);
// 创建dfs表和Stream表不订阅
ResultVo<?> createDfsAndStreamTableUnsubscribe(TableDTO tableDTO);
} }

33
data-storage/src/main/java/com/huaxing/data/database/service/impl/DatabaseServiceImpl.java

@ -1,11 +1,14 @@
package com.huaxing.data.database.service.impl; package com.huaxing.data.database.service.impl;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import com.huaxing.common.util.StrUtils;
import com.huaxing.data.database.service.IDatabaseService; import com.huaxing.data.database.service.IDatabaseService;
import com.huaxing.data.database.template.ISqlTemplateService; import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.dolphindb.base.CommonService; import com.huaxing.dolphindb.base.CommonService;
import com.huaxing.pojo.dto.DatabaseDTO;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.Entity;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -17,11 +20,12 @@ import org.springframework.stereotype.Service;
* @Date: 2025/2/12 13:48 * @Date: 2025/2/12 13:48
* @Version: 1.0 * @Version: 1.0
*/ */
@Slf4j
@Service @Service
@AllArgsConstructor @AllArgsConstructor
public class DatabaseServiceImpl extends CommonService implements IDatabaseService { public class DatabaseServiceImpl extends CommonService implements IDatabaseService {
final ISqlTemplateService tableTemplateService; private final ISqlTemplateService tableTemplateService;
/** /**
* @description: 检查数据库是否存在 * @description: 检查数据库是否存在
@ -31,12 +35,9 @@ public class DatabaseServiceImpl extends CommonService implements IDatabaseServi
* @date 2025/2/12 13:48 * @date 2025/2/12 13:48
*/ */
@Override @Override
public ResultVo<?> checkExistsDatabase(String databaseName) { public boolean checkExistsDatabase(String databaseName) {
if (!StrUtils.checkStringStartsWithExample(databaseName, "dfs://")) { Entity result = exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName));
return ResultVo.fail("数据库名必须以dfs://开头"); return parseBoolResult(result);
}
;
return ResultVo.status(exec(tableTemplateService.checkExistsDatabaseTemplate(databaseName)));
} }
/** /**
@ -47,16 +48,12 @@ public class DatabaseServiceImpl extends CommonService implements IDatabaseServi
* @date 2025/2/12 13:48 * @date 2025/2/12 13:48
*/ */
@Override @Override
public ResultVo<?> createDatabase(String databaseName) { public ResultVo<?> createDatabase(DatabaseDTO databaseDTO) {
if (this.checkExistsDatabase(databaseDTO.getDatabaseName())) {
return ResultVo.fail("数据库已存在");
}
// 创建数据库操作 // 创建数据库操作
// StringBuilder script = new StringBuilder(); exec(tableTemplateService.createDatabaseTemplate(databaseDTO.getDatabaseName()));
// script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); return ResultVo.status(this.checkExistsDatabase(databaseDTO.getDatabaseName()), "创建数据库成功","创建数据库失败");
// tableDTO.getTableColumnList().forEach(columnItem -> {
// script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";");
// });
// exec(script.toString());
return ResultVo.status(true);
} }
} }

78
data-storage/src/main/java/com/huaxing/data/database/service/impl/TableStructureService.java

@ -1,9 +1,12 @@
package com.huaxing.data.database.service.impl; package com.huaxing.data.database.service.impl;
import com.huaxing.data.database.domain.TableDTO; import com.huaxing.common.result.ResultVo;
import com.huaxing.data.database.template.ISqlTemplateService; import com.huaxing.data.database.template.ISqlTemplateService;
import com.huaxing.dolphindb.base.CommonService; import com.huaxing.dolphindb.base.CommonService;
import com.huaxing.data.database.service.ITableStructureService; import com.huaxing.data.database.service.ITableStructureService;
import com.huaxing.pojo.dto.TableDTO;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.Entity;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -18,6 +21,7 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
@AllArgsConstructor @AllArgsConstructor
@SuppressWarnings("all")
public class TableStructureService extends CommonService implements ITableStructureService { public class TableStructureService extends CommonService implements ITableStructureService {
final ISqlTemplateService tableTemplateService; final ISqlTemplateService tableTemplateService;
@ -27,13 +31,14 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void addDfsColumns(TableDTO tableDTO) { public ResultVo<?> addDfsColumns(TableDTO tableDTO) {
StringBuilder script = new StringBuilder(); StringBuilder script = new StringBuilder();
script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getTableName()).append(");"); script.append("pt = loadTable(\"dfs://").append(tableDTO.getDatabaseName()).append("\", `").append(tableDTO.getDfsTableName()).append(");");
tableDTO.getTableColumnList().forEach(columnItem -> { tableDTO.getTableColumnList().forEach(columnItem -> {
script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";"); script.append("alter table pt add ").append(columnItem.getColumnName()).append(" ").append(columnItem.getColumnType()).append(";");
}); });
exec(script.toString()); exec(script.toString());
return ResultVo.status(true);
} }
/** /**
@ -41,12 +46,13 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void addStreamColumns(TableDTO tableDTO) { public ResultVo<?> addStreamColumns(TableDTO tableDTO) {
StringBuilder streamScript = new StringBuilder(); StringBuilder streamScript = new StringBuilder();
tableDTO.getTableColumnList().forEach(columnItem -> { tableDTO.getTableColumnList().forEach(columnItem -> {
streamScript.append("addColumn(").append(tableDTO.getTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");"); streamScript.append("addColumn(").append(tableDTO.getStreamTableName()).append(", `").append(columnItem.getColumnName()).append(",").append(columnItem.getColumnType()).append(");");
}); });
exec(streamScript.toString()); exec(streamScript.toString());
return ResultVo.status(true);
} }
/** /**
@ -54,8 +60,10 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void createDfsTable(TableDTO tableDTO) { public ResultVo<?> createDfsTable(TableDTO tableDTO) {
exec(tableTemplateService.createDfsTableTemplate(tableDTO)); exec(tableTemplateService.createDfsTableTemplate(tableDTO));
Entity result = exec(tableTemplateService.checkExistsDfsTableTemplate(tableDTO.getDatabaseName(), tableDTO.getDfsTableName()));
return ResultVo.status(parseBoolResult(result), "创建dfs表成功","创建dfs表失败");
} }
/** /**
@ -63,8 +71,10 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void createStreamTable(TableDTO tableDTO) { public ResultVo<?> createStreamTable(TableDTO tableDTO) {
exec(tableTemplateService.createStreamTableTemplate(tableDTO)); exec(tableTemplateService.createStreamTableTemplate(tableDTO));
Entity result = exec(tableTemplateService.checkExistsStreamTableTemplate(tableDTO.getStreamTableName()));
return ResultVo.status(parseBoolResult(result), "创建Stream表成功","创建Stream表失败");
} }
/** /**
@ -72,8 +82,9 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void unsubscribeStreamTable(TableDTO tableDTO) { public ResultVo<?> unsubscribeStreamTable(TableDTO tableDTO) {
exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); exec(tableTemplateService.unsubscribeStreamTableTemplate(tableDTO));
return ResultVo.ok("取消流表订阅成功",null);
} }
/** /**
@ -81,17 +92,37 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void subscribeStreamTable(TableDTO tableDTO) { public ResultVo<?> subscribeStreamTable(TableDTO tableDTO) {
exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); exec(tableTemplateService.subscribeStreamTableTemplate(tableDTO));
return ResultVo.ok("流表订阅成功",null);
} }
/** /**
* 判断表是否存在 * 判断DFS表是否存在
* @param tableName * @param databaseName,tableName
*/ */
@Override @Override
public boolean isTableExist(String tableName) { public boolean dfsTableExistStatus(String databaseName, String tableName) {
return false; boolean tableExist = false;
Entity result = exec(tableTemplateService.checkExistsDfsTableTemplate(databaseName, tableName));
if (result instanceof BasicBoolean) {
tableExist = ((BasicBoolean) result).getBoolean();
}
return tableExist;
}
/**
* 判断Stream表是否存在
* @param streamTableName
*/
@Override
public boolean streamTableExistStatus(String streamTableName) {
boolean tableExist = false;
Entity result = exec(tableTemplateService.checkExistsStreamTableTemplate(streamTableName));
if (result instanceof BasicBoolean) {
tableExist = ((BasicBoolean) result).getBoolean();
}
return tableExist;
} }
/** /**
@ -99,7 +130,22 @@ public class TableStructureService extends CommonService implements ITableStruct
* @param tableDTO * @param tableDTO
*/ */
@Override @Override
public void createDfsAndStreamTableSubscribe(TableDTO tableDTO) { public ResultVo<?> createDfsAndStreamTableSubscribe(TableDTO tableDTO) {
exec(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO)); exec(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO));
boolean dfsStatus = dfsTableExistStatus(tableDTO.getDatabaseName(), tableDTO.getDfsTableName());
boolean streamStatus = streamTableExistStatus(tableDTO.getStreamTableName());
return ResultVo.status(dfsStatus && streamStatus, "创建dfs表和流表并订阅成功","创建dfs表和流表并订阅失败");
}
/**
* 创建dfs表和流表并取消订阅
* @param tableDTO
*/
@Override
public ResultVo<?> createDfsAndStreamTableUnsubscribe(TableDTO tableDTO) {
exec(tableTemplateService.createDfsAndStreamTableUnsubscribe(tableDTO));
boolean dfsStatus = dfsTableExistStatus(tableDTO.getDatabaseName(), tableDTO.getDfsTableName());
boolean streamStatus = streamTableExistStatus(tableDTO.getStreamTableName());
return ResultVo.status(dfsStatus && streamStatus, "创建dfs表和流表成功","创建dfs表和流表失败");
} }
} }

19
data-storage/src/main/java/com/huaxing/data/database/template/ISqlTemplateService.java

@ -1,6 +1,7 @@
package com.huaxing.data.database.template; package com.huaxing.data.database.template;
import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.pojo.dto.TableDTO;
/** /**
* 表创建模板 * 表创建模板
@ -22,10 +23,22 @@ public interface ISqlTemplateService {
String createStreamTableTemplate(TableDTO tableDTO); String createStreamTableTemplate(TableDTO tableDTO);
// 取消订阅流表模板 // 取消订阅流表模板
String unsubscribeStreamTableTemplate(String databaseName, String tableName); String unsubscribeStreamTableTemplate(TableDTO tableDTO);
// 订阅流表模板 // 订阅流表模板
String subscribeStreamTableTemplate(String databaseName, String tableName); String subscribeStreamTableTemplate(TableDTO tableDTO);
// 创建dfs和流表订阅模板
String createDfsAndStreamTableSubscribe(TableDTO tableDTO); String createDfsAndStreamTableSubscribe(TableDTO tableDTO);
// 创建dfs和流表取消订阅模板
String createDfsAndStreamTableUnsubscribe(TableDTO tableDTO);
// 检查表是否存在
String checkExistsDfsTableTemplate(String databaseName, String tableName);
// 检查流表是否存在
String checkExistsStreamTableTemplate(String streamTableName);
} }

83
data-storage/src/main/java/com/huaxing/data/database/template/SqlTemplateServiceImpl.java

@ -1,7 +1,7 @@
package com.huaxing.data.database.template; package com.huaxing.data.database.template;
import com.huaxing.data.database.domain.TableDTO;
import com.huaxing.dolphindb.base.CommonService; import com.huaxing.dolphindb.base.CommonService;
import com.huaxing.pojo.dto.TableDTO;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -24,7 +24,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
**/ **/
@Override @Override
public String checkExistsDatabaseTemplate(String databaseName) { public String checkExistsDatabaseTemplate(String databaseName) {
return null; return "existsDatabase(\"dfs://" + databaseName + "\")";
} }
/** /**
@ -35,7 +35,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
**/ **/
@Override @Override
public String createDatabaseTemplate(String databaseName) { public String createDatabaseTemplate(String databaseName) {
return null; return "database(directory=\"dfs://" + databaseName + "\", partitionType=VALUE, partitionScheme=2025.01M..2050.12M, engine='TSDB')";
} }
/** /**
@ -51,7 +51,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList());
String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList());
sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" + "ptName = \"" + tableDTO.getDfsTableName() + "Dfs\"\n" +
"def createPT(dbPath,ptName){\n" + "def createPT(dbPath,ptName){\n" +
" db = database(dbPath)\n" + " db = database(dbPath)\n" +
" if (existsTable(dbPath, ptName)){\n" + " if (existsTable(dbPath, ptName)){\n" +
@ -85,7 +85,7 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList()); String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList());
// String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList()); // String colDesc = tableDTO.handleTableColumnDesc(tableDTO.getTableColumnList());
sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"stName = \"" + tableDTO.getTableName() + "Stream\"\n" + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" +
"def createST(stName){\n" + "def createST(stName){\n" +
" if(not existsStreamTable(stName)){\n" + " if(not existsStreamTable(stName)){\n" +
" colNames = `time`projectId`deviceId`" + colNames + "\n" + " colNames = `time`projectId`deviceId`" + colNames + "\n" +
@ -109,12 +109,12 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String unsubscribeStreamTableTemplate(String databaseName, String tableName) { public String unsubscribeStreamTableTemplate(TableDTO tableDTO) {
String sql = ""; String sql = "";
sql = "dbPath = \"dfs://" + databaseName + "\"\n" + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"stName = \"" + tableName + "Stream\"\n" + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" +
"ptName = \"" + tableName + "Dfs\"\n" + "ptName = \"" + tableDTO.getStreamTableName() + "Dfs\"\n" +
"unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);"; "unsubscribeTable(tableName=stName, actionName=`" + tableDTO.getDfsTableName() + "ChgTime);";
return sql; return sql;
} }
@ -125,15 +125,21 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String subscribeStreamTableTemplate(String databaseName, String tableName) { public String subscribeStreamTableTemplate(TableDTO tableDTO) {
String sql = ""; String sql = "";
sql = "dbPath = \"dfs://" + databaseName + "\"\n" + sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"stName = \"" + tableName + "Stream\"\n" + "stName = \"" + tableDTO.getDfsTableName() + "Stream\"\n" +
"ptName = \"" + tableName + "Dfs\"\n" + "ptName = \"" + tableDTO.getStreamTableName() + "Dfs\"\n" +
"subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);"; "subscribeTable(tableName=stName, actionName=`" + tableDTO.getDfsTableName() + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);";
return sql; return sql;
} }
/**
* @Description 创建 DFS 和流表订阅模板
* @Author swordmeng8@163.com
* @Date 2025/1/20 10:46
* @Version v1.0
**/
@Override @Override
public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) { public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) {
StringBuilder script = new StringBuilder(); StringBuilder script = new StringBuilder();
@ -142,9 +148,52 @@ public class SqlTemplateServiceImpl extends CommonService implements ISqlTemplat
// 执行创建流表模板 // 执行创建流表模板
script.append(createStreamTableTemplate(tableDTO)); script.append(createStreamTableTemplate(tableDTO));
// 取消订阅流表模板 // 取消订阅流表模板
script.append(unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); script.append(unsubscribeStreamTableTemplate(tableDTO));
// 订阅流表模板 // 订阅流表模板
script.append(subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName())); script.append(subscribeStreamTableTemplate(tableDTO));
return script.toString(); return script.toString();
} }
/**
* @Description 创建 DFS 和流表取消订阅模板
* @Author swordmeng8@163.com
* @Date 2025/1/20 10:46
* @Version v1.0
**/
@Override
public String createDfsAndStreamTableUnsubscribe(TableDTO tableDTO) {
StringBuilder script = new StringBuilder();
// 执行创建 DFS 表模板
script.append(createDfsTableTemplate(tableDTO));
// 执行创建流表模板
script.append(createStreamTableTemplate(tableDTO));
// 取消订阅流表模板
script.append(unsubscribeStreamTableTemplate(tableDTO));
return script.toString();
}
/**
* @Description 检查表是否存在模板
* @Author swordmeng8@163.com
* @param databaseName
* @param tableName
* @Date 2025/1/20 10:46
* @Version v1.0
**/
@Override
public String checkExistsDfsTableTemplate(String databaseName, String tableName) {
return String.format("existsTable(\"dfs://%s\", \"%s\")", databaseName, tableName);
}
/**
* @Description 检查流表是否存在模板
* @Author swordmeng8@163.com
* @Date 2025/1/20 10:46
* @param streamTableName
* @return
*/
@Override
public String checkExistsStreamTableTemplate(String streamTableName) {
return String.format("existsStreamTable(\"%s\")", streamTableName);
}
} }

Loading…
Cancel
Save