Skip to content

Commit 967cbbf

Browse files
authored
Merge pull request #357 from MidLevel/client-rpc-bulk
perf: Improve RPC bulking and delegate alloc fix
2 parents 42e661f + 1623065 commit 967cbbf

File tree

5 files changed

+136
-84
lines changed

5 files changed

+136
-84
lines changed

MLAPI/Core/NetworkedBehaviour.cs

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,48 +1258,20 @@ internal void SendClientRPCPerformance(ulong hash, List<ulong> clientIds, Strea
12581258

12591259
stream.CopyFrom(messageStream);
12601260

1261-
if (clientIds == null)
1261+
if (IsHost)
12621262
{
1263-
for (int i = 0; i < NetworkingManager.Singleton.ConnectedClientsList.Count; i++)
1263+
if (this.NetworkedObject.observers.Contains(NetworkingManager.Singleton.LocalClientId))
12641264
{
1265-
if (!this.NetworkedObject.observers.Contains(NetworkingManager.Singleton.ConnectedClientsList[i].ClientId))
1266-
{
1267-
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogWarning("Silently suppressed ClientRPC because a target in the bulk list was not an observer");
1268-
continue;
1269-
}
1270-
1271-
if (IsHost && NetworkingManager.Singleton.ConnectedClientsList[i].ClientId == NetworkingManager.Singleton.LocalClientId)
1272-
{
1273-
messageStream.Position = 0;
1274-
InvokeClientRPCLocal(hash, NetworkingManager.Singleton.LocalClientId, messageStream);
1275-
}
1276-
else
1277-
{
1278-
InternalMessageSender.Send(NetworkingManager.Singleton.ConnectedClientsList[i].ClientId, MLAPIConstants.MLAPI_CLIENT_RPC, string.IsNullOrEmpty(channel) ? "MLAPI_DEFAULT_MESSAGE" : channel, stream, security, null);
1279-
}
1265+
messageStream.Position = 0;
1266+
InvokeClientRPCLocal(hash, NetworkingManager.Singleton.LocalClientId, messageStream);
12801267
}
1281-
}
1282-
else
1283-
{
1284-
for (int i = 0; i < clientIds.Count; i++)
1268+
else
12851269
{
1286-
if (!this.NetworkedObject.observers.Contains(clientIds[i]))
1287-
{
1288-
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("Cannot send ClientRPC to client without visibility to the object");
1289-
continue;
1290-
}
1291-
1292-
if (IsHost && clientIds[i] == NetworkingManager.Singleton.LocalClientId)
1293-
{
1294-
messageStream.Position = 0;
1295-
InvokeClientRPCLocal(hash, NetworkingManager.Singleton.LocalClientId, messageStream);
1296-
}
1297-
else
1298-
{
1299-
InternalMessageSender.Send(clientIds[i], MLAPIConstants.MLAPI_CLIENT_RPC, string.IsNullOrEmpty(channel) ? "MLAPI_DEFAULT_MESSAGE" : channel, stream, security, null);
1300-
}
1270+
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogWarning("Silently suppressed ClientRPC because a connected client was not an observer");
13011271
}
13021272
}
1273+
1274+
InternalMessageSender.Send(MLAPIConstants.MLAPI_CLIENT_RPC, string.IsNullOrEmpty(channel) ? "MLAPI_DEFAULT_MESSAGE" : channel, clientIds, stream, security, this.NetworkedObject);
13031275
}
13041276
}
13051277
}
@@ -1324,28 +1296,20 @@ internal void SendClientRPCPerformance(ulong hash, Stream messageStream, ulong c
13241296
stream.CopyFrom(messageStream);
13251297

13261298

1327-
for (int i = 0; i < NetworkingManager.Singleton.ConnectedClientsList.Count; i++)
1299+
if (IsHost && NetworkingManager.Singleton.LocalClientId != clientIdToIgnore)
13281300
{
1329-
if (NetworkingManager.Singleton.ConnectedClientsList[i].ClientId == clientIdToIgnore)
1330-
continue;
1331-
1332-
if (!this.NetworkedObject.observers.Contains(NetworkingManager.Singleton.ConnectedClientsList[i].ClientId))
1333-
{
1334-
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogWarning("Silently suppressed ClientRPC because a connected client was not an observer");
1335-
continue;
1336-
}
1337-
1338-
1339-
if (IsHost && NetworkingManager.Singleton.ConnectedClientsList[i].ClientId == NetworkingManager.Singleton.LocalClientId)
1301+
if (this.NetworkedObject.observers.Contains(NetworkingManager.Singleton.LocalClientId))
13401302
{
13411303
messageStream.Position = 0;
13421304
InvokeClientRPCLocal(hash, NetworkingManager.Singleton.LocalClientId, messageStream);
13431305
}
13441306
else
13451307
{
1346-
InternalMessageSender.Send(NetworkingManager.Singleton.ConnectedClientsList[i].ClientId, MLAPIConstants.MLAPI_CLIENT_RPC, string.IsNullOrEmpty(channel) ? "MLAPI_DEFAULT_MESSAGE" : channel, stream, security, null);
1308+
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogWarning("Silently suppressed ClientRPC because a connected client was not an observer");
13471309
}
13481310
}
1311+
1312+
InternalMessageSender.Send(MLAPIConstants.MLAPI_CLIENT_RPC, string.IsNullOrEmpty(channel) ? "MLAPI_DEFAULT_MESSAGE" : channel, clientIdToIgnore, stream, security, this.NetworkedObject);
13491313
}
13501314
}
13511315
}

MLAPI/Core/NetworkingManager.cs

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -938,29 +938,6 @@ internal void HandleIncomingData(ulong clientId, string channelName, ArraySegmen
938938
return;
939939
}
940940

941-
942-
void bufferCallback(ulong networkId)
943-
{
944-
if (!allowBuffer)
945-
{
946-
// This is to prevent recursive buffering
947-
if (NetworkLog.CurrentLogLevel <= LogLevel.Error) NetworkLog.LogError("A message of type " + MLAPIConstants.MESSAGE_NAMES[messageType] + " was recursivley buffered. It has been dropped.");
948-
return;
949-
}
950-
951-
if (!NetworkConfig.EnableMessageBuffering)
952-
{
953-
throw new InvalidOperationException("Cannot buffer with buffering disabled.");
954-
}
955-
956-
if (IsServer)
957-
{
958-
throw new InvalidOperationException("Cannot buffer on server.");
959-
}
960-
961-
BufferManager.BufferMessageForNetworkId(networkId, clientId, channelName, receiveTime, data);
962-
}
963-
964941
#region INTERNAL MESSAGE
965942

966943
switch (messageType)
@@ -993,10 +970,26 @@ void bufferCallback(ulong networkId)
993970
if (IsClient) InternalMessageHandler.HandleTimeSync(clientId, messageStream, receiveTime);
994971
break;
995972
case MLAPIConstants.MLAPI_NETWORKED_VAR_DELTA:
996-
InternalMessageHandler.HandleNetworkedVarDelta(clientId, messageStream, bufferCallback);
973+
InternalMessageHandler.HandleNetworkedVarDelta(clientId, messageStream, BufferCallback, new PreBufferPreset()
974+
{
975+
AllowBuffer = allowBuffer,
976+
ChannelName = channelName,
977+
ClientId = clientId,
978+
Data = data,
979+
MessageType = messageType,
980+
ReceiveTime = receiveTime
981+
});
997982
break;
998983
case MLAPIConstants.MLAPI_NETWORKED_VAR_UPDATE:
999-
InternalMessageHandler.HandleNetworkedVarUpdate(clientId, messageStream, bufferCallback);
984+
InternalMessageHandler.HandleNetworkedVarUpdate(clientId, messageStream, BufferCallback, new PreBufferPreset()
985+
{
986+
AllowBuffer = allowBuffer,
987+
ChannelName = channelName,
988+
ClientId = clientId,
989+
Data = data,
990+
MessageType = messageType,
991+
ReceiveTime = receiveTime
992+
});
1000993
break;
1001994
case MLAPIConstants.MLAPI_SERVER_RPC:
1002995
if (IsServer) InternalMessageHandler.HandleServerRPC(clientId, messageStream);
@@ -1008,10 +1001,26 @@ void bufferCallback(ulong networkId)
10081001
if (IsClient) InternalMessageHandler.HandleServerRPCResponse(clientId, messageStream);
10091002
break;
10101003
case MLAPIConstants.MLAPI_CLIENT_RPC:
1011-
if (IsClient) InternalMessageHandler.HandleClientRPC(clientId, messageStream, bufferCallback);
1004+
if (IsClient) InternalMessageHandler.HandleClientRPC(clientId, messageStream, BufferCallback, new PreBufferPreset()
1005+
{
1006+
AllowBuffer = allowBuffer,
1007+
ChannelName = channelName,
1008+
ClientId = clientId,
1009+
Data = data,
1010+
MessageType = messageType,
1011+
ReceiveTime = receiveTime
1012+
});
10121013
break;
10131014
case MLAPIConstants.MLAPI_CLIENT_RPC_REQUEST:
1014-
if (IsClient) InternalMessageHandler.HandleClientRPCRequest(clientId, messageStream, channelName, security, bufferCallback);
1015+
if (IsClient) InternalMessageHandler.HandleClientRPCRequest(clientId, messageStream, channelName, security, BufferCallback, new PreBufferPreset()
1016+
{
1017+
AllowBuffer = allowBuffer,
1018+
ChannelName = channelName,
1019+
ClientId = clientId,
1020+
Data = data,
1021+
MessageType = messageType,
1022+
ReceiveTime = receiveTime
1023+
});
10151024
break;
10161025
case MLAPIConstants.MLAPI_CLIENT_RPC_RESPONSE:
10171026
if (IsServer) InternalMessageHandler.HandleClientRPCResponse(clientId, messageStream);
@@ -1052,6 +1061,28 @@ void bufferCallback(ulong networkId)
10521061
}
10531062
}
10541063

1064+
private void BufferCallback(ulong networkId, PreBufferPreset preset)
1065+
{
1066+
if (!preset.AllowBuffer)
1067+
{
1068+
// This is to prevent recursive buffering
1069+
if (NetworkLog.CurrentLogLevel <= LogLevel.Error) NetworkLog.LogError("A message of type " + MLAPIConstants.MESSAGE_NAMES[preset.MessageType] + " was recursivley buffered. It has been dropped.");
1070+
return;
1071+
}
1072+
1073+
if (!NetworkConfig.EnableMessageBuffering)
1074+
{
1075+
throw new InvalidOperationException("Cannot buffer with buffering disabled.");
1076+
}
1077+
1078+
if (IsServer)
1079+
{
1080+
throw new InvalidOperationException("Cannot buffer on server.");
1081+
}
1082+
1083+
BufferManager.BufferMessageForNetworkId(networkId, preset.ClientId, preset.ChannelName, preset.ReceiveTime, preset.Data);
1084+
}
1085+
10551086
/// <summary>
10561087
/// Disconnects the remote client.
10571088
/// </summary>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
namespace MLAPI.Messaging.Buffering
3+
{
4+
internal struct PreBufferPreset
5+
{
6+
public byte MessageType;
7+
public bool AllowBuffer;
8+
public ulong ClientId;
9+
public string ChannelName;
10+
public float ReceiveTime;
11+
public ArraySegment<byte> Data;
12+
}
13+
}

MLAPI/Messaging/InternalMessageHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ internal static void HandleTimeSync(ulong clientId, Stream stream, float receive
497497
}
498498
}
499499

500-
internal static void HandleNetworkedVarDelta(ulong clientId, Stream stream, Action<ulong> bufferCallback)
500+
internal static void HandleNetworkedVarDelta(ulong clientId, Stream stream, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
501501
{
502502
if (!NetworkingManager.Singleton.NetworkConfig.EnableNetworkedVar)
503503
{
@@ -530,12 +530,12 @@ internal static void HandleNetworkedVarDelta(ulong clientId, Stream stream, Acti
530530
else
531531
{
532532
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("NetworkedVarDelta message recieved for a non existant object with id: " + networkId + ". This delta will be buffered and might be recovered.");
533-
bufferCallback(networkId);
533+
bufferCallback(networkId, bufferPreset);
534534
}
535535
}
536536
}
537537

538-
internal static void HandleNetworkedVarUpdate(ulong clientId, Stream stream, Action<ulong> bufferCallback)
538+
internal static void HandleNetworkedVarUpdate(ulong clientId, Stream stream, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
539539
{
540540
if (!NetworkingManager.Singleton.NetworkConfig.EnableNetworkedVar)
541541
{
@@ -568,7 +568,7 @@ internal static void HandleNetworkedVarUpdate(ulong clientId, Stream stream, Act
568568
else
569569
{
570570
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("NetworkedVarUpdate message recieved for a non existant object with id: " + networkId + ". This delta will be buffered and might be recovered.");
571-
bufferCallback(networkId);
571+
bufferCallback(networkId, bufferPreset);
572572
}
573573
}
574574
}
@@ -689,7 +689,7 @@ internal static void HandleServerRPCResponse(ulong clientId, Stream stream)
689689
}
690690
}
691691

692-
internal static void HandleClientRPC(ulong clientId, Stream stream, Action<ulong> bufferCallback)
692+
internal static void HandleClientRPC(ulong clientId, Stream stream, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
693693
{
694694
using (PooledBitReader reader = PooledBitReader.Get(stream))
695695
{
@@ -717,12 +717,12 @@ internal static void HandleClientRPC(ulong clientId, Stream stream, Action<ulong
717717
else
718718
{
719719
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPC message recieved for a non existant object with id: " + networkId + ". This message will be buffered and might be recovered.");
720-
bufferCallback(networkId);
720+
bufferCallback(networkId, bufferPreset);
721721
}
722722
}
723723
}
724724

725-
internal static void HandleClientRPCRequest(ulong clientId, Stream stream, string channelName, SecuritySendFlags security, Action<ulong> bufferCallback)
725+
internal static void HandleClientRPCRequest(ulong clientId, Stream stream, string channelName, SecuritySendFlags security, Action<ulong, PreBufferPreset> bufferCallback, PreBufferPreset bufferPreset)
726726
{
727727
using (PooledBitReader reader = PooledBitReader.Get(stream))
728728
{
@@ -762,7 +762,7 @@ internal static void HandleClientRPCRequest(ulong clientId, Stream stream, strin
762762
else
763763
{
764764
if (NetworkLog.CurrentLogLevel <= LogLevel.Normal) NetworkLog.LogWarning("ClientRPCRequest message recieved for a non existant object with id: " + networkId + ". This message will be buffered and might be recovered.");
765-
bufferCallback(networkId);
765+
bufferCallback(networkId, bufferPreset);
766766
}
767767
}
768768
}

MLAPI/Messaging/InternalMessageSender.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using MLAPI.Configuration;
34
using MLAPI.Internal;
45
using MLAPI.Logging;
@@ -70,6 +71,49 @@ internal static void Send(byte messageType, string channelName, BitStream messag
7071
}
7172
}
7273

74+
internal static void Send(byte messageType, string channelName, List<ulong> clientIds, BitStream messageStream, SecuritySendFlags flags, NetworkedObject targetObject)
75+
{
76+
if (clientIds == null)
77+
{
78+
Send(messageType, channelName, messageStream, flags, targetObject);
79+
return;
80+
}
81+
82+
bool encrypted = ((flags & SecuritySendFlags.Encrypted) == SecuritySendFlags.Encrypted) && NetworkingManager.Singleton.NetworkConfig.EnableEncryption;
83+
bool authenticated = ((flags & SecuritySendFlags.Authenticated) == SecuritySendFlags.Authenticated) && NetworkingManager.Singleton.NetworkConfig.EnableEncryption;
84+
85+
if (authenticated || encrypted)
86+
{
87+
for (int i = 0; i < clientIds.Count; i++)
88+
{
89+
Send(clientIds[i], messageType, channelName, messageStream, flags, targetObject);
90+
}
91+
}
92+
else
93+
{
94+
messageStream.PadStream();
95+
96+
using (BitStream stream = MessagePacker.WrapMessage(messageType, 0, messageStream, flags))
97+
{
98+
NetworkProfiler.StartEvent(TickType.Send, (uint)stream.Length, channelName, MLAPIConstants.MESSAGE_NAMES[messageType]);
99+
for (int i = 0; i < clientIds.Count; i++)
100+
{
101+
if (NetworkingManager.Singleton.IsServer && clientIds[i] == NetworkingManager.Singleton.ServerClientId)
102+
continue;
103+
104+
if (targetObject != null && !targetObject.observers.Contains(clientIds[i]))
105+
{
106+
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogWarning("Silently suppressed send(all) call because it was directed to an object without visibility");
107+
continue;
108+
}
109+
110+
NetworkingManager.Singleton.NetworkConfig.NetworkTransport.Send(clientIds[i], new ArraySegment<byte>(stream.GetBuffer(), 0, (int)stream.Length), channelName);
111+
}
112+
NetworkProfiler.EndEvent();
113+
}
114+
}
115+
}
116+
73117
internal static void Send(byte messageType, string channelName, ulong clientIdToIgnore, BitStream messageStream, SecuritySendFlags flags, NetworkedObject targetObject)
74118
{
75119
bool encrypted = ((flags & SecuritySendFlags.Encrypted) == SecuritySendFlags.Encrypted) && NetworkingManager.Singleton.NetworkConfig.EnableEncryption;

0 commit comments

Comments
 (0)