Browse Source

订阅数据

2024年9月10日14:34:25
thing_master
lishuai 1 year ago
parent
commit
4fa2811827
  1. 65
      modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java
  2. 4
      modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java
  3. 13
      modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

65
modules/thing/src/main/java/com/thing/listener/QueueDeviceEventListener.java

@ -12,15 +12,16 @@ import com.thing.common.core.event.AuthParam;
import com.thing.common.core.event.QueueDeviceEvent; import com.thing.common.core.event.QueueDeviceEvent;
import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.ConvertUtils;
import com.thing.common.core.utils.DateTimeUtils; import com.thing.common.core.utils.DateTimeUtils;
import com.thing.common.core.utils.JacksonUtil;
import com.thing.common.core.utils.TokenGenerator; import com.thing.common.core.utils.TokenGenerator;
import com.thing.common.data.dto.QueueMsgDTO; import com.thing.common.data.dto.QueueMsgDTO;
import com.thing.thing.cache.service.CacheInit;
import com.thing.thing.cache.service.ThingCache; import com.thing.thing.cache.service.ThingCache;
import com.thing.thing.context.service.ThingManageContextService; import com.thing.thing.context.service.ThingManageContextService;
import com.thing.thing.entity.dto.IotThingViewDTO; import com.thing.thing.entity.dto.IotThingViewDTO;
import com.thing.thing.entity.entity.IotThingEntity; import com.thing.thing.entity.entity.IotThingEntity;
import com.thing.thing.model.dto.IotThingModelDTO; import com.thing.thing.model.dto.IotThingModelDTO;
import com.thing.thing.model.entity.IotThingModelEntity; import com.thing.thing.model.entity.IotThingModelEntity;
import com.thing.thing.model.service.IotThingModelService;
import com.thing.transport.api.adaptor.JsonConverter; import com.thing.transport.api.adaptor.JsonConverter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -39,6 +40,8 @@ public class QueueDeviceEventListener {
private final ThingManageContextService thingManageContextService; private final ThingManageContextService thingManageContextService;
private final IotThingModelService thingModelService;
private final ThingCache thingCache; private final ThingCache thingCache;
@EventListener(QueueDeviceEvent.class) @EventListener(QueueDeviceEvent.class)
@ -58,10 +61,12 @@ public class QueueDeviceEventListener {
item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin()) item -> (QueueOriginType.AUTO_DATA_SYNC.name().equals(item.getOrigin())
|| QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin())) || QueueOriginType.MANUAL_DATA_SYNC.name().equals(item.getOrigin()))
? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1)); ? QueueOriginType.TB.name() : item.getOrigin(), (e1, e2) -> e1));
//物模型
List<IotThingModelEntity> insertModelList = new ArrayList<>();
//物模型插入 //物模型插入
checkAndSaveThingModels(codeOriginMap, insertModelList);
List<IotThingModelEntity> insertModelList = new ArrayList<>();
//物模型更新
List<IotThingModelEntity> updateModelList = new ArrayList<>();
//筛选物模型的插入和更新
checkAndSaveThingModels(codeOriginMap, insertModelList,updateModelList);
if (CollectionUtils.isNotEmpty(insertModelList)) { if (CollectionUtils.isNotEmpty(insertModelList)) {
// 物实体 // 物实体
@ -73,34 +78,55 @@ public class QueueDeviceEventListener {
.collect(Collectors.groupingBy( .collect(Collectors.groupingBy(
item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()), item ->new AuthParam(item.getTenantCode(),item.getCompanyId(),item.getDeptId()),
Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet()))); Collectors.mapping(QueueMsgDTO::getThingCode,Collectors.toSet())));
List<IotThingEntity> insertEntityList = new ArrayList<>(); List<IotThingEntity> insertEntityList = new ArrayList<>();
saveTenantThingList(tenantThingCodeMap, insertEntityList); saveTenantThingList(tenantThingCodeMap, insertEntityList);
insertModelList.forEach(item -> { insertModelList.forEach(item -> {
long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count(); long count = insertEntityList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count();
List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode()); List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode());
item.setAuthNum(count+CollectionUtils.size(keyMap)); item.setAuthNum(count+CollectionUtils.size(keyMap));
}); });
thingManageContextService.saveAllModel(insertModelList);
thingModelService.saveBatch(insertModelList);
//插入物实体
if(CollectionUtils.isNotEmpty(insertEntityList)){
List<IotThingEntity> distinctStudentFile = insertEntityList.stream()
.collect(Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new));
thingManageContextService.saveEntity(distinctStudentFile);
updateEntityCache(distinctStudentFile);
}
}
if(CollectionUtils.isNotEmpty(updateModelList)){
updateModelList.forEach(item -> {
long count = updateModelList.stream().filter(e -> StringUtils.equals(e.getCode(), item.getCode())).count();
List<ObjectNode> keyMap = thingCache.findKeyMap(CacheNameEnum.THING_ENTITY, item.getCode());
item.setAuthNum(count+CollectionUtils.size(keyMap));
});
thingModelService.updateBatch(updateModelList);
insertModelList.addAll(updateModelList);
}
//更新物模板缓存 //更新物模板缓存
updateModleCache(insertModelList);
} catch (Exception e) {
log.error("设备保存失败: {}", e.getMessage(), e);
}
}
private void updateModleCache(List<IotThingModelEntity> insertModelList) {
List<IotThingModelDTO> iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class); List<IotThingModelDTO> iotThingModelDTOS = ConvertUtils.sourceToTarget(insertModelList, IotThingModelDTO.class);
List<ObjectNode> modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS); List<ObjectNode> modelList = JsonConverter.convertToJsonObjectListObjectNode(iotThingModelDTOS);
for (ObjectNode item : modelList) { for (ObjectNode item : modelList) {
//更新物模型
//更新物模型的缓存
thingCache.updateKeyMap(CacheNameEnum.THING_MODEL thingCache.updateKeyMap(CacheNameEnum.THING_MODEL
, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()
+ ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item); + ":" + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(),item);
} }
}
if(CollectionUtils.isNotEmpty(insertEntityList)){
List<IotThingEntity> distinctStudentFile = insertEntityList.stream()
.collect(Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getCode() + ";" + o.getTenantCode()))), ArrayList::new));
thingManageContextService.saveEntity(distinctStudentFile);
private void updateEntityCache(List<IotThingEntity> distinctStudentFile) {
//更新物实体缓存 //更新物实体缓存
List<IotThingViewDTO> entityList = distinctStudentFile.stream().map(e -> { List<IotThingViewDTO> entityList = distinctStudentFile.stream().map(e -> {
ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode()); ObjectNode keyMap = thingCache.getKeyMap(CacheNameEnum.THING_ENTITY, e.getCode());
@ -135,18 +161,13 @@ public class QueueDeviceEventListener {
+ ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + ":" + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode);
} }
} }
}
} catch (Exception e) {
log.error("设备保存失败: {}", e.getMessage(), e);
}
}
/** /**
* 物管理表是否存在不存在新增设备 * 物管理表是否存在不存在新增设备
* *
* @param codeOriginMap 物编码 * @param codeOriginMap 物编码
*/ */
private void checkAndSaveThingModels(Map<String, String> codeOriginMap, List<IotThingModelEntity> insertList) throws Exception {
private void checkAndSaveThingModels(Map<String, String> codeOriginMap, List<IotThingModelEntity> insertList,List<IotThingModelEntity> updateList) {
// 使用迭代器遍历并删除不符合条件的元素 // 使用迭代器遍历并删除不符合条件的元素
Iterator<Map.Entry<String, String>> iterator = codeOriginMap.entrySet().iterator(); Iterator<Map.Entry<String, String>> iterator = codeOriginMap.entrySet().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -154,6 +175,8 @@ public class QueueDeviceEventListener {
String code = entry.getKey(); String code = entry.getKey();
ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code); ObjectNode jsonObject = thingCache.findObjectNode(CacheNameEnum.THING_MODEL, code);
if (null != jsonObject && !jsonObject.isEmpty()) { if (null != jsonObject && !jsonObject.isEmpty()) {
IotThingModelEntity modelEntity = JacksonUtil.convertValue(jsonObject, IotThingModelEntity.class);
updateList.add(modelEntity);
iterator.remove(); iterator.remove();
} }
} }
@ -163,7 +186,7 @@ public class QueueDeviceEventListener {
.setToken(TokenGenerator.generateValue()) .setToken(TokenGenerator.generateValue())
.setGateway(GateWayStatus.NO_GATE_WAY.getValue()) .setGateway(GateWayStatus.NO_GATE_WAY.getValue())
.setStatus(ThingStatus.NOT_CONNECTED.getCode()) .setStatus(ThingStatus.NOT_CONNECTED.getCode())
.setAuthNum(1L)
.setAuthNum(0L)
.setStatusTs(DateTimeUtils.getCurrentTime()) .setStatusTs(DateTimeUtils.getCurrentTime())
.setOrigin(origin); .setOrigin(origin);
modelEntity.setId(IdUtil.getSnowflake().nextId()) modelEntity.setId(IdUtil.getSnowflake().nextId())

4
modules/thing/src/main/java/com/thing/thing/model/service/IotThingModelService.java

@ -27,6 +27,8 @@ public interface IotThingModelService extends IBaseService<IotThingModelEntity>
Optional<ModelDetailDTO> findByCode(String code); Optional<ModelDetailDTO> findByCode(String code);
Optional<IotThingModelEntity> getByCode(String code);
Optional<List<ObjectNode>> findByGateway(String gateway); Optional<List<ObjectNode>> findByGateway(String gateway);
Optional<ObjectNode> findModelByGatewayAndToken(String gateway, String token); Optional<ObjectNode> findModelByGatewayAndToken(String gateway, String token);
@ -59,4 +61,6 @@ public interface IotThingModelService extends IBaseService<IotThingModelEntity>
void batchSaveOrUpdate(List<IotThingModelEntity> entities); void batchSaveOrUpdate(List<IotThingModelEntity> entities);
void batchInsertOrUpdate(List<IotThingModelEntity> entities);
} }

13
modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

@ -137,7 +137,6 @@ public class IotThingModelServiceImpl extends BaseServiceImpl<IotThingModelMappe
Long startTime, Long startTime,
Long endTime) { Long endTime) {
List<ObjectNode> modelList = cache.getTopicMap(CacheNameEnum.THING_MODEL); List<ObjectNode> modelList = cache.getTopicMap(CacheNameEnum.THING_MODEL);
log.info("modelList缓存数据个数:{},modelList缓存数据:{}", modelList.size(),modelList.toString());
if (CollectionUtils.isEmpty(modelList)) { if (CollectionUtils.isEmpty(modelList)) {
List<IotThingModelDTO> list = mapper.selectListByQueryAs(getWrapper(orderField, order, null, null, null, List<IotThingModelDTO> list = mapper.selectListByQueryAs(getWrapper(orderField, order, null, null, null,
null, null,null,null, null), IotThingModelDTO.class); null, null,null,null, null), IotThingModelDTO.class);
@ -175,6 +174,11 @@ public class IotThingModelServiceImpl extends BaseServiceImpl<IotThingModelMappe
return Optional.ofNullable(mapper.selectOneByQueryAs(QueryWrapper.create().eq(IotThingModelEntity::getCode, code), ModelDetailDTO.class)); return Optional.ofNullable(mapper.selectOneByQueryAs(QueryWrapper.create().eq(IotThingModelEntity::getCode, code), ModelDetailDTO.class));
} }
@Override
public Optional<IotThingModelEntity> getByCode(String code) {
return Optional.ofNullable(mapper.selectOneByQuery(QueryWrapper.create().eq(IotThingModelEntity::getCode, code)));
}
@Override @Override
public Optional<List<ObjectNode>> findByGateway(String gateway) { public Optional<List<ObjectNode>> findByGateway(String gateway) {
List<ObjectNode> topicMap = cache.getTopicMap(CacheNameEnum.THING_MODEL); List<ObjectNode> topicMap = cache.getTopicMap(CacheNameEnum.THING_MODEL);
@ -379,10 +383,15 @@ public class IotThingModelServiceImpl extends BaseServiceImpl<IotThingModelMappe
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Override @Override
public void batchSaveOrUpdate(List<IotThingModelEntity> modelEntities) { public void batchSaveOrUpdate(List<IotThingModelEntity> modelEntities) {
modelEntities.forEach(model -> mapper.insertOrUpdateSelective(model));
modelEntities.forEach(model -> mapper.insertOrUpdate(model));
cache.clearTopic(CacheNameEnum.THING_MODEL); cache.clearTopic(CacheNameEnum.THING_MODEL);
} }
@Override
public void batchInsertOrUpdate(List<IotThingModelEntity> entities) {
entities.forEach(model -> mapper.insertOrUpdate(model));
}
//物模型新增和更新中分配物实体信息 //物模型新增和更新中分配物实体信息
private List<IotThingEntity> shareThingsToTenantCode(Collection<String> codeList, Collection<Long> tenantCodes) { private List<IotThingEntity> shareThingsToTenantCode(Collection<String> codeList, Collection<Long> tenantCodes) {
if (CollectionUtils.isEmpty(tenantCodes)) { if (CollectionUtils.isEmpty(tenantCodes)) {

Loading…
Cancel
Save