diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java index fdbe75da..1789aef0 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.util.concurrent.FutureListener; import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils; @@ -31,10 +32,11 @@ public class AdapterChannel { private final MQTTProxyAdapter adapter; @Getter private final InetSocketAddress broker; + @Getter private CompletableFuture channelFuture; - public AdapterChannel(MQTTProxyAdapter adapter, - InetSocketAddress broker, CompletableFuture channelFuture) { + public AdapterChannel(MQTTProxyAdapter adapter, InetSocketAddress broker, + CompletableFuture channelFuture) { this.adapter = adapter; this.broker = broker; this.channelFuture = channelFuture; @@ -68,16 +70,9 @@ private CompletableFuture writeConnectMessage(final Connection connection) return writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); } - /** - * When client subscribes, the adapter channel maybe close in exception, so register listener to close the - * related client channel and trigger reconnection. - * @param connection - */ - public void registerAdapterChannelInactiveListener(Connection connection) { + public void registerClosureListener(FutureListener closeListener) { channelFuture.thenAccept(channel -> { - MQTTProxyAdapter.AdapterHandler channelHandler = (MQTTProxyAdapter.AdapterHandler) - channel.pipeline().get(MQTTProxyAdapter.AdapterHandler.NAME); - channelHandler.registerAdapterChannelInactiveListener(connection); + channel.closeFuture().addListener(closeListener); }); } } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java index 83bb64dc..b9e7f02f 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java @@ -45,10 +45,8 @@ import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyProtocolMethodProcessor; import java.net.InetSocketAddress; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -157,17 +155,6 @@ public class AdapterHandler extends ChannelInboundHandlerAdapter { public static final String NAME = "adapter-handler"; - private final Set callbackConnections = Collections.newSetFromMap(new ConcurrentHashMap<>()); - - public void registerAdapterChannelInactiveListener(Connection connection) { - callbackConnections.add(connection); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - callbackConnections.forEach(connection -> connection.getChannel().close()); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { checkArgument(message instanceof MqttAdapterMessage); diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index ed68c595..7d7750bd 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -83,6 +83,7 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth private Connection connection; private final LookupHandler lookupHandler; private final MQTTProxyConfiguration proxyConfig; + @Getter private final Map> topicBrokers; private final Map adapterChannels; @Getter @@ -396,9 +397,7 @@ private CompletableFuture doSubscribe(final MqttAdapterMessage adapter, fi .build(); MqttAdapterMessage mqttAdapterMessage = new MqttAdapterMessage(connection.getClientId(), subscribeMessage); - return writeToBroker(encodedPulsarTopicName, mqttAdapterMessage) - .thenAccept(__ -> - registerAdapterChannelInactiveListener(encodedPulsarTopicName)); + return writeToBroker(encodedPulsarTopicName, mqttAdapterMessage); }).collect(Collectors.toList()); return FutureUtil.waitForAll(writeFutures); }) @@ -420,11 +419,6 @@ private String getMqttTopicName(MqttTopicSubscription subscription, String encod } } - private void registerAdapterChannelInactiveListener(final String topic) { - CompletableFuture adapterChannel = topicBrokers.get(topic); - adapterChannel.thenAccept(channel -> channel.registerAdapterChannelInactiveListener(connection)); - } - @Override public void processUnSubscribe(final MqttAdapterMessage adapter) { final MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) adapter.getMqttMessage(); @@ -479,6 +473,19 @@ private CompletableFuture connectToBroker(final String topic) { final MqttConnectMessage connectMessage = connection.getConnectMessage(); adapterChannel.writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); + adapterChannel.registerClosureListener(future -> { + topicBrokers.values().remove(adapterChannel); + if (topicBrokers.values().size() <= 1) { + if (log.isDebugEnabled()) { + log.debug("Adapter channel inactive, close related connection {}", connection); + } + connection.getChannel().close(); + } else { + if (log.isDebugEnabled()) { + log.debug("connection {} has more than one AdapterChannel", connection); + } + } + }); return adapterChannel; }) ) diff --git a/tests/pom.xml b/tests/pom.xml index 8efb9e95..dc84b269 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -34,6 +34,12 @@ ${project.version} test + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + test + io.streamnative.pulsar.handlers pulsar-protocol-handler-mqtt-common @@ -106,4 +112,10 @@ + + + Eclipse Paho Repo + https://repo.eclipse.org/content/repositories/paho-releases/ + + \ No newline at end of file diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/paho/proxy/TestProxyConnectMultiBroker.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/paho/proxy/TestProxyConnectMultiBroker.java new file mode 100644 index 00000000..03658da0 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/paho/proxy/TestProxyConnectMultiBroker.java @@ -0,0 +1,111 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamnative.pulsar.handlers.mqtt.mqtt3.paho.proxy; + +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test +@Slf4j +public class TestProxyConnectMultiBroker extends MQTTTestBase { + + @Override + protected MQTTCommonConfiguration initConfig() throws Exception { + MQTTCommonConfiguration mqtt = super.initConfig(); + mqtt.setDefaultNumberOfNamespaceBundles(4); + mqtt.setMqttProxyEnabled(true); + return mqtt; + } + + public static class Callback implements MqttCallback { + + @Override + public void connectionLost(Throwable throwable) { + log.info("Connection lost"); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + log.info("Message arrived"); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + } + } + + @Test(timeOut = 1000 * 60 * 5) + public void testProxyConnectMultiBroker() throws Exception { + int port = getMqttProxyPortList().get(0); + MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + port, "test", new MemoryPersistence()); + MqttConnectOptions connectOptions = new MqttConnectOptions(); + connectOptions.setCleanSession(true); + connectOptions.setKeepAliveInterval(5); + log.info("connecting..."); + client.connect(connectOptions).waitForCompletion(); + log.info("connected"); + + client.subscribe("testsub1", 1).waitForCompletion(); + log.info("subscribed testsub1"); + // sleep the keep alive period to show that PING will happen in abscence of other messages. + Thread.sleep(6000); + + // make more subscriptions to connect to multiple brokers. + client.subscribe("testsub2", 1).waitForCompletion(); + log.info("subscribed testsub2"); + client.subscribe("testsub3", 1).waitForCompletion(); + log.info("subscribed testsub3"); + Map> msgs = new HashMap<>(); + String topic = "test1"; + client.subscribe(topic, 1, new IMqttMessageListener() { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + msgs.compute(topic, (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(new String(message.getPayload())); + return v; + }); + } + }).waitForCompletion(); + + // publish QoS 1 message to prevent the need for PINGREQ. Keep alive only sends ping in abscence of other + // messages. Refer to section 3.1.2.10 of the MQTT 3.1.1 specification. + for (int i = 0; i < 130; i++) { + log.info("Publishing message..." + System.currentTimeMillis()); + client.publish(topic, "test".getBytes(), 1, false).waitForCompletion(); + Thread.sleep(1000); + } + Assert.assertNotNull(msgs.get(topic) != null); + Assert.assertEquals(msgs.get(topic).size(), 130); + client.disconnect().waitForCompletion(); + } +}