Search Unity

Parrallel READ/WRITE to resizable "list"

Discussion in 'Data Oriented Technology Stack' started by WAYN_Group, Aug 10, 2019.

  1. WAYN_Group

    WAYN_Group

    Joined:
    Mar 16, 2019
    Posts:
    59
    Hello,

    I'm working on a project where I need to populate a list of unknown size and then read from it, of course read and write must happen concurently.
    I read this thread but it did not solve my issue : https://forum.unity.com/threads/req...n-writeonly-is-used-or-add-concurrent.522783/


    It seems that only the NativeQueue meet the criteria for the cocnurrent growing list.
    The problem is once I get that native queue I have no way to read concurently from it.
    I'm stuck with a non concurent dequeue.

    I managed to "fix" this by adding a PeekAt(index) to the NativeQueue file in the collection package.
    From my tests it works fine in any situation concurrent or non concurrent read but I can't extract that method from that file in an extension mehtod due to the level of protection of the property used.

    So the questions are :
    1 ) Is there a structure that can handle concurent read and write to an unknown sized "list", meaning it can grow on write (I don't care for removal operation in my case) ?
    2 ) Given the method I wrote (see below) Is there a way the extract it from the unity file through an extension method or something ?
    3 ) If none of the previous question can be solve would anyone also be interested in this functionality and if so would the unity team be willing to review my method to integrate it to the native queue ?

    The method I wrote :

    Code (CSharp):
    1.    
    2.       unsafe public T PeekAt(int index)
    3.         {
    4. #if ENABLE_UNITY_COLLECTIONS_CHECKS
    5.                 AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
    6. #endif
    7.             NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
    8.             if (block == null)
    9.                 throw new InvalidOperationException("Trying to peek from an empty queue");
    10.  
    11.             int targetBlock = index / m_Buffer->m_ItemsPerBlock;
    12.             int currentBlock = 0;
    13.  
    14.             while (targetBlock > currentBlock)
    15.             {
    16.                 block = (NativeQueueBlockHeader*)block->nextBlock;
    17.                 currentBlock++;
    18.             }
    19.  
    20.             int itemIndexInBlock = index % m_Buffer->m_ItemsPerBlock;
    21.  
    22.             if (itemIndexInBlock  > block->itemsInBlock)
    23.                 throw new InvalidOperationException($"Out of bound {itemIndexInBlock} exceed {block->itemsInBlock}");
    24.  
    25.          
    26.  
    27.             return UnsafeUtility.ReadArrayElement<T>((byte*)block + UnsafeUtility.SizeOf<NativeQueueBlockHeader>(), itemIndexInBlock);
    28.         }

    The class used to test it :
    Code (CSharp):
    1. using System.Collections;
    2. using System.Collections.Generic;
    3. using Unity.Collections;
    4. using Unity.Jobs;
    5. using UnityEngine;
    6.  
    7. public class TestNativeQueue : MonoBehaviour
    8. {
    9.     public NativeArray<int> NativeArray;
    10.     public NativeArray<int> NativeArrayOut;
    11.     public NativeQueue<int> NativeQueue;
    12.     public int TestSize = 10;
    13.     public bool ConcurentTest = true;
    14.  
    15.     void Awake()
    16.     {
    17.         //Allocate memory
    18.         NativeArray = new NativeArray<int>(TestSize, Allocator.TempJob);
    19.         NativeQueue = new NativeQueue<int>(Allocator.TempJob);
    20.         NativeArrayOut = new NativeArray<int>(TestSize, Allocator.TempJob);
    21.  
    22.         // populate Array
    23.         for (int i = 0; i < NativeArray.Length; i++)
    24.         {
    25.             NativeArray[i] = i;
    26.             NativeQueue.Enqueue(i);
    27.         }
    28.  
    29.         if (ConcurentTest) {
    30.             // concurent read test
    31.             var job = new PopulateOutJob()
    32.             {
    33.                 NativeArray = NativeArrayOut,
    34.                 NativeQueue = NativeQueue
    35.             }.Schedule(NativeQueue.Count, 1);
    36.             job.Complete();
    37.         }
    38.         else {
    39.             // sequential read test
    40.             for (int i = 0; i < NativeArray.Length; i++)
    41.             {
    42.                 NativeArrayOut[i] = NativeQueue.PeekAt(i);
    43.             }
    44.         }
    45.  
    46.         // test Array
    47.         for (int i = 0; i < NativeArray.Length; i++)
    48.         {
    49.             if(NativeArray[i] != NativeArrayOut[i])
    50.             {
    51.                 Debug.Log($"NativeArray  {NativeArray[i]}    NativeArrayOut {NativeArrayOut[i]}");
    52.             }
    53.         }
    54.  
    55.         // Free memory
    56.         NativeArray.Dispose();
    57.         NativeArrayOut.Dispose();
    58.         NativeQueue.Dispose();
    59.     }
    60.  
    61.     struct PopulateOutJob : IJobParallelFor
    62.     {
    63.         public NativeArray<int> NativeArray;
    64.         [ReadOnly]public NativeQueue<int> NativeQueue;
    65.  
    66.         public void Execute(int index)
    67.         {
    68.             NativeArray[index] = NativeQueue.PeekAt(index);
    69.         }
    70.  
    71.     }
    72.    
    73. }
    74.  
     
  2. fholm

    fholm

    Joined:
    Aug 20, 2011
    Posts:
    2,031
    Just look up MPMC (Multiple Producer/Multiple Consumer) queues, theres tons of them in various langauges on github - port one to C#/unsafe.
     
  3. snacktime

    snacktime

    Joined:
    Apr 15, 2013
    Posts:
    2,314
    This is one of those things where almost always there is another approach, but sometimes it's just not worth the additional complexity. You really don't want to use this for high volume work where performance is critical.

    https://gist.github.com/gamemachine/16718ec0e68bc302ca12356922d0d994

    NativeIntPtr is from Jackson Dunstan's blog easy to google.
     
  4. WAYN_Group

    WAYN_Group

    Joined:
    Mar 16, 2019
    Posts:
    59
    Hello, thanks for your anwsers.

    Just to be clear I used the queue because it is the closest one from what I needed but I in fact use it a a list.

    The native list can't grow concurently but can be read at multiple index concurently, the native queue is the oposit.
    It was simple to make the read thatn the write so I went with the queue...

    I have one job that populate it concurently, then another job read from it concurently, each iteration of the job read from multiple index off the queue. I don' want to remove anything from the queue while reading it.
     
  5. Joachim_Ante

    Joachim_Ante

    Unity Technologies

    Joined:
    Mar 16, 2005
    Posts:
    4,641
    Use NativeStream. It offers:

    1) Performance
    2) Determinism
    3) Parallel Write
    4) Parallel Read

    Sounds too good to be true, but its not...

    Unity.Physics uses it for all temporary "events" contact points, collision events, jacobians etc.
     
  6. WAYN_Group

    WAYN_Group

    Joined:
    Mar 16, 2019
    Posts:
    59
    Great! I'll dig into that. Thanks.
     
  7. Soaryn

    Soaryn

    Joined:
    Apr 17, 2015
    Posts:
    198

    Happen to have any public examples on github perchance? :) The physics Samples seem to not have any in the repository, and the latest Physics package I believe is using preview-33 rather than entities 0.1.1
     
  8. Joachim_Ante

    Joachim_Ante

    Unity Technologies

    Joined:
    Mar 16, 2005
    Posts:
    4,641
    Best is to look at the NativeStreamTests.cs in the collections package.

    Physics still uses BlockStream which is the same code. Just moved to Unity.Collections, cleaned up, renamed.
    Physics hasn't been upgraded to use NativeStream yet.
     
  9. WAYN_Group

    WAYN_Group

    Joined:
    Mar 16, 2019
    Posts:
    59
    Hello @Joachim_Ante,
    Thanks for the answers. I'm migth be to exigent but it seems to me that it still requieres to know the size of the list first. It's not a "growing" concurent list like the queue is.