Skip to content

Add support for Consumer Create Action #497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
/// <summary>
/// Creates new consumer if it doesn't exists or updates an existing one with the same name.
/// </summary>
/// <param name="stream">Name of the stream to create consumer under.</param>
/// <param name="stream">Name of the stream to create or update consumer under.</param>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
Expand All @@ -36,6 +36,38 @@ ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
ConsumerConfig config,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer if it doesn't exists.
/// </summary>
/// <param name="stream">Name of the stream to create consumer under.</param>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default);

/// <summary>
/// Update consumer exists consumer
/// </summary>
/// <param name="stream">Name of the stream to update consumer under.</param>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<INatsJSConsumer> UpdateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default);

/// <summary>
/// Gets consumer information from the server and creates a NATS JetStream consumer <see cref="NatsJSConsumer"/>.
/// </summary>
Expand Down
29 changes: 29 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal static class NatsJSJsonSerializer<T>
new JsonStringEnumConverter<StreamConfigDiscard>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigRetention>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigStorage>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerCreateAction>(JsonNamingPolicy.SnakeCaseLower),
},
}));
#endif
Expand Down Expand Up @@ -207,6 +208,19 @@ public override TEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSe
}
}

if (typeToConvert == typeof(ConsumerCreateAction))
{
switch (stringValue)
{
case "create":
return (TEnum)(object)ConsumerCreateAction.Create;
case "update":
return (TEnum)(object)ConsumerCreateAction.Update;
default:
return (TEnum)(object)ConsumerCreateAction.Default;
}
}

throw new InvalidOperationException($"Reading unknown enum type {typeToConvert.Name} or value {stringValue}");
}

Expand Down Expand Up @@ -314,6 +328,21 @@ public override void Write(Utf8JsonWriter writer, TEnum value, JsonSerializerOpt
return;
}
}
else if (value is ConsumerCreateAction consumerCreateRequestAction)
{
switch (consumerCreateRequestAction)
{
case ConsumerCreateAction.Default:
// ignore default value
return;
case ConsumerCreateAction.Create:
writer.WriteStringValue("create");
return;
case ConsumerCreateAction.Update:
writer.WriteStringValue("update");
return;
}
}

throw new InvalidOperationException($"Writing unknown enum value {value.GetType().Name}.{value}");
}
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerCreateAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace NATS.Client.JetStream.Models;

public enum ConsumerCreateAction
{
Default = 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make the default descriptive? I.e. CreateOrUpdate = 0 instead of Default = 0?

Or is it best to leave it vague as the NATS Server can decide the default?

Create = 1,
Update = 2,
}
9 changes: 9 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerCreateRequest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Models;

/// <summary>
Expand Down Expand Up @@ -26,4 +28,11 @@ internal record ConsumerCreateRequest
[System.Text.Json.Serialization.JsonPropertyName("config")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
public ConsumerConfig? Config { get; set; }

[System.Text.Json.Serialization.JsonPropertyName("action")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerCreateAction>))]
#endif
public ConsumerCreateAction Action { get; set; }
}
75 changes: 51 additions & 24 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;
Expand Down Expand Up @@ -33,31 +32,25 @@ public async ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
return await CreateOrUpdateConsumerInternalAsync(stream, config, default, cancellationToken);
}

// TODO: Adjust API subject according to server version and filter subject
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}";

if (!string.IsNullOrWhiteSpace(config.Name))
{
subject += $".{config.Name}";
config.Name = default!;
}

if (!string.IsNullOrWhiteSpace(config.FilterSubject))
{
subject += $".{config.FilterSubject}";
}

var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: subject,
new ConsumerCreateRequest
{
StreamName = stream,
Config = config,
},
cancellationToken);
public async ValueTask<INatsJSConsumer> CreateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
return await CreateOrUpdateConsumerInternalAsync(stream, config, ConsumerCreateAction.Create, cancellationToken);
}

return new NatsJSConsumer(this, response);
public async ValueTask<INatsJSConsumer> UpdateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
return await CreateOrUpdateConsumerInternalAsync(stream, config, ConsumerCreateAction.Update, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -212,6 +205,7 @@ internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
NumReplicas = 1,
MemStorage = true,
},
Action = ConsumerCreateAction.Create,
};

if (opts.OptStartSeq > 0)
Expand Down Expand Up @@ -242,4 +236,37 @@ internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
request,
cancellationToken);
}

private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
string stream,
ConsumerConfig config,
ConsumerCreateAction action,
CancellationToken cancellationToken)
{
// TODO: Adjust API subject according to server version and filter subject
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}";

if (!string.IsNullOrWhiteSpace(config.Name))
{
subject += $".{config.Name}";
config.Name = default!;
}

if (!string.IsNullOrWhiteSpace(config.FilterSubject))
{
subject += $".{config.FilterSubject}";
}

var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: subject,
new ConsumerCreateRequest
{
StreamName = stream,
Config = config,
Action = action,
},
cancellationToken);

return new NatsJSConsumer(this, response);
}
}
12 changes: 6 additions & 6 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task Consume_msgs_test()
var (nats, proxy) = server.CreateProxiedClientConnection();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

for (var i = 0; i < 30; i++)
{
Expand Down Expand Up @@ -128,7 +128,7 @@ public async Task Consume_idle_heartbeat_test()

var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", new TestData { Test = 0 }, serializer: TestDataJsonSerializer<TestData>.Default, cancellationToken: cts.Token);
ack.EnsureSuccess();
Expand Down Expand Up @@ -203,7 +203,7 @@ public async Task Consume_reconnect_test()
var js = new NatsJSContext(nats);
var js2 = new NatsJSContext(nats2);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var consumerOpts = new NatsJSConsumeOpts
{
Expand Down Expand Up @@ -276,7 +276,7 @@ public async Task Consume_dispose_test()

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var consumerOpts = new NatsJSConsumeOpts
{
Expand Down Expand Up @@ -353,7 +353,7 @@ public async Task Consume_stop_test()

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var consumerOpts = new NatsJSConsumeOpts
{
Expand Down Expand Up @@ -433,7 +433,7 @@ public async Task Serialization_errors()
var ack = await js.PublishAsync("s1.foo", "not an int", cancellationToken: cts.Token);
ack.EnsureSuccess();

var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

await foreach (var msg in consumer.ConsumeAsync<int>(cancellationToken: cts.Token))
{
Expand Down
6 changes: 3 additions & 3 deletions tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task Fetch_test()
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

for (var i = 0; i < 10; i++)
{
Expand Down Expand Up @@ -47,7 +47,7 @@ public async Task FetchNoWait_test()
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

for (var i = 0; i < 10; i++)
{
Expand Down Expand Up @@ -77,7 +77,7 @@ public async Task Fetch_dispose_test()

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var fetchOpts = new NatsJSFetchOpts
{
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public async Task Next_test()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

for (var i = 0; i < 10; i++)
{
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task When_consuming_ack_should_be_serialized_normally_if_custom_ser
await js.PublishAsync("s1.1", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync("s1.2", new byte[] { 0 }, cancellationToken: cts.Token);

var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

// single ack
{
Expand Down
4 changes: 2 additions & 2 deletions tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task Double_ack_received_messages()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
Expand Down Expand Up @@ -54,7 +54,7 @@ public async Task Delay_nak_received_messages()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
Expand Down
4 changes: 2 additions & 2 deletions tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task Fetch_should_not_block_socket()

// fetch loop
{
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var fetchOpts = new NatsJSFetchOpts
{
Expand All @@ -45,7 +45,7 @@ public async Task Fetch_should_not_block_socket()

// consume loop
{
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync("s1", "c2", cancellationToken: cts.Token);

var opts = new NatsJSConsumeOpts
{
Expand Down
19 changes: 19 additions & 0 deletions tests/NATS.Client.JetStream.Tests/EnumJsonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,23 @@ public void StreamConfigStorage_test(StreamConfigStorage value, string expected)
Assert.NotNull(result);
Assert.Equal(value, result.Storage);
}

[Theory]
[InlineData(ConsumerCreateAction.Create, "{\"stream_name\":\"\",\"config\":null,\"action\":\"create\"}")]
[InlineData(ConsumerCreateAction.Update, "{\"stream_name\":\"\",\"config\":null,\"action\":\"update\"}")]
[InlineData(ConsumerCreateAction.Default, "{\"stream_name\":\"\",\"config\":null}")]
public void ConsumerCreateRequestAction_Test(ConsumerCreateAction value, string expected)
{
var serializer = NatsJSJsonSerializer<ConsumerCreateRequest>.Default;

var bw = new NatsBufferWriter<byte>();
serializer.Serialize(bw, new ConsumerCreateRequest { Action = value, StreamName = string.Empty });

var json = Encoding.UTF8.GetString(bw.WrittenSpan);
Assert.Contains(expected, json);

var result = serializer.Deserialize(new ReadOnlySequence<byte>(bw.WrittenMemory));
Assert.NotNull(result);
Assert.Equal(value, result.Action);
}
}
Loading
Loading