Skip to content

Commit eafe281

Browse files
committed
feat: 系统主题功能支持
1. 一些小改动
1 parent 4f0d170 commit eafe281

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<dependencies>
2323
<dependency>
2424
<groupId>org.springframework.boot</groupId>
25-
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
25+
<artifactId>spring-boot-starter-data-redis</artifactId>
2626
</dependency>
2727

2828
<dependency>

src/main/java/com/jun/mqttx/broker/handler/SubscribeHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
import com.jun.mqttx.utils.TopicUtils;
1111
import io.netty.buffer.ByteBuf;
1212
import io.netty.buffer.Unpooled;
13-
import io.netty.channel.Channel;
1413
import io.netty.channel.ChannelHandlerContext;
1514
import io.netty.channel.group.ChannelGroup;
1615
import io.netty.channel.group.DefaultChannelGroup;
1716
import io.netty.handler.codec.mqtt.*;
18-
import io.netty.util.AttributeKey;
1917
import io.netty.util.concurrent.GlobalEventExecutor;
2018

2119
import java.util.ArrayList;
@@ -34,9 +32,6 @@
3432
@Handler(type = MqttMessageType.SUBSCRIBE)
3533
public class SubscribeHandler extends AbstractMqttTopicSecureHandler {
3634
//@formatter:off
37-
/** 客户端订阅的系统主题 key, 用于 {@link Channel#attr( AttributeKey)} */
38-
public static final String SYS_TOPICS_ATTR = "sys_topics";
39-
4035
private final boolean enableTopicPubSubSecure;
4136
private IRetainMessageService retainMessageService;
4237
private ISubscriptionService subscriptionService;
@@ -46,9 +41,13 @@ public class SubscribeHandler extends AbstractMqttTopicSecureHandler {
4641
private MqttQoS sysTopicQos;
4742
private String version;
4843

49-
/** 系统主题 $SYS 订阅群组 */
50-
public static final ChannelGroup sysChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
51-
/** 定时任务执行器 */
44+
/**
45+
* 系统主题 $SYS 订阅群组
46+
*/
47+
public static ChannelGroup sysChannels;
48+
/**
49+
* 定时任务执行器
50+
*/
5251
private ScheduledExecutorService fixRateExecutor;
5352
/** 系统主题消息 id */
5453
private AtomicInteger sysTopicMsgId;
@@ -64,6 +63,7 @@ public SubscribeHandler(IRetainMessageService retainMessageService, ISubscriptio
6463
this.version = mqttxConfig.getVersion();
6564
this.enableSysTopic = mqttxConfig.getSysTopic().getEnable();
6665
if (enableSysTopic) {
66+
SubscribeHandler.sysChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
6767
this.interval = mqttxConfig.getSysTopic().getInterval().getSeconds();
6868
this.sysTopicQos = MqttQoS.valueOf(mqttxConfig.getSysTopic().getQos());
6969
fixRateExecutor = Executors.newSingleThreadScheduledExecutor();

src/main/java/com/jun/mqttx/broker/handler/UnsubscribeHandler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.jun.mqttx.broker.handler;
22

3+
import com.jun.mqttx.config.MqttxConfig;
34
import com.jun.mqttx.service.ISubscriptionService;
45
import com.jun.mqttx.utils.TopicUtils;
56
import io.netty.channel.Channel;
@@ -18,9 +19,12 @@
1819
@Handler(type = MqttMessageType.UNSUBSCRIBE)
1920
public class UnsubscribeHandler extends AbstractMqttSessionHandler {
2021

22+
private Boolean enableSysTopic;
23+
2124
private ISubscriptionService subscriptionService;
2225

23-
public UnsubscribeHandler(ISubscriptionService subscriptionService) {
26+
public UnsubscribeHandler(MqttxConfig config, ISubscriptionService subscriptionService) {
27+
this.enableSysTopic = config.getSysTopic().getEnable();
2428
this.subscriptionService = subscriptionService;
2529
}
2630

@@ -31,7 +35,10 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
3135
MqttUnsubscribePayload payload = mqttUnsubscribeMessage.payload();
3236

3337
// 系统主题
34-
List<String> collect = unsubscribeSysTopics(payload.topics(), ctx.channel());
38+
List<String> collect = payload.topics();
39+
if (enableSysTopic) {
40+
collect = unsubscribeSysTopics(payload.topics(), ctx.channel());
41+
}
3542

3643
// 非系统主题
3744
subscriptionService.unsubscribe(clientId(ctx), collect);

0 commit comments

Comments
 (0)