Skip to content

Commit f8f2029

Browse files
Memory Optimization - Refactor TWCC (#1934) (#2075)
* Refactor TWCC (#1934) * Remove stackqueue usage * Use hashtable instead..working logic * Cleanup, increase hash table size, fix loop bug * method 2, array of pointers * Add rolling window * rolling window with hashtable * hash table with rw * Fix bug * Fix twcc unit test * Cleanup rw logic * Cleanup * Cleanup logic * Update README * unused var fix * Use defines for hash table size * Address comments, disable TWCC by default * readme * Fix windows gst issue * Comments * Remove enableIceStats merge conflict * Address comments * Fix left over typo * Address comments, refactor updaing of TWCC hash table in onRtcpTwccPacket for testing * Clang format, fix compiler Werror * Correct typos * Fix "expression result unused" Werrors * Remove unused variable * Address comments * Fix endless loop * Address comment * Add removal of null hashTable items, add test for null item in hashTable * Address comments * Cleanup freeing of KvsPeerConnection * Clang format --------- Co-authored-by: Divya Sampath Kumar <disa6302@colorado.edu>
1 parent 2f1a8af commit f8f2029

File tree

6 files changed

+356
-94
lines changed

6 files changed

+356
-94
lines changed

src/source/PeerConnection/PeerConnection.c

+99-35
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
10051005
pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE);
10061006
pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager));
10071007
CHK(pKvsPeerConnection->pTwccManager != NULL, STATUS_NOT_ENOUGH_MEMORY);
1008+
CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH,
1009+
&pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
10081010
}
10091011

10101012
*ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection;
@@ -1041,10 +1043,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
10411043
{
10421044
ENTERS();
10431045
STATUS retStatus = STATUS_SUCCESS;
1044-
PKvsPeerConnection pKvsPeerConnection;
1046+
PKvsPeerConnection pKvsPeerConnection = NULL;
10451047
PDoubleListNode pCurNode = NULL;
10461048
UINT64 item = 0;
10471049
UINT64 startTime;
1050+
UINT32 twccHashTableCount = 0;
1051+
BOOL twccLocked = FALSE;
10481052

10491053
CHK(ppPeerConnection != NULL, STATUS_NULL_ARG);
10501054

@@ -1113,20 +1117,34 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
11131117
}
11141118

11151119
if (pKvsPeerConnection->pTwccManager != NULL) {
1116-
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
1117-
MUTEX_FREE(pKvsPeerConnection->twccLock);
1120+
MUTEX_LOCK(pKvsPeerConnection->twccLock);
1121+
twccLocked = TRUE;
1122+
1123+
if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) {
1124+
DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount);
11181125
}
1119-
// twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets)
1120-
// we should not deallocate items but we do need to clear the queue
1121-
CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE));
1126+
1127+
CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry));
1128+
CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
1129+
11221130
SAFE_MEMFREE(pKvsPeerConnection->pTwccManager);
11231131
}
11241132

11251133
// Incase the `RemoteSessionDescription` has not already been freed.
11261134
SAFE_MEMFREE(pKvsPeerConnection->pRemoteSessionDescription);
11271135

1136+
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
1137+
if (twccLocked) {
1138+
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
1139+
twccLocked = FALSE;
1140+
}
1141+
MUTEX_FREE(pKvsPeerConnection->twccLock);
1142+
pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE;
1143+
}
1144+
11281145
PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection");
11291146
SAFE_MEMFREE(*ppPeerConnection);
1147+
11301148
CleanUp:
11311149

11321150
LEAVES();
@@ -1826,47 +1844,93 @@ STATUS deinitKvsWebRtc(VOID)
18261844
return retStatus;
18271845
}
18281846

1829-
STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket)
1847+
// Not thread safe. Ensure this function is invoked in a guarded section
1848+
static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 endingSeqNum)
1849+
{
1850+
ENTERS();
1851+
STATUS retStatus = STATUS_SUCCESS;
1852+
UINT16 updatedSeqNum = 0;
1853+
PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL;
1854+
UINT64 ageOfOldest = 0, firstRtpTime = 0;
1855+
UINT64 twccPacketValue = 0;
1856+
BOOL isCheckComplete = FALSE;
1857+
1858+
CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG);
1859+
1860+
updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow;
1861+
do {
1862+
// If the seqNum is not present in the hash table, it is ok. We move on to the next
1863+
if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) {
1864+
tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue;
1865+
if (tempTwccRtpPktInfo != NULL) {
1866+
firstRtpTime = tempTwccRtpPktInfo->localTimeKvs;
1867+
// Would be the case if the timestamps are not monotonically increasing.
1868+
if (pRtpPacket->sentTime >= firstRtpTime) {
1869+
ageOfOldest = pRtpPacket->sentTime - firstRtpTime;
1870+
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
1871+
// If the seqNum is not present in the hash table, move on. However, this case should not happen
1872+
// given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists
1873+
if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) {
1874+
SAFE_MEMFREE(tempTwccRtpPktInfo);
1875+
}
1876+
updatedSeqNum++;
1877+
} else {
1878+
isCheckComplete = TRUE;
1879+
}
1880+
} else {
1881+
// Move to the next seqNum to check if we can remove the next one atleast
1882+
DLOGV("Non-monotonic timestamp detected for RTP packet seqNum %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64,
1883+
updatedSeqNum, firstRtpTime, pRtpPacket->sentTime);
1884+
updatedSeqNum++;
1885+
}
1886+
} else {
1887+
CHK_STATUS(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum));
1888+
updatedSeqNum++;
1889+
}
1890+
} else {
1891+
updatedSeqNum++;
1892+
}
1893+
// reset before next iteration
1894+
tempTwccRtpPktInfo = NULL;
1895+
} while (!isCheckComplete && updatedSeqNum != (endingSeqNum + 1));
1896+
1897+
// Update regardless. The loop checks until current RTP packets seq number irrespective of the failure
1898+
pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum;
1899+
CleanUp:
1900+
CHK_LOG_ERR(retStatus);
1901+
1902+
LEAVES();
1903+
return retStatus;
1904+
}
1905+
1906+
STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket)
18301907
{
18311908
ENTERS();
18321909
STATUS retStatus = STATUS_SUCCESS;
18331910
BOOL locked = FALSE;
1834-
UINT64 sn = 0;
1835-
UINT16 seqNum;
1836-
BOOL isEmpty = FALSE;
1837-
INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest;
1838-
CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
1839-
CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS);
1911+
UINT16 seqNum = 0;
1912+
PTwccRtpPacketInfo pTwccRtpPktInfo = NULL;
1913+
1914+
CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
1915+
CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS);
18401916
CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS);
18411917

1842-
MUTEX_LOCK(pc->twccLock);
1918+
MUTEX_LOCK(pKvsPeerConnection->twccLock);
18431919
locked = TRUE;
18441920

1921+
CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY);
1922+
1923+
pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength;
1924+
pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime;
1925+
pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME;
18451926
seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload);
1846-
CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum));
1847-
pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum;
1848-
pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength;
1849-
pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime;
1850-
pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME;
1851-
pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime;
1852-
1853-
// cleanup queue until it contains up to 2 seconds of sent packets
1854-
do {
1855-
CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn));
1856-
firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs;
1857-
lastLocalTimeKvs = pRtpPacket->sentTime;
1858-
ageOfOldest = lastLocalTimeKvs - firstTimeKvs;
1859-
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
1860-
CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn));
1861-
CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty));
1862-
} else {
1863-
break;
1864-
}
1865-
} while (!isEmpty);
1927+
CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo));
18661928

1929+
// Ensure twccRollingWindowDeletion is run in a guarded section
1930+
CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum));
18671931
CleanUp:
18681932
if (locked) {
1869-
MUTEX_UNLOCK(pc->twccLock);
1933+
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
18701934
}
18711935
CHK_LOG_ERR(retStatus);
18721936

src/source/PeerConnection/PeerConnection.h

+10-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ extern "C" {
3131
#define CODEC_HASH_TABLE_BUCKET_LENGTH 2
3232
#define RTX_HASH_TABLE_BUCKET_COUNT 50
3333
#define RTX_HASH_TABLE_BUCKET_LENGTH 2
34+
#define TWCC_HASH_TABLE_BUCKET_COUNT 100
35+
#define TWCC_HASH_TABLE_BUCKET_LENGTH 2
3436

3537
#define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200
3638
#define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2
@@ -47,17 +49,16 @@ typedef enum {
4749
} RTX_CODEC;
4850

4951
typedef struct {
50-
UINT16 seqNum;
51-
UINT16 packetSize;
5252
UINT64 localTimeKvs;
5353
UINT64 remoteTimeKvs;
54-
} TwccPacket, *PTwccPacket;
54+
UINT32 packetSize;
55+
} TwccRtpPacketInfo, *PTwccRtpPacketInfo;
5556

5657
typedef struct {
57-
StackQueue twccPackets;
58-
TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality
59-
UINT64 lastLocalTimeKvs;
60-
UINT16 lastReportedSeqNum;
58+
PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket]
59+
UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window
60+
UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response
61+
UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response
6162
} TwccManager, *PTwccManager;
6263

6364
typedef struct {
@@ -70,7 +71,7 @@ typedef struct {
7071

7172
typedef struct {
7273
RtcPeerConnection peerConnection;
73-
// UINT32 padding padding makes transportWideSequenceNumber 64bit aligned
74+
// UINT32 padding makes transportWideSequenceNumber 64bit aligned
7475
// we put atomics at the top of structs because customers application could set the packing to 0
7576
// in which case any atomic operations would result in bus errors if there is a misalignment
7677
// for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907
@@ -183,6 +184,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32);
183184
STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32);
184185
STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE);
185186
STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket);
187+
UINT32 parseExtId(PCHAR);
186188

187189
// visible for testing only
188190
VOID onIceConnectionStateChange(UINT64, UINT64);

0 commit comments

Comments
 (0)