From b8392574b4a6fe91c21fdc5114c316af9d5e7b29 Mon Sep 17 00:00:00 2001 From: lishuai Date: Wed, 11 Sep 2024 15:33:45 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=BE=93=E5=85=A5=E6=8A=A5=E9=94=99bug=E4=BF=AE=E5=A4=8D=20202?= =?UTF-8?q?4=E5=B9=B49=E6=9C=8811=E6=97=A515:33:43?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/QueueDeviceEventListener.java | 232 +++++++++--------- .../thing/thing/cache/service/ThingCache.java | 33 ++- 2 files changed, 142 insertions(+), 123 deletions(-) diff --git a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java index d074b6d..172f3e1 100644 --- a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java +++ b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java @@ -8,7 +8,6 @@ import com.thing.common.core.enumeration.GateWayStatus; import com.thing.common.core.enumeration.QueueOriginType; import com.thing.common.core.enumeration.TemplateMark; import com.thing.common.core.enumeration.ThingStatus; -import com.thing.common.core.event.AuthParam; import com.thing.common.core.event.QueueDeviceEvent; import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.DateTimeUtils; @@ -19,10 +18,12 @@ import com.thing.thing.cache.service.ThingCache; import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.entity.IotThingEntity; +import com.thing.thing.entity.service.IotThingEntityService; import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.entity.IotThingModelEntity; import com.thing.thing.model.service.IotThingModelService; import com.thing.transport.api.adaptor.JsonConverter; +import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -31,6 +32,7 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -42,6 +44,8 @@ public class QueueDeviceEventListener { private final IotThingModelService thingModelService; + private final IotThingEntityService entityService; + private final ThingCache thingCache; @EventListener(QueueDeviceEvent.class) @@ -55,60 +59,99 @@ public class QueueDeviceEventListener { if (CollectionUtil.isEmpty(validMsgList)) { return; } - //新增和更新物模型信息 - Map codeOriginMap = validMsgList.parallelStream() - .collect(Collectors.toMap(QueueMsgDTO::getThingCode, - item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin()) - || QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin())) - ? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1)); + //先根据物模型编码去重 + Map>> codeMapList = validMsgList.stream() + .collect(Collectors.groupingBy(QueueMsgDTO::getThingCode, Collectors.groupingBy(QueueMsgDTO::getTenantCode))); //物模型插入 List insertModelList = new ArrayList<>(); //物模型更新 List updateModelList = new ArrayList<>(); - //筛选物模型的插入和更新 - checkAndSaveThingModels(codeOriginMap, insertModelList,updateModelList); + //物实体的插入 + List insertEntityList = new ArrayList<>(); + //组装物实体和物模型 + for (Map.Entry>> entry : codeMapList.entrySet()) { + String thingCode = entry.getKey(); + //分用户:admin用户不用创建物实体,其他用户需要创建物实体 + Map> valueEntry = entry.getValue(); + for (Map.Entry> thingEntry : valueEntry.entrySet()) { + Long tenantCode = thingEntry.getKey(); + QueueMsgDTO queueMsgDTO = thingEntry.getValue().get(0); + //物模型缓存查询 + ObjectNode jsonObject = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, thingCode); + //缓存存在,更新,否则插入 + if (null != jsonObject && !jsonObject.isEmpty()) { + IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class); + updateModelList.add(modelEntity); + } else { + String origin = QueueOriginType.AUTO_DATA_SYNC.name().equals(queueMsgDTO.getOrigin()) + || QueueOriginType.MANUAL_DATA_SYNC.name().equals(queueMsgDTO.getOrigin()) + ? QueueOriginType.TB.name() : queueMsgDTO.getOrigin(); + IotThingModelEntity modelEntity = new IotThingModelEntity() + .setCode(thingCode) + .setToken(TokenGenerator.generateValue()) + .setGateway(GateWayStatus.NO_GATE_WAY.getValue()) + .setStatus(ThingStatus.NOT_CONNECTED.getCode()) + .setAuthNum(0L) + .setStatusTs(DateTimeUtils.getCurrentTime()) + .setOrigin(origin); +// modelEntity.setId(IdUtil.getSnowflake().nextId()) +// .setCreateDate(DateTimeUtils.getCurrentTime()) +// .setCreateDate(DateTimeUtils.getCurrentTime()) + ; + insertModelList.add(modelEntity); + } + //物实体的构建 +// if (!Objects.equals(queueMsgDTO.getTenantCode(), 1001L)) { +// ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); +// if (null == entityNode || entityNode.isEmpty()) { +// IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); +// insertEntityList.add(newThingEntity); +// } +// } + ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); + if (null == entityNode || entityNode.isEmpty()) { + IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); + insertEntityList.add(newThingEntity); + } + } + } + //插入新的物模型 if (CollectionUtils.isNotEmpty(insertModelList)) { - // 物实体 - Map> tenantThingCodeMap = - validMsgList.stream() - .filter(item -> Objects.nonNull(item.getTenantCode()) - || Objects.nonNull(item.getCompanyId()) - || Objects.nonNull(item.getDeptId())) - .collect(Collectors.groupingBy( - item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()), - Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet()))); - List insertEntityList = new ArrayList<>(); - saveTenantThingList(tenantThingCodeMap, insertEntityList); - insertModelList.forEach(item -> { - long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); - List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); - item.setAuthNum(count+CollectionUtils.size(keyMap)); - }); - thingModelService.saveBatch(insertModelList); +// thingModelService.saveBatch(insertModelList); + } + //更新老的物模型 + if (CollectionUtils.isNotEmpty(updateModelList)) { +// thingModelService.updateBatch(updateModelList); + insertModelList.addAll(updateModelList); + } + //更新物实体的模型:uniqueModelList有值,物实体不一定有值 + List distinctStudentFile = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(insertModelList)) { - //插入物实体 - if(CollectionUtils.isNotEmpty(insertEntityList)){ - List distinctStudentFile = insertEntityList.stream() + //物实体的插入和缓存更新 + if (CollectionUtils.isNotEmpty(insertEntityList)) { + distinctStudentFile.addAll(insertEntityList.stream() .collect(Collectors.collectingAndThen( - Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new)); - thingManageContextService.saveEntity(distinctStudentFile); - updateEntityCache(distinctStudentFile); + Collectors.toCollection(() -> + new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new))); } } - - if(CollectionUtils.isNotEmpty(updateModelList)){ - updateModelList.forEach(item -> { - long count = updateModelList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); - List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); - item.setAuthNum(count+CollectionUtils.size(keyMap)); - }); - thingModelService.updateBatch(updateModelList); - insertModelList.addAll(updateModelList); + //更新物模型和物实体的缓存 + insertModelList.forEach(item -> { + long count = distinctStudentFile.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); + List keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); + item.setAuthNum(count + CollectionUtils.size(keyMap)); + }); + if(CollectionUtils.isNotEmpty(insertModelList)){ + thingModelService.saveOrUpdateBatch(insertModelList); + updateModleCache(insertModelList); + } + if(CollectionUtils.isNotEmpty(distinctStudentFile)){ + entityService.saveOrUpdateBatch(distinctStudentFile); + updateEntityCache(distinctStudentFile); } - //更新物模板缓存 - updateModleCache(insertModelList); } catch (Exception e) { log.error("设备保存失败: {}", e.getMessage(), e); } @@ -117,32 +160,41 @@ public class QueueDeviceEventListener { private void updateModleCache(List insertModelList) { List iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); List modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); - - for (ObjectNode item : modelList) { - //更新物模型的缓存 - thingCache.updateKeyMap(CacheNameEnum.THING_MODEL - , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() - + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item); + List topicMap = thingCache.getTopicMap(CacheNameEnum.THING_MODEL); + if(CollectionUtils.isNotEmpty(topicMap)){ + Map modelMap = modelList.stream().collect(Collectors.toMap( + model -> model.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + ":" + model.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText() + , Function.identity())); + thingCache.putMap(CacheNameEnum.THING_MODEL, modelMap); + }else{ + for (ObjectNode item : modelList) { + //更新物模型的缓存 + thingCache.updateKeyMap(CacheNameEnum.THING_MODEL + , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), item); + } } } private void updateEntityCache(List distinctStudentFile) { //更新物实体缓存 List entityList = distinctStudentFile.stream().map(e -> { - ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); +// ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); + ObjectNode modelMap = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, e.getCode()); return new IotThingViewDTO() .setLat(e.getLat()) .setLon(e.getLon()) .setTags(e.getTags()) .setDeptIds(e.getDeptIds()) - .setOrigin(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) + .setOrigin(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText()) .setTemplateMark(TemplateMark.NO.getValue()) .setRealType("1") .setImg(e.getImg()) .setRemark(e.getRemark()) .setEnableStatus(e.getEnableStatus()) - .setStatusTs(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) - .setStatus(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) + .setStatusTs(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong()) + .setStatus(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText()) .setTenantCode(e.getTenantCode()) .setCompanyId(e.getTenantCode()) .setDeptId(e.getTenantCode()) @@ -150,88 +202,30 @@ public class QueueDeviceEventListener { .setEntityName(e.getName()) .setEntityCode(e.getCode()) .setEntityId(e.getId()) - .setModelId(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) + .setModelId(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong()) .setCreateDate(e.getCreateDate()); }).toList(); List entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList); for (ObjectNode entityNode : entityJsonList) { thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY - ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + , entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() - + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(), entityNode); } } - /** - * 物管理表是否存在,不存在新增设备 - * - * @param codeOriginMap 物编码 - */ - private void checkAndSaveThingModels(Map codeOriginMap, List insertList,List updateList) { - // 使用迭代器遍历并删除不符合条件的元素 - Iterator> iterator = codeOriginMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String code = entry.getKey(); - ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code); - if (null != jsonObject && !jsonObject.isEmpty()) { - IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class); - updateList.add(modelEntity); - iterator.remove(); - } - } - codeOriginMap.forEach((thingCode, origin) -> { - IotThingModelEntity modelEntity = new IotThingModelEntity() - .setCode(thingCode) - .setToken(TokenGenerator.generateValue()) - .setGateway(GateWayStatus.NO_GATE_WAY.getValue()) - .setStatus(ThingStatus.NOT_CONNECTED.getCode()) - .setAuthNum(0L) - .setStatusTs(DateTimeUtils.getCurrentTime()) - .setOrigin(origin); - modelEntity.setId(IdUtil.getSnowflake().nextId()) - .setCreateDate(DateTimeUtils.getCurrentTime()) - .setCreateDate(DateTimeUtils.getCurrentTime()) - ; - insertList.add(modelEntity); - }); - } - - /** - * 保存物实体 - * - * @param tenantThingCodeMap 物实体 - */ - private void saveTenantThingList(Map> tenantThingCodeMap, List insertList) { - tenantThingCodeMap.forEach( - (authParam, thingCodes) -> { - if (CollectionUtil.isEmpty(thingCodes)) { - return; - } - thingCodes.removeIf(code -> { - ObjectNode keyMap = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, authParam.getTenantCode() + ":" + code); - return null != keyMap && !keyMap.isEmpty(); - }); - if (CollectionUtil.isNotEmpty(thingCodes)) { - for (String newThingCode : thingCodes) { - insertList.add(createThingEntity(newThingCode, authParam)); - } - } - }); - } - - private IotThingEntity createThingEntity(String thingCode, AuthParam authParam) { + private IotThingEntity createThingEntity(String thingCode, Long tenantCode, Long companyId, Long deptId) { IotThingEntity entity = new IotThingEntity(); - entity.setId(IdUtil.getSnowflake().nextId()); + // entity.setId(IdUtil.getSnowflake().nextId()); entity.setCode(thingCode); entity.setName(thingCode); entity.setEnableStatus("1"); entity.setRealType("1"); entity.setTemplateMark("0"); entity.setType("默认物类型"); - entity.setTenantCode(authParam.getTenantCode()); - entity.setCompanyId(authParam.getCompanyId()); - entity.setDeptIds(authParam.getDeptId().toString()); + entity.setTenantCode(tenantCode); + entity.setCompanyId(companyId); + entity.setDeptIds(deptId.toString()); return entity; } diff --git a/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java b/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java index ec4321e..140a19d 100644 --- a/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java +++ b/modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java @@ -6,10 +6,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -90,6 +87,12 @@ public final class ThingCache { return valMap.entrySet().stream().filter(entry -> entry.getKey().contains(key)).map(Map.Entry::getValue).collect(Collectors.toList()); } + /** + * 模糊匹配 + * @param topic + * @param key + * @return + */ public ObjectNode findObjectNode(String topic, String key) { ConcurrentHashMap valMap = thingMap.get(topic); if(MapUtils.isEmpty(valMap)){ @@ -99,6 +102,27 @@ public final class ThingCache { return first.orElse(null); } + /** + * 精确匹配 + * @param topic + * @param key + * @return + */ + public ObjectNode findAccurateObjectNode(String topic, String key) { + ConcurrentHashMap valMap = thingMap.get(topic); + if(MapUtils.isEmpty(valMap)){ + return null; + } + Optional first = valMap.entrySet().stream().filter(entry -> + Arrays.asList(entry.getKey().split(":")).contains(key) + ).map(Map.Entry::getValue).findFirst(); + return first.orElse(null); + } + + + + + public void deleteKeyMap(String topic, String key) { ConcurrentHashMap valMap = thingMap.get(topic); if(MapUtils.isEmpty(valMap)){ @@ -132,6 +156,7 @@ public final class ThingCache { } //直接利用key覆盖 valMap.put(key,value); + thingMap.put(topic,valMap); } public List findAllKeyMap(String topic, Collection key) { From 870f647e1ef6b15112d532f3e283000289c14182 Mon Sep 17 00:00:00 2001 From: lishuai Date: Wed, 11 Sep 2024 15:47:43 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=AD=97=E5=85=B8=E5=AF=BC=E5=85=A5=202024?= =?UTF-8?q?=E5=B9=B49=E6=9C=8811=E6=97=A515:47:40?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../thing/thing/dict/service/impl/IotThingDictServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/thing/src/main/java/com/thing/thing/dict/service/impl/IotThingDictServiceImpl.java b/modules/thing/src/main/java/com/thing/thing/dict/service/impl/IotThingDictServiceImpl.java index 139ca07..7685556 100644 --- a/modules/thing/src/main/java/com/thing/thing/dict/service/impl/IotThingDictServiceImpl.java +++ b/modules/thing/src/main/java/com/thing/thing/dict/service/impl/IotThingDictServiceImpl.java @@ -251,7 +251,7 @@ public class IotThingDictServiceImpl extends BaseServiceImpl codeCountMap = sheetData.stream() - .collect(Collectors.groupingBy(IotThingDictExcel::getCode, Collectors.counting())); // 使用 groupingBy 收集器按照 code 进行分组,并计数 + .collect(Collectors.groupingBy(s-> s.getCode()+":"+s.getGroupName(), Collectors.counting())); // 使用 groupingBy 收集器按照 code 进行分组,并计数 String duplicates = codeCountMap.entrySet().stream() .filter(entry -> entry.getValue() > 1) // 过滤出计数大于 1 的 entry .map(Map.Entry::getKey) // 获取过滤后的 entry 的 key,即重复的 code 值 From f709a2f8599de9e0a0eb3c888620e1ebfb662528 Mon Sep 17 00:00:00 2001 From: lishuai Date: Wed, 11 Sep 2024 16:07:32 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=AD=97=E5=85=B8=E5=AF=BC=E5=85=A5=202024?= =?UTF-8?q?=E5=B9=B49=E6=9C=8811=E6=97=A515:47:40?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/QueueDeviceEventListener.java | 39 +++++-------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java index 172f3e1..9382788 100644 --- a/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java +++ b/modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java @@ -1,7 +1,6 @@ package com.thing.listener; import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.util.IdUtil; import com.fasterxml.jackson.databind.node.ObjectNode; import com.thing.common.cache.constants.CacheNameEnum; import com.thing.common.core.enumeration.GateWayStatus; @@ -15,7 +14,6 @@ import com.thing.common.core.utils.JacksonUtil; import com.thing.common.core.utils.TokenGenerator; import com.thing.common.data.dto.QueueMsgDTO; import com.thing.thing.cache.service.ThingCache; -import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.entity.IotThingEntity; import com.thing.thing.entity.service.IotThingEntityService; @@ -23,7 +21,6 @@ import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.entity.IotThingModelEntity; import com.thing.thing.model.service.IotThingModelService; import com.thing.transport.api.adaptor.JsonConverter; -import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -40,8 +37,6 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class QueueDeviceEventListener { - private final ThingManageContextService thingManageContextService; - private final IotThingModelService thingModelService; private final IotThingEntityService entityService; @@ -96,40 +91,27 @@ public class QueueDeviceEventListener { .setOrigin(origin); // modelEntity.setId(IdUtil.getSnowflake().nextId()) // .setCreateDate(DateTimeUtils.getCurrentTime()) -// .setCreateDate(DateTimeUtils.getCurrentTime()) - ; +// .setCreateDate(DateTimeUtils.getCurrentTime()); insertModelList.add(modelEntity); } //物实体的构建 -// if (!Objects.equals(queueMsgDTO.getTenantCode(), 1001L)) { -// ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); -// if (null == entityNode || entityNode.isEmpty()) { -// IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); -// insertEntityList.add(newThingEntity); -// } -// } - ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); - if (null == entityNode || entityNode.isEmpty()) { - IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); - insertEntityList.add(newThingEntity); + if (!Objects.equals(queueMsgDTO.getTenantCode(), 1001L)) { + ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode); + if (null == entityNode || entityNode.isEmpty()) { + IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId()); + insertEntityList.add(newThingEntity); + } } - } - } - //插入新的物模型 - if (CollectionUtils.isNotEmpty(insertModelList)) { -// thingModelService.saveBatch(insertModelList); + } } //更新老的物模型 if (CollectionUtils.isNotEmpty(updateModelList)) { -// thingModelService.updateBatch(updateModelList); insertModelList.addAll(updateModelList); } //更新物实体的模型:uniqueModelList有值,物实体不一定有值 - List distinctStudentFile = new ArrayList<>(); if (CollectionUtils.isNotEmpty(insertModelList)) { - //物实体的插入和缓存更新 if (CollectionUtils.isNotEmpty(insertEntityList)) { distinctStudentFile.addAll(insertEntityList.stream() @@ -146,7 +128,7 @@ public class QueueDeviceEventListener { }); if(CollectionUtils.isNotEmpty(insertModelList)){ thingModelService.saveOrUpdateBatch(insertModelList); - updateModleCache(insertModelList); + updateModelCache(insertModelList); } if(CollectionUtils.isNotEmpty(distinctStudentFile)){ entityService.saveOrUpdateBatch(distinctStudentFile); @@ -157,7 +139,7 @@ public class QueueDeviceEventListener { } } - private void updateModleCache(List insertModelList) { + private void updateModelCache(List insertModelList) { List iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); List modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); List topicMap = thingCache.getTopicMap(CacheNameEnum.THING_MODEL); @@ -180,7 +162,6 @@ public class QueueDeviceEventListener { private void updateEntityCache(List distinctStudentFile) { //更新物实体缓存 List entityList = distinctStudentFile.stream().map(e -> { -// ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); ObjectNode modelMap = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, e.getCode()); return new IotThingViewDTO() .setLat(e.getLat()) From 1c85fc4ca2be6310cb310f27f0c7299a4315faaa Mon Sep 17 00:00:00 2001 From: lishuai Date: Wed, 11 Sep 2024 17:03:24 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BF=A9=E4=B8=AA=E5=AD=97=E6=AE=B5=202024?= =?UTF-8?q?=E5=B9=B49=E6=9C=8811=E6=97=A517:03:21?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/IotThingSourceController.java | 12 ++++ .../device/source/dto/IotThingSourceDTO.java | 4 ++ .../source/dto/IotThingSourceListDTO.java | 4 ++ .../source/entity/IotThingSourceEntity.java | 3 + .../source/service/IotThingSourceService.java | 7 ++ .../impl/IotThingSourceServiceImpl.java | 65 +++++++++++++++++-- 6 files changed, 91 insertions(+), 4 deletions(-) diff --git a/modules/thing/src/main/java/com/thing/device/source/controller/IotThingSourceController.java b/modules/thing/src/main/java/com/thing/device/source/controller/IotThingSourceController.java index 05923f8..d3d7761 100644 --- a/modules/thing/src/main/java/com/thing/device/source/controller/IotThingSourceController.java +++ b/modules/thing/src/main/java/com/thing/device/source/controller/IotThingSourceController.java @@ -142,4 +142,16 @@ public class IotThingSourceController { return new Result<>(); } + @GetMapping("attrGroup") + @Operation(summary="标签组") + public Result> attrGroup() { + return new Result>().ok(iotThingSourceService.attrGroup()); + } + + @PostMapping("attrGroupList") + @Operation(summary="标签组") + public Result> attrGroupRootId(@RequestBody IotThingSourceReqDTO iotThingSourceDTO) { + return new Result>().ok(iotThingSourceService.attrGroupRootId(iotThingSourceDTO)); + } + } diff --git a/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceDTO.java b/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceDTO.java index cace7d1..438608c 100644 --- a/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceDTO.java +++ b/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceDTO.java @@ -94,6 +94,10 @@ public class IotThingSourceDTO implements Serializable { private Long updateDate; @Schema(description = "数据处理规则js") private String dataRule; + @Schema(description = "物属性名称组(标签组)") + private String thingAttrGroup; + @Schema(description = "启动标志:0启动 1不启动") + private String startStatus; // @JsonIgnore // private Long thingTenantId; // /** 批量新增 同步 或者 更新 功能 **/ diff --git a/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceListDTO.java b/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceListDTO.java index 9201efb..060dc78 100644 --- a/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceListDTO.java +++ b/modules/thing/src/main/java/com/thing/device/source/dto/IotThingSourceListDTO.java @@ -80,5 +80,9 @@ public class IotThingSourceListDTO implements Serializable { private String extendData; @Schema(description = "数据处理规则js") private String dataRule; + @Schema(description = "物属性名称组(标签组)") + private String thingAttrGroup; + @Schema(description = "启动标志:0启动 1不启动") + private String startStatus; } diff --git a/modules/thing/src/main/java/com/thing/device/source/entity/IotThingSourceEntity.java b/modules/thing/src/main/java/com/thing/device/source/entity/IotThingSourceEntity.java index ffac98a..5ad19de 100644 --- a/modules/thing/src/main/java/com/thing/device/source/entity/IotThingSourceEntity.java +++ b/modules/thing/src/main/java/com/thing/device/source/entity/IotThingSourceEntity.java @@ -81,6 +81,9 @@ public class IotThingSourceEntity extends BaseInfoEntity { // // private Long updateDate; + private String thingAttrGroup; + private String startStatus; + } \ No newline at end of file diff --git a/modules/thing/src/main/java/com/thing/device/source/service/IotThingSourceService.java b/modules/thing/src/main/java/com/thing/device/source/service/IotThingSourceService.java index 61ecd88..fcd88bc 100644 --- a/modules/thing/src/main/java/com/thing/device/source/service/IotThingSourceService.java +++ b/modules/thing/src/main/java/com/thing/device/source/service/IotThingSourceService.java @@ -20,6 +20,9 @@ public interface IotThingSourceService extends IBaseService findAllByFromIdAndThingIdAndConfigTypeAndAttrCode(Long fromId, List thingIds, String configType, Long rootId, Collection attrCodes); + List findAllByFromIdAndThingIdAndConfigTypeAndAttrCodeAndAttrGroup(Long fromId, List thingIds, String configType, + Long rootId, Collection attrCodes,String attrGroup); + List findAllByFromIdAndThingIdAndConfigType(List fromIds, List thingIds, String configType, List rootIds); IotThingSourceDTO findById(Long id); @@ -43,4 +46,8 @@ public interface IotThingSourceService extends IBaseService getAttrListByThingRelationDTONew(IotThingSourceGetAttrDTO iotThingSourceGetAttrDTO); void dragAndDrop(Long id,Long toId,Long sort); + + List attrGroup(); + + List attrGroupRootId(IotThingSourceReqDTO iotThingSourceDTO); } diff --git a/modules/thing/src/main/java/com/thing/device/source/service/impl/IotThingSourceServiceImpl.java b/modules/thing/src/main/java/com/thing/device/source/service/impl/IotThingSourceServiceImpl.java index da4b48e..1fd7882 100644 --- a/modules/thing/src/main/java/com/thing/device/source/service/impl/IotThingSourceServiceImpl.java +++ b/modules/thing/src/main/java/com/thing/device/source/service/impl/IotThingSourceServiceImpl.java @@ -116,6 +116,20 @@ public class IotThingSourceServiceImpl extends BaseServiceImpl findAllByFromIdAndThingIdAndConfigTypeAndAttrCodeAndAttrGroup(Long fromId, List thingIds, String configType, Long rootId, Collection attrCodes, String attrGroup) { + return mapper.selectListByQueryAs( + new QueryWrapper() + .eq(IotThingSourceEntity::getFromId, fromId, Objects::nonNull) + .in(IotThingSourceEntity::getThingId, thingIds, CollectionUtil.isNotEmpty(thingIds)) + .eq(IotThingSourceEntity::getConfigType, configType, StringUtils::isNotEmpty) + .eq(IotThingSourceEntity::getRootId, rootId, Objects::nonNull) + .in(IotThingSourceEntity::getThingAttrCode, attrCodes, CollectionUtil.isNotEmpty(attrCodes)) + .eq(IotThingSourceEntity::getTenantCode, UserContext.getRealTenantCode()) + .eq(IotThingSourceEntity::getThingAttrGroup, attrGroup,StringUtils.isNotBlank(attrGroup)) + , IotThingSourceDTO.class); + } + @Override public List findAllByFromIdAndThingIdAndConfigType(List fromIds, List thingIds, String configType, List rootIds) { return mapper.selectListByQueryAs( @@ -134,9 +148,9 @@ public class IotThingSourceServiceImpl extends BaseServiceImpl iotThingSourceDTOS = findAllByFromIdAndThingIdAndConfigTypeAndAttrCode(iotThingSourceDTO.getFromId(), + List iotThingSourceDTOS = findAllByFromIdAndThingIdAndConfigTypeAndAttrCodeAndAttrGroup(iotThingSourceDTO.getFromId(), Collections.singletonList(iotThingSourceDTO.getThingId()), iotThingSourceDTO.getConfigType(), iotThingSourceDTO.getRootId(), - Collections.singletonList(iotThingSourceDTO.getThingAttrCode())); + Collections.singletonList(iotThingSourceDTO.getThingAttrCode()), iotThingSourceDTO.getThingAttrGroup()); if (CollectionUtil.isNotEmpty(iotThingSourceDTOS)) { throw new SysException("同一个数据源下不允许重复添加相关属性数据:" + iotThingSourceDTO.getThingAttrCode()); } @@ -167,7 +181,10 @@ public class IotThingSourceServiceImpl extends BaseServiceImpl attrGroup() { + List strings = mapper.selectListByQueryAs(QueryWrapper.create() + .select(IOT_THING_SOURCE_ENTITY.THING_ATTR_GROUP).eq(IotThingSourceEntity::getTenantCode, UserContext.getRealTenantCode()), String.class); + if(CollectionUtils.isEmpty(strings)){ + return Lists.newArrayList(); + } + return strings.stream().filter(s -> !StringUtils.isBlank(s)).distinct().collect(Collectors.toList()); + } + + @Override + public List attrGroupRootId(IotThingSourceReqDTO iotThingSourceDTO) { + List iotThingSourceRelationDTOList = iotThingSourceDTO.getIotThingSourceRelationDTOList(); + if (CollectionUtil.isEmpty(iotThingSourceRelationDTOList)) { + return null; + } + List thingIds = iotThingSourceRelationDTOList.stream().map(IotThingSourceRelationDTO::getThingId).toList(); + List rootIds = iotThingSourceRelationDTOList.stream().map(IotThingSourceRelationDTO::getRootId).distinct().toList(); + + List iotThingSourceDTOS = mapper.selectListByQueryAs(QueryWrapper.create() + .in(IotThingSourceEntity::getFromId, thingIds, CollectionUtil.isNotEmpty(thingIds)) + .in(IotThingSourceEntity::getRootId, rootIds, CollectionUtil.isNotEmpty(rootIds)) + .eq(IotThingSourceEntity::getConfigType, iotThingSourceDTO.getConfigType()) + .eq(IotThingSourceEntity::getThingAttrCodeType, iotThingSourceDTO.getThingAttrCodeType(), + StringUtils.isNotBlank(iotThingSourceDTO.getThingAttrCodeType())) + .eq(IotThingSourceEntity::getTenantCode, UserContext.getRealTenantCode()) + .eq(IotThingSourceEntity::getStartStatus, "0") + .orderBy(IotThingSourceEntity::getSort,true) + , IotThingSourceDTO.class + ); + // 根据 thingAttrGroup 去重 + Set seen = new LinkedHashSet<>(); + return iotThingSourceDTOS.stream() + .filter(dto -> StringUtils.isNotBlank(dto.getThingAttrGroup()) && seen.add(dto.getThingAttrGroup())) + .toList(); + } + /** * 自定义函数去重 *