
Question: Trying to understand how to parallel write to a NativeHashMap

Discussion in 'Entity Component System' started by jwvanderbeck, Jul 7, 2020.

  1. jwvanderbeck

    jwvanderbeck

    Joined:
    Dec 4, 2014
    Posts:
    825
    I have a series of arrays to process, and I write the results out to a common NativeHashMap; the result of each processed array goes into a different key in the map. I want to schedule jobs to process each array and hopefully let them run at the same time, or as many of them as possible.

    I am having some trouble actually getting this working, though, and always end up spamming the following error:
    Code (CSharp):
    InvalidOperationException: The previously scheduled job ClearingHouse:CalculateMean writes to the Unity.Collections.NativeHashMap`2[System.Int32,GameLayer.MeanData] CalculateMean.output. You are trying to schedule a new job ClearingHouse:CalculateMean, which writes to the same Unity.Collections.NativeHashMap`2[System.Int32,GameLayer.MeanData] (via CalculateMean.output). To guarantee safety, you must include ClearingHouse:CalculateMean as a dependency of the newly scheduled job.
    I am attempting to do this using the ParallelWriter extension.

    Job Code
    Code (CSharp):
    public struct CalculateMean : IJob
    {
        public NativeArray<TransactionRecord> transactions;
        public NativeHashMap<int, MeanData>.ParallelWriter output;

        public void Execute()
        {
            var itemIndex = transactions[0].itemIndex;

            int totalAsks = 0;
            int totalBids = 0;
            int totalEntries = 0;

            float totalPrice = 0f;
            int totalAskQuantity = 0;
            int totalBidQuantity = 0;

            for (int i = 0; i < transactions.Length; i++)
            {
                var transaction = transactions[i];
                totalEntries++;

                switch (transaction.offerType)
                {
                    case OfferType.None:
                        break;
                    case OfferType.Ask:
                        totalAsks++;
                        totalPrice += transaction.price;
                        totalAskQuantity += transaction.quantity;
                        break;
                    case OfferType.Bid:
                        totalPrice += transaction.price;
                        totalBids++;
                        totalBidQuantity += transaction.quantity;
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
            }
            var mean = new MeanData
            {
                itemIndex = itemIndex,
                meanPrice = totalPrice / totalEntries,
                meanAskQuantity = totalAskQuantity / totalAsks,
                meanBidQuantity = totalBidQuantity / totalBids
            };
            output.TryAdd(itemIndex, mean);
        }
    }
    Scheduling Code (currently running in Update() in Editor for testing)
    Code (CSharp):
    void UpdateMeanDataForAllItems()
    {
        meanData.Clear();

        foreach (var records in historicalTransactions.Values)
        {
            var recordsArray = new NativeArray<TransactionRecord>(records.Count, Allocator.TempJob);
            for (int i = 0; i < records.Count; i++)
            {
                recordsArray[i] = records[i];
            }

            var processJob = new CalculateMean();
            processJob.output = meanData.AsParallelWriter();
            processJob.transactions = recordsArray;
            var handle = processJob.Schedule();
            recordsArray.Dispose(handle);
        }
    }
     
  2. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    3,761
    You aren't passing the previous handle into the job, so they're scheduled to execute at the same time, which the safety system won't allow.
    It needs to look something like this:

    Code (CSharp):
    JobHandle handle = default;

    foreach (var records in historicalTransactions.Values)
    {
        var recordsArray = new NativeArray<TransactionRecord>(records.Count, Allocator.TempJob);
        for (int i = 0; i < records.Count; i++)
        {
            recordsArray[i] = records[i];
        }

        var processJob = new CalculateMean();
        processJob.output = meanData.AsParallelWriter();
        processJob.transactions = recordsArray;
        handle = processJob.Schedule(handle);
        recordsArray.Dispose(handle);
    }
    However, as this is an IJob, chaining the handles like this runs the jobs one after another, so it won't give you any parallel writing, which seems to be what you're asking for.

    ParallelWriter means parallel within the same job, i.e. IJobParallelFor,
    not parallel across multiple jobs at the same time (though for some containers that is safe, e.g. NativeQueue, from memory).
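    To illustrate what "parallel within the same job" means, something like this (untested sketch, placeholder types and names) is a single IJobParallelFor whose Execute calls run across worker threads, all writing through one ParallelWriter:

    Code (CSharp):
    using Unity.Collections;
    using Unity.Jobs;

    public struct WriteValues : IJobParallelFor
    {
        [ReadOnly] public NativeArray<int> keys;
        public NativeHashMap<int, int>.ParallelWriter output;

        public void Execute(int index)
        {
            // Safe: ParallelWriter supports concurrent TryAdd from the
            // worker threads servicing this single scheduled job.
            output.TryAdd(keys[index], keys[index] * 2);
        }
    }
    One caveat: a ParallelWriter can't grow the map from inside a job, so the NativeHashMap needs enough capacity reserved when it's constructed.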
     
  3. jwvanderbeck

    jwvanderbeck

    Joined:
    Dec 4, 2014
    Posts:
    825
    Well, I'm not passing in the handles because each job is independent of the others as far as what they do; the only connection is where they write to. It was my hope to make the jobs run in parallel (as much as possible). Is this not possible? I initially looked at IJobParallelFor but I don't think that was what I wanted either, as I want to process each complete array in parallel, not each element of an array.

    Essentially I have - at the "traditional" layer, a Dictionary which contains a list of transactions for each item type in the world. I want to run jobs on each list to derive statistical data from the transactions. Each list of transactions can be processed independently of the others, and ideally would be because there could potentially be tens of thousands of entries.

    EDIT: The tl;dr here is I intended for them to run at the same time, thinking the ParallelWriter would allow me to do that. Seems I was wrong.
     
    Last edited: Jul 7, 2020
  4. yondercode

    yondercode

    Joined:
    Jun 11, 2018
    Posts:
    27
    If you know the total number of transactions beforehand (or keep track of it, or just calculate it), this is one way to make it parallel (untested):

    Code (CSharp):
    // transactionsCount is the number of all transactions inside historicalTransactions

    NativeArray<TransactionRecord> transactions = new NativeArray<TransactionRecord>(transactionsCount, Allocator.TempJob);
    NativeArray<int> startIndexes = new NativeArray<int>(historicalTransactions.Count, Allocator.TempJob);
    NativeArray<int> endIndexes = new NativeArray<int>(historicalTransactions.Count, Allocator.TempJob);
    int inputIndex = 0;
    int jobIndex = 0;
    foreach (var records in historicalTransactions.Values)
    {
        int startInputIndex = inputIndex;
        for (int i = 0; i < records.Count; i++, inputIndex++)
        {
            transactions[inputIndex] = records[i];
        }
        startIndexes[jobIndex] = startInputIndex;
        endIndexes[jobIndex] = inputIndex;
        jobIndex++;
    }
    var processJob = new CalculateMean();
    processJob.output = meanData.AsParallelWriter();
    processJob.transactions = transactions;
    processJob.startIndexes = startIndexes;
    processJob.endIndexes = endIndexes;
    var handle = processJob.Schedule(startIndexes.Length, 32);
    Meanwhile, in the parallel job you can safely read from the transactions array by marking it with the [ReadOnly] attribute.

    Code (CSharp):
    public struct CalculateMean : IJobParallelFor
    {
        [ReadOnly] public NativeArray<TransactionRecord> transactions;
        public NativeHashMap<int, MeanData>.ParallelWriter output;
        [ReadOnly] public NativeArray<int> startIndexes;
        [ReadOnly] public NativeArray<int> endIndexes;

        public void Execute(int index)
        {
            var startIndex = startIndexes[index];
            var endIndex = endIndexes[index];

            var itemIndex = transactions[startIndex].itemIndex;

            // ...

            int sum = 0;

            for (int i = startIndex; i < endIndex; i++)
            {
                var transaction = transactions[i];
                // ...
            }
            var mean = new MeanData
            {
                // ...
            };
            output.TryAdd(itemIndex, mean);
        }
    }
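    One thing the snippets above leave out: the TempJob allocations need to be disposed once the job has run, and the handle completed before meanData is read back on the main thread. Something like this (untested, reusing the same variable names) should do it:

    Code (CSharp):
    // Chain the disposals on the job handle so they run after the job.
    handle = transactions.Dispose(handle);
    handle = startIndexes.Dispose(handle);
    handle = endIndexes.Dispose(handle);

    // Complete before reading meanData back on the main thread.
    handle.Complete();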
     
    Last edited: Jul 7, 2020
  5. jwvanderbeck

    jwvanderbeck

    Joined:
    Dec 4, 2014
    Posts:
    825
    If I understand what you are getting at here, it is: take all the arrays and pack them into one large flat array, slice it up by storing start and end indices for each original array within the larger one, then process the whole thing as a single IJobParallelFor.

    My question there is, what happens if the scheduler decides to schedule more than one job to run in the same "section" at once?
     
  6. yondercode

    yondercode

    Joined:
    Jun 11, 2018
    Posts:
    27
    Your understanding is correct.

    In IJobParallelFor, the job's Execute will be called exactly once for each index from 0 to the provided length, in this case the length of startIndexes. Since every index of startIndexes represents exactly one "section" of transactions, every section will get processed exactly once.
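    Side note on the 32 in Schedule(startIndexes.Length, 32): that second argument is the innerloopBatchCount. It only controls how many consecutive indices a worker thread grabs per batch; it never changes how many times each index runs. Something like this is equally valid:

    Code (CSharp):
    // For chunky per-index work like processing an entire transaction
    // list, a smaller batch size (even 1) can spread the sections
    // across worker threads more evenly.
    var handle = processJob.Schedule(startIndexes.Length, 1);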
     
  7. jwvanderbeck

    jwvanderbeck

    Joined:
    Dec 4, 2014
    Posts:
    825
    Ahh, I see it now: you are running the parallel job over the startIndexes array, not the actual data array.

    This is ingenious and I doubt I would have thought of it myself!