Browse Source

订阅数据输入报错bug修复

2024年9月11日15:33:43
thing_master
lishuai 1 year ago
parent
commit
b8392574b4
  1. 232
      modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java
  2. 33
      modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java

232
modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java

@ -8,7 +8,6 @@ import com.thing.common.core.enumeration.GateWayStatus;
import com.thing.common.core.enumeration.QueueOriginType; import com.thing.common.core.enumeration.QueueOriginType;
import com.thing.common.core.enumeration.TemplateMark; import com.thing.common.core.enumeration.TemplateMark;
import com.thing.common.core.enumeration.ThingStatus; import com.thing.common.core.enumeration.ThingStatus;
import com.thing.common.core.event.AuthParam;
import com.thing.common.core.event.QueueDeviceEvent; import com.thing.common.core.event.QueueDeviceEvent;
import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.ConvertUtils;
import com.thing.common.core.utils.DateTimeUtils; import com.thing.common.core.utils.DateTimeUtils;
@ -19,10 +18,12 @@ import com.thing.thing.cache.service.ThingCache;
import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.context.service.ThingManageContextService;
import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.dto.IotThingViewDTO;
import com.thing.thing.entity.entity.IotThingEntity; import com.thing.thing.entity.entity.IotThingEntity;
import com.thing.thing.entity.service.IotThingEntityService;
import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.dto.IotThingModelDTO;
import com.thing.thing.model.entity.IotThingModelEntity; import com.thing.thing.model.entity.IotThingModelEntity;
import com.thing.thing.model.service.IotThingModelService; import com.thing.thing.model.service.IotThingModelService;
import com.thing.transport.api.adaptor.JsonConverter; import com.thing.transport.api.adaptor.JsonConverter;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@ -31,6 +32,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -42,6 +44,8 @@ public class QueueDeviceEventListener {
private final IotThingModelService thingModelService; private final IotThingModelService thingModelService;
private final IotThingEntityService entityService;
private final ThingCache thingCache; private final ThingCache thingCache;
@EventListener(QueueDeviceEvent.class) @EventListener(QueueDeviceEvent.class)
@ -55,60 +59,99 @@ public class QueueDeviceEventListener {
if (CollectionUtil.isEmpty(validMsgList)) { if (CollectionUtil.isEmpty(validMsgList)) {
return; return;
} }
//新增和更新物模型信息
Map<String, String> codeOriginMap = validMsgList.parallelStream()
.collect(Collectors.toMap(QueueMsgDTO::getThingCode,
item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin())
|| QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin()))
? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1));
//先根据物模型编码去重
Map<String, Map<Long, List<QueueMsgDTO>>> codeMapList = validMsgList.stream()
.collect(Collectors.groupingBy(QueueMsgDTO::getThingCode, Collectors.groupingBy(QueueMsgDTO::getTenantCode)));
//物模型插入 //物模型插入
List<IotThingModelEntity> insertModelList = new ArrayList<>(); List<IotThingModelEntity> insertModelList = new ArrayList<>();
//物模型更新 //物模型更新
List<IotThingModelEntity> updateModelList = new ArrayList<>(); List<IotThingModelEntity> updateModelList = new ArrayList<>();
//筛选物模型的插入和更新
checkAndSaveThingModels(codeOriginMap, insertModelList,updateModelList);
//物实体的插入
List<IotThingEntity> insertEntityList = new ArrayList<>();
//组装物实体和物模型
for (Map.Entry<String, Map<Long, List<QueueMsgDTO>>> entry : codeMapList.entrySet()) {
String thingCode = entry.getKey();
//分用户admin用户不用创建物实体其他用户需要创建物实体
Map<Long, List<QueueMsgDTO>> valueEntry = entry.getValue();
for (Map.Entry<Long, List<QueueMsgDTO>> thingEntry : valueEntry.entrySet()) {
Long tenantCode = thingEntry.getKey();
QueueMsgDTO queueMsgDTO = thingEntry.getValue().get(0);
//物模型缓存查询
ObjectNode jsonObject = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, thingCode);
//缓存存在更新否则插入
if (null != jsonObject && !jsonObject.isEmpty()) {
IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class);
updateModelList.add(modelEntity);
} else {
String origin = QueueOriginType.AUTO_DATA_SYNC.name().equals(queueMsgDTO.getOrigin())
|| QueueOriginType.MANUAL_DATA_SYNC.name().equals(queueMsgDTO.getOrigin())
? QueueOriginType.TB.name() : queueMsgDTO.getOrigin();
IotThingModelEntity modelEntity = new IotThingModelEntity()
.setCode(thingCode)
.setToken(TokenGenerator.generateValue())
.setGateway(GateWayStatus.NO_GATE_WAY.getValue())
.setStatus(ThingStatus.NOT_CONNECTED.getCode())
.setAuthNum(0L)
.setStatusTs(DateTimeUtils.getCurrentTime())
.setOrigin(origin);
// modelEntity.setId(IdUtil.getSnowflake().nextId())
// .setCreateDate(DateTimeUtils.getCurrentTime())
// .setCreateDate(DateTimeUtils.getCurrentTime())
;
insertModelList.add(modelEntity);
}
//物实体的构建
// if (!Objects.equals(queueMsgDTO.getTenantCode(), 1001L)) {
// ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode);
// if (null == entityNode || entityNode.isEmpty()) {
// IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId());
// insertEntityList.add(newThingEntity);
// }
// }
ObjectNode entityNode = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, tenantCode + ":" + thingCode);
if (null == entityNode || entityNode.isEmpty()) {
IotThingEntity newThingEntity = createThingEntity(thingCode, queueMsgDTO.getTenantCode(), queueMsgDTO.getCompanyId(), queueMsgDTO.getDeptId());
insertEntityList.add(newThingEntity);
}
}
}
//插入新的物模型
if (CollectionUtils.isNotEmpty(insertModelList)) { if (CollectionUtils.isNotEmpty(insertModelList)) {
// 物实体
Map<AuthParam, Set<String>> tenantThingCodeMap =
validMsgList.stream()
.filter(item -> Objects.nonNull(item.getTenantCode())
|| Objects.nonNull(item.getCompanyId())
|| Objects.nonNull(item.getDeptId()))
.collect(Collectors.groupingBy(
item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()),
Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet())));
List<IotThingEntity> insertEntityList = new ArrayList<>();
saveTenantThingList(tenantThingCodeMap, insertEntityList);
insertModelList.forEach(item -> {
long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count();
List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode());
item.setAuthNum(count+CollectionUtils.size(keyMap));
});
thingModelService.saveBatch(insertModelList);
// thingModelService.saveBatch(insertModelList);
}
//更新老的物模型
if (CollectionUtils.isNotEmpty(updateModelList)) {
// thingModelService.updateBatch(updateModelList);
insertModelList.addAll(updateModelList);
}
//更新物实体的模型uniqueModelList有值物实体不一定有值
List<IotThingEntity> distinctStudentFile = new ArrayList<>();
if (CollectionUtils.isNotEmpty(insertModelList)) {
//插入物实体
if(CollectionUtils.isNotEmpty(insertEntityList)){
List<IotThingEntity> distinctStudentFile = insertEntityList.stream()
//物实体的插入和缓存更新
if (CollectionUtils.isNotEmpty(insertEntityList)) {
distinctStudentFile.addAll(insertEntityList.stream()
.collect(Collectors.collectingAndThen( .collect(Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new));
thingManageContextService.saveEntity(distinctStudentFile);
updateEntityCache(distinctStudentFile);
Collectors.toCollection(() ->
new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new)));
} }
} }
if(CollectionUtils.isNotEmpty(updateModelList)){
updateModelList.forEach(item -> {
long count = updateModelList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count();
List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode());
item.setAuthNum(count+CollectionUtils.size(keyMap));
});
thingModelService.updateBatch(updateModelList);
insertModelList.addAll(updateModelList);
//更新物模型和物实体的缓存
insertModelList.forEach(item -> {
long count = distinctStudentFile.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count();
List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode());
item.setAuthNum(count + CollectionUtils.size(keyMap));
});
if(CollectionUtils.isNotEmpty(insertModelList)){
thingModelService.saveOrUpdateBatch(insertModelList);
updateModleCache(insertModelList);
}
if(CollectionUtils.isNotEmpty(distinctStudentFile)){
entityService.saveOrUpdateBatch(distinctStudentFile);
updateEntityCache(distinctStudentFile);
} }
//更新物模板缓存
updateModleCache(insertModelList);
} catch (Exception e) { } catch (Exception e) {
log.error("设备保存失败: {}", e.getMessage(), e); log.error("设备保存失败: {}", e.getMessage(), e);
} }
@ -117,32 +160,41 @@ public class QueueDeviceEventListener {
private void updateModleCache(List<IotThingModelEntity> insertModelList) { private void updateModleCache(List<IotThingModelEntity> insertModelList) {
List<IotThingModelDTO> iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); List<IotThingModelDTO> iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class);
List<ObjectNode> modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); List<ObjectNode> modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS);
for (ObjectNode item : modelList) {
//更新物模型的缓存
thingCache.updateKeyMap(CacheNameEnum.THING_MODEL
, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item);
List<ObjectNode> topicMap = thingCache.getTopicMap(CacheNameEnum.THING_MODEL);
if(CollectionUtils.isNotEmpty(topicMap)){
Map<String, ObjectNode> modelMap = modelList.stream().collect(Collectors.toMap(
model -> model.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ ":" + model.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText()
, Function.identity()));
thingCache.putMap(CacheNameEnum.THING_MODEL, modelMap);
}else{
for (ObjectNode item : modelList) {
//更新物模型的缓存
thingCache.updateKeyMap(CacheNameEnum.THING_MODEL
, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), item);
}
} }
} }
private void updateEntityCache(List<IotThingEntity> distinctStudentFile) { private void updateEntityCache(List<IotThingEntity> distinctStudentFile) {
//更新物实体缓存 //更新物实体缓存
List<IotThingViewDTO> entityList = distinctStudentFile.stream().map(e -> { List<IotThingViewDTO> entityList = distinctStudentFile.stream().map(e -> {
ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode());
// ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode());
ObjectNode modelMap = thingCache.findAccurateObjectNode(CacheNameEnum.THING_MODEL, e.getCode());
return new IotThingViewDTO() return new IotThingViewDTO()
.setLat(e.getLat()) .setLat(e.getLat())
.setLon(e.getLon()) .setLon(e.getLon())
.setTags(e.getTags()) .setTags(e.getTags())
.setDeptIds(e.getDeptIds()) .setDeptIds(e.getDeptIds())
.setOrigin(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText())
.setOrigin(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ORIGIN.getField()).asText())
.setTemplateMark(TemplateMark.NO.getValue()) .setTemplateMark(TemplateMark.NO.getValue())
.setRealType("1") .setRealType("1")
.setImg(e.getImg()) .setImg(e.getImg())
.setRemark(e.getRemark()) .setRemark(e.getRemark())
.setEnableStatus(e.getEnableStatus()) .setEnableStatus(e.getEnableStatus())
.setStatusTs(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong())
.setStatus(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText())
.setStatusTs(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField()).asLong())
.setStatus(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField()).asText())
.setTenantCode(e.getTenantCode()) .setTenantCode(e.getTenantCode())
.setCompanyId(e.getTenantCode()) .setCompanyId(e.getTenantCode())
.setDeptId(e.getTenantCode()) .setDeptId(e.getTenantCode())
@ -150,88 +202,30 @@ public class QueueDeviceEventListener {
.setEntityName(e.getName()) .setEntityName(e.getName())
.setEntityCode(e.getCode()) .setEntityCode(e.getCode())
.setEntityId(e.getId()) .setEntityId(e.getId())
.setModelId(keyMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong())
.setModelId(modelMap.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asLong())
.setCreateDate(e.getCreateDate()); .setCreateDate(e.getCreateDate());
}).toList(); }).toList();
List<ObjectNode> entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList); List<ObjectNode> entityJsonList = JsonConverter.convertToJsonObjectListObjectNode(entityList);
for (ObjectNode entityNode : entityJsonList) { for (ObjectNode entityNode : entityJsonList) {
thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY thingCache.updateKeyMap(CacheNameEnum.THING_ENTITY
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText()
, entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText()
+ ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText()
+ ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode);
+ ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(), entityNode);
} }
} }
/**
* 物管理表是否存在不存在新增设备
*
* @param codeOriginMap 物编码
*/
private void checkAndSaveThingModels(Map<String, String> codeOriginMap, List<IotThingModelEntity> insertList,List<IotThingModelEntity> updateList) {
// 使用迭代器遍历并删除不符合条件的元素
Iterator<Map.Entry<String, String>> iterator = codeOriginMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
String code = entry.getKey();
ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code);
if (null != jsonObject && !jsonObject.isEmpty()) {
IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class);
updateList.add(modelEntity);
iterator.remove();
}
}
codeOriginMap.forEach((thingCode, origin) -> {
IotThingModelEntity modelEntity = new IotThingModelEntity()
.setCode(thingCode)
.setToken(TokenGenerator.generateValue())
.setGateway(GateWayStatus.NO_GATE_WAY.getValue())
.setStatus(ThingStatus.NOT_CONNECTED.getCode())
.setAuthNum(0L)
.setStatusTs(DateTimeUtils.getCurrentTime())
.setOrigin(origin);
modelEntity.setId(IdUtil.getSnowflake().nextId())
.setCreateDate(DateTimeUtils.getCurrentTime())
.setCreateDate(DateTimeUtils.getCurrentTime())
;
insertList.add(modelEntity);
});
}
/**
* 保存物实体
*
* @param tenantThingCodeMap 物实体
*/
private void saveTenantThingList(Map<AuthParam, Set<String>> tenantThingCodeMap, List<IotThingEntity> insertList) {
tenantThingCodeMap.forEach(
(authParam, thingCodes) -> {
if (CollectionUtil.isEmpty(thingCodes)) {
return;
}
thingCodes.removeIf(code -> {
ObjectNode keyMap = thingCache.findObjectNode(CacheNameEnum.THING_ENTITY, authParam.getTenantCode() + ":" + code);
return null != keyMap && !keyMap.isEmpty();
});
if (CollectionUtil.isNotEmpty(thingCodes)) {
for (String newThingCode : thingCodes) {
insertList.add(createThingEntity(newThingCode, authParam));
}
}
});
}
private IotThingEntity createThingEntity(String thingCode, AuthParam authParam) {
private IotThingEntity createThingEntity(String thingCode, Long tenantCode, Long companyId, Long deptId) {
IotThingEntity entity = new IotThingEntity(); IotThingEntity entity = new IotThingEntity();
entity.setId(IdUtil.getSnowflake().nextId());
// entity.setId(IdUtil.getSnowflake().nextId());
entity.setCode(thingCode); entity.setCode(thingCode);
entity.setName(thingCode); entity.setName(thingCode);
entity.setEnableStatus("1"); entity.setEnableStatus("1");
entity.setRealType("1"); entity.setRealType("1");
entity.setTemplateMark("0"); entity.setTemplateMark("0");
entity.setType("默认物类型"); entity.setType("默认物类型");
entity.setTenantCode(authParam.getTenantCode());
entity.setCompanyId(authParam.getCompanyId());
entity.setDeptIds(authParam.getDeptId().toString());
entity.setTenantCode(tenantCode);
entity.setCompanyId(companyId);
entity.setDeptIds(deptId.toString());
return entity; return entity;
} }

33
modules/thing/src/main/java/com/thing/thing/cache/service/ThingCache.java

@ -6,10 +6,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -90,6 +87,12 @@ public final class ThingCache {
return valMap.entrySet().stream().filter(entry -> entry.getKey().contains(key)).map(Map.Entry::getValue).collect(Collectors.toList()); return valMap.entrySet().stream().filter(entry -> entry.getKey().contains(key)).map(Map.Entry::getValue).collect(Collectors.toList());
} }
/**
* 模糊匹配
* @param topic
* @param key
* @return
*/
public ObjectNode findObjectNode(String topic, String key) { public ObjectNode findObjectNode(String topic, String key) {
ConcurrentHashMap<String, ObjectNode> valMap = thingMap.get(topic); ConcurrentHashMap<String, ObjectNode> valMap = thingMap.get(topic);
if(MapUtils.isEmpty(valMap)){ if(MapUtils.isEmpty(valMap)){
@ -99,6 +102,27 @@ public final class ThingCache {
return first.orElse(null); return first.orElse(null);
} }
/**
* 精确匹配
* @param topic
* @param key
* @return
*/
public ObjectNode findAccurateObjectNode(String topic, String key) {
ConcurrentHashMap<String, ObjectNode> valMap = thingMap.get(topic);
if(MapUtils.isEmpty(valMap)){
return null;
}
Optional<ObjectNode> first = valMap.entrySet().stream().filter(entry ->
Arrays.asList(entry.getKey().split(":")).contains(key)
).map(Map.Entry::getValue).findFirst();
return first.orElse(null);
}
public void deleteKeyMap(String topic, String key) { public void deleteKeyMap(String topic, String key) {
ConcurrentHashMap<String, ObjectNode> valMap = thingMap.get(topic); ConcurrentHashMap<String, ObjectNode> valMap = thingMap.get(topic);
if(MapUtils.isEmpty(valMap)){ if(MapUtils.isEmpty(valMap)){
@ -132,6 +156,7 @@ public final class ThingCache {
} }
//直接利用key覆盖 //直接利用key覆盖
valMap.put(key,value); valMap.put(key,value);
thingMap.put(topic,valMap);
} }
public List<ObjectNode> findAllKeyMap(String topic, Collection<String> key) { public List<ObjectNode> findAllKeyMap(String topic, Collection<String> key) {

Loading…
Cancel
Save