Skip to content

Commit 46a07b9

Browse files
Change ConsumerConfig AckPolicy default to Explicit (#490)
* Change ConsumerConfig constructor to always set default AckPolicy * Update default AckPolicy for ConsumerConfig The default constructor for the ConsumerConfig class no longer sets the AckPolicy property to Explicit. Also, the values of the ConsumerConfigAckPolicy enumeration have been rearranged, making Explicit the default value (0). * Removed redundant default ACK policy assignments * Fixed consumer ack policy default * fixed test when ack policy is set to explicit, the default ack wait seems to be set to 30s by the server. --------- Co-authored-by: Ziya Suzen <ziya@suzen.net>
1 parent f6c6419 commit 46a07b9

File tree

9 files changed

+18
-14
lines changed

9 files changed

+18
-14
lines changed

sandbox/Example.JetStream.PullConsumer/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
var js = new NatsJSContext(nats);
2121

22-
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
22+
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1" });
2323

2424
var idle = TimeSpan.FromSeconds(5);
2525
var expires = TimeSpan.FromSeconds(10);

src/NATS.Client.JetStream/Models/ConsumerConfig.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public ConsumerConfig(string name)
2929
{
3030
Name = name;
3131
DurableName = name;
32-
AckPolicy = ConsumerConfigAckPolicy.Explicit;
3332
}
3433

3534
[System.Text.Json.Serialization.JsonPropertyName("deliver_policy")]
@@ -91,7 +90,7 @@ public ConsumerConfig(string name)
9190
#if NET6_0
9291
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerConfigAckPolicy>))]
9392
#endif
94-
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.None;
93+
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.Explicit;
9594

9695
/// <summary>
9796
/// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery

src/NATS.Client.JetStream/Models/ConsumerConfigAckPolicy.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace NATS.Client.JetStream.Models;
22

33
public enum ConsumerConfigAckPolicy
44
{
5-
None = 0,
5+
Explicit = 0,
66
All = 1,
7-
Explicit = 2,
7+
None = 2,
88
}

tests/NATS.Client.CheckNativeAot/Program.cs

-3
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,6 @@ async Task JetStreamTests()
8686
{
8787
Name = "consumer1",
8888
DurableName = "consumer1",
89-
90-
// Turn on ACK so we can test them below
91-
AckPolicy = ConsumerConfigAckPolicy.Explicit,
9289
},
9390
cts1.Token);
9491
AssertEqual("events", consumer.Info.StreamName);

tests/NATS.Client.Core.MemoryTests/NatsConsumeTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void Subscription_should_not_be_collected_when_in_consume_async_enumerato
2929
{
3030
await js.CreateStreamAsync(new StreamConfig { Name = "s1", Subjects = new[] { "s1.*" } });
3131

32-
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
32+
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1" });
3333

3434
var count = 0;
3535
await foreach (var msg in consumer.ConsumeAsync<int>(opts: new NatsJSConsumeOpts { MaxMsgs = 100 }))

tests/NATS.Client.JetStream.Tests/JetStreamTest.cs

-3
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ public async Task Create_stream_test()
7676
{
7777
Name = "consumer1",
7878
DurableName = "consumer1",
79-
80-
// Turn on ACK so we can test them below
81-
AckPolicy = ConsumerConfigAckPolicy.Explicit,
8279
},
8380
cts1.Token);
8481
Assert.Equal("events", consumer.Info.StreamName);

tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public async Task Consumer_create_update_action()
148148
// Update consumer
149149
{
150150
var c1 = await js.GetConsumerAsync("s1", "c1");
151-
Assert.Equal(default, c1.Info.Config.AckWait);
151+
Assert.Equal(TimeSpan.FromSeconds(30), c1.Info.Config.AckWait);
152152

153153
var changedConsumerConfig = new ConsumerConfig { Name = "c1", AckWait = TimeSpan.FromSeconds(10) };
154154
await js.UpdateConsumerAsync("s1", changedConsumerConfig);

tests/NATS.Client.JetStream.Tests/ParseJsonTests.cs

+12
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,16 @@ public void Placement_properties_should_be_optional()
2525
Assert.Null(result.Cluster);
2626
Assert.Null(result.Tags);
2727
}
28+
29+
[Fact]
30+
public void Default_consumer_ack_policy_should_be_explicit()
31+
{
32+
var serializer = NatsJSJsonSerializer<ConsumerConfig>.Default;
33+
34+
var bw = new NatsBufferWriter<byte>();
35+
serializer.Serialize(bw, new ConsumerConfig());
36+
37+
var json = Encoding.UTF8.GetString(bw.WrittenSpan);
38+
Assert.Matches("\"ack_policy\":\"explicit\"", json);
39+
}
2840
}

tests/NATS.Net.DocsExamples/JetStream/ManagingPage.cs

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public async Task Run()
7070
{
7171
Name = "durable_processor",
7272
DurableName = "durable_processor",
73-
AckPolicy = ConsumerConfigAckPolicy.Explicit,
7473
};
7574

7675
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", durableConfig);

0 commit comments

Comments
 (0)