From 2199eb7385a47f9a44077eefd191383b6bf8cf09 Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Wed, 19 Feb 2025 21:41:07 +0800 Subject: [PATCH] [improve][broker] Fix non-persistent system topic schema compatibility (#23286) When upgrading broker version from `3.0.x` to `3.3.x` with `ExtensibleLoadManagerImpl` enabled, it will have an `Unable to read schema` exception. And the broker will fail to start. This issue is caused by https://github.com/apache/pulsar/pull/22055 . Add a new class `NonPersistentSystemTopic`, and it will use for system non-persistent topic. (cherry picked from commit 7dbd8a507fc07d4fd5dc813ad76ee8080afd43cb) (cherry picked from commit 1c86d2517354b357926fdd2bcd0bca1f5a586323) --- .../pulsar/broker/service/BrokerService.java | 7 +++- .../NonPersistentSystemTopic.java | 32 ++++++++++++++ .../ExtensibleLoadManagerImplTest.java | 42 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4f88baf6f618b..5dc05bdf10a0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -120,6 +120,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -1326,7 +1327,11 @@ private CompletableFuture> createNonPersistentTopic(String topic final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic; try { - nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + if (isSystemTopic(topic)) { + nonPersistentTopic = new NonPersistentSystemTopic(topic, this); + } else { + nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + } nonPersistentTopic.setCreateFuture(topicFuture); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java new file mode 100644 index 0000000000000..9b867c9a8b3b6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.pulsar.broker.service.BrokerService; + +public class NonPersistentSystemTopic extends NonPersistentTopic { + public NonPersistentSystemTopic(String topic, BrokerService brokerService) { + super(topic, brokerService); + } + + @Override + public boolean isSystemTopic() { + return true; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 69f5b085d24c1..08c76e119556b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -73,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; @@ -94,6 +95,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; @@ -101,6 +103,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; @@ -114,6 +117,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; @@ -121,6 +125,7 @@ import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; import org.mockito.MockedStatic; +import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.Test; @@ -1476,6 +1481,43 @@ public void compactionScheduleTest() { }); } + @Test + public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception { + String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; + NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService()); + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + + var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class); + brokerLoadDataStore.init(); + brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get(); + Awaitility.await().until(() -> { + var data = brokerLoadDataStore.get("key"); + return data.isPresent(); + }); + brokerLoadDataStore.pushAsync("key", null).get(); + brokerLoadDataStore.close(); + } + + @Data + private static class BrokerLoadDataV1 { + private ResourceUsage cpu; + private ResourceUsage memory; + private ResourceUsage directMemory; + private ResourceUsage bandwidthIn; + private ResourceUsage bandwidthOut; + private double msgThroughputIn; + private double msgThroughputOut; + private double msgRateIn; + private double msgRateOut; + private int bundleCount; + private int topics; + private double maxResourceUsage; + private double weightedMaxEMA; + private double msgThroughputEMA; + private long updatedAt; + private long reportedAt; + } + @Test(timeOut = 10 * 1000) public void unloadTimeoutCheckTest() throws Exception {