package com.thing; import cn.hutool.core.collection.CollectionUtil; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.thing.api.ScriptEngine; import com.thing.api.aviator.AviatorScriptEngine; import com.thing.api.aviator.DefaultAviatorInvokeService; import com.thing.api.mvel.DefaultTbelInvokeService; import com.thing.api.mvel.TbelScriptEngine; import com.thing.api.nashorn.DefaultNashornInvokeService; import com.thing.api.nashorn.NashornScriptEngine; import com.thing.common.core.exception.SysException; import com.thing.common.core.utils.JacksonUtil; import com.thing.common.core.validator.ValidatorUtils; import com.thing.common.core.validator.group.DefaultGroup; import com.thing.common.data.tskv.TsKvDTO; import com.thing.common.tskv.service.TsKvService; import com.thing.modules.dto.QueryMsg; import com.thing.modules.entity.ScriptInfoEntity; import com.thing.modules.mapper.ScriptInfoMapper; import com.thing.modules.service.ScriptLogService; import jakarta.annotation.Resource; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import java.util.*; import java.util.stream.Collectors; @Service public class ScriptCreateServiceImpl implements ScriptCreateService { @Autowired private DefaultNashornInvokeService nashornInvokeService; @Autowired private DefaultTbelInvokeService tbelInvokeService; @Autowired private DefaultAviatorInvokeService aviatorInvokeService; @Autowired private ScriptLogService scriptLogService; @Lazy @Autowired private ScriptInfoMapper scriptInfoMapper; @Resource private TsKvService tsKvService; /** * 创建ScriptEngine 调试用 * * @param scriptId 数据解析主键 * @param scriptLanguage 解析语言 * @param script 消息体 * @param debug 是否调试 * @param argNames 入参 * @return ScriptEngine */ @Override public ScriptEngine createScriptEngine(Long scriptId, ScriptLanguage scriptLanguage, String script, Boolean debug, String... argNames) { switch (scriptLanguage) { case MVEL: return new TbelScriptEngine(String.valueOf(scriptId), tbelInvokeService, scriptLogService, debug, script, argNames); case NASHORN: return new NashornScriptEngine(String.valueOf(scriptId), nashornInvokeService, scriptLogService, debug, script, argNames); case AVIATOR: return new AviatorScriptEngine(String.valueOf(scriptId), aviatorInvokeService, scriptLogService, debug, script, argNames); default: throw new UnsupportedOperationException("unsupported type: " + scriptLanguage.name()); } } /** * 创建ScriptEngine * 直接执行script使用 * 默认入参为 msg, metadata * * @param scriptId 数据解析主键 * @param scriptLanguage 解析语言 * @param script 消息体 * @param debug 是否调试 * @return ScriptEngine */ @Override public ScriptEngine createScriptEngine(Long scriptId, ScriptLanguage scriptLanguage, String script, Boolean debug) { return createScriptEngine(scriptId, scriptLanguage, script, debug, "msg", "metadata"); } /** * 过滤 * * @param scriptId 数据解析主键 * @param msg 消息 * @return Boolean */ @Override public ListenableFuture executeFilterAsync(Long scriptId, String msg) { return executeFilterAsync(scriptId, msg, "msg", "metadata"); } /** * 过滤 * * @param entity 数据解析 * @param msg 消息 * @return Boolean */ @Override public ListenableFuture executeFilterAsync(ScriptInfoEntity entity, String msg) { return executeFilterAsync(entity, msg, "msg", "metadata"); } /** * 过滤 * * @param scriptId 数据解析主键 * @param msg 消息 * @param argNames 入参名称 * @return Boolean */ @Override public ListenableFuture executeFilterAsync(Long scriptId, String msg, String... argNames) { return executeFilterAsync(getScriptInfo(scriptId), msg, argNames); } /** * 过滤 * * @param entity 数据解析 * @param msg 消息 * @param argNames 入参名称 * @return Boolean */ @Override public ListenableFuture executeFilterAsync(ScriptInfoEntity entity, String msg, String... argNames) { ScriptMsg scriptMsg = convertScriptMsg(msg, JacksonUtil.toJsonNode(entity.getSupMsg()), JacksonUtil.toJsonNode(entity.getQueryMsg())); ScriptEngine scriptEngine = createScriptEngine(entity.getId(), ScriptLanguage.valueOf(entity.getScriptType()), entity.getScriptBody(), entity.getDebug(), argNames); return scriptEngine.executeFilterAsync(scriptMsg); } /** * json转换 * * @param scriptId 计算主键 * @param language 解析语言 * @param scriptBody 方法体 * @param debug 是否调试 * @param msg 消息 * @param sup 辅助参数 * @param query 查询参数 * @param destroy 是否销毁 * @param argNames 属性名称 * @return JsonNode */ @Override public JsonNode executeJson(Long scriptId, ScriptLanguage language, String scriptBody, Boolean debug, String msg, JsonNode sup, JsonNode query, Boolean destroy, String... argNames) { if (StringUtils.isBlank(scriptBody)) { return JacksonUtil.toJsonNode(StringUtils.defaultIfBlank(msg, "{}")); } ScriptEngine scriptEngine = null; try { ScriptMsg scriptMsg = convertScriptMsg(msg, sup, query); scriptEngine = createScriptEngine(scriptId, language, scriptBody, debug, argNames); return scriptEngine.executeJsonAsync(scriptMsg).get(); } catch (Exception e) { if (scriptEngine != null) { scriptEngine.destroy(); } throw new SysException("解析失败: " + e.getMessage()); } finally { if (destroy && scriptEngine != null) { scriptEngine.destroy(); } } } /** * json转换 * * @param scriptId 计算主键 * @param language 解析语言 * @param scriptBody 方法体 * @param debug 是否调试 * @param msg 消息 * @param sup 辅助参数 * @param query 查询参数 * @param destroy 是否销毁 * @return JsonNode */ @Override public JsonNode executeJson(Long scriptId, ScriptLanguage language, String scriptBody, Boolean debug, String msg, JsonNode sup, JsonNode query, Boolean destroy) { return executeJson(scriptId, language, scriptBody, debug, msg, sup, query, destroy, "msg", "metadata"); } /** * 转换json * * @param scriptId 数据解析主键 * @param msg 消息 * @return JsonNode */ @Override public ListenableFuture executeJsonAsync(Long scriptId, String msg) { return executeJsonAsync(scriptId, msg, "msg", "metadata"); } /** * 转换json * * @param entity 数据解析 * @param msg 消息 * @return JsonNode */ @Override public ListenableFuture executeJsonAsync(ScriptInfoEntity entity, String msg) { return executeJsonAsync(entity, msg, "msg", "metadata"); } /** * 转换json * * @param scriptId 数据解析主键 * @param msg 消息 * @param argNames 入参名称 * @return JsonNode */ @Override public ListenableFuture executeJsonAsync(Long scriptId, String msg, String... argNames) { return executeJsonAsync(getScriptInfo(scriptId), msg, argNames); } /** * 转换json * * @param entity 数据解析主键 * @param msg 消息 * @param argNames 入参名称 * @return JsonNode */ @Override public ListenableFuture executeJsonAsync(ScriptInfoEntity entity, String msg, String... argNames) { ScriptMsg scriptMsg = convertScriptMsg(msg, JacksonUtil.toJsonNode(entity.getSupMsg()), JacksonUtil.toJsonNode(entity.getQueryMsg())); ScriptEngine scriptEngine = createScriptEngine(entity.getId(), ScriptLanguage.valueOf(entity.getScriptType()), entity.getScriptBody(), entity.getDebug(), argNames); return scriptEngine.executeJsonAsync(scriptMsg); } /** * 转换字符串 * * @param scriptId 数据解析主键 * @param msg 消息 * @return String */ @Override public ListenableFuture executeToStringAsync(Long scriptId, String msg) { return executeToStringAsync(scriptId, msg, "msg", "metadata"); } /** * 转换字符串 * * @param entity 数据解析 * @param msg 消息 * @return String */ @Override public ListenableFuture executeToStringAsync(ScriptInfoEntity entity, String msg) { return executeToStringAsync(entity, msg, "msg", "metadata"); } /** * 转换字符串 * * @param scriptId 数据解析主键 * @param msg 消息 * @param argNames 入参名称 * @return String */ @Override public ListenableFuture executeToStringAsync(Long scriptId, String msg, String... argNames) { return executeToStringAsync(getScriptInfo(scriptId), msg, argNames); } /** * 转换字符串 * * @param entity 数据解析 * @param msg 消息 * @param argNames 入参名称 * @return String */ @Override public ListenableFuture executeToStringAsync(ScriptInfoEntity entity, String msg, String... argNames) { ScriptMsg scriptMsg = convertScriptMsg(msg, JacksonUtil.toJsonNode(entity.getSupMsg()), JacksonUtil.toJsonNode(entity.getQueryMsg())); ScriptEngine scriptEngine = createScriptEngine(entity.getId(), ScriptLanguage.valueOf(entity.getScriptType()), entity.getScriptBody(), entity.getDebug(), argNames); return scriptEngine.executeToStringAsync(scriptMsg); } /** * 前置判断 * * @param inMsg 入参 * @param sup 辅助参数 * @param query 查询参数 * @return map */ @Override public ScriptMsg convertScriptMsg(String inMsg, JsonNode sup, JsonNode query) { String msg = StringUtils.defaultIfBlank(inMsg, "{}"); Map metaData = Maps.newHashMap(); JsonNode supMsg = sup == null ? JacksonUtil.newObjectNode() : sup; metaData.put("sup", supMsg); List queryList = query == null || query.isEmpty() ? Lists.newArrayList() : JacksonUtil.convertValue(query, new TypeReference<>() { }); if (CollectionUtil.isEmpty(queryList)) { metaData.put("query", JacksonUtil.newObjectNode()); return new ScriptMsg(msg, metaData); } queryList.forEach(item -> ValidatorUtils.validateEntity(item, DefaultGroup.class)); if (queryList.stream().anyMatch(item -> !item.isLast()) && !supMsg.has("ts")) { throw new SysException("勾选同时间后,辅助参数必须包含ts"); } List list = Lists.newArrayList(); //最新值 Map> lastMap = queryList.stream() .filter(QueryMsg::isLast) .collect(Collectors.groupingBy(QueryMsg::getCode, Collectors.mapping(QueryMsg::getAttr, Collectors.toCollection(ArrayList::new)))); if (CollectionUtil.isNotEmpty(lastMap)) { list.addAll(tsKvService.findLatestByMultiMap(lastMap, Boolean.FALSE)); } //区间值 Map> intervalMap = queryList.stream().filter(item -> !item.isLast()) .collect(Collectors.groupingBy(QueryMsg::getCode, Collectors.mapping(QueryMsg::getAttr, Collectors.toCollection(ArrayList::new)))); if (CollectionUtil.isNotEmpty(intervalMap)) { long ts = supMsg.get("ts").asLong(); list.addAll(tsKvService.findLatestByMultiMap(intervalMap,ts - 1, ts + 1, Boolean.FALSE)); } ObjectNode jsonNodes = JacksonUtil.newObjectNode(); for (QueryMsg queryMsg : queryList) { Optional first = list.parallelStream() .filter(item -> StringUtils.equals(item.getThingCode(), queryMsg.getCode()) && StringUtils.equals(item.getAttrKey(), queryMsg.getAttr())) .findFirst(); if (first.isPresent()) { jsonNodes.put(queryMsg.getLabel(), first.get().getVal()); } else if (StringUtils.isNotBlank(queryMsg.getDefVal())) { jsonNodes.put(queryMsg.getLabel(), queryMsg.getDefVal()); } } if (jsonNodes.isEmpty() || jsonNodes.size() != queryList.size()) { throw new SysException("未查询到数据或查询数据缺失"); } metaData.put("query", jsonNodes); return new ScriptMsg(msg, metaData); } /** * 数据解析 * * @param scriptId 数据解析主键 * @return ScriptInfoEntity */ private ScriptInfoEntity getScriptInfo(Long scriptId) { //todo 效率太低的话,调整一下缓存,从缓存获取 ScriptInfoEntity entity = scriptInfoMapper.selectOneById(scriptId); if (Objects.isNull(entity)) { throw new SysException("未找到数据解析配置"); } return entity; } }