diff --git a/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java b/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java index 93d3087..a7bbb30 100644 --- a/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java +++ b/modules/calculation/src/main/java/com/thing/calculation/handler/CalcExecuteHandler.java @@ -1,6 +1,7 @@ package com.thing.calculation.handler; import cn.hutool.core.map.MapUtil; +import com.google.common.collect.Lists; import com.thing.calculation.dto.CalcTargetConfigDTO; import com.thing.calculation.dto.ExecuteCalcRequest; import com.thing.calculation.dto.LogSourceInfoValue; @@ -83,7 +84,7 @@ public class CalcExecuteHandler { return; } // 算出结果后批量推送 - List protoList = new ArrayList<>(); + // List protoList = new ArrayList<>(); // 遍历计算日志 for (CalcLogEntity log : logs) { try { @@ -110,16 +111,17 @@ public class CalcExecuteHandler { .setTs(log.getTime()) .setVal(log.getResult()) .build(); - protoList.add( - QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); + // 批量推送 + publisher.publishEvent( + new QueueConsumerEvent(Topics.V1_TSKV_HISTORY.getValue(), Lists.newArrayList(QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()))); +// protoList.add( +// ); }catch (Exception e){ //计算异常 log.setStatus(CalcStatus.CALCULATE_EXCEPTION.getCode()); } + } - // 批量推送 - publisher.publishEvent( - new QueueConsumerEvent(Topics.V1_TSKV_HISTORY.getValue(), protoList)); // 批量更新状态 calcLogService.batchUpdate(logs); } diff --git a/modules/calculation/src/main/java/com/thing/calculation/service/impl/CalcLogServiceImpl.java b/modules/calculation/src/main/java/com/thing/calculation/service/impl/CalcLogServiceImpl.java index 19d2306..eb5726b 100644 --- a/modules/calculation/src/main/java/com/thing/calculation/service/impl/CalcLogServiceImpl.java +++ b/modules/calculation/src/main/java/com/thing/calculation/service/impl/CalcLogServiceImpl.java @@ -145,7 +145,7 @@ public class CalcLogServiceImpl extends BaseServiceImpl calcLogEntities = mapper.selectListByQuery(QueryWrapper.create() + .in(CalcLogEntity::getCalcTargetConfigId, configIds) + .ge(CalcLogEntity::getTime, startTs, Objects.nonNull(startTs)) + .le(CalcLogEntity::getTime, endTs, Objects.nonNull(endTs)) +// .eq(CalcLogEntity::getStatus, CalcStatus.UN_CALCULATED.getCode()) + .ne(CalcLogEntity::getStatus, CalcStatus.CALCULATED.getCode()) + .le(CalcLogEntity::getCalcCount,MAX_CALC_COUNT).orderBy(CalcLogEntity::getTime).asc()); + + return calcLogEntities; + } @Override