Skip to content

Commit 4b49424

Browse files
committed
nats-io#636 - JetStream Batch Get Client support
* Added throw exception
1 parent edb6b4f commit 4b49424

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

src/NATS.Client.JetStream/NatsJSStream.cs

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
using System.Runtime.CompilerServices;
12
using NATS.Client.Core;
2-
using NATS.Client.JetStream.Internal;
33
using NATS.Client.JetStream.Models;
44

55
namespace NATS.Client.JetStream;
@@ -202,17 +202,27 @@ public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, Can
202202
request: request,
203203
cancellationToken);
204204

205-
private IAsyncEnumerable<NatsMsg<T>> GetBatchDirectInternalAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
205+
private async IAsyncEnumerable<NatsMsg<T>> GetBatchDirectInternalAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
206206
{
207207
ValidateStream();
208208

209-
return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>(
209+
var requestManyAsync = _context.Connection.RequestManyAsync(
210210
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}",
211211
data: request,
212212
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
213213
replySerializer: serializer,
214-
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = true, ThrowIfNoResponders = true },
214+
replyOpts: new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true },
215215
cancellationToken: cancellationToken);
216+
217+
await foreach (var msg in requestManyAsync.ConfigureAwait(false))
218+
{
219+
if (msg.Error is { } error)
220+
{
221+
throw error;
222+
}
223+
224+
yield return msg;
225+
}
216226
}
217227

218228
private void ThrowIfDeleted()

tests/NATS.Client.JetStream.Tests/DirectGetTest.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class DirectGetTest
99
public async Task Direct_get_when_stream_disable()
1010
{
1111
await using var server = NatsServer.StartJS();
12-
var nats = server.CreateClientConnection();
12+
await using var nats = server.CreateClientConnection();
1313
var js = new NatsJSContext(nats);
1414
var cts = new CancellationTokenSource();
1515
var cancellationToken = cts.Token;
@@ -33,7 +33,7 @@ public async Task Direct_get_when_stream_enable()
3333
{
3434
var testDataList = new List<TestData?>();
3535
await using var server = NatsServer.StartJS();
36-
var nats = server.CreateClientConnection();
36+
await using var nats = server.CreateClientConnection();
3737
var js = new NatsJSContext(nats);
3838
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
3939
var cancellationToken = cts.Token;
@@ -57,11 +57,11 @@ public async Task Direct_get_by_multi_last()
5757
{
5858
var testDataList = new List<TestData?>();
5959
await using var server = NatsServer.StartJS();
60-
var nats = server.CreateClientConnection();
60+
await using var nats = server.CreateClientConnection();
6161
var js = new NatsJSContext(nats);
6262
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
6363
var cancellationToken = cts.Token;
64-
var streamConfig = new StreamConfig("multiLast", new[] { "multiLast.*" }) { AllowDirect = true };
64+
var streamConfig = new StreamConfig("multiLast", ["multiLast.*"]) { AllowDirect = true };
6565

6666
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);
6767

0 commit comments

Comments
 (0)