Browse Source

websocket和超级api (bug,routeId的兼容问题),物关系,

2024年12月23日15:47:00
qingyuan_dev_new
lishuai 1 year ago
parent
commit
708e47a22b
  1. 21
      modules/thing/src/main/java/com/thing/thing/api/service/impl/IotThingApiServiceImpl.java
  2. 6
      modules/thing/src/main/java/com/thing/thing/relation/root/service/impl/IotThingRelationRootServiceImpl.java
  3. 83
      modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java
  4. 45
      modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java

21
modules/thing/src/main/java/com/thing/thing/api/service/impl/IotThingApiServiceImpl.java

@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mybatisflex.core.keygen.impl.SnowFlakeIDKeyGenerator;
@ -40,6 +41,7 @@ import com.thing.thing.dictRelation.dto.IotThingDictRelationDTO;
import com.thing.thing.dictRelation.param.IotThingDictRelationParamDTO;
import com.thing.thing.entity.dto.IotThingEntityDictDTO;
import com.thing.thing.entity.dto.IotThingViewDTO;
import com.thing.transport.api.adaptor.JsonConverter;
import com.thing.util.BeanUtil;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
@ -421,7 +423,10 @@ public class IotThingApiServiceImpl extends BaseServiceImpl<IotThingApiMapper, I
public Map<String, Object> websocketApi(Long id) {
IotThingApiDTO dto = getByIdAs(id, IotThingApiDTO.class);
if (ObjectUtil.isNull(dto)) {
return Maps.newHashMap();
dto = mapper.selectOneByQueryAs(QueryWrapper.create().eq(IotThingApiEntity::getRouteId, id), IotThingApiDTO.class);
if(ObjectUtil.isNull(dto)){
return Maps.newHashMap();
}
}
//todo 这里主要是组态设计的websocket请求可能需要修改
String remark = dto.getRemark();
@ -429,12 +434,6 @@ public class IotThingApiServiceImpl extends BaseServiceImpl<IotThingApiMapper, I
if (StringUtils.equals(remark, "0")) {
tenantCode = UserContext.getRealTenantCode();
}
// Map<String, Object> params = new HashMap<>();
// params.put("id", id);
// params.put("reqParams", dto.getReqParams());
// Map<String, Object> stringObjectMap1 = telemetryById(params);
// Map<String, Object> stringObjectMap = encapsulationQuery(dto.getThingCondition(), dto.getAttrCondition(), dto.getTimeCondition(), dto.getReqParams(), null, null, dto.getSort(), tenantCode);
return encapsulationQuery(dto.getThingCondition(), dto.getAttrCondition(), dto.getTimeCondition(), dto.getReqParams(), null, null, dto.getSort(), tenantCode);
}
@ -503,7 +502,7 @@ public class IotThingApiServiceImpl extends BaseServiceImpl<IotThingApiMapper, I
//属性查询类型
Map<String, Object> entityResultMap = new HashMap<>();
Map<String, IotThingViewDTO> entityInfoMap = new HashMap<>();
Map<String, ObjectNode> entityInfoMap = new HashMap<>();
ApiEntityAttrDTO attrsEntity = ApiEntityAttrDTO.createFromJson(attrCondition);
String type = attrsEntity.getType();
//过滤条件的封装
@ -534,7 +533,8 @@ public class IotThingApiServiceImpl extends BaseServiceImpl<IotThingApiMapper, I
Map<String, IotThingDictRelationDTO> collect = dictRelationDTOList.orElseGet(Collections::emptyList).stream()
.collect(Collectors.toMap(IotThingDictRelationDTO::getCode, Function.identity(),(existing, replacement) -> existing));
optional.get().setAttrs(collect);
entityInfoMap.put(code, optional.get());
ObjectNode nodes = JsonConverter.convertToJsonObjectObjectNode(optional.get());
entityInfoMap.put(code, nodes);
}
});
}
@ -582,7 +582,8 @@ public class IotThingApiServiceImpl extends BaseServiceImpl<IotThingApiMapper, I
s-> ConvertUtils.sourceToTarget(s,IotThingDictRelationDTO.class)
,(existing, replacement) -> existing));
optional.get().setAttrs(collect);
entityInfoMap.put(entityCode, optional.get());
ObjectNode nodes = JsonConverter.convertToJsonObjectObjectNode(optional.get());
entityInfoMap.put(entityCode, nodes);
}
});
}

6
modules/thing/src/main/java/com/thing/thing/relation/root/service/impl/IotThingRelationRootServiceImpl.java

@ -122,7 +122,7 @@ public class IotThingRelationRootServiceImpl extends BaseServiceImpl<IotThingRel
}
String finalOrderField = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, orderField);
Comparator<ObjectNode> comparator = CompareUtils.getComparator(order, finalOrderField); //封装参数
List<Pair<String, String>> pairs = buildParam(name, groupName,tenantCode);
List<Pair<String, String>> pairs = buildParam(name, null,tenantCode);
return rootList.stream()
.filter(jsonObject -> {
boolean passesFilter = JacksonUtil.filter(jsonObject, pairs);
@ -132,7 +132,9 @@ public class IotThingRelationRootServiceImpl extends BaseServiceImpl<IotThingRel
passesFilter = ArrayUtils.contains(idList, String.valueOf(entityId));
}
return passesFilter;
}).sorted(comparator.thenComparing(obj -> obj.get(CacheNameEnum.RelationRootField.THING_RELATION_ROOT_ID.getField()).asLong())).toList();
})
.filter(s-> s.get(CacheNameEnum.RelationRootField.THING_RELATION_ROOT_GROUP_NAME.getField()).asText().equals(groupName))
.sorted(comparator.thenComparing(obj -> obj.get(CacheNameEnum.RelationRootField.THING_RELATION_ROOT_ID.getField()).asLong())).toList();
}
@Override

83
modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java

@ -2,11 +2,10 @@ package com.thing.websocket;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import com.google.common.collect.Lists;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import com.thing.common.core.event.QueueSocketEvent;
import com.thing.common.core.message.MessageData;
@ -15,10 +14,7 @@ import com.thing.thing.tskv.dto.ThingAttrDTO;
import com.thing.thing.tskv.dto.TsKvDTO;
import com.thing.thing.tskv.service.TskvService;
import com.thing.websocket.data.SocketDataCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@ -27,7 +23,6 @@ import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -54,9 +49,12 @@ public class QueueSocketEventListener {
//超级API
if (MapUtil.isNotEmpty(webSocketServer.API_SOCKET_MAP)) {
webSocketServer.API_SOCKET_MAP.forEach((sessionId, thingList) -> {
JsonObject jsonObject = getApiMap(lastAttrValMap, thingList);
if (jsonObject != null && !jsonObject.isJsonNull() && !jsonObject.entrySet().isEmpty()) {
webSocketServer.sendMessage(sessionId, new MessageData<>().data(jsonObject));
JSONObject jsonObject = getApiMap(lastAttrValMap, thingList);
if(null != jsonObject){
JSONObject result = jsonObject.getJSONObject("result"); // 获取 result
if(result.containsKey("values")){
webSocketServer.sendMessage(sessionId, new MessageData<>().data(jsonObject));
}
}
});
}
@ -120,33 +118,44 @@ public class QueueSocketEventListener {
private final Gson gson = new Gson();
private final Type listType = new TypeToken<List<QueueMsgDTO>>() {}.getType();
private JsonObject getApiMap(Map<String, List<QueueMsgDTO>> lastAttrValMap, JsonObject jsonObject) {
// 使用 Gson 将第一个 JsonObject 转换为 JsonElement
JsonElement jsonElement = gson.toJsonTree(jsonObject);
// JsonElement 转换为第二个 JsonObject
JsonObject dataJson = jsonElement.getAsJsonObject();
JsonObject result = dataJson.get("result").getAsJsonObject();
JsonObject info = result.get("info").getAsJsonObject();
List<QueueMsgDTO> queueMsgDTOList = Lists.newArrayList();
for (String thingCode : info.keySet()) {
JsonObject object = info.get(thingCode).getAsJsonObject();
Set<String> thingAttrList = object.get("attrs").getAsJsonObject().keySet();
List<QueueMsgDTO> queueMsgDTOS = lastAttrValMap.get(thingCode);
if(CollectionUtil.isNotEmpty(thingAttrList) && CollectionUtil.isNotEmpty(queueMsgDTOS)){
List<QueueMsgDTO> collect = queueMsgDTOS.stream()
.filter(s -> thingAttrList.contains(s.getAttrKey()) && StringUtils.equals(thingCode, s.getThingCode()))
.collect(Collectors.toList());
queueMsgDTOList.addAll(collect);
}
private JSONObject getApiMap(Map<String, List<QueueMsgDTO>> lastAttrValMap, JSONObject jsonObject) {
// 获取 "result" 对象
JSONObject result = jsonObject.getJSONObject("result"); // 获取 result
if (result == null) {
return null; // 如果 "result" null返回空 JSON 对象
}
if(CollectionUtil.isEmpty(queueMsgDTOList)){
return new JsonObject();
JSONObject info = result.getJSONObject("info"); // 获取 info
if (info == null) {
return null; // 如果 "info" null返回空 JSON 对象
}
List<QueueMsgDTO> queueMsgDTOList = new ArrayList<>();
info.forEach((k,v)->{
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode objectNode = objectMapper.createObjectNode();
String string = info.getString(k);
JSONObject jsonObject1 = JSONObject.parseObject(string);
String thingCode = jsonObject1.getString("entityCode");
boolean b = lastAttrValMap.containsKey(thingCode);
if(b){
JSONObject attrs = jsonObject1.getJSONObject("attrs");
List<QueueMsgDTO> queueMsgDTOS = lastAttrValMap.get(thingCode);
if (CollectionUtil.isNotEmpty(queueMsgDTOS)) {
List<QueueMsgDTO> collect = queueMsgDTOS.stream()
.filter(s -> attrs.keySet().contains(s.getAttrKey()) && s.getThingCode().equals(thingCode))
.toList();
queueMsgDTOList.addAll(collect);
}
}
});
if (queueMsgDTOList.isEmpty()) {
result.remove("values");
jsonObject.put("result",result);
return null; // 如果没有符合条件的 QueueMsgDTO返回空 JSON 对象
}
// 使用 TypeToken 获取 List<QueueMsgDTO> 的类型
// Type listType = new TypeToken<List<QueueMsgDTO>>() {}.getType();
// List<QueueMsgDTO> 转换为 JsonElement
JsonElement jsonElement1 = gson.toJsonTree(queueMsgDTOList, listType);
result.add("values",jsonElement1);
return dataJson;
result.put("values", queueMsgDTOList); // values 放入 result
result.put("info", info); // values 放入 result
jsonObject.put("result",result);
return jsonObject; // 返回修改后的 dataJson
}
}

45
modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java

@ -2,14 +2,11 @@ package com.thing.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.thing.common.core.constants.Constant;
import com.thing.common.core.message.MessageData;
import com.thing.common.core.utils.SpringContextUtils;
@ -34,7 +31,6 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -52,13 +48,13 @@ public class WebSocketServer {
/**
* 客户端连接信息
*/
private static Map<String, WebSocketData> servers = new ConcurrentHashMap<>();
private static final Map<String, WebSocketData> servers = new ConcurrentHashMap<>();
public static final Map<String, List<Long>> TRANSPORT_EXTEND_MAP = new ConcurrentHashMap<>();
public static final Map<String, List<SocketDataCache>> DASHBOARD_MAP = new ConcurrentHashMap<>();
public static final Map<String, JsonObject> API_SOCKET_MAP = new ConcurrentHashMap<>();
public static final Map<String, JSONObject> API_SOCKET_MAP = new ConcurrentHashMap<>();
private static final ByteBuffer PING_MSG = ByteBuffer.wrap(new byte[]{});
@ -94,7 +90,6 @@ public class WebSocketServer {
iotThingApiService = SpringContextUtils.getBean(IotThingApiService.class);
//加载节点配置
loadApiList(session, apiId);
}
//看板全局变量存储
@ -252,25 +247,15 @@ public class WebSocketServer {
if (MapUtil.isNotEmpty(apiMap)) {
try {
String jsonString = objectMapper.writeValueAsString(apiMap);
JsonNode jsonNode = objectMapper.readTree(jsonString);
convertValuesToString(jsonNode);
JsonObject jsonObject = JsonParser.parseString(jsonNode.toString()).getAsJsonObject();
JSONObject jsonObject1 = JSONObject.parseObject(jsonString);
//推送一次最新数据
sendMessage(session,new MessageData<>().data(jsonObject));
sendMessage(session,new MessageData<>().data(jsonObject1));
//存储一个超级API需要哪些物和相应的属性
API_SOCKET_MAP.put(session.getId(),jsonObject1);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Map result = MapUtil.get(apiMap, "result", Map.class);
result.remove("values");
apiMap.put("result",result);
// 创建一个 Gson 对象
Gson gson = new Gson();
// Map<String, Object> 转换为 JsonObject
JsonObject jsonObject = gson.toJsonTree(apiMap).getAsJsonObject();
//存储一个超级API需要哪些物和相应的属性
API_SOCKET_MAP.put(session.getId(),jsonObject);
}
private void loadTransportExtendList(String sessionId, String transportExtendId) {
@ -279,20 +264,4 @@ public class WebSocketServer {
TRANSPORT_EXTEND_MAP.put(sessionId, collect);
}
public static void convertValuesToString(JsonNode jsonNode) {
if (jsonNode.isObject()) {
ObjectNode objectNode = (ObjectNode) jsonNode;
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
while (iterator.hasNext()) {
Map.Entry<String, JsonNode> entry = iterator.next();
convertValuesToString(entry.getValue());
if (entry.getValue().isValueNode()) {
objectNode.put(entry.getKey(), entry.getValue().asText());
}
}
} else if (jsonNode.isArray()) {
jsonNode.forEach(element -> convertValuesToString(element));
}
}
}
Loading…
Cancel
Save