Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'cancel on loss' send mode to MsQuicStream. #4037

Merged
merged 12 commits into from
Jan 8, 2024
4 changes: 3 additions & 1 deletion src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN LocalCloseReset : 1; // Locally closed (locally aborted).
BOOLEAN LocalCloseResetReliable : 1; // Indicates that we should shutdown the send path once we sent/ACK'd ReliableOffsetSend bytes.
BOOLEAN LocalCloseResetReliableAcked : 1; // Indicates the peer has acknowledged we will stop sending once we sent/ACK'd ReliableOffsetSend bytes.
BOOLEAN RemoteCloseResetReliable : 1; // Indicates that the peer initiaited a reliable reset. Keep Recv path available for RecvMaxLength bytes.
BOOLEAN RemoteCloseResetReliable : 1; // Indicates that the peer initiated a reliable reset. Keep Recv path available for RecvMaxLength bytes.
BOOLEAN ReceivedStopSending : 1; // Peer sent STOP_SENDING frame.
BOOLEAN LocalCloseAcked : 1; // Any close acknowledged.
BOOLEAN FinAcked : 1; // Our FIN was acknowledged.
Expand All @@ -144,6 +144,8 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN ReceiveCallPending : 1; // There is an uncompleted receive to the app.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
BOOLEAN SendDelayed : 1; // A delayed send is currently queued.
BOOLEAN CancelOnLoss : 1; // Indicates that the stream is to be canceled
// if loss is detected.

BOOLEAN HandleSendShutdown : 1; // Send shutdown complete callback delivered.
BOOLEAN HandleShutdown : 1; // Shutdown callback delivered.
Expand Down
34 changes: 32 additions & 2 deletions src/core/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ QuicStreamSendShutdown(

while (ApiSendRequests != NULL) {
//
// These sends were queued by the app after queueing a graceful
// These sends were queued by the app after queuing a graceful
// shutdown. Bad app!
//
QUIC_SEND_REQUEST* SendRequest = ApiSendRequests;
Expand All @@ -171,7 +171,7 @@ QuicStreamSendShutdown(

} else if (Stream->ReliableOffsetSend == 0 || Stream->Flags.LocalCloseResetReliable) {
//
// Enter abortive branch if we are not aborting reliablely or we have done it already.
// Enter abortive branch if we are not aborting reliably or we have done it already.
// Essentially, Reset trumps Reliable Reset, so if we have to call shutdown again, we reset.
//

Expand Down Expand Up @@ -613,6 +613,15 @@ QuicStreamSendFlush(

CXPLAT_DBG_ASSERT(!(SendRequest->Flags & QUIC_SEND_FLAG_BUFFERED));

//
// If a send has the 'cancel on loss' flag set, we irreversibly switch
// the associated stream over to that behavior.
//
if (!Stream->Flags.CancelOnLoss &&
(SendRequest->Flags & QUIC_SEND_FLAG_CANCEL_ON_LOSS) != 0) {
Stream->Flags.CancelOnLoss = TRUE;
}

if (!Stream->Flags.SendEnabled) {
//
// Only possible if they queue multiple sends, with a FIN flag set
Expand Down Expand Up @@ -1364,6 +1373,27 @@ QuicStreamOnLoss(
Done:

if (AddSendFlags != 0) {
//
// Check stream's 'cancel on loss' flag to determine how to handle
// the resends queued up at this point.
//
if (Stream->Flags.CancelOnLoss) {
QUIC_STREAM_EVENT Event;
Event.Type = QUIC_STREAM_EVENT_CANCEL_ON_LOSS;
Event.CANCEL_ON_LOSS.ErrorCode = 0;
(void)QuicStreamIndicateEvent(Stream, &Event);

//
// Immediately terminate stream (in both directions, if open)
// giving the error code from the app.
//
QuicStreamShutdown(
Stream,
QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
Event.CANCEL_ON_LOSS.ErrorCode);

return FALSE; // Don't resend any data.
}

if (!Stream->Flags.InRecovery) {
Stream->Flags.InRecovery = TRUE; // TODO - Do we really need to be in recovery if no real data bytes need to be recovered?
Expand Down
20 changes: 20 additions & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ internal enum QUIC_SEND_FLAGS
FIN = 0x0004,
DGRAM_PRIORITY = 0x0008,
DELAY_SEND = 0x0010,
CANCEL_ON_LOSS = 0x0020,
}

internal enum QUIC_DATAGRAM_SEND_STATE
Expand Down Expand Up @@ -2746,6 +2747,7 @@ internal enum QUIC_STREAM_EVENT_TYPE
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9,
CANCEL_ON_LOSS = 10,
}

internal partial struct QUIC_STREAM_EVENT
Expand Down Expand Up @@ -2819,6 +2821,14 @@ internal ref _Anonymous_e__Union._IDEAL_SEND_BUFFER_SIZE_e__Struct IDEAL_SEND_BU
}
}

internal ref _Anonymous_e__Union._CANCEL_ON_LOSS_e__Struct CANCEL_ON_LOSS
{
get
{
return ref MemoryMarshal.GetReference(MemoryMarshal.CreateSpan(ref Anonymous.CANCEL_ON_LOSS, 1));
}
}

[StructLayout(LayoutKind.Explicit)]
internal partial struct _Anonymous_e__Union
{
Expand Down Expand Up @@ -2854,6 +2864,10 @@ internal partial struct _Anonymous_e__Union
[NativeTypeName("struct (anonymous struct)")]
internal _IDEAL_SEND_BUFFER_SIZE_e__Struct IDEAL_SEND_BUFFER_SIZE;

[FieldOffset(0)]
[NativeTypeName("struct (anonymous struct)")]
internal _CANCEL_ON_LOSS_e__Struct CANCEL_ON_LOSS;

internal partial struct _START_COMPLETE_e__Struct
{
[NativeTypeName("HRESULT")]
Expand Down Expand Up @@ -3011,6 +3025,12 @@ internal partial struct _IDEAL_SEND_BUFFER_SIZE_e__Struct
[NativeTypeName("uint64_t")]
internal ulong ByteCount;
}

internal partial struct _CANCEL_ON_LOSS_e__Struct
{
[NativeTypeName("QUIC_UINT62")]
internal ulong ErrorCode;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,14 @@ typedef enum QUIC_SEND_FLAGS {
QUIC_SEND_FLAG_FIN = 0x0004, // Indicates the request is the one last sent on the stream.
QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others.
QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon.
QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to need to update docs for this too.

} QUIC_SEND_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_SEND_FLAGS)

typedef enum QUIC_DATAGRAM_SEND_STATE {
QUIC_DATAGRAM_SEND_UNKNOWN, // Not yet sent.
QUIC_DATAGRAM_SEND_SENT, // Sent and awaiting acknowledegment
QUIC_DATAGRAM_SEND_SENT, // Sent and awaiting acknowledgment
QUIC_DATAGRAM_SEND_LOST_SUSPECT, // Suspected as lost, but still tracked
QUIC_DATAGRAM_SEND_LOST_DISCARDED, // Lost and not longer being tracked
QUIC_DATAGRAM_SEND_ACKNOWLEDGED, // Acknowledged
Expand Down Expand Up @@ -1385,6 +1386,7 @@ typedef enum QUIC_STREAM_EVENT_TYPE {
QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE = 7,
QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE = 8,
QUIC_STREAM_EVENT_PEER_ACCEPTED = 9,
QUIC_STREAM_EVENT_CANCEL_ON_LOSS = 10,
} QUIC_STREAM_EVENT_TYPE;

typedef struct QUIC_STREAM_EVENT {
Expand Down Expand Up @@ -1430,6 +1432,9 @@ typedef struct QUIC_STREAM_EVENT {
struct {
uint64_t ByteCount;
} IDEAL_SEND_BUFFER_SIZE;
struct {
/* out */ QUIC_UINT62 ErrorCode;
} CANCEL_ON_LOSS;
};
} QUIC_STREAM_EVENT;

Expand Down
2 changes: 1 addition & 1 deletion src/inc/msquic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ struct MsQuicListener {
QUIC_STATUS
Start(
_In_ const MsQuicAlpn& Alpns,
_In_ const QUIC_ADDR* Address = nullptr
_In_opt_ const QUIC_ADDR* Address = nullptr
) noexcept {
return MsQuic->ListenerStart(Handle, Alpns, Alpns.Length(), Address);
}
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN ReceiveCallPending : 1; // There is an uncompleted receive to the app.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
BOOLEAN SendDelayed : 1; // A delayed send is currently queued.
BOOLEAN CancelOnLoss : 1; // Indicates that the stream is to be canceled
// if loss is detected.

BOOLEAN HandleSendShutdown : 1; // Send shutdown complete callback delivered.
BOOLEAN HandleShutdown : 1; // Shutdown callback delivered.
Expand Down
20 changes: 19 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ QuicAbortiveTransfers(
_In_ QUIC_ABORTIVE_TRANSFER_FLAGS Flags
);

void
QuicCancelOnLossSend(
_In_ bool DropPackets
);

void
QuicTestCidUpdate(
_In_ int Family,
Expand Down Expand Up @@ -1246,4 +1251,17 @@ typedef struct {
#define IOCTL_QUIC_RUN_CONN_CLOSE_BEFORE_STREAM_CLOSE \
QUIC_CTL_CODE(117, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 117
#pragma pack(push)
#pragma pack(1)

typedef struct {
bool DropPackets;
} QUIC_RUN_CANCEL_ON_LOSS_PARAMS;

#pragma pack(pop)

#define IOCTL_QUIC_RUN_CANCEL_ON_LOSS \
QUIC_CTL_CODE(118, METHOD_BUFFERED, FILE_WRITE_DATA)
// QUIC_RUN_CANCEL_ON_LOSS_PARAMS

#define QUIC_MAX_IOCTL_FUNC_CODE 118
23 changes: 23 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,20 @@ TEST_P(WithAbortiveArgs, AbortiveShutdown) {
}
}

#if QUIC_TEST_DATAPATH_HOOKS_ENABLED
TEST_P(WithCancelOnLossArgs, CancelOnLossSend) {
TestLoggerT<ParamType> Logger("QuicCancelOnLossSend", GetParam());
if (TestingKernelMode) {
QUIC_RUN_CANCEL_ON_LOSS_PARAMS Params = {
GetParam().DropPackets
};
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_CANCEL_ON_LOSS, Params));
} else {
QuicCancelOnLossSend(GetParam().DropPackets);
}
}
#endif

TEST_P(WithCidUpdateArgs, CidUpdate) {
TestLoggerT<ParamType> Logger("QuicTestCidUpdate", GetParam());
if (TestingKernelMode) {
Expand Down Expand Up @@ -2431,6 +2445,15 @@ INSTANTIATE_TEST_SUITE_P(
WithAbortiveArgs,
testing::ValuesIn(AbortiveArgs::Generate()));

#if QUIC_TEST_DATAPATH_HOOKS_ENABLED

INSTANTIATE_TEST_SUITE_P(
Misc,
WithCancelOnLossArgs,
testing::ValuesIn(CancelOnLossArgs::Generate()));

#endif

INSTANTIATE_TEST_SUITE_P(
Misc,
WithCidUpdateArgs,
Expand Down
18 changes: 18 additions & 0 deletions src/test/bin/quic_gtest.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,24 @@ class WithAbortiveArgs : public testing::Test,
public testing::WithParamInterface<AbortiveArgs> {
};

struct CancelOnLossArgs {
bool DropPackets;
static ::std::vector<CancelOnLossArgs> Generate() {
::std::vector<CancelOnLossArgs> list;
for (bool DropPackets : {false, true})
list.push_back({ DropPackets });
return list;
}
};

std::ostream& operator << (std::ostream& o, const CancelOnLossArgs& args) {
return o << "DropPackets: " << (args.DropPackets ? "true" : "false");
}

class WithCancelOnLossArgs : public testing::Test,
public testing::WithParamInterface<CancelOnLossArgs> {
};

struct CidUpdateArgs {
int Family;
uint16_t Iterations;
Expand Down
7 changes: 7 additions & 0 deletions src/test/bin/winkernel/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
0,
sizeof(INT32),
0,
sizeof(QUIC_RUN_CANCEL_ON_LOSS_PARAMS),
};

CXPLAT_STATIC_ASSERT(
Expand All @@ -646,6 +647,7 @@ typedef union {
QUIC_RUN_ABORTIVE_SHUTDOWN_PARAMS Params4;
QUIC_RUN_CID_UPDATE_PARAMS Params5;
QUIC_RUN_RECEIVE_RESUME_PARAMS Params6;
QUIC_RUN_CANCEL_ON_LOSS_PARAMS Params7;
UINT8 EnableKeepAlive;
UINT8 StopListenerFirst;
QUIC_RUN_DRILL_INITIAL_PACKET_CID_PARAMS DrillParams;
Expand Down Expand Up @@ -1532,6 +1534,11 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestConnectionCloseBeforeStreamClose());
break;

case IOCTL_QUIC_RUN_CANCEL_ON_LOSS:
CXPLAT_FRE_ASSERT(Params != nullptr);
QuicTestCtlRun(QuicCancelOnLossSend(Params->Params7.DropPackets));
break;

default:
Status = STATUS_NOT_IMPLEMENTED;
break;
Expand Down
Loading
Loading