Skip to content

Commit d0b4e78

Browse files
committed
Repro of datastax#112
1 parent 8286e8c commit d0b4e78

File tree

1 file changed

+184
-0
lines changed

1 file changed

+184
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.tests.integration.topics;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.client.api.Consumer;
23+
import org.apache.pulsar.client.api.Message;
24+
import org.apache.pulsar.client.api.Producer;
25+
import org.apache.pulsar.client.api.PulsarClient;
26+
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
27+
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
28+
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
29+
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
30+
import org.testng.annotations.Test;
31+
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.function.Supplier;
34+
35+
import static java.nio.charset.StandardCharsets.UTF_8;
36+
import static org.testng.Assert.assertEquals;
37+
import static org.testng.Assert.assertNotEquals;
38+
import static org.testng.Assert.fail;
39+
40+
/**
41+
* Test cases for compaction.
42+
*/
43+
@Slf4j
44+
public class TestTopicDeletion extends PulsarTestSuite {
45+
46+
final private boolean unload = false;
47+
final private int numBrokers = 2;
48+
49+
public void setupCluster() throws Exception {
50+
brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10");
51+
brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false");
52+
brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false");
53+
this.setupCluster("");
54+
}
55+
56+
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
57+
String clusterName,
58+
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
59+
specBuilder.numBrokers(numBrokers);
60+
specBuilder.enableContainerLog(true);
61+
return specBuilder;
62+
}
63+
64+
@Test(dataProvider = "ServiceUrls", timeOut=600_000)
65+
public void testPartitionedTopicForceDeletion(Supplier<String> serviceUrl) throws Exception {
66+
67+
log.info("Creating tenant and namespace");
68+
69+
final String tenant = "test-partitioned-topic-" + randomName(4);
70+
final String namespace = tenant + "/ns1";
71+
final String topic = "persistent://" + namespace + "/partitioned-topic";
72+
final int numPartitions = numBrokers * 3;
73+
final int numKeys = numPartitions * 50;
74+
final String subscriptionName = "sub1";
75+
76+
this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
77+
78+
this.createNamespace(namespace);
79+
80+
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
81+
"set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace);
82+
83+
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
84+
"set-retention", "--size", "100M", "--time", "100m", namespace);
85+
86+
this.createPartitionedTopic(topic, numPartitions);
87+
88+
try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
89+
90+
log.info("Creating consumer");
91+
Consumer<byte[]> consumer = client.newConsumer()
92+
.topic(topic)
93+
.subscriptionName(subscriptionName)
94+
.subscribe();
95+
96+
log.info("Producing messages");
97+
try(Producer<byte[]> producer = client.newProducer()
98+
.topic(topic)
99+
.create()
100+
) {
101+
for (int i = 0; i < numKeys; i++) {
102+
producer.newMessage()
103+
.key("" + i)
104+
.value(("value-" + i).getBytes(UTF_8))
105+
.sendAsync();
106+
}
107+
producer.flush();
108+
log.info("Successfully wrote {} values", numKeys);
109+
}
110+
111+
log.info("Consuming half of the messages");
112+
for (int i = 0; i < numKeys / 2; i++) {
113+
Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
114+
log.info("Read value {}", m.getKey());
115+
}
116+
117+
if (unload) {
118+
log.info("Unloading topic");
119+
pulsarCluster.runAdminCommandOnAnyBroker("topics",
120+
"unload", topic);
121+
}
122+
123+
ContainerExecResult res;
124+
log.info("Deleting the topic");
125+
res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
126+
"delete-partitioned-topic", "--force", topic);
127+
consumer.close();
128+
assertEquals(0, res.getExitCode());
129+
130+
// parts of topic deletion are async, let them finish
131+
Thread.sleep(5000);
132+
133+
log.info("Deleting the topic again");
134+
try {
135+
res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
136+
"delete-partitioned-topic", "--force", topic);
137+
assertNotEquals(0, res.getExitCode());
138+
} catch (ContainerExecException e) {
139+
log.info("Second delete failed with ContainerExecException, could be ok", e);
140+
if (!e.getMessage().contains("with error code 1")) {
141+
fail("Expected different error code");
142+
}
143+
}
144+
145+
// should succeed
146+
log.info("Creating the topic again");
147+
this.createPartitionedTopic(topic, numPartitions);
148+
}
149+
}
150+
151+
152+
private ContainerExecResult createTenantName(final String tenantName,
153+
final String allowedClusterName,
154+
final String adminRoleName) throws Exception {
155+
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
156+
"tenants", "create", "--allowed-clusters", allowedClusterName,
157+
"--admin-roles", adminRoleName, tenantName);
158+
assertEquals(0, result.getExitCode());
159+
return result;
160+
}
161+
162+
private ContainerExecResult createNamespace(final String Ns) throws Exception {
163+
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
164+
"namespaces",
165+
"create",
166+
"--clusters",
167+
pulsarCluster.getClusterName(), Ns);
168+
assertEquals(0, result.getExitCode());
169+
return result;
170+
}
171+
172+
private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions)
173+
throws Exception {
174+
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
175+
"topics",
176+
"create-partitioned-topic",
177+
"--partitions", "" + numPartitions,
178+
partitionedTopicName);
179+
assertEquals(0, result.getExitCode());
180+
return result;
181+
}
182+
183+
184+
}

0 commit comments

Comments
 (0)