diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 96ebadb1ff4aa2..8cad6f8bb2d779 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -373,8 +373,8 @@ static void createTenantIfAbsent(PulsarResources resources, String tenant, Strin } } - static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, - String cluster, int bundleNumber) throws IOException { + public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, + String cluster, int bundleNumber) throws IOException { NamespaceResources namespaceResources = resources.getNamespaceResources(); if (!namespaceResources.namespaceExists(namespaceName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index fd4e94ba7774d0..1c295fe0561ca1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -624,7 +624,9 @@ public CompletableFuture> getOwnershipWithLookupDataA public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, Optional destinationBroker, - boolean force) { + boolean force, + long timeout, + TimeUnit timeoutUnit) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -647,7 +649,7 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + timeout, timeoutUnit); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index bc2b6a5c87f768..3d1071aefdebaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -115,7 +115,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; - private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60; public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins @@ -305,7 +304,8 @@ public synchronized void start() throws PulsarServerException { } } PulsarClusterMetadataSetup.createNamespaceIfAbsent - (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName()); + (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(), + config.getDefaultNumberOfNamespaceBundles()); ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); @@ -1316,11 +1316,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max } } if (cleaned) { - try { - MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS); - } catch (InterruptedException e) { - log.warn("Interrupted while gracefully waiting for the cleanup convergence."); - } break; } else { try { @@ -1331,9 +1326,23 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max } } } + log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId, + System.currentTimeMillis() - started); } private synchronized void doCleanup(String broker) { + try { + if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) + .isEmpty()) { + log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup", + broker); + return; + } + } catch (Exception e) { + log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker); + return; + } + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index d916e917162230..5675f0a887ec2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -31,7 +31,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; @@ -44,6 +43,7 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2; + private static final long INIT_TIMEOUT_IN_SECS = 5; private volatile TableView tableView; private volatile long tableViewLastUpdateTimestamp; @@ -123,10 +123,11 @@ public synchronized void start() throws LoadDataStoreException { public synchronized void startTableView() throws LoadDataStoreException { if (tableView == null) { try { - tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create(); + tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); tableView.forEachAndListen((k, v) -> tableViewLastUpdateTimestamp = System.currentTimeMillis()); - } catch (PulsarClientException e) { + } catch (Exception e) { tableView = null; throw new LoadDataStoreException(e); } @@ -137,8 +138,9 @@ public synchronized void startTableView() throws LoadDataStoreException { public synchronized void startProducer() throws LoadDataStoreException { if (producer == null) { try { - producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); - } catch (PulsarClientException e) { + producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); + } catch (Exception e) { producer = null; throw new LoadDataStoreException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5814c6316dd0fd..b00a97e5c8ac56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -783,7 +783,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker, false); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -1233,7 +1233,8 @@ public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBun if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); future = extensibleLoadManager.unloadNamespaceBundleAsync( - nsBundle, Optional.empty(), true); + nsBundle, Optional.empty(), true, + pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); } else { future = ownershipCache.removeOwnership(nsBundle); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java new file mode 100644 index 00000000000000..bdaddf9afb1da2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.protocol; + +import java.io.File; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.util.PortManager; +import org.apache.commons.io.FileUtils; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarClientBasedHandlerTest { + + private final static String clusterName = "cluster"; + private final static int shutdownTimeoutMs = 100; + private final int zkPort = PortManager.nextFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); + private File tempDirectory; + private PulsarService pulsar; + + @BeforeClass + public void setup() throws Exception { + bk.start(); + final var config = new ServiceConfiguration(); + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); + + tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config, + PulsarClientBasedHandler.class.getName(), true); + + config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs); + + pulsar = new PulsarService(config); + pulsar.start(); + } + + @Test(timeOut = 30000) + public void testStopBroker() throws PulsarServerException { + final var beforeStop = System.currentTimeMillis(); + final var handler = (PulsarClientBasedHandler) pulsar.getProtocolHandlers() + .protocol(PulsarClientBasedHandler.PROTOCOL); + pulsar.close(); + final var elapsedMs = System.currentTimeMillis() - beforeStop; + log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs); + Assert.assertTrue(elapsedMs < + + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + bk.stop(); + if (tempDirectory != null) { + FileUtils.deleteDirectory(tempDirectory); + } + } +}