Browse Source

Merge pull request 'master' (#64) from master into V3

Reviewed-on: http://git.lrdaiot.cn:9000/thing/thing_api/pulls/64
qingyuan_dev_new
李帅 1 year ago
parent
commit
4dc6445624
  1. 78
      modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java
  2. 4
      modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java
  3. 2
      modules/thing/src/main/java/com/thing/thing/model/controller/IotThingModelController.java
  4. 2
      modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

78
modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java

@ -4,13 +4,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.thing.common.cache.constants.CacheNameEnum;
import com.thing.common.core.enumeration.GateWayStatus;
import com.thing.common.core.enumeration.ThingStatus;
import com.thing.common.core.utils.ConvertUtils;
import com.thing.common.core.utils.JacksonUtil;
import com.thing.common.data.tskv.TsKvDTO;
import com.thing.common.tskv.service.TsKvService;
import com.thing.thing.cache.service.ThingCache;
import com.thing.thing.context.service.ThingManageContextService;
import com.thing.thing.model.dto.IotThingModelDTO;
import com.thing.thing.model.entity.IotThingModelEntity;
import com.thing.thing.model.service.IotThingModelService;
import lombok.RequiredArgsConstructor;
@ -19,11 +17,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.*;
import static com.thing.thing.cache.service.CacheInit.KEY;
@ -63,32 +57,53 @@ public class ThingStatusTask implements ITask {
}
//获取最新值
List<String> codes = optionalList.get().stream().map(s -> s.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()).toList();
List<TsKvDTO> lastTsKvList = tsKvService.findLatestByCodesAndAttrs(codes, null, true);
//处理
List<IotThingModelEntity> statusList = optionalList.get().stream()
.map(item -> {
// IotThingModelDTO iotThingModelDTO = JacksonUtil.convertValue(item, IotThingModelDTO.class);
if(CollectionUtils.isEmpty(codes)){
return;
}
List<ObjectNode> updateModelList = new ArrayList<>();
for (ObjectNode objectNode : optionalList.get()) {
String modelCode = objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText();
//获取最新值
List<TsKvDTO> lastTsKvList = tsKvService.findLatestByCodeAndAttrs(modelCode, null, true);
if(CollectionUtils.isEmpty(lastTsKvList)){
//若是没有值 则离线
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode());
//更新物模型缓存
cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode);
//更新物实体在线离线状态
List<ObjectNode> entityNodeList = cache.findMapAccurateKey(CacheNameEnum.THING_ENTITY, modelCode);
if(CollectionUtils.isNotEmpty(entityNodeList)){
for (ObjectNode entityNode : entityNodeList) {
//若是没有值 则离线
entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField(),ThingStatus.OFFLINE.getCode());
cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText()
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText()
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode);
}
}
updateModelList.add(objectNode);
}else{
//找到最大时间的属性
Optional<TsKvDTO> optionalMax = lastTsKvList.stream()
.filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()))
.filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),modelCode))
.max(Comparator.comparing(TsKvDTO::getTs));
if(optionalMax.isEmpty()){
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode());
}else{
TsKvDTO tsKvDTO = optionalMax.get();
//根据当前时间和获取的最新时间 若大于45min 则离线 否则为在线
boolean isOffline = System.currentTimeMillis() - tsKvDTO.getTs() > TIME_INTERVAL;
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode());
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(tsKvDTO.getTs()).getTime());
boolean isOffline = System.currentTimeMillis() - optionalMax.get().getTs() > TIME_INTERVAL;
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode());
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime());
//更新物模型缓存
cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode);
updateModelList.add(objectNode);
//更新实体设备状态:这里需要精确匹配
List<ObjectNode> entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText());
List<ObjectNode> entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, modelCode);
if(CollectionUtils.isNotEmpty(entityNodes)){
for (ObjectNode entityNode : entityNodes) {
entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField()
,isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode());
entityNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime());
cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText()
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText()
@ -96,16 +111,13 @@ public class ThingStatusTask implements ITask {
}
}
}
//更新物模型
IotThingModelDTO iotThingModelDTO = ConvertUtils.sourceToTarget(item, IotThingModelDTO.class);
cache.updateAccurateKeyEntity(CacheNameEnum.THING_MODEL
, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ KEY + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), iotThingModelDTO);
return JacksonUtil.convertValue(item, IotThingModelEntity.class);
}).collect(Collectors.toList());
if(CollectionUtils.isNotEmpty(statusList)){
}
if(CollectionUtils.isNotEmpty(updateModelList)){
List<IotThingModelEntity> list = updateModelList.stream().map(item -> JacksonUtil.convertValue(item, IotThingModelEntity.class)).toList();
//修改状态
modelService.saveOrUpdateBatch(statusList);
modelService.saveOrUpdateBatch(list);
}
log.info("设备在线离线 statusTask end");
}

4
modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java

@ -36,7 +36,7 @@ import java.util.stream.Collectors;
import static com.thing.thing.cache.service.CacheInit.KEY;
@Slf4j
//@Component
@Component
@RequiredArgsConstructor
public class QueueDeviceEventListener {
@ -49,7 +49,7 @@ public class QueueDeviceEventListener {
private final ReentrantLock lock = new ReentrantLock();
// @EventListener(QueueDeviceEvent.class)
@EventListener(QueueDeviceEvent.class)
public void onDeviceEvent(QueueDeviceEvent event) {
List<QueueMsgDTO> msgList = event.getList();
if(CollectionUtils.isEmpty(msgList)){

2
modules/thing/src/main/java/com/thing/thing/model/controller/IotThingModelController.java

@ -69,7 +69,7 @@ public class IotThingModelController {
@Parameter(name = "gateway", description = "网关") @RequestParam(required = false) String gateway,
@Parameter(name = "startTime", description = "开始时间") @RequestParam(required = false) Long startTime,
@Parameter(name = "endTime", description = "结束时间") @RequestParam(required = false) Long endTime) {
List<ObjectNode> list = service.findList1(orderField, order, code, token, origin, status, realType,gateway,startTime, endTime);
List<ObjectNode> list = service.findList(orderField, order, code, token, origin, status, realType,gateway,startTime, endTime);
return new Result<List<ObjectNode>>().ok(list);
}

2
modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

@ -117,7 +117,7 @@ public class IotThingModelServiceImpl extends BaseServiceImpl<IotThingModelMappe
String gateway,
Long startTime,
Long endTime) {
List<ObjectNode> list = findList1(orderField, order, code, token, origin, status, realType,gateway, startTime, endTime);
List<ObjectNode> list = findList(orderField, order, code, token, origin, status, realType,gateway, startTime, endTime);
if (CollectionUtils.isEmpty(list)) {
return PageData.empty();
}

Loading…
Cancel
Save