Skip to content

Commit d068762

Browse files
authoredApr 25, 2025
Document read reactive (#322)
1 parent 6dfea76 commit d068762

File tree

1 file changed

+243
-0
lines changed

1 file changed

+243
-0
lines changed
 

‎src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import io.kurrent.dbclient.*;
44
import com.fasterxml.jackson.core.JsonProcessingException;
55
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import org.reactivestreams.*;
7+
import org.reactivestreams.Subscription;
68

79
import java.util.concurrent.ExecutionException;
810

11+
@SuppressWarnings("ALL")
912
public class ReadingEvents {
1013
private static void readFromStream(KurrentDBClient client) throws ExecutionException, InterruptedException, JsonProcessingException {
1114
// region read-from-stream
@@ -16,13 +19,41 @@ private static void readFromStream(KurrentDBClient client) throws ExecutionExcep
1619
ReadResult result = client.readStream("some-stream", options)
1720
.get();
1821

22+
23+
// or using read reactive
24+
Publisher<ReadMessage> publisher = client.readStreamReactive("some-stream", options);
1925
// endregion read-from-stream
2026

2127
// region iterate-stream
2228
for (ResolvedEvent resolvedEvent : result.getEvents()) {
2329
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
2430
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
2531
}
32+
33+
// or using read reactive
34+
publisher.subscribe(new Subscriber<ReadMessage>() {
35+
@Override
36+
public void onSubscribe(Subscription subscription) {
37+
}
38+
39+
@Override
40+
public void onNext(ReadMessage readMessage) {
41+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
42+
try {
43+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
44+
} catch (JsonProcessingException e) {
45+
throw new RuntimeException(e);
46+
}
47+
}
48+
49+
@Override
50+
public void onError(Throwable throwable) {
51+
}
52+
53+
@Override
54+
public void onComplete() {
55+
}
56+
});
2657
// endregion iterate-stream
2758
}
2859

@@ -36,13 +67,40 @@ private static void readFromStreamPosition(KurrentDBClient client) throws Execut
3667
ReadResult result = client.readStream("some-stream", options)
3768
.get();
3869

70+
// or using read reactive
71+
Publisher<ReadMessage> publisher = client.readStreamReactive("some-stream", options);
3972
// endregion read-from-stream-position
4073

4174
// region iterate-stream
4275
for (ResolvedEvent resolvedEvent : result.getEvents()) {
4376
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
4477
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
4578
}
79+
80+
// or using read reactive
81+
publisher.subscribe(new Subscriber<ReadMessage>() {
82+
@Override
83+
public void onSubscribe(Subscription subscription) {
84+
}
85+
86+
@Override
87+
public void onNext(ReadMessage readMessage) {
88+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
89+
try {
90+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
91+
} catch (JsonProcessingException e) {
92+
throw new RuntimeException(e);
93+
}
94+
}
95+
96+
@Override
97+
public void onError(Throwable throwable) {
98+
}
99+
100+
@Override
101+
public void onComplete() {
102+
}
103+
});
46104
// endregion iterate-stream
47105
}
48106

@@ -55,6 +113,9 @@ private static void readStreamOverridingUserCredentials(KurrentDBClient client)
55113

56114
ReadResult result = client.readStream("some-stream", options)
57115
.get();
116+
117+
// Or using reactive stream
118+
Publisher<ReadMessage> publisher = client.readStreamReactive("some-stream", options);
58119
// endregion overriding-user-credentials
59120
}
60121

@@ -81,6 +142,39 @@ private static void readFromStreamPositionCheck(KurrentDBClient client) throws J
81142
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
82143
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
83144
}
145+
146+
// or using read reactive
147+
Publisher<ReadMessage> publisher = client.readStreamReactive("some-stream", options);
148+
149+
publisher.subscribe(new Subscriber<ReadMessage>() {
150+
@Override
151+
public void onSubscribe(Subscription subscription) {
152+
}
153+
154+
@Override
155+
public void onNext(ReadMessage readMessage) {
156+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
157+
try {
158+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
159+
} catch (JsonProcessingException e) {
160+
throw new RuntimeException(e);
161+
}
162+
}
163+
164+
@Override
165+
public void onError(Throwable throwable) {
166+
Throwable innerException = throwable.getCause();
167+
168+
if (innerException instanceof StreamNotFoundException) {
169+
return;
170+
}
171+
// Handle other errors
172+
}
173+
174+
@Override
175+
public void onComplete() {
176+
}
177+
});
84178
// endregion checking-for-stream-presence
85179
}
86180

@@ -97,6 +191,33 @@ private static void readFromStreamBackwards(KurrentDBClient client) throws JsonP
97191
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
98192
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
99193
}
194+
195+
// or using read reactive
196+
Publisher<ReadMessage> publisher = client.readStreamReactive("some-stream", options);
197+
198+
publisher.subscribe(new Subscriber<ReadMessage>() {
199+
@Override
200+
public void onSubscribe(Subscription subscription) {
201+
}
202+
203+
@Override
204+
public void onNext(ReadMessage readMessage) {
205+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
206+
try {
207+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
208+
} catch (JsonProcessingException e) {
209+
throw new RuntimeException(e);
210+
}
211+
}
212+
213+
@Override
214+
public void onError(Throwable throwable) {
215+
}
216+
217+
@Override
218+
public void onComplete() {
219+
}
220+
});
100221
// endregion reading-backwards
101222
}
102223

@@ -109,13 +230,40 @@ private static void readFromAllStream(KurrentDBClient client) throws JsonProcess
109230
ReadResult result = client.readAll(options)
110231
.get();
111232

233+
// or using read reactive
234+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
112235
// endregion read-from-all-stream
113236

114237
// region read-from-all-stream-iterate
115238
for (ResolvedEvent resolvedEvent : result.getEvents()) {
116239
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
117240
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
118241
}
242+
243+
// or using read reactive
244+
publisher.subscribe(new Subscriber<ReadMessage>() {
245+
@Override
246+
public void onSubscribe(Subscription subscription) {
247+
}
248+
249+
@Override
250+
public void onNext(ReadMessage readMessage) {
251+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
252+
try {
253+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
254+
} catch (JsonProcessingException e) {
255+
throw new RuntimeException(e);
256+
}
257+
}
258+
259+
@Override
260+
public void onError(Throwable throwable) {
261+
}
262+
263+
@Override
264+
public void onComplete() {
265+
}
266+
});
119267
// endregion read-from-all-stream-iterate
120268
}
121269

@@ -128,6 +276,9 @@ private static void readAllOverridingUserCredentials(KurrentDBClient client) thr
128276

129277
ReadResult result = client.readAll(options)
130278
.get();
279+
280+
// or using read reactive
281+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
131282
// endregion read-all-overriding-user-credentials
132283
}
133284

@@ -147,6 +298,38 @@ private static void ignoreSystemEvents(KurrentDBClient client) throws JsonProces
147298
}
148299
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
149300
}
301+
302+
// or using read reactive
303+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
304+
305+
publisher.subscribe(new Subscriber<ReadMessage>() {
306+
@Override
307+
public void onSubscribe(Subscription subscription) {
308+
}
309+
310+
@Override
311+
public void onNext(ReadMessage readMessage) {
312+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
313+
314+
if (recordedEvent.getEventType().startsWith("$")) {
315+
return;
316+
}
317+
318+
try {
319+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
320+
} catch (JsonProcessingException e) {
321+
throw new RuntimeException(e);
322+
}
323+
}
324+
325+
@Override
326+
public void onError(Throwable throwable) {
327+
}
328+
329+
@Override
330+
public void onComplete() {
331+
}
332+
});
150333
// endregion ignore-system-events
151334
}
152335

@@ -159,13 +342,40 @@ private static void readFromAllStreamBackwards(KurrentDBClient client) throws Js
159342
ReadResult result = client.readAll(options)
160343
.get();
161344

345+
// or using read reactive
346+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
162347
// endregion read-from-all-stream-backwards
163348

164349
// region read-from-all-stream-iterate
165350
for (ResolvedEvent resolvedEvent : result.getEvents()) {
166351
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
167352
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
168353
}
354+
355+
// or using read reactive
356+
publisher.subscribe(new Subscriber<ReadMessage>() {
357+
@Override
358+
public void onSubscribe(Subscription subscription) {
359+
}
360+
361+
@Override
362+
public void onNext(ReadMessage readMessage) {
363+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
364+
try {
365+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
366+
} catch (JsonProcessingException e) {
367+
throw new RuntimeException(e);
368+
}
369+
}
370+
371+
@Override
372+
public void onError(Throwable throwable) {
373+
}
374+
375+
@Override
376+
public void onComplete() {
377+
}
378+
});
169379
// endregion read-from-all-stream-iterate
170380
}
171381

@@ -184,6 +394,36 @@ private static void filteringOutSystemEvents(KurrentDBClient client) throws Json
184394
}
185395
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
186396
}
397+
398+
// or using read reactive
399+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
400+
401+
publisher.subscribe(new Subscriber<ReadMessage>() {
402+
@Override
403+
public void onSubscribe(Subscription subscription) {
404+
}
405+
406+
@Override
407+
public void onNext(ReadMessage readMessage) {
408+
RecordedEvent recordedEvent = readMessage.getEvent().getOriginalEvent();
409+
if (!recordedEvent.getEventType().startsWith("$")) {
410+
return;
411+
}
412+
try {
413+
System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
414+
} catch (JsonProcessingException e) {
415+
throw new RuntimeException(e);
416+
}
417+
}
418+
419+
@Override
420+
public void onError(Throwable throwable) {
421+
}
422+
423+
@Override
424+
public void onComplete() {
425+
}
426+
});
187427
}
188428

189429
private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throws JsonProcessingException, ExecutionException, InterruptedException {
@@ -196,6 +436,9 @@ private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throw
196436
ReadResult result = client.readAll(options)
197437
.get();
198438

439+
// or using read reactive
440+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
441+
199442
// endregion read-from-all-stream-resolving-link-Tos
200443
for (ResolvedEvent resolvedEvent : result.getEvents()) {
201444
RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();

0 commit comments

Comments
 (0)