Skip to content

Commit

Permalink
[fix][broker] Fix missing validation when setting retention policy on…
Browse files Browse the repository at this point in the history
… topic level (apache#24032)

Co-authored-by: 张浩 <zhanghao60@100.me>
  • Loading branch information
zhanghaou and 张浩 authored Mar 3, 2025
1 parent d0025e7 commit 1eb7866
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,21 @@ protected void validatePersistencePolicies(PersistencePolicies persistence) {

}

protected void validateRetentionPolicies(RetentionPolicies retention) {
if (retention == null) {
return;
}
checkArgument(retention.getRetentionSizeInMB() >= -1,
"Invalid retention policy: size limit must be >= -1");
checkArgument(retention.getRetentionTimeInMinutes() >= -1,
"Invalid retention policy: time limit must be >= -1");
checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0)
|| (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
"Invalid retention policy: Setting a single time or size limit to 0 is invalid when "
+ "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a "
+ "specific limit. To disable retention both limits must be set to 0.");
}

protected void validateEntryFilters(EntryFilters entryFilters) {
if (entryFilters == null) {
// remove entry filters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1995,21 +1995,6 @@ private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, Policies
});
}

protected void validateRetentionPolicies(RetentionPolicies retention) {
if (retention == null) {
return;
}
checkArgument(retention.getRetentionSizeInMB() >= -1,
"Invalid retention policy: size limit must be >= -1");
checkArgument(retention.getRetentionTimeInMinutes() >= -1,
"Invalid retention policy: time limit must be >= -1");
checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0)
|| (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
"Invalid retention policy: Setting a single time or size limit to 0 is invalid when "
+ "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a "
+ "specific limit. To disable retention both limits must be set to 0.");
}

protected void internalSetDeduplicationSnapshotInterval(Integer interval) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
if (interval != null && interval < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3489,6 +3489,7 @@ protected CompletableFuture<RetentionPolicies> internalGetRetention(boolean appl
}

protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention, boolean isGlobal) {
validateRetentionPolicies(retention);
if (retention == null) {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.BadRequestException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -634,6 +635,35 @@ public void testSetRetention() throws Exception {
admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test
public void testRetentionPolicyValidation() throws Exception {
// should pass
admin.topicPolicies().setRetention(testTopic, new RetentionPolicies());
admin.topicPolicies().setRetention(testTopic, new RetentionPolicies(-1, -1));
admin.topicPolicies().setRetention(testTopic, new RetentionPolicies(1, 1));

// should not pass validation
assertInvalidRetentionPolicy(testTopic, 1, 0);
assertInvalidRetentionPolicy(testTopic, 0, 1);
assertInvalidRetentionPolicy(testTopic, -1, 0);
assertInvalidRetentionPolicy(testTopic, 0, -1);
assertInvalidRetentionPolicy(testTopic, -2, 1);
assertInvalidRetentionPolicy(testTopic, 1, -2);

admin.topics().deletePartitionedTopic(testTopic, true);
}

private void assertInvalidRetentionPolicy(String topicName, int retentionTimeInMinutes, int retentionSizeInMB) {
try {
RetentionPolicies retention = new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
admin.topicPolicies().setRetention(topicName, retention);
fail("Validation should have failed for " + retention);
} catch (PulsarAdminException e) {
assertTrue(e.getCause() instanceof BadRequestException);
assertTrue(e.getMessage().startsWith("Invalid retention policy"));
}
}

@Test
public void testRemoveRetention() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ public void testSeekByFunctionAndMultiTopic() throws Exception {
public void testSeekWillNotEncounteredFencedError() throws Exception {
String topicName = "persistent://prop/ns-abc/my-topic2";
admin.topics().createNonPartitionedTopic(topicName);
admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0));
admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, -1));
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
Expand Down

0 comments on commit 1eb7866

Please sign in to comment.