Search Unity

  1. Welcome to the Unity Forums! Please take the time to read our Code of Conduct to familiarize yourself with the forum rules and how to post constructively.
  2. Dismiss Notice

Designing an event system for ecs?

Discussion in 'Entity Component System' started by jasons-novaleaf, Jan 19, 2021.

  1. jasons-novaleaf

    jasons-novaleaf

    Joined:
    Sep 13, 2012
    Posts:
    181
    So I'm about 30 hours into Unity+DOTS now, and thinking about how I'd like to do my prototype's architecture.

    I like the idea of event systems. I think the way entities are queried naturally lends towards this pattern, but I'm curious what "experienced" DOTS developers think?

    Specifically, I wonder about performance pitfalls, such as if adding/removing tag components often have a big performance impact? Is there a better way?
     
    PublicEnumE likes this.
  2. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    729
    Welcome! People do use event systems in DOTS. Using tag components can work, but it can have some notable drawbacks. Mainly - in order to add or remove the ‘event’ components, you’ll probably need to use an Entity Command Buffer. Those only execute on the main thread, so all System updates will temporarily stop while these components are added. That can have a notable impact on perf, depending on the situation.

    There are a number of other approaches devs have taken. Here are two options that don’t involve tag components or sync points:

    1. Store all events of a type in a DynamicBuffer. That buffer is preallocated to the largest needed size. Each event producer is given an assigned index range which they can safely write to. This allows multiple producers to write to the same DynamicBuffer in separate, parallel jobs. All event “listener” jobs would be scheduled to be dependent on all event “producer” jobs completing first. Clear the event buffer after all listener jobs have completed.

    2. There are a lot of solutions out there which use Native collections in various ways to store events. NativeArray, NativeQueue, and NativeStream are all popular. Some of these are quite advanced, like the one tertle has made public.
     
    andreyakladov and adammpolak like this.
  3. Opeth001

    Opeth001

    Joined:
    Jan 28, 2017
    Posts:
    1,068
    ECB is thread safe and burstable from ~5 versions ago, you can add commands from jobs but it will create sync points when the ECB playsback.
    1) you can use the designed System groups that unity already use to playback and safely do sync points.
    2) @tertle created a good event system here which is based on nativestreams (no sync points).
    3) You can also build reactive systems based on components versions. when a System runs and is supposed to write to some CD, the chunk containing those entities increases their CD versions.
     
  4. jasons-novaleaf

    jasons-novaleaf

    Joined:
    Sep 13, 2012
    Posts:
    181
    Thank you so much for that reference and details. I had better crawl that git repo... but I can't stop myself from writing a toy implementation to play with :)

    From spending a few hrs writing a (still broken) toy implementation, I'm getting the feeling that ECS is already kind of an event system. Though, I think a true Event system is important if there is going to be network play, and maybe useful for serializing.

    I'll post my toy code here when I get it working!
     
    PublicEnumE and Opeth001 like this.
  5. PublicEnumE

    PublicEnumE

    Joined:
    Feb 3, 2019
    Posts:
    729
    Best of luck! Thank you for sharing.
     
  6. jasons-novaleaf

    jasons-novaleaf

    Joined:
    Sep 13, 2012
    Posts:
    181
    So I finished writing up a toy message system. Not an event system, as events imply multiple consumers.

    My code is based on NativeQueues, and is a pretty great learning experience into DOTS/ECS so I don't regret doing it, but I think it's more instructional on the kinds of patterns ecs uses, not really useful as is.

    The main part of the code is here:

    Code (CSharp):
    1. using UnityEngine;
    2. using Unity.Burst;
    3. using Unity.Collections;
    4. using Unity.Entities;
    5. using Unity.Jobs;
    6. using Unity.Mathematics;
    7. using Unity.Transforms;
    8. using System.Runtime.InteropServices;
    9. using System;
    10. using Unity.Collections.LowLevel.Unsafe;
    11.  
    12.  
    13. public enum SystemMessageType : int
    14. {
    15.     Audio_Sfx,
    16.     Audio_Music,
    17.  
    18. }
    19. //[StructLayout(LayoutKind.Explicit, Size = 132)]
    20. //public struct SystemMessage132
    21. //{
    22. //    [FieldOffset(0)]
    23. //    public SystemMessageType type;
    24. //    [FieldOffset(4)]
    25. //    public FixedString128 val_str128;
    26. //}
    27.  
    28.  
    29. public enum EventMsgType : int
    30. {
    31.     Spawn,
    32.     Hit,
    33.     Kill,
    34.     Move,
    35. }
    36. [StructLayout(LayoutKind.Sequential)]
    37. public struct EventMsg
    38. {
    39.     public EventMsgType type;
    40.     public double gt;
    41.     public Entity sender;
    42.     public Entity target;
    43.     public FixedListInt4096 data; //TODO: huge.  would want multiple sized message queues to prevent too much wasted mem.
    44. }
    45.  
    46. public unsafe class MessageSystem : SystemBase
    47. {
    48.  
    49.     private struct EventMsgParseJob : IJobParallelFor
    50.     {
    51.  
    52.         [Unity.Collections.LowLevel.Unsafe.NativeSetThreadIndex]
    53.         public int nativeThreadIndex;
    54.         /// <summary>
    55.         /// don't use this reference, but it's included so we tell burst that we do read-write with it so other jobs touching it don't run at the same time.
    56.         /// other jobs using it should use the [ReadOnly] attribute.
    57.         /// </summary>
    58.         public NativeArray<UnsafeList<EventMsg>> threadQueue;
    59.         /// <summary>
    60.         /// Thread local storge.  ptr to NativeArray<UnsafeList<EventMsg>>
    61.         /// </summary>
    62.         [NativeDisableUnsafePtrRestriction]
    63.         public UnsafeList<EventMsg>* p_threadQueue;
    64.         [NativeDisableParallelForRestriction, ReadOnly]
    65.         public ComponentDataFromEntity<OnKill> onKillData;
    66.         [NativeDisableParallelForRestriction]
    67.         public NativeQueue<AudioSystem.AudioMessage>.ParallelWriter audioIn;
    68.         [NativeDisableParallelForRestriction, ReadOnly]
    69.         public EntityManager em;
    70.  
    71.         public unsafe void Execute(int index)
    72.         {
    73.             //Debug.Log($"EventTest Thread=${nativeThreadIndex}, index={index}");
    74.             var p_list = p_threadQueue[index].Ptr;
    75.             var count = p_threadQueue[index].length;
    76.             for (var i = 0; i < count; i++)
    77.             {
    78.                 _processMsg(ref p_list[i]);
    79.             }
    80.             if (count != p_threadQueue[index].length)
    81.             {
    82.                 throw new Exception("race, shouldn't happen!");
    83.             }
    84.             p_threadQueue[index].Clear();
    85.         }
    86.  
    87.         private void _processMsg(ref EventMsg msg)
    88.         {
    89.             switch (msg.type)
    90.             {
    91.                 case EventMsgType.Kill:
    92.                     //if (em.HasComponent<OnKill>(msg.target)) //dots bug: https://forum.unity.com/threads/in-a-ijobparallelfor-can-not-use-entitymanager.1042306/
    93.                     if (onKillData.HasComponent(msg.target))
    94.                     {
    95.                         var onKill = onKillData[msg.target];
    96.                         //var onKill = em.GetComponentData<OnKill>(msg.target);  //dots bug: https://forum.unity.com/threads/in-a-ijobparallelfor-can-not-use-entitymanager.1042306/
    97.                         audioIn.Enqueue(new AudioSystem.AudioMessage() { type = SystemMessageType.Audio_Sfx, audioFile = onKill.sfxName });                                                                                                                                     //onKill.
    98.                     }
    99.                     break;
    100.             }
    101.         }
    102.     }
    103.  
    104.     //    public NativeArray<NativeQueue<EventMsg>> threadQueue;
    105.     //public NativeQueue<EventMsg>[] threadQueue;
    106.     public static NativeArray<UnsafeList<EventMsg>> threadQueue;
    107.  
    108.     protected override void OnCreate()
    109.     {
    110.  
    111.  
    112.         base.OnCreate();
    113.         //threadQueue = new NativeArray<NativeQueue<EventMsg>>(Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount, Allocator.Persistent, NativeArrayOptions.UninitializedMemory);
    114.         //threadQueue = new NativeQueue<EventMsg>[Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount];
    115.         threadQueue = new NativeArray<UnsafeList<EventMsg>>(Unity.Jobs.LowLevel.Unsafe.JobsUtility.MaxJobThreadCount, Allocator.Persistent, NativeArrayOptions.UninitializedMemory);
    116.         for (int i = 0; i < threadQueue.Length; i++)
    117.         {
    118.             //threadQueue[i] = new NativeQueue<EventMsg>(Allocator.Persistent);
    119.             threadQueue[i] = new UnsafeList<EventMsg>(100, Allocator.Persistent, NativeArrayOptions.ClearMemory);
    120.             //threadQueue[i].
    121.         }
    122.     }
    123.     protected override void OnDestroy()
    124.     {
    125.         base.OnDestroy();
    126.         for (int i = 0; i < threadQueue.Length; i++)
    127.         {
    128.             threadQueue[i].Dispose();
    129.         }
    130.         threadQueue.Dispose();
    131.     }
    132.     protected override void OnUpdate()
    133.     {
    134.  
    135.         var job = new EventMsgParseJob()
    136.         {
    137.  
    138.             audioIn = AudioSystem.messageIn,
    139.             em = this.EntityManager,
    140.             threadQueue = threadQueue,
    141.             p_threadQueue = (UnsafeList<EventMsg>*)threadQueue.GetUnsafePtr(),
    142.             onKillData = GetComponentDataFromEntity<OnKill>(),
    143.         };
    144.  
    145.  
    146.  
    147.         var handle = job
    148.             .Schedule(threadQueue.Length, 1); //each item is a queue, so batches of 1
    149.         handle.Complete();
    150.     }
    151. }
    152.  
    153.  
    Missing is the publishing and consuming systems. Publisher would enqueue to the MessageSystem.threadQueue static. consumers are currently manually hardcoded into the messageSystem loop (line 91).

    After getting that working, I went out to see how other DOTS event systems were designed.

    A great primer on using c# events (meaning no burst) can be found here:
    (video by @CodeMonkeyYT ) He does it two ways: NativeQueue and message Entities. both ways seem unsatisfactory to me due to sync points and the no-burst requirement but it's still a good primer.

    I started reading through @tertle's event system: https://forum.unity.com/threads/event-system.779711/ and found the idea of NativeStreams very interesting. Unfortunately the lack of documention is super frustrating. Via search I found https://github.com/Unity-Technologi.../Unity.Collections.Tests/NativeStreamTests.cs
    which shows some usage of NativeStreams. NativeStreams seem to allow multiple producers and consumers, basically what events are all about.

    Given that' I'm new to Unity, I think I better figure out how to setup unit tests, so I can validate some of my assumptions (like NativeStreams: If a Reader is added after a write, that reader won't get the prior written data?) and also will look further into tertle's event system after that.

    I'll add more to this thread afterwards.
     
  7. WAYNGames

    WAYNGames

    Joined:
    Mar 16, 2019
    Posts:
    938
    Native stream is the data container. the reader and writer are ways to access and modify that data container.

    My summarized explanation of it is has follow.

    You create a native stream with the foreachCount.
    That foreachCount is basically the number of thread that can write to the native stream. So in case of IJobChunk and EntitiesForeach it would be the number of chunks.

    That will basically create a container that is sort of an array of native queues (the difference being that you can write anything to it not just a single type of data).

    When you want to write to the native stream you request a Writer instance of it.
    This gives you access to the beginforeach and endforeach method.
    Specifying the index in the beginforeach method will tell the native stream that you want to write to the "native queue" referenced at that index in the array of native queues.
    You can only write to that index in a single thread.
    Once you are done writing, you invoke the endforeach method.
    Once it's done, you can't ever write to that ever again.

    Then to read the native stream, you do the same thing but requesting a Reader instead of a Writer.
    You can read the same index multiple times and I think you can read the same index form multiple thread at the same time (as each reader instance will have it's own tracking of at what point it is in the reading process).
    The reader will have access to everything that was written up to that point.
    You can not read and write at the same time.

    Hope this help anyone reading this, it took me a while to understand how that container worked and I hope I'm not completely wrong about it. I'have been using it for some time now for my ability/skill system and it works great. (I only ran into one issue but found a work around, more detail here https://forum.unity.com/threads/nat...ities-0-16-0-preview-21.1036351/#post-6709684)
     
  8. jasons-novaleaf

    jasons-novaleaf

    Joined:
    Sep 13, 2012
    Posts:
    181
    Really, thank you so much for that explanation. Without it I think I would have given up, as if you don't use NativeStream exactly as you described, you will get obtuse runtime exceptions thrown.

    If anyone wants to see basic NativeStream usage, here is a test I wrote that is passing successfully
    Code (CSharp):
    1.     [Test]
    2.     [Category("ecs lowlevel")]
    3.     public void NativeStreamBasic()
    4.     {
    5.         var stream = new NativeStream(1,Allocator.Persistent);
    6.  
    7.        
    8.         var writer = stream.AsWriter();
    9.         var reader = stream.AsReader();
    10.  
    11.         writer.BeginForEachIndex(0);
    12.         for (var i = 0; i < 100; i++)
    13.         {
    14.             if (i % 2 == 0)
    15.             {
    16.                 //unsafe{
    17.                 //    var pInt = (int*)writer.Allocate(sizeof(int));
    18.                 //    *pInt = i;
    19.                 //}
    20.                 ref int x = ref writer.Allocate<int>();
    21.                 x = i;
    22.             }
    23.             else
    24.             {
    25.                 writer.Write(i);
    26.             }
    27.         }
    28.         writer.EndForEachIndex();
    29.  
    30.         reader.BeginForEachIndex(0);
    31.         for (var i = 0; i < 100; i++)
    32.         {
    33.             var result = reader.Read<int>();
    34.             Assert.AreEqual(i, result);
    35.         }
    36.         reader.EndForEachIndex();
    37.  
    38.         stream.Dispose();
    39.     }
     
  9. RoughSpaghetti3211

    RoughSpaghetti3211

    Joined:
    Aug 11, 2015
    Posts:
    1,694
    I having trouble understanding this and would really like too, is there a small simple example I can look at. Useing dynamic buffers
     
    Last edited: Jan 23, 2021