
Parallel reduction

Discussion in 'Entity Component System' started by Pyromuffin, Jan 21, 2019.

  1. Pyromuffin

    Pyromuffin

    Joined:
    Aug 5, 2012
    Posts:
    85
    Hi, excuse me if this has been addressed elsewhere, but is it possible to do a parallel reduction using the job system?

    I see we have Interlocked.Add(), but that only works for integer types. For example, if I am trying to sum up a large list of floats using the job system, then it seems I would have to lock the native array element that I would be writing to. The lock statement only works on reference types, so I am unsure of what to do here.
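The closest workaround I can think of in plain C# is a compare-exchange loop (a sketch only; `AtomicFloat` is an illustrative name, and I have not verified whether this works under Burst):

```csharp
using System.Threading;

static class AtomicFloat
{
    // Sketch: emulates Interlocked.Add for floats with a compare-exchange loop.
    // Managed C# only; not verified under Burst.
    public static float Add(ref float target, float value)
    {
        float initial, computed;
        do
        {
            initial = target;
            computed = initial + value;
        }
        // Retry until no other thread changed target between our read and our write.
        while (Interlocked.CompareExchange(ref target, computed, initial) != initial);
        return computed;
    }
}
```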

    Thanks!
     
  2. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    3,761
Just do it in a single job thread. Honestly, Burst is so fast at this type of basic math that the overhead of scheduling parallel jobs is probably higher than just crunching it all in a single thread.
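Something like this (a sketch; the names are illustrative):

```csharp
using Unity.Burst;
using Unity.Collections;
using Unity.Jobs;

// Sketch: a single-threaded, Burst-compiled sum over a NativeArray.
[BurstCompile]
struct SumJob : IJob
{
    [ReadOnly]
    public NativeArray<float> Values;

    // Length-1 array used to return the result from the job.
    public NativeArray<float> Result;

    public void Execute()
    {
        float sum = 0f;
        for (int i = 0; i < Values.Length; i++)
            sum += Values[i];
        Result[0] = sum;
    }
}
```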
     
  3. Pyromuffin

    Pyromuffin

    Joined:
    Aug 5, 2012
    Posts:
    85
Hi tertle,

The example is not exactly what I am doing. Really, I am calculating the volume of a dynamic mesh using the shoelace algorithm, so there is actually quite a lot of work to be done.
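Per triangle, the work looks roughly like this (a sketch of the signed-tetrahedron form; the identifiers are illustrative, not my actual code):

```csharp
using Unity.Mathematics;

static class MeshVolumeSketch
{
    // Sketch: accumulate the signed volume of the tetrahedron formed by
    // each triangle and the origin; the sum is the mesh volume for a
    // closed mesh (divergence theorem / 3D shoelace).
    public static float Volume(float3[] vertices, int[] triangles)
    {
        float volume = 0f;
        for (int i = 0; i < triangles.Length; i += 3)
        {
            float3 a = vertices[triangles[i]];
            float3 b = vertices[triangles[i + 1]];
            float3 c = vertices[triangles[i + 2]];
            volume += math.dot(a, math.cross(b, c)) / 6f;
        }
        return math.abs(volume);
    }
}
```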
     
  4. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    3,761
Well, if you must do it from multiple threads, you could compute local sums first (a separate element per thread) and then do the final sum over that array in a single thread at the end. This is roughly how some of the NativeContainer concurrent methods work.
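A sketch of that pattern (illustrative names; [NativeSetThreadIndex] injects the worker thread's index so each thread writes only its own slot):

```csharp
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;

[BurstCompile]
struct PartialSumJob : IJobParallelFor
{
    [ReadOnly]
    public NativeArray<float> Values;

    // One accumulator per worker thread, indexed by the injected thread
    // index, so no two threads ever write the same element.
    [NativeDisableParallelForRestriction]
    public NativeArray<float> Partials;

    [NativeSetThreadIndex]
    int m_ThreadIndex;

    public void Execute(int i)
    {
        Partials[m_ThreadIndex] += Values[i];
    }
}
```

Allocate Partials with length JobsUtility.MaxJobThreadCount (cleared to zero), schedule, Complete(), then sum the handful of partials on the main thread or in a tiny single-threaded job.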

    But out of interest, have you benchmarked single threaded job performance?
     
  5. Pyromuffin

    Pyromuffin

    Joined:
    Aug 5, 2012
    Posts:
    85
Yes, performance is fine with 16-ish meshes, but when I scale up I start spending 20 ms in just this function.

I think I can probably do one mesh per thread and see if that works; I guess it's the same amount of work.

I'm coming from compute shaders, so I am used to dealing with much, much higher thread counts and poor single-threaded performance. I still think there should be a way to do some kind of cooperative parallel algorithm.
     
  6. jpvanoosten

    jpvanoosten

    Joined:
    Nov 20, 2009
    Posts:
    50
I know this post is old, but I was curious about the same thing and was looking for solutions. I didn't find any implementations, so I thought I'd attempt my own.

This is an implementation of a generic parallel reduce using Unity's job system and the Burst compiler. I've also done some performance testing: reducing (summing) 100k values takes 6.5 ms (average) in a single thread and 0.41 ms with this parallel reduce; summing 1M values takes 67 ms (average) single-threaded and 2.8 ms with the parallel reduce (Intel i7-8700K).

    Code (CSharp):
using Unity.Burst;
using Unity.Jobs;
using Unity.Collections;

/// <summary>
/// An interface that defines a binary operation.
/// The operation can be any binary operation (sum, difference, multiply, min, max, etc...).
/// </summary>
/// <typeparam name="T">The type of the values to perform the operation on.</typeparam>
public interface IBinaryOperator<T> where T : struct
{
    T Operator(T a, T b);
}

[BurstCompile(CompileSynchronously = true)]
public struct ParallelReduceJob<T, U> : IJobParallelForBatch
    where T : struct
    where U : struct, IBinaryOperator<T>
{
    // The step rate of the source array.
    public int Step;

    [ReadOnly]
    public NativeSlice<T> Src;

    [WriteOnly]
    public NativeSlice<T> Dst;

    /// <summary>
    /// The operation to perform on the values of the array.
    /// </summary>
    public U Operator;

    /// <summary>
    /// Serial reduction.
    /// </summary>
    /// <param name="src">The source array to reduce.</param>
    /// <param name="step">The stride between the elements to reduce.</param>
    /// <param name="op">The binary operation to apply.</param>
    /// <returns>The reduced value.</returns>
    public static T1 Reduce<T1, U1>(in NativeSlice<T1> src, int step, U1 op)
        where T1 : struct
        where U1 : struct, IBinaryOperator<T1>
    {
        T1 val = src[0];
        for (int i = step; i < src.Length; i += step)
        {
            val = op.Operator(val, src[i]);
        }
        return val;
    }

    public void Execute(int startIndex, int count)
    {
        Dst[startIndex] = Reduce(Src.Slice(startIndex, count), Step, Operator);
    }
}

public static class ParallelReduce
{
    /// <summary>
    /// Swap the arrays. This is very efficient for NativeArray as only the internal
    /// memory pointer is swapped, not the values of the array.
    /// </summary>
    /// <typeparam name="T">The value type of the arrays being swapped.</typeparam>
    /// <param name="a">The first array to swap with the second.</param>
    /// <param name="b">The second array to swap with the first.</param>
    private static void Swap<T>(ref NativeArray<T> a, ref NativeArray<T> b)
        where T : struct
    {
        var t = a;
        a = b;
        b = t;
    }

    /// <summary>
    /// Perform a parallel reduction on the elements of the array.
    /// </summary>
    /// <typeparam name="T">The type of the elements to be reduced.</typeparam>
    /// <typeparam name="U">The type of the binary operation to perform on each element of the array.</typeparam>
    /// <param name="values">The values to be reduced.</param>
    /// <param name="op">The operation to perform on the elements of the array.</param>
    /// <returns>The result of the reduction.</returns>
    public static T Reduce<T, U>(in NativeArray<T> values, U op)
        where T : struct
        where U : struct, IBinaryOperator<T>
    {
        // The number of values to reduce per thread batch.
        const int BATCH_SIZE = 1024;

        // The step rate for the reduction.
        // On the first iteration, this is every value of the source array.
        int stepRate = 1;

        // How many values to reduce in the current batch.
        int batchSize = BATCH_SIZE;

        JobHandle job = default;

        var src = new NativeArray<T>(values.Length, Allocator.TempJob, NativeArrayOptions.UninitializedMemory);
        var dst = new NativeArray<T>(values, Allocator.TempJob);

        while (stepRate < values.Length)
        {
            Swap(ref src, ref dst);

            job = new ParallelReduceJob<T, U>
            {
                Src = src,
                Dst = dst,
                Step = stepRate,
                Operator = op,
            }.ScheduleBatch(values.Length, batchSize, job);

            // Double the step rate and batch size for the next pass.
            stepRate = batchSize;
            batchSize *= 2;
        }

        job.Complete();

        T res = dst[0];

        src.Dispose();
        dst.Dispose();

        return res;
    }
}
    And a performance test case:

    Code (CSharp):
using NUnit.Framework;
using Unity.Collections;
using Unity.PerformanceTesting;

public class ParallelReduceTest
{
    struct SumInt : IBinaryOperator<int>
    {
        public int Operator(int a, int b)
        {
            return a + b;
        }
    }

    [Test, Performance]
    public void Test()
    {
        const int NUM_VALUES = 1000000;

        var a = new NativeArray<int>(NUM_VALUES, Allocator.TempJob, NativeArrayOptions.UninitializedMemory);
        for (int i = 0; i < NUM_VALUES; ++i)
        {
            a[i] = i;
        }

        int sum0 = 0;
        Measure.Method(() =>
        {
            sum0 = 0;
            for (int i = 0; i < NUM_VALUES; ++i)
            {
                sum0 += a[i];
            }
        }).SampleGroup($"Serial ({NUM_VALUES})").Run();

        int sum1 = 0;
        Measure.Method(() =>
        {
            sum1 = ParallelReduce.Reduce(a, new SumInt());
        }).SampleGroup($"Parallel ({NUM_VALUES})").Run();

        Assert.AreEqual(sum0, sum1);

        a.Dispose();
    }
}
     
  8. Chris-Herold

    Chris-Herold

    Joined:
    Nov 14, 2011
    Posts:
    116
A problem like this lends itself very well to being solved with compute shaders, especially with larger datasets.
     
  9. amarcolina

    amarcolina

    Joined:
    Jun 19, 2014
    Posts:
    65
I worked on a small utility a while back that might be of use here. It's pretty old, but it may at least serve as inspiration for writing a new version. It avoided interlocked operations entirely; instead, each job thread writes to its own local buffer, and the buffers get combined at the end when the result is read.

It's called NativeResult<T, Op>, which allows a parallel reduction of a set of T values using a defined Op. For example, you could have a NativeResult<int, Sum>, which lets you parallel-reduce a set of integers using addition. But it supports extension to any kind of reduce operation you might want, as long as it is commutative.
     