diff --git a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java index d074b6d..172f3e1 100644 --- a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java +++ b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java @@ -8,7 +8,6 @@ import com.thing.common.core.enumeration.GateWayStatus; import com.thing.common.core.enumeration.QueueOriginType; import com.thing.common.core.enumeration.TemplateMark; import com.thing.common.core.enumeration.ThingStatus; -import com.thing.common.core.event.AuthParam; import com.thing.common.core.event.QueueDeviceEvent; import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.DateTimeUtils; @@ -19,10 +18,12 @@ import com.thing.thing.cache.service.ThingCache; import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.entity.IotThingEntity; +import com.thing.thing.entity.service.IotThingEntityService; import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.entity.IotThingModelEntity; import com.thing.thing.model.service.IotThingModelService; import com.thing.transport.api.adaptor.JsonConverter; +import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -31,6 +32,7 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -42,6 +44,8 @@ public class QueueDeviceEventListener { private final IotThingModelService thingModelService; + private final IotThingEntityService entityService; + private final ThingCache thingCache; @EventListener(QueueDeviceEvent.class) @@ -55,60 +59,99 @@ public class QueueDeviceEventListener { if (CollectionUtil.isEmpty(validMsgList)) { return; } - //新增和更新物模型信息 - Map codeOriginMap = validMsgList.parallelStream() - .collect(Collectors.toMap(QueueMsgDTO::getThingCode, - item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin()) - || QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin())) - ? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1)); + //先根据物模型编码去重 + Map>> codeMapList = validMsgList.stream() + .collect(Collectors.groupingBy(QueueMsgDTO::getThingCode, Collectors.groupingBy(QueueMsgDTO::getTenantCode))); //物模型插入 List insertModelList = new ArrayList<>(); //物模型更新 List updateModelList = new ArrayList<>(); - //筛选物模型的插入和更新 - checkAndSaveThingModels(codeOriginMap, insertModelList,updateModelList); + //物实体的插入 + List insertEntityList = new ArrayList<>(); + //组装物实体和物模型 + for (Map.Entry>> entry : codeMapList.entrySet()) { + String thingCode = entry.getKey(); + //分用户:admin用户不用创建物实体,其他用户需要创建物实体 + Map> valueEntry = entry.getValue(); + for (Map.Entry> thingEntry : valueEntry.entrySet()) { + Long tenantCode = thingEntry.getKey(); + QueueMsgDTO queueMsgDTO = thingEntry.getValue().get(0); + //物模型缓存查询 + ObjectNode jsonObject = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, thingCode); + //缓存存在,更新,否则插入 + if (null != jsonObject && !jsonObject.isEmpty()) { + IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class); + updateModelList.add(modelEntity); + } else { + String origin = QueueOriginType.AUTO_DATA_SYNC.name().equals(queueMsgDTO.getOrigin()) + || QueueOriginType.MANUAL_DATA_SYNC.name().equals(queueMsgDTO.getOrigin()) + ? QueueOriginType.TB.name() : queueMsgDTO.getOrigin(); + IotThingModelEntity modelEntity = new IotThingModelEntity() + .setCode(thingCode) + .setToken(TokenGenerator.generateValue()) + .setGateway(GateWayStatus.NO_GATE_WAY.getValue()) + .setStatus(ThingStatus.NOT_CONNECTED.getCode()) + .setAuthNum(0L) + .setStatusTs(DateTimeUtils.getCurrentTime()) + .setOrigin(origin); +// modelEntity.setId(IdUtil.getSnowflake().nextId()) +// .setCreateDate(DateTimeUtils.getCurrentTime()) +// .setCreateDate(DateTimeUtils.getCurrentTime()) + ; + insertModelList.add(modelEntity); + } + //物实体的构建 +// if (!Objects.equals(queueMsgDTO.getTenantCode(), 1001L)) { +// ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); +// if (null == entityNode || entityNode.isEmpty()) { +// IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); +// insertEntityList.add(newThingEntity); +// } +// } + ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); + if (null == entityNode || entityNode.isEmpty()) { + IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); + insertEntityList.add(newThingEntity); + } + } + } + //插入新的物模型 if (CollectionUtils.isNotEmpty(insertModelList)) { - // 物实体 - Map> tenantThingCodeMap = - validMsgList.stream() - .filter(item -> Objects.nonNull(item.getTenantCode()) - || Objects.nonNull(item.getCompanyId()) - || Objects.nonNull(item.getDeptId())) - .collect(Collectors.groupingBy( - item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()), - Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet()))); - List insertEntityList = new ArrayList<>(); - saveTenantThingList(tenantThingCodeMap, insertEntityList); - insertModelList.forEach(item -> { - long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); - List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); - item.setAuthNum(count+CollectionUtils.size(keyMap)); - }); - thingModelService.saveBatch(insertModelList); +// thingModelService.saveBatch(insertModelList); + } + //更新老的物模型 + if (CollectionUtils.isNotEmpty(updateModelList)) { +// thingModelService.updateBatch(updateModelList); + insertModelList.addAll(updateModelList); + } + //更新物实体的模型:uniqueModelList有值,物实体不一定有值 + List distinctStudentFile = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(insertModelList)) { - //插入物实体 - if(CollectionUtils.isNotEmpty(insertEntityList)){ - List distinctStudentFile = insertEntityList.stream() + //物实体的插入和缓存更新 + if (CollectionUtils.isNotEmpty(insertEntityList)) { + distinctStudentFile.addAll(insertEntityList.stream() .collect(Collectors.collectingAndThen( - Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new)); - thingManageContextService.saveEntity(distinctStudentFile); - updateEntityCache(distinctStudentFile); + Collectors.toCollection(() -> + new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new))); } } - - if(CollectionUtils.isNotEmpty(updateModelList)){ - updateModelList.forEach(item -> { - long count = updateModelList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); - List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); - item.setAuthNum(count+CollectionUtils.size(keyMap)); - }); - thingModelService.updateBatch(updateModelList); - insertModelList.addAll(updateModelList); + //更新物模型和物实体的缓存 + insertModelList.forEach(item -> { + long count = distinctStudentFile.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); + List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); + item.setAuthNum(count + CollectionUtils.size(keyMap)); + }); + if(CollectionUtils.isNotEmpty(insertModelList)){ + thingModelService.saveOrUpdateBatch(insertModelList); + updateModleCache(insertModelList); + } + if(CollectionUtils.isNotEmpty(distinctStudentFile)){ + entityService.saveOrUpdateBatch(distinctStudentFile); + updateEntityCache(distinctStudentFile); } - //更新物模板缓存 - updateModleCache(insertModelList); } catch (Exception e) { log.error("设备保存失败: {}", e.getMessage(), e); } @@ -117,32 +160,41 @@ public class QueueDeviceEventListener { private void updateModleCache(List insertModelList) { List iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); List modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); - - for (ObjectNode item : modelList) { - //更新物模型的缓存 - thingCache.updateKeyMap(CacheNameEnum.THING_MODEL - , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() - + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item); + List topicMap = thingCache.getTopicMap(CacheNameEnum.THING_MODEL); + if(CollectionUtils.isNotEmpty(topicMap)){ + Map modelMap = modelList.stream().collect(Collectors.toMap( + model -> model.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + ":" + model.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText() + , Function.identity())); + thingCache.putMap(CacheNameEnum.THING_MODEL, modelMap); + }else{ + for (ObjectNode item : modelList) { + //更新物模型的缓存 + thingCache.updateKeyMap(CacheNameEnum.THING_MODEL + , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), item); + } } } private void updateEntityCache(List distinctStudentFile) { //更新物实体缓存 List entityList = distinctStudentFile.stream().map(e -> { - ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); +// ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); + ObjectNode modelMap = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, e.getCode()); return new IotThingViewDTO() .setLat(e.getLat()) .setLon(e.getLon()) .setTags(e.getTags()) .setDeptIds(e.getDeptIds()) - .setOrigin(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) + .setOrigin(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) .setTemplateMark(TemplateMark.NO.getValue()) .setRealType("1") .setImg(e.getImg()) .setRemark(e.getRemark()) .setEnableStatus(e.getEnableStatus()) - .setStatusTs(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) - .setStatus(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) + .setStatusTs(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) + .setStatus(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) .setTenantCode(e.getTenantCode()) .setCompanyId(e.getTenantCode()) .setDeptId(e.getTenantCode()) @@ -150,88 +202,30 @@ public class QueueDeviceEventListener { .setEntityName(e.getName()) .setEntityCode(e.getCode()) .setEntityId(e.getId()) - .setModelId(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) + .setModelId(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) .setCreateDate(e.getCreateDate()); }).toList(); List entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList); for (ObjectNode entityNode : entityJsonList) { thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY - ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + , entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() - + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(), entityNode); } } - /** - * 物管理表是否存在,不存在新增设备 - * - * @param codeOriginMap 物编码 - */ - private void checkAndSaveThingModels(Map codeOriginMap, List insertList,List updateList) { - // 使用迭代器遍历并删除不符合条件的元素 - Iterator> iterator = codeOriginMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String code = entry.getKey(); - ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code); - if (null != jsonObject && !jsonObject.isEmpty()) { - IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class); - updateList.add(modelEntity); - iterator.remove(); - } - } - codeOriginMap.forEach((thingCode, origin) -> { - IotThingModelEntity modelEntity = new IotThingModelEntity() - .setCode(thingCode) - .setToken(TokenGenerator.generateValue()) - .setGateway(GateWayStatus.NO_GATE_WAY.getValue()) - .setStatus(ThingStatus.NOT_CONNECTED.getCode()) - .setAuthNum(0L) - .setStatusTs(DateTimeUtils.getCurrentTime()) - .setOrigin(origin); - modelEntity.setId(IdUtil.getSnowflake().nextId()) - .setCreateDate(DateTimeUtils.getCurrentTime()) - .setCreateDate(DateTimeUtils.getCurrentTime()) - ; - insertList.add(modelEntity); - }); - } - - /** - * 保存物实体 - * - * @param tenantThingCodeMap 物实体 - */ - private void saveTenantThingList(Map> tenantThingCodeMap, List insertList) { - tenantThingCodeMap.forEach( - (authParam, thingCodes) -> { - if (CollectionUtil.isEmpty(thingCodes)) { - return; - } - thingCodes.removeIf(code -> { - ObjectNode keyMap = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, authParam.getTenantCode() + ":" + code); - return null != keyMap && !keyMap.isEmpty(); - }); - if (CollectionUtil.isNotEmpty(thingCodes)) { - for (String newThingCode : thingCodes) { - insertList.add(createThingEntity(newThingCode, authParam)); - } - } - }); - } - - private IotThingEntity createThingEntity(String thingCode, AuthParam authParam) { + private IotThingEntity createThingEntity(String thingCode, Long tenantCode, Long companyId, Long deptId) { IotThingEntity entity = new IotThingEntity(); - entity.setId(IdUtil.getSnowflake().nextId()); + // entity.setId(IdUtil.getSnowflake().nextId()); entity.setCode(thingCode); entity.setName(thingCode); entity.setEnableStatus("1"); entity.setRealType("1"); entity.setTemplateMark("0"); entity.setType("默认物类型"); - entity.setTenantCode(authParam.getTenantCode()); - entity.setCompanyId(authParam.getCompanyId()); - entity.setDeptIds(authParam.getDeptId().toString()); + entity.setTenantCode(tenantCode); + entity.setCompanyId(companyId); + entity.setDeptIds(deptId.toString()); return entity; } diff --git a/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java b/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java index ec4321e..140a19d 100644 --- a/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java +++ b/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java @@ -6,10 +6,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -90,6 +87,12 @@ public final class ThingCache { return valMap.entrySet().stream().filter(entry -> entry.getKey().contains(key)).map(Map.Entry::getValue).collect(Collectors.toList()); } + /** + * 模糊匹配 + * @param topic + * @param key + * @return + */ public ObjectNode findObjectNode(String topic, String key) { ConcurrentHashMap valMap = thingMap.get(topic); if(MapUtils.isEmpty(valMap)){ @@ -99,6 +102,27 @@ public final class ThingCache { return first.orElse(null); } + /** + * 精确匹配 + * @param topic + * @param key + * @return + */ + public ObjectNode findAccurateObjectNode(String topic, String key) { + ConcurrentHashMap valMap = thingMap.get(topic); + if(MapUtils.isEmpty(valMap)){ + return null; + } + Optional first = valMap.entrySet().stream().filter(entry -> + Arrays.asList(entry.getKey().split(":")).contains(key) + ).map(Map.Entry::getValue).findFirst(); + return first.orElse(null); + } + + + + + public void deleteKeyMap(String topic, String key) { ConcurrentHashMap valMap = thingMap.get(topic); if(MapUtils.isEmpty(valMap)){ @@ -132,6 +156,7 @@ public final class ThingCache { } //直接利用key覆盖 valMap.put(key,value); + thingMap.put(topic,valMap); } public List findAllKeyMap(String topic, Collection key) {