diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 41f83c3d9c0fdd..722f1320244356 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2882,6 +2882,15 @@ public double getLoadBalancerBandwidthOutResourceWeight() { private String loadManagerServiceUnitStateTableViewClassName = "org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl"; + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Enable ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and " + + "system topic table views during migration from one to the other. One could enable this" + + " syncer before migration and disable it after the migration finishes." + ) + private boolean loadBalancerServiceUnitTableViewSyncerEnabled = false; + /**** --- Replication. --- ****/ @FieldContext( category = CATEGORY_REPLICATION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index ca01c8242277e2..243232b747e1ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewSyncer; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; @@ -167,6 +168,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private TopBundleLoadDataReporter topBundleLoadDataReporter; + @Getter + protected ServiceUnitStateTableViewSyncer serviceUnitStateTableViewSyncer; + private volatile ScheduledFuture brokerLoadDataReportTask; private volatile ScheduledFuture topBundlesLoadDataReportTask; @@ -399,6 +403,7 @@ public void start() throws PulsarServerException { serviceUnitStateChannel, unloadCounter, unloadMetrics); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); + this.serviceUnitStateTableViewSyncer = new ServiceUnitStateTableViewSyncer(); pulsar.runWhenReadyForIncomingRequests(() -> { try { @@ -770,6 +775,7 @@ public void close() throws PulsarServerException { this.topBundlesLoadDataStore.shutdown(); this.unloadScheduler.close(); this.splitScheduler.close(); + this.serviceUnitStateTableViewSyncer.close(); } catch (IOException ex) { throw new PulsarServerException(ex); } finally { @@ -824,6 +830,9 @@ synchronized void playLeader() { topBundlesLoadDataStore.init(); unloadScheduler.start(); serviceUnitStateChannel.scheduleOwnershipMonitor(); + if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) { + serviceUnitStateTableViewSyncer.start(pulsar); + } break; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", @@ -873,6 +882,7 @@ synchronized void playFollower() { brokerLoadDataStore.init(); topBundlesLoadDataStore.close(); topBundlesLoadDataStore.startProducer(); + serviceUnitStateTableViewSyncer.close(); break; } catch (Throwable e) { log.warn("The broker:{} failed to set the role. Retrying {} th ...", @@ -951,12 +961,20 @@ protected void monitor() { + "Playing the leader role.", role, isChannelOwner); playLeader(); } + + if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) { + serviceUnitStateTableViewSyncer.start(pulsar); + } else { + serviceUnitStateTableViewSyncer.close(); + } + } else { if (role != Follower) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the follower role.", role, isChannelOwner); playFollower(); } + serviceUnitStateTableViewSyncer.close(); } } catch (Throwable e) { log.error("Failed to get the channel ownership.", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java new file mode 100644 index 00000000000000..438afd17a5c6af --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +/** + * Defines ServiceUnitTableViewSyncer. + * It syncs service unit(bundle) states between metadata store and system topic table views. + * One could enable this syncer before migration from one to the other and disable it after the migration finishes. + */ +@Slf4j +public class ServiceUnitStateTableViewSyncer implements Cloneable { + private static final int MAX_CONCURRENT_SYNC_COUNT = 100; + private volatile ServiceUnitStateTableView systemTopicTableView; + private volatile ServiceUnitStateTableView metadataStoreTableView; + private volatile boolean isActive = false; + + public void start(PulsarService pulsar) + throws IOException, TimeoutException, InterruptedException, ExecutionException { + if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) { + return; + } + + if (isActive) { + return; + } + + try { + long started = System.currentTimeMillis(); + + if (metadataStoreTableView != null) { + metadataStoreTableView.close(); + metadataStoreTableView = null; + } + metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); + metadataStoreTableView.start( + pulsar, + this::syncToSystemTopic, + (k, v) -> {} + ); + log.info("Started MetadataStoreTableView"); + + if (systemTopicTableView != null) { + systemTopicTableView.close(); + systemTopicTableView = null; + } + systemTopicTableView = new ServiceUnitStateTableViewImpl(); + systemTopicTableView.start( + pulsar, + this::syncToMetadataStore, + (k, v) -> {} + ); + log.info("Started SystemTopicTableView"); + + List> futures = new ArrayList<>(); + var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + for (var e : metadataStoreTableView.entrySet()) { + futures.add(syncToSystemTopic(e.getKey(), e.getValue())); + if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) { + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + futures.clear(); + } + } + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + futures.clear(); + + // Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out) + var store = pulsar.getLocalMetadataStore(); + var writer = ObjectMapperFactory.getMapper().writer(); + for (var e : systemTopicTableView.entrySet()) { + futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(), + writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null)); + if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) { + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + futures.clear(); + } + } + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + futures.clear(); + int max = Math.max(metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size()); + int syncTimeout = opTimeout * max; + while (metadataStoreTableView.entrySet().size() != systemTopicTableView.entrySet().size()) { + if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) > syncTimeout) { + throw new TimeoutException( + "Failed to sync tableviews. MetadataStoreTableView.size: " + + metadataStoreTableView.entrySet().size() + + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " + + syncTimeout + " secs"); + } + Thread.sleep(100); + } + + log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , " + + "SystemTopicTableView.size: {} in {} secs", + metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(), + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)); + isActive = true; + + } catch (Throwable e) { + log.error("Failed to start ServiceUnitStateTableViewSyncer", e); + throw e; + } + } + + private CompletableFuture syncToSystemTopic(String key, ServiceUnitStateData data) { + return systemTopicTableView.put(key, data); + } + + private CompletableFuture syncToMetadataStore(String key, ServiceUnitStateData data) { + return metadataStoreTableView.put(key, data); + } + + public void close() throws IOException { + if (!isActive) { + return; + } + + try { + if (systemTopicTableView != null) { + systemTopicTableView.close(); + systemTopicTableView = null; + log.info("Closed SystemTopicTableView"); + } + } catch (Exception e) { + log.error("Failed to close SystemTopicTableView", e); + throw e; + } + + try { + if (metadataStoreTableView != null) { + metadataStoreTableView.close(); + metadataStoreTableView = null; + log.info("Closed MetadataStoreTableView"); + } + } catch (Exception e) { + log.error("Failed to close MetadataStoreTableView", e); + throw e; + } + + log.info("Successfully closed ServiceUnitStateTableViewSyncer."); + isActive = false; + } + + public boolean isActive() { + return isActive; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 0b63d8d9e64026..09628e482ee359 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -90,6 +90,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; @@ -1237,6 +1238,194 @@ public void testDeployAndRollbackLoadManager() throws Exception { } } + @Test(priority = 200) + public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { + + pulsar.getAdminClient().brokers() + .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncerEnabled", "true"); + makeSecondaryAsLeader(); + makePrimaryAsLeader(); + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName()); + try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { + // start pulsar3 with ServiceUnitStateTableViewImpl + @Cleanup + var pulsar3 = additionalPulsarTestContext.getPulsarService(); + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("testDeployAndRollbackLoadManager"); + TopicName topicName = topicAndBundle.getLeft(); + NamespaceBundle bundle = topicAndBundle.getRight(); + String topic = topicName.toString(); + + String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + //assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl()); + + String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult1, lookupResult2); + assertEquals(lookupResult1, lookupResult3); + + LookupOptions options = LookupOptions.builder() + .authoritative(false) + .requestHttps(false) + .readOnly(false) + .loadTopicsInBundle(false).build(); + Optional webServiceUrl1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl1.isPresent()); + //assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress()); + + Optional webServiceUrl2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl2.isPresent()); + //assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); + + Optional webServiceUrl3 = + pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl3.isPresent()); + //assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); + + List pulsarServices = List.of(pulsar1, pulsar2, pulsar3); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + } + + // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl + ServiceConfiguration conf = getDefaultConf(); + conf.setAllowAutoTopicCreation(true); + conf.setForceDeleteNamespaceAllowed(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadManagerServiceUnitStateTableViewClassName( + ServiceUnitStateMetadataStoreTableViewImpl.class.getName()); + try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) { + @Cleanup + var pulsar4 = additionPulsarTestContext.getPulsarService(); + + Set availableCandidates = Sets.newHashSet( + pulsar1.getBrokerServiceUrl(), + pulsar2.getBrokerServiceUrl(), + pulsar3.getBrokerServiceUrl(), + pulsar4.getBrokerServiceUrl()); + String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic); + assertTrue(availableCandidates.contains(lookupResult4)); + + String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult4, lookupResult5); + assertEquals(lookupResult4, lookupResult6); + assertEquals(lookupResult4, lookupResult7); + + Set availableWebUrlCandidates = Sets.newHashSet( + pulsar1.getWebServiceAddress(), + pulsar2.getWebServiceAddress(), + pulsar3.getWebServiceAddress(), + pulsar4.getWebServiceAddress()); + + webServiceUrl1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl1.isPresent()); + assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString())); + + webServiceUrl2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl2.isPresent()); + assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); + + webServiceUrl3 = + pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl3.isPresent()); + assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString())); + + var webServiceUrl4 = + pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl4.isPresent()); + assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + + pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + } + // Check if the broker is available + var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); + var loadManager4 = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + loadManager4.getBrokerRegistry().unregister(); + + NamespaceName slaMonitorNamespace = + getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertNotEquals(result, pulsar4.getBrokerServiceUrl()); + + Producer producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer.send("t1"); + + // Test re-register broker and check the lookup result + loadManager4.getBrokerRegistry().register(); + + result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertEquals(result, pulsar4.getBrokerServiceUrl()); + + producer.send("t2"); + Producer producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer1.send("t3"); + + producer.close(); + producer1.close(); + @Cleanup + Consumer consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + // receive message t1 t2 t3 + assertEquals(consumer.receive().getValue(), "t1"); + assertEquals(consumer.receive().getValue(), "t2"); + assertEquals(consumer.receive().getValue(), "t3"); + } + } + + pulsar.getAdminClient().brokers() + .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncerEnabled", "false"); + makeSecondaryAsLeader(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); + Awaitility.waitAtMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); + } + private void assertLookupHeartbeatOwner(PulsarService pulsar, String brokerId, String expectedBrokerServiceUrl) throws Exception {