Skip to content

Commit f07b3a0

Browse files
authored
[improve] [broker] [break change] Do not create partitioned DLQ/Retry topic automatically (apache#22705)
1 parent 7befb8d commit f07b3a0

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
2525
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
2626
import static org.apache.commons.lang3.StringUtils.isNotBlank;
27+
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
28+
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
2729
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
2830
import com.google.common.annotations.VisibleForTesting;
2931
import com.google.common.collect.Queues;
@@ -3493,6 +3495,10 @@ private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName
34933495
}
34943496

34953497
public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional<Policies> policies) {
3498+
if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)
3499+
|| topicName.getPartitionedTopicName().endsWith(RETRY_GROUP_TOPIC_SUFFIX)) {
3500+
return false;
3501+
}
34963502
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
34973503
if (autoTopicCreationOverride != null) {
34983504
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertTrue;
24+
import static org.testng.Assert.fail;
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.pulsar.broker.BrokerTestUtil;
31+
import org.apache.pulsar.common.naming.TopicDomain;
32+
import org.apache.pulsar.common.naming.TopicName;
33+
import org.apache.pulsar.common.policies.data.TopicType;
34+
import org.awaitility.Awaitility;
35+
import org.testng.annotations.AfterClass;
36+
import org.testng.annotations.BeforeClass;
37+
import org.testng.annotations.DataProvider;
38+
import org.testng.annotations.Test;
39+
40+
@Slf4j
41+
@Test(groups = "broker-impl")
42+
public class DeadLetterTopicDefaultMultiPartitionsTest extends ProducerConsumerBase {
43+
44+
@BeforeClass(alwaysRun = true)
45+
@Override
46+
protected void setup() throws Exception {
47+
this.conf.setMaxMessageSize(5 * 1024);
48+
this.conf.setAllowAutoTopicCreation(true);
49+
this.conf.setDefaultNumPartitions(2);
50+
this.conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
51+
super.internalSetup();
52+
super.producerBaseSetup();
53+
}
54+
55+
@AfterClass(alwaysRun = true)
56+
@Override
57+
protected void cleanup() throws Exception {
58+
super.internalCleanup();
59+
}
60+
61+
private void triggerDLQGenerate(String topic, String subscription) throws Exception {
62+
String DLQ = getDLQName(topic, subscription);
63+
String p0OfDLQ = TopicName.get(DLQ).getPartition(0).toString();
64+
Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscription)
65+
.ackTimeout(1000, TimeUnit.MILLISECONDS)
66+
.subscriptionType(SubscriptionType.Shared)
67+
.receiverQueueSize(10)
68+
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
69+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
70+
.subscribe();
71+
Producer producer = pulsarClient.newProducer().topic(topic).create();
72+
producer.newMessage().value(new byte[]{1}).send();
73+
74+
Message<Integer> message1 = consumer.receive();
75+
consumer.negativeAcknowledge(message1);
76+
Message<Integer> message2 = consumer.receive();
77+
consumer.negativeAcknowledge(message2);
78+
79+
Awaitility.await().atMost(Duration.ofSeconds(1500)).until(() -> {
80+
Message<Integer> message3 = consumer.receive(2, TimeUnit.SECONDS);
81+
if (message3 != null) {
82+
log.info("===> {}", message3.getRedeliveryCount());
83+
consumer.negativeAcknowledge(message3);
84+
}
85+
List<String> topicList = pulsar.getPulsarResources().getTopicResources()
86+
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
87+
if (topicList.contains(DLQ) || topicList.contains(p0OfDLQ)) {
88+
return true;
89+
}
90+
int partitions = admin.topics().getPartitionedTopicMetadata(topic).partitions;
91+
for (int i = 0; i < partitions; i++) {
92+
for (int j = -1; j < pulsar.getConfig().getDefaultNumPartitions(); j++) {
93+
String p0OfDLQ2;
94+
if (j == -1) {
95+
p0OfDLQ2 = TopicName
96+
.get(getDLQName(TopicName.get(topic).getPartition(i).toString(), subscription))
97+
.toString();
98+
} else {
99+
p0OfDLQ2 = TopicName
100+
.get(getDLQName(TopicName.get(topic).getPartition(i).toString(), subscription))
101+
.getPartition(j).toString();
102+
}
103+
if (topicList.contains(p0OfDLQ2)) {
104+
return true;
105+
}
106+
}
107+
}
108+
return false;
109+
});
110+
producer.close();
111+
consumer.close();
112+
admin.topics().unload(topic);
113+
}
114+
115+
private static String getDLQName(String primaryTopic, String subscription) {
116+
String domain = TopicName.get(primaryTopic).getDomain().toString();
117+
return domain + "://" + TopicName.get(primaryTopic)
118+
.toString().substring(( domain + "://").length())
119+
+ "-" + subscription + DLQ_GROUP_TOPIC_SUFFIX;
120+
}
121+
122+
@DataProvider(name = "topicCreationTypes")
123+
public Object[][] topicCreationTypes() {
124+
return new Object[][]{
125+
//{TopicType.NON_PARTITIONED},
126+
{TopicType.PARTITIONED}
127+
};
128+
}
129+
130+
@Test(dataProvider = "topicCreationTypes")
131+
public void testGenerateNonPartitionedDLQ(TopicType topicType) throws Exception {
132+
final String topic = BrokerTestUtil.newUniqueName( "persistent://public/default/tp");
133+
final String subscription = "s1";
134+
switch (topicType) {
135+
case PARTITIONED: {
136+
admin.topics().createPartitionedTopic(topic, 2);
137+
break;
138+
}
139+
case NON_PARTITIONED: {
140+
admin.topics().createNonPartitionedTopic(topic);
141+
}
142+
}
143+
144+
triggerDLQGenerate(topic, subscription);
145+
146+
// Verify: no partitioned DLQ.
147+
List<String> partitionedTopics = pulsar.getPulsarResources().getNamespaceResources()
148+
.getPartitionedTopicResources()
149+
.listPartitionedTopicsAsync(TopicName.get(topic).getNamespaceObject(), TopicDomain.persistent).join();
150+
for (String tp : partitionedTopics) {
151+
assertFalse(tp.endsWith("-DLQ"));
152+
}
153+
// Verify: non-partitioned DLQ exists.
154+
List<String> partitions = pulsar.getPulsarResources().getTopicResources()
155+
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
156+
List<String> DLQCreated = new ArrayList<>();
157+
for (String tp : partitions) {
158+
if (tp.endsWith("-DLQ")) {
159+
DLQCreated.add(tp);
160+
}
161+
assertFalse(tp.endsWith("-partition-0-DLQ"));
162+
}
163+
assertTrue(!DLQCreated.isEmpty());
164+
165+
// cleanup.
166+
switch (topicType) {
167+
case PARTITIONED: {
168+
admin.topics().deletePartitionedTopic(topic);
169+
break;
170+
}
171+
case NON_PARTITIONED: {
172+
admin.topics().delete(topic, false);
173+
}
174+
}
175+
for (String t : DLQCreated) {
176+
try {
177+
admin.topics().delete(TopicName.get(t).getPartitionedTopicName(), false);
178+
} catch (Exception ex) {}
179+
try {
180+
admin.topics().deletePartitionedTopic(TopicName.get(t).getPartitionedTopicName(), false);
181+
} catch (Exception ex) {}
182+
}
183+
}
184+
185+
@Test
186+
public void testManuallyCreatePartitionedDLQ() throws Exception {
187+
final String topic = BrokerTestUtil.newUniqueName( "persistent://public/default/tp");
188+
final String subscription = "s1";
189+
String DLQ = getDLQName(topic, subscription);
190+
String p0OfDLQ = TopicName.get(DLQ).getPartition(0).toString();
191+
String p1OfDLQ = TopicName.get(DLQ).getPartition(1).toString();
192+
admin.topics().createNonPartitionedTopic(topic);
193+
admin.topics().createPartitionedTopic(DLQ, 2);
194+
195+
Awaitility.await().untilAsserted(() -> {
196+
// Verify: partitioned DLQ exists.
197+
List<String> partitionedTopics = pulsar.getPulsarResources().getNamespaceResources()
198+
.getPartitionedTopicResources()
199+
.listPartitionedTopicsAsync(TopicName.get(topic).getNamespaceObject(), TopicDomain.persistent).join();
200+
assertTrue(partitionedTopics.contains(DLQ));
201+
assertFalse(partitionedTopics.contains(p0OfDLQ));
202+
// Verify: DLQ partitions exists.
203+
List<String> partitions = pulsar.getPulsarResources().getTopicResources()
204+
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
205+
assertFalse(partitions.contains(DLQ));
206+
assertTrue(partitions.contains(p0OfDLQ));
207+
assertTrue(partitions.contains(p1OfDLQ));
208+
});
209+
210+
// cleanup.
211+
admin.topics().delete(topic, false);
212+
admin.topics().deletePartitionedTopic(DLQ, false);
213+
}
214+
215+
@Test
216+
public void testManuallyCreatePartitionedDLQ2() throws Exception {
217+
final String topic = BrokerTestUtil.newUniqueName( "persistent://public/default/tp");
218+
final String subscription = "s1";
219+
final String p0OfTopic = TopicName.get(topic).getPartition(0).toString();
220+
String DLQ = getDLQName(p0OfTopic, subscription);
221+
String p0OfDLQ = TopicName.get(DLQ).getPartition(0).toString();
222+
admin.topics().createPartitionedTopic(topic, 10);
223+
try {
224+
admin.topics().createPartitionedTopic(DLQ, 2);
225+
} catch (Exception ex) {
226+
// Keep multiple versions compatible.
227+
if (ex.getMessage().contains("Partitioned Topic Name should not contain '-partition-'")){
228+
return;
229+
} else {
230+
fail("Failed to create partitioned DLQ");
231+
}
232+
}
233+
234+
Awaitility.await().untilAsserted(() -> {
235+
// Verify: partitioned DLQ exists.
236+
List<String> partitionedTopics = pulsar.getPulsarResources().getNamespaceResources()
237+
.getPartitionedTopicResources()
238+
.listPartitionedTopicsAsync(TopicName.get(topic).getNamespaceObject(), TopicDomain.persistent).join();
239+
assertTrue(partitionedTopics.contains(DLQ));
240+
assertFalse(partitionedTopics.contains(p0OfDLQ));
241+
// Verify: DLQ partitions exists.
242+
List<String> partitions = pulsar.getPulsarResources().getTopicResources()
243+
.listPersistentTopicsAsync(TopicName.get(topic).getNamespaceObject()).join();
244+
assertFalse(partitions.contains(DLQ));
245+
});
246+
247+
// cleanup.
248+
admin.topics().deletePartitionedTopic(topic, false);
249+
admin.topics().deletePartitionedTopic(DLQ, false);
250+
}
251+
}

0 commit comments

Comments
 (0)