Lock-free Native Queue?

Discussion in 'Entity Component System' started by Deleted User, Apr 12, 2019.

  1. Deleted User

    Deleted User

    Guest

    Good evening everyone.

    The docs say NativeQueue<T> is a FIFO queue. However, it does not seem to allow writes from multiple different jobs, only from one specific job type and its worker threads. My goal is to write to it from different job types.

    Is there a workaround, such as wiring up dependencies, or a way to write a NativeContainer that supports a multi-producer / single-consumer pattern?

    Cheers.
     
    Last edited by a moderator: Apr 12, 2019
  2. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    4,269
    For different jobs, build a pool of NativeQueues that each job can draw from. Then write your consumer job to take in a whole bunch of NativeQueues and call an iteration function with each one. I'm finding out T4 can be useful for generating this kind of code for a large number of native containers in a job.
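
    A minimal sketch of the consumer side of that idea, assuming just two producer queues. The names DrainQueuesJob, QueueA, QueueB and Output are made up for illustration, and a real pool would likely need one field per queue (which is where generated code could help):

```csharp
using Unity.Collections;
using Unity.Jobs;

// Hypothetical consumer job: drains several producer queues into one list.
// Each producer job writes to its own queue, so producers never share a container.
public struct DrainQueuesJob : IJob
{
    public NativeQueue<int> QueueA;   // filled by one producer job (assumed)
    public NativeQueue<int> QueueB;   // filled by another producer job (assumed)
    public NativeList<int> Output;    // combined result, in drain order

    public void Execute()
    {
        Drain(QueueA);
        Drain(QueueB);
    }

    // The "iteration function" called once per queue.
    void Drain(NativeQueue<int> queue)
    {
        while (queue.TryDequeue(out int item))
        {
            Output.Add(item);
        }
    }
}
```

    The consumer would then be scheduled after all producers, for example with JobHandle.CombineDependencies(producerAHandle, producerBHandle) as its dependency, where both handle names are placeholders.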
     
  3. Deleted User

    Deleted User

    Guest

    That is a good suggestion.
    However, something like ConcurrentBuffer (https://github.com/nxrighthere/NetStack/blob/master/Source/NetStack.Threading/ConcurrentBuffer.cs), but as a native container, would be an awesome feature.

    Actually, I will try to rewrite it as a NativeContainer.
     
  4. recursive

    recursive

    Joined:
    Jul 12, 2012
    Posts:
    669
    The NativeQueue<T>.Concurrent adapter can be written to from a variety of Jobs simultaneously.

    The normal NativeQueue<T> can only be written to by one job at a time, and can be dequeued from one Job at a time.
     
  5. Deleted User

    Deleted User

    Guest

    Obviously I am using the concurrent version of it. A system that writes to it from a different job simply throws a dependency exception. So the queue does not support writing from different job types, only from a single one.

    Code (CSharp):
    public class ClientTestSendSystem : NetworkJobComponentSystem, IClientSystem
    {
        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new SendJob
            {
                JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
            }
            .Schedule(this, inputDeps);

            networkEndFrameBarrier.AddJobHandleForProducer(handle);
            return handle;
        }
    }

    public class ClientTestSendSystem2 : NetworkJobComponentSystem, IClientSystem
    {
        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new SendJob
            {
                JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
            }
            .Schedule(this, inputDeps);

            networkEndFrameBarrier.AddJobHandleForProducer(handle);
            return handle;
        }
    }
     
    Last edited by a moderator: Apr 12, 2019
  6. Deleted User

    Deleted User

    Guest


    [NativeDisableContainerSafetyRestriction]
    public NativeQueue<IntPtr>.Concurrent JobCommandQueue;


    Putting the [NativeDisableContainerSafetyRestriction] attribute on the job field stops the Editor from throwing the write-safety exception. However, a race condition may occur.
     
  7. recursive

    recursive

    Joined:
    Jul 12, 2012
    Posts:
    669
    Oh... you're writing to one Queue from across systems. Yeah that's more complicated than what I thought the problem was.
     
  8. Spy-Shifty

    Spy-Shifty

    Joined:
    May 5, 2011
    Posts:
    546
    Deleted User likes this.
  9. Deleted User

    Deleted User

    Guest

    Last edited by a moderator: Apr 12, 2019
  10. julian-moschuering

    julian-moschuering

    Joined:
    Apr 15, 2014
    Posts:
    529
    I skimmed over it and I don't think this is the case. NativeQueue<T>.Concurrent uses the job system's thread index for thread-local storage, which is independent of which job the thread is running. It should work across jobs, but of course you lose all the other safety checks too.
    By the way, the NativeQueue implementation should be a lot faster than ConcurrentBuffer, as it prevents multiple threads from writing to the same cache line, which can be expensive.
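
    To illustrate the per-thread idea (this is not the actual NativeQueue code, just a sketch of the technique), here is a job that keeps one counter per worker thread, each padded out to its own cache line. CountPerThreadJob, Counters and Stride are hypothetical names, and the 64-byte cache line is an assumption:

```csharp
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;

// Hypothetical job: every worker thread writes only to its own slot,
// so no two threads ever touch the same cache line.
public struct CountPerThreadJob : IJobParallelFor
{
    // 16 ints * 4 bytes = 64 bytes, assumed to be one cache line.
    public const int Stride = 16;

    // Caller allocates JobsUtility.MaxJobThreadCount * Stride elements (assumption of this sketch).
    [NativeDisableParallelForRestriction]
    public NativeArray<int> Counters;

    // Filled in by the job system with the index of the executing thread.
    [NativeSetThreadIndex]
    int m_ThreadIndex;

    public void Execute(int index)
    {
        int slot = m_ThreadIndex * Stride;
        Counters[slot] = Counters[slot] + 1;
    }
}
```

    Because the slot depends only on the thread and not on the job type, two different job types could use the same layout. The safety system would still complain about that unless its checks are disabled for the container.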
     
  11. 5argon

    5argon

    Joined:
    Jun 10, 2013
    Posts:
    1,555
    I am not aware of this restriction at all, and I remember it working fine in my game when enqueuing from multiple places. What is the actual condition that makes the Concurrent version fail? What does the dependency exception look like?
     
  12. Deleted User

    Deleted User

    Guest

    Thank you for the reply.
    That basically means that a race condition may occur? I also need the enqueued commands to keep their order, so somehow I need a job dependency between the jobs that write to the same NativeQueue. Right now, with the attribute, Job1 and Job2 do not run in any particular order.
     
  13. Deleted User

    Deleted User

    Guest

    Thank you for replying here.
    The error is a dependency error along these lines:

    The previously scheduled job T writes to the NativeQueue. You must call JobHandle.Complete() on the job T



    Code (CSharp):
    public class ClientTestSendSystem : NetworkJobComponentSystem, IClientSystem
    {
        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new SendJob
            {
                JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
            }
            .Schedule(this, inputDeps);

            networkEndFrameBarrier.AddJobHandleForProducer(handle);
            return handle;
        }
    }
    public class ClientTestSendSystem2 : NetworkJobComponentSystem, IClientSystem
    {
        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new SendJob
            {
                JobCommandQueue = networkIOSystem.JobCommandQueue.ToConcurrent(),
            }
            .Schedule(this, inputDeps);

            networkEndFrameBarrier.AddJobHandleForProducer(handle);
            return handle;
        }
    }
     
  14. 5argon

    5argon

    Joined:
    Jun 10, 2013
    Posts:
    1,555
    I get what you mean now. What's happening here is that the "Concurrent" mechanism should work in your case, but the safety system is getting in the way, because `inputDeps` does not carry any information about your native container and so cannot automatically complete the prior job. Input dependencies are resolved based on component type, but this is a whole container. (This is a common problem with any native container pulled from somewhere else and passed into a job: the `inputDeps` system only knows about dependencies derived from an EntityQuery.) However, the safety system does know which container is currently being worked on by which jobs, so it is able to throw this error. It just cannot complete the job for you.

    However, the defining trait of the Concurrent variant is that it can handle multiple jobs working on it at the same time. But (I think) it was designed with IJobForEach/IJobChunk/IJobParallelFor as those "multiple jobs", that is, one job that splits itself up to do the same task on several threads.

    In your case they are "completely different" jobs, so what you want is to tell the safety system that it is fine not to complete the other job. [NativeDisableContainerSafetyRestriction] is for this purpose.

    No race condition should occur, because this is what Concurrent containers are made for. It uses a unique thread index to block on write properly with a mutex. No two jobs can start with the same thread index, unless you use the non-Concurrent version, where they all receive thread index 0. (Valid thread indices from [NativeSetThreadIndex] start at 1.)

    You can confirm this yourself with something like the following. Try changing the Concurrent version to the normal version, and the exception will eventually be thrown with a dequeued value of 0.

    Code (CSharp):
    using System;
    using Unity.Collections;
    using Unity.Collections.LowLevel.Unsafe;
    using Unity.Entities;
    using Unity.Jobs;

    // Owns the shared queue and schedules one writer job per update.
    public class QueueWrite1 : JobComponentSystem
    {
        public NativeQueue<int> q;
        protected override void OnCreateManager()
        {
            q = new NativeQueue<int>(Allocator.Persistent);
        }

        protected override void OnDestroyManager()
        {
            q.Dispose();
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new WriteJob
            {
                JobCommandQueue = q.ToConcurrent(),
            }
            .Schedule(inputDeps);
            return handle;
        }
    }

    // A second system that writes to the same queue from its own job.
    public class QueueWrite2 : JobComponentSystem
    {
        QueueWrite1 s;
        protected override void OnCreateManager()
        {
            s = World.GetOrCreateSystem<QueueWrite1>();
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var handle = new WriteJob
            {
                JobCommandQueue = s.q.ToConcurrent(),
            }
            .Schedule(inputDeps);
            return handle;
        }
    }

    public struct WriteJob : IJob
    {
        // Tells the safety system not to track this container for the job.
        [NativeDisableContainerSafetyRestriction] public NativeQueue<int>.Concurrent JobCommandQueue;
        public void Execute()
        {
            for (int i = 0; i < 10000; i++)
            {
                JobCommandQueue.Enqueue(123456789);
            }
        }
    }


    // Dequeues on the main thread and throws if any value is not the one that was written.
    public class CheckIntegrity : ComponentSystem
    {
        QueueWrite1 s;
        protected override void OnCreateManager()
        {
            s = World.GetOrCreateSystem<QueueWrite1>();
        }

        protected override void OnUpdate()
        {
            while(s.q.TryDequeue(out int item))
            {
                if(item != 123456789)
                {
                    throw new Exception($"Race condition occurred (Dequeued {item})");
                }
            }
        }
    }
     
    Last edited: Apr 17, 2019
    Neiist and (deleted member) like this.
  15. snacktime

    snacktime

    Joined:
    Apr 15, 2013
    Posts:
    3,356
    What's confusing is that the scope for Concurrent is a single job handle. But a job handle can represent both parallel and non-parallel jobs, so the behavior is not consistent in terms of the amount of parallelism, which makes it a bit unintuitive. The safety system should really special-case these, in my opinion, rather than making end users manually disable the safety system for the container.
     
    Deleted User likes this.
  16. Spy-Shifty

    Spy-Shifty

    Joined:
    May 5, 2011
    Posts:
    546
    As I wrote before, I solved it by combining the JobHandles from the different systems before scheduling the jobs.
    Code (CSharp):
    protected override JobHandle OnUpdate(JobHandle inputDeps) {
        inputDeps = JobHandle.CombineDependencies(EndFrameNetworking.NetworkJobHandle, inputDeps);
        inputDeps = //schedule jobs...
        EndFrameNetworking.AddJobHandleForNetworkProducer(inputDeps);
        return inputDeps;
    }
    Code (CSharp):
    public class EndFrameNetworking : EntityCommandBufferSystem {
        //....
        private JobHandle m_NetworkProducerHandle = default;
        public JobHandle NetworkJobHandle => m_NetworkProducerHandle;

        internal void AddJobHandleForNetworkProducer(JobHandle producerJob) {
            m_NetworkProducerHandle = JobHandle.CombineDependencies(m_NetworkProducerHandle, producerJob);
        }
        protected override void OnUpdate() {
            base.OnUpdate();
            FlushBuffers();
        }
        private void FlushBuffers() {
            m_NetworkProducerHandle.Complete();
            m_NetworkProducerHandle = new JobHandle();
            for (int i = 0; i < m_PendingWriters.Count; i++) {
                NativeNetworkWriter writer = m_PendingWriters[i];
                //TODO Data transfer to network API
                writer.Dispose();
            }
            for (int i = 0; i < m_PendingReaders.Count; i++) {
                NativeNetworkReader reader = m_PendingReaders[i];
                reader.Dispose();
            }
            m_PendingWriters.Clear();
            m_PendingReaders.Clear();
        }
    }
     
  17. Deleted User

    Deleted User

    Guest

    Thank you so very much.
     
  18. Deleted User

    Deleted User

    Guest

    I would definitely agree on that.
     
  19. Deleted User

    Deleted User

    Guest

    @5argon, one last question. Would it be possible to access the NativeQueue from a regular C# thread? Accessing it that way throws an exception saying that a NativeContainer can only be accessed from a job thread or the main thread.
     
  20. 5argon

    5argon

    Joined:
    Jun 10, 2013
    Posts:
    1,555
    I have no idea how to disable that warning on regular C# threading
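
    One possible way around it, sketched under assumptions and not confirmed anywhere in this thread: leave the check alone, drain the NativeQueue on the main thread once the producer jobs have completed, and hand the items to the other thread through a managed ConcurrentQueue. ThreadBridge and its members are hypothetical names:

```csharp
using System.Collections.Concurrent;
using Unity.Collections;

// Hypothetical bridge between the job system and a regular C# thread.
// The NativeQueue is only ever touched from the main thread.
public sealed class ThreadBridge
{
    readonly ConcurrentQueue<int> m_Handoff = new ConcurrentQueue<int>();

    // Call on the main thread after the jobs writing to 'source' have completed.
    public void DrainOnMainThread(NativeQueue<int> source)
    {
        while (source.TryDequeue(out int item))
        {
            m_Handoff.Enqueue(item);
        }
    }

    // Safe to call from a regular C# thread.
    public bool TryTake(out int item)
    {
        return m_Handoff.TryDequeue(out item);
    }
}
```

    This costs one extra copy per item and goes through managed memory, so it may not suit every case.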