Skip to content

Commit ee4a8e7

Browse files
committed
feat: supports new caught up and fell behind events.
1 parent 26c72a2 commit ee4a8e7

File tree

7 files changed

+74
-18
lines changed

7 files changed

+74
-18
lines changed

src/main/java/io/kurrent/dbclient/ReadResponseObserver.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.slf4j.LoggerFactory;
1313

1414
import java.nio.charset.Charset;
15+
import java.time.Instant;
1516
import java.util.concurrent.atomic.AtomicBoolean;
1617
import java.util.concurrent.atomic.AtomicInteger;
1718

@@ -118,11 +119,21 @@ else if (value.hasLastStreamPosition())
118119
else if (value.hasLastAllStreamPosition()) {
119120
Shared.AllStreamPosition position = value.getLastAllStreamPosition();
120121
consumer.onLastAllStreamPosition(position.getCommitPosition(), position.getPreparePosition());
121-
} else if (value.hasCaughtUp())
122-
consumer.onCaughtUp();
123-
else if (value.hasFellBehind())
124-
consumer.onFellBehind();
125-
else {
122+
} else if (value.hasCaughtUp()) {
123+
StreamsOuterClass.ReadResp.CaughtUp caughtUp = value.getCaughtUp();
124+
Instant timestamp = Instant.ofEpochSecond(caughtUp.getTimestamp().getSeconds(), caughtUp.getTimestamp().getNanos());
125+
Long streamRevision = caughtUp.hasStreamRevision() ? caughtUp.getStreamRevision() : null;
126+
Position position = caughtUp.hasPosition() ? new Position(caughtUp.getPosition().getCommitPosition(), caughtUp.getPosition().getPreparePosition()) : null;
127+
128+
consumer.onCaughtUp(timestamp, streamRevision, position);
129+
} else if (value.hasFellBehind()) {
130+
StreamsOuterClass.ReadResp.FellBehind fellBehind = value.getFellBehind();
131+
Instant timestamp = Instant.ofEpochSecond(fellBehind.getTimestamp().getSeconds(), fellBehind.getTimestamp().getNanos());
132+
Long streamRevision = fellBehind.hasStreamRevision() ? fellBehind.getStreamRevision() : null;
133+
Position position = fellBehind.hasPosition() ? new Position(fellBehind.getPosition().getCommitPosition(), fellBehind.getPosition().getPreparePosition()) : null;
134+
135+
consumer.onFellBehind(timestamp, streamRevision, position);
136+
} else {
126137
logger.warn("received unknown message variant");
127138
}
128139

src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.reactivestreams.Subscriber;
44

5+
import java.time.Instant;
6+
57
class ReadStreamConsumer implements StreamConsumer {
68
private final Subscriber<? super ReadMessage> subscriber;
79

@@ -41,10 +43,10 @@ public void onLastAllStreamPosition(long commit, long prepare) {
4143
}
4244

4345
@Override
44-
public void onCaughtUp() {}
46+
public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) {}
4547

4648
@Override
47-
public void onFellBehind() {}
49+
public void onFellBehind(Instant timestamp, Long streamRevision, Position position) {}
4850

4951
@Override
5052
public void onCancelled(Throwable exception) {
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.kurrent.dbclient;
22

3+
import java.time.Instant;
4+
35
public interface StreamConsumer {
46
default void onSubscribe(org.reactivestreams.Subscription subscription) {}
57
void onEvent(ResolvedEvent event);
@@ -9,8 +11,8 @@ default void onSubscribe(org.reactivestreams.Subscription subscription) {}
911
void onFirstStreamPosition(long position);
1012
void onLastStreamPosition(long position);
1113
void onLastAllStreamPosition(long commit, long prepare);
12-
void onCaughtUp();
13-
void onFellBehind();
14+
void onCaughtUp(Instant timestamp, Long streamRevision, Position position);
15+
void onFellBehind(Instant timestamp, Long streamRevision, Position position);
1416
void onCancelled(Throwable exception);
1517
void onComplete();
1618
}

src/main/java/io/kurrent/dbclient/SubscriptionListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.kurrent.dbclient;
22

3+
import java.time.Instant;
4+
35
/**
46
* Listener used to handle catch-up subscription notifications raised throughout its lifecycle.
57
*/
@@ -28,12 +30,12 @@ public void onConfirmation(Subscription subscription) {}
2830
* Called when the subscription has reached the head of the stream.
2931
* @param subscription handle to the subscription.
3032
*/
31-
public void onCaughtUp(Subscription subscription) {}
33+
public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {}
3234

3335
/**
3436
* Called when the subscription has fallen behind, meaning it's no longer keeping up with the
3537
* stream's pace.
3638
* @param subscription handle to the subscription.
3739
*/
38-
public void onFellBehind(Subscription subscription) {}
40+
public void onFellBehind(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {}
3941
}

src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kurrent.dbclient;
22

33

4+
import java.time.Instant;
45
import java.util.concurrent.CompletableFuture;
56

67
class SubscriptionStreamConsumer implements StreamConsumer {
@@ -56,13 +57,13 @@ public void onLastStreamPosition(long position) {}
5657
public void onLastAllStreamPosition(long commit, long prepare) {}
5758

5859
@Override
59-
public void onCaughtUp() {
60-
this.listener.onCaughtUp(this.subscription);
60+
public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) {
61+
this.listener.onCaughtUp(this.subscription, timestamp, streamRevision, position);
6162
}
6263

6364
@Override
64-
public void onFellBehind() {
65-
this.listener.onFellBehind(this.subscription);
65+
public void onFellBehind(Instant timestamp, Long streamRevision, Position position) {
66+
this.listener.onFellBehind(this.subscription, timestamp, streamRevision, position);
6667
}
6768

6869
@Override

src/main/proto/streams.proto

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,37 @@ message ReadResp {
103103
FellBehind fell_behind = 9;
104104
}
105105

106-
message CaughtUp {}
106+
// The $all or stream subscription has caught up and become live.
107+
message CaughtUp {
108+
// Current time in the server when the subscription caught up
109+
google.protobuf.Timestamp timestamp = 1;
110+
111+
// Checkpoint for resuming a stream subscription.
112+
// For stream subscriptions it is populated unless the stream is empty.
113+
// For $all subscriptions it is not populated.
114+
optional int64 stream_revision = 2;
115+
116+
// Checkpoint for resuming a $all subscription.
117+
// For stream subscriptions it is not populated.
118+
// For $all subscriptions it is populated unless the database is empty.
119+
optional Position position = 3;
120+
}
121+
122+
// The $all or stream subscription has fallen back into catchup mode and is no longer live.
123+
message FellBehind {
124+
// Current time in the server when the subscription fell behind
125+
google.protobuf.Timestamp timestamp = 1;
126+
127+
// Checkpoint for resuming a stream subscription.
128+
// For stream subscriptions it is populated unless the stream is empty.
129+
// For $all subscriptions it is not populated.
130+
optional int64 stream_revision = 2;
107131

108-
message FellBehind {}
132+
// Checkpoint for resuming a $all subscription.
133+
// For stream subscriptions it is not populated.
134+
// For $all subscriptions it is populated unless the database is empty.
135+
optional Position position = 3;
136+
}
109137

110138
message ReadEvent {
111139
RecordedEvent event = 1;
@@ -132,7 +160,16 @@ message ReadResp {
132160
message Checkpoint {
133161
uint64 commit_position = 1;
134162
uint64 prepare_position = 2;
163+
164+
// Current time in the server when the checkpoint was reached
165+
google.protobuf.Timestamp timestamp = 3;
135166
}
167+
168+
message Position {
169+
uint64 commit_position = 1;
170+
uint64 prepare_position = 2;
171+
}
172+
136173
message StreamNotFound {
137174
event_store.client.StreamIdentifier stream_identifier = 1;
138175
}

src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.jupiter.api.Timeout;
88

99
import java.io.IOException;
10+
import java.time.Instant;
1011
import java.util.ArrayList;
1112
import java.util.Optional;
1213
import java.util.concurrent.CountDownLatch;
@@ -176,7 +177,7 @@ default void testCaughtUpMessageIsReceived() throws Throwable {
176177

177178
Subscription subscription = client.subscribeToStream(streamName, new SubscriptionListener() {
178179
@Override
179-
public void onCaughtUp(Subscription subscription) {
180+
public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {
180181
caughtUp.countDown();
181182
}
182183
}, SubscribeToStreamOptions.get().fromStart()).get();

0 commit comments

Comments
 (0)