Skip to content

Commit 2448a1d

Browse files
authored
Merge pull request #273 from MidLevel/pooled-scene-buffering
fix: Added scene buffering for scene switches
2 parents 678d0ba + c301900 commit 2448a1d

File tree

6 files changed

+284
-27
lines changed

6 files changed

+284
-27
lines changed

MLAPI-Editor/NetworkingManagerEditor.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class NetworkingManagerEditor : Editor
4242
private SerializedProperty networkIdRecycleDelayProperty;
4343
private SerializedProperty rpcHashSizeProperty;
4444
private SerializedProperty loadSceneTimeOutProperty;
45+
private SerializedProperty enableMessageBufferingProperty;
46+
private SerializedProperty messageBufferTimeoutProperty;
4547
private SerializedProperty enableEncryptionProperty;
4648
private SerializedProperty signKeyExchangeProperty;
4749
private SerializedProperty serverBase64PfxCertificateProperty;
@@ -121,6 +123,8 @@ private void Init()
121123
networkIdRecycleDelayProperty = networkConfigProperty.FindPropertyRelative("NetworkIdRecycleDelay");
122124
rpcHashSizeProperty = networkConfigProperty.FindPropertyRelative("RpcHashSize");
123125
loadSceneTimeOutProperty = networkConfigProperty.FindPropertyRelative("LoadSceneTimeOut");
126+
enableMessageBufferingProperty = networkConfigProperty.FindPropertyRelative("EnableMessageBuffering");
127+
messageBufferTimeoutProperty = networkConfigProperty.FindPropertyRelative("MessageBufferTimeout");
124128
enableEncryptionProperty = networkConfigProperty.FindPropertyRelative("EnableEncryption");
125129
signKeyExchangeProperty = networkConfigProperty.FindPropertyRelative("SignKeyExchange");
126130
serverBase64PfxCertificateProperty = networkConfigProperty.FindPropertyRelative("ServerBase64PfxCertificate");
@@ -160,6 +164,8 @@ private void CheckNullProperties()
160164
networkIdRecycleDelayProperty = networkConfigProperty.FindPropertyRelative("NetworkIdRecycleDelay");
161165
rpcHashSizeProperty = networkConfigProperty.FindPropertyRelative("RpcHashSize");
162166
loadSceneTimeOutProperty = networkConfigProperty.FindPropertyRelative("LoadSceneTimeOut");
167+
enableMessageBufferingProperty = networkConfigProperty.FindPropertyRelative("EnableMessageBuffering");
168+
messageBufferTimeoutProperty = networkConfigProperty.FindPropertyRelative("MessageBufferTimeout");
163169
enableEncryptionProperty = networkConfigProperty.FindPropertyRelative("EnableEncryption");
164170
signKeyExchangeProperty = networkConfigProperty.FindPropertyRelative("SignKeyExchange");
165171
serverBase64PfxCertificateProperty = networkConfigProperty.FindPropertyRelative("ServerBase64PfxCertificate");
@@ -345,6 +351,13 @@ public override void OnInspectorGUI()
345351
EditorGUILayout.PropertyField(networkIdRecycleDelayProperty);
346352
}
347353

354+
EditorGUILayout.PropertyField(enableMessageBufferingProperty);
355+
356+
using (new EditorGUI.DisabledScope(!networkingManager.NetworkConfig.EnableMessageBuffering))
357+
{
358+
EditorGUILayout.PropertyField(messageBufferTimeoutProperty);
359+
}
360+
348361
EditorGUILayout.LabelField("Bandwidth", EditorStyles.boldLabel);
349362
EditorGUILayout.PropertyField(rpcHashSizeProperty);
350363

MLAPI/Configuration/NetworkConfig.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,16 @@ public class NetworkConfig
166166
[Tooltip("The amount of seconds to wait for all clients to load a requested scene")]
167167
public int LoadSceneTimeOut = 120;
168168
/// <summary>
169+
/// Whether or not message buffering should be enabled. This will resolve most out of order messages during spawn.
170+
/// </summary>
171+
[Tooltip("Whether or not message buffering should be enabled. This will resolve most out of order messages during spawn")]
172+
public bool EnableMessageBuffering = true;
173+
/// <summary>
174+
/// The amount of time a message should be buffered for without being consumed. If it is not consumed within this time, it will be dropped.
175+
/// </summary>
176+
[Tooltip("The amount of time a message should be buffered for without being consumed. If it is not consumed within this time, it will be dropped")]
177+
public float MessageBufferTimeout = 20f;
178+
/// <summary>
169179
/// Whether or not to enable the ECDHE key exchange to allow for encryption and authentication of messages
170180
/// </summary>
171181
[Tooltip("Whether or not to enable the ECDHE key exchange to allow for encryption and authentication of messages")]

MLAPI/Core/NetworkingManager.cs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
using static MLAPI.Messaging.CustomMessagingManager;
2626
using MLAPI.Exceptions;
2727
using MLAPI.Transports.Tasks;
28+
using MLAPI.Messaging.Buffering;
2829

2930
namespace MLAPI
3031
{
@@ -695,6 +696,11 @@ private void Update()
695696
NetworkedObject.NetworkedBehaviourUpdate();
696697
}
697698

699+
if (!IsServer && NetworkConfig.EnableMessageBuffering)
700+
{
701+
BufferManager.CleanBuffer();
702+
}
703+
698704
if (IsServer)
699705
{
700706
lastEventTickTime = NetworkTime;
@@ -881,7 +887,7 @@ private void HandleRawTransportPoll(NetEventType eventType, ulong clientId, stri
881887
case NetEventType.Data:
882888
if (LogHelper.CurrentLogLevel <= LogLevel.Developer) LogHelper.LogInfo($"Incoming Data From {clientId} : {payload.Count} bytes");
883889

884-
HandleIncomingData(clientId, channelName, payload, receiveTime);
890+
HandleIncomingData(clientId, channelName, payload, receiveTime, true);
885891
break;
886892
case NetEventType.Disconnect:
887893
NetworkProfiler.StartEvent(TickType.Receive, 0, "NONE", "TRANSPORT_DISCONNECT");
@@ -905,7 +911,7 @@ private void HandleRawTransportPoll(NetEventType eventType, ulong clientId, stri
905911

906912
private readonly BitStream inputStreamWrapper = new BitStream(new byte[0]);
907913

908-
private void HandleIncomingData(ulong clientId, string channelName, ArraySegment<byte> data, float receiveTime)
914+
internal void HandleIncomingData(ulong clientId, string channelName, ArraySegment<byte> data, float receiveTime, bool allowBuffer)
909915
{
910916
if (LogHelper.CurrentLogLevel <= LogLevel.Developer) LogHelper.LogInfo("Unwrapping Data Header");
911917

@@ -939,7 +945,31 @@ private void HandleIncomingData(ulong clientId, string channelName, ArraySegment
939945
return;
940946
}
941947

948+
949+
void bufferCallback(ulong networkId)
950+
{
951+
if (!allowBuffer)
952+
{
953+
// This is to prevent recursive buffering
954+
if (LogHelper.CurrentLogLevel <= LogLevel.Error) LogHelper.LogError("A message of type " + MLAPIConstants.MESSAGE_NAMES[messageType] + " was recursivley buffered. It has been dropped.");
955+
return;
956+
}
957+
958+
if (!NetworkConfig.EnableMessageBuffering)
959+
{
960+
throw new InvalidOperationException("Cannot buffer with buffering disabled.");
961+
}
962+
963+
if (IsServer)
964+
{
965+
throw new InvalidOperationException("Cannot buffer on server.");
966+
}
967+
968+
BufferManager.BufferMessageForNetworkId(networkId, clientId, channelName, receiveTime, data);
969+
}
970+
942971
#region INTERNAL MESSAGE
972+
943973
switch (messageType)
944974
{
945975
case MLAPIConstants.MLAPI_CONNECTION_REQUEST:
@@ -955,7 +985,7 @@ private void HandleIncomingData(ulong clientId, string channelName, ArraySegment
955985
if (IsClient) InternalMessageHandler.HandleDestroyObject(clientId, messageStream);
956986
break;
957987
case MLAPIConstants.MLAPI_SWITCH_SCENE:
958-
if (IsClient && NetworkConfig.EnableSceneManagement) InternalMessageHandler.HandleSwitchScene(clientId, messageStream);
988+
if (IsClient) InternalMessageHandler.HandleSwitchScene(clientId, messageStream);
959989
break;
960990
case MLAPIConstants.MLAPI_CHANGE_OWNER:
961991
if (IsClient) InternalMessageHandler.HandleChangeOwner(clientId, messageStream);
@@ -970,10 +1000,10 @@ private void HandleIncomingData(ulong clientId, string channelName, ArraySegment
9701000
if (IsClient) InternalMessageHandler.HandleTimeSync(clientId, messageStream, receiveTime);
9711001
break;
9721002
case MLAPIConstants.MLAPI_NETWORKED_VAR_DELTA:
973-
InternalMessageHandler.HandleNetworkedVarDelta(clientId, messageStream);
1003+
InternalMessageHandler.HandleNetworkedVarDelta(clientId, messageStream, bufferCallback);
9741004
break;
9751005
case MLAPIConstants.MLAPI_NETWORKED_VAR_UPDATE:
976-
InternalMessageHandler.HandleNetworkedVarUpdate(clientId, messageStream);
1006+
InternalMessageHandler.HandleNetworkedVarUpdate(clientId, messageStream, bufferCallback);
9771007
break;
9781008
case MLAPIConstants.MLAPI_SERVER_RPC:
9791009
if (IsServer) InternalMessageHandler.HandleServerRPC(clientId, messageStream);
@@ -985,10 +1015,10 @@ private void HandleIncomingData(ulong clientId, string channelName, ArraySegment
9851015
if (IsClient) InternalMessageHandler.HandleServerRPCResponse(clientId, messageStream);
9861016
break;
9871017
case MLAPIConstants.MLAPI_CLIENT_RPC:
988-
if (IsClient) InternalMessageHandler.HandleClientRPC(clientId, messageStream);
1018+
if (IsClient) InternalMessageHandler.HandleClientRPC(clientId, messageStream, bufferCallback);
9891019
break;
9901020
case MLAPIConstants.MLAPI_CLIENT_RPC_REQUEST:
991-
if (IsClient) InternalMessageHandler.HandleClientRPCRequest(clientId, messageStream, channelName, security);
1021+
if (IsClient) InternalMessageHandler.HandleClientRPCRequest(clientId, messageStream, channelName, security, bufferCallback);
9921022
break;
9931023
case MLAPIConstants.MLAPI_CLIENT_RPC_RESPONSE:
9941024
if (IsServer) InternalMessageHandler.HandleClientRPCResponse(clientId, messageStream);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using MLAPI.Serialization.Pooled;
4+
using UnityEngine;
5+
6+
namespace MLAPI.Messaging.Buffering
7+
{
8+
internal static class BufferManager
9+
{
10+
private static readonly Dictionary<ulong, Queue<BufferedMessage>> bufferQueues = new Dictionary<ulong, Queue<BufferedMessage>>();
11+
12+
internal struct BufferedMessage
13+
{
14+
internal ulong sender;
15+
internal string channelName;
16+
internal PooledBitStream payload;
17+
internal float receiveTime;
18+
internal float bufferTime;
19+
}
20+
21+
internal static Queue<BufferedMessage> ConsumeBuffersForNetworkId(ulong networkId)
22+
{
23+
if (bufferQueues.ContainsKey(networkId))
24+
{
25+
Queue<BufferedMessage> message = bufferQueues[networkId];
26+
27+
bufferQueues.Remove(networkId);
28+
29+
return message;
30+
}
31+
else
32+
{
33+
return null;
34+
}
35+
}
36+
37+
internal static void RecycleConsumedBufferedMessage(BufferedMessage message)
38+
{
39+
message.payload.Dispose();
40+
}
41+
42+
internal static void BufferMessageForNetworkId(ulong networkId, ulong sender, string channelName, float receiveTime, ArraySegment<byte> payload)
43+
{
44+
if (!bufferQueues.ContainsKey(networkId))
45+
{
46+
bufferQueues.Add(networkId, new Queue<BufferedMessage>());
47+
}
48+
49+
Queue<BufferedMessage> queue = bufferQueues[networkId];
50+
51+
PooledBitStream payloadStream = PooledBitStream.Get();
52+
53+
payloadStream.Write(payload.Array, payload.Offset, payload.Count);
54+
payloadStream.Position = 0;
55+
56+
queue.Enqueue(new BufferedMessage()
57+
{
58+
bufferTime = Time.realtimeSinceStartup,
59+
channelName = channelName,
60+
payload = payloadStream,
61+
receiveTime = receiveTime,
62+
sender = sender
63+
});
64+
}
65+
66+
private static readonly List<ulong> _keysToDestroy = new List<ulong>();
67+
internal static void CleanBuffer()
68+
{
69+
foreach (KeyValuePair<ulong, Queue<BufferedMessage>> pair in bufferQueues)
70+
{
71+
while (pair.Value.Count > 0 && Time.realtimeSinceStartup - pair.Value.Peek().bufferTime >= NetworkingManager.Singleton.NetworkConfig.MessageBufferTimeout)
72+
{
73+
BufferedMessage message = pair.Value.Dequeue();
74+
75+
RecycleConsumedBufferedMessage(message);
76+
}
77+
78+
if (pair.Value.Count == 0)
79+
{
80+
_keysToDestroy.Add(pair.Key);
81+
}
82+
}
83+
84+
for (int i = 0; i < _keysToDestroy.Count; i++)
85+
{
86+
bufferQueues.Remove(_keysToDestroy[i]);
87+
}
88+
89+
_keysToDestroy.Clear();
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)