Skip to content

Commit 84e64f9

Browse files
committed
nats-io#636 - JetStream Batch Get Client support
* Added specialized methods
1 parent 26d42c4 commit 84e64f9

File tree

4 files changed

+64
-69
lines changed

4 files changed

+64
-69
lines changed

src/NATS.Client.JetStream/INatsJSStream.cs

+30-2
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,38 @@ ValueTask UpdateAsync(
117117
/// </summary>
118118
/// <param name="request">Batch message request.</param>
119119
/// <param name="serializer">Serializer to use for the message type.</param>
120-
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
121120
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
121+
/// <typeparam name="T">Message type to deserialize.</typeparam>
122122
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
123-
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default);
123+
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
124+
125+
/// <summary>
126+
/// Request a direct batch message
127+
/// </summary>
128+
/// <param name="multiLastBySubjects">Return last messages matching the subjects</param>
129+
/// <param name="batch">The maximum amount of messages to be returned</param>
130+
/// <param name="serializer">Serializer to use for the message type.</param>
131+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
132+
/// <typeparam name="T">Message type to deserialize.</typeparam>
133+
/// <remarks>
134+
/// Get up to batch number of messages for subject
135+
/// </remarks>
136+
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
137+
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(string[] multiLastBySubjects, ulong batch, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
138+
139+
/// <summary>
140+
/// Request a direct batch message
141+
/// </summary>
142+
/// <param name="nextBySubject">The subject used filter messages that should be returned</param>
143+
/// <param name="batch">The maximum amount of messages to be returned</param>
144+
/// <param name="serializer">Serializer to use for the message type.</param>
145+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
146+
/// <typeparam name="T">Message type to deserialize.</typeparam>
147+
/// <remarks>
148+
/// Get the last message for each subject in the list up to the batch size
149+
/// </remarks>
150+
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
151+
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(string nextBySubject, ulong batch, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
124152

125153
ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
126154
}

src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs

+11-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.ComponentModel.DataAnnotations;
21
using System.Text.Json.Serialization;
32

43
namespace NATS.Client.JetStream.Models;
@@ -13,23 +12,23 @@ public record StreamMsgBatchGetRequest
1312
/// </summary>
1413
[JsonPropertyName("batch")]
1514
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
16-
[Range(-1, int.MaxValue)]
17-
public int Batch { get; set; }
15+
[System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)]
16+
public ulong Batch { get; set; }
1817

1918
/// <summary>
2019
/// The maximum amount of returned bytes for this request.
2120
/// </summary>
2221
[JsonPropertyName("max_bytes")]
2322
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
24-
[Range(-1, int.MaxValue)]
25-
public int MaxBytes { get; set; }
23+
[System.ComponentModel.DataAnnotations.Range(ulong.MinValue, long.MaxValue)]
24+
public ulong MaxBytes { get; set; }
2625

2726
/// <summary>
2827
/// The minimum sequence for returned message
2928
/// </summary>
3029
[JsonPropertyName("seq")]
3130
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
32-
[Range(ulong.MinValue, ulong.MaxValue)]
31+
[System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)]
3332
public ulong Seq { get; set; }
3433

3534
/// <summary>
@@ -43,13 +42,12 @@ public record StreamMsgBatchGetRequest
4342
/// The subject used filter messages that should be returned
4443
/// </summary>
4544
[JsonPropertyName("next_by_subj")]
46-
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
47-
[Required]
45+
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
4846
#if NET6_0
49-
public string NextBySubject { get; set; } = default!;
47+
public string? NextBySubject { get; set; } = default!;
5048
#else
5149
#pragma warning disable SA1206
52-
public required string NextBySubject { get; set; }
50+
public string? NextBySubject { get; set; }
5351

5452
#pragma warning restore SA1206
5553
#endif
@@ -58,15 +56,15 @@ public record StreamMsgBatchGetRequest
5856
/// Return last messages mathing the subjects
5957
/// </summary>
6058
[JsonPropertyName("multi_last")]
61-
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
62-
public string[] MultiLastBySubjects { get; set; } = [];
59+
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
60+
public string[]? MultiLastBySubjects { get; set; }
6361

6462
/// <summary>
6563
/// Return message after sequence
6664
/// </summary>
6765
[JsonPropertyName("up_to_seq")]
6866
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
69-
[Range(ulong.MinValue, ulong.MaxValue)]
67+
[System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)]
7068
public ulong UpToSequence { get; set; }
7169

7270
/// <summary>

src/NATS.Client.JetStream/NatsJSStream.cs

+17-10
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,22 @@ public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INat
181181
cancellationToken: cancellationToken);
182182
}
183183

184-
/// <summary>
185-
/// Request a direct batch message
186-
/// </summary>
187-
/// <param name="request">Batch message request.</param>
188-
/// <param name="serializer">Serializer to use for the message type.</param>
189-
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
190-
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
191-
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
192-
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default)
184+
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
185+
=> GetBatchDirectInternalAsync<T>(request, serializer, cancellationToken);
186+
187+
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(string[] multiLastBySubjects, ulong batch, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
188+
{
189+
var request = new StreamMsgBatchGetRequest { MultiLastBySubjects = multiLastBySubjects, Batch = batch };
190+
return GetBatchDirectInternalAsync<T>(request, serializer, cancellationToken);
191+
}
192+
193+
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(string nextBySubject, ulong batch, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
194+
{
195+
var request = new StreamMsgBatchGetRequest { NextBySubject = nextBySubject, Batch = batch };
196+
return GetBatchDirectInternalAsync<T>(request, serializer, cancellationToken);
197+
}
198+
199+
private IAsyncEnumerable<NatsMsg<T>> GetBatchDirectInternalAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
193200
{
194201
ValidateStream();
195202

@@ -198,7 +205,7 @@ public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequ
198205
data: request,
199206
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
200207
replySerializer: serializer,
201-
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true },
208+
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = true, ThrowIfNoResponders = true },
202209
cancellationToken: cancellationToken);
203210
}
204211

tests/NATS.Client.JetStream.Tests/DirectGetTest.cs

+6-44
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ public async Task Direct_get_when_stream_disable()
1111
await using var server = NatsServer.StartJS();
1212
var nats = server.CreateClientConnection();
1313
var js = new NatsJSContext(nats);
14-
1514
var cts = new CancellationTokenSource();
1615
var cancellationToken = cts.Token;
1716
var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" });
@@ -36,17 +35,13 @@ public async Task Direct_get_when_stream_enable()
3635
await using var server = NatsServer.StartJS();
3736
var nats = server.CreateClientConnection();
3837
var js = new NatsJSContext(nats);
39-
4038
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
4139
var cancellationToken = cts.Token;
4240
var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true };
4341

4442
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);
4543

46-
for (var i = 0; i < 1; i++)
47-
{
48-
await js.PublishAsync("stream_enable.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
49-
}
44+
await js.PublishAsync("stream_enable.x", new TestData { Test = 1 }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
5045

5146
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { NextBySubject = "stream_enable.x", Batch = 3 };
5247
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
@@ -58,59 +53,26 @@ public async Task Direct_get_when_stream_enable()
5853
}
5954

6055
[SkipIfNatsServer(versionEarlierThan: "2.11")]
61-
public async Task Direct_get_with_eobCode()
56+
public async Task Direct_get_by_multi_last()
6257
{
6358
var testDataList = new List<TestData?>();
6459
await using var server = NatsServer.StartJS();
6560
var nats = server.CreateClientConnection();
6661
var js = new NatsJSContext(nats);
67-
6862
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
6963
var cancellationToken = cts.Token;
70-
var streamConfig = new StreamConfig("eobCode", new[] { "eobCode.x" }) { AllowDirect = true };
64+
var streamConfig = new StreamConfig("multiLast", new[] { "multiLast.*" }) { AllowDirect = true };
7165

7266
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);
7367

74-
for (var i = 0; i < 1; i++)
75-
{
76-
await js.PublishAsync("eobCode.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
77-
}
68+
await js.PublishAsync("multiLast.x", new TestData { Test = 1 }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
69+
await js.PublishAsync("multiLast.y", new TestData { Test = 2 }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
7870

79-
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { NextBySubject = "eobCode.x", Batch = 3 };
80-
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, includeEob: true, cancellationToken: cancellationToken))
71+
await foreach (var msg in stream.GetBatchDirectAsync(["multiLast.x", "multiLast.y"], 4, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
8172
{
8273
testDataList.Add(msg.Data);
8374
}
8475

8576
Assert.Equal(2, testDataList.Count);
8677
}
87-
88-
[SkipIfNatsServer(versionEarlierThan: "2.11")]
89-
public async Task Direct_get_min_sequence()
90-
{
91-
var testDataList = new List<TestData?>();
92-
await using var server = NatsServer.StartJS();
93-
var nats = server.CreateClientConnection();
94-
var js = new NatsJSContext(nats);
95-
96-
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
97-
var cancellationToken = cts.Token;
98-
var streamConfig = new StreamConfig("min_sequence", new[] { "min_sequence.x" }) { AllowDirect = true };
99-
100-
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);
101-
102-
for (var i = 0; i < 3; i++)
103-
{
104-
await js.PublishAsync("min_sequence.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
105-
}
106-
107-
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { NextBySubject = "min_sequence.x", Batch = 1, Seq = 3 };
108-
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
109-
{
110-
testDataList.Add(msg.Data);
111-
}
112-
113-
Assert.Single(testDataList);
114-
Assert.Equal(2, testDataList[0]?.Test);
115-
}
11678
}

0 commit comments

Comments
 (0)