Skip to content

Commit

Permalink
Support Multiple Pending Receives (#4177)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Mar 8, 2024
1 parent d02dd23 commit 925b669
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 212 deletions.
65 changes: 17 additions & 48 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,6 @@ MsQuicStreamClose(

} else {

QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

BOOLEAN AlreadyShutdownComplete = Stream->ClientCallbackHandler == NULL;
if (AlreadyShutdownComplete) {
//
Expand Down Expand Up @@ -928,7 +926,6 @@ MsQuicStreamShutdown(
Connection = Stream->Connection;

QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);
QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

if (Flags & QUIC_STREAM_SHUTDOWN_FLAG_INLINE &&
Connection->WorkerThreadID == CxPlatCurThreadID()) {
Expand Down Expand Up @@ -1303,60 +1300,36 @@ MsQuicStreamReceiveComplete(
(Connection->WorkerThreadID == CxPlatCurThreadID()) ||
!Connection->State.HandleClosed);

if (!Stream->Flags.Started || !Stream->Flags.ReceiveCallPending) {
QuicTraceEvent(
ApiError,
"[ api] Error %u",
(uint32_t)QUIC_STATUS_INVALID_STATE);
goto Exit;
}
QUIC_CONN_VERIFY(Connection, BufferLength <= Stream->RecvPendingLength);

QuicTraceEvent(
StreamAppReceiveCompleteCall,
"[strm][%p] Receive complete call [%llu bytes]",
Stream,
BufferLength);

InterlockedExchangeAdd64(
(int64_t*)&Stream->RecvCompletionLength, (int64_t)BufferLength);

if (Connection->WorkerThreadID == CxPlatCurThreadID() &&
Stream->Flags.ReceiveCallActive) {

CXPLAT_PASSIVE_CODE();

BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
QuicStreamReceiveCompleteInline(Stream, BufferLength);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}

goto Exit;
goto Exit; // No need to queue a completion operation when run inline
}

Oper = InterlockedFetchAndClearPointer((void**)&Stream->ReceiveCompleteOperation);
if (Oper == NULL) {
QuicTraceEvent(
ApiError,
"[ api] Error %u",
(uint32_t)QUIC_STATUS_NOT_SUPPORTED);
goto Exit; // Duplicate calls to receive complete
}

Oper->API_CALL.Context->STRM_RECV_COMPLETE.Stream = Stream;
Oper->API_CALL.Context->STRM_RECV_COMPLETE.BufferLength = BufferLength;

//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);
if (Oper) {
//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);

//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
}

Exit:

Expand Down Expand Up @@ -1446,8 +1419,6 @@ MsQuicSetParam(
goto Error;
}

QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

QUIC_OPERATION Oper = { 0 };
QUIC_API_CONTEXT ApiCtx;

Expand Down Expand Up @@ -1566,8 +1537,6 @@ MsQuicGetParam(
goto Error;
}

QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

QUIC_OPERATION Oper = { 0 };
QUIC_API_CONTEXT ApiCtx;

Expand Down
14 changes: 8 additions & 6 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7339,6 +7339,9 @@ QuicConnProcessApiOperation(
)
{
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
QUIC_STATUS* ApiStatus = ApiCtx->Status;
CXPLAT_EVENT* ApiCompleted = ApiCtx->Completed;

switch (ApiCtx->Type) {

case QUIC_API_TYPE_CONN_CLOSE:
Expand Down Expand Up @@ -7425,8 +7428,7 @@ QuicConnProcessApiOperation(

case QUIC_API_TYPE_STRM_RECV_COMPLETE:
QuicStreamReceiveCompletePending(
ApiCtx->STRM_RECV_COMPLETE.Stream,
ApiCtx->STRM_RECV_COMPLETE.BufferLength);
ApiCtx->STRM_RECV_COMPLETE.Stream);
break;

case QUIC_API_TYPE_STRM_RECV_SET_ENABLED:
Expand Down Expand Up @@ -7464,11 +7466,11 @@ QuicConnProcessApiOperation(
break;
}

if (ApiCtx->Status) {
*ApiCtx->Status = Status;
if (ApiStatus) {
*ApiStatus = Status;
}
if (ApiCtx->Completed) {
CxPlatEventSet(*ApiCtx->Completed);
if (ApiCompleted) {
CxPlatEventSet(*ApiCompleted);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ typedef struct QUIC_SERIALIZED_RESUMPTION_STATE {
1024 /* Extra QUIC stuff */ \
)

#ifdef CxPlatVerifierEnabledByAddr
#if DEBUG // Enable all verifier checks in debug builds
#define QUIC_CONN_VERIFY(Connection, Expr) CXPLAT_FRE_ASSERT(Expr)
#elif defined(CxPlatVerifierEnabledByAddr)
#define QUIC_CONN_VERIFY(Connection, Expr) \
if (Connection->State.IsVerifying) { CXPLAT_FRE_ASSERT(Expr); }
#elif defined(CxPlatVerifierEnabled)
Expand Down
4 changes: 3 additions & 1 deletion src/core/library.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ typedef struct QUIC_LIBRARY {

extern QUIC_LIBRARY MsQuicLib;

#ifdef CxPlatVerifierEnabled
#if DEBUG // Enable all verifier checks in debug builds
#define QUIC_LIB_VERIFY(Expr) CXPLAT_FRE_ASSERT(Expr)
#elif defined(CxPlatVerifierEnabled)
#define QUIC_LIB_VERIFY(Expr) \
if (MsQuicLib.IsVerifying) { CXPLAT_FRE_ASSERT(Expr); }
#else
Expand Down
3 changes: 3 additions & 0 deletions src/core/operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ QuicOperationQueueClear(
*ApiCtx->Status = QUIC_STATUS_INVALID_STATE;
CxPlatEventSet(*ApiCtx->Completed);
}
if (ApiCtx->Type == QUIC_API_TYPE_STRM_RECV_COMPLETE) {
QuicStreamRelease(ApiCtx->STRM_RECV_COMPLETE.Stream, QUIC_STREAM_REF_OPERATION);
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ typedef struct QUIC_API_CONTEXT {
} STRM_SEND;
struct {
QUIC_STREAM* Stream;
uint64_t BufferLength;
} STRM_RECV_COMPLETE;
struct {
QUIC_STREAM* Stream;
Expand Down
10 changes: 6 additions & 4 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ QuicStreamInitialize(
QuicRangeInitialize(
QUIC_MAX_RANGE_ALLOC_SIZE,
&Stream->SparseAckRanges);
Stream->ReceiveCompleteOperation = &Stream->ReceiveCompleteOperationStorage;
Stream->ReceiveCompleteOperationStorage.API_CALL.Context = &Stream->ReceiveCompleteApiCtxStorage;
Stream->ReceiveCompleteOperation->Type = QUIC_OPER_TYPE_API_CALL;
Stream->ReceiveCompleteOperation->FreeAfterProcess = FALSE;
Stream->ReceiveCompleteOperation->API_CALL.Context->Type = QUIC_API_TYPE_STRM_RECV_COMPLETE;
Stream->ReceiveCompleteOperation->API_CALL.Context->STRM_RECV_COMPLETE.Stream = Stream;
#if DEBUG
Stream->RefTypeCount[QUIC_STREAM_REF_APP] = 1;
#endif
Expand Down Expand Up @@ -195,10 +201,6 @@ QuicStreamFree(
CxPlatDispatchLockUninitialize(&Stream->ApiSendRequestLock);
CxPlatRefUninitialize(&Stream->RefCount);

if (Stream->ReceiveCompleteOperation) {
QuicOperationFree(Worker, Stream->ReceiveCompleteOperation);
}

if (Stream->RecvBuffer.PreallocatedChunk) {
CxPlatPoolFree(
&Worker->DefaultReceiveBufferPool,
Expand Down
32 changes: 8 additions & 24 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN ReceiveEnabled : 1; // Application is ready for receive callbacks.
BOOLEAN ReceiveFlushQueued : 1; // The receive flush operation is queued.
BOOLEAN ReceiveDataPending : 1; // Data (or FIN) is queued and ready for delivery.
BOOLEAN ReceiveCallPending : 1; // There is an uncompleted receive to the app.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
BOOLEAN SendDelayed : 1; // A delayed send is currently queued.
BOOLEAN CancelOnLoss : 1; // Indicates that the stream is to be canceled
Expand Down Expand Up @@ -392,11 +391,6 @@ typedef struct QUIC_STREAM {
uint64_t RecvWindowBytesDelivered;
uint64_t RecvWindowLastUpdate;

//
// Flags indicating the state of queued events.
//
uint8_t EventFlags;

//
// The structure for tracking received buffers.
//
Expand All @@ -408,21 +402,20 @@ typedef struct QUIC_STREAM {
uint64_t RecvMax0RttLength;

//
// Maximum allowed inbound byte offset, established when the FIN
// is received.
// Maximum allowed inbound byte offset, established when the FIN is received.
//
uint64_t RecvMaxLength;

//
// The length of the pending receive call to the app.
// The number of bytes that are currently outstanding up to the app.
//
uint64_t RecvPendingLength;

//
// The length of any inline receive complete call by the app. UINT64_MAX
// indicates that no inline call was made.
// The number of received bytes the app has completed but not yet processed
// by MsQuic.
//
uint64_t RecvInlineCompletionLength;
volatile uint64_t RecvCompletionLength;

//
// The error code for why the receive path was shutdown.
Expand All @@ -438,6 +431,8 @@ typedef struct QUIC_STREAM {
// Preallocated operation for receive complete
//
QUIC_OPERATION* ReceiveCompleteOperation;
QUIC_OPERATION ReceiveCompleteOperationStorage;
QUIC_API_CONTEXT ReceiveCompleteApiCtxStorage;

//
// Stream blocked timings.
Expand Down Expand Up @@ -972,18 +967,7 @@ QuicStreamRecvShutdown(
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicStreamReceiveCompletePending(
_In_ QUIC_STREAM* Stream,
_In_ uint64_t BufferLength
);

//
// Completes a receive call inline from a callback.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicStreamReceiveCompleteInline(
_In_ QUIC_STREAM* Stream,
_In_ uint64_t BufferLength
_In_ QUIC_STREAM* Stream
);

//
Expand Down
Loading

0 comments on commit 925b669

Please sign in to comment.