Search Unity

Concurrent Array with Random Index in IJobParallelFor

Discussion in 'Entity Component System' started by Tazadar66, Sep 23, 2018.

  1. Tazadar66

    Tazadar66

    Joined:
    Aug 27, 2013
    Posts:
    57
    Hello,

    After asking several questions, and with the kind help of the community, I have made a NativeConcurrentIntArray that supports random-index access inside an IJobParallelFor.


    Code (CSharp):
    using System;
    using System.Runtime.InteropServices;
    using System.Threading;
    using Unity.Collections;
    using Unity.Collections.LowLevel.Unsafe;

    /// <summary>
    /// Fixed-size native array of ints that an IJobParallelFor can write at arbitrary
    /// indices through <see cref="Concurrent"/>, whose Increment/Add are atomic.
    /// </summary>
    /// <remarks>
    /// The outer struct owns the allocation: create it with the constructor and free it with
    /// <see cref="Dispose"/>. Its own Increment/Add/SetValue are plain (non-atomic) writes;
    /// call <see cref="ToConcurrent"/> to get the atomic, write-only view used inside parallel jobs.
    /// When ENABLE_UNITY_COLLECTIONS_CHECKS is defined, every accessor checks the
    /// AtomicSafetyHandle and verifies that the index lies in [0, Length).
    /// </remarks>
    [StructLayout(LayoutKind.Sequential)]
    [NativeContainer]
    public unsafe struct NativeConcurrentIntArray
    {
        // Pointer to the first of m_Length ints. The pointer restriction has to be relaxed so
        // jobs can be scheduled with this container.
        [NativeDisableUnsafePtrRestriction]
        int* m_Counter;

        // Number of ints behind m_Counter; used by the bounds checks.
        int m_Length;

    #if ENABLE_UNITY_COLLECTIONS_CHECKS
        AtomicSafetyHandle m_Safety;
        // The dispose sentinel tracks memory leaks. It is a managed type so it is cleared to null when scheduling a job.
        // The job cannot dispose the container, and no one else can dispose it until the job has run, so it is ok to not pass it along.
        // This attribute is required; without it this native container cannot be passed to a job since that would give the job access to a managed object.
        [NativeSetClassTypeToNullOnSchedule]
        DisposeSentinel m_DisposeSentinel;
    #endif

        // Keep track of where the memory for this was allocated so Dispose frees it with the same allocator.
        Allocator m_AllocatorLabel;

        /// <summary>Allocates <paramref name="size"/> ints, all initialised to 0.</summary>
        /// <param name="size">Number of elements; must be non-negative.</param>
        /// <param name="label">Allocator for the native memory (Temp, TempJob or Persistent).</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is negative.</exception>
        /// <exception cref="ArgumentException"><paramref name="label"/> is Invalid or None.</exception>
        public NativeConcurrentIntArray(int size, Allocator label)
        {
            if (size < 0)
                throw new ArgumentOutOfRangeException(nameof(size), "Length must be >= 0");
            if (label <= Allocator.None)
                throw new ArgumentException("Allocator must be Temp, TempJob or Persistent", nameof(label));

            m_AllocatorLabel = label;
            m_Length = size;

            // Compute the byte count in 64 bits so a large size cannot overflow an int multiply.
            long totalSize = (long)UnsafeUtility.SizeOf<int>() * size;
            m_Counter = (int*)UnsafeUtility.Malloc(totalSize, UnsafeUtility.AlignOf<int>(), label);

            // Start every element at 0 rather than with whatever the allocation contained.
            UnsafeUtility.MemClear(m_Counter, totalSize);

            // Create a dispose sentinel to track memory leaks. This also creates the AtomicSafetyHandle.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
    #if UNITY_2018_3_OR_NEWER
            DisposeSentinel.Create(out m_Safety, out m_DisposeSentinel, 0, label);
    #else
            DisposeSentinel.Create(out m_Safety, out m_DisposeSentinel, 0);
    #endif
    #endif
        }

        /// <summary>Number of ints in the array.</summary>
        public int Length
        {
            get { return m_Length; }
        }

        /// <summary>True while the native memory is allocated (set to false by <see cref="Dispose"/>).</summary>
        public bool IsCreated
        {
            get { return m_Counter != null; }
        }

        // Throws when index is outside [0, length). Empty when collection checks are disabled.
        // The message is a literal (no formatting) so the check stays usable from Burst-compiled jobs.
        static void CheckIndexInRange(int index, int length)
        {
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            // The unsigned comparison rejects negative indices and indices >= length in a single test.
            if ((uint)index >= (uint)length)
                throw new IndexOutOfRangeException("Index is out of range of the NativeConcurrentIntArray.");
    #endif
        }

        /// <summary>Adds 1 to element <paramref name="index"/> (not atomic).</summary>
        public void Increment(int index)
        {
            // Verify that the caller has write permission on this data.
            // This is the race condition protection; without these checks the AtomicSafetyHandle is useless.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
    #endif
            CheckIndexInRange(index, m_Length);
            m_Counter[index]++;
        }

        /// <summary>Adds <paramref name="value"/> to element <paramref name="index"/> (not atomic).</summary>
        public void Add(int index, int value)
        {
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
    #endif
            CheckIndexInRange(index, m_Length);
            m_Counter[index] += value;
        }

        /// <summary>Returns element <paramref name="index"/>.</summary>
        public int GetValue(int index)
        {
            // Verify that the caller has read permission on this data.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
    #endif
            CheckIndexInRange(index, m_Length);
            return m_Counter[index];
        }

        /// <summary>Overwrites element <paramref name="index"/> with <paramref name="value"/>.</summary>
        public void SetValue(int index, int value)
        {
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
    #endif
            CheckIndexInRange(index, m_Length);
            m_Counter[index] = value;
        }

        /// <summary>Frees the native memory. The array (and any Concurrent copies) must not be used afterwards.</summary>
        public void Dispose()
        {
            // Let the dispose sentinel know that the data has been freed so it does not report any memory leaks.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
    #if UNITY_2018_3_OR_NEWER
            DisposeSentinel.Dispose(ref m_Safety, ref m_DisposeSentinel);
    #else
            DisposeSentinel.Dispose(m_Safety, ref m_DisposeSentinel);
    #endif
    #endif

            UnsafeUtility.Free(m_Counter, m_AllocatorLabel);
            m_Counter = null;
            m_Length = 0;
        }

        /// <summary>
        /// Returns a write-only view that shares this array's memory and whose Increment/Add
        /// use Interlocked operations, for use inside IJobParallelFor.
        /// </summary>
        public Concurrent ToConcurrent()
        {
            Concurrent concurrent;

    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
            concurrent.m_Safety = m_Safety;
            AtomicSafetyHandle.UseSecondaryVersion(ref concurrent.m_Safety);
    #endif

            concurrent.m_Counter = m_Counter;
            concurrent.m_Length = m_Length;
            return concurrent;
        }

        /// <summary>
        /// Atomic, write-only view of a <see cref="NativeConcurrentIntArray"/>. It does not own
        /// the memory and is not responsible for freeing it.
        /// </summary>
        [NativeContainer]
        // This attribute is what makes it possible to use NativeConcurrentIntArray.Concurrent in a ParallelFor job.
        [NativeContainerIsAtomicWriteOnly]
        public unsafe struct Concurrent
        {
            // Copy of the pointer from the owning NativeConcurrentIntArray.
            [NativeDisableUnsafePtrRestriction]
            internal int* m_Counter;

            // Copy of the owner's element count, used by the bounds checks.
            internal int m_Length;

            // Copy of the AtomicSafetyHandle from the owning NativeConcurrentIntArray. The dispose sentinel is not copied.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
            internal AtomicSafetyHandle m_Safety;
    #endif

            /// <summary>Atomically adds 1 to element <paramref name="index"/>.</summary>
            public void Increment(int index)
            {
                // Writes still need to check for write permission.
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
                AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
    #endif
                CheckIndexInRange(index, m_Length);
                // Atomic because several worker threads may hit the same element at the same time.
                Interlocked.Increment(ref m_Counter[index]);
            }

            /// <summary>Atomically adds <paramref name="value"/> to element <paramref name="index"/>.</summary>
            public void Add(int index, int value)
            {
    #if ENABLE_UNITY_COLLECTIONS_CHECKS
                AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
    #endif
                CheckIndexInRange(index, m_Length);
                Interlocked.Add(ref m_Counter[index], value);
            }
        }
    }
    163.  

    I ran several tests to check its speed with 512 * 1024 = 524288 elements.

    In Test1, I increment the first element of the NativeConcurrentIntArray 524288 times.
    In Test2, I increment the first element of a NativeArray<int> 524288 times. (Note that, to avoid race conditions, it uses an IJob instead of an IJobParallelFor.)
    In Test3, I increment the first element of a standard C# int[] array 524288 times in a loop on the main thread.
    In Test4, parallel-for index i increments element i of the NativeConcurrentIntArray, so each of the 524288 elements is incremented once.

    Code (CSharp):
    /// <summary>
    /// Test1: every parallel-for iteration atomically adds 1 to element 0 of a
    /// NativeConcurrentIntArray, so all iterations write the same slot.
    /// </summary>
    public class TestSystem : JobComponentSystem
    {
        // Array size and parallel-for iteration count.
        const int ElementCount = 1024 * 512;
        // Iterations handed to a worker per batch.
        const int InnerLoopBatchCount = 32;

        public struct MyJob : IJobParallelFor
        {
            [NativeDisableParallelForRestriction]
            public NativeConcurrentIntArray.Concurrent Data;

            public void Execute(int i)
            {
                // The iteration index is deliberately ignored: every iteration targets slot 0.
                Data.Add(0, 1);
            }
        }

        NativeConcurrentIntArray Test;

        protected override void OnCreateManager()
        {
            Test = new NativeConcurrentIntArray(ElementCount, Allocator.Persistent);
        }

        protected override void OnDestroyManager()
        {
            Test.Dispose();
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            // OnUpdate only schedules the job and returns its handle; it does not wait for the adds.
            var job = new MyJob();
            job.Data = Test.ToConcurrent();
            return job.Schedule(ElementCount, InnerLoopBatchCount, inputDeps);
        }
    }
    Code (CSharp):
    /// <summary>
    /// Test2: a single IJob increments element 0 of a NativeArray&lt;int&gt; once per array
    /// element. An IJob (not IJobParallelFor) is used so only one writer touches the array.
    /// </summary>
    public class TestSystem2 : JobComponentSystem
    {
        // Array size, which is also the number of increments per update.
        const int ElementCount = 1024 * 512;

        public struct MyJob : IJob
        {
            public NativeArray<int> Data;

            public void Execute()
            {
                int iterations = Data.Length;
                for (int step = 0; step < iterations; step++)
                {
                    Data[0] += 1;
                }
            }
        }

        NativeArray<int> Test;

        protected override void OnCreateManager()
        {
            Test = new NativeArray<int>(ElementCount, Allocator.Persistent);
        }

        protected override void OnDestroyManager()
        {
            Test.Dispose();
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var job = new MyJob { Data = Test };
            return job.Schedule(inputDeps);
        }
    }
    Code (CSharp):
    /// <summary>
    /// Test3 baseline: a managed int[] whose element 0 is incremented Length times directly
    /// inside OnUpdate; no job is scheduled.
    /// </summary>
    public class TestSystem3 : JobComponentSystem
    {
        // Array size, which is also the number of increments per update.
        const int ElementCount = 1024 * 512;

        int[] Test;

        protected override void OnCreateManager()
        {
            Test = new int[ElementCount];
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            int remaining = Test.Length;
            while (remaining-- > 0)
            {
                Test[0] += 1;
            }

            return base.OnUpdate(inputDeps);
        }
    }
    22.  
    Code (CSharp):
    /// <summary>
    /// Test4: parallel-for iteration i atomically adds 1 to element i, so each element of the
    /// NativeConcurrentIntArray is incremented once per update and no two iterations share a slot.
    /// </summary>
    public class TestSystem4 : JobComponentSystem
    {
        // Array size and parallel-for iteration count.
        const int ElementCount = 1024 * 512;
        // Iterations handed to a worker per batch.
        const int InnerLoopBatchCount = 32;

        public struct MyJob : IJobParallelFor
        {
            [NativeDisableParallelForRestriction]
            public NativeConcurrentIntArray.Concurrent Data;

            public void Execute(int i)
            {
                // Each iteration writes only its own slot.
                Data.Add(i, 1);
            }
        }

        NativeConcurrentIntArray Test;

        protected override void OnCreateManager()
        {
            Test = new NativeConcurrentIntArray(ElementCount, Allocator.Persistent);
        }

        protected override void OnDestroyManager()
        {
            Test.Dispose();
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            // OnUpdate only schedules the job and returns its handle; it does not wait for the adds.
            var job = new MyJob();
            job.Data = Test.ToConcurrent();
            return job.Schedule(ElementCount, InnerLoopBatchCount, inputDeps);
        }
    }

    Here are the results displayed in the Entity Debugger on my CPU (a first-generation i5 from 2011):
    Test1 : 0.02 ms
    Test2 : 31.01 ms
    Test3 : 2.51 ms
    Test4 : 0.01 ms

    I am quite impressed by the results, but I don't understand why they are so fast.

    Let me know what you think ;)
     
  2. xman7c7

    xman7c7

    Joined:
    Oct 28, 2016
    Posts:
    28
    Tazadar66 likes this.
  3. Tazadar66

    Tazadar66

    Joined:
    Aug 27, 2013
    Posts:
    57
    Thanks for your help, but I don't understand how your project works; I can't even open the Entity Debugger...
     
  4. xman7c7

    xman7c7

    Joined:
    Oct 28, 2016
    Posts:
    28
    Tazadar66 likes this.