From 827a84de87e5289367ce3b99554530c73f52495c Mon Sep 17 00:00:00 2001 From: lishuai Date: Mon, 23 Dec 2024 15:40:49 +0800 Subject: [PATCH] =?UTF-8?q?websocketbug=E4=BF=AE=E5=A4=8D=202024=E5=B9=B41?= =?UTF-8?q?2=E6=9C=8823=E6=97=A515:40:45?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/QueueSocketEventListener.java | 83 ++++++++++--------- .../com/thing/websocket/WebSocketServer.java | 45 ++-------- 2 files changed, 53 insertions(+), 75 deletions(-) diff --git a/modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java b/modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java index 9a14574..44622b8 100644 --- a/modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java +++ b/modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java @@ -2,11 +2,10 @@ package com.thing.websocket; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.map.MapUtil; - -import com.google.common.collect.Lists; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import com.thing.common.core.event.QueueSocketEvent; import com.thing.common.core.message.MessageData; @@ -15,10 +14,7 @@ import com.thing.thing.tskv.dto.ThingAttrDTO; import com.thing.thing.tskv.dto.TsKvDTO; import com.thing.thing.tskv.service.TskvService; import com.thing.websocket.data.SocketDataCache; - import lombok.extern.slf4j.Slf4j; - -import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -27,7 +23,6 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; /** @@ -54,9 +49,12 @@ public class QueueSocketEventListener { //超级API if (MapUtil.isNotEmpty(webSocketServer.API_SOCKET_MAP)) { webSocketServer.API_SOCKET_MAP.forEach((sessionId, thingList) -> { - JsonObject jsonObject = getApiMap(lastAttrValMap, thingList); - if (jsonObject != null && !jsonObject.isJsonNull() && !jsonObject.entrySet().isEmpty()) { - webSocketServer.sendMessage(sessionId, new MessageData<>().data(jsonObject)); + JSONObject jsonObject = getApiMap(lastAttrValMap, thingList); + if(null != jsonObject){ + JSONObject result = jsonObject.getJSONObject("result"); // 获取 result + if(result.containsKey("values")){ + webSocketServer.sendMessage(sessionId, new MessageData<>().data(jsonObject)); + } } }); } @@ -120,33 +118,44 @@ public class QueueSocketEventListener { private final Gson gson = new Gson(); private final Type listType = new TypeToken>() {}.getType(); - private JsonObject getApiMap(Map> lastAttrValMap, JsonObject jsonObject) { - // 使用 Gson 将第一个 JsonObject 转换为 JsonElement - JsonElement jsonElement = gson.toJsonTree(jsonObject); - // 将 JsonElement 转换为第二个 JsonObject - JsonObject dataJson = jsonElement.getAsJsonObject(); - JsonObject result = dataJson.get("result").getAsJsonObject(); - JsonObject info = result.get("info").getAsJsonObject(); - List queueMsgDTOList = Lists.newArrayList(); - for (String thingCode : info.keySet()) { - JsonObject object = info.get(thingCode).getAsJsonObject(); - Set thingAttrList = object.get("attrs").getAsJsonObject().keySet(); - List queueMsgDTOS = lastAttrValMap.get(thingCode); - if(CollectionUtil.isNotEmpty(thingAttrList) && CollectionUtil.isNotEmpty(queueMsgDTOS)){ - List collect = queueMsgDTOS.stream() - .filter(s -> thingAttrList.contains(s.getAttrKey()) && StringUtils.equals(thingCode, s.getThingCode())) - .collect(Collectors.toList()); - queueMsgDTOList.addAll(collect); - } + private JSONObject getApiMap(Map> lastAttrValMap, JSONObject jsonObject) { + // 获取 "result" 对象 + JSONObject result = jsonObject.getJSONObject("result"); // 获取 result + if (result == null) { + return null; // 如果 "result" 为 null,返回空 JSON 对象 } - if(CollectionUtil.isEmpty(queueMsgDTOList)){ - return new JsonObject(); + JSONObject info = result.getJSONObject("info"); // 获取 info + if (info == null) { + return null; // 如果 "info" 为 null,返回空 JSON 对象 + } + List queueMsgDTOList = new ArrayList<>(); + info.forEach((k,v)->{ + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode objectNode = objectMapper.createObjectNode(); + String string = info.getString(k); + JSONObject jsonObject1 = JSONObject.parseObject(string); + String thingCode = jsonObject1.getString("entityCode"); + boolean b = lastAttrValMap.containsKey(thingCode); + if(b){ + JSONObject attrs = jsonObject1.getJSONObject("attrs"); + List queueMsgDTOS = lastAttrValMap.get(thingCode); + if (CollectionUtil.isNotEmpty(queueMsgDTOS)) { + List collect = queueMsgDTOS.stream() + .filter(s -> attrs.keySet().contains(s.getAttrKey()) && s.getThingCode().equals(thingCode)) + .toList(); + queueMsgDTOList.addAll(collect); + } + } + }); + + if (queueMsgDTOList.isEmpty()) { + result.remove("values"); + jsonObject.put("result",result); + return null; // 如果没有符合条件的 QueueMsgDTO,返回空 JSON 对象 } - // 使用 TypeToken 获取 List 的类型 - // Type listType = new TypeToken>() {}.getType(); - // 将 List 转换为 JsonElement - JsonElement jsonElement1 = gson.toJsonTree(queueMsgDTOList, listType); - result.add("values",jsonElement1); - return dataJson; + result.put("values", queueMsgDTOList); // 将 values 放入 result 中 + result.put("info", info); // 将 values 放入 result 中 + jsonObject.put("result",result); + return jsonObject; // 返回修改后的 dataJson } } \ No newline at end of file diff --git a/modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java b/modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java index 0ae00b9..5aa4d13 100644 --- a/modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java +++ b/modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java @@ -2,14 +2,11 @@ package com.thing.websocket; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.thing.common.core.constants.Constant; import com.thing.common.core.message.MessageData; import com.thing.common.core.utils.SpringContextUtils; @@ -34,7 +31,6 @@ import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -52,13 +48,13 @@ public class WebSocketServer { /** * 客户端连接信息 */ - private static Map servers = new ConcurrentHashMap<>(); + private static final Map servers = new ConcurrentHashMap<>(); public static final Map> TRANSPORT_EXTEND_MAP = new ConcurrentHashMap<>(); public static final Map> DASHBOARD_MAP = new ConcurrentHashMap<>(); - public static final Map API_SOCKET_MAP = new ConcurrentHashMap<>(); + public static final Map API_SOCKET_MAP = new ConcurrentHashMap<>(); private static final ByteBuffer PING_MSG = ByteBuffer.wrap(new byte[]{}); @@ -94,7 +90,6 @@ public class WebSocketServer { iotThingApiService = SpringContextUtils.getBean(IotThingApiService.class); //加载节点配置 loadApiList(session, apiId); - } //看板全局变量存储 @@ -252,25 +247,15 @@ public class WebSocketServer { if (MapUtil.isNotEmpty(apiMap)) { try { String jsonString = objectMapper.writeValueAsString(apiMap); - JsonNode jsonNode = objectMapper.readTree(jsonString); - convertValuesToString(jsonNode); - - JsonObject jsonObject = JsonParser.parseString(jsonNode.toString()).getAsJsonObject(); + JSONObject jsonObject1 = JSONObject.parseObject(jsonString); //推送一次最新数据 - sendMessage(session,new MessageData<>().data(jsonObject)); + sendMessage(session,new MessageData<>().data(jsonObject1)); + //存储一个超级API需要哪些物和相应的属性 + API_SOCKET_MAP.put(session.getId(),jsonObject1); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } - Map result = MapUtil.get(apiMap, "result", Map.class); - result.remove("values"); - apiMap.put("result",result); - // 创建一个 Gson 对象 - Gson gson = new Gson(); - // 将 Map 转换为 JsonObject - JsonObject jsonObject = gson.toJsonTree(apiMap).getAsJsonObject(); - //存储一个超级API需要哪些物和相应的属性 - API_SOCKET_MAP.put(session.getId(),jsonObject); } private void loadTransportExtendList(String sessionId, String transportExtendId) { @@ -279,20 +264,4 @@ public class WebSocketServer { TRANSPORT_EXTEND_MAP.put(sessionId, collect); } - public static void convertValuesToString(JsonNode jsonNode) { - if (jsonNode.isObject()) { - ObjectNode objectNode = (ObjectNode) jsonNode; - Iterator> iterator = objectNode.fields(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - convertValuesToString(entry.getValue()); - if (entry.getValue().isValueNode()) { - objectNode.put(entry.getKey(), entry.getValue().asText()); - } - } - } else if (jsonNode.isArray()) { - jsonNode.forEach(element -> convertValuesToString(element)); - } - } - } \ No newline at end of file