Browse Source

新增Dfs/Stream表创建 添加订阅、取消订阅等功能

tags/v2.0^0 v2.0
swordmeng 1 month ago
parent
commit
d180054bb8
  1. 7
      data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java
  2. 12
      data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java
  3. 14
      data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java
  4. 87
      data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java
  5. 115
      data-storage/src/main/java/com/huaxing/test/TestController.java

7
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableColumnDTO.java

@ -25,4 +25,11 @@ public class TableColumnDTO {
* 列描述 * 列描述
*/ */
private String columnDesc; private String columnDesc;
public TableColumnDTO() {
}
public TableColumnDTO(String columnName, String columnType, String columnDesc) {
this.columnName = columnName;
this.columnType = columnType;
}
} }

12
data-storage/src/main/java/com/huaxing/data/tablemanagement/domain/TableDTO.java

@ -18,6 +18,10 @@ import java.util.List;
public class TableDTO { public class TableDTO {
/** /**
* 数据库名
*/
private String databaseName;
/**
* 表名 * 表名
*/ */
private String tableName; private String tableName;
@ -26,4 +30,12 @@ public class TableDTO {
*/ */
private List<TableColumnDTO> tableColumnList; private List<TableColumnDTO> tableColumnList;
public String handleTableColumnName(List<TableColumnDTO> tableColumnList) {
return tableColumnList.stream().map(TableColumnDTO::getColumnName).reduce((a, b) -> a + "`" + b).get();
}
public String handleTableColumnType(List<TableColumnDTO> tableColumnList) {
return tableColumnList.stream().map(TableColumnDTO::getColumnType).reduce((a, b) -> a + "," + b).get();
}
} }

14
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/ITableTemplateService.java

@ -1,5 +1,9 @@
package com.huaxing.data.tablemanagement.service; package com.huaxing.data.tablemanagement.service;
import com.huaxing.data.tablemanagement.domain.TableDTO;
import java.util.List;
/** /**
* 表创建模板 * 表创建模板
* @author 孟剑 * @author 孟剑
@ -8,14 +12,16 @@ package com.huaxing.data.tablemanagement.service;
public interface ITableTemplateService { public interface ITableTemplateService {
// 创建dfs表模板 // 创建dfs表模板
String createDfsTableTemplate(String tableName, String columnName, String columnDefinition); String createDfsTableTemplate(TableDTO tableDTO);
// 创建流表模板 // 创建流表模板
String createStreamTableTemplate(String tableName, String columnName, String columnDefinition); String createStreamTableTemplate(TableDTO tableDTO);
// 取消订阅流表模板 // 取消订阅流表模板
String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); String unsubscribeStreamTableTemplate(String databaseName, String tableName);
// 订阅流表模板 // 订阅流表模板
String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition); String subscribeStreamTableTemplate(String databaseName, String tableName);
String createDfsAndStreamTableSubscribe(TableDTO tableDTO);
} }

87
data-storage/src/main/java/com/huaxing/data/tablemanagement/service/impl/TableTemplateServiceImpl.java

@ -1,6 +1,9 @@
package com.huaxing.data.tablemanagement.service.impl; package com.huaxing.data.tablemanagement.service.impl;
import com.huaxing.data.tablemanagement.domain.TableDTO;
import com.huaxing.data.tablemanagement.service.ITableTemplateService; import com.huaxing.data.tablemanagement.service.ITableTemplateService;
import com.huaxing.dolphindb.base.CommonService;
import org.springframework.stereotype.Service;
/** /**
* @ProjectName: data-bridge * @ProjectName: data-bridge
@ -11,8 +14,8 @@ import com.huaxing.data.tablemanagement.service.ITableTemplateService;
* @Date: 2025/1/20 10:38 * @Date: 2025/1/20 10:38
* @Version: 1.0 * @Version: 1.0
*/ */
@Service
public class TableTemplateServiceImpl implements ITableTemplateService { public class TableTemplateServiceImpl extends CommonService implements ITableTemplateService {
/** /**
* @Description 创建 DFS 表模板 * @Description 创建 DFS 表模板
@ -21,9 +24,29 @@ public class TableTemplateServiceImpl implements ITableTemplateService {
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String createDfsTableTemplate(String tableName, String columnName, String columnDefinition) { public String createDfsTableTemplate(TableDTO tableDTO) {
String sql = "";
return null; String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList());
String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList());
sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"ptName = \"" + tableDTO.getTableName() + "Dfs\"\n" +
"def createPT(dbPath,ptName){\n" +
" db = database(dbPath)\n" +
" if (existsTable(dbPath, ptName)){\n" +
" pt = loadTable(db,ptName)\n" +
" return pt\n" +
" }\n" +
" else{\n" +
" colNames = `time`projectId`deviceId`" + colNames + "\n" +
" colTypes = [DATETIME,STRING,STRING," + colTypes + "]\n" +
" t = table(1:0, colNames, colTypes)\n" +
" pt = db.createPartitionedTable(table=t, tableName=ptName, partitionColumns=`time, sortColumns=`deviceId`time, keepDuplicates=LAST)\n" +
" return pt\n" +
" }\n" +
"}\n" +
"createPT(dbPath,ptName);";
exec(sql);
return sql;
} }
/** /**
@ -33,8 +56,27 @@ public class TableTemplateServiceImpl implements ITableTemplateService {
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String createStreamTableTemplate(String tableName, String columnName, String columnDefinition) { public String createStreamTableTemplate(TableDTO tableDTO) {
return null; String sql = "";
String colNames = tableDTO.handleTableColumnName(tableDTO.getTableColumnList());
String colTypes = tableDTO.handleTableColumnType(tableDTO.getTableColumnList());
sql = "dbPath = \"dfs://" + tableDTO.getDatabaseName() + "\"\n" +
"stName = \"" + tableDTO.getTableName() + "Stream\"\n" +
"def createST(stName){\n" +
" if(not existsStreamTable(stName)){\n" +
" colNames = `time`projectId`deviceId`" + colNames + "\n" +
" colTypes = [DATETIME,STRING,STRING,"+ colTypes +"]\n" +
" st = streamTable(150000:0, colNames, colTypes)\n" +
" enableTableShareAndPersistence(table=st, tableName=stName, retentionMinutes=10, cacheSize=100000, preCache=100000 )\n" +
" return st\n" +
" }\n" +
" else{\n" +
" return objByName(stName, true)\n" +
" }\n" +
"}\n" +
"createST(stName);";
exec(sql);
return sql;
} }
/** /**
@ -44,8 +86,14 @@ public class TableTemplateServiceImpl implements ITableTemplateService {
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String unsubscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { public String unsubscribeStreamTableTemplate(String databaseName, String tableName) {
return null; String sql = "";
sql = "dbPath = \"dfs://" + databaseName + "\"\n" +
"stName = \"" + tableName + "Stream\"\n" +
"ptName = \"" + tableName + "Dfs\"\n" +
"unsubscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime);";
exec(sql);
return sql;
} }
/** /**
@ -55,7 +103,26 @@ public class TableTemplateServiceImpl implements ITableTemplateService {
* @Version v1.0 * @Version v1.0
**/ **/
@Override @Override
public String subscribeStreamTableTemplate(String tableName, String columnName, String columnDefinition) { public String subscribeStreamTableTemplate(String databaseName, String tableName) {
String sql = "";
sql = "dbPath = \"dfs://" + databaseName + "\"\n" +
"stName = \"" + tableName + "Stream\"\n" +
"ptName = \"" + tableName + "Dfs\"\n" +
"subscribeTable(tableName=stName, actionName=`" + tableName + "ChgTime, offset=-1, handler=loadTable(dbPath,ptName), msgAsTable=true);";
exec(sql);
return sql;
}
@Override
public String createDfsAndStreamTableSubscribe(TableDTO tableDTO) {
// 执行创建 DFS 表模板
createDfsTableTemplate(tableDTO);
// 执行创建流表模板
createStreamTableTemplate(tableDTO);
// 取消订阅流表模板
unsubscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName());
// 订阅流表模板
subscribeStreamTableTemplate(tableDTO.getDatabaseName(), tableDTO.getTableName());
return null; return null;
} }
} }

115
data-storage/src/main/java/com/huaxing/test/TestController.java

@ -4,12 +4,15 @@ import com.huaxing.data.storage.domain.DataAnalysisDTO;
import com.huaxing.data.storage.service.IDeviceDataQueryDfsService; import com.huaxing.data.storage.service.IDeviceDataQueryDfsService;
import com.huaxing.data.storage.service.IDeviceDataQueryStreamService; import com.huaxing.data.storage.service.IDeviceDataQueryStreamService;
import com.huaxing.data.storage.service.IDeviceDataStoredService; import com.huaxing.data.storage.service.IDeviceDataStoredService;
import com.huaxing.data.tablemanagement.domain.TableColumnDTO;
import com.huaxing.data.tablemanagement.domain.TableDTO;
import com.huaxing.data.tablemanagement.service.ITableStructureService; import com.huaxing.data.tablemanagement.service.ITableStructureService;
import com.huaxing.common.util.JacksonUtil; import com.huaxing.common.util.JacksonUtil;
import com.huaxing.data.tablemanagement.service.ITableTemplateService;
import com.huaxing.mqtt.processor.MqttMessageSender; import com.huaxing.mqtt.processor.MqttMessageSender;
import com.huaxing.common.result.ResultVo; import com.huaxing.common.result.ResultVo;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -32,31 +35,43 @@ import java.util.*;
@SuppressWarnings("all") @SuppressWarnings("all")
public class TestController { public class TestController {
@Resource @Autowired
private MqttMessageSender messageSender; private MqttMessageSender messageSender;
final IDeviceDataStoredService dataStoredService; final IDeviceDataStoredService dataStoredService;
final IDeviceDataQueryDfsService dataQueryDfsService; final IDeviceDataQueryDfsService dataQueryDfsService;
final IDeviceDataQueryStreamService dataQueryStreamService; final IDeviceDataQueryStreamService dataQueryStreamService;
final ITableStructureService tableStructureService; final ITableStructureService tableStructureService;
// // 依赖注入 final ITableTemplateService tableTemplateService;
public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService) {
// =============================================== 测试构造器 ======================================
public TestController(IDeviceDataStoredService dataStoredService, IDeviceDataQueryDfsService dataQueryDfsService, IDeviceDataQueryStreamService dataQueryStreamService, ITableStructureService tableStructureService, ITableTemplateService tableTemplateService) {
this.dataStoredService = dataStoredService; this.dataStoredService = dataStoredService;
this.dataQueryDfsService = dataQueryDfsService; this.dataQueryDfsService = dataQueryDfsService;
this.dataQueryStreamService = dataQueryStreamService; this.dataQueryStreamService = dataQueryStreamService;
this.tableStructureService = tableStructureService; this.tableStructureService = tableStructureService;
this.tableTemplateService = tableTemplateService;
} }
// =============================================== 测试插入数据 ====================================== // =============================================== 测试插入数据 ======================================
@GetMapping(value = "/testInsert") // 成功
/**
* 测试数据插入
*/
@GetMapping(value = "/testInsert")
public void testInsert() { public void testInsert() {
String sql = "INSERT INTO ZbWaterMeter1Stream (time, projectId, deviceId, WM_WFA, WM_WFA_Unit) VALUES (2024.11.01 00:00:00,'48', '0jZU2102_0806_0011', 124.656, 'm³')"; String tableName = "Test3Stream";
String sql = "INSERT INTO " + tableName + " (time, projectId, deviceId, test, test1, test2) VALUES (2025.01.11 00:00:00,'48', '0jZU2102_0806_0011', 'testVal', 'test1Val', 'test2Val')";
dataStoredService.execute(sql); dataStoredService.execute(sql);
log.info("SUCCESS"); log.info("SUCCESS");
} }
// =============================================== 测试Dfs表查询 ====================================== // =============================================== 测试Dfs表查询 ======================================
/**
* 测试Dfs表查询
*/
@GetMapping(value = "/testSelectDfs") @GetMapping(value = "/testSelectDfs")
public ResultVo<List<Map<String, Object>>> testSelectDfs() { public ResultVo<List<Map<String, Object>>> testSelectDfs() {
String dbPath = "dfs://ZbDB"; String dbPath = "dfs://ZbDB";
@ -65,6 +80,10 @@ public class TestController {
} }
// =============================================== 测试订阅流表查询 ====================================== // =============================================== 测试订阅流表查询 ======================================
/**
* 测试订阅流表查询
*/
@GetMapping(value = "/testSelectStream") @GetMapping(value = "/testSelectStream")
public ResultVo<List<Map<String, Object>>> testSelectStream() { public ResultVo<List<Map<String, Object>>> testSelectStream() {
String sql = "select * from WaterMeterTset1Stream"; String sql = "select * from WaterMeterTset1Stream";
@ -72,6 +91,10 @@ public class TestController {
} }
// =============================================== 给指定的流表增加列字段 ====================================== // =============================================== 给指定的流表增加列字段 ======================================
/**
* 测试给指定的流表增加列字段
*/
@GetMapping(value = "/testStreamAddColumn") @GetMapping(value = "/testStreamAddColumn")
public void testStreamAddColumn() { public void testStreamAddColumn() {
String tableName = "WaterMeterTsetStream"; String tableName = "WaterMeterTsetStream";
@ -87,6 +110,9 @@ public class TestController {
} }
// =============================================== 给指定的Dfs表增加列字段 ====================================== // =============================================== 给指定的Dfs表增加列字段 ======================================
/**
* 测试给指定的Dfs表增加列字段
*/
@GetMapping(value = "/testDfsAddColumn") @GetMapping(value = "/testDfsAddColumn")
public void testDfsAddColumn() { public void testDfsAddColumn() {
String tableName = "WaterMeterTsetDfs"; String tableName = "WaterMeterTsetDfs";
@ -103,6 +129,10 @@ public class TestController {
// ===================================== mqtt消息发送-测试数据入库 START ============================== // ===================================== mqtt消息发送-测试数据入库 START ==============================
/**
* 测试发送mqtt消息
*/
@GetMapping(value = "/testSendMessage") @GetMapping(value = "/testSendMessage")
public void testSendMessage() { public void testSendMessage() {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
@ -113,6 +143,11 @@ public class TestController {
} }
} }
/**
* 根据索引生成测试数据
* @param index 索引
* @return
*/
private Map<String, Object> handleMapByIndex(int index) { private Map<String, Object> handleMapByIndex(int index) {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
map.put("time", new Date()); map.put("time", new Date());
@ -124,9 +159,75 @@ public class TestController {
return map; return map;
} }
// =============================================== TODO 创建Dfs表 Stream流表 创建 ====================================== // =============================================== 创建Dfs表 Stream流表 取消订阅 添加订阅 创建 ======================================
/**
* 测试创建Dfs表
*/
@GetMapping(value = "/testCreateDfsTable")
public void testCreateDfsTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
System.out.println(tableTemplateService.createDfsTableTemplate(tableDTO));
}
/**
* 测试创建流表
*/
@GetMapping(value = "/testCreateStreamTable")
public void testCreateStreamTable() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test1");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
System.out.println(tableTemplateService.createStreamTableTemplate(tableDTO));
}
/**
* 取消订阅流表
*/
@GetMapping(value = "/testUnsubscribeStreamTable")
public void testUnsubscribeStreamTable() {
String databaseName = "ZbDB";
String tableName = "Test";
System.out.println(tableTemplateService.unsubscribeStreamTableTemplate(databaseName, tableName));
}
/**
* 订阅流表
*/
@GetMapping(value = "/testSubscribeStreamTable")
public void testSubscribeStreamTable() {
String databaseName = "ZbDB";
String tableName = "Test";
System.out.println(tableTemplateService.subscribeStreamTableTemplate(databaseName, tableName));
}
/**
* 创建Dfs和Stream表并添加订阅
*/
@GetMapping(value = "/testCreateDfsAndStreamTableSubscribe")
public void testCreateDfsAndStreamTableSubscribe() {
TableDTO tableDTO = new TableDTO();
tableDTO.setDatabaseName("ZbDB");
tableDTO.setTableName("Test3");
List<TableColumnDTO> tableColumnList = new ArrayList<>();
tableColumnList.add(new TableColumnDTO("test", "STRING", "test"));
tableColumnList.add(new TableColumnDTO("test1", "STRING","test1"));
tableColumnList.add(new TableColumnDTO("test2", "STRING","test2"));
tableDTO.setTableColumnList(tableColumnList);
System.out.println(tableTemplateService.createDfsAndStreamTableSubscribe(tableDTO));
}
} }

Loading…
Cancel
Save