diff --git a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java index 2640cf0..d074b6d 100644 --- a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java +++ b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java @@ -12,15 +12,16 @@ import com.thing.common.core.event.AuthParam; import com.thing.common.core.event.QueueDeviceEvent; import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.DateTimeUtils; +import com.thing.common.core.utils.JacksonUtil; import com.thing.common.core.utils.TokenGenerator; import com.thing.common.data.dto.QueueMsgDTO; -import com.thing.thing.cache.service.CacheInit; import com.thing.thing.cache.service.ThingCache; import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.entity.IotThingEntity; import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.entity.IotThingModelEntity; +import com.thing.thing.model.service.IotThingModelService; import com.thing.transport.api.adaptor.JsonConverter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,8 @@ public class QueueDeviceEventListener { private final ThingManageContextService thingManageContextService; + private final IotThingModelService thingModelService; + private final ThingCache thingCache; @EventListener(QueueDeviceEvent.class) @@ -58,10 +61,12 @@ public class QueueDeviceEventListener { item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin()) || QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin())) ? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1)); - //物模型 - List insertModelList = new ArrayList<>(); //物模型插入 - checkAndSaveThingModels(codeOriginMap, insertModelList); + List insertModelList = new ArrayList<>(); + //物模型更新 + List updateModelList = new ArrayList<>(); + //筛选物模型的插入和更新 + checkAndSaveThingModels(codeOriginMap, insertModelList,updateModelList); if (CollectionUtils.isNotEmpty(insertModelList)) { // 物实体 @@ -73,80 +78,96 @@ public class QueueDeviceEventListener { .collect(Collectors.groupingBy( item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()), Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet()))); - List insertEntityList = new ArrayList<>(); saveTenantThingList(tenantThingCodeMap, insertEntityList); - insertModelList.forEach(item -> { long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); item.setAuthNum(count+CollectionUtils.size(keyMap)); }); - thingManageContextService.saveAllModel(insertModelList); + thingModelService.saveBatch(insertModelList); - //更新物模板缓存 - List iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); - List modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); - - for (ObjectNode item : modelList) { - //更新物模型 - thingCache.updateKeyMap(CacheNameEnum.THING_MODEL - , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() - + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item); - } + //插入物实体 if(CollectionUtils.isNotEmpty(insertEntityList)){ List distinctStudentFile = insertEntityList.stream() .collect(Collectors.collectingAndThen( Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new)); thingManageContextService.saveEntity(distinctStudentFile); - - //更新物实体缓存 - List entityList = distinctStudentFile.stream().map(e -> { - ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); - return new IotThingViewDTO() - .setLat(e.getLat()) - .setLon(e.getLon()) - .setTags(e.getTags()) - .setDeptIds(e.getDeptIds()) - .setOrigin(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) - .setTemplateMark(TemplateMark.NO.getValue()) - .setRealType("1") - .setImg(e.getImg()) - .setRemark(e.getRemark()) - .setEnableStatus(e.getEnableStatus()) - .setStatusTs(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) - .setStatus(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) - .setTenantCode(e.getTenantCode()) - .setCompanyId(e.getTenantCode()) - .setDeptId(e.getTenantCode()) - .setEntityType(e.getType()) - .setEntityName(e.getName()) - .setEntityCode(e.getCode()) - .setEntityId(e.getId()) - .setModelId(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) - .setCreateDate(e.getCreateDate()); - }).toList(); - List entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList); - for (ObjectNode entityNode : entityJsonList) { - thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY - ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() - + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() - + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); - } + updateEntityCache(distinctStudentFile); } } + + if(CollectionUtils.isNotEmpty(updateModelList)){ + updateModelList.forEach(item -> { + long count = updateModelList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); + List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); + item.setAuthNum(count+CollectionUtils.size(keyMap)); + }); + thingModelService.updateBatch(updateModelList); + insertModelList.addAll(updateModelList); + } + //更新物模板缓存 + updateModleCache(insertModelList); } catch (Exception e) { log.error("设备保存失败: {}", e.getMessage(), e); } } + private void updateModleCache(List insertModelList) { + List iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); + List modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); + + for (ObjectNode item : modelList) { + //更新物模型的缓存 + thingCache.updateKeyMap(CacheNameEnum.THING_MODEL + , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item); + } + } + + private void updateEntityCache(List distinctStudentFile) { + //更新物实体缓存 + List entityList = distinctStudentFile.stream().map(e -> { + ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); + return new IotThingViewDTO() + .setLat(e.getLat()) + .setLon(e.getLon()) + .setTags(e.getTags()) + .setDeptIds(e.getDeptIds()) + .setOrigin(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) + .setTemplateMark(TemplateMark.NO.getValue()) + .setRealType("1") + .setImg(e.getImg()) + .setRemark(e.getRemark()) + .setEnableStatus(e.getEnableStatus()) + .setStatusTs(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) + .setStatus(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) + .setTenantCode(e.getTenantCode()) + .setCompanyId(e.getTenantCode()) + .setDeptId(e.getTenantCode()) + .setEntityType(e.getType()) + .setEntityName(e.getName()) + .setEntityCode(e.getCode()) + .setEntityId(e.getId()) + .setModelId(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) + .setCreateDate(e.getCreateDate()); + }).toList(); + List entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList); + for (ObjectNode entityNode : entityJsonList) { + thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY + ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() + + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + } + } + /** * 物管理表是否存在,不存在新增设备 * * @param codeOriginMap 物编码 */ - private void checkAndSaveThingModels(Map codeOriginMap, List insertList) throws Exception { + private void checkAndSaveThingModels(Map codeOriginMap, List insertList,List updateList) { // 使用迭代器遍历并删除不符合条件的元素 Iterator> iterator = codeOriginMap.entrySet().iterator(); while (iterator.hasNext()) { @@ -154,6 +175,8 @@ public class QueueDeviceEventListener { String code = entry.getKey(); ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code); if (null != jsonObject && !jsonObject.isEmpty()) { + IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class); + updateList.add(modelEntity); iterator.remove(); } } @@ -163,7 +186,7 @@ public class QueueDeviceEventListener { .setToken(TokenGenerator.generateValue()) .setGateway(GateWayStatus.NO_GATE_WAY.getValue()) .setStatus(ThingStatus.NOT_CONNECTED.getCode()) - .setAuthNum(1L) + .setAuthNum(0L) .setStatusTs(DateTimeUtils.getCurrentTime()) .setOrigin(origin); modelEntity.setId(IdUtil.getSnowflake().nextId()) diff --git a/modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java b/modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java index 6197538..15e30e3 100644 --- a/modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java +++ b/modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java @@ -27,6 +27,8 @@ public interface IotThingModelService extends IBaseService Optional findByCode(String code); + Optional getByCode(String code); + Optional> findByGateway(String gateway); Optional findModelByGatewayAndToken(String gateway, String token); @@ -59,4 +61,6 @@ public interface IotThingModelService extends IBaseService void batchSaveOrUpdate(List entities); + void batchInsertOrUpdate(List entities); + } diff --git a/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java b/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java index dd859bf..93da625 100644 --- a/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java +++ b/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java @@ -137,7 +137,6 @@ public class IotThingModelServiceImpl extends BaseServiceImpl modelList = cache.getTopicMap(CacheNameEnum.THING_MODEL); - log.info("modelList缓存数据个数:{},modelList缓存数据:{}", modelList.size(),modelList.toString()); if (CollectionUtils.isEmpty(modelList)) { List list = mapper.selectListByQueryAs(getWrapper(orderField, order, null, null, null, null, null,null,null, null), IotThingModelDTO.class); @@ -175,6 +174,11 @@ public class IotThingModelServiceImpl extends BaseServiceImpl getByCode(String code) { + return Optional.ofNullable(mapper.selectOneByQuery(QueryWrapper.create().eq(IotThingModelEntity::getCode, code))); + } + @Override public Optional> findByGateway(String gateway) { List topicMap = cache.getTopicMap(CacheNameEnum.THING_MODEL); @@ -379,10 +383,15 @@ public class IotThingModelServiceImpl extends BaseServiceImpl modelEntities) { - modelEntities.forEach(model -> mapper.insertOrUpdateSelective(model)); + modelEntities.forEach(model -> mapper.insertOrUpdate(model)); cache.clearTopic(CacheNameEnum.THING_MODEL); } + @Override + public void batchInsertOrUpdate(List entities) { + entities.forEach(model -> mapper.insertOrUpdate(model)); + } + //物模型新增和更新中分配物实体信息 private List shareThingsToTenantCode(Collection codeList, Collection tenantCodes) { if (CollectionUtils.isEmpty(tenantCodes)) {