Browse Source

优化物计算

2025年7月14日21:59:06
qingyuan_dev_new
lishuai 8 months ago
parent
commit
ac58a82adb
  1. 1
      common/queue/src/main/java/com/thing/queue/util/Topics.java
  2. 15
      common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java
  3. 31
      common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java
  4. 2
      modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java

1
common/queue/src/main/java/com/thing/queue/util/Topics.java

@ -14,6 +14,7 @@ public enum Topics {
/** 物计算日志存储事件 */
V1_TSKV_CALC_LOG_SAVE("v1/tskv/calc/log/save"),
V1_TSKV_CALC_LOG_CAL("v1/tskv/calc/log/cal"),
/** 过滤日志存储事件 */
V1_TSKV_FILTER_LOG_SAVE("v1/tskv/filter/log/save");

15
common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java

@ -1,5 +1,6 @@
package com.thing.common.tskv.event;
import cn.hutool.core.collection.CollectionUtil;
import com.thing.common.data.proto.QueueProto.DataProto;
import com.thing.common.tskv.service.DBExecutor;
import com.thing.common.tskv.service.TsKvService;
@ -32,8 +33,16 @@ public class TsKvEventServiceImpl implements TsKvEventService {
public void onApplicationEvent(TsKvEvent event) {
List<DataProto> protoList = event.getList();
if (!protoList.isEmpty()) {
// 只存储主题 tskv/save
if (event.getSource().toString().contains(Topics.V1_TSKV_HISTORY.getValue())) {
// 只存储主题 tskv/save:特殊处理
Set<String> setKeys = Arrays.stream(this.keys.split(",")).collect(Collectors.toSet());
if(event.getSource().toString().equals(Topics.V1_TSKV_CALC_LOG_CAL.getValue())){//物计算
tsKvService.saveProtoTsKvAndLatest(protoList);
//只能算到 setKeys 的值,不能计算am的值
protoList = protoList.stream().filter(s -> setKeys.stream().noneMatch(key -> s.getTskvProto().getKey().equals(key))).collect(Collectors.toList());
if(CollectionUtil.isEmpty(protoList)){
return;
}
} else if (event.getSource().toString().contains(Topics.V1_TSKV_HISTORY.getValue())) {
tsKvService.saveProtoTsKvAndLatest(protoList);
} else if (event.getSource().toString().contains(Topics.V1_TSKV_LATEST.getValue())) {
tsKvService.saveProtoLatest(protoList);
@ -47,7 +56,7 @@ public class TsKvEventServiceImpl implements TsKvEventService {
applicationEventPublisher.publishEvent(new FilterLogSaveEvent(Topics.V1_TSKV_FILTER_LOG_SAVE.getValue(), protoList));
}
Set<String> setKeys = Arrays.stream(this.keys.split(",")).collect(Collectors.toSet());
List<DataProto> dataProtos = new ArrayList<>();
List<DataProto> amList = new ArrayList<>();
Set<String> amKeys = new HashSet<>();

31
common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java

@ -42,17 +42,7 @@ public class TsKvNativeSQL {
if (protoList.isEmpty()) {
return 0;
}
// List<DataProto> list = new LinkedHashSet<>(protoList).stream().toList();
List<DataProto> list = protoList.stream()
.collect(Collectors.toMap(d -> List.of(
d.getTskvProto().getThingCode(),
d.getTskvProto().getTs(),
d.getTskvProto().getKey()
), Function.identity(), BinaryOperator.maxBy(Comparator.comparing(d -> d.getTskvProto().getVal()))))
.values()
.stream()
.toList();
String sql = sqlSaveProtoTsKv(list, dataType);
String sql = sqlSaveProtoTsKv(protoList, dataType);
try {
if(StringUtils.isNotEmpty(sql)){
Db.insertBySql(sql);
@ -61,7 +51,7 @@ public class TsKvNativeSQL {
log.error("--TsKv----{}-数据存储失败------>{}=======SQL====>{}", dataType, exception.getMessage(), sql);
throw new RuntimeException(exception);
}
return list.size();
return protoList.size();
}
public static Integer saveDTOTsKv(List<TsKvDTO> kvDTOList, DatabaseType dataType,String tableSuffix) {
@ -131,10 +121,6 @@ public class TsKvNativeSQL {
}
public static String sqlSaveProtoTsKv(List<DataProto> protoList, DatabaseType dataType) {
List<DataProto> distinctStudentFile = protoList.stream()
.collect(Collectors.collectingAndThen(
Collectors.toCollection(() ->
new TreeSet<>(Comparator.comparing(o -> o.getTskvProto().getThingCode() + ";" + o.getTskvProto().getKey()+ ";" + Double.valueOf(o.getTskvProto().getVal())+ ";" + o.getTskvProto().getTs()))), ArrayList::new));
StringBuilder sql = new StringBuilder(" INSERT INTO ")
.append(TS_KV).append(" (")
.append(THING_CODE)
@ -151,25 +137,24 @@ public class TsKvNativeSQL {
sql.append(",").append(PARTITION);
}
sql.append(") VALUES ");
IntStream.range(0, distinctStudentFile.size()).forEach(i -> {
long ts = distinctStudentFile.get(i).getTskvProto().getTs();
IntStream.range(0, protoList.size()).forEach(i -> {
long ts = protoList.get(i).getTskvProto().getTs();
DateTime dateTime = new DateTime(ts);
sql.append("('");
sql.append(distinctStudentFile.get(i).getTskvProto().getThingCode()).append("',");
sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getKey()).append("',");
sql.append(protoList.get(i).getTskvProto().getThingCode()).append("',");
sql.append("'").append(protoList.get(i).getTskvProto().getKey()).append("',");
sql.append(ts).append(",");
sql.append(dateTime.getYear()).append(",");
sql.append(dateTime.getMonthOfYear()).append(",");
sql.append(dateTime.getDayOfMonth()).append(",");
sql.append(dateTime.getHourOfDay()).append(",");
sql.append(dateTime.getMinuteOfHour()).append(",");
sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getVal()).append("'");
sql.append("'").append(protoList.get(i).getTskvProto().getVal()).append("'");
if (DatabaseType.CK.equals(dataType)) {
String partition = new DateTime(ts).toString("yyyy-MM-dd");
sql.append(",'").append(partition).append("'");
}
if (i == distinctStudentFile.size() - 1) {
if (i == protoList.size() - 1) {
sql.append(")");
} else {
sql.append("),");

2
modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java

@ -113,7 +113,7 @@ public class CalcExecuteHandler {
.build();
// 批量推送
publisher.publishEvent(
new QueueConsumerEvent(Topics.V1_TSKV_HISTORY.getValue(), Lists.newArrayList(QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build())));
new QueueConsumerEvent(Topics.V1_TSKV_CALC_LOG_CAL.getValue(), Lists.newArrayList(QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build())));
// protoList.add(
// );
}catch (Exception e){

Loading…
Cancel
Save