Search Unity

Passed out DataStreamReader to main thread is often already disposed

Discussion in 'Unity Transport' started by cerestorm, May 11, 2023.

  1. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    I'm running two servers using Unity Transport in the same application; each uses an instance of the same NetworkTransportManager class for the network handling. Network events are checked for in a job, and when there's a Data event the DataStreamReader is passed out of the job to be processed in the main thread.

    The problem I'm having is quite often the reader is already disposed when it reaches the main thread. However if I only run a single server I'm not seeing this issue.

    This is the NetworkTransportManager class:
    Code (CSharp):
    /// <summary>
    /// Runs a single Unity Transport server: owns the <see cref="NetworkDriver"/>, schedules the
    /// per-frame driver/connection/event jobs and replays the events they queued on the main thread
    /// through the public C# events.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Data events carry the <see cref="DataStreamReader"/> that was popped inside the job.
    /// Reading that reader on the main thread trips the editor safety system
    /// (ObjectDisposedException on NativeList&lt;byte&gt;). Copying the payload into a buffer owned by this
    /// class (and storing only offset/length in NetworkEventDetail) avoids it, but requires changing
    /// NetworkEventDetail, which is declared elsewhere.
    /// </remarks>
    public class NetworkTransportManager : MonoBehaviour
    {
        // Single-element counter incremented once per Update; shared with the job as a sanity check.
        NativeArray<int> updateCount;
        NetworkDriver networkDriver;
        NetworkPipeline reliablePipeline;
        NativeList<NetworkConnection> networkConnections;
        // Events produced by the jobs, drained on the main thread in CheckForNetworkEvents.
        NativeQueue<NetworkEventDetail> eventDetails;
        JobHandle jobHandle;
        NetworkUpdateConnectionsJob connectionsJob;
        NetworkUpdateJob updateJob;
        NetworkState networkState;

        public event Action OnServerStarted = default;
        public event Action<NetworkConnection> OnClientConnected = default;
        public event Action<NetworkConnection> OnClientDisconnected = default;
        public event Action<NetworkConnection, DataStreamReader> OnReceiveMessage = default;

        /// <summary>
        /// Allocates the persistent native containers the first time the component is enabled.
        /// </summary>
        private void OnEnable()
        {
            // The containers are only released in OnDestroy and the job structs keep copies of them,
            // so re-allocating on every enable would leak the old ones and detach them from the jobs.
            if (!updateCount.IsCreated)
            {
                updateCount = new NativeArray<int>(1, Allocator.Persistent);
                updateCount[0] = 1;
            }

            if (!networkConnections.IsCreated)
            {
                networkConnections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
            }

            if (!eventDetails.IsCreated)
            {
                eventDetails = new NativeQueue<NetworkEventDetail>(Allocator.Persistent);
            }

            // Do not reset the state of a server that is still running across a disable/enable cycle.
            if (!networkDriver.IsCreated)
            {
                networkState = NetworkState.Shutdown;
            }
        }

        /// <summary>
        /// Completes last frame's jobs, dispatches the events they queued, then schedules the next round.
        /// </summary>
        private void Update()
        {
            if (networkDriver.IsCreated)
            {
                jobHandle.Complete();

                CheckForNetworkEvents();

                updateCount[0]++;

                ScheduleByState();
            }
        }

        /// <summary>
        /// Drains the event queue and raises the matching C# event for each entry.
        /// </summary>
        private void CheckForNetworkEvents()
        {
            while (eventDetails.TryDequeue(out var eventDetail))
            {
                Debug.Log("NetworkTransportManager CheckForNetworkEvents: " + eventDetail);

                switch (eventDetail.EventType)
                {
                    case NetworkEventType.Connected:
                        OnClientConnected?.Invoke(eventDetail.Connection);
                        break;

                    case NetworkEventType.Disconnect:
                        OnClientDisconnected?.Invoke(eventDetail.Connection);
                        break;

                    case NetworkEventType.Data:
                        Debug.Log($"CheckForNetworkEvents Data updateCount: {eventDetail.UpdateCount} current updateCount: {updateCount[0]}");
                        // NOTE(review): accessing the job-produced reader here is where the editor
                        // safety system reports the reader's backing buffer as deallocated.
                        Debug.Log($"CheckForNetworkEvents Data length: {eventDetail.Reader.Length}");
                        OnReceiveMessage?.Invoke(eventDetail.Connection, eventDetail.Reader);
                        break;
                }
            }
        }

        /// <summary>
        /// Schedules the jobs for the current state, or advances the two-step shutdown sequence.
        /// </summary>
        private void ScheduleByState()
        {
            switch (networkState)
            {
                case NetworkState.Started:
                    jobHandle = networkDriver.ScheduleUpdate();
                    jobHandle = connectionsJob.Schedule(jobHandle);
                    jobHandle = updateJob.Schedule(networkConnections, 1, jobHandle);
                    break;

                case NetworkState.ShuttingdownStart:
                    Debug.Log("NetworkTransportManager ScheduleByState ShuttingdownStart");
                    // One more driver update before the driver is disposed on the next frame.
                    jobHandle = networkDriver.ScheduleUpdate();
                    networkState = NetworkState.ShuttingdownEnd;
                    break;

                case NetworkState.ShuttingdownEnd:
                    Debug.Log("NetworkTransportManager ScheduleByState ShuttingdownEnd");
                    networkDriver.Dispose();
                    networkState = NetworkState.Shutdown;
                    break;
            }
        }

        /// <summary>
        /// Disconnects <paramref name="connection"/> if it is tracked and clears its slot in the list.
        /// </summary>
        /// <param name="connection">The client connection to drop.</param>
        internal void DisconnectClient(NetworkConnection connection)
        {
            if (!networkDriver.IsCreated)
            {
                return;
            }

            // The driver and connection list must not be touched while a job is using them.
            jobHandle.Complete();

            for (int i = 0; i < networkConnections.Length; i++)
            {
                if (connection == networkConnections[i])
                {
                    Debug.Log("DisconnectClient found client: " + networkConnections[i].InternalId);

                    connection.Disconnect(networkDriver);
                    networkConnections[i] = default;
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> and sends it over the reliable pipeline, provided the
        /// connection is currently in the Connected state.
        /// </summary>
        /// <param name="connection">Destination connection.</param>
        /// <param name="message">Message that writes itself into the send buffer.</param>
        internal void SendMessage(NetworkConnection connection, IMessage message)
        {
            Debug.Log($"NetworkTransportManager SendMessage connection: {connection} message: {message}");

            jobHandle.Complete();

            if (networkDriver.IsCreated && networkDriver.GetConnectionState(connection) == NetworkConnection.State.Connected)
            {
                Debug.Log($"NetworkTransportManager connectionState: {networkDriver.GetConnectionState(connection)}");

                if (networkDriver.BeginSend(reliablePipeline, connection, out DataStreamWriter writer) == 0)
                {
                    message.Serialize(ref writer);
                    int written = networkDriver.EndSend(writer);

                    Debug.Log($"NetworkTransportManager SendMessage written: {written}");
                }
            }
        }

        /// <summary>
        /// Disconnects every connected client and starts the shutdown sequence that Update finishes.
        /// </summary>
        internal void StopServer()
        {
            Debug.Log("NetworkTransportManager StopServer");
            jobHandle.Complete();

            // Nothing to stop (and no valid driver to query) if the server was never started.
            if (!networkDriver.IsCreated)
            {
                return;
            }

            foreach (var networkConnection in networkConnections)
            {
                if (networkConnection.GetState(networkDriver) == NetworkConnection.State.Connected)
                {
                    networkConnection.Disconnect(networkDriver);
                }
            }

            networkState = NetworkState.ShuttingdownStart;
        }

        /// <summary>
        /// Creates the driver, binds it to <paramref name="serverPort"/> on any IPv4 address and starts
        /// listening. Raises <see cref="OnServerStarted"/> on success.
        /// </summary>
        /// <param name="serverPort">UDP port to listen on.</param>
        internal void StartServer(ushort serverPort)
        {
            var networkEndPoint = NetworkEndPoint.AnyIpv4;
            networkEndPoint.Port = serverPort;

            NetworkSettings networkSettings = new NetworkSettings();
            networkSettings.WithReliableStageParameters(32);

            try
            {
                // A driver left over from a previous start (e.g. restart before the shutdown sequence
                // finished) would otherwise be overwritten and leaked.
                if (networkDriver.IsCreated)
                {
                    jobHandle.Complete();
                    networkDriver.Dispose();
                    // Connections of the old driver are meaningless to the new one.
                    networkConnections.Clear();
                }

                // Pass the settings in; without this the reliable window size configured above is ignored.
                networkDriver = NetworkDriver.Create(networkSettings);
                reliablePipeline = networkDriver.CreatePipeline(typeof(ReliableSequencedPipelineStage));

                if (networkDriver.Bind(networkEndPoint) != 0)
                {
                    Debug.LogError($"ServerNetworkManager StartServer failed to bind to port {serverPort}");
                    DisposeFailedDriver();
                    return;
                }

                connectionsJob = new NetworkUpdateConnectionsJob(networkDriver, networkConnections, eventDetails.AsParallelWriter());
                updateJob = new NetworkUpdateJob(networkDriver.ToConcurrent(), networkConnections.AsDeferredJobArray(),
                    eventDetails.AsParallelWriter(), updateCount);

                if (networkDriver.Listen() != 0)
                {
                    Debug.LogError($"ServerNetworkManager StartServer failed to listen on port {serverPort}");
                    DisposeFailedDriver();
                    return;
                }

                networkState = NetworkState.Started;

                OnServerStarted?.Invoke();
            }
            catch (InvalidOperationException exception)
            {
                Debug.LogError($"ServerNetworkManager StartServer exception: {exception.Message}\n{exception.StackTrace}");
            }
        }

        /// <summary>
        /// Releases a driver that could not be bound or put into listening mode.
        /// </summary>
        private void DisposeFailedDriver()
        {
            if (networkDriver.IsCreated)
            {
                networkDriver.Dispose();
            }

            networkState = NetworkState.Shutdown;
        }

        public void OnDestroy()
        {
            ReleaseNetworkResources();
        }

        /// <summary>
        /// Completes any outstanding job and disposes every native resource this component owns.
        /// </summary>
        private void ReleaseNetworkResources()
        {
            jobHandle.Complete();

            if (updateCount.IsCreated)
            {
                updateCount.Dispose();
            }

            if (networkConnections.IsCreated)
            {
                networkConnections.Dispose();
            }

            if (eventDetails.IsCreated)
            {
                eventDetails.Dispose();
            }

            if (networkDriver.IsCreated)
            {
                networkDriver.Dispose();
            }
        }
    }
    And the job class that picks up network events:
    Code (CSharp):
    /// <summary>
    /// Parallel job that pops the pending driver events of one connection per iteration and queues
    /// them for the main thread.
    /// </summary>
    struct NetworkUpdateJob : IJobParallelForDefer
    {
        public NetworkDriver.Concurrent driver;
        public NativeArray<NetworkConnection> connections;
        public NativeQueue<NetworkEventDetail>.ParallelWriter connectionEvents;

        // Every iteration reads element 0 regardless of its own index. A writable NativeArray in a
        // parallel job is restricted to the current job index, so this must be declared read-only.
        [ReadOnly] NativeArray<int> updateCount;

        /// <param name="driver">Concurrent view of the server's driver.</param>
        /// <param name="connections">Deferred array of tracked connections; one job iteration per element.</param>
        /// <param name="connectionEvents">Queue the main thread drains after the job completes.</param>
        /// <param name="updateCount">Single-element frame counter, only read by the job.</param>
        public NetworkUpdateJob(NetworkDriver.Concurrent driver, NativeArray<NetworkConnection> connections,
            NativeQueue<NetworkEventDetail>.ParallelWriter connectionEvents, NativeArray<int> updateCount)
        {
            this.driver = driver;
            this.connections = connections;
            this.connectionEvents = connectionEvents;

            this.updateCount = updateCount;
        }

        /// <summary>
        /// Drains all pending events for the connection at <paramref name="index"/>.
        /// </summary>
        /// <param name="index">Index into <see cref="connections"/> handled by this iteration.</param>
        public void Execute(int index)
        {
            if (!connections[index].IsCreated)
            {
                return;
            }

            DataStreamReader reader;
            NetworkEvent.Type eventType;
            while ((eventType = driver.PopEventForConnection(connections[index], out reader)) != NetworkEvent.Type.Empty)
            {
                switch (eventType)
                {
                    case NetworkEvent.Type.Data:
                        Debug.Log("NetworkUpdateJob Execute Data updateCount: " + updateCount[0]);
                        Debug.Log("NetworkUpdateJob Execute Data isCreated: " + reader.IsCreated);
                        if (reader.IsCreated)
                        {
                            // Only touch the reader once it is known to be valid.
                            Debug.Log("NetworkUpdateJob Execute Data length: " + reader.Length);

                            // NOTE(review): the reader itself is handed to the main thread; in the editor
                            // the safety system rejects it there. Copying the payload out would avoid that.
                            connectionEvents.Enqueue(new NetworkEventDetail(connections[index],
                                NetworkEventType.Data, ref reader, updateCount[0]));
                        }
                        break;

                    case NetworkEvent.Type.Disconnect:
                        connectionEvents.Enqueue(new NetworkEventDetail(connections[index], NetworkEventType.Disconnect));
                        // Clear the slot so the connection is no longer polled.
                        connections[index] = default;
                        break;
                }
            }
        }
    }
    The above code was originally passing out the reader by value; I changed it to pass by reference as a test, but it didn't make a difference. The reader is not disposed within the job itself. updateCount is just a sanity check on when the Data event is received and when it's actioned.

    This is the error I'm getting:
    Code (CSharp):
    1. ObjectDisposedException: The Unity.Collections.NativeList`1[System.Byte] has been deallocated, it is not allowed to access it
    I'm also seeing this twice preceding it, but I don't recall seeing it until I added updateCount.
    Code (CSharp):
    1. AtomicSafetyNode has either been corrupted or is being accessed on a job which is not allowed.
    Clients do something similar with passing out the reader to the main thread and there are no errors. Is there something in the code or some cross-over between NetworkTransportManager instances that's causing the issue?
     
  2. simon-lemay-unity

    simon-lemay-unity

    Unity Technologies

    Joined:
    Jul 19, 2021
    Posts:
    441
    Hmmm that's curious. The different driver instances should be independent from each other so having two servers shouldn't impact their individual behavior. Do you have the complete stack trace for that `ObjectDisposedException`? `DataStreamReader` doesn't contain any native lists so the exception is unexpected. Unless it's the underlying backing data for the reader that's been disposed, but that would be concerning.

    One thing you could also try is to create the different jobs directly at scheduling time. That is, instead of creating the jobs in
    StartServer
    , you could try creating them as needed in
    ScheduleByState
    . I'm wondering if perhaps there's an issue with keeping an old
    NetworkDriver.Concurrent
    or deferred array instance in the same job structure for a long time...
     
  3. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    Here's the stack trace:
    Code (CSharp):
    1. ObjectDisposedException: The Unity.Collections.NativeList`1[System.Byte] has been deallocated, it is not allowed to access it
    2. Unity.Collections.LowLevel.Unsafe.AtomicSafetyHandle.CheckReadAndThrowNoEarlyOut (Unity.Collections.LowLevel.Unsafe.AtomicSafetyHandle handle) (at <52f1101efe6a469491144c6d5cb6932d>:0)
    3. Unity.Collections.LowLevel.Unsafe.AtomicSafetyHandle.CheckReadAndThrow (Unity.Collections.LowLevel.Unsafe.AtomicSafetyHandle handle) (at /Users/bokken/build/output/unity/unity/Runtime/Export/Jobs/AtomicSafetyHandle.bindings.cs:155)
    4. Unity.Networking.Transport.DataStreamReader.CheckRead () (at Library/PackageCache/com.unity.transport@1.3.4/Runtime/DataStream.cs:1550)
    5. Unity.Networking.Transport.DataStreamReader.get_Length () (at Library/PackageCache/com.unity.transport@1.3.4/Runtime/DataStream.cs:937)
    6. Mango.Networking.Shared.NetworkTransportManager.CheckForNetworkEvents () (at Assets/Scripts/Networking/Shared/Managers/NetworkTransportManager.cs:70)
    7. Mango.Networking.Shared.NetworkTransportManager.Update () (at Assets/Scripts/Networking/Shared/Managers/NetworkTransportManager.cs:44)
    8.  
    Line 70 is the debug log of reader.Length:
    Code (CSharp):
    1.                         Debug.Log($"CheckForNetworkEvents Data length: {eventDetail.Reader.Length}");
    I'll change the job scheduling when I get the chance to sit down.
     
  4. simon-lemay-unity

    simon-lemay-unity

    Unity Technologies

    Joined:
    Jul 19, 2021
    Posts:
    441
    Thanks for the extra information. I was able to reproduce the problem on a toy example. Seems like we have a bug in how we handle the safety handles of the
    DataStreamReader
    and its backing data. The safety system ends up thinking that the reader is a view into an array that's been disposed, but that's not the case.

    I'll investigate this. In the meantime, the error shouldn't happen in builds since the safety system is only enabled in the editor. You could also consider copying the data wholesale in a buffer instead of copying the reader. A cursory look at Netcode for Entities's source code seems to indicate that this is exactly what they're doing, so it can't be that bad for performance. Although unfortunately that will probably entail using raw pointers (e.g. with fixed buffers or native allocations). Entities can use
    DynamicBuffer
    here which makes their approach more convenient.
     
  5. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    Ooh okay, I'll work out how to do this. Many thanks for looking into it.
     
  6. simon-lemay-unity

    simon-lemay-unity

    Unity Technologies

    Joined:
    Jul 19, 2021
    Posts:
    441
    I had a deeper look at this and I'm starting to doubt that the safety system actually supports this. I had wrongly assumed that the data stream reader could just share its safety handle with the backing buffer, but that doesn't seem to be possible across job boundaries. I know I wrote in a different thread that this is supported. I should have checked my assumptions more carefully. I'm really sorry to have misled you.

    I'll keep digging in case there's a way to make this work I didn't see. But even if there is not, I'd very much like to support your use case in some form or another. Perhaps through some kind of escape hatch to bypass the safety system. I'll keep you updated.

    In the meantime if you're looking for a way to copy that data back to the main thread, one relatively simple option would be to pass a
    NativeList<byte>
    to the job and copy the contents of each data stream reader to it. The list can be grown automatically to accommodate packets. Your
    NetworkEventDetail
    structure would then only need to store indices and lengths into that list. You could still deserialize later through a
    DataStreamReader
    by building one from a sub-section of the list. To do that, convert the list to a
    NativeArray
    (there's an implicit operator for this) and call
    GetSubArray
    to get a section of it. That section can then be used to construct a
    DataStreamReader
    .
     
    cerestorm likes this.
  7. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    The functionality would be handy to have. The way the application and message system are designed it feels like traversing into a different domain using the Jobs system, had I planned to use Unity Transport from the start I likely would have done things differently. I decided to keep the message system pretty much intact as it was for NGO as it makes life a lot easier and it saves me having to think of an efficient way of transforming game data into job-friendly message data.

    Thanks for the tips on getting the data out of the job. I considered doing something similar, but after looking into dealing with pointers it looked like a lot of effort for something that was going to be temporary, if I could actually get it to work. In the end I (again) decided to take the easy option and I'm handling events in the main thread for now. I think with the volume of messages I'll be dealing with it won't be too much of a concern if it has to stay this way.
     
  8. simon-lemay-unity

    simon-lemay-unity

    Unity Technologies

    Joined:
    Jul 19, 2021
    Posts:
    441
    Small update on this: it's very unlikely we'll manage to get the safety system to allow this. I'm thus working on adding a
    GetUnsafeReadOnlyPtr
    method to
    DataStreamReader
    . This would allow getting a raw pointer to the backing buffer of the stream reader, which could then be passed out of the job. For example, you could then store this pointer and a length in your
    NetworkEventDetail
    structure and process the data on the main thread or another job. For deserialization, it's possible to reconstruct a
    DataStreamReader
    directly from a pointer and a length.
     
  9. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    Thanks Simon, I'll implement it as soon as it becomes available.

    Out of curiosity, should I be seeing the same problem on the clients? They still pass out the DataStreamReader and I've had no issues so far.
     
  10. simon-lemay-unity

    simon-lemay-unity

    Unity Technologies

    Joined:
    Jul 19, 2021
    Posts:
    441
    I would expect the issue to happen on clients too, but you would only get the errors if running them in the editor, since that's where safety checks are enabled. In builds, the safety checks are not performed (for performance reasons) and things will work correctly since the only problem with what you're doing is that it triggers the safety system (conceptually there's nothing wrong with passing the stream readers outside of the job).
     
  11. cerestorm

    cerestorm

    Joined:
    Apr 16, 2020
    Posts:
    664
    Strange as I'm only working in the editor (two separate projects) and I'm not having issues on the clients or when I run a single server, testing has been limited mind you. Regardless I will switch out the code for your new solution just in case.