diff --git a/src/core/stream.h b/src/core/stream.h index 6f78a8d8d6..e8b15b7154 100644 --- a/src/core/stream.h +++ b/src/core/stream.h @@ -241,6 +241,13 @@ typedef struct QUIC_STREAM { CXPLAT_LIST_ENTRY ClosedLink; }; + // + // TODO guhetier: Check if can be moved in the union or if need to be in hashtable + this at the same time. + // The entry in the connection's list of streams waiting on stream + // id flow control. + // + CXPLAT_LIST_ENTRY WaitingForIdFlowControlLink; + // // The list entry in the output module's send list. // diff --git a/src/core/stream_set.c b/src/core/stream_set.c index fe7d71009b..7c55e1a559 100644 --- a/src/core/stream_set.c +++ b/src/core/stream_set.c @@ -49,6 +49,7 @@ QuicStreamSetInitialize( ) { CxPlatListInitializeHead(&StreamSet->ClosedStreams); + CxPlatListInitializeHead(&StreamSet->WaitingStreams); #if DEBUG CxPlatListInitializeHead(&StreamSet->AllStreams); CxPlatDispatchLockInitialize(&StreamSet->AllStreamsLock); @@ -186,6 +187,7 @@ QuicStreamSetReleaseStream( // Remove the stream from the list of open streams. // CxPlatHashtableRemove(StreamSet->StreamTable, &Stream->TableEntry, NULL); + CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink); CxPlatListInsertTail(&StreamSet->ClosedStreams, &Stream->ClosedLink); uint8_t Flags = (uint8_t)(Stream->ID & STREAM_ID_MASK); @@ -319,6 +321,7 @@ QuicStreamSetInitializeTransportParameters( if (Info->MaxTotalStreamCount >= StreamCount && Stream->OutFlowBlockedReasons & QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL) { FlowBlockedFlagsToRemove |= QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL; + CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink); QuicStreamIndicatePeerAccepted(Stream); } else { QuicSendSetSendFlag( @@ -401,26 +404,34 @@ QuicStreamSetUpdateMaxStreams( MaxStreams); BOOLEAN FlushSend = FALSE; - if (StreamSet->StreamTable != NULL) { - - CXPLAT_HASHTABLE_ENUMERATOR Enumerator; - CXPLAT_HASHTABLE_ENTRY* Entry; - CxPlatHashtableEnumerateBegin(StreamSet->StreamTable, &Enumerator); - while ((Entry = CxPlatHashtableEnumerateNext(StreamSet->StreamTable, &Enumerator)) != NULL) { - QUIC_STREAM* Stream = CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, TableEntry); - - uint64_t Count = (Stream->ID >> 2) + 1; - - if ((Stream->ID & STREAM_ID_MASK) == Mask && - Count > Info->MaxTotalStreamCount && - Count <= MaxStreams && - QuicStreamRemoveOutFlowBlockedReason( - Stream, QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL)) { - QuicStreamIndicatePeerAccepted(Stream); - FlushSend = TRUE; - } + + // + // Unblock streams that now fit in the new peer limits. + // The list is ordered so we can exit as soon as we reach the limits. + // + CXPLAT_LIST_ENTRY *Link = StreamSet->WaitingStreams.Flink; + while (Link != &StreamSet->WaitingStreams) { + QUIC_STREAM* Stream = + CXPLAT_CONTAINING_RECORD(Link, QUIC_STREAM, WaitingForIdFlowControlLink); + Link = Link->Flink; + + uint64_t Index = (Stream->ID >> 2); + CXPLAT_DBG_ASSERT(Index >= Info->MaxTotalStreamCount); + if (Index >= MaxStreams) { + break; + } + + if ((Stream->ID & STREAM_ID_MASK) != Mask) { + continue; } - CxPlatHashtableEnumerateEnd(StreamSet->StreamTable, &Enumerator); + + if (!QuicStreamRemoveOutFlowBlockedReason( + Stream, QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL)) { + CXPLAT_DBG_ASSERTMSG(FALSE, "Stream should be blocked by id flow control"); + } + CxPlatListEntryRemove(&Stream->WaitingForIdFlowControlLink); + QuicStreamIndicatePeerAccepted(Stream); + FlushSend = TRUE; } Info->MaxTotalStreamCount = MaxStreams; @@ -554,6 +565,21 @@ QuicStreamSetNewLocalStream( } if (NewStreamBlocked) { + // + // Insert the stream into the list of streams waiting for stream id flow control. + // Make sure to keep the list ordered by stream ID. + // + CXPLAT_LIST_ENTRY* Link = StreamSet->WaitingStreams.Blink; + while (Link != &StreamSet->WaitingStreams) { + QUIC_STREAM* StreamIt = + CXPLAT_CONTAINING_RECORD(Link, QUIC_STREAM, WaitingForIdFlowControlLink); + if (StreamIt->ID < NewStreamId) { + break; + } + Link = Link->Blink; + } + CxPlatListInsertTail(Link, &Stream->WaitingForIdFlowControlLink); + // // We don't call QuicStreamAddOutFlowBlockedReason here because we haven't // logged the stream created event yet at this point. We will log the event diff --git a/src/core/stream_set.h b/src/core/stream_set.h index b489fa8b67..35fc171080 100644 --- a/src/core/stream_set.h +++ b/src/core/stream_set.h @@ -46,6 +46,11 @@ typedef struct QUIC_STREAM_SET { // CXPLAT_HASHTABLE* StreamTable; + // + // The list of streams that are waiting for stream id flow control. + // + CXPLAT_LIST_ENTRY WaitingStreams; + // // The list of streams that are completely closed and need to be released. //