diff --git a/common/queue/src/main/java/com/thing/queue/util/Topics.java b/common/queue/src/main/java/com/thing/queue/util/Topics.java index f99362d..aa52320 100644 --- a/common/queue/src/main/java/com/thing/queue/util/Topics.java +++ b/common/queue/src/main/java/com/thing/queue/util/Topics.java @@ -14,6 +14,7 @@ public enum Topics { /** 物计算日志存储事件 */ V1_TSKV_CALC_LOG_SAVE("v1/tskv/calc/log/save"), + V1_TSKV_CALC_LOG_CAL("v1/tskv/calc/log/cal"), /** 过滤日志存储事件 */ V1_TSKV_FILTER_LOG_SAVE("v1/tskv/filter/log/save"); diff --git a/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java b/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java index 488c62a..beb00eb 100644 --- a/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java +++ b/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java @@ -1,5 +1,6 @@ package com.thing.common.tskv.event; +import cn.hutool.core.collection.CollectionUtil; import com.thing.common.data.proto.QueueProto.DataProto; import com.thing.common.tskv.service.DBExecutor; import com.thing.common.tskv.service.TsKvService; @@ -32,8 +33,16 @@ public class TsKvEventServiceImpl implements TsKvEventService { public void onApplicationEvent(TsKvEvent event) { List protoList = event.getList(); if (!protoList.isEmpty()) { - // 只存储主题 为 tskv/save - if (event.getSource().toString().contains(Topics.V1_TSKV_HISTORY.getValue())) { + // 只存储主题 为 tskv/save:特殊处理 + Set setKeys = Arrays.stream(this.keys.split(",")).collect(Collectors.toSet()); + if(event.getSource().toString().equals(Topics.V1_TSKV_CALC_LOG_CAL.getValue())){//物计算 + tsKvService.saveProtoTsKvAndLatest(protoList); + //只能算到 setKeys 的值,不能计算am的值 + protoList = protoList.stream().filter(s -> setKeys.stream().noneMatch(key -> s.getTskvProto().getKey().equals(key))).collect(Collectors.toList()); + if(CollectionUtil.isEmpty(protoList)){ + return; + } + } else if (event.getSource().toString().contains(Topics.V1_TSKV_HISTORY.getValue())) { tsKvService.saveProtoTsKvAndLatest(protoList); } else if (event.getSource().toString().contains(Topics.V1_TSKV_LATEST.getValue())) { tsKvService.saveProtoLatest(protoList); @@ -47,7 +56,7 @@ public class TsKvEventServiceImpl implements TsKvEventService { applicationEventPublisher.publishEvent(new FilterLogSaveEvent(Topics.V1_TSKV_FILTER_LOG_SAVE.getValue(), protoList)); } - Set setKeys = Arrays.stream(this.keys.split(",")).collect(Collectors.toSet()); + List dataProtos = new ArrayList<>(); List amList = new ArrayList<>(); Set amKeys = new HashSet<>(); diff --git a/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java b/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java index 1e99352..925206e 100644 --- a/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java +++ b/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java @@ -42,17 +42,7 @@ public class TsKvNativeSQL { if (protoList.isEmpty()) { return 0; } - // List list = new LinkedHashSet<>(protoList).stream().toList(); - List list = protoList.stream() - .collect(Collectors.toMap(d -> List.of( - d.getTskvProto().getThingCode(), - d.getTskvProto().getTs(), - d.getTskvProto().getKey() - ), Function.identity(), BinaryOperator.maxBy(Comparator.comparing(d -> d.getTskvProto().getVal())))) - .values() - .stream() - .toList(); - String sql = sqlSaveProtoTsKv(list, dataType); + String sql = sqlSaveProtoTsKv(protoList, dataType); try { if(StringUtils.isNotEmpty(sql)){ Db.insertBySql(sql); @@ -61,7 +51,7 @@ public class TsKvNativeSQL { log.error("--TsKv----{}-数据存储失败------>{}=======SQL====>{}", dataType, exception.getMessage(), sql); throw new RuntimeException(exception); } - return list.size(); + return protoList.size(); } public static Integer saveDTOTsKv(List kvDTOList, DatabaseType dataType,String tableSuffix) { @@ -131,10 +121,6 @@ public class TsKvNativeSQL { } public static String sqlSaveProtoTsKv(List protoList, DatabaseType dataType) { - List distinctStudentFile = protoList.stream() - .collect(Collectors.collectingAndThen( - Collectors.toCollection(() -> - new TreeSet<>(Comparator.comparing(o -> o.getTskvProto().getThingCode() + ";" + o.getTskvProto().getKey()+ ";" + Double.valueOf(o.getTskvProto().getVal())+ ";" + o.getTskvProto().getTs()))), ArrayList::new)); StringBuilder sql = new StringBuilder(" INSERT INTO ") .append(TS_KV).append(" (") .append(THING_CODE) @@ -151,25 +137,24 @@ public class TsKvNativeSQL { sql.append(",").append(PARTITION); } sql.append(") VALUES "); - IntStream.range(0, distinctStudentFile.size()).forEach(i -> { - - long ts = distinctStudentFile.get(i).getTskvProto().getTs(); + IntStream.range(0, protoList.size()).forEach(i -> { + long ts = protoList.get(i).getTskvProto().getTs(); DateTime dateTime = new DateTime(ts); sql.append("('"); - sql.append(distinctStudentFile.get(i).getTskvProto().getThingCode()).append("',"); - sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getKey()).append("',"); + sql.append(protoList.get(i).getTskvProto().getThingCode()).append("',"); + sql.append("'").append(protoList.get(i).getTskvProto().getKey()).append("',"); sql.append(ts).append(","); sql.append(dateTime.getYear()).append(","); sql.append(dateTime.getMonthOfYear()).append(","); sql.append(dateTime.getDayOfMonth()).append(","); sql.append(dateTime.getHourOfDay()).append(","); sql.append(dateTime.getMinuteOfHour()).append(","); - sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getVal()).append("'"); + sql.append("'").append(protoList.get(i).getTskvProto().getVal()).append("'"); if (DatabaseType.CK.equals(dataType)) { String partition = new DateTime(ts).toString("yyyy-MM-dd"); sql.append(",'").append(partition).append("'"); } - if (i == distinctStudentFile.size() - 1) { + if (i == protoList.size() - 1) { sql.append(")"); } else { sql.append("),"); diff --git a/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java b/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java index a7bbb30..3663397 100644 --- a/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java +++ b/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java @@ -113,7 +113,7 @@ public class CalcExecuteHandler { .build(); // 批量推送 publisher.publishEvent( - new QueueConsumerEvent(Topics.V1_TSKV_HISTORY.getValue(), Lists.newArrayList(QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()))); + new QueueConsumerEvent(Topics.V1_TSKV_CALC_LOG_CAL.getValue(), Lists.newArrayList(QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()))); // protoList.add( // ); }catch (Exception e){