Browse Source

数据补发

2024年9月12日17:19:26
thing_master
lishuai 1 year ago
parent
commit
1bff2e5a75
  1. 62
      modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java
  2. 30
      pom.xml

62
modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java

@ -28,6 +28,7 @@ import com.thing.thing.entity.dto.IotThingEntityDTO;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.springframework.context.ApplicationEventPublisher;
@ -39,7 +40,7 @@ import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.*;
import java.util.*;
import java.util.stream.Collectors;
@ -74,6 +75,9 @@ public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, D
@Resource
private RestTemplate restTemplate;
private final Long ONE_DAY = 86400000L;
@Override
public QueryWrapper getWrapper(Map<String, Object> params) {
String dashboardGroupId = (String) params.get("dashboardGroupId");
@ -132,9 +136,8 @@ public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, D
String token = login(dataReissueSettingDTO.getUrl(), dataReissueSettingDTO.getUserName(), dataReissueSettingDTO.getPassword());
Map<Long, List<IotThingDictRelationParamDTO>> longListMap = thingsAttributesList.stream().collect(Collectors.groupingBy(IotThingDictRelationParamDTO::getEntityId));
longListMap.forEach((k,v)->{
boolean flag = true;
String limit = "100";
while (flag){
Long start = startTime;
while (start < endTime){
String entityCode = v.get(0).getEntityCode();
List<String> keyList = v.stream().map(IotThingDictRelationParamDTO::getCode).collect(Collectors.toList());
HttpHeaders headers = new HttpHeaders();
@ -147,25 +150,22 @@ public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, D
requestEntity,
Device.class
);
Map<String, List<JsonNode>> timeseries = getTskvtMap(responseEntity, token, dataReissueSettingDTO, keyList, startTime, endTime,limit);
// List<TsKvEntry> tsKvEntries = RestJsonConverter.toTimeseries(timeseries);
// if(CollectionUtil.isNotEmpty(tsKvEntries)){
// pushQueue(entityCode,tsKvEntries);
//
// }else{
// flag = false;
// }
Map<String, List<JsonNode>> timeseries = getTskvtMap(responseEntity, token, dataReissueSettingDTO, keyList, start, start+ONE_DAY);
if(MapUtils.isNotEmpty(timeseries)){
pushQueue(entityCode,timeseries);
}
start += ONE_DAY;
}
});
}
@Nullable
private Map<String, List<JsonNode>> getTskvtMap(ResponseEntity<Device> responseEntity, String token, DataReissueSettingDTO dataReissueSettingDTO,
List<String> keyList, Long startTime, Long endTime,String limit) {
List<String> keyList, Long startTime, Long endTime) {
DeviceId deviceId = responseEntity.getBody().getId();
HttpHeaders headers1 = new HttpHeaders();
headers1.setContentType(MediaType.APPLICATION_JSON);
headers1.set("x-authorization","Bearer " + token);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("x-authorization","Bearer " + token);
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(dataReissueSettingDTO.getUrl())
.path("/api/plugins/telemetry/{entityType}/{entityId}/values/timeseries")
.queryParam("keys", "{keys}")
@ -179,22 +179,21 @@ public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, D
params.put("entityType", deviceId.getEntityType().name());
params.put("entityId", deviceId.getId().toString());
params.put("keys", String.join(",", keyList));
params.put("interval", "0");
params.put("interval","0");
params.put("agg", "NONE");
params.put("limit", limit);
params.put("limit", String.valueOf(Integer.MAX_VALUE));
params.put("orderBy", "DESC");
params.put("useStrictDataTypes", Boolean.toString(Boolean.FALSE));
if (startTime != null) {
uriBuilder.queryParam("&startTs={startTs}");
// uriBuilder.queryParam("&startTs={startTs}");
params.put("startTs", String.valueOf(startTime));
}
if (endTime != null) {
uriBuilder.queryParam("&endTs={endTs}");
// uriBuilder.queryParam("&endTs={endTs}");
params.put("endTs", String.valueOf(endTime));
}
UriComponents uriComponents = uriBuilder.buildAndExpand(params);
HttpEntity<String> requestEntity = new HttpEntity<>(null , headers1);
HttpEntity<String> requestEntity = new HttpEntity<>(null , headers);
Map<String, List<JsonNode>> timeseries = restTemplate.exchange(
uriComponents.toUri(),
@ -231,18 +230,17 @@ public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, D
return tokenInfo.getBody().get("token").asText();
}
private void pushQueue(String thingCode, List<TsKvEntry> timeSeries) {
if (CollectionUtil.isEmpty(timeSeries)) {
private void pushQueue(String thingCode, Map<String, List<JsonNode>> timeseries) {
if (MapUtils.isEmpty(timeseries)) {
return ;
}
List<TsKvDTO> list = timeSeries.parallelStream()
.filter(tsKv -> Objects.nonNull(tsKv.getValue()))
.map(tsKv -> new TsKvDTO(thingCode, tsKv.getKey(), tsKv.getTs(), tsKv.getValueAsString()))
.toList();
//send 队列
publisher.publishEvent(new TsKvEvent(Topics.V1_TSKV_HISTORY.getValue(), TsKvDTO.toDataProtoList(list)));
timeseries.forEach((key, values) -> {
if(!values.isEmpty()){
List<TsKvDTO> list = values.stream().map(ts -> new TsKvDTO(thingCode, key, ts.get("ts").asLong(), ts.get("value").asText())).toList();
//send 队列
publisher.publishEvent(new TsKvEvent(Topics.V1_TSKV_HISTORY.getValue(), TsKvDTO.toDataProtoList(list)));
}
});
}
}

30
pom.xml

@ -373,11 +373,11 @@
<artifactId>delight-nashorn-sandbox</artifactId>
<version>${delight-nashorn-sandbox.version}</version>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>tbel</artifactId>
<version>${tbel.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.thingsboard</groupId>-->
<!-- <artifactId>tbel</artifactId>-->
<!-- <version>${tbel.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.github.rholder</groupId>
@ -395,16 +395,16 @@
</exclusions>
</dependency>
<!-- thingsboard jar -->
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId>
<version>${thingsboard.version}</version>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>data</artifactId>
<version>${thingsboard.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.thingsboard.common</groupId>-->
<!-- <artifactId>util</artifactId>-->
<!-- <version>${thingsboard.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.thingsboard.common</groupId>-->
<!-- <artifactId>data</artifactId>-->
<!-- <version>${thingsboard.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>

Loading…
Cancel
Save