Search Unity

  1. Calling all beginners! Join the FPS Beginners Mods Challenge until December 13.
    Dismiss Notice
  2. It's Cyber Week at the Asset Store!
    Dismiss Notice

Event System

Discussion in 'Data Oriented Technology Stack' started by tertle, Nov 19, 2019.

  1. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    Preface This is a follow up to the idea I discussed here and a continuation of my original event system I designed from the same thread.

    What is this?
    It’s really just an idea on one possible way someone might implement part of the architecture of an ECS based application. Fundamentally this particular solution is really just a method of automatically and safely sharing NativeStreams between multiple systems to create an event system.

    Objectives
    • same frame producing then consuming
    • the option to share events between worlds
    • support different loop update rates. e.g. produce in update, consume in fixed update
    • any number of producers and consumers
    • fully threaded
    • no sync points
    • no garbage
    • easy dependency management
    • negligible overhead
    Where to get it?
    The repo is available here
    The best way to install it is to simply pull it into your project using the package manager. You can do this by adding
    to your manifest.json. The package contains 3 directories
    • Event
      The actual source code lives in here
    • Event.Tests
      Unit tests for the solution. Good coverage but definitely not thorough yet (it didn't catch a bunch of issues I found building a sample). If you want to run these you’ll need to pull in the shared testing library I use between packages
    • Samples~
      Contains importable samples. At this stage it is a 3 world system with 1 world being in fixed update. To import into project go to Window->Package Manager, Under BovineLabs Event select hit Import Samples.
    When to use it?
    ~coming soon.
    I will be working on a bunch of samples and use cases for ideas I have in my head in the coming days.

    How to use?
    To get the event system you simply request it like any other system.

    this.eventSystem = this.World.GetOrCreateSystem<LateSimulationEventSystem>();

    For now by default there is a LateSimulationEventSystem and PresentationEventSystem. It is important to use the system that is part of your loop. If you are using secondary worlds you'll need to implement your own system for that world (very easy, more below.)

    The basics between being a producer and a consumer are pretty much the same. You have to get your writer/reader and then return your dependency handle. An understanding of NativeStreams is important as this is what events are written to and read from. You can read about them here

    Producer (write)
    To write events you first request a new NativeStream.

    NativeStream.Writer writer = this.eventSystem.CreateEventWriter<TestEvent>(foreachCount);

    The foreachCount is the foreachCount property of the NativeStream. I will be providing more examples on how to determine this in the coming days.
    You can then pass this writer to a job or use it on the spot as you would any other native stream. It's important however to only write the event type requested to the stream (or a value that can be reinterpreted as the event type i.e. same memory layout).

    Once you have passed this to the job, you must return the handle to the event system.

    this.eventSystem.AddJobHandleForProducer<TestEvent>(handle);

    An example of a IJobParallelFor use could be something like this

    Code (CSharp):
    1.     protected override JobHandle OnUpdate(JobHandle handle)
    2.     {
    3.         const int foreachCount = 100;
    4.  
    5.         NativeStream.Writer writer = this.eventSystem.CreateEventWriter<TestEvent>(foreachCount);
    6.  
    7.         handle = new ProduceJob
    8.             {
    9.                 Events = writer,
    10.             }
    11.             .Schedule(foreachCount, 16, handle);
    12.  
    13.         this.eventSystem.AddJobHandleForProducer<TestEvent>(handle);
    14.  
    15.         return handle;
    16.     }
    Consumer (read)
    To read events you must request a collection of streams that have been registered since last update.

    handle = this.eventSystem.GetEventReaders<TestEvent>(handle, out IReadOnlyList<Tuple<NativeStream.Reader, int>>readers);

    Unlike CreateEventWriter this requires the input dependency of the system and returns an updated handle that you must use. If you are reading in a component system it is safe to pass in default(JobHandle) but you must call Complete() on the returned handle before reading the event streams.

    The actual output is from the out field which returns a readonly collection of tuples, with Item1 being the NativeStream.Reader and item2 being the foreachCount. The included foreachCount is there as while it is actually safe for us to read the ForEachCount property of the stream, it unfortunately has a read access check which will throw an error (I believe this check is here because of the ScheduleConstruct option for streams.)

    You can then iterate the streams, and create jobs from them. You must pass a handle back after you have scheduled your jobs.,

    this.eventSystem.AddJobHandleForConsumer<TestEvent>(handle);

    An example could be something like this

    Code (CSharp):
    1. protected override JobHandle OnUpdate(JobHandle handle)
    2.         {
    3.             handle = this.eventSystem.GetEventReaders<TestEvent>(handle, out var readers);
    4.  
    5.             for (var index = 0; index < readers.Count; index++)
    6.             {
    7.                 var reader = readers[index];
    8.  
    9.                 handle = new CountJob
    10.                     {
    11.                         Stream = reader.Item1,
    12.                     }
    13.                     .Schedule(reader.Item2, 8, handle);
    14.             }
    15.  
    16.             this.eventSystem.AddJobHandleForConsumer<TestEvent>(handle);
    17.  
    18.             return handle;
    19.         }
    Things to note
    • This is experimental.
    I delayed posting until I at least had a test sample out to ensure it was somewhat stable however I do not currently have a project to test it on so the samples are being written to test the workflow to ensure viability as well as catch any bugs.
    • You must balance CreateEventWriter<T> with AddJobHandleForProducer<T> and GetEventReaders<T> with AddJobHandleForConsumer<T>.
    Safety checks are in place for this and an exception will be thrown if you call 2 CreateEventWriter in a row etc. This is important to know as it is technically safe to do this

    Code (CSharp):
    1. var event1 = this.eventSystem.CreateEventWriter<UpdateCountEvent>(foreachCount1);
    2.  
    3. handle = new TestJob
    4.     {
    5.         Event = event1,
    6.     }
    7.     .Schedule(foreachCount1, 8, handle);
    8.  
    9.     // Without this, while the system would actually work safely it will still throw an exception
    10.     // this.eventSystem.AddJobHandleForProducer<UpdateCountEvent>(handle);
    11.  
    12.     var event2 = this.eventSystem.CreateEventWriter<UpdateCountEvent>(foreachCount2);
    13.  
    14. handle = new TestJob
    15.     {
    16.         Event = event2,
    17.     }
    18.     .Schedule(foreachCount2, 8, handle);
    19.  
    20. this.eventSystem.AddJobHandleForProducer<UpdateCountEvent>(handle);
    yet it will still throw an exception. I decided the strict requirement to ensure safety and avoid errors slipping through is more important than the inconvenience of an extra line of code here or there. Let me know.
    • NativeStreams don't optimize great with IJobForEach.
    They do work and for small entity counts there are no issues but for large entity counts you want to optimize you should switch to IJobChunk.
    I'll be posting samples on how to use these streams shortly for various job types.

    Implementing your own Event System - Custom worlds or game loops
    ~coming soon
    For now you can look at the existing sample. It demonstrates producing events in the default world, consuming events in a seperate world, and consuming events in a seperate world with a different (fixed) update rate. It will output a result like this.

    upload_2019-11-19_15-21-44.png

    49-51k events are being produced in the regular world.
    The events are being consumed and counted every frame in both a world on Update as well as another World updating on FixedUpdate.
    As fixed update only runs 50 times a second but the actual applications fps is nearly 3000 it is generating nearly 3millions events between fixed updates (a more realistic scenario would be producing events in the FixedUpdate and consuming in Update but the majority of Update frames would have 0 events being a dull example).
    A thing to note the UI is also powered by sending events back after counting events!

    Feedback
    Would love to get some feedback on whether other developers think that they could find this useful and/or features they would like to see.
     
    Last edited: Dec 3, 2019 at 10:30 AM
  2. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    ~Saved for later
     
  3. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    ~Saved for later
     
  4. Opeth001

    Opeth001

    Joined:
    Jan 28, 2017
    Posts:
    394
    I just discovered the Event System and I found it really interesting !!
    correct me if im wrong.
    1) this package allows us to use all CommandBuffer features in a more performant way and more important in burstable jobs.
    2) allows passing events between worlds.
    3) replace creation of Event Entities (small lifetime) by simply consuming events.
    4) and all this in 10x faster!!

    i wrote only the more intersting features for my case!

    is it possible to add this package to my project using the Entities 0.1 Version ? (i cant go to the 0.2 for bugs reasons)
    * No way to build player for other than Standalone Platform when using Subscenes.
    * more garbage collection than the 0.1 version and performance waste.
     
  5. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    1. It doesn't let you create or manipulate entities so I wouldn't really say this.
    2. Yes
    3. Yes
    4. Yes

    Just pull the commit before I updated to 0.2
    https://gitlab.com/tertle/com.bovinelabs.event/tree/1b11a39fc8fbdee82ce7445520c013c9dde50e6a
    I have added any functionality since 0.2 came out. The only minor changes were to how world messaging was done to be compatible with 0.2 changes.
     
  6. Opeth001

    Opeth001

    Joined:
    Jan 28, 2017
    Posts:
    394
    im sorry i read this in the previous thread so i thought it does.

    "And as I use quite a few short lived event entities in my project I was curious if I could write something that could generically replaced EntityCommandBuffer for these. So I did!
    Firstly code, this is my first, not well tested and quickly written proof of concept.
    -edit- dramatically updating code, will post it when i'm done.
    -edit2- latested post here, now 3x faster than original version (10x faster than a barrier): https://forum.unity.com/threads/batch-entitycommandbuffer.593569/#post-3965998
    How it works. It passes a NativeQueue<T> whenever requested.
    Get the barrier with
    this.batchBarrier = this.World.GetOrCreateManager<BatchBarrierSystem>();
    The same barrier is used for all systems.

    To create an entity use
    NativeQueue<T>.Concurrent createQueue = this.batchBarrier.GetCreateQueue<T>().ToConcurrent();
    where T is an IComponentData that will be added and set on the entity."


    ill add it and give you my feedback.
    Thanks!
     
  7. LastResortMatthew

    LastResortMatthew

    Joined:
    Apr 10, 2019
    Posts:
    51
    One thing I do often in my very basic Entities-based Event system is attach buffers to the event entities. Since that's not possible here, is the idea that the system is so fast that I would just create individual events for each item that I would have put in a buffer?
     
  8. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    You can pass anything to a native stream, including an array of data, as long as you know what to expect.
    Quick example.

    Code (CSharp):
    1. [BurstCompile]
    2. private struct ProduceJob : IJobParallelFor
    3. {
    4.     public NativeStream.Writer Events;
    5.  
    6.     public Random Random;
    7.  
    8.     public void Execute(int index)
    9.     {
    10.         this.Random.state = (uint)(this.Random.state + index);
    11.  
    12.         var eventCount = this.Random.NextInt(1, 1000);
    13.  
    14.         this.Events.BeginForEachIndex(index);
    15.      
    16.         this.Events.Write(default(TestEventEmpty));
    17.      
    18.         for (var i = 0; i < eventCount; i++)
    19.         {
    20.             this.Events.Write(i);
    21.         }
    22.  
    23.         this.Events.EndForEachIndex();
    24.     }
    25. }
    26.  
    27. [BurstCompile]
    28. public struct ConsumeJob : IJobParallelFor
    29. {
    30.     public NativeStream.Writer EventCount;
    31.     public NativeStream.Reader Stream;
    32.  
    33.     public void Execute(int index)
    34.     {
    35.         var count = this.Stream.BeginForEachIndex(index);
    36.      
    37.         if (count == 0)
    38.         {
    39.             return;
    40.         }
    41.      
    42.         // First element is TestEventEmpty
    43.         var eventData = this.Stream.Read<TestEventEmpty>();
    44.      
    45.         // Rest of elements are integers
    46.         for(var i = 1; i < count; i++)
    47.         {
    48.             var v = this.Stream.Read<int>();
    49.         }
    50.      
    51.         this.Stream.EndForEachIndex();
    52.     }
    53. }
     
  9. Jyskal

    Jyskal

    Joined:
    Dec 4, 2014
    Posts:
    14
    @tertle there is a small mistake in your manifest line.

    "com.bovinelabs.events": "[URL]https://gitlab.com/tertle/com.bovinelabs.event.git[/URL]",


    com.bovinelabs.events should be com.bovinelabs.event otherwise the package manager will complain.
     
  10. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    Cheers fixed.
     
  11. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    That was the original version which actually created events using entities. This new version ditches entities for performance and a lot more flexibility.
     
  12. eterlan

    eterlan

    Joined:
    Sep 29, 2018
    Posts:
    87
    Thanks for your event system! I get it from Package Manager with git url, and import the sample folder, but there are a few problems stop me run the sample. How to fix it?
    Assets/Samples/BovineLabs Event/0.1.0/Samples/CustomBootstrap.cs(16,36): error CS0535: 'CustomBootstrap' does not implement interface member 'ICustomBootstrap.Initialize(string)'
    Assets/Samples/BovineLabs Event/0.1.0/Samples/MultiWorld/UpdateEventSystem.cs(17,34): error CS0115: 'UpdateEventSystem.CustomWorld': no suitable method found to override
     
  13. pal_trefall

    pal_trefall

    Joined:
    Feb 5, 2019
    Posts:
    54
    Thank you for sharing Tertle!
     
  14. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    If you haven't updated to the latest entities package (0.2) you'll need the version posted earlier in the thread.
     
  15. eterlan

    eterlan

    Joined:
    Sep 29, 2018
    Posts:
    87
    my fault. I use latest 0.2.0 entities package & 2019.3.0f1 unity.
     
  16. Jyskal

    Jyskal

    Joined:
    Dec 4, 2014
    Posts:
    14
    I keep getting:

    ArgumentException: NativeStream.Writer must be passed by ref once it is in use.


    when using it in an IJobChunk.

    When using the example of the IJobParallelFor it works without any issues.

    OnUpdate (simplified):
    Code (CSharp):
    1.   protected override JobHandle OnUpdate(JobHandle inputDeps)
    2.   {
    3.     EntityQuery jobSeekersQuery = GetEntityQuery(typeof(Citizen), typeof(Translation));
    4.  
    5.     int numberOfRequests = jobSeekersQuery.CalculateChunkCount();
    6.     if (numberOfRequests == 0) return inputDeps;
    7.  
    8.     var writer = eventSystem.CreateEventWriter<JobFoundEvent>(numberOfRequests);
    9.  
    10.     inputDeps = new FindJobForCitizen
    11.     {
    12.       jobDataArray = jobDataArray,
    13.       writer = writer,
    14.       translationComponentType = GetArchetypeChunkComponentType<Translation>(false),
    15.       entitiesEntityType = GetArchetypeChunkEntityType()
    16.     }.Schedule(jobSeekersQuery, inputDeps);
    17.  
    18.     eventSystem.AddJobHandleForProducer<JobFoundEvent>(inputDeps);
    19.  
    20.     return inputDeps;
    21.   }
    The Job (simplified):
    Code (CSharp):
    1.   [BurstCompile]
    2.   struct FindJobForCitizen : IJobChunk
    3.   {
    4.     [DeallocateOnJobCompletion] [ReadOnly] public NativeArray<JobData> jobDataArray;
    5.     public NativeStream.Writer writer;
    6.  
    7.     public void Execute(ArchetypeChunk chunk, int chunkIndex, int firstEntityIndex)
    8.     {
    9.       for (int i = chunkIndex; i < chunk.Count; i++)
    10.       {
    11.           // Do stuff
    12.  
    13.         if (closestJob != Entity.Null)
    14.         {
    15.           writer.Write(new JobFoundEvent { citizen = entities[i], job = closestJob });
    16.         }
    17.       }
    18.     }
    19.   }
     
    Last edited: Dec 6, 2019 at 12:19 AM
  17. tertle

    tertle

    Joined:
    Jan 25, 2011
    Posts:
    1,906
    NativeStream requires
    writer.BeginForEachIndex(index);
    and
    writer.EndForEachIndex();

    So your job should look something like this

    Code (CSharp):
    1. writer.BeginForEachIndex(chunkIndex);
    2.  
    3. for (int i = chunkIndex; i < chunk.Count; i++)
    4. {
    5.     // Do stuff
    6.  
    7.     if (closestJob != Entity.Null)
    8.     {
    9.         writer.Write(new JobFoundEvent { citizen = entities[i], job = closestJob });
    10.     }
    11. }
    12.  
    13. writer.EndForEachIndex();
     
    Jyskal likes this.