Browse Source

MqttBrokerbug修复,需求完善

thing_master
xiachao 1 year ago
parent
commit
c0bb0ddc6a
  1. 51
      common/transport/src/main/java/com/thing/transport/modules/dto/ThingModelInfo.java
  2. 5
      common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java
  3. 6
      common/transport/src/main/java/com/thing/transport/mqtt/broker/MqttTransportContext.java
  4. 52
      common/transport/src/main/java/com/thing/transport/mqtt/broker/MqttTransportHandler.java
  5. 3
      modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

51
common/transport/src/main/java/com/thing/transport/modules/dto/ThingModelInfo.java

@ -0,0 +1,51 @@
package com.thing.transport.modules.dto;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
/**
* 物模型表
*
* @author mark
* @since 3.0 2024-03-18
*/
@Data
@Schema(description = "物模型表")
public class ThingModelInfo implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@Schema(description = "id")
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
@Schema(description = "物编码")
private String code;
@Schema(description = "TB: token")
private String token;
@Schema(description = "是否网关: 0否1是")
private String gateway;
@Schema(description = "在线离线状态,0离线 1在线 2错误 3未接入")
private String status;
@Schema(description = "最新状态改变时间")
@JsonSerialize(using = ToStringSerializer.class)
private Long statusTs;
@Schema(description = "被分配到几个租户")
@JsonSerialize(using = ToStringSerializer.class)
private Long authNum;
@Schema(description = "数据来源")
private String origin;
@Schema(description = "备注说明")
private String remark;
@Schema(description = "创建时间")
@JsonSerialize(using = ToStringSerializer.class)
private Long createDate;
@Schema(description = "租户内物存在类型,0虚拟 1真实")
private String realType;
}

5
common/transport/src/main/java/com/thing/transport/modules/mapper/MqttBrokerMsgMapper.java

@ -1,6 +1,7 @@
package com.thing.transport.modules.mapper;
import com.thing.common.orm.mapper.PowerBaseMapper;
import com.thing.transport.modules.dto.ThingModelInfo;
import com.thing.transport.modules.entity.MqttBrokerMsgEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Update;
@ -19,4 +20,8 @@ public interface MqttBrokerMsgMapper extends PowerBaseMapper<MqttBrokerMsgEntity
*/
@Update("truncate table mqtt_broker_msg")
void clear();
ThingModelInfo findModelByToken(String token);
}

6
common/transport/src/main/java/com/thing/transport/mqtt/broker/MqttTransportContext.java

@ -1,6 +1,7 @@
package com.thing.transport.mqtt.broker;
import com.thing.transport.api.AbstractTransportContext;
import com.thing.transport.modules.mapper.MqttBrokerMsgMapper;
import com.thing.transport.mqtt.broker.adaptors.MqttTransportAdaptor;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
@ -18,9 +19,8 @@ public class MqttTransportContext extends AbstractTransportContext {
@Autowired
private MqttTransportAdaptor adaptor;
// @Autowired
// private ThingService thingService;
@Autowired
private MqttBrokerMsgMapper brokerMsgMapper;
@Value("${transport.mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;

52
common/transport/src/main/java/com/thing/transport/mqtt/broker/MqttTransportHandler.java

@ -1,10 +1,5 @@
package com.thing.transport.mqtt.broker;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttMessageType.*;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import com.thing.common.core.enumeration.GateWayStatus;
import com.thing.gen.queue.QueueProto.SessionInfoProto;
import com.thing.gen.queue.QueueProto.TransportMsg;
@ -12,29 +7,33 @@ import com.thing.transport.api.TransportService;
import com.thing.transport.api.TransportServiceCallback;
import com.thing.transport.api.adaptor.AdaptorException;
import com.thing.transport.api.session.TransportSessionUtil;
import com.thing.transport.modules.dto.ThingModelInfo;
import com.thing.transport.modules.mapper.MqttBrokerMsgMapper;
import com.thing.transport.mqtt.broker.adaptors.MqttTransportAdaptor;
import com.thing.transport.mqtt.broker.session.DeviceSessionCtx;
import com.thing.transport.mqtt.broker.session.MqttTopicMatcher;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttMessageType.*;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
@Slf4j
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
@ -44,9 +43,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final MqttTransportContext context;
private final MqttTransportAdaptor adaptor;
private final TransportService transportService;
// private final ThingService thingService;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final MqttBrokerMsgMapper brokerMsgMapper;
private volatile SessionInfoProto sessionInfo;
private final DeviceSessionCtx deviceSessionCtx;
@ -54,8 +52,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.sessionId = UUID.randomUUID();
this.context = context;
this.transportService = context.getTransportService();
// this.thingService = context.getThingService();
this.adaptor = context.getAdaptor();
this.brokerMsgMapper= context.getBrokerMsgMapper();
this.mqttQoSMap = new ConcurrentHashMap<>();
this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap);
}
@ -191,23 +189,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String clientId = msg.payload().clientIdentifier();
String clientIp = ctx.pipeline().channel().remoteAddress().toString();
log.info("[{}] Processing connect msg for client with user name: {}! ==连接IP==>{}", sessionId, userName, clientIp);
// Optional<ThingModel> optional = thingService.getByToken(userName);
// if (optional.isPresent()) {
// connectSuccess(ctx, userName, clientId, clientIp, optional.get());
// } else {
// log.info("[{}] connection refused bad username or password user name: {}!", sessionId, userName);
// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
// ctx.close();
// }
ThingModelInfo optional = brokerMsgMapper.findModelByToken(userName);
if (ObjectUtils.isNotEmpty(optional)) {
connectSuccess(ctx, userName, clientId, clientIp, optional);
} else {
log.info("[{}] connection refused bad username or password user name: {}!", sessionId, userName);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
}
}
// private void connectSuccess(ChannelHandlerContext ctx, String userName, String clientId, String clientIp, ThingModel thingModel) {
// deviceSessionCtx.setConnected(true);
// sessionInfo = TransportSessionUtil.createSession(sessionId, thingModel.getCode(), thingModel.getGateway(),
// clientId, userName, StringUtils.stripStart(clientIp, "/"), System.currentTimeMillis());
// transportService.registerSession(sessionInfo);
// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
// }
private void connectSuccess(ChannelHandlerContext ctx, String userName, String clientId, String clientIp, ThingModelInfo thingModel) {
deviceSessionCtx.setConnected(true);
sessionInfo = TransportSessionUtil.createSession(sessionId, thingModel.getCode(), thingModel.getGateway(),
clientId, userName, StringUtils.stripStart(clientIp, "/"), System.currentTimeMillis());
transportService.registerSession(sessionInfo);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
}
private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {

3
modules/thing/src/main/java/com/thing/thing/model/service/impl/IotThingModelServiceImpl.java

@ -33,6 +33,7 @@ import com.thing.thing.model.service.IotThingModelService;
import com.thing.transport.api.adaptor.JsonConverter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service;
@ -187,7 +188,7 @@ public class IotThingModelServiceImpl extends BaseServiceImpl<IotThingModelMappe
List<ObjectNode> topicMap = cache.getTopicMap(CacheNameEnum.THING_MODEL);
if (CollectionUtils.isEmpty(topicMap)) {
IotThingModelDTO modelDTO = mapper.selectOneByQueryAs(QueryWrapper.create()
.eq(IotThingModelEntity::getGateway, gateway)
.eq(IotThingModelEntity::getGateway, gateway, ObjectUtils.isNotEmpty(gateway))
.eq(IotThingModelEntity::getToken, token)
, IotThingModelDTO.class);
return Optional.ofNullable(JsonConverter.convertToJsonObjectObjectNode(modelDTO));

Loading…
Cancel
Save