From 167b9901a3a9995fbe3b9ee8240a9f9c8b4572cd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 27 Sep 2024 16:06:41 +0800 Subject: [PATCH 1/5] - --- .../service/SystemTopicBasedTopicPoliciesService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 18b4c610a5c9b..26a7f310cb7c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -133,7 +133,13 @@ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) || isSelf(topicName)) { return CompletableFuture.completedFuture(null); } - return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); + return pulsarService.getNamespaceService().checkTopicExists(NamespaceEventsSystemTopicFactory + .getEventsTopicName(topicName.getNamespaceObject())).thenCompose(topicExistsInfo -> { + if (topicExistsInfo.isExists()) { + return CompletableFuture.completedFuture(null); + } + return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); + }); } @Override From c95a7f3049b77a2d2aaab3eb091a167d54794dc7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 27 Sep 2024 17:05:15 +0800 Subject: [PATCH 2/5] add test --- .../OneWayReplicatorUsingGlobalZKTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index d99969fbaa7e5..519907bb7c853 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -19,16 +19,24 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -173,4 +181,40 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro public void testReplicationCountMetrics() throws Exception { super.testReplicationCountMetrics(); } + + @Test(enabled = true) + public void testRemoveCluster() throws Exception { + // Initialize. + final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8"; + final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a"; + final String topicChangeEvents = "persistent://" + ns1 + "/__change_events"; + admin1.namespaces().createNamespace(ns1); + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.topics().createNonPartitionedTopic(topic); + + // Wait for loading topic up. + Producer p = client1.newProducer(Schema.STRING).topic(topic).create(); + Awaitility.await().untilAsserted(() -> { + Map>> tps = pulsar1.getBrokerService().getTopics(); + assertTrue(tps.containsKey(topic)); + assertTrue(tps.containsKey(topicChangeEvents)); + }); + + // The topics under the namespace of the cluster-1 will be deleted. + // Verify the result. + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Map>> tps = pulsar1.getBrokerService().getTopics(); + assertFalse(tps.containsKey(topic)); + assertFalse(tps.containsKey(topicChangeEvents)); + assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists()); + assertFalse(pulsar1.getNamespaceService() + .checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists()); + }); + + // cleanup. + p.close(); + admin2.topics().delete(topic); + admin2.namespaces().deleteNamespace(ns1); + } } From c369bcb34ba51774873fa902b4a173ee4b2daa08 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 27 Sep 2024 17:06:16 +0800 Subject: [PATCH 3/5] - --- .../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 519907bb7c853..7493e7024c19a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -182,7 +182,7 @@ public void testReplicationCountMetrics() throws Exception { super.testReplicationCountMetrics(); } - @Test(enabled = true) + @Test public void testRemoveCluster() throws Exception { // Initialize. final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8"; From fd5bdf31d32412e2a467cdc5a43326dc4d016c9d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 30 Sep 2024 10:42:07 +0800 Subject: [PATCH 4/5] - --- bin/ledgers.md | 2 ++ .../systopic/NamespaceEventsSystemTopicFactory.java | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 bin/ledgers.md diff --git a/bin/ledgers.md b/bin/ledgers.md new file mode 100644 index 0000000000000..8f0f0066c3e56 --- /dev/null +++ b/bin/ledgers.md @@ -0,0 +1,2 @@ +- "q6/persist/botnet_legacy_proto_bytes-partition-1" + - \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java index f5e6c7748d10b..199026bc4c445 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java @@ -37,12 +37,16 @@ public NamespaceEventsSystemTopicFactory(PulsarClient client) { } public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) { - TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName, - SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + TopicName topicName = getEventsTopicName(namespaceName); log.info("Create topic policies system topic client {}", topicName.toString()); return new TopicPoliciesSystemTopicClient(client, topicName); } + public static TopicName getEventsTopicName(NamespaceName namespaceName) { + return TopicName.get(TopicDomain.persistent.value(), namespaceName, + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } + public TransactionBufferSnapshotBaseSystemTopicClient createTransactionBufferSystemTopicClient( TopicName systemTopicName, SystemTopicTxnBufferSnapshotService systemTopicTxnBufferSnapshotService, Class schemaType) { From 5c2e2b4bc7ebb20b9edef8d79fcb872e827cc819 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 9 Oct 2024 23:56:57 +0800 Subject: [PATCH 5/5] fix bug --- bin/ledgers.md | 2 -- .../service/SystemTopicBasedTopicPoliciesService.java | 9 ++++++--- .../service/OneWayReplicatorUsingGlobalZKTest.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) delete mode 100644 bin/ledgers.md diff --git a/bin/ledgers.md b/bin/ledgers.md deleted file mode 100644 index 8f0f0066c3e56..0000000000000 --- a/bin/ledgers.md +++ /dev/null @@ -1,2 +0,0 @@ -- "q6/persist/botnet_legacy_proto_bytes-partition-1" - - \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 26a7f310cb7c6..6842bb196f228 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -133,9 +133,12 @@ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) || isSelf(topicName)) { return CompletableFuture.completedFuture(null); } - return pulsarService.getNamespaceService().checkTopicExists(NamespaceEventsSystemTopicFactory - .getEventsTopicName(topicName.getNamespaceObject())).thenCompose(topicExistsInfo -> { - if (topicExistsInfo.isExists()) { + TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject()); + return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> { + // If the system topic named "__change_events" has been deleted, it means all the data in the topic have + // been deleted, so we do not need to delete the message that we want to delete again. + if (!topicExistsInfo.isExists()) { + log.info("Skip delete topic-level policies because {} has been removed before", changeEvents); return CompletableFuture.completedFuture(null); } return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 7493e7024c19a..ad877e8f947b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -203,7 +203,7 @@ public void testRemoveCluster() throws Exception { // The topics under the namespace of the cluster-1 will be deleted. // Verify the result. admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); - Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> { Map>> tps = pulsar1.getBrokerService().getTopics(); assertFalse(tps.containsKey(topic)); assertFalse(tps.containsKey(topicChangeEvents));