
NativeStream vs NativeQueue

Discussion in 'Entity Component System' started by MintTree117, Mar 21, 2021.

  1. MintTree117

    MintTree117

    Joined:
    Dec 2, 2018
    Posts:
    340
    I am having trouble figuring out why one would use a NativeStream over a concurrent NativeQueue. They both allow parallel writing, and while NativeStream also allows parallel reading, you can just convert the queue to an array and read that in parallel.

    I looked at the NativeStreamTests folder in Unity, but I don't see in what scenario you would use a stream. I know Unity Physics uses NativeStream, but I don't know why they use it over NativeQueue.
     
  2. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    3,986
    NativeQueue.ParallelWriter is not deterministic. Also converting a NativeQueue into a NativeArray is a memcpy operation, whereas NativeStream.Reader is directly reading the container memory.
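
    To make that memory difference concrete, here's a minimal read-side sketch (API names from Unity.Collections as documented; treat it as illustrative, not a drop-in):

    Code (CSharp):
    // Illustrative sketch: reading results back from each container after
    // parallel jobs have written into them.

    // NativeQueue: ToArray copies every element into a new NativeArray,
    // and the element order depends on thread scheduling.
    var queue = new NativeQueue<int>(Allocator.TempJob);
    // ... jobs write through queue.AsParallelWriter() ...
    NativeArray<int> copy = queue.ToArray(Allocator.Temp);

    // NativeStream: the reader walks the stream's own memory in place,
    // and each foreach index reads back in the order it was written.
    var stream = new NativeStream(foreachCount, Allocator.TempJob);
    // ... jobs write through stream.AsWriter() ...
    NativeStream.Reader reader = stream.AsReader();
    for (int i = 0; i < foreachCount; i++)
    {
        int count = reader.BeginForEachIndex(i);
        for (int j = 0; j < count; j++)
        {
            int value = reader.Read<int>(); // reads container memory directly, no copy
        }
        reader.EndForEachIndex();
    }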
     
    MintTree117 likes this.
  3. Joachim_Ante

    Joachim_Ante

    Unity Technologies

    Joined:
    Mar 16, 2005
    Posts:
    5,203
    Additionally, NativeStream is significantly faster when used in a concurrent scenario. NativeQueue has contention on a single queue, while NativeStream splits the data into batches, with each thread writing to its own head, so to speak.
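
    For reference, the usual parallel write pattern looks something like this (the job and field names here are just illustrative):

    Code (CSharp):
    // Illustrative job showing the per-index write pattern: each foreach index
    // gets its own block chain, so threads don't contend on a shared head.
    [BurstCompile]
    struct ProduceJob : IJobParallelFor
    {
        public NativeStream.Writer Writer;

        public void Execute(int index)
        {
            Writer.BeginForEachIndex(index); // claim this index's private buffer
            Writer.Write(index * 2);
            Writer.Write(index * 2 + 1);
            Writer.EndForEachIndex();
        }
    }

    // var stream = new NativeStream(foreachCount, Allocator.TempJob);
    // new ProduceJob { Writer = stream.AsWriter() }.Schedule(foreachCount, 64).Complete();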
     
  4. MintTree117

    MintTree117

    Joined:
    Dec 2, 2018
    Posts:
    340
    Thanks! So as I understand it, NativeStream is sort of like an array of queues? With each foreach index referring to the head of each queue, each processed on its own thread? So this would not be performant if I have thousands of indices but each index only writes around 1-5 items? Seems like a lot of allocation.
     
  5. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    3,631
    It works surprisingly well with a high index count, but it's not ideal and you should reduce the count if possible. For example, allocate indices per chunk instead of per entity.

    Also, if you have chunks of data (arrays) you want to pass to a stream and really want to optimize it, writing the elements individually can get quite costly. I wrote a couple of extensions to NativeStream, AllocateLarge and ReadLarge, that allocate the data in blocks of up to 4 KB and are orders of magnitude faster for large data sets.

    Code (CSharp):
    namespace BovineLabs.Basics.Extensions
    {
        using Unity.Collections;
        using Unity.Collections.LowLevel.Unsafe;

        /// <summary> Extensions for NativeStream. </summary>
        public static unsafe class NativeStreamExtensions
        {
            private static readonly int MaxSize = (4 * 1024) - sizeof(void*);

            /// <summary> Allocate a chunk of memory that can be larger than the max allocation size. </summary>
            /// <param name="writer"> The writer. </param>
            /// <param name="data"> The data to write. </param>
            /// <param name="size"> The size of the data. For an array, this is UnsafeUtility.SizeOf{T} * length. </param>
            public static void AllocateLarge(this ref NativeStream.Writer writer, byte* data, int size)
            {
                if (size == 0)
                {
                    return;
                }

                var allocationCount = size / MaxSize;
                var allocationRemainder = size % MaxSize;

                for (var i = 0; i < allocationCount; i++)
                {
                    var ptr = writer.Allocate(MaxSize);
                    UnsafeUtility.MemCpy(ptr, data + (i * MaxSize), MaxSize);
                }

                if (allocationRemainder > 0)
                {
                    var ptr = writer.Allocate(allocationRemainder);
                    UnsafeUtility.MemCpy(ptr, data + (allocationCount * MaxSize), allocationRemainder);
                }
            }

            /// <summary> Read a chunk of memory that could have been larger than the max allocation size. </summary>
            /// <param name="reader"> The reader. </param>
            /// <param name="size"> For an array, this is UnsafeUtility.SizeOf{T} * length. </param>
            /// <param name="allocator"> Allocator to use. </param>
            /// <returns> Pointer to the data. </returns>
            public static byte* ReadLarge(this ref NativeStream.Reader reader, int size, Allocator allocator = Allocator.Temp)
            {
                if (size == 0)
                {
                    return default;
                }

                if (size < MaxSize)
                {
                    return reader.ReadUnsafePtr(size);
                }

                var output = (byte*)UnsafeUtility.Malloc(size, 4, allocator);

                var allocationCount = size / MaxSize;
                var allocationRemainder = size % MaxSize;

                for (var i = 0; i < allocationCount; i++)
                {
                    var ptr = reader.ReadUnsafePtr(MaxSize);
                    UnsafeUtility.MemCpy(output + (i * MaxSize), ptr, MaxSize);
                }

                if (allocationRemainder > 0)
                {
                    var ptr = reader.ReadUnsafePtr(allocationRemainder);
                    UnsafeUtility.MemCpy(output + (allocationCount * MaxSize), ptr, allocationRemainder);
                }

                return output;
            }
        }
    }
    You can pass some really large sets of data with this quite efficiently.
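
    A rough usage sketch (here 'values', 'writer', and 'reader' are placeholders for whatever your own job provides):

    Code (CSharp):
    // Hypothetical usage: write a whole NativeArray into a stream in one call,
    // then recover a pointer to it on the read side. The size is computed the
    // way the doc comments above describe.
    int size = UnsafeUtility.SizeOf<float3>() * values.Length;
    NativeStreamExtensions.AllocateLarge(ref writer, (byte*)values.GetUnsafeReadOnlyPtr(), size);

    // ... later, on the read side ...
    byte* ptr = NativeStreamExtensions.ReadLarge(ref reader, size, Allocator.Temp);
    var restored = (float3*)ptr; // reinterpret back to the element type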
     
    bb8_1 and LuckyWonton like this.
  6. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    3,986
    "Queue" is the wrong word here. "Stream" is the right word. NativeStream is an "array of streams", where a stream is written out once from beginning to end, and can then be read from beginning to end multiple times.

    NativeStream has a limitation that only one stream can be written to at a time per thread, and this results in different streams written by the same thread being compacted adjacent to each other in memory. How NativeStream does this under the hood reduces allocations to fewer than the number of streams. That said, if you have tens of thousands of indices, there may be performance benefits to using IJobEntityBatch or IJobParallelForBatch to reduce that number of indices.
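
    A sketch of that batching idea (names are illustrative, assuming IJobParallelForBatch from the jobs package):

    Code (CSharp):
    // Illustrative batched job: one foreach index per *batch* of elements
    // rather than per element, keeping the stream's index count small.
    [BurstCompile]
    struct BatchedProduceJob : IJobParallelForBatch
    {
        public int BatchSize; // same value passed to ScheduleBatch
        [ReadOnly] public NativeArray<int> Input;
        public NativeStream.Writer Writer;

        public void Execute(int startIndex, int count)
        {
            Writer.BeginForEachIndex(startIndex / BatchSize); // one stream per batch
            for (int i = startIndex; i < startIndex + count; i++)
            {
                Writer.Write(Input[i]);
            }
            Writer.EndForEachIndex();
        }
    }

    // int indexCount = (input.Length + batchSize - 1) / batchSize;
    // var stream = new NativeStream(indexCount, Allocator.TempJob);
    // job.ScheduleBatch(input.Length, batchSize).Complete();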
     
    xVergilx, MNNoxMortem and Ghat-Smith like this.
  7. MintTree117

    MintTree117

    Joined:
    Dec 2, 2018
    Posts:
    340
    Thanks for the information. I'd like to chime in with my experience trying out NativeStream. It performed orders of magnitude worse than the concurrent NativeQueue when writing tens of thousands of indices. I am surprised, but not surprised at the same time :p

    Though I haven't tried your extensions yet, Tertle.
     
  8. LuckyWonton

    LuckyWonton

    Joined:
    Feb 28, 2014
    Posts:
    19
    How effective has this been for you? Can it handle millions of allocations?