Question Dependency Management between systems accessing same DynamicBuffer

Discussion in 'Entity Component System' started by Rekart, Jan 12, 2021.

  1. Rekart

    Joined:
    Jan 13, 2020
    Posts:
    22
    I'm trying to set up dependencies between two systems that access the same data. If I picked, for example, a NativeList for that purpose, I would have to manage dependencies manually. Since automatic dependency management works for component types only, I decided to start with a DynamicBuffer.

    The publisher system writes data to the DynamicBuffer:
    Code (CSharp):
    [UpdateInGroup(typeof(FixedStepSimulationSystemGroup))]
    [UpdateAfter(typeof(ExportPhysicsWorld))]
    [UpdateBefore(typeof(EndFramePhysicsSystem))]
    public class PublisherTestSystem1 : SystemBase
    {
        private NativeList<EventBuffer> frameEvents;

        public Entity PublisherEntity;

        //write data from NativeList to DynamicBuffer
        [BurstCompile]
        private struct PublishEventsJob : IJob
        {
            [ReadOnly]
            public NativeList<EventBuffer>     FrameEvents;
            public DynamicBuffer<EventBuffer>  EventBuffer;

            public void Execute()
            {
                for (int i = 0; i < FrameEvents.Length; i++)
                {
                    EventBuffer.Add(FrameEvents[i]);
                }
            }
        }

        //clear DynamicBuffer and start job
        private JobHandle Publish(JobHandle dependency, NativeList<EventBuffer> events)
        {
            var buffer = GetBuffer<EventBuffer>(PublisherEntity);
            buffer.Clear();
            var publishJob = new PublishEventsJob()
            {
                FrameEvents = events,
                EventBuffer = buffer
            };
            return publishJob.Schedule(dependency);
        }

        //create entity and attach DynamicBuffer
        private void CreateEventHolder()
        {
            PublisherEntity = EntityManager.CreateEntity();
            EntityManager.SetName(PublisherEntity, "CoreEventPublisher");
            EntityManager.AddBuffer<EventBuffer>(PublisherEntity);
        }

        protected override void OnCreate()
        {
            frameEvents = new NativeList<EventBuffer>(100, Allocator.Persistent);
            for (int i = 0; i < 10; i++)
            {
                frameEvents.Add(new EventBuffer(){Payload = i});
            }
            CreateEventHolder();
        }

        protected override void OnUpdate()
        {
            var frameEventsCached = frameEvents;
            Dependency = Publish(Dependency, frameEventsCached);
        }
    }
    And the listener system reads the data:
    Code (CSharp):
    1. [UpdateInGroup(typeof(FixedStepSimulationSystemGroup))]
    2. [UpdateAfter(typeof(ExportPhysicsWorld))]
    3. [UpdateBefore(typeof(EndFramePhysicsSystem))]
    4. [UpdateAfter(typeof(PublisherTestSystem1))]
    5. public class ListenerTestSystem1 : SystemBase
    6. {
    7.     private NativeList<EventBuffer> frameEvents;
    8.     private Entity                  publisherEntity;
    9.    
    10.     //read data from DynamicBuffer and write to NativeList
    11.     [BurstCompile]
    12.     private struct CollectEventsJob : IJob
    13.     {
    14.         [ReadOnly] public DynamicBuffer<EventBuffer>  EventBuffer;
    15.         public NativeList<EventBuffer>.ParallelWriter FrameEvents;
    16.  
    17.         public void Execute()
    18.         {
    19.             for (int i = 0; i < EventBuffer.Length; i++)
    20.             {
    21.                 FrameEvents.AddNoResize(EventBuffer[i]);
    22.             }
    23.         }
    24.     }
    25.    
    26.     private JobHandle PollEvents(JobHandle dependency)
    27.     {
    28.         frameEvents.Clear();
    29.  
    30.         var eventsCached = frameEvents.AsParallelWriter();
    31.         var buffer = GetBufferFromEntity<EventBuffer>(isReadOnly: true);
    32.  
    33.         var job = new CollectEventsJob()
    34.         {
    35.             EventBuffer = buffer[publisherEntity],
    36.             FrameEvents = eventsCached
    37.         };
    38.         dependency = job.Schedule(dependency);
    39.  
    40.         return dependency;
    41.     }
    42.  
    43.     protected override void OnUpdate()
    44.     {
    45.         Dependency = PollEvents(dep);
    46.     }
    47. }
    The listener system runs right after the publisher, and I'm getting this error:
    InvalidOperationException: The previously scheduled job PublisherTestSystem1:PublishEventsJob writes to the BufferFromEntity<DependencyTest.EventBuffer> PublishEventsJob.EventBuffer. You must call JobHandle.Complete() on the job PublisherTestSystem1:PublishEventsJob, before you can read from the BufferFromEntity<DependencyTest.EventBuffer> safely.


    I tried combining the listener system's input dependency with the publisher system's output dependency, but that didn't help.
    What am I missing, and how can I handle situations like this?
     
  2. PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    729
    Quick question, before answering more:

    On line 45 of your Listener system, you pass a JobHandle named ‘dep’ into the PollEvents function as a dependency:

    Dependency = PollEvents(dep);


    Where does dep come from?

    Right now, it looks like you're not actually passing the Dependency JobHandle from your publisher system into the job scheduled by your listener system.

    In this case, that wouldn’t happen automatically. Apologies if you already know this: Unity’s automatic dependency management only considers jobs which work on the same component data (usually defined in queries), and not the order in which systems run.

    If SystemB is always set to update after SystemA, that isn’t enough to make SystemB’s jobs dependent on SystemA’s jobs. If you want that kind of ordering between jobs, then you still need to manage the dependencies manually.
     
    Last edited: Jan 12, 2021
  3. PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    729
    As an aside, you probably don’t need to clear the DynamicBuffer on the main thread. It would probably be faster to pass a BufferFromEntity into the job, grab the correct DynamicBuffer from it inside the job’s Execute function, and clear it there.

    Might save you some time. :)
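
    A minimal sketch of that idea, not a definitive implementation. It assumes the EventBuffer element from the first post is an IBufferElementData with an int Payload field (the thread never shows its definition), and PublisherSketchSystem, Buffers and Target are hypothetical names. The job receives a BufferFromEntity plus the entity, and the DynamicBuffer is only looked up and cleared inside Execute:

    ```csharp
    using Unity.Burst;
    using Unity.Collections;
    using Unity.Entities;
    using Unity.Jobs;

    // Assumed definition: the thread only shows that EventBuffer has a Payload field.
    public struct EventBuffer : IBufferElementData
    {
        public int Payload;
    }

    // Hypothetical system name, used for illustration only.
    public class PublisherSketchSystem : SystemBase
    {
        private NativeList<EventBuffer> frameEvents;
        private Entity publisherEntity;

        [BurstCompile]
        private struct PublishEventsJob : IJob
        {
            [ReadOnly] public NativeList<EventBuffer> FrameEvents;
            public BufferFromEntity<EventBuffer> Buffers; // read-write lookup
            public Entity Target;

            public void Execute()
            {
                // Resolve and clear the buffer on the worker thread,
                // so the main thread never touches the DynamicBuffer.
                DynamicBuffer<EventBuffer> buffer = Buffers[Target];
                buffer.Clear();
                for (int i = 0; i < FrameEvents.Length; i++)
                {
                    buffer.Add(FrameEvents[i]);
                }
            }
        }

        protected override void OnCreate()
        {
            frameEvents = new NativeList<EventBuffer>(100, Allocator.Persistent);
            publisherEntity = EntityManager.CreateEntity();
            EntityManager.AddBuffer<EventBuffer>(publisherEntity);
        }

        protected override void OnDestroy()
        {
            frameEvents.Dispose();
        }

        protected override void OnUpdate()
        {
            Dependency = new PublishEventsJob
            {
                FrameEvents = frameEvents,
                Buffers = GetBufferFromEntity<EventBuffer>(), // isReadOnly defaults to false
                Target = publisherEntity
            }.Schedule(Dependency);
        }
    }
    ```

    Whether this is measurably faster than clearing on the main thread depends on the project; the main point is that the buffer is no longer accessed outside of a job.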
     
  4. sngdan

    Joined:
    Feb 7, 2014
    Posts:
    1,131
    Isn't Clear just setting the length to 0? (I have been away for more than a year, so don’t trust me.)
     
  5. Rekart

    Joined:
    Jan 13, 2020
    Posts:
    22

    Sorry, missed a bit during post editing, I meant
    Dependency = PollEvents(Dependency);

    on line 45 of Listener system.

    I also tried the following:
    1) Add GetDependency() method to publisher system
    Code (CSharp):
    public class PublisherTestSystem1 : SystemBase
    {
        public JobHandle GetDependency() => Dependency;

        //all other code without changes
    }
    2) Combine dependencies in the listener system, so that its update looks like this:
    Code (CSharp):
    protected override void OnUpdate()
    {
        Dependency = JobHandle.CombineDependencies(Dependency, World.GetExistingSystem<PublisherTestSystem1>().GetDependency());
        Dependency = PollEvents(Dependency);
    }
    I thought that this is what "manual dependency management" means, and that this way I can explicitly say that jobs inside the listener must execute only after jobs inside the publisher are complete, but it still doesn't work.
     
  6. Sarkahn

    Joined:
    Jan 9, 2013
    Posts:
    440
    I'm pretty sure your dependency error comes from retrieving your buffer on the main thread and then passing it in. You should pass in the BufferFromEntity along with the relevant entity instead, and only access your buffers inside a job. Although if you're passing around a NativeList, you're going to be stuck managing dependencies anyway.

    You should think about whether you actually need that NativeList, or whether it would be easier to do it all with dynamic buffers and let Unity worry about dependencies. Otherwise, you should look at how the physics package handles dependencies with AddInputDependency/GetOutputDependency.
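
    A rough sketch of what that could look like on the listener side, under a few assumptions: EventBuffer is an IBufferElementData with an int Payload field (its definition is not shown in the thread), ListenerSketchSystem, Buffers and Source are hypothetical names, and PublisherEntity is assigned from outside by whatever code creates the publisher entity. As far as I understand, calling GetBufferFromEntity in the system registers the buffer type with it, so the system's Dependency already includes jobs from other systems that write that type:

    ```csharp
    using Unity.Burst;
    using Unity.Collections;
    using Unity.Entities;
    using Unity.Jobs;

    // Assumed definition: the thread only shows that EventBuffer has a Payload field.
    public struct EventBuffer : IBufferElementData
    {
        public int Payload;
    }

    // Hypothetical system name, used for illustration only.
    public class ListenerSketchSystem : SystemBase
    {
        private NativeList<EventBuffer> frameEvents;

        // Assumption: set by the code that creates the publisher entity.
        public Entity PublisherEntity;

        [BurstCompile]
        private struct CollectEventsJob : IJob
        {
            [ReadOnly] public BufferFromEntity<EventBuffer> Buffers;
            public Entity Source;
            public NativeList<EventBuffer> FrameEvents;

            public void Execute()
            {
                // The DynamicBuffer is resolved here, inside the job,
                // never on the main thread.
                DynamicBuffer<EventBuffer> buffer = Buffers[Source];
                FrameEvents.Clear();
                for (int i = 0; i < buffer.Length; i++)
                {
                    FrameEvents.Add(buffer[i]);
                }
            }
        }

        protected override void OnCreate()
        {
            frameEvents = new NativeList<EventBuffer>(100, Allocator.Persistent);
        }

        protected override void OnDestroy()
        {
            frameEvents.Dispose();
        }

        protected override void OnUpdate()
        {
            // Nothing to read until the publisher entity is known.
            if (PublisherEntity == Entity.Null)
                return;

            Dependency = new CollectEventsJob
            {
                Buffers = GetBufferFromEntity<EventBuffer>(isReadOnly: true),
                Source = PublisherEntity,
                FrameEvents = frameEvents
            }.Schedule(Dependency);
        }
    }
    ```

    The NativeList here is private to the system and only touched by its own job, so it does not need extra handles; once a NativeList is shared between systems, the manual dependency handling mentioned above still applies.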
     
    Rekart likes this.
  7. Rekart

    Joined:
    Jan 13, 2020
    Posts:
    22
    You're right, thank you very much. The dependency error was from BufferFromEntity.
     
  8. Rekart

    Joined:
    Jan 13, 2020
    Posts:
    22
    I played around a bit with dependencies, and ended up with the following sample scripts, which work.

    The system that writes data to the NativeList is simple, nothing interesting about it:
    Code (CSharp):
    [UpdateInGroup(typeof(FixedStepSimulationSystemGroup))]
    [UpdateAfter(typeof(ExportPhysicsWorld))]
    [UpdateBefore(typeof(EndFramePhysicsSystem))]
    public class WriterSystem : SystemBase
    {
        public NativeList<int> SharedData;

        public JobHandle GetOutputDependency() => Dependency;

        [BurstCompile]
        private struct WriteJob : IJob
        {
            public NativeList<int> Data;

            public void Execute()
            {
                Data.Clear();
                for (int i = 0; i < 10; i++)
                {
                    Data.Add(i);
                }
            }
        }

        protected override void OnCreate()
        {
            SharedData = new NativeList<int>(10, Allocator.Persistent);
        }

        protected override void OnDestroy()
        {
            SharedData.Dispose();
        }

        protected override void OnUpdate()
        {
            var job = new WriteJob()
            {
                Data = SharedData
            };
            Dependency = job.Schedule(Dependency);
        }
    }
    And the system that accesses SharedData:
    Code (CSharp):
    [UpdateInGroup(typeof(FixedStepSimulationSystemGroup))]
    [UpdateAfter(typeof(ExportPhysicsWorld))]
    [UpdateBefore(typeof(EndFramePhysicsSystem))]
    [UpdateAfter(typeof(WriterSystem))]
    public class ReaderSystem : SystemBase
    {
        private NativeList<int> copiedData;

        [BurstCompile]
        private struct CopyAndMultiplyJob : IJob
        {
            [ReadOnly]
            public NativeList<int> Source;
            public NativeList<int> Destination;

            public void Execute()
            {
                Destination.Clear();
                for (int i = 0; i < Source.Length; i++)
                {
                    //copy data to destination list and multiply each value by 2
                    Destination.Add(Source[i] * 2);
                }
            }
        }

        protected override void OnCreate()
        {
            copiedData = new NativeList<int>(10, Allocator.Persistent);
        }

        protected override void OnDestroy()
        {
            copiedData.Dispose();
        }

        protected override void OnUpdate()
        {
            //Some debug output
            string result = "";
            for (int i = 0; i < copiedData.Length; i++)
            {
                result += copiedData[i] + " ";
            }
            UnityEngine.Debug.Log(result);

            //Piece with manual dependency management
            Dependency = JobHandle.CombineDependencies(Dependency, World.GetExistingSystem<WriterSystem>().GetOutputDependency());

            var job = new CopyAndMultiplyJob()
            {
                Source      = World.GetExistingSystem<WriterSystem>().SharedData,
                Destination = copiedData
            };
            Dependency = job.Schedule(Dependency);

            //Why we must call this?
            World.GetExistingSystem<EndFramePhysicsSystem>().AddInputDependency(Dependency);
        }
    }
    Why must we call the following line of code?
    World.GetExistingSystem<EndFramePhysicsSystem>().AddInputDependency(Dependency);


    PhysicsSamples gives one comment about these calls:
    // Chain the scheduled jobs as dependencies into the BuildPhysicsWorld system


    After digging into EndFramePhysicsSystem and BuildPhysicsWorld, it looks like calling AddInputDependency ensures that the previous frame's jobs are complete before a new frame starts. Does this happen because jobs can run over multiple frames, and by calling AddInputDependency we ensure that they will be complete at the beginning of the next frame?