Skip to content

Commit 5e0964e

Browse files
authored
Cancel subscribeToLogs and messageStream RPCs when the Session closes (deephaven#5433)
This PR fixes subscribeToLogs and messageStream RPCs; previously, they were staying open when a Session closed. Session closing behavior for a number of RPCs was specifically added as a tests here. I'll note that we are inconsistent in how we handle this case - some RPCs will complete successfully, others will pass along UNAUTHENTICATED or CANCELLED. We should aim to clean this up, but I'm proposing we do that as a follow-up PR to avoid PR bloat. The fix involves hooking into SessionServiceCallListener, which is a centralized place where we maintain Session state information. We should strongly consider migrating other RPC Session close behavior to here to simplify the implementations and maintain consistency. Again, I'd suggest doing that as a separate PR. Fixes deephaven#5415
1 parent 3b97aaf commit 5e0964e

File tree

7 files changed

+477
-28
lines changed

7 files changed

+477
-28
lines changed

server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ public void subscribeToLogs(
157157
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
158158
return;
159159
}
160+
// Session close logic implicitly handled in
161+
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
160162
final LogsClient client =
161163
new LogsClient(request, (ServerCallStreamObserver<LogSubscriptionData>) responseObserver);
162164
client.start();

server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ public void onCompleted() {
323323
@Override
324324
public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> responseObserver) {
325325
SessionState session = sessionService.getCurrentSession();
326+
// Session close logic implicitly handled in
327+
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
326328
return new SendMessageObserver(session, responseObserver);
327329
}
328330

server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java

+79-28
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import io.deephaven.internal.log.LoggerFactory;
1616
import io.deephaven.io.logger.Logger;
1717
import io.deephaven.proto.backplane.grpc.*;
18+
import io.deephaven.proto.backplane.script.grpc.ConsoleServiceGrpc;
1819
import io.deephaven.proto.util.Exceptions;
1920
import io.deephaven.util.SafeCloseable;
20-
import io.deephaven.util.function.ThrowingRunnable;
2121
import io.grpc.Context;
2222
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
2323
import io.grpc.ForwardingServerCallListener;
@@ -36,9 +36,11 @@
3636

3737
import javax.inject.Inject;
3838
import javax.inject.Singleton;
39+
import java.io.Closeable;
3940
import java.lang.Object;
4041
import java.util.LinkedHashMap;
4142
import java.util.Map;
43+
import java.util.Set;
4244
import java.util.UUID;
4345

4446
public class SessionServiceGrpcImpl extends SessionServiceGrpc.SessionServiceImplBase {
@@ -310,10 +312,19 @@ private void addHeaders(final Metadata md) {
310312

311313
@Singleton
312314
public static class SessionServiceInterceptor implements ServerInterceptor {
315+
private static final Status AUTHENTICATION_DETAILS_INVALID =
316+
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");
317+
318+
// We can't use just io.grpc.MethodDescriptor (unless we chose provide and inject the named method descriptors),
319+
// some of our methods are overridden from stock gRPC; for example,
320+
// io.deephaven.server.object.ObjectServiceGrpcBinding.bindService.
321+
// The goal should be to migrate all of the existing RPC Session close management logic to here if possible.
322+
private static final Set<String> CANCEL_RPC_ON_SESSION_CLOSE = Set.of(
323+
ConsoleServiceGrpc.getSubscribeToLogsMethod().getFullMethodName(),
324+
ObjectServiceGrpc.getMessageStreamMethod().getFullMethodName());
325+
313326
private final SessionService service;
314327
private final SessionService.ErrorTransformer errorTransformer;
315-
private static final Status authenticationDetailsInvalid =
316-
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");
317328

318329
@Inject
319330
public SessionServiceInterceptor(
@@ -344,12 +355,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
344355
try {
345356
session = service.getSessionForAuthToken(token);
346357
} catch (AuthenticationException e) {
347-
try {
348-
call.close(authenticationDetailsInvalid, new Metadata());
349-
} catch (IllegalStateException ignored) {
350-
// could be thrown if the call was already closed. As an interceptor, we can't throw,
351-
// so ignoring this and just returning the no-op listener.
352-
}
358+
// As an interceptor, we can't throw, so ignoring this and just returning the no-op listener.
359+
safeClose(call, AUTHENTICATION_DETAILS_INVALID, new Metadata(), false);
353360
return new ServerCall.Listener<>() {};
354361
}
355362
}
@@ -363,33 +370,61 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
363370

364371
final MutableObject<SessionServiceCallListener<ReqT, RespT>> listener = new MutableObject<>();
365372
rpcWrapper(serverCall, context, finalSession, errorTransformer, () -> listener.setValue(
366-
new SessionServiceCallListener<>(serverCallHandler.startCall(serverCall, metadata), serverCall,
367-
context, finalSession, errorTransformer)));
373+
listener(serverCall, metadata, serverCallHandler, context, finalSession)));
368374
if (listener.getValue() == null) {
369375
return new ServerCall.Listener<>() {};
370376
}
371377
return listener.getValue();
372378
}
379+
380+
private <ReqT, RespT> @NotNull SessionServiceCallListener<ReqT, RespT> listener(
381+
InterceptedCall<ReqT, RespT> serverCall,
382+
Metadata metadata,
383+
ServerCallHandler<ReqT, RespT> serverCallHandler,
384+
Context context,
385+
SessionState session) {
386+
return new SessionServiceCallListener<>(
387+
serverCallHandler.startCall(serverCall, metadata),
388+
serverCall,
389+
context,
390+
session,
391+
errorTransformer,
392+
CANCEL_RPC_ON_SESSION_CLOSE.contains(serverCall.getMethodDescriptor().getFullMethodName()));
393+
}
373394
}
374395

375396
private static class SessionServiceCallListener<ReqT, RespT> extends
376-
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
397+
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> implements Closeable {
398+
private static final Status SESSION_CLOSED = Status.CANCELLED.withDescription("Session closed");
399+
377400
private final ServerCall<ReqT, RespT> call;
378401
private final Context context;
379402
private final SessionState session;
380403
private final SessionService.ErrorTransformer errorTransformer;
404+
private final boolean autoCancelOnSessionClose;
381405

382-
public SessionServiceCallListener(
406+
SessionServiceCallListener(
383407
ServerCall.Listener<ReqT> delegate,
384408
ServerCall<ReqT, RespT> call,
385409
Context context,
386410
SessionState session,
387-
SessionService.ErrorTransformer errorTransformer) {
411+
SessionService.ErrorTransformer errorTransformer,
412+
boolean autoCancelOnSessionClose) {
388413
super(delegate);
389414
this.call = call;
390415
this.context = context;
391416
this.session = session;
392417
this.errorTransformer = errorTransformer;
418+
this.autoCancelOnSessionClose = autoCancelOnSessionClose;
419+
if (autoCancelOnSessionClose && session != null) {
420+
session.addOnCloseCallback(this);
421+
}
422+
}
423+
424+
@Override
425+
public void close() {
426+
// session.addOnCloseCallback
427+
safeClose(call, SESSION_CLOSED, new Metadata(), false);
393428
}
394429

395430
@Override
@@ -405,11 +440,17 @@ public void onHalfClose() {
405440
@Override
406441
public void onCancel() {
407442
rpcWrapper(call, context, session, errorTransformer, super::onCancel);
443+
if (autoCancelOnSessionClose && session != null) {
444+
session.removeOnCloseCallback(this);
445+
}
408446
}
409447

410448
@Override
411449
public void onComplete() {
412450
rpcWrapper(call, context, session, errorTransformer, super::onComplete);
451+
if (autoCancelOnSessionClose && session != null) {
452+
session.removeOnCloseCallback(this);
453+
}
413454
}
414455

415456
@Override
@@ -432,34 +473,44 @@ private static <ReqT, RespT> void rpcWrapper(
432473
@NotNull final Context context,
433474
@Nullable final SessionState session,
434475
@NotNull final SessionService.ErrorTransformer errorTransformer,
435-
@NotNull final ThrowingRunnable<InterruptedException> lambda) {
476+
@NotNull final Runnable lambda) {
436477
Context previous = context.attach();
437478
// note: we'll open the execution context here so that it may be used by the error transformer
438479
try (final SafeCloseable ignored1 = session == null ? null : session.getExecutionContext().open()) {
439480
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
440481
lambda.run();
441-
} catch (final InterruptedException err) {
442-
Thread.currentThread().interrupt();
443-
closeWithError(call, errorTransformer.transform(err));
444-
} catch (final Throwable err) {
445-
closeWithError(call, errorTransformer.transform(err));
482+
} catch (final RuntimeException err) {
483+
safeClose(call, errorTransformer.transform(err));
484+
} catch (final Error error) {
485+
// Indicates a very serious failure; debateable whether we should even try to send close.
486+
safeClose(call, Status.INTERNAL, new Metadata(), false);
487+
throw error;
446488
} finally {
447489
context.detach(previous);
448490
}
449491
}
450492
}
451493

452-
private static <ReqT, RespT> void closeWithError(
453-
@NotNull final ServerCall<ReqT, RespT> call,
494+
private static void safeClose(
495+
@NotNull final ServerCall<?, ?> call,
454496
@NotNull final StatusRuntimeException err) {
497+
Metadata metadata = Status.trailersFromThrowable(err);
498+
if (metadata == null) {
499+
metadata = new Metadata();
500+
}
501+
safeClose(call, Status.fromThrowable(err), metadata, true);
502+
}
503+
504+
private static void safeClose(ServerCall<?, ?> call, Status status, Metadata trailers, boolean logOnError) {
455505
try {
456-
Metadata metadata = Status.trailersFromThrowable(err);
457-
if (metadata == null) {
458-
metadata = new Metadata();
506+
call.close(status, trailers);
507+
} catch (IllegalStateException e) {
508+
// IllegalStateException is explicitly documented as thrown if the call is already closed. It might be nice
509+
// if there was a more explicit exception type, but this should suffice. We _could_ try and check the text
510+
// "call already closed", but that is an undocumented implementation detail we should probably not rely on.
511+
if (logOnError && log.isDebugEnabled()) {
512+
log.debug().append("call.close error: ").append(e).endl();
459513
}
460-
call.close(Status.fromThrowable(err), metadata);
461-
} catch (final Exception unexpectedErr) {
462-
log.debug().append("Unanticipated gRPC Error: ").append(unexpectedErr).endl();
463514
}
464515
}
465516
}

0 commit comments

Comments
 (0)