diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index dc92aeb0c7703..5f2f031a2d400 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -857,6 +857,21 @@ protected void validatePersistencePolicies(PersistencePolicies persistence) { } + protected void validateRetentionPolicies(RetentionPolicies retention) { + if (retention == null) { + return; + } + checkArgument(retention.getRetentionSizeInMB() >= -1, + "Invalid retention policy: size limit must be >= -1"); + checkArgument(retention.getRetentionTimeInMinutes() >= -1, + "Invalid retention policy: time limit must be >= -1"); + checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0) + || (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0), + "Invalid retention policy: Setting a single time or size limit to 0 is invalid when " + + "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a " + + "specific limit. To disable retention both limits must be set to 0."); + } + protected void validateEntryFilters(EntryFilters entryFilters) { if (entryFilters == null) { // remove entry filters diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ca4c685b2806a..c866d2d6f8af2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1995,21 +1995,6 @@ private CompletableFuture validatePoliciesAsync(NamespaceName ns, Policies }); } - protected void validateRetentionPolicies(RetentionPolicies retention) { - if (retention == null) { - return; - } - checkArgument(retention.getRetentionSizeInMB() >= -1, - "Invalid retention policy: size limit must be >= -1"); - checkArgument(retention.getRetentionTimeInMinutes() >= -1, - "Invalid retention policy: time limit must be >= -1"); - checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0) - || (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0), - "Invalid retention policy: Setting a single time or size limit to 0 is invalid when " - + "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a " - + "specific limit. To disable retention both limits must be set to 0."); - } - protected void internalSetDeduplicationSnapshotInterval(Integer interval) { validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); if (interval != null && interval < 0) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e88b1110d0ad7..ed47650f42a64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3489,6 +3489,7 @@ protected CompletableFuture internalGetRetention(boolean appl } protected CompletableFuture internalSetRetention(RetentionPolicies retention, boolean isGlobal) { + validateRetentionPolicies(retention); if (retention == null) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index dc9a7ec4429fc..c6a78d275dd27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -39,6 +39,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.ws.rs.BadRequestException; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -634,6 +635,35 @@ public void testSetRetention() throws Exception { admin.topics().deletePartitionedTopic(testTopic, true); } + @Test + public void testRetentionPolicyValidation() throws Exception { + // should pass + admin.topicPolicies().setRetention(testTopic, new RetentionPolicies()); + admin.topicPolicies().setRetention(testTopic, new RetentionPolicies(-1, -1)); + admin.topicPolicies().setRetention(testTopic, new RetentionPolicies(1, 1)); + + // should not pass validation + assertInvalidRetentionPolicy(testTopic, 1, 0); + assertInvalidRetentionPolicy(testTopic, 0, 1); + assertInvalidRetentionPolicy(testTopic, -1, 0); + assertInvalidRetentionPolicy(testTopic, 0, -1); + assertInvalidRetentionPolicy(testTopic, -2, 1); + assertInvalidRetentionPolicy(testTopic, 1, -2); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + private void assertInvalidRetentionPolicy(String topicName, int retentionTimeInMinutes, int retentionSizeInMB) { + try { + RetentionPolicies retention = new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB); + admin.topicPolicies().setRetention(topicName, retention); + fail("Validation should have failed for " + retention); + } catch (PulsarAdminException e) { + assertTrue(e.getCause() instanceof BadRequestException); + assertTrue(e.getMessage().startsWith("Invalid retention policy")); + } + } + @Test public void testRemoveRetention() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 3a9c5c43f1c54..4970dc8818854 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -928,7 +928,7 @@ public void testSeekByFunctionAndMultiTopic() throws Exception { public void testSeekWillNotEncounteredFencedError() throws Exception { String topicName = "persistent://prop/ns-abc/my-topic2"; admin.topics().createNonPartitionedTopic(topicName); - admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0)); + admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, -1)); // Create a pulsar client with a subscription fenced counter. ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); AtomicInteger receivedFencedErrorCounter = new AtomicInteger();