Skip to content

Commit

Permalink
[fix][broker] Support advertised listeners when gracefully transferri…
Browse files Browse the repository at this point in the history
…ng topics (ExtensibleLoadManagerImpl only) (apache#22862)

(cherry picked from commit 5af0595)
  • Loading branch information
heesung-sn committed Jun 26, 2024
1 parent 602914e commit 5e1d7f4
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
Expand All @@ -81,6 +81,7 @@
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -146,6 +147,7 @@
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
Expand Down Expand Up @@ -3116,15 +3118,28 @@ public void closeProducer(Producer producer, Optional<BrokerLookupData> assigned
closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData);
}

private LookupData getLookupData(BrokerLookupData lookupData) {
LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder();
if (StringUtils.isNotBlank((listenerName))) {
builder.advertisedListenerName(listenerName);
}
try {
return lookupData.toLookupResult(builder.build()).getLookupData();
} catch (PulsarServerException e) {
log.error("Failed to get lookup data", e);
throw new RuntimeException(e);
}
}

private void closeProducer(long producerId, long epoch, Optional<BrokerLookupData> assignedBrokerLookupData) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
if (assignedBrokerLookupData.isPresent()) {
writeAndFlush(Commands.newCloseProducer(producerId, -1L,
assignedBrokerLookupData.get().pulsarServiceUrl(),
assignedBrokerLookupData.get().pulsarServiceUrlTls()));
} else {
writeAndFlush(Commands.newCloseProducer(producerId, -1L));
}
assignedBrokerLookupData.ifPresentOrElse(lookup -> {
LookupData lookupData = getLookupData(lookup);
writeAndFlush(Commands.newCloseProducer(producerId, -1L,
lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls()));
},
() -> writeAndFlush(Commands.newCloseProducer(producerId, -1L)));

// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
Expand All @@ -3150,9 +3165,13 @@ public void closeConsumer(Consumer consumer, Optional<BrokerLookupData> assigned

private void closeConsumer(long consumerId, Optional<BrokerLookupData> assignedBrokerLookupData) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(newCloseConsumer(consumerId, -1L,
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null),
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null)));
assignedBrokerLookupData.ifPresentOrElse(lookup -> {
LookupData lookupData = getLookupData(lookup);
writeAndFlush(Commands.newCloseConsumer(consumerId, -1L,
lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls()));
},
() -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null)));
} else {
close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
this.defaultTestNamespace = defaultTestNamespace;
}

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
updateConfig(conf);
}


protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
Expand All @@ -75,10 +82,9 @@ protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
initConfig(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
var conf2 = initConfig(getDefaultConf());
var conf2 = updateConfig(getDefaultConf());
additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
pulsar2 = additionalPulsarTestContext.getPulsarService();

Expand Down Expand Up @@ -141,7 +147,7 @@ private void setSecondaryLoadManager() throws IllegalAccessException {
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
}

protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
protected static CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -390,6 +391,19 @@ public Object[][] isPersistentTopicSubscriptionTypeTest() {
@Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType)
throws Exception {
testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, defaultTestNamespace, admin,
lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
}

@Test(enabled = false)
public static void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType,
String defaultTestNamespace,
PulsarAdmin admin, String brokerServiceUrl,
PulsarService pulsar1, PulsarService pulsar2,
ExtensibleLoadManager primaryLoadManager,
ExtensibleLoadManager secondaryLoadManager)
throws Exception {
var id = String.format("test-tx-client-reconnect-%s-%s", subscriptionType, UUID.randomUUID());
var topic = String.format("%s://%s/%s", topicDomain.toString(), defaultTestNamespace, id);
var topicName = TopicName.get(topic);
Expand All @@ -399,15 +413,16 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
var consumers = new ArrayList<Consumer<String>>();
try {
var lookups = new ArrayList<LookupService>();

var pulsarClient = pulsarClient(brokerServiceUrl, 0);
clients.add(pulsarClient);
@Cleanup
var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
lookups.add(spyLookupService(pulsarClient));

var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3;

for (int i = 0; i < consumerCount; i++) {
var client = newPulsarClient(lookupUrl.toString(), 0);
var client = pulsarClient(brokerServiceUrl, 0);
clients.add(client);
var consumer = client.newConsumer(Schema.STRING).
subscriptionName(id).
Expand All @@ -434,7 +449,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
dstBrokerUrl = pulsar1.getBrokerId();
dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
}
checkOwnershipState(broker, bundle);
checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1);

var messageCountBeforeUnloading = 100;
var messageCountAfterUnloading = 100;
Expand Down Expand Up @@ -528,6 +543,17 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType) throws Exception {
testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, defaultTestNamespace, admin,
lookupUrl.toString(), pulsar1);
}

@Test(enabled = false)
public static void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType,
String defaultTestNamespace,
PulsarAdmin admin,
String brokerServiceUrl,
PulsarService pulsar1) throws Exception {
var id = String.format("test-unload-%s-client-reconnect-%s-%s",
topicDomain, subscriptionType, UUID.randomUUID());
var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id);
Expand All @@ -536,6 +562,7 @@ public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
var consumers = new ArrayList<Consumer<String>>();
try {
@Cleanup
var pulsarClient = pulsarClient(brokerServiceUrl, 0);
var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3;
Expand Down Expand Up @@ -606,7 +633,10 @@ private LookupService spyLookupService(PulsarClient client) throws IllegalAccess
FieldUtils.writeDeclaredField(client, "lookup", lookup, true);
return lookup;
}
private void checkOwnershipState(String broker, NamespaceBundle bundle)

protected static void checkOwnershipState(String broker, NamespaceBundle bundle,
ExtensibleLoadManager primaryLoadManager,
ExtensibleLoadManager secondaryLoadManager, PulsarService pulsar1)
throws ExecutionException, InterruptedException {
var targetLoadManager = secondaryLoadManager;
var otherLoadManager = primaryLoadManager;
Expand All @@ -618,6 +648,11 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)
assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
}

protected void checkOwnershipState(String broker, NamespaceBundle bundle)
throws ExecutionException, InterruptedException {
checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1);
}

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
final String namespace = "public/testSplitBundleAdminAPI";
Expand Down Expand Up @@ -1576,4 +1611,11 @@ public String name() {

}

protected static PulsarClient pulsarClient(String url, int intervalInSecs) throws PulsarClientException {
return
PulsarClient.builder()
.serviceUrl(url)
.statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Unit test for {@link ExtensibleLoadManagerImpl with AdvertisedListeners broker configs}.
*/
@Slf4j
@Test(groups = "flaky")
@SuppressWarnings("unchecked")
public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends ExtensibleLoadManagerImplBaseTest {

public String brokerServiceUrl;
public ExtensibleLoadManagerImplWithAdvertisedListenersTest() {
super("public/test");
}

@Override
protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
super.updateConfig(conf);
int privatePulsarPort = nextLockedFreePort();
int publicPulsarPort = nextLockedFreePort();
conf.setInternalListenerName("internal");
conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort);
conf.setAdvertisedListeners(
"external:pulsar://localhost:" + publicPulsarPort +
",internal:pulsar://localhost:" + privatePulsarPort);
conf.setWebServicePortTls(Optional.empty());
conf.setBrokerServicePortTls(Optional.empty());
conf.setBrokerServicePort(Optional.of(privatePulsarPort));
conf.setWebServicePort(Optional.of(0));
brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", "");
return conf;
}

@DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
public Object[][] isPersistentTopicSubscriptionTypeTest() {
return new Object[][]{
{TopicDomain.non_persistent, SubscriptionType.Exclusive},
{TopicDomain.persistent, SubscriptionType.Key_Shared}
};
}

@Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType)
throws Exception {
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType,
defaultTestNamespace, admin,
brokerServiceUrl,
pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
}

@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType) throws Exception {
ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType,
defaultTestNamespace, admin,
brokerServiceUrl,
pulsar1);
}

@DataProvider(name = "isPersistentTopicTest")
public Object[][] isPersistentTopicTest() {
return new Object[][]{{TopicDomain.persistent}, {TopicDomain.non_persistent}};
}

@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception {
ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(topicDomain, defaultTestNamespace, admin,
brokerServiceUrl, pulsar1, pulsar2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() {
}

@Override
protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf = super.initConfig(conf);
protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
conf = super.updateConfig(conf);
conf.setTransactionCoordinatorEnabled(true);
return conf;
}
Expand Down

0 comments on commit 5e1d7f4

Please sign in to comment.