diff --git a/common/transport/src/main/java/com/thing/transport/modules/dto/ThingModelInfo.java b/common/transport/src/main/java/com/thing/transport/modules/dto/ThingModelInfo.java new file mode 100644 index 0000000..fb0e700 --- /dev/null +++ b/common/transport/src/main/java/com/thing/transport/modules/dto/ThingModelInfo.java @@ -0,0 +1,51 @@ +package com.thing.transport.modules.dto; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; + +/** + * 物模型表 + * + * @author mark + * @since 3.0 2024-03-18 + */ +@Data +@Schema(description = "物模型表") +public class ThingModelInfo implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + @Schema(description = "id") + @JsonSerialize(using = ToStringSerializer.class) + private Long id; + @Schema(description = "物编码") + private String code; + @Schema(description = "TB: token") + private String token; + @Schema(description = "是否网关: 0否1是") + private String gateway; + @Schema(description = "在线离线状态,0离线 1在线 2错误 3未接入") + private String status; + @Schema(description = "最新状态改变时间") + @JsonSerialize(using = ToStringSerializer.class) + private Long statusTs; + @Schema(description = "被分配到几个租户") + @JsonSerialize(using = ToStringSerializer.class) + private Long authNum; + @Schema(description = "数据来源") + private String origin; + @Schema(description = "备注说明") + private String remark; + @Schema(description = "创建时间") + @JsonSerialize(using = ToStringSerializer.class) + private Long createDate; + @Schema(description = "租户内物存在类型,0虚拟 1真实") + private String realType; + +} \ No newline at end of file diff --git a/common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java b/common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java index 8def1e6..9746466 100644 --- a/common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java +++ b/common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java @@ -1,6 +1,7 @@ package com.thing.transport.modules.mapper; import com.thing.common.orm.mapper.PowerBaseMapper; +import com.thing.transport.modules.dto.ThingModelInfo; import com.thing.transport.modules.entity.MqttBrokerMsgEntity; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Update; @@ -19,4 +20,8 @@ public interface MqttBrokerMsgMapper extends PowerBaseMapper> { @@ -44,9 +43,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final MqttTransportContext context; private final MqttTransportAdaptor adaptor; private final TransportService transportService; - // private final ThingService thingService; private final ConcurrentMap mqttQoSMap; - + private final MqttBrokerMsgMapper brokerMsgMapper; private volatile SessionInfoProto sessionInfo; private final DeviceSessionCtx deviceSessionCtx; @@ -54,8 +52,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.sessionId = UUID.randomUUID(); this.context = context; this.transportService = context.getTransportService(); -// this.thingService = context.getThingService(); this.adaptor = context.getAdaptor(); + this.brokerMsgMapper= context.getBrokerMsgMapper(); this.mqttQoSMap = new ConcurrentHashMap<>(); this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap); } @@ -191,23 +189,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String clientId = msg.payload().clientIdentifier(); String clientIp = ctx.pipeline().channel().remoteAddress().toString(); log.info("[{}] Processing connect msg for client with user name: {}! ==连接IP==>{}", sessionId, userName, clientIp); -// Optional optional = thingService.getByToken(userName); -// if (optional.isPresent()) { -// connectSuccess(ctx, userName, clientId, clientIp, optional.get()); -// } else { -// log.info("[{}] connection refused bad username or password user name: {}!", sessionId, userName); -// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); -// ctx.close(); -// } + ThingModelInfo optional = brokerMsgMapper.findModelByToken(userName); + if (ObjectUtils.isNotEmpty(optional)) { + connectSuccess(ctx, userName, clientId, clientIp, optional); + } else { + log.info("[{}] connection refused bad username or password user name: {}!", sessionId, userName); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); + ctx.close(); + } } -// private void connectSuccess(ChannelHandlerContext ctx, String userName, String clientId, String clientIp, ThingModel thingModel) { -// deviceSessionCtx.setConnected(true); -// sessionInfo = TransportSessionUtil.createSession(sessionId, thingModel.getCode(), thingModel.getGateway(), -// clientId, userName, StringUtils.stripStart(clientIp, "/"), System.currentTimeMillis()); -// transportService.registerSession(sessionInfo); -// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); -// } + private void connectSuccess(ChannelHandlerContext ctx, String userName, String clientId, String clientIp, ThingModelInfo thingModel) { + deviceSessionCtx.setConnected(true); + sessionInfo = TransportSessionUtil.createSession(sessionId, thingModel.getCode(), thingModel.getGateway(), + clientId, userName, StringUtils.stripStart(clientIp, "/"), System.currentTimeMillis()); + transportService.registerSession(sessionInfo); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); + } private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) { diff --git a/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java b/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java index 5a4d913..92c4938 100644 --- a/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java +++ b/modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java @@ -33,6 +33,7 @@ import com.thing.thing.model.service.IotThingModelService; import com.thing.transport.api.adaptor.JsonConverter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.springframework.stereotype.Service; @@ -187,7 +188,7 @@ public class IotThingModelServiceImpl extends BaseServiceImpl topicMap = cache.getTopicMap(CacheNameEnum.THING_MODEL); if (CollectionUtils.isEmpty(topicMap)) { IotThingModelDTO modelDTO = mapper.selectOneByQueryAs(QueryWrapper.create() - .eq(IotThingModelEntity::getGateway, gateway) + .eq(IotThingModelEntity::getGateway, gateway, ObjectUtils.isNotEmpty(gateway)) .eq(IotThingModelEntity::getToken, token) , IotThingModelDTO.class); return Optional.ofNullable(JsonConverter.convertToJsonObjectObjectNode(modelDTO));