Writing In Parallel from Multiple Jobs, then Reading from a later job

Discussion in 'Data Oriented Technology Stack' started by PublicEnumE, Jan 25, 2020.

  1. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    Part of my code is designed so that multiple jobs can safely write to the same DynamicBuffer, at different indices. Then later, a different job reads from that same DynamicBuffer and processes everything that’s been added.

    The problem is controlling dependencies in this situation. How can I make sure the ‘reading’ job runs after all of the parallel ‘writing’ jobs have completed?

    The only way I know to let multiple jobs write to a NativeCollection is by disabling safety checks completely with the
    [NativeDisableContainerSafetyRestriction]
    attribute. But in that case, there's nothing to instruct the later 'reading' job to wait for the 'writing' jobs to run first.

    The only way I know around this is to use a pattern similar to what EntityCommandBufferSystem uses, and have the 'writing' systems all register their JobHandle dependencies with the 'reading' job's system, using an 'AddJobHandleForProducer()'-like function. But since this registering happens on the main thread, it can be quite slow.
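    A minimal sketch of that producer-registration pattern (the system and field names here are hypothetical, modeled on how EntityCommandBufferSystem exposed AddJobHandleForProducer in Entities 0.x):

```csharp
using Unity.Entities;
using Unity.Jobs;

// Hypothetical consumer system: writers register their handles on the
// main thread; the reading job is then scheduled against the combined handle.
public class BufferConsumerSystem : JobComponentSystem
{
    private JobHandle _producerHandles;

    // Called on the main thread by each writing system after it schedules its job.
    public void AddJobHandleForProducer(JobHandle producerJob)
    {
        _producerHandles = JobHandle.CombineDependencies(_producerHandles, producerJob);
    }

    protected override JobHandle OnUpdate(JobHandle inputDeps)
    {
        // The reading job must wait on every registered writer.
        var deps = JobHandle.CombineDependencies(inputDeps, _producerHandles);
        _producerHandles = default;
        // ... schedule the reading job against 'deps' here ...
        return deps;
    }
}
```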

    I would love to know if there is a better way to do this.

    For example, it would be great if I could add a [WriteOnly] attribute to the DynamicBuffer in all of the 'writing' jobs, then add a [ReadOnly] attribute to the same DynamicBuffer in the later 'reading' job, and have the scheduling system understand that all the writing should complete first. But I don't believe it works that way.

    Please let me know if I'm missing anything. Thank you very much for any advice.
     
  3. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    2,052
    The better way is to just not do it, and if you are doing it, it should all be within the same system so you can easily control the dependencies.
     
  4. Curlyone

    Curlyone

    Joined:
    Mar 15, 2018
    Posts:
    32
    If I understood this correctly, you should be able to achieve that like this.

    If both your write and read jobs are in the same system:
    Code (CSharp):
    var writeJobHandle = new WriteJob().Schedule(this, inputDependencies);
    var secondJobHandle = new ReadJob().Schedule(this, writeJobHandle);
    return secondJobHandle;
    If your jobs are in different systems, you can add the UpdateBefore(type) or UpdateAfter(type) attribute to your system class.

    So something like:
    Code (CSharp):
    public class WriteJob : JobComponentSystem

    [UpdateAfter(typeof(WriteJob))]
    public class ReadJob : JobComponentSystem

    or

    Code (CSharp):
    [UpdateBefore(typeof(ReadJob))]
    public class WriteJob : JobComponentSystem

    public class ReadJob : JobComponentSystem
     
    florianhanke likes this.
  5. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    The first example, yes.

    The second example additionally needs the attribute for the simple/sequential dependency management (forgot what that attribute was called).
     
  6. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    There’s an attribute for this? Wonderful - I thought that was a project-wide setting.
     
  7. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    Sorry - I should not have called it an attribute - as far as I recall it is project-wide (set in bootstrap, can’t find it - looking from mobile)
     
  8. 5argon

    5argon

    Joined:
    Jun 10, 2013
    Posts:
    1,494
    This preprocessor directive :
    ENABLE_SIMPLE_SYSTEM_DEPENDENCIES
     
  9. Sarkahn

    Sarkahn

    Joined:
    Jan 9, 2013
    Posts:
    266
    Feels like the original question kinda got lost here. Like Tertle was saying, if you want to schedule multiple jobs to write in parallel, it should be done in a single system. I don't think it's possible to get separate systems to *write* in parallel to the same data since the safety system would force them to run in sequence. Or at least if it is possible the code would be pretty horrible to read since you'd have to juggle your dependency between systems.

    There are jobs for exactly this purpose - IJobParallelFor will run a job for every index of your data, IJobParallelForBatch will run a job for each "batch size" you give it.
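    As a minimal illustration of the parallel-for approach, each Execute(index) below writes only its own slot, which is exactly the disjoint-index pattern under discussion (names are illustrative):

```csharp
using Unity.Collections;
using Unity.Jobs;

// Each Execute(index) touches only its own element, so the scheduler
// can safely split the index range across worker threads.
struct SquareJob : IJobParallelFor
{
    public NativeArray<int> Values;

    public void Execute(int index)
    {
        Values[index] = index * index;
    }
}

// Usage, e.g. inside a system's OnUpdate:
// var values = new NativeArray<int>(1024, Allocator.TempJob);
// var handle = new SquareJob { Values = values }.Schedule(values.Length, 64);
```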
     
    Last edited: Jan 26, 2020
  10. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    I respect what you’re saying. Like all disagreeing opinions on these forums, I’ll be thinking about it for several days, imagining how to change my code to implement it. I really do appreciate it.

    But in all seriousness - I assert that there’s nothing unsafe or unwise about this pattern. Writing to different indices of a collection from multiple jobs (from the same or different systems) doesn’t risk a race condition. There are clear benefits (especially for real-world projects) to being able to add new concurrent-write systems over time, rather than having everyone on the team add more and more jobs to a single system as development unfolds. And waiting to read from that collection until all writing jobs are finished: it seems like Unity could account for this in their job scheduler.

    So while this may not be something Unity currently supports, that doesn’t make it a bad pattern. I assert it would be a very useful and valid pattern if it were supported. Designing one’s code for safe concurrent writing can be wise. It seems like the limitation here - the only argument for why not to do it - may be a lack of support.
     
    Last edited: Jan 26, 2020
  11. Sarkahn

    Sarkahn

    Joined:
    Jan 9, 2013
    Posts:
    266
    For the record I wasn't saying it's a bad pattern, I was just saying the safety system literally will not let you do it from separate systems without making your code a lot harder to parse. It's very straightforward and entirely reasonable to do it within one system, as long as you know you're not creating a race condition.

    I agree it would be nice if you could do something like this from separate systems but I'm not sure what the api for that would even look like while still obeying the rules of the safety system. It's an inherent rule of the safety system that on any piece of data write jobs run in sequence and read jobs run in parallel after write jobs.
     
  12. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    Ah, my mistake. :) Sorry, and thank you for the discussion.

    What about an API like this?

    Code (CSharp):
    struct ConcurrentWriteJob : IJobForEach_B<MyBuffer>
    {
        public void Execute([ConcurrentWriteOnly] DynamicBuffer<MyBuffer> myBuffer)
        {
            //...
        }
    }
    A ConcurrentWriteOnlyAttribute would inform the safety system to allow concurrent writes to this collection, from these jobs only. The scheduler would still schedule read jobs to run after these jobs are complete.

    Of course imagining the API is the easy part. :p Someone with knowledge about why this wouldn't work, please chime in and explain. And thank you.
     
    Last edited: Jan 26, 2020
  13. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    Look, I have not been really using DOTS for a long time and mainly follow the forum.

    So basically you want to pass a dependency between 2 systems, and neither [Disable...] nor scheduling an IJobParallelFor on an .AsNativeArray() from the buffer would create the dependency. How about this: in the same system, you create a "dummy" write - an IJob that depends on the [Disable...] parallel job of that same system and just writes index[0] = index[0]. Would this not create the dependency...?
     
    PublicEnumE likes this.
  14. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    That would work as you described. Manually passing dependencies in the same system bypasses the issue completely, but requires that you do everything through one system. That's what I would like to avoid, for reasons including realistic project workflows and code expansion over time. In practical terms, I cannot stand in front of a 25-person team and ask them to keep adding new jobs to a single system over a 3-5 year development cycle. The resulting system would be massive by the end of the project, and difficult to maintain and debug. It would also mean giving up the use of ComponentSystemGroups and the UpdateBefore/After attributes for ordering.

    Workarounds may be possible using partial classes for the system, or writing the code in smaller classes which get assimilated into a single, giant system at runtime, but all of those approaches are likely to have code smell when implemented at scale.
     
    Last edited: Jan 26, 2020
  15. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    ...I meant...

    System A:
    JobHandle job1 = .Schedule(parallel job with disabled safety..., inputDeps)
    JobHandle job2 = .Schedule(dummy job writing to the buffer in question, job1) // would this not fix the disabled safety from job1?
    return job2's handle

    System B:
    job3 = .Schedule(read job, inputDeps)
     
    PublicEnumE likes this.
  16. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    I believe that would work. The problem is that the code needs to support an unbounded number of "job1"s before "job2" is called, which means they would all need to be added to SystemA.
     
    Last edited: Jan 26, 2020
  17. Razmot

    Razmot

    Joined:
    Apr 27, 2013
    Posts:
    286
    With IJobForEach_B and other variants, you can use [WriteOnly] and [ReadOnly] on buffers.
    Also have a look at [AlwaysSynchronizeSystem].
     
  18. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    Thank you for this!

    Do you know if all of the [WriteOnly] jobs will run in parallel, even if they include the same buffer type? That’s what I can’t nail down. The documentation on [WriteOnly] is quite sparse atm.

    Thanks for any insights.
     
  19. Sarkahn

    Sarkahn

    Joined:
    Jan 9, 2013
    Posts:
    266
    They won't. Like I said earlier - it's an inherent rule of the safety system that on any piece of data write jobs run in sequence and read jobs run in parallel before or after write jobs. By marking data as [WriteOnly] in a job (or passing it as "ref" in a ForEach lambda) you're literally telling the safety system to do the opposite of what you're trying to do.

    If you want to write in parallel on a single piece of data you have two options - use a ParallelFor job or schedule multiple write jobs to run in parallel (using JobHandle.CombineDependencies) and disable safety checks on your data. Obviously at that point it's up to you to prevent any race conditions and you would need to do the extra work passing your job handle between systems.
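    The second option might be sketched like this (job and field names are illustrative; once safety checks are disabled, keeping the write ranges disjoint is entirely on the caller):

```csharp
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;

// Two writers cover disjoint index ranges of the same array.
// [NativeDisableContainerSafetyRestriction] opts this field out of the
// safety system, so avoiding overlapping ranges is our responsibility.
struct RangeWriteJob : IJob
{
    [NativeDisableContainerSafetyRestriction] public NativeArray<int> Data;
    public int Start;
    public int End;

    public void Execute()
    {
        for (int i = Start; i < End; i++)
            Data[i] = i;
    }
}

// Usage sketch: schedule both writers against the same input dependency,
// then make any reader depend on their combined handle.
// var a = new RangeWriteJob { Data = data, Start = 0,   End = 512  }.Schedule(inputDeps);
// var b = new RangeWriteJob { Data = data, Start = 512, End = 1024 }.Schedule(inputDeps);
// var writers = JobHandle.CombineDependencies(a, b);
```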
     
  20. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    @Sarkahn - I am fully with you. I just wonder if you can avoid passing the job handle between systems by scheduling a "dummy" write job that depends on the JobHandle.CombineDependencies - i.e. a simple IJob writing buffer[0] = buffer[0] - and then returning that handle from OnUpdate. Would this not put the dependency chain in order, without the need to manually pass it to the other system?
     
    Sarkahn likes this.
  21. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    196
    @Sarkahn I’m with you too. My intention was to be respectful to @Razmot, in case he had new information.

    If any Unity devs are reading, please consider chiming in. Assuming the user knows what they’re doing (and never writes to the same index from multiple threads), could the safety system support concurrent writes, between multiple jobs, to the same DynamicBuffer?

    It could be quite powerful and useful if it did.

    Thank you!
     
    Last edited: Jan 27, 2020
    Razmot and Sarkahn like this.
  22. Sarkahn

    Sarkahn

    Joined:
    Jan 9, 2013
    Posts:
    266
    I think I see what you're saying, but whether you CombineDependencies on the dummy job or chain the dummy job off the actual write job like from your last post - it seems like you're still just returning the original job as a dependency (even if indirectly) which would cause the scheduler to force any other system doing the same thing to just run the jobs in sequence. It's possible I'm misunderstanding it though, my brain is not built for this, hahah.

    In that same vein I was thinking maybe you could schedule the write job in a system (with safety disabled on the container) but not return the dependency, or maybe return a dummy job that reads your data instead, so the scheduler would force the systems to run in parallel? I'm not familiar enough with how it works internally, but could be worth a try.

    Edit: Also, with [AlwaysSynchronizeSystem] on your parallel writing systems too, maybe that could work.

    Roger that, sorry about that!
     
    Last edited: Jan 27, 2020
    Razmot likes this.
  23. sngdan

    sngdan

    Joined:
    Feb 7, 2014
    Posts:
    991
    Can you test if this works? This was the idea I was talking about...

    Code (CSharp):
    using Unity.Burst;
    using Unity.Collections;
    using Unity.Entities;
    using Unity.Jobs;


    [InternalBufferCapacity(256)]
    public struct IntBuffer : IBufferElementData
    {
        public int Value;
    }

    public class BufferWriteSystem : JobComponentSystem
    {
        private EntityQuery _bufferQuery;

        [BurstCompile]
        private struct ParallelBufferWriteJob : IJobParallelFor
        {
            public int Time;
            [NativeDisableParallelForRestriction] public NativeArray<int> MyIntBuffer;

            public void Execute(int index)
            {
                MyIntBuffer[index] = Time;
            }
        }

        [BurstCompile]
        private struct BufferDependencyJob : IJobForEach_B<IntBuffer>
        {
            public void Execute(DynamicBuffer<IntBuffer> b0)
            {
                b0[0] = b0[0]; // required? scheduling the job should create the dependency
            }
        }

        protected override void OnCreate()
        {
            var e = EntityManager.CreateEntity(ComponentType.ReadWrite<IntBuffer>());
            EntityManager.GetBuffer<IntBuffer>(e).ResizeUninitialized(256);
            _bufferQuery = GetEntityQuery(ComponentType.ReadWrite<IntBuffer>());
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            var bufferEntities = _bufferQuery.ToEntityArray(Allocator.TempJob);
            var bufferEntitiesCount = bufferEntities.Length;
            var parallelJobHandles = new NativeArray<JobHandle>(bufferEntitiesCount, Allocator.TempJob);

            for (int i = 0; i < bufferEntitiesCount; i++)
            {
                var e = bufferEntities[i];
                var bufferAsArray = GetBufferFromEntity<IntBuffer>()[e].Reinterpret<int>().AsNativeArray();
                parallelJobHandles[i] = new ParallelBufferWriteJob
                {
                    Time = (int) UnityEngine.Time.time,
                    MyIntBuffer = bufferAsArray
                }.Schedule(bufferAsArray.Length, 16, inputDeps);
            }

            inputDeps = JobHandle.CombineDependencies(parallelJobHandles);

            inputDeps = new BufferDependencyJob().Schedule(this, inputDeps);

            bufferEntities.Dispose();
            parallelJobHandles.Dispose();
            return inputDeps;
        }
    }

    [UpdateAfter(typeof(BufferWriteSystem))]
    public class BufferReadSystem : JobComponentSystem
    {
        [BurstCompile]
        private struct BufferReadJob : IJobForEach_B<IntBuffer>
        {
            public void Execute([ReadOnly] DynamicBuffer<IntBuffer> b0)
            {
                for (int i = 0; i < b0.Length; i++)
                {
                    var value = b0[i].Value;
                }
            }
        }

        protected override JobHandle OnUpdate(JobHandle inputDeps)
        {
            inputDeps = new BufferReadJob().Schedule(this, inputDeps);

            return inputDeps;
        }
    }
     