14 changed files with 529 additions and 11 deletions
-
11modules/thing/src/main/java/com/thing/device/analysisdata/service/impl/AnalysisDataServiceImpl.java
-
37modules/thing/src/main/java/com/thing/reissue/controller/DataReissueController.java
-
41modules/thing/src/main/java/com/thing/reissue/dto/DataReissueCalculationDTO.java
-
36modules/thing/src/main/java/com/thing/reissue/dto/DataReissueDTO.java
-
31modules/thing/src/main/java/com/thing/reissue/dto/DataReissueSettingDTO.java
-
66modules/thing/src/main/java/com/thing/reissue/entity/DataReissueEntity.java
-
16modules/thing/src/main/java/com/thing/reissue/mapper/DataReissueMapper.java
-
25modules/thing/src/main/java/com/thing/reissue/service/DataReissueService.java
-
248modules/thing/src/main/java/com/thing/reissue/service/impl/DataReissueServiceImpl.java
-
12modules/thing/src/main/java/com/thing/thing/api/service/impl/IotThingApiServiceImpl.java
-
6modules/thing/src/main/java/com/thing/thing/entity/controller/IotThingEntityController.java
-
5modules/thing/src/main/java/com/thing/thing/entity/dto/IotThingViewDTO.java
-
4modules/thing/src/main/java/com/thing/thing/entity/dto/IotThingViewSourceDTO.java
-
2modules/thing/src/main/java/com/thing/thing/entity/service/impl/IotThingEntityServiceImpl.java
@ -0,0 +1,37 @@ |
|||
package com.thing.reissue.controller; |
|||
|
|||
import com.thing.common.core.web.response.Result; |
|||
import com.thing.reissue.dto.DataReissueDTO; |
|||
import com.thing.reissue.service.DataReissueService; |
|||
import com.thing.sys.tenant.dto.SysTenantDTO; |
|||
import io.swagger.v3.oas.annotations.Operation; |
|||
import io.swagger.v3.oas.annotations.tags.Tag; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.web.bind.annotation.*; |
|||
|
|||
import java.util.List; |
|||
|
|||
@RestController |
|||
@RequestMapping("data/reissue") |
|||
@Tag(name = "数据补发") |
|||
@RequiredArgsConstructor |
|||
public class DataReissueController { |
|||
|
|||
private final DataReissueService dataReissueService; |
|||
|
|||
@PostMapping("dataReissue") |
|||
@Operation(summary = "数据补发") |
|||
public Result<Void> dataReissue(@RequestBody DataReissueDTO dataReissue) { |
|||
dataReissueService.dataReissue(dataReissue); |
|||
return new Result<>(); |
|||
} |
|||
|
|||
@GetMapping("tenantList") |
|||
@Operation(summary="侧边栏所有有数据的企业列表") |
|||
public Result<List<SysTenantDTO>> tenantList(){ |
|||
List<SysTenantDTO> dtos =dataReissueService.tenantList(); |
|||
return new Result<List<SysTenantDTO>>().ok(dtos); |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
package com.thing.reissue.dto; |
|||
|
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 数据补发计算 |
|||
* |
|||
* @author zhenghh. 2021-12-10 |
|||
**/ |
|||
@Data |
|||
@Schema(description = "数据补发计算") |
|||
public class DataReissueCalculationDTO { |
|||
|
|||
@Schema(description = "开始时间") |
|||
private Long startTime; |
|||
@Schema(description = "结束时间") |
|||
private Long endTime; |
|||
@Schema(description = "需要计算的物") |
|||
List<ThingCalc> list; |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
public static class ThingCalc { |
|||
|
|||
@Schema(description = "物ID") |
|||
private Long thingId; |
|||
@Schema(description = "设备Code") |
|||
private String thingCode; |
|||
@Schema(description = "设备ID") |
|||
private String entityId; |
|||
@Schema(description = "属性列表") |
|||
private List<String> attrList; |
|||
@Schema(description = "部门ID") |
|||
private Long deptId; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,36 @@ |
|||
package com.thing.reissue.dto; |
|||
|
|||
import com.thing.common.core.validator.group.DefaultGroup; |
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import lombok.Data; |
|||
|
|||
import javax.validation.constraints.NotNull; |
|||
import java.io.Serial; |
|||
import java.io.Serializable; |
|||
import java.util.List; |
|||
|
|||
@Data |
|||
@Schema(name = "看板表") |
|||
public class DataReissueDTO implements Serializable { |
|||
|
|||
@Serial |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
private Long tenantCode; |
|||
|
|||
@Schema(description = "物entityIds") |
|||
private List<Long> entityIds; |
|||
|
|||
@Schema(description = "属性") |
|||
private List<String> attributeKey; |
|||
|
|||
@Schema(description = "开始时间(13位时间戳)") |
|||
@NotNull(message="开始时间不能为空", groups = DefaultGroup.class) |
|||
private Long startTime; |
|||
|
|||
@Schema(description = "结束时间(13位时间戳)") |
|||
@NotNull(message="结束时间不能为空", groups = DefaultGroup.class) |
|||
private Long endTime; |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
package com.thing.reissue.dto; |
|||
|
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import lombok.Data; |
|||
|
|||
import java.io.Serial; |
|||
import java.io.Serializable; |
|||
|
|||
|
|||
/** |
|||
* 产品 |
|||
* |
|||
* @author WangJunLong 56583086@qq.com |
|||
* @since 1.0.0 2020-09-09 |
|||
*/ |
|||
@Data |
|||
@Schema(description = "url用户名密码配置") |
|||
public class DataReissueSettingDTO implements Serializable { |
|||
|
|||
@Serial |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
@Schema(description = "url") |
|||
private String url; |
|||
|
|||
@Schema(description = "用户名") |
|||
private String userName; |
|||
|
|||
@Schema(description = "密码") |
|||
private String password; |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
package com.thing.reissue.entity; |
|||
|
|||
import com.mybatisflex.annotation.Table; |
|||
import com.thing.common.orm.entity.BaseInfoEntity; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.experimental.Accessors; |
|||
|
|||
import java.io.Serial; |
|||
|
|||
@Data |
|||
@Accessors(chain = true) |
|||
@EqualsAndHashCode(callSuper=false) |
|||
@Table("iot_dashboard") |
|||
public class DataReissueEntity extends BaseInfoEntity { |
|||
@Serial |
|||
private static final long serialVersionUID = 1L; |
|||
/** |
|||
* 看板管理id |
|||
*/ |
|||
private Long dashboardGroupId; |
|||
/** |
|||
* svg图片地址 |
|||
*/ |
|||
private String svgUrl; |
|||
/** |
|||
* 图片地址 |
|||
*/ |
|||
private String imgUrl; |
|||
/** |
|||
* 看板类型/0:svg看板,1:组态设计看板 |
|||
*/ |
|||
private String type="0"; |
|||
/** |
|||
* 组态看板url |
|||
*/ |
|||
private String scadaUrl; |
|||
/** |
|||
* 标题 |
|||
*/ |
|||
private String title; |
|||
/** |
|||
* 高度 |
|||
*/ |
|||
private String height; |
|||
/** |
|||
* 宽度 |
|||
*/ |
|||
private String width; |
|||
/** |
|||
* 背景颜色 |
|||
*/ |
|||
private String backgroundColor; |
|||
/** |
|||
* 背景图片 |
|||
*/ |
|||
private String backgroundPicture; |
|||
/** |
|||
* 描述 |
|||
*/ |
|||
private String remark; |
|||
/** |
|||
* 排序 |
|||
*/ |
|||
private Integer sort; |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
package com.thing.reissue.mapper; |
|||
|
|||
import com.thing.common.orm.mapper.PowerBaseMapper; |
|||
import com.thing.reissue.entity.DataReissueEntity; |
|||
import org.apache.ibatis.annotations.Mapper; |
|||
|
|||
/** |
|||
* 看板表 |
|||
* |
|||
* @author zzx |
|||
* @since 1.0.0 2022-12-05 |
|||
*/ |
|||
@Mapper |
|||
public interface DataReissueMapper extends PowerBaseMapper<DataReissueEntity> { |
|||
|
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package com.thing.reissue.service; |
|||
|
|||
import com.thing.common.orm.service.IBaseService; |
|||
import com.thing.reissue.dto.DataReissueCalculationDTO; |
|||
import com.thing.reissue.dto.DataReissueDTO; |
|||
import com.thing.reissue.entity.DataReissueEntity; |
|||
import com.thing.sys.tenant.dto.SysTenantDTO; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 看板表 |
|||
* |
|||
* @author zzx |
|||
* @since 1.0.0 2022-12-05 |
|||
*/ |
|||
public interface DataReissueService extends IBaseService<DataReissueEntity> { |
|||
|
|||
void dataReissue(DataReissueDTO dataReissue); |
|||
|
|||
|
|||
List<SysTenantDTO> tenantList(); |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,248 @@ |
|||
package com.thing.reissue.service.impl; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.mybatisflex.core.query.QueryWrapper; |
|||
import com.thing.common.core.enumeration.SuperAdminEnum; |
|||
import com.thing.common.core.exception.SysException; |
|||
import com.thing.common.data.tskv.TsKvDTO; |
|||
import com.thing.common.orm.service.impl.BaseServiceImpl; |
|||
import com.thing.common.tskv.event.TsKvEvent; |
|||
import com.thing.queue.util.Topics; |
|||
import com.thing.reissue.dto.DataReissueDTO; |
|||
import com.thing.reissue.dto.DataReissueSettingDTO; |
|||
import com.thing.reissue.entity.DataReissueEntity; |
|||
import com.thing.reissue.mapper.DataReissueMapper; |
|||
import com.thing.reissue.service.DataReissueService; |
|||
import com.thing.sys.biz.service.SysParamsService; |
|||
import com.thing.sys.security.context.TenantContext; |
|||
import com.thing.sys.security.domain.SecurityUser; |
|||
import com.thing.sys.security.domain.UserDetail; |
|||
import com.thing.sys.tenant.dto.SysTenantDTO; |
|||
import com.thing.sys.tenant.mapper.SysTenantMapper; |
|||
import com.thing.sys.tenant.service.SysTenantGroupService; |
|||
import com.thing.thing.context.service.ThingManageContextService; |
|||
import com.thing.thing.dictRelation.param.IotThingDictRelationParamDTO; |
|||
import com.thing.thing.entity.dto.IotThingEntityDTO; |
|||
import jakarta.annotation.Resource; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.jetbrains.annotations.Nullable; |
|||
import org.springframework.context.ApplicationEventPublisher; |
|||
import org.springframework.core.ParameterizedTypeReference; |
|||
import org.springframework.http.*; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.web.client.RestTemplate; |
|||
import org.springframework.web.util.UriComponents; |
|||
import org.springframework.web.util.UriComponentsBuilder; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
|
|||
import java.util.*; |
|||
import java.util.stream.Collectors; |
|||
|
|||
/** |
|||
* 看板表 |
|||
* |
|||
* @author zzx |
|||
* @since 1.0.0 2022-12-05 |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
@RequiredArgsConstructor |
|||
public class DataReissueServiceImpl extends BaseServiceImpl<DataReissueMapper, DataReissueEntity> implements DataReissueService { |
|||
|
|||
private final ThingManageContextService thingManageContextService; |
|||
|
|||
private final String DATA_REISSUE = "DATA_REISSUE"; |
|||
|
|||
@Resource |
|||
private SysParamsService sysParamsService; |
|||
|
|||
@Resource |
|||
private SysTenantMapper sysTenantDao; |
|||
|
|||
@Resource |
|||
private SysTenantGroupService sysTenantGroupService; |
|||
|
|||
@Resource |
|||
private ApplicationEventPublisher publisher; |
|||
|
|||
@Resource |
|||
private RestTemplate restTemplate; |
|||
|
|||
@Override |
|||
public QueryWrapper getWrapper(Map<String, Object> params) { |
|||
String dashboardGroupId = (String) params.get("dashboardGroupId"); |
|||
QueryWrapper wrapper = new QueryWrapper(); |
|||
if (StringUtils.isBlank(dashboardGroupId)) throw new SysException("看板组id为必传字段"); |
|||
wrapper.eq("dashboard_group_id", Long.valueOf(dashboardGroupId)) |
|||
.orderBy(DataReissueEntity::getSort).asc() |
|||
.orderBy(DataReissueEntity::getCreateDate).desc(); |
|||
return wrapper; |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public void dataReissue(DataReissueDTO dataReissue) { |
|||
//1.获取设备列表 |
|||
List<Long> entityIdList = dataReissue.getEntityIds(); |
|||
|
|||
Long tenantCode = dataReissue.getTenantCode(); |
|||
//过滤租户 |
|||
if(Objects.isNull(tenantCode)){ |
|||
tenantCode = TenantContext.getTenantCode(SecurityUser.getUser()); |
|||
} |
|||
|
|||
List<IotThingEntityDTO> thingsDTOS; |
|||
if(CollectionUtil.isNotEmpty(entityIdList)){ |
|||
thingsDTOS = thingManageContextService.findEntityAllById(entityIdList).orElse(Collections.emptyList()); |
|||
}else{ |
|||
thingsDTOS = thingManageContextService.findEntityAllByCode(null,tenantCode,true).orElse(Collections.emptyList()); |
|||
} |
|||
if(CollectionUtil.isEmpty(thingsDTOS)){ |
|||
throw new SysException("未获取到任何设备列表,数据补发失败"); |
|||
} |
|||
List<IotThingDictRelationParamDTO> thingsAttributesList = new ArrayList<>(); |
|||
List<Long> thingIdList = thingsDTOS.stream().map(IotThingEntityDTO::getId).collect(Collectors.toList()); |
|||
//2.获取属性列表 |
|||
if(CollectionUtil.isNotEmpty(dataReissue.getAttributeKey())){ |
|||
thingsAttributesList.addAll(thingManageContextService.findDictRelationAllByEntityIdsAndCodes(thingIdList, dataReissue.getAttributeKey()).orElse(Collections.emptyList())); |
|||
}else{ |
|||
thingsAttributesList.addAll(thingManageContextService.findDictRelationAllByEntityIdsAndCodes(thingIdList, dataReissue.getAttributeKey()).orElse(Collections.emptyList())); |
|||
} |
|||
if(CollectionUtil.isEmpty(thingsAttributesList)){ |
|||
throw new SysException("未获取到任何属性列表,数据补发失败"); |
|||
} |
|||
//3.获取数据补发时间 |
|||
Long startTime = dataReissue.getStartTime(); |
|||
Long endTime = dataReissue.getEndTime(); |
|||
if(Objects.isNull(startTime) || Objects.isNull(endTime)){ |
|||
throw new SysException("时间不能为空"); |
|||
} |
|||
|
|||
String dataReissueSetJson = sysParamsService.getValue(DATA_REISSUE); |
|||
if (StringUtils.isBlank(dataReissueSetJson)) { |
|||
throw new SysException("未获取到相关的配置信息!"); |
|||
} |
|||
DataReissueSettingDTO dataReissueSettingDTO = JSONObject.parseObject(dataReissueSetJson, DataReissueSettingDTO.class); |
|||
String token = login(dataReissueSettingDTO.getUrl(), dataReissueSettingDTO.getUserName(), dataReissueSettingDTO.getPassword()); |
|||
Map<Long, List<IotThingDictRelationParamDTO>> longListMap = thingsAttributesList.stream().collect(Collectors.groupingBy(IotThingDictRelationParamDTO::getEntityId)); |
|||
longListMap.forEach((k,v)->{ |
|||
boolean flag = true; |
|||
String limit = "100"; |
|||
while (flag){ |
|||
String entityCode = v.get(0).getEntityCode(); |
|||
List<String> keyList = v.stream().map(IotThingDictRelationParamDTO::getCode).collect(Collectors.toList()); |
|||
HttpHeaders headers = new HttpHeaders(); |
|||
headers.setContentType(MediaType.APPLICATION_JSON); |
|||
headers.set("x-authorization","Bearer " + token); |
|||
HttpEntity<String> requestEntity = new HttpEntity<>(null , headers); |
|||
ResponseEntity<Device> responseEntity = restTemplate.exchange( |
|||
dataReissueSettingDTO.getUrl() + "/api/tenant/devices?deviceName=" + entityCode, |
|||
HttpMethod.GET, |
|||
requestEntity, |
|||
Device.class |
|||
); |
|||
Map<String, List<JsonNode>> timeseries = getTskvtMap(responseEntity, token, dataReissueSettingDTO, keyList, startTime, endTime,limit); |
|||
// List<TsKvEntry> tsKvEntries = RestJsonConverter.toTimeseries(timeseries); |
|||
// if(CollectionUtil.isNotEmpty(tsKvEntries)){ |
|||
// pushQueue(entityCode,tsKvEntries); |
|||
// |
|||
// }else{ |
|||
// flag = false; |
|||
// } |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Nullable |
|||
private Map<String, List<JsonNode>> getTskvtMap(ResponseEntity<Device> responseEntity, String token, DataReissueSettingDTO dataReissueSettingDTO, |
|||
List<String> keyList, Long startTime, Long endTime,String limit) { |
|||
DeviceId deviceId = responseEntity.getBody().getId(); |
|||
HttpHeaders headers1 = new HttpHeaders(); |
|||
headers1.setContentType(MediaType.APPLICATION_JSON); |
|||
headers1.set("x-authorization","Bearer " + token); |
|||
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(dataReissueSettingDTO.getUrl()) |
|||
.path("/api/plugins/telemetry/{entityType}/{entityId}/values/timeseries") |
|||
.queryParam("keys", "{keys}") |
|||
.queryParam("interval", "{interval}") |
|||
.queryParam("agg", "{agg}") |
|||
.queryParam("limit", "{limit}") |
|||
.queryParam("orderBy", "{orderBy}") |
|||
.queryParam("useStrictDataTypes", "{useStrictDataTypes}"); |
|||
|
|||
Map<String, String> params = new HashMap(); |
|||
params.put("entityType", deviceId.getEntityType().name()); |
|||
params.put("entityId", deviceId.getId().toString()); |
|||
params.put("keys", String.join(",", keyList)); |
|||
params.put("interval", "0"); |
|||
params.put("agg", "NONE"); |
|||
params.put("limit", limit); |
|||
params.put("orderBy", "DESC"); |
|||
params.put("useStrictDataTypes", Boolean.toString(Boolean.FALSE)); |
|||
if (startTime != null) { |
|||
uriBuilder.queryParam("&startTs={startTs}"); |
|||
params.put("startTs", String.valueOf(startTime)); |
|||
} |
|||
if (endTime != null) { |
|||
uriBuilder.queryParam("&endTs={endTs}"); |
|||
params.put("endTs", String.valueOf(endTime)); |
|||
} |
|||
UriComponents uriComponents = uriBuilder.buildAndExpand(params); |
|||
HttpEntity<String> requestEntity = new HttpEntity<>(null , headers1); |
|||
|
|||
|
|||
Map<String, List<JsonNode>> timeseries = restTemplate.exchange( |
|||
uriComponents.toUri(), |
|||
HttpMethod.GET, |
|||
requestEntity, |
|||
new ParameterizedTypeReference<Map<String, List<JsonNode>>>() {} |
|||
).getBody(); |
|||
return timeseries; |
|||
} |
|||
|
|||
@Override |
|||
public List<SysTenantDTO> tenantList() { |
|||
Map<String,Object> params = new HashMap<>(); |
|||
UserDetail userDetail = SecurityUser.getUser(); |
|||
Long tenantCode = TenantContext.getTenantCode(userDetail); |
|||
List<Long> tenantCodeList = new ArrayList<>(); |
|||
if(!Objects.equals(userDetail.getSuperAdmin(), SuperAdminEnum.YES.value()) |
|||
|| !Objects.equals(tenantCode, userDetail.getTenantCode())) { |
|||
tenantCodeList.add(tenantCode); |
|||
//根据 编码找下级编码 |
|||
List<Long> children = sysTenantGroupService.getChildren(tenantCode); |
|||
tenantCodeList.addAll(children); |
|||
params.put("tenantCodeList",tenantCodeList); |
|||
} |
|||
return sysTenantDao.queryList(params).parallelStream().sorted(Comparator.comparingLong(SysTenantDTO::getTenantCode)).collect(Collectors.toList()); |
|||
} |
|||
|
|||
|
|||
private String login(String url, String username, String password) { |
|||
Map<String, String> loginRequest = new HashMap(); |
|||
loginRequest.put("username", username); |
|||
loginRequest.put("password", password); |
|||
ResponseEntity<JsonNode> tokenInfo = restTemplate.postForEntity(url + "/api/auth/login", loginRequest, JsonNode.class, new Object[0]); |
|||
return tokenInfo.getBody().get("token").asText(); |
|||
} |
|||
|
|||
private void pushQueue(String thingCode, List<TsKvEntry> timeSeries) { |
|||
if (CollectionUtil.isEmpty(timeSeries)) { |
|||
return ; |
|||
} |
|||
List<TsKvDTO> list = timeSeries.parallelStream() |
|||
.filter(tsKv -> Objects.nonNull(tsKv.getValue())) |
|||
.map(tsKv -> new TsKvDTO(thingCode, tsKv.getKey(), tsKv.getTs(), tsKv.getValueAsString())) |
|||
.toList(); |
|||
//send 队列 |
|||
publisher.publishEvent(new TsKvEvent(Topics.V1_TSKV_HISTORY.getValue(), TsKvDTO.toDataProtoList(list))); |
|||
|
|||
} |
|||
|
|||
|
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue