From 1bff2e5a75f74ec8d7623a596aa53dbace7bfe68 Mon Sep 17 00:00:00 2001 From: lishuai Date: Thu, 12 Sep 2024 17:19:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=A1=A5=E5=8F=91=202024?= =?UTF-8?q?=E5=B9=B49=E6=9C=8812=E6=97=A517:19:26?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/DataReissueServiceImpl.java | 62 +++++++++---------- pom.xml | 30 ++++----- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java b/modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java index fbb0e48..b38dfa1 100644 --- a/modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java +++ b/modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java @@ -28,6 +28,7 @@ import com.thing.thing.entity.dto.IotThingEntityDTO; import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.Nullable; import org.springframework.context.ApplicationEventPublisher; @@ -39,7 +40,7 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.*; import java.util.*; import java.util.stream.Collectors; @@ -74,6 +75,9 @@ public class DataReissueServiceImpl extends BaseServiceImpl params) { String dashboardGroupId = (String) params.get("dashboardGroupId"); @@ -132,9 +136,8 @@ public class DataReissueServiceImpl extends BaseServiceImpl> longListMap = thingsAttributesList.stream().collect(Collectors.groupingBy(IotThingDictRelationParamDTO::getEntityId)); longListMap.forEach((k,v)->{ - boolean flag = true; - String limit = "100"; - while (flag){ + Long start = startTime; + while (start < endTime){ String entityCode = v.get(0).getEntityCode(); List keyList = v.stream().map(IotThingDictRelationParamDTO::getCode).collect(Collectors.toList()); HttpHeaders headers = new HttpHeaders(); @@ -147,25 +150,22 @@ public class DataReissueServiceImpl extends BaseServiceImpl> timeseries = getTskvtMap(responseEntity, token, dataReissueSettingDTO, keyList, startTime, endTime,limit); -// List tsKvEntries = RestJsonConverter.toTimeseries(timeseries); -// if(CollectionUtil.isNotEmpty(tsKvEntries)){ -// pushQueue(entityCode,tsKvEntries); -// -// }else{ -// flag = false; -// } + Map> timeseries = getTskvtMap(responseEntity, token, dataReissueSettingDTO, keyList, start, start+ONE_DAY); + if(MapUtils.isNotEmpty(timeseries)){ + pushQueue(entityCode,timeseries); + } + start += ONE_DAY; } }); } @Nullable private Map> getTskvtMap(ResponseEntity responseEntity, String token, DataReissueSettingDTO dataReissueSettingDTO, - List keyList, Long startTime, Long endTime,String limit) { + List keyList, Long startTime, Long endTime) { DeviceId deviceId = responseEntity.getBody().getId(); - HttpHeaders headers1 = new HttpHeaders(); - headers1.setContentType(MediaType.APPLICATION_JSON); - headers1.set("x-authorization","Bearer " + token); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("x-authorization","Bearer " + token); UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(dataReissueSettingDTO.getUrl()) .path("/api/plugins/telemetry/{entityType}/{entityId}/values/timeseries") .queryParam("keys", "{keys}") @@ -179,22 +179,21 @@ public class DataReissueServiceImpl extends BaseServiceImpl requestEntity = new HttpEntity<>(null , headers1); - + HttpEntity requestEntity = new HttpEntity<>(null , headers); Map> timeseries = restTemplate.exchange( uriComponents.toUri(), @@ -231,18 +230,17 @@ public class DataReissueServiceImpl extends BaseServiceImpl timeSeries) { - if (CollectionUtil.isEmpty(timeSeries)) { + private void pushQueue(String thingCode, Map> timeseries) { + if (MapUtils.isEmpty(timeseries)) { return ; } - List list = timeSeries.parallelStream() - .filter(tsKv -> Objects.nonNull(tsKv.getValue())) - .map(tsKv -> new TsKvDTO(thingCode, tsKv.getKey(), tsKv.getTs(), tsKv.getValueAsString())) - .toList(); - //send 队列 - publisher.publishEvent(new TsKvEvent(Topics.V1_TSKV_HISTORY.getValue(), TsKvDTO.toDataProtoList(list))); - + timeseries.forEach((key, values) -> { + if(!values.isEmpty()){ + List list = values.stream().map(ts -> new TsKvDTO(thingCode, key, ts.get("ts").asLong(), ts.get("value").asText())).toList(); + //send 队列 + publisher.publishEvent(new TsKvEvent(Topics.V1_TSKV_HISTORY.getValue(), TsKvDTO.toDataProtoList(list))); + } + }); } - } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2fa0276..4bd2e40 100644 --- a/pom.xml +++ b/pom.xml @@ -373,11 +373,11 @@ delight-nashorn-sandbox ${delight-nashorn-sandbox.version} - - org.thingsboard - tbel - ${tbel.version} - + + + + + com.github.rholder @@ -395,16 +395,16 @@ - - org.thingsboard.common - util - ${thingsboard.version} - - - org.thingsboard.common - data - ${thingsboard.version} - + + + + + + + + + + io.jsonwebtoken jjwt