Skip to content

Commit

Permalink
Store stream waiting for stream id control flow in a list
Browse files Browse the repository at this point in the history
  • Loading branch information
guhetier committed Feb 25, 2025
1 parent b7e9a8b commit e450e8a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 19 deletions.
7 changes: 7 additions & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ typedef struct QUIC_STREAM {
CXPLAT_LIST_ENTRY ClosedLink;
};

//
// TODO guhetier: Check if can be moved in the union or if need to be in hashtable + this at the same time.
// The entry in the connection's list of streams waiting on stream
// id flow control.
//
CXPLAT_LIST_ENTRY WaitingForIdFlowControlLink;

//
// The list entry in the output module's send list.
//
Expand Down
64 changes: 45 additions & 19 deletions src/core/stream_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ QuicStreamSetInitialize(
)
{
CxPlatListInitializeHead(&StreamSet->ClosedStreams);
CxPlatListInitializeHead(&StreamSet->WaitingStreams);
#if DEBUG
CxPlatListInitializeHead(&StreamSet->AllStreams);
CxPlatDispatchLockInitialize(&StreamSet->AllStreamsLock);
Expand Down Expand Up @@ -186,6 +187,7 @@ QuicStreamSetReleaseStream(
// Remove the stream from the list of open streams.
//
CxPlatHashtableRemove(StreamSet->StreamTable, &Stream->TableEntry, NULL);
CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink);
CxPlatListInsertTail(&StreamSet->ClosedStreams, &Stream->ClosedLink);

uint8_t Flags = (uint8_t)(Stream->ID & STREAM_ID_MASK);
Expand Down Expand Up @@ -319,6 +321,7 @@ QuicStreamSetInitializeTransportParameters(
if (Info->MaxTotalStreamCount >= StreamCount &&
Stream->OutFlowBlockedReasons & QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL) {
FlowBlockedFlagsToRemove |= QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL;
CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink);
QuicStreamIndicatePeerAccepted(Stream);
} else {
QuicSendSetSendFlag(
Expand Down Expand Up @@ -401,26 +404,34 @@ QuicStreamSetUpdateMaxStreams(
MaxStreams);

BOOLEAN FlushSend = FALSE;
if (StreamSet->StreamTable != NULL) {

CXPLAT_HASHTABLE_ENUMERATOR Enumerator;
CXPLAT_HASHTABLE_ENTRY* Entry;
CxPlatHashtableEnumerateBegin(StreamSet->StreamTable, &Enumerator);
while ((Entry = CxPlatHashtableEnumerateNext(StreamSet->StreamTable, &Enumerator)) != NULL) {
QUIC_STREAM* Stream = CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, TableEntry);

uint64_t Count = (Stream->ID >> 2) + 1;

if ((Stream->ID & STREAM_ID_MASK) == Mask &&
Count > Info->MaxTotalStreamCount &&
Count <= MaxStreams &&
QuicStreamRemoveOutFlowBlockedReason(
Stream, QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL)) {
QuicStreamIndicatePeerAccepted(Stream);
FlushSend = TRUE;
}

//
// Unblock streams that now fit in the new peer limits.
// The list is ordered so we can exit as soon as we reach the limits.
//
CXPLAT_LIST_ENTRY *Link = StreamSet->WaitingStreams.Flink;
while (Link != &StreamSet->WaitingStreams) {
QUIC_STREAM* Stream =

Check warning on line 414 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L412-L414

Added lines #L412 - L414 were not covered by tests
CXPLAT_CONTAINING_RECORD(Link, QUIC_STREAM, WaitingForIdFlowControlLink);
Link = Link->Flink;

Check warning on line 416 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L416

Added line #L416 was not covered by tests

uint64_t Index = (Stream->ID >> 2);
CXPLAT_DBG_ASSERT(Index >= Info->MaxTotalStreamCount);
if (Index >= MaxStreams) {
break;

Check warning on line 421 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L418-L421

Added lines #L418 - L421 were not covered by tests
}

if ((Stream->ID & STREAM_ID_MASK) != Mask) {
continue;

Check warning on line 425 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L424-L425

Added lines #L424 - L425 were not covered by tests
}
CxPlatHashtableEnumerateEnd(StreamSet->StreamTable, &Enumerator);

if (!QuicStreamRemoveOutFlowBlockedReason(

Check warning on line 428 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L428

Added line #L428 was not covered by tests
Stream, QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL)) {
CXPLAT_DBG_ASSERTMSG(FALSE, "Stream should be blocked by id flow control");

Check warning on line 430 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L430

Added line #L430 was not covered by tests
}
CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink);
QuicStreamIndicatePeerAccepted(Stream);
FlushSend = TRUE;

Check warning on line 434 in src/core/stream_set.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_set.c#L432-L434

Added lines #L432 - L434 were not covered by tests
}

Info->MaxTotalStreamCount = MaxStreams;
Expand Down Expand Up @@ -554,6 +565,21 @@ QuicStreamSetNewLocalStream(
}

if (NewStreamBlocked) {
//
// Insert the stream into the list of streams waiting for stream id flow control.
// Make sure to keep the list ordered by stream ID.
//
CXPLAT_LIST_ENTRY* Link = StreamSet->WaitingStreams.Blink;
while (Link != &StreamSet->WaitingStreams) {
QUIC_STREAM* StreamIt =
CXPLAT_CONTAINING_RECORD(Link, QUIC_STREAM, WaitingForIdFlowControlLink);
if (StreamIt->ID < NewStreamId) {
break;
}
Link = Link->Blink;
}
CxPlatListInsertTail(Link, &Stream->WaitingForIdFlowControlLink);

//
// We don't call QuicStreamAddOutFlowBlockedReason here because we haven't
// logged the stream created event yet at this point. We will log the event
Expand Down
5 changes: 5 additions & 0 deletions src/core/stream_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ typedef struct QUIC_STREAM_SET {
//
CXPLAT_HASHTABLE* StreamTable;

//
// The list of streams that are waiting for stream id flow control.
//
CXPLAT_LIST_ENTRY WaitingStreams;

//
// The list of streams that are completely closed and need to be released.
//
Expand Down

0 comments on commit e450e8a

Please sign in to comment.