Skip to content

Commit

Permalink
Do not forcifully close session when a shard is reassigned (#135)
Browse files Browse the repository at this point in the history
* Do not forcifully close session when a shard is reassigned

* Spotless

* checkstyle

* Revert "checkstyle"

This reverts commit 4b3ae57.
  • Loading branch information
merlimat authored Apr 26, 2024
1 parent 900e073 commit 494acac
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ public void close() {
scheduler = Schedulers.newSingle(threadName);
var disposable =
Flux.defer(() -> stub.reactor().getNotifications(request.build()))
.doOnError(t -> log.warn("Error receiving notifications for shard {}", shardId, t))
.doOnError(
t ->
log.warn(
"Error receiving notifications for shard {}: {}", shardId, t.getMessage()))
.doOnEach(metrics::recordBatch)
.retryWhen(retrySpec)
.repeat()
Expand All @@ -115,6 +118,10 @@ private void notify(@NonNull NotificationBatch batch) {
batch.getNotificationsMap().entrySet().stream()
.map(
e -> {
if (log.isDebugEnabled()) {
log.debug("--- Got notification: {} - {}", e.getKey(), e.getValue().getType());
}

var key = e.getKey();
var notice = e.getValue();
return switch (notice.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,8 @@ Map<Long, Session> sessions() {
@Override
public void accept(@NonNull ShardAssignmentChanges changes) {
if (!closed) {
// Added shards do not have any sessions to keep alive
var removed = changes.removed();
removed.forEach(s -> closeQuietly(sessionsByShardId.remove(s.shardId())));

var reassigned = changes.reassigned();
reassigned.forEach(
s ->
closeQuietly(sessionsByShardId.remove(s.shardId()))
.ifPresent(c -> getSession(s.shardId())));
// Removed shards do not have any sessions to keep alive
changes.removed().forEach(s -> closeQuietly(sessionsByShardId.remove(s.shardId())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ void accept() throws Exception {
Set.of(new Reassigned(shardId2, "leader2", "leader3"))));

assertThat(manager.sessions()).doesNotContainKey(shardId1);
assertThat(manager.getSession(shardId2)).isSameAs(session22);
// Session here shouldn't have changed after the reassignment
assertThat(manager.getSession(shardId2)).isSameAs(session21);
verify(session).close();
}

Expand Down

0 comments on commit 494acac

Please sign in to comment.