Skip to content

Commit faa287a

Browse files
committed
perf: 优化 cleanSession 会话消息处理逻辑
当 cleanSession = 1, mqttx 不在持久化消息, 转而使用内存保存消息
1 parent 3d55252 commit faa287a

13 files changed

+165
-28
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
</parent>
1111
<groupId>com.jun</groupId>
1212
<artifactId>mqttx</artifactId>
13-
<version>1.0.5.ALPHA</version>
13+
<version>1.0.5.BETA</version>
1414
<name>mqttx</name>
1515
<description>mqtt broker</description>
1616

readme.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,27 @@
168168
![share-topic](https://s1.ax1x.com/2020/09/22/wXddnU.png)
169169

170170
> `msg-a` 消息分发策略取决于配置项 `mqttx.share-topic.share-sub-strategy`
171+
>
172+
> 可以配合 `cleanSession = 1` 的会话,共享主题的客户端断开连接后会被服务端移除订阅,这样共享主题的消息只会分发给在线的客户端。
173+
> 注意: `mqtt3.1.1` 协议规定当 `cleanSession = 1` 时,连接断开后与会话相关联的所有状态(不含 retained 消息)都会被删除(`mqtt5` 增加了会话超时设置,感兴趣的同学可以了解一下)。
174+
> `mqttx v1.0.5.BETA` 版本后(含),`cleanSession = 1` 的会话消息保存在内存中,具备极高的性能.
175+
>
176+
> > If CleanSession is set to 1, the Client and Server **MUST** discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session **MUST NOT** be reused in any subsequent Session [MQTT-3.1.2-6].
177+
> >
178+
> > The Session state in the Client consists of:
179+
> >
180+
> > - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
181+
> > - QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
182+
> >
183+
> > The Session state in the Server consists of:
184+
> >
185+
> > - The existence of a Session, even if the rest of the Session state is empty.
186+
> > - The Client’s subscriptions.
187+
> > - QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
188+
> > - QoS 1 and QoS 2 messages pending transmission to the Client.
189+
> > - QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
190+
> > - Optionally, QoS 0 messages pending transmission to the Client.
191+
171192

172193
#### 4.7 websocket 支持
173194

src/main/java/com/jun/mqttx/broker/BrokerHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
108108
.ifPresent(msg -> messageDelegatingHandler.handle(ctx, msg));
109109

110110
ConnectHandler.clientMap.remove(session.getClientId());
111-
if (Boolean.FALSE.equals(session.getClearSession())) {
111+
if (Boolean.FALSE.equals(session.getCleanSession())) {
112112
sessionService.save(session);
113113
}
114114
}

src/main/java/com/jun/mqttx/broker/handler/AbstractMqttSessionHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ String clientId(ChannelHandlerContext ctx) {
4747
*/
4848
boolean clearSession(ChannelHandlerContext ctx) {
4949
Session session = getSession(ctx);
50-
return session.getClearSession();
50+
return session.getCleanSession();
5151
}
5252

5353
/**
@@ -77,13 +77,24 @@ void saveAuthorizedTopics(ChannelHandlerContext ctx, Authentication authenticati
7777
channel.attr(AttributeKey.valueOf(AUTHORIZED_PUB_TOPICS)).set(authentication.getAuthorizedPub());
7878
}
7979

80+
/**
81+
* 获取当前会话 clean session 状态
82+
*
83+
* @param ctx {@link ChannelHandlerContext}
84+
* @return true if clearSession = 1
85+
*/
86+
boolean isCleanSession(ChannelHandlerContext ctx) {
87+
Session session = (Session) ctx.channel().attr(AttributeKey.valueOf(Session.KEY)).get();
88+
return session.getCleanSession();
89+
}
90+
8091
/**
8192
* 获取客户会话
8293
*
8394
* @param ctx {@link ChannelHandlerContext}
8495
* @return {@link Session}
8596
*/
86-
private Session getSession(ChannelHandlerContext ctx) {
97+
Session getSession(ChannelHandlerContext ctx) {
8798
return (Session) ctx.channel().attr(AttributeKey.valueOf(Session.KEY)).get();
8899
}
89100
}

src/main/java/com/jun/mqttx/broker/handler/ConnectHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,13 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
168168
boolean sessionPresent;
169169
if (clearSession) {
170170
sessionPresent = false;
171-
session = new Session();
172-
session.setClearSession(true);
173-
session.setClientId(clientId);
171+
session = Session.of(clientId, true);
174172
} else {
175173
sessionPresent = sessionService.hasKey(clientId);
176174
if (sessionPresent) {
177175
session = sessionService.find(clientId);
178176
} else {
179-
session = new Session();
180-
session.setClientId(clientId);
181-
session.setClearSession(false);
177+
session = Session.of(clientId, false);
182178
sessionService.save(session);
183179
}
184180
}

src/main/java/com/jun/mqttx/broker/handler/PubAckHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ public PubAckHandler(IPublishMessageService publishMessageService) {
2525
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
2626
MqttPubAckMessage mqttPubAckMessage = (MqttPubAckMessage) msg;
2727
int messageId = mqttPubAckMessage.variableHeader().messageId();
28-
publishMessageService.remove(clientId(ctx), messageId);
28+
if (isCleanSession(ctx)) {
29+
getSession(ctx).removePubMsg(messageId);
30+
} else {
31+
publishMessageService.remove(clientId(ctx), messageId);
32+
}
2933
}
3034
}

src/main/java/com/jun/mqttx/broker/handler/PubComHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ public PubComHandler(IPubRelMessageService pubRelMessageService) {
2323
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
2424
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
2525
int messageId = mqttMessageIdVariableHeader.messageId();
26-
String clientId = clientId(ctx);
27-
pubRelMessageService.remove(clientId, messageId);
26+
if (isCleanSession(ctx)) {
27+
getSession(ctx).removePubRelMsg(messageId);
28+
} else {
29+
String clientId = clientId(ctx);
30+
pubRelMessageService.remove(clientId, messageId);
31+
}
2832

2933
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
3034
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),

src/main/java/com/jun/mqttx/broker/handler/PubRecHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.jun.mqttx.broker.handler;
22

3+
import com.jun.mqttx.entity.Session;
34
import com.jun.mqttx.service.IPubRelMessageService;
45
import com.jun.mqttx.service.IPublishMessageService;
56
import io.netty.channel.ChannelHandlerContext;
@@ -25,14 +26,20 @@ public PubRecHandler(IPubRelMessageService pubRelMessageService, IPublishMessage
2526

2627
@Override
2728
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
28-
//移除消息
29+
// 移除消息
2930
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
3031
int messageId = mqttMessageIdVariableHeader.messageId();
31-
String clientId = clientId(ctx);
32-
publishMessageService.remove(clientId, messageId);
33-
34-
//保存 pubRec
35-
pubRelMessageService.save(clientId, messageId);
32+
if (clearSession(ctx)) {
33+
Session session = getSession(ctx);
34+
session.removePubMsg(messageId);
35+
session.savePubRelMsg(messageId);
36+
} else {
37+
String clientId = clientId(ctx);
38+
publishMessageService.remove(clientId, messageId);
39+
40+
// 保存 pubRec
41+
pubRelMessageService.save(clientId, messageId);
42+
}
3643

3744
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
3845
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0),

src/main/java/com/jun/mqttx/broker/handler/PubRelHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ public PubRelHandler(IPubRelMessageService pubRelMessageService) {
2626
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
2727
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
2828
int messageId = mqttMessageIdVariableHeader.messageId();
29-
pubRelMessageService.remove(clientId(ctx), messageId);
29+
if (isCleanSession(ctx)) {
30+
getSession(ctx).removePubRelMsg(messageId);
31+
} else {
32+
pubRelMessageService.remove(clientId(ctx), messageId);
33+
}
3034

3135
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
3236
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),

src/main/java/com/jun/mqttx/broker/handler/PublishHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
142142
if (!pubRelMessageService.isDupMsg(clientId(ctx), packetId)) {
143143
//发布新的消息并保存 pubRel 标记,用于实现Qos2
144144
publish(pubMsg, ctx, false);
145-
pubRelMessageService.save(clientId(ctx), packetId);
145+
if (isCleanSession(ctx)) {
146+
getSession(ctx).savePubRelMsg(packetId);
147+
} else {
148+
pubRelMessageService.save(clientId(ctx), packetId);
149+
}
146150
}
147151
break;
148152
}
@@ -247,7 +251,12 @@ private void publish0(ChannelHandlerContext ctx, ClientSub clientSub, PubMsg pub
247251

248252
//集群消息不做保存,传播消息的 broker 已经保存过了
249253
if ((qos == MqttQoS.EXACTLY_ONCE || qos == MqttQoS.AT_LEAST_ONCE) && !isInternalMessage) {
250-
publishMessageService.save(clientId, pubMsg);
254+
if (isCleanSession(ctx)) {
255+
// 如果 cleanSession = 1,消息直接关联会话,不需要持久化
256+
getSession(ctx).savePubMsg(messageId, pubMsg);
257+
} else {
258+
publishMessageService.save(clientId, pubMsg);
259+
}
251260
}
252261

253262
//将消息推送给集群中的broker

src/main/java/com/jun/mqttx/entity/InternalMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import lombok.AllArgsConstructor;
44
import lombok.Data;
5+
import lombok.NoArgsConstructor;
56

67
/**
78
* 集群消息,用于集群间消息发布
@@ -11,6 +12,7 @@
1112
*/
1213
@Data
1314
@AllArgsConstructor
15+
@NoArgsConstructor
1416
public class InternalMessage<T> {
1517
//@formatter:off
1618

src/main/java/com/jun/mqttx/entity/Session.java

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.jun.mqttx.entity;
22

3-
import com.alibaba.fastjson.annotation.JSONField;
43
import io.netty.handler.codec.mqtt.MqttPublishMessage;
54
import lombok.Data;
65

6+
import java.util.HashMap;
7+
import java.util.HashSet;
8+
import java.util.Map;
9+
import java.util.Set;
10+
711
/**
812
* MQTT 会话
913
*
@@ -23,21 +27,51 @@ public class Session {
2327
/**
2428
* 清理会话标志
2529
*/
26-
private Boolean clearSession;
30+
private Boolean cleanSession;
31+
32+
/**
33+
* 用于 cleanSession 连接,负责存储 qos > 0 的消息
34+
*/
35+
private transient Map<Integer, PubMsg> pubMsgStore;
36+
37+
/**
38+
* @see #pubMsgStore
39+
*/
40+
private transient Set<Integer> pubRelMsgStore;
2741

2842
/**
2943
* 遗嘱消息
3044
*/
31-
@JSONField(serialize = false, deserialize = false)
32-
private MqttPublishMessage willMessage;
45+
private transient MqttPublishMessage willMessage;
3346

3447
/**
3548
* 用于生成 msgId
3649
*/
3750
private int messageId;
3851

52+
private Session() {
53+
}
54+
3955
/**
40-
* session 绑定 channel, 而channel 是绑定了 EventLoop 线程的,这个方法是线程安全的(如果没有额外的配置)。
56+
* 创建会话
57+
*
58+
* @param clientId 客户端 id
59+
* @param cleanSession clean session 标识. true: 1; false: 0
60+
* @return Session for clean session = 1
61+
*/
62+
public static Session of(String clientId, boolean cleanSession) {
63+
Session session = new Session();
64+
session.setClientId(clientId);
65+
session.setCleanSession(cleanSession);
66+
if (cleanSession) {
67+
session.setPubMsgStore(new HashMap<>());
68+
session.setPubRelMsgStore(new HashSet<>());
69+
}
70+
return session;
71+
}
72+
73+
/**
74+
* session 绑定 channel, 而 channel 绑定 EventLoop 线程,这个方法是线程安全的(如果没有额外的配置)。
4175
*
4276
* @return {@link #messageId}
4377
*/
@@ -51,4 +85,49 @@ public int increaseAndGetMessageId() {
5185
public void clearWillMessage() {
5286
willMessage = null;
5387
}
88+
89+
/**
90+
* 保存 {@link PubMsg}
91+
*
92+
* @param messageId 消息id
93+
* @param pubMsg {@link PubMsg}
94+
*/
95+
public void savePubMsg(Integer messageId, PubMsg pubMsg) {
96+
if (cleanSession) {
97+
pubMsgStore.put(messageId, pubMsg);
98+
}
99+
}
100+
101+
/**
102+
* 移除 {@link PubMsg}
103+
*
104+
* @param messageId 消息id
105+
*/
106+
public void removePubMsg(int messageId) {
107+
if (cleanSession) {
108+
pubMsgStore.remove(messageId);
109+
}
110+
}
111+
112+
/**
113+
* 保存 {@link PubRelMsg}
114+
*
115+
* @param messageId 消息id
116+
*/
117+
public void savePubRelMsg(int messageId) {
118+
if (cleanSession) {
119+
pubRelMsgStore.add(messageId);
120+
}
121+
}
122+
123+
/**
124+
* 移除 {@link PubRelMsg}
125+
*
126+
* @param messageId 消息id
127+
*/
128+
public void removePubRelMsg(int messageId) {
129+
if (cleanSession) {
130+
pubRelMsgStore.remove(messageId);
131+
}
132+
}
54133
}

src/main/resources/banner.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
|_| |_| |_|\__, |\__|\__/_/\_\
77
| |
88
|_|
9-
:: mqttx :: (v1.0.5.ALPHA)
9+
:: mqttx :: (v1.0.5.BETA)

0 commit comments

Comments
 (0)