com.unity.netcode.gameobjects@1.5.1
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). Additional documentation and release notes are available at [Multiplayer Documentation](https://docs-multiplayer.unity3d.com). ## [1.5.1] - 2023-06-07 ### Added - Added support for serializing `NativeArray<>` and `NativeList<>` in `FastBufferReader`/`FastBufferWriter`, `BufferSerializer`, `NetworkVariable`, and RPCs. (To use `NativeList<>`, add `UNITY_NETCODE_NATIVE_COLLECTION_SUPPORT` to your Scripting Define Symbols in `Project Settings > Player`) (#2375) - The location of the automatically-created default network prefab list can now be configured (#2544) - Added: Message size limits (max single message and max fragmented message) can now be set using NetworkManager.MaximumTransmissionUnitSize and NetworkManager.MaximumFragmentedMessageSize for transports that don't work with the default values (#2530) - Added `NetworkObject.SpawnWithObservers` property (default is true) that when set to false will spawn a `NetworkObject` with no observers and will not be spawned on any client until `NetworkObject.NetworkShow` is invoked. (#2568) ### Fixed - Fixed: Fixed a null reference in codegen in some projects (#2581) - Fixed issue where the `OnClientDisconnected` client identifier was incorrect after a pending client connection was denied. (#2569) - Fixed warning "Runtime Network Prefabs was not empty at initialization time." being erroneously logged when no runtime network prefabs had been added (#2565) - Fixed issue where some temporary debug console logging was left in a merged PR. (#2562) - Fixed the "Generate Default Network Prefabs List" setting not loading correctly and always reverting to being checked. (#2545) - Fixed issue where users could not use NetworkSceneManager.VerifySceneBeforeLoading to exclude runtime generated scenes from client synchronization. (#2550) - Fixed missing value on `NetworkListEvent` for `EventType.RemoveAt` events. (#2542,#2543) - Fixed issue where parenting a NetworkTransform under a transform with a scale other than Vector3.one would result in incorrect values on non-authoritative instances. (#2538) - Fixed issue where a server would include scene migrated and then despawned NetworkObjects to a client that was being synchronized. (#2532) - Fixed the inspector throwing exceptions when attempting to render `NetworkVariable`s of enum types. (#2529) - Making a `NetworkVariable` with an `INetworkSerializable` type that doesn't meet the `new()` constraint will now create a compile-time error instead of an editor crash (#2528) - Fixed Multiplayer Tools package installation docs page link on the NetworkManager popup. (#2526) - Fixed an exception and error logging when two different objects are shown and hidden on the same frame (#2524) - Fixed a memory leak in `UnityTransport` that occurred if `StartClient` failed. (#2518) - Fixed issue where a client could throw an exception if abruptly disconnected from a network session with one or more spawned `NetworkObject`(s). (#2510) - Fixed issue where invalid endpoint addresses were not being detected and returning false from NGO UnityTransport. (#2496) - Fixed some errors that could occur if a connection is lost and the loss is detected when attempting to write to the socket. (#2495) ## Changed - Adding network prefabs before NetworkManager initialization is now supported. (#2565) - Connecting clients being synchronized now switch to the server's active scene before spawning and synchronizing NetworkObjects. (#2532) - Updated `UnityTransport` dependency on `com.unity.transport` to 1.3.4. (#2533) - Improved performance of NetworkBehaviour initialization by replacing reflection when initializing NetworkVariables with compile-time code generation, which should help reduce hitching during additive scene loads. (#2522)
This commit is contained in:
855
Runtime/Messaging/NetworkMessageManager.cs
Normal file
855
Runtime/Messaging/NetworkMessageManager.cs
Normal file
@@ -0,0 +1,855 @@
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using Unity.Collections;
|
||||
using Unity.Collections.LowLevel.Unsafe;
|
||||
using UnityEngine;
|
||||
|
||||
namespace Unity.Netcode
|
||||
{
|
||||
internal class HandlerNotRegisteredException : SystemException
|
||||
{
|
||||
public HandlerNotRegisteredException()
|
||||
{
|
||||
}
|
||||
|
||||
public HandlerNotRegisteredException(string issue) : base(issue)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
internal class InvalidMessageStructureException : SystemException
|
||||
{
|
||||
public InvalidMessageStructureException()
|
||||
{
|
||||
}
|
||||
|
||||
public InvalidMessageStructureException(string issue) : base(issue)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
internal class NetworkMessageManager : IDisposable
|
||||
{
|
||||
public bool StopProcessing = false;
|
||||
|
||||
private struct ReceiveQueueItem
|
||||
{
|
||||
public FastBufferReader Reader;
|
||||
public NetworkMessageHeader Header;
|
||||
public ulong SenderId;
|
||||
public float Timestamp;
|
||||
public int MessageHeaderSerializedSize;
|
||||
}
|
||||
|
||||
private struct SendQueueItem
|
||||
{
|
||||
public NetworkBatchHeader BatchHeader;
|
||||
public FastBufferWriter Writer;
|
||||
public readonly NetworkDelivery NetworkDelivery;
|
||||
|
||||
public SendQueueItem(NetworkDelivery delivery, int writerSize, Allocator writerAllocator, int maxWriterSize = -1)
|
||||
{
|
||||
Writer = new FastBufferWriter(writerSize, writerAllocator, maxWriterSize);
|
||||
NetworkDelivery = delivery;
|
||||
BatchHeader = new NetworkBatchHeader { Magic = NetworkBatchHeader.MagicValue };
|
||||
}
|
||||
}
|
||||
|
||||
internal delegate void MessageHandler(FastBufferReader reader, ref NetworkContext context, NetworkMessageManager manager);
|
||||
|
||||
internal delegate int VersionGetter();
|
||||
|
||||
private NativeList<ReceiveQueueItem> m_IncomingMessageQueue = new NativeList<ReceiveQueueItem>(16, Allocator.Persistent);
|
||||
|
||||
// These array will grow as we need more message handlers. 4 is just a starting size.
|
||||
private MessageHandler[] m_MessageHandlers = new MessageHandler[4];
|
||||
private Type[] m_ReverseTypeMap = new Type[4];
|
||||
|
||||
private Dictionary<Type, uint> m_MessageTypes = new Dictionary<Type, uint>();
|
||||
private Dictionary<ulong, NativeList<SendQueueItem>> m_SendQueues = new Dictionary<ulong, NativeList<SendQueueItem>>();
|
||||
|
||||
private HashSet<ulong> m_DisconnectedClients = new HashSet<ulong>();
|
||||
|
||||
// This is m_PerClientMessageVersion[clientId][messageType] = version
|
||||
private Dictionary<ulong, Dictionary<Type, int>> m_PerClientMessageVersions = new Dictionary<ulong, Dictionary<Type, int>>();
|
||||
private Dictionary<uint, Type> m_MessagesByHash = new Dictionary<uint, Type>();
|
||||
private Dictionary<Type, int> m_LocalVersions = new Dictionary<Type, int>();
|
||||
|
||||
private List<INetworkHooks> m_Hooks = new List<INetworkHooks>();
|
||||
|
||||
private uint m_HighMessageType;
|
||||
private object m_Owner;
|
||||
private INetworkMessageSender m_Sender;
|
||||
private bool m_Disposed;
|
||||
|
||||
internal Type[] MessageTypes => m_ReverseTypeMap;
|
||||
internal MessageHandler[] MessageHandlers => m_MessageHandlers;
|
||||
|
||||
internal uint MessageHandlerCount => m_HighMessageType;
|
||||
|
||||
internal uint GetMessageType(Type t)
|
||||
{
|
||||
return m_MessageTypes[t];
|
||||
}
|
||||
|
||||
public const int DefaultNonFragmentedMessageMaxSize = 1300;
|
||||
public int NonFragmentedMessageMaxSize = DefaultNonFragmentedMessageMaxSize;
|
||||
public int FragmentedMessageMaxSize = int.MaxValue;
|
||||
|
||||
internal struct MessageWithHandler
|
||||
{
|
||||
public Type MessageType;
|
||||
public MessageHandler Handler;
|
||||
public VersionGetter GetVersion;
|
||||
}
|
||||
|
||||
internal List<MessageWithHandler> PrioritizeMessageOrder(List<MessageWithHandler> allowedTypes)
|
||||
{
|
||||
var prioritizedTypes = new List<MessageWithHandler>();
|
||||
|
||||
// First pass puts the priority message in the first indices
|
||||
// Those are the messages that must be delivered in order to allow re-ordering the others later
|
||||
foreach (var t in allowedTypes)
|
||||
{
|
||||
if (t.MessageType.FullName == typeof(ConnectionRequestMessage).FullName ||
|
||||
t.MessageType.FullName == typeof(ConnectionApprovedMessage).FullName)
|
||||
{
|
||||
prioritizedTypes.Add(t);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var t in allowedTypes)
|
||||
{
|
||||
if (t.MessageType.FullName != typeof(ConnectionRequestMessage).FullName &&
|
||||
t.MessageType.FullName != typeof(ConnectionApprovedMessage).FullName)
|
||||
{
|
||||
prioritizedTypes.Add(t);
|
||||
}
|
||||
}
|
||||
|
||||
return prioritizedTypes;
|
||||
}
|
||||
|
||||
public NetworkMessageManager(INetworkMessageSender sender, object owner, INetworkMessageProvider provider = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_Sender = sender;
|
||||
m_Owner = owner;
|
||||
|
||||
if (provider == null)
|
||||
{
|
||||
provider = new ILPPMessageProvider();
|
||||
}
|
||||
|
||||
var allowedTypes = provider.GetMessages();
|
||||
|
||||
allowedTypes.Sort((a, b) => string.CompareOrdinal(a.MessageType.FullName, b.MessageType.FullName));
|
||||
allowedTypes = PrioritizeMessageOrder(allowedTypes);
|
||||
foreach (var type in allowedTypes)
|
||||
{
|
||||
RegisterMessageType(type);
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
Dispose();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (m_Disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Can't just iterate SendQueues or SendQueues.Keys because ClientDisconnected removes from the queue.
|
||||
foreach (var kvp in m_SendQueues)
|
||||
{
|
||||
ClientDisconnected(kvp.Key);
|
||||
}
|
||||
|
||||
CleanupDisconnectedClients();
|
||||
|
||||
for (var queueIndex = 0; queueIndex < m_IncomingMessageQueue.Length; ++queueIndex)
|
||||
{
|
||||
// Avoid copies...
|
||||
ref var item = ref m_IncomingMessageQueue.ElementAt(queueIndex);
|
||||
item.Reader.Dispose();
|
||||
}
|
||||
|
||||
m_IncomingMessageQueue.Dispose();
|
||||
m_Disposed = true;
|
||||
}
|
||||
|
||||
~NetworkMessageManager()
|
||||
{
|
||||
Dispose();
|
||||
}
|
||||
|
||||
public void Hook(INetworkHooks hooks)
|
||||
{
|
||||
m_Hooks.Add(hooks);
|
||||
}
|
||||
|
||||
public void Unhook(INetworkHooks hooks)
|
||||
{
|
||||
m_Hooks.Remove(hooks);
|
||||
}
|
||||
|
||||
private void RegisterMessageType(MessageWithHandler messageWithHandler)
|
||||
{
|
||||
// If we are out of space, perform amortized linear growth
|
||||
if (m_HighMessageType == m_MessageHandlers.Length)
|
||||
{
|
||||
Array.Resize(ref m_MessageHandlers, 2 * m_MessageHandlers.Length);
|
||||
Array.Resize(ref m_ReverseTypeMap, 2 * m_ReverseTypeMap.Length);
|
||||
}
|
||||
|
||||
m_MessageHandlers[m_HighMessageType] = messageWithHandler.Handler;
|
||||
m_ReverseTypeMap[m_HighMessageType] = messageWithHandler.MessageType;
|
||||
m_MessagesByHash[XXHash.Hash32(messageWithHandler.MessageType.FullName)] = messageWithHandler.MessageType;
|
||||
m_MessageTypes[messageWithHandler.MessageType] = m_HighMessageType++;
|
||||
m_LocalVersions[messageWithHandler.MessageType] = messageWithHandler.GetVersion();
|
||||
}
|
||||
|
||||
public int GetLocalVersion(Type messageType)
|
||||
{
|
||||
return m_LocalVersions[messageType];
|
||||
}
|
||||
|
||||
internal static string ByteArrayToString(byte[] ba, int offset, int count)
|
||||
{
|
||||
var hex = new StringBuilder(ba.Length * 2);
|
||||
for (int i = offset; i < offset + count; ++i)
|
||||
{
|
||||
hex.AppendFormat("{0:x2} ", ba[i]);
|
||||
}
|
||||
|
||||
return hex.ToString();
|
||||
}
|
||||
|
||||
internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float receiveTime)
|
||||
{
|
||||
unsafe
|
||||
{
|
||||
fixed (byte* dataPtr = data.Array)
|
||||
{
|
||||
var batchReader = new FastBufferReader(dataPtr + data.Offset, Allocator.None, data.Count);
|
||||
if (!batchReader.TryBeginRead(sizeof(NetworkBatchHeader)))
|
||||
{
|
||||
NetworkLog.LogError("Received a packet too small to contain a BatchHeader. Ignoring it.");
|
||||
return;
|
||||
}
|
||||
|
||||
batchReader.ReadValue(out NetworkBatchHeader batchHeader);
|
||||
|
||||
if (batchHeader.Magic != NetworkBatchHeader.MagicValue)
|
||||
{
|
||||
NetworkLog.LogError($"Received a packet with an invalid Magic Value. Please report this to the Netcode for GameObjects team at https://github.com/Unity-Technologies/com.unity.netcode.gameobjects/issues and include the following data: Offset: {data.Offset}, Size: {data.Count}, Full receive array: {ByteArrayToString(data.Array, 0, data.Array.Length)}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (batchHeader.BatchSize != data.Count)
|
||||
{
|
||||
NetworkLog.LogError($"Received a packet with an invalid Batch Size Value. Please report this to the Netcode for GameObjects team at https://github.com/Unity-Technologies/com.unity.netcode.gameobjects/issues and include the following data: Offset: {data.Offset}, Size: {data.Count}, Expected Size: {batchHeader.BatchSize}, Full receive array: {ByteArrayToString(data.Array, 0, data.Array.Length)}");
|
||||
return;
|
||||
}
|
||||
|
||||
var hash = XXHash.Hash64(batchReader.GetUnsafePtrAtCurrentPosition(), batchReader.Length - batchReader.Position);
|
||||
|
||||
if (hash != batchHeader.BatchHash)
|
||||
{
|
||||
NetworkLog.LogError($"Received a packet with an invalid Hash Value. Please report this to the Netcode for GameObjects team at https://github.com/Unity-Technologies/com.unity.netcode.gameobjects/issues and include the following data: Received Hash: {batchHeader.BatchHash}, Calculated Hash: {hash}, Offset: {data.Offset}, Size: {data.Count}, Full receive array: {ByteArrayToString(data.Array, 0, data.Array.Length)}");
|
||||
return;
|
||||
}
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnBeforeReceiveBatch(clientId, batchHeader.BatchCount, batchReader.Length);
|
||||
}
|
||||
|
||||
for (var messageIdx = 0; messageIdx < batchHeader.BatchCount; ++messageIdx)
|
||||
{
|
||||
var messageHeader = new NetworkMessageHeader();
|
||||
var position = batchReader.Position;
|
||||
try
|
||||
{
|
||||
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageType);
|
||||
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageSize);
|
||||
}
|
||||
catch (OverflowException)
|
||||
{
|
||||
NetworkLog.LogError("Received a batch that didn't have enough data for all of its batches, ending early!");
|
||||
throw;
|
||||
}
|
||||
|
||||
var receivedHeaderSize = batchReader.Position - position;
|
||||
|
||||
if (!batchReader.TryBeginRead((int)messageHeader.MessageSize))
|
||||
{
|
||||
NetworkLog.LogError("Received a message that claimed a size larger than the packet, ending early!");
|
||||
return;
|
||||
}
|
||||
|
||||
m_IncomingMessageQueue.Add(new ReceiveQueueItem
|
||||
{
|
||||
Header = messageHeader,
|
||||
SenderId = clientId,
|
||||
Timestamp = receiveTime,
|
||||
// Copy the data for this message into a new FastBufferReader that owns that memory.
|
||||
// We can't guarantee the memory in the ArraySegment stays valid because we don't own it, so we must move it to memory we do own.
|
||||
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, (int)messageHeader.MessageSize),
|
||||
MessageHeaderSerializedSize = receivedHeaderSize,
|
||||
});
|
||||
batchReader.Seek(batchReader.Position + (int)messageHeader.MessageSize);
|
||||
}
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnAfterReceiveBatch(clientId, batchHeader.BatchCount, batchReader.Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private bool CanReceive(ulong clientId, Type messageType, FastBufferReader messageContent, ref NetworkContext context)
|
||||
{
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
if (!m_Hooks[hookIdx].OnVerifyCanReceive(clientId, messageType, messageContent, ref context))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
internal Type GetMessageForHash(uint messageHash)
|
||||
{
|
||||
if (!m_MessagesByHash.ContainsKey(messageHash))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return m_MessagesByHash[messageHash];
|
||||
}
|
||||
|
||||
internal void SetVersion(ulong clientId, uint messageHash, int version)
|
||||
{
|
||||
if (!m_MessagesByHash.ContainsKey(messageHash))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var messageType = m_MessagesByHash[messageHash];
|
||||
|
||||
if (!m_PerClientMessageVersions.ContainsKey(clientId))
|
||||
{
|
||||
m_PerClientMessageVersions[clientId] = new Dictionary<Type, int>();
|
||||
}
|
||||
|
||||
m_PerClientMessageVersions[clientId][messageType] = version;
|
||||
}
|
||||
|
||||
internal void SetServerMessageOrder(NativeArray<uint> messagesInIdOrder)
|
||||
{
|
||||
var oldHandlers = m_MessageHandlers;
|
||||
var oldTypes = m_MessageTypes;
|
||||
m_ReverseTypeMap = new Type[messagesInIdOrder.Length];
|
||||
m_MessageHandlers = new MessageHandler[messagesInIdOrder.Length];
|
||||
m_MessageTypes = new Dictionary<Type, uint>();
|
||||
|
||||
for (var i = 0; i < messagesInIdOrder.Length; ++i)
|
||||
{
|
||||
if (!m_MessagesByHash.ContainsKey(messagesInIdOrder[i]))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var messageType = m_MessagesByHash[messagesInIdOrder[i]];
|
||||
var oldId = oldTypes[messageType];
|
||||
var handler = oldHandlers[oldId];
|
||||
var newId = (uint)i;
|
||||
m_MessageTypes[messageType] = newId;
|
||||
m_MessageHandlers[newId] = handler;
|
||||
m_ReverseTypeMap[newId] = messageType;
|
||||
}
|
||||
}
|
||||
|
||||
public void HandleMessage(in NetworkMessageHeader header, FastBufferReader reader, ulong senderId, float timestamp, int serializedHeaderSize)
|
||||
{
|
||||
using (reader)
|
||||
{
|
||||
if (header.MessageType >= m_HighMessageType)
|
||||
{
|
||||
Debug.LogWarning($"Received a message with invalid message type value {header.MessageType}");
|
||||
return;
|
||||
}
|
||||
|
||||
var context = new NetworkContext
|
||||
{
|
||||
SystemOwner = m_Owner,
|
||||
SenderId = senderId,
|
||||
Timestamp = timestamp,
|
||||
Header = header,
|
||||
SerializedHeaderSize = serializedHeaderSize,
|
||||
MessageSize = header.MessageSize,
|
||||
};
|
||||
|
||||
var type = m_ReverseTypeMap[header.MessageType];
|
||||
if (!CanReceive(senderId, type, reader, ref context))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var handler = m_MessageHandlers[header.MessageType];
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnBeforeReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<NetworkMessageHeader>());
|
||||
}
|
||||
|
||||
// This will also log an exception is if the server knows about a message type the client doesn't know about.
|
||||
// In this case the handler will be null. It is still an issue the user must deal with:
|
||||
// If the two connecting builds know about different messages, the server should not send a message to a client that doesn't know about it
|
||||
if (handler == null)
|
||||
{
|
||||
Debug.LogException(new HandlerNotRegisteredException(header.MessageType.ToString()));
|
||||
}
|
||||
else
|
||||
{
|
||||
// No user-land message handler exceptions should escape the receive loop.
|
||||
// If an exception is throw, the message is ignored.
|
||||
// Example use case: A bad message is received that can't be deserialized and throws an OverflowException because it specifies a length greater than the number of bytes in it for some dynamic-length value.
|
||||
try
|
||||
{
|
||||
handler.Invoke(reader, ref context, this);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Debug.LogException(e);
|
||||
}
|
||||
}
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnAfterReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<NetworkMessageHeader>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal void ProcessIncomingMessageQueue()
|
||||
{
|
||||
if (StopProcessing)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
for (var index = 0; index < m_IncomingMessageQueue.Length; ++index)
|
||||
{
|
||||
// Avoid copies...
|
||||
ref var item = ref m_IncomingMessageQueue.ElementAt(index);
|
||||
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp, item.MessageHeaderSerializedSize);
|
||||
if (m_Disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
m_IncomingMessageQueue.Clear();
|
||||
}
|
||||
|
||||
internal void ClientConnected(ulong clientId)
|
||||
{
|
||||
if (m_SendQueues.ContainsKey(clientId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
m_SendQueues[clientId] = new NativeList<SendQueueItem>(16, Allocator.Persistent);
|
||||
}
|
||||
|
||||
internal void ClientDisconnected(ulong clientId)
|
||||
{
|
||||
m_DisconnectedClients.Add(clientId);
|
||||
}
|
||||
|
||||
private void CleanupDisconnectedClient(ulong clientId)
|
||||
{
|
||||
if (!m_SendQueues.ContainsKey(clientId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var queue = m_SendQueues[clientId];
|
||||
for (var i = 0; i < queue.Length; ++i)
|
||||
{
|
||||
queue.ElementAt(i).Writer.Dispose();
|
||||
}
|
||||
|
||||
queue.Dispose();
|
||||
m_SendQueues.Remove(clientId);
|
||||
|
||||
m_PerClientMessageVersions.Remove(clientId);
|
||||
}
|
||||
|
||||
internal void CleanupDisconnectedClients()
|
||||
{
|
||||
foreach (var clientId in m_DisconnectedClients)
|
||||
{
|
||||
CleanupDisconnectedClient(clientId);
|
||||
}
|
||||
|
||||
m_DisconnectedClients.Clear();
|
||||
}
|
||||
|
||||
public static int CreateMessageAndGetVersion<T>() where T : INetworkMessage, new()
|
||||
{
|
||||
return new T().Version;
|
||||
}
|
||||
|
||||
internal int GetMessageVersion(Type type, ulong clientId, bool forReceive = false)
|
||||
{
|
||||
if (!m_PerClientMessageVersions.TryGetValue(clientId, out var versionMap))
|
||||
{
|
||||
if (forReceive)
|
||||
{
|
||||
Debug.LogWarning($"Trying to receive {type.Name} from client {clientId} which is not in a connected state.");
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.LogWarning($"Trying to send {type.Name} to client {clientId} which is not in a connected state.");
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!versionMap.TryGetValue(type, out var messageVersion))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return messageVersion;
|
||||
}
|
||||
|
||||
public static void ReceiveMessage<T>(FastBufferReader reader, ref NetworkContext context, NetworkMessageManager manager) where T : INetworkMessage, new()
|
||||
{
|
||||
var message = new T();
|
||||
var messageVersion = 0;
|
||||
// Special cases because these are the messages that carry the version info - thus the version info isn't
|
||||
// populated yet when we get these. The first part of these messages always has to be the version data
|
||||
// and can't change.
|
||||
if (typeof(T) != typeof(ConnectionRequestMessage) && typeof(T) != typeof(ConnectionApprovedMessage) && typeof(T) != typeof(DisconnectReasonMessage))
|
||||
{
|
||||
messageVersion = manager.GetMessageVersion(typeof(T), context.SenderId, true);
|
||||
if (messageVersion < 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (message.Deserialize(reader, ref context, messageVersion))
|
||||
{
|
||||
for (var hookIdx = 0; hookIdx < manager.m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
manager.m_Hooks[hookIdx].OnBeforeHandleMessage(ref message, ref context);
|
||||
}
|
||||
|
||||
message.Handle(ref context);
|
||||
|
||||
for (var hookIdx = 0; hookIdx < manager.m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
manager.m_Hooks[hookIdx].OnAfterHandleMessage(ref message, ref context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private bool CanSend(ulong clientId, Type messageType, NetworkDelivery delivery)
|
||||
{
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
if (!m_Hooks[hookIdx].OnVerifyCanSend(clientId, messageType, delivery))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
internal int SendMessage<TMessageType, TClientIdListType>(ref TMessageType message, NetworkDelivery delivery, in TClientIdListType clientIds)
|
||||
where TMessageType : INetworkMessage
|
||||
where TClientIdListType : IReadOnlyList<ulong>
|
||||
{
|
||||
if (clientIds.Count == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
var largestSerializedSize = 0;
|
||||
var sentMessageVersions = new NativeHashSet<int>(clientIds.Count, Allocator.Temp);
|
||||
for (var i = 0; i < clientIds.Count; ++i)
|
||||
{
|
||||
var messageVersion = 0;
|
||||
// Special case because this is the message that carries the version info - thus the version info isn't populated yet when we get this.
|
||||
// The first part of this message always has to be the version data and can't change.
|
||||
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
||||
{
|
||||
messageVersion = GetMessageVersion(typeof(TMessageType), clientIds[i]);
|
||||
if (messageVersion < 0)
|
||||
{
|
||||
// Client doesn't know this message exists, don't send it at all.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (sentMessageVersions.Contains(messageVersion))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
sentMessageVersions.Add(messageVersion);
|
||||
|
||||
var maxSize = delivery == NetworkDelivery.ReliableFragmentedSequenced ? FragmentedMessageMaxSize : NonFragmentedMessageMaxSize;
|
||||
|
||||
using var tmpSerializer = new FastBufferWriter(NonFragmentedMessageMaxSize - FastBufferWriter.GetWriteSize<NetworkMessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<NetworkMessageHeader>());
|
||||
|
||||
message.Serialize(tmpSerializer, messageVersion);
|
||||
|
||||
var size = SendPreSerializedMessage(tmpSerializer, maxSize, ref message, delivery, clientIds, messageVersion);
|
||||
largestSerializedSize = size > largestSerializedSize ? size : largestSerializedSize;
|
||||
}
|
||||
|
||||
sentMessageVersions.Dispose();
|
||||
|
||||
return largestSerializedSize;
|
||||
}
|
||||
|
||||
internal unsafe int SendPreSerializedMessage<TMessageType>(in FastBufferWriter tmpSerializer, int maxSize, ref TMessageType message, NetworkDelivery delivery, in IReadOnlyList<ulong> clientIds, int messageVersionFilter)
|
||||
where TMessageType : INetworkMessage
|
||||
{
|
||||
using var headerSerializer = new FastBufferWriter(FastBufferWriter.GetWriteSize<NetworkMessageHeader>(), Allocator.Temp);
|
||||
|
||||
var header = new NetworkMessageHeader
|
||||
{
|
||||
MessageSize = (uint)tmpSerializer.Length,
|
||||
MessageType = m_MessageTypes[typeof(TMessageType)],
|
||||
};
|
||||
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageType);
|
||||
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageSize);
|
||||
|
||||
for (var i = 0; i < clientIds.Count; ++i)
|
||||
{
|
||||
if (m_DisconnectedClients.Contains(clientIds[i]))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Special case because this is the message that carries the version info - thus the version info isn't populated yet when we get this.
|
||||
// The first part of this message always has to be the version data and can't change.
|
||||
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
||||
{
|
||||
var messageVersion = GetMessageVersion(typeof(TMessageType), clientIds[i]);
|
||||
if (messageVersion < 0)
|
||||
{
|
||||
// Client doesn't know this message exists, don't send it at all.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (messageVersion != messageVersionFilter)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
var clientId = clientIds[i];
|
||||
|
||||
if (!CanSend(clientId, typeof(TMessageType), delivery))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, ref message, delivery);
|
||||
}
|
||||
|
||||
var sendQueueItem = m_SendQueues[clientId];
|
||||
if (sendQueueItem.Length == 0)
|
||||
{
|
||||
sendQueueItem.Add(new SendQueueItem(delivery, NonFragmentedMessageMaxSize, Allocator.TempJob, maxSize));
|
||||
sendQueueItem.ElementAt(0).Writer.Seek(sizeof(NetworkBatchHeader));
|
||||
}
|
||||
else
|
||||
{
|
||||
ref var lastQueueItem = ref sendQueueItem.ElementAt(sendQueueItem.Length - 1);
|
||||
if (lastQueueItem.NetworkDelivery != delivery || lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position < tmpSerializer.Length + headerSerializer.Length)
|
||||
{
|
||||
sendQueueItem.Add(new SendQueueItem(delivery, NonFragmentedMessageMaxSize, Allocator.TempJob, maxSize));
|
||||
sendQueueItem.ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(NetworkBatchHeader));
|
||||
}
|
||||
}
|
||||
|
||||
ref var writeQueueItem = ref sendQueueItem.ElementAt(sendQueueItem.Length - 1);
|
||||
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + headerSerializer.Length);
|
||||
|
||||
writeQueueItem.Writer.WriteBytes(headerSerializer.GetUnsafePtr(), headerSerializer.Length);
|
||||
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
|
||||
writeQueueItem.BatchHeader.BatchCount++;
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnAfterSendMessage(clientId, ref message, delivery, tmpSerializer.Length + headerSerializer.Length);
|
||||
}
|
||||
}
|
||||
|
||||
return tmpSerializer.Length + headerSerializer.Length;
|
||||
}
|
||||
|
||||
internal unsafe int SendPreSerializedMessage<TMessageType>(in FastBufferWriter tmpSerializer, int maxSize, ref TMessageType message, NetworkDelivery delivery, ulong clientId)
|
||||
where TMessageType : INetworkMessage
|
||||
{
|
||||
var messageVersion = 0;
|
||||
// Special case because this is the message that carries the version info - thus the version info isn't
|
||||
// populated yet when we get this. The first part of this message always has to be the version data
|
||||
// and can't change.
|
||||
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
||||
{
|
||||
messageVersion = GetMessageVersion(typeof(TMessageType), clientId);
|
||||
if (messageVersion < 0)
|
||||
{
|
||||
// Client doesn't know this message exists, don't send it at all.
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
ulong* clientIds = stackalloc ulong[] { clientId };
|
||||
return SendPreSerializedMessage(tmpSerializer, maxSize, ref message, delivery, new PointerListWrapper<ulong>(clientIds, 1), messageVersion);
|
||||
}
|
||||
|
||||
private struct PointerListWrapper<T> : IReadOnlyList<T>
|
||||
where T : unmanaged
|
||||
{
|
||||
private unsafe T* m_Value;
|
||||
private int m_Length;
|
||||
|
||||
internal unsafe PointerListWrapper(T* ptr, int length)
|
||||
{
|
||||
m_Value = ptr;
|
||||
m_Length = length;
|
||||
}
|
||||
|
||||
public int Count
|
||||
{
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
get => m_Length;
|
||||
}
|
||||
|
||||
public unsafe T this[int index]
|
||||
{
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
get => m_Value[index];
|
||||
}
|
||||
|
||||
public IEnumerator<T> GetEnumerator()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator()
|
||||
{
|
||||
return GetEnumerator();
|
||||
}
|
||||
}
|
||||
|
||||
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery,
|
||||
ulong* clientIds, int numClientIds)
|
||||
where T : INetworkMessage
|
||||
{
|
||||
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>(clientIds, numClientIds));
|
||||
}
|
||||
|
||||
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery, ulong clientId)
|
||||
where T : INetworkMessage
|
||||
{
|
||||
ulong* clientIds = stackalloc ulong[] { clientId };
|
||||
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>(clientIds, 1));
|
||||
}
|
||||
|
||||
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery, in NativeArray<ulong> clientIds)
|
||||
where T : INetworkMessage
|
||||
{
|
||||
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>((ulong*)clientIds.GetUnsafePtr(), clientIds.Length));
|
||||
}
|
||||
|
||||
internal unsafe void ProcessSendQueues()
|
||||
{
|
||||
if (StopProcessing)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var kvp in m_SendQueues)
|
||||
{
|
||||
var clientId = kvp.Key;
|
||||
var sendQueueItem = kvp.Value;
|
||||
for (var i = 0; i < sendQueueItem.Length; ++i)
|
||||
{
|
||||
ref var queueItem = ref sendQueueItem.ElementAt(i);
|
||||
// This is checked at every iteration because
|
||||
// 1) each writer needs to be disposed, so we have to do the full loop regardless, and
|
||||
// 2) the call to m_MessageSender.Send() may result in calling ClientDisconnected(), so the result of this check may change partway through iteration
|
||||
if (m_DisconnectedClients.Contains(clientId))
|
||||
{
|
||||
queueItem.Writer.Dispose();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (queueItem.BatchHeader.BatchCount == 0)
|
||||
{
|
||||
queueItem.Writer.Dispose();
|
||||
continue;
|
||||
}
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnBeforeSendBatch(clientId, queueItem.BatchHeader.BatchCount, queueItem.Writer.Length, queueItem.NetworkDelivery);
|
||||
}
|
||||
|
||||
queueItem.Writer.Seek(0);
|
||||
#if UNITY_EDITOR || DEVELOPMENT_BUILD
|
||||
// Skipping the Verify and sneaking the write mark in because we know it's fine.
|
||||
queueItem.Writer.Handle->AllowedWriteMark = sizeof(NetworkBatchHeader);
|
||||
#endif
|
||||
queueItem.BatchHeader.BatchHash = XXHash.Hash64(queueItem.Writer.GetUnsafePtr() + sizeof(NetworkBatchHeader), queueItem.Writer.Length - sizeof(NetworkBatchHeader));
|
||||
|
||||
queueItem.BatchHeader.BatchSize = queueItem.Writer.Length;
|
||||
|
||||
queueItem.Writer.WriteValue(queueItem.BatchHeader);
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
m_Sender.Send(clientId, queueItem.NetworkDelivery, queueItem.Writer);
|
||||
|
||||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
||||
{
|
||||
m_Hooks[hookIdx].OnAfterSendBatch(clientId, queueItem.BatchHeader.BatchCount, queueItem.Writer.Length, queueItem.NetworkDelivery);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
queueItem.Writer.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
sendQueueItem.Clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user