Skip to content

Commit 5f88145

Browse files
committed
fix: 修复集群消息转换 bug
1 parent 2d60810 commit 5f88145

File tree

7 files changed

+44
-32
lines changed

7 files changed

+44
-32
lines changed

readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@
159159
![ak6nHK.png](https://s1.ax1x.com/2020/07/28/ak6nHK.png)
160160

161161
1. `mqttx.cluster.enable`:功能开关,默认 `false`
162-
162+
> `v1.0.5.RELEASE` 之前的版本集群消息处理存在 bug,无法使用.
163163
#### 4.4 ssl 支持
164164

165165
开启 ssl 你首先应该有了 *ca*(自签名或购买),然后修改 `application.yml` 文件中几个配置:

src/main/java/com/jun/mqttx/broker/BrokerHandler.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.jun.mqttx.broker;
22

3-
import com.alibaba.fastjson.JSONObject;
3+
import com.alibaba.fastjson.JSON;
4+
import com.alibaba.fastjson.TypeReference;
45
import com.jun.mqttx.broker.handler.AbstractMqttSessionHandler;
56
import com.jun.mqttx.broker.handler.ConnectHandler;
67
import com.jun.mqttx.broker.handler.MessageDelegatingHandler;
@@ -44,7 +45,7 @@
4445
@Slf4j
4546
@ChannelHandler.Sharable
4647
@Component
47-
public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> implements Watcher<Object> {
48+
public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> implements Watcher {
4849

4950
/**
5051
* channel 群组
@@ -234,26 +235,26 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
234235
* <li>修改用户权限</li>
235236
* </ol>
236237
*
237-
* @param im {@see InternalMessage}
238+
* @param msg 集群消息
238239
*/
239240
@Override
240-
public void action(InternalMessage<Object> im) {
241-
if (im.getData() instanceof JSONObject) {
242-
Authentication data = ((JSONObject) im.getData()).toJavaObject(Authentication.class);
243-
// 目的是为了兼容 v1.0.2(含) 之前的版本
244-
String clientId = StringUtils.isEmpty(data.getClientId()) ? data.getUsername() : data.getClientId();
245-
List<String> authorizedPub = data.getAuthorizedPub();
246-
List<String> authorizedSub = data.getAuthorizedSub();
247-
if (StringUtils.isEmpty(clientId) || (CollectionUtils.isEmpty(authorizedPub) && CollectionUtils.isEmpty(authorizedSub))) {
248-
log.info("权限修改参数非法:{}", im);
249-
return;
250-
}
251-
alterUserAuthorizedTopic(clientId, authorizedSub, authorizedPub);
241+
public void action(String msg) {
242+
InternalMessage<Authentication> im = JSON.parseObject(msg, new TypeReference<InternalMessage<Authentication>>() {
243+
});
244+
Authentication data = im.getData();
245+
// 目的是为了兼容 v1.0.2(含) 之前的版本
246+
String clientId = StringUtils.isEmpty(data.getClientId()) ? data.getUsername() : data.getClientId();
247+
List<String> authorizedPub = data.getAuthorizedPub();
248+
List<String> authorizedSub = data.getAuthorizedSub();
249+
if (StringUtils.isEmpty(clientId) || (CollectionUtils.isEmpty(authorizedPub) && CollectionUtils.isEmpty(authorizedSub))) {
250+
log.info("权限修改参数非法:{}", im);
251+
return;
252+
}
253+
alterUserAuthorizedTopic(clientId, authorizedSub, authorizedPub);
252254

253-
// 移除 cache&redis 中客户端订阅的 topic
254-
if (!CollectionUtils.isEmpty(authorizedSub)) {
255-
subscriptionService.clearClientSub(clientId, authorizedSub);
256-
}
255+
// 移除 cache&redis 中客户端订阅的 topic
256+
if (!CollectionUtils.isEmpty(authorizedSub)) {
257+
subscriptionService.clearClientSub(clientId, authorizedSub);
257258
}
258259
}
259260

src/main/java/com/jun/mqttx/broker/handler/DisconnectHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.jun.mqttx.broker.handler;
22

3+
import com.alibaba.fastjson.JSON;
4+
import com.alibaba.fastjson.TypeReference;
35
import com.jun.mqttx.broker.BrokerHandler;
46
import com.jun.mqttx.constants.InternalMessageEnum;
57
import com.jun.mqttx.consumer.Watcher;
@@ -20,7 +22,7 @@
2022
*/
2123
@Slf4j
2224
@Handler(type = MqttMessageType.DISCONNECT)
23-
public final class DisconnectHandler extends AbstractMqttSessionHandler implements Watcher<String> {
25+
public final class DisconnectHandler extends AbstractMqttSessionHandler implements Watcher {
2426

2527
private ConnectHandler connectHandler;
2628

@@ -46,7 +48,9 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
4648
}
4749

4850
@Override
49-
public void action(InternalMessage<String> im) {
51+
public void action(String msg) {
52+
InternalMessage<String> im = JSON.parseObject(msg, new TypeReference<InternalMessage<String>>() {
53+
});
5054
ChannelId channelId = ConnectHandler.CLIENT_MAP.get(im.getData());
5155
if (channelId != null) {
5256
BrokerHandler.CHANNELS.find(channelId).close();

src/main/java/com/jun/mqttx/broker/handler/PublishHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.jun.mqttx.broker.handler;
22

3+
import com.alibaba.fastjson.JSON;
4+
import com.alibaba.fastjson.TypeReference;
35
import com.jun.mqttx.broker.BrokerHandler;
46
import com.jun.mqttx.config.MqttxConfig;
57
import com.jun.mqttx.constants.InternalMessageEnum;
@@ -36,7 +38,7 @@
3638
* @since 1.0.4
3739
*/
3840
@Handler(type = MqttMessageType.PUBLISH)
39-
public class PublishHandler extends AbstractMqttTopicSecureHandler implements Watcher<PubMsg> {
41+
public class PublishHandler extends AbstractMqttTopicSecureHandler implements Watcher {
4042

4143
private IRetainMessageService retainMessageService;
4244

@@ -65,6 +67,7 @@ public class PublishHandler extends AbstractMqttTopicSecureHandler implements Wa
6567
*/
6668
private Map<String, AtomicInteger> roundMap;
6769

70+
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
6871
public PublishHandler(IPublishMessageService publishMessageService, IRetainMessageService retainMessageService,
6972
ISubscriptionService subscriptionService, IPubRelMessageService pubRelMessageService,
7073
@Nullable IInternalMessagePublishService internalMessagePublishService, MqttxConfig mqttxConfig,
@@ -182,7 +185,9 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
182185
}
183186

184187
@Override
185-
public void action(InternalMessage<PubMsg> im) {
188+
public void action(String msg) {
189+
InternalMessage<PubMsg> im = JSON.parseObject(msg, new TypeReference<InternalMessage<PubMsg>>() {
190+
});
186191
PubMsg data = im.getData();
187192
publish(data, null, true);
188193
}

src/main/java/com/jun/mqttx/consumer/InternalMessageSubscriber.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public InternalMessageSubscriber(List<Watcher> watchers, MqttxConfig mqttxConfig
4545
* @param message 消息内容
4646
* @param channel 订阅频道
4747
*/
48-
@SuppressWarnings("unchecked")
4948
public void handleMessage(String message, String channel) {
5049
// 同 broker 消息屏蔽
5150
InternalMessage internalMessage = JSON.parseObject(message, InternalMessage.class);
@@ -55,7 +54,7 @@ public void handleMessage(String message, String channel) {
5554

5655
for (Watcher watcher : watchers) {
5756
if (watcher.support(channel)) {
58-
watcher.action(JSON.parseObject(message, InternalMessage.class));
57+
watcher.action(message);
5958
// 一个消息只能由一个观察者消费
6059
break;
6160
}

src/main/java/com/jun/mqttx/consumer/Watcher.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
package com.jun.mqttx.consumer;
22

33
import com.jun.mqttx.constants.InternalMessageEnum;
4-
import com.jun.mqttx.entity.InternalMessage;
54

65
/**
76
* 观察者,实现此接口.
87
*
98
* @author Jun
109
* @since 1.0.4
1110
*/
12-
public interface Watcher<T> {
11+
public interface Watcher {
1312

1413
/**
1514
* 每当有新的集群消息达到是,触发行为。
1615
* 注意:实现方法不应该有耗时操作(e.g. 访问数据库)
1716
*
18-
* @param im {@see InternalMessage}
17+
* @param msg 集群消息
1918
*/
20-
void action(InternalMessage<T> im);
19+
void action(String msg);
2120

2221
/**
2322
* Watcher 支持的 channel 类别

src/main/java/com/jun/mqttx/service/impl/SubscriptionServiceImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.jun.mqttx.service.impl;
22

3+
import com.alibaba.fastjson.JSON;
4+
import com.alibaba.fastjson.TypeReference;
35
import com.jun.mqttx.config.MqttxConfig;
46
import com.jun.mqttx.constants.InternalMessageEnum;
57
import com.jun.mqttx.consumer.Watcher;
@@ -28,7 +30,7 @@
2830
*/
2931
@Slf4j
3032
@Service
31-
public class SubscriptionServiceImpl implements ISubscriptionService, Watcher<ClientSubOrUnsubMsg> {
33+
public class SubscriptionServiceImpl implements ISubscriptionService, Watcher {
3234

3335
/**
3436
* 按顺序 -> 订阅,解除订阅,删除 topic
@@ -229,8 +231,10 @@ public void clearClientSub(String clientId, List<String> authorizedSub) {
229231
}
230232

231233
@Override
232-
public void action(InternalMessage<ClientSubOrUnsubMsg> im) {
234+
public void action(String msg) {
233235
if (enableInnerCache) {
236+
InternalMessage<ClientSubOrUnsubMsg> im = JSON.parseObject(msg, new TypeReference<InternalMessage<ClientSubOrUnsubMsg>>() {
237+
});
234238
ClientSubOrUnsubMsg data = im.getData();
235239
int type = data.getType();
236240
switch (type) {

0 commit comments

Comments
 (0)