diff --git a/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java b/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java index b4ada46..470520b 100644 --- a/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java +++ b/common/tskv/src/main/java/com/thing/common/tskv/service/TsKvNativeSQL.java @@ -15,6 +15,7 @@ import org.joda.time.DateTime; import java.math.BigDecimal; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -120,6 +121,14 @@ public class TsKvNativeSQL { } public static String sqlSaveProtoTsKv(List protoList, DatabaseType dataType) { + + // 使用Stream API和Collectors.toMap来去重 + List distinctStudentFile = protoList.stream() + .collect(Collectors.collectingAndThen( + Collectors.toCollection(() -> + new TreeSet<>(Comparator.comparing(o -> o.getTskvProto().getThingCode() + ";" + o.getTskvProto().getKey()+ ";" + o.getTskvProto().getVal()+ ";" + o.getTskvProto().getTs()))), ArrayList::new)); + + StringBuilder sql = new StringBuilder(" INSERT INTO ") .append(TS_KV).append(" (") .append(THING_CODE) @@ -136,25 +145,25 @@ public class TsKvNativeSQL { sql.append(",").append(PARTITION); } sql.append(") VALUES "); - IntStream.range(0, protoList.size()).forEach(i -> { + IntStream.range(0, distinctStudentFile.size()).forEach(i -> { - long ts = protoList.get(i).getTskvProto().getTs(); + long ts = distinctStudentFile.get(i).getTskvProto().getTs(); DateTime dateTime = new DateTime(ts); sql.append("('"); - sql.append(protoList.get(i).getTskvProto().getThingCode()).append("',"); - sql.append("'").append(protoList.get(i).getTskvProto().getKey()).append("',"); + sql.append(distinctStudentFile.get(i).getTskvProto().getThingCode()).append("',"); + sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getKey()).append("',"); sql.append(ts).append(","); sql.append(dateTime.getYear()).append(","); sql.append(dateTime.getMonthOfYear()).append(","); sql.append(dateTime.getDayOfMonth()).append(","); sql.append(dateTime.getHourOfDay()).append(","); sql.append(dateTime.getMinuteOfHour()).append(","); - sql.append("'").append(protoList.get(i).getTskvProto().getVal()).append("'"); + sql.append("'").append(distinctStudentFile.get(i).getTskvProto().getVal()).append("'"); if (DatabaseType.CK.equals(dataType)) { String partition = new DateTime(ts).toString("yyyy-MM-dd"); sql.append(",'").append(partition).append("'"); } - if (i == protoList.size() - 1) { + if (i == distinctStudentFile.size() - 1) { sql.append(")"); } else { sql.append("),");