IJobBatchedParallelFor - easy safe scheduling of jobs that write to ranges of a NativeArray

Discussion in 'Entity Component System' started by gfoot, Aug 7, 2019.

  1. gfoot

    While experimenting with the job system, I wanted one large NativeArray to be processed by parallel jobs, with each job getting its own range of indices. For example, I wanted to take a large array of floats, split it into 16 roughly equal-sized ranges, run a quicksort on each range (one job per range), then feed progressively larger ranges to merge jobs so that the whole array ends up sorted.
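    To make the range scheme concrete, here is a rough single-threaded sketch in plain C#, with no Unity types. It assumes half-open ranges and a pairwise merge over a scratch buffer. `RangeSortSketch`, `SplitRanges`, `Merge` and `SortByRanges` are hypothetical names used only for illustration, and `Array.Sort` stands in for the per-range quicksort job.

```csharp
using System;
using System.Collections.Generic;

public static class RangeSortSketch
{
    // Splits [0, length) into at most rangeCount half-open ranges of roughly equal size.
    public static List<(int Start, int End)> SplitRanges(int length, int rangeCount)
    {
        var ranges = new List<(int Start, int End)>();
        if (length <= 0 || rangeCount <= 0)
            return ranges;

        int batchSize = (length + rangeCount - 1) / rangeCount; // ceiling division
        for (int start = 0; start < length; start += batchSize)
            ranges.Add((start, Math.Min(length, start + batchSize)));
        return ranges;
    }

    // Merges the adjacent sorted ranges [start, mid) and [mid, end) via a scratch buffer.
    private static void Merge(float[] data, float[] scratch, int start, int mid, int end)
    {
        int left = start, right = mid, dest = start;
        while (left < mid && right < end)
            scratch[dest++] = data[left] <= data[right] ? data[left++] : data[right++];
        while (left < mid)
            scratch[dest++] = data[left++];
        while (right < end)
            scratch[dest++] = data[right++];
        Array.Copy(scratch, start, data, start, end - start);
    }

    public static void SortByRanges(float[] data, int rangeCount)
    {
        var ranges = SplitRanges(data.Length, rangeCount);

        // Stage 1: sort every range independently (this is the part that
        // would run as one parallel job per range).
        foreach (var (start, end) in ranges)
            Array.Sort(data, start, end - start);

        // Stage 2: merge adjacent pairs of ranges until a single range remains.
        var scratch = new float[data.Length];
        while (ranges.Count > 1)
        {
            var merged = new List<(int Start, int End)>();
            for (int i = 0; i < ranges.Count; i += 2)
            {
                if (i + 1 < ranges.Count)
                {
                    Merge(data, scratch, ranges[i].Start, ranges[i].End, ranges[i + 1].End);
                    merged.Add((ranges[i].Start, ranges[i + 1].End));
                }
                else
                {
                    merged.Add(ranges[i]); // odd range out, carried to the next round
                }
            }
            ranges = merged;
        }
    }
}
```

    Every merge within a round touches a disjoint range, which is why each round could also be run as a set of parallel jobs.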

    The existing NativeSlice<> functionality looks like it's heading in this direction, but it doesn't yet seem to integrate well enough with the NativeArray safety checks for this to work transparently.

    IJobParallelFor provides great support for processing one element at a time, in isolation, but doesn't explicitly allow "horizontal" algorithms that need write access to ranges of elements.

    On looking more closely at how IJobParallelFor works, I found it's possible to define an alternate implementation that I've called IJobBatchedParallelFor, without needing engine-level support. The code below is a decompile of the existing IJobParallelForExtensions implementation, edited to explicitly send a range of elements to the user's job Execute method.

    Using this, I was able to get the effect I wanted, so I thought it could be useful to others. I'd be interested to hear if other people have algorithms that want to work like this.

    The library code to support this is below, but first here's an example job using this system. If anybody is interested I can post the full code for the rest of the sort algorithm as well.

    Code (CSharp):
    using Gfoot;
    using Unity.Burst;
    using Unity.Collections;
    using Unity.Mathematics;

    [BurstCompile(CompileSynchronously = true)]
    struct BurstSortPFS : IJobBatchedParallelFor
    {
        public NativeArray<float> Data;

        // Sorts the batch [index, index + batchSize), clamped to the array length.
        public void Execute(int index, int batchSize)
        {
            QuickSort(index, math.min(Data.Length, index + batchSize));
        }

        // Recursive quicksort over the half-open range [start, end).
        private void QuickSort(int start, int end)
        {
            // Use the middle element as the pivot and move the first element into its slot.
            float pivotValue = Data[(start + end) / 2];
            Data[(start + end) / 2] = Data[start];

            // Partition: values <= pivot collect in [start + 1, sp), values > pivot in (ep, end).
            int ep = end - 1;
            int sp = start + 1;
            while (sp <= ep)
            {
                float currentValue = Data[sp];

                if (currentValue <= pivotValue)
                {
                    ++sp;
                    continue;
                }

                Data[sp] = Data[ep];
                Data[ep] = currentValue;
                --ep;
            }

            // Place the pivot at its final position, ep.
            Data[start] = Data[ep];
            Data[ep] = pivotValue;

            // Recurse only into sides that hold more than one element.
            if (ep - start > 1)
                QuickSort(start, ep);
            if (end - ep > 2)
                QuickSort(ep + 1, end);
        }
    }

    And here's the library code to provide the interfaces - just minor changes to the existing IJobParallelFor implementation:

    Code (CSharp):
    using System;
    using System.Runtime.InteropServices;
    using Unity.Collections.LowLevel.Unsafe;
    using Unity.Jobs;
    using Unity.Jobs.LowLevel.Unsafe;

    namespace Gfoot
    {
      [JobProducerType(typeof(IJobBatchedParallelForExtensions.BatchedParallelForJobStruct<>))]
      public interface IJobBatchedParallelFor
      {
        /// <summary>
        ///   <para>Implement this method to perform work against a specific batch of iteration indices.</para>
        /// </summary>
        /// <param name="index">The first index of the batch to work on.</param>
        /// <param name="batchSize">The number of elements, starting at 'index', which are available to work on.</param>
        void Execute(int index, int batchSize);
      }

      /// <summary>
      ///   <para>Custom extension methods to allow abuse of the ParallelFor mechanism for batched array processing.</para>
      /// </summary>
      public static class IJobBatchedParallelForExtensions
      {
        public static unsafe JobHandle Schedule<T>(
          this T jobData,
          int arrayLength,
          int innerloopBatchCount,
          JobHandle dependsOn = default(JobHandle))
          where T : struct, IJobBatchedParallelFor
        {
          JobsUtility.JobScheduleParameters parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf<T>(ref jobData), BatchedParallelForJobStruct<T>.Initialize(), dependsOn, ScheduleMode.Batched);
          return JobsUtility.ScheduleParallelFor(ref parameters, arrayLength, innerloopBatchCount);
        }

        public static unsafe void Run<T>(this T jobData, int arrayLength) where T : struct, IJobBatchedParallelFor
        {
          JobsUtility.JobScheduleParameters parameters = new JobsUtility.JobScheduleParameters(UnsafeUtility.AddressOf<T>(ref jobData), BatchedParallelForJobStruct<T>.Initialize(), new JobHandle(), ScheduleMode.Run);
          JobsUtility.ScheduleParallelFor(ref parameters, arrayLength, arrayLength);
        }

        [StructLayout(LayoutKind.Sequential, Size = 1)]
        internal struct BatchedParallelForJobStruct<T> where T : struct, IJobBatchedParallelFor
        {
          public static IntPtr jobReflectionData;

          // Creates the job reflection data on first use and caches it per job type.
          public static IntPtr Initialize()
          {
            if (jobReflectionData == IntPtr.Zero)
              jobReflectionData = JobsUtility.CreateJobReflectionData(typeof(T), JobType.ParallelFor, (object) new ExecuteJobFunction(Execute), (object) null, (object) null);
            return jobReflectionData;
          }

          public static unsafe void Execute(
            ref T jobData,
            IntPtr additionalPtr,
            IntPtr bufferRangePatchData,
            ref JobRanges ranges,
            int jobIndex)
          {
            // Keep taking index ranges until none are left for this worker.
            while (true)
            {
              int beginIndex;
              int endIndex;
              if (!JobsUtility.GetWorkStealingRange(ref ranges, jobIndex, out beginIndex, out endIndex))
                return;
              // Limit the safety checks' permitted index range to this batch.
              JobsUtility.PatchBufferMinMaxRanges(bufferRangePatchData, UnsafeUtility.AddressOf<T>(ref jobData), beginIndex, endIndex - beginIndex);
              // Pass the whole range to the user's job in a single call.
              jobData.Execute(beginIndex, endIndex - beginIndex);
            }
          }

          public delegate void ExecuteJobFunction(
            ref T data,
            IntPtr additionalPtr,
            IntPtr bufferRangePatchData,
            ref JobRanges ranges,
            int jobIndex);
        }
      }
    }

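    As a rough sketch of how the example job might be scheduled with the library code above (this needs the Unity runtime, so it has not been run outside the editor): `BurstSortPFSUsage` and `SortInSixteenRanges` are hypothetical names, and the batch size assumes the 16-range split described earlier. The extension method is called through its static class here only to keep it visibly distinct from Unity's own `Schedule` extensions.

```csharp
using Gfoot;
using Unity.Collections;
using Unity.Jobs;

public static class BurstSortPFSUsage
{
    // Hypothetical helper: sorts each of (up to) 16 ranges of 'data' in parallel.
    // After it returns, every range is sorted on its own; the ranges still
    // need to be merged to get a fully sorted array.
    public static void SortInSixteenRanges(NativeArray<float> data)
    {
        if (data.Length == 0)
            return;

        const int rangeCount = 16;
        // Ceiling division, so the final range absorbs any shortfall.
        int batchSize = (data.Length + rangeCount - 1) / rangeCount;

        var job = new BurstSortPFS { Data = data };

        // arrayLength is the total element count; innerloopBatchCount is the
        // number of elements per batch.
        JobHandle handle = IJobBatchedParallelForExtensions.Schedule(job, data.Length, batchSize);
        handle.Complete();
    }
}
```

    Completing the handle straight away keeps the sketch simple; in practice the handle would more likely be passed as a dependency to the merge jobs.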
     
  2. sngdan

    Have you looked at IJobParallelForBatch and the array sort in the Unity ECS sources?

    Is your approach different / better?
     
  3. DreamingImLatios

  4. gfoot

    Ah no, I wasn't aware that interface existed, thanks for the reference. I suspect it works in much the same way - I only made minor changes here to the interface.