Search Unity

NativeStream usage.

Discussion in 'Entity Component System' started by swejk, Oct 1, 2019.

  1. swejk

    swejk

    Joined:
    Dec 22, 2013
    Posts:
    20
    Hello.

    I was checking out NativeStream collection which looked liked it could make my event processing system much simpler. I am processing events in FIFO fashion and I dont need parallel processing. The events are received continuously, every frame from a simulated game, but they are processed individually, sequentially and processing of a single event can take multiple frames before next event can be processed. To be specific, this system should control animation/visual effects of actions in a turn based game.
    The events are of different types, so this is the main reason i am considering NativeStream.
    I am having trouble with writting data to NativeStream in first job, and reading from it in a second job.
    I am receiving exception
    ArgumentException: BeginForEachIndex can only be called once for the same index (0).

    The code is at the bottom.
    Is it a correct way of using NativeStream ? Is it appropriate to use NativeStream in my use case ?
    Code (CSharp):
    1. struct PresentationQueueJob : IJob {
    2.  
    3. public NativeStream.Writer Writer;
    4. public NativeArray<UnitMovedEvent> UnitMovedEvents;
    5. public NativeArray<UnitAttackedEvent> UnitAttackedEvents;
    6.  
    7. public void Execute()
    8. {
    9.    Writer.BeginForEachIndex(0);
    10.   for(int i =0; i < UnitMovedEvents.Lenght; i++) {
    11.       Writer.Write(UnitMovedHeader);
    12.       Writer.Write(UnitMovedEvents[i]);
    13.   }
    14. // same for other events
    15.  
    16.   Writer.EndForEachIndex();
    17. }
    18. }
    19.  
    20. struct PresentationJob : IJob {
    21.  
    22. public NativeStream.Reader Reader;
    23. public EntityCommandBuffer Buffer;
    24. public NativeArray<PendingAction> PendingActions;
    25.  
    26. public void Execute()
    27. {
    28.  if(PendingActions.Lenght > 0 && PendingActions[0].IsCompleted) {
    29.    Reader.BeginForEachIndex(0);
    30.    var header = Reader.Read<int>();
    31.        swich(header) {
    32.      case UnitMovedHeader: {
    33.        var movedEvent = Reader.Read<UnitMovedEvent>();
    34.        var actionEntity = Buffer.CreateEntity();
    35.        Buffer.AddComponent(actionEntity, new PlayAnimation{Value = Move});
    36.        Buffer.AddComponent(actionEntity, new PendingAction { IsCompleted = false});
    37.        break;
    38.     }
    39.    }
    40.    Reader.EndForEachIndex();
    41.   }
    42. }
    43. }
     
  2. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    4,264
    Show scheduling code? The error message may be misleading and I'm guessing it is a job chain issue.
     
  3. swejk

    swejk

    Joined:
    Dec 22, 2013
    Posts:
    20
    I am scheduling job as this:
    Code (CSharp):
    1.         protected override JobHandle OnUpdate(JobHandle inputDeps)
    2.         {
    3.             inputDeps = new PresentationQueueJob
    4.             {
    5.                 Writer= _presentationStream.AsWriter(),
    6.                 UnitMovedEvents = _unitMovedEventsQuery.ToComponentDataArray<UnitMovedEvent>(Allocator.TempJob),
    7.                 UnitAttackedEvents = _unitAttackedEventsQuery.ToComponentDataArray<UnitAttackedEvent>(Allocator.TempJob)
    8.             }.Schedule(inputDeps);
    9.  
    10.             var pendingActions = _pendingActionQuery.ToEntityArray(Allocator.TempJob);
    11.             inputDeps = new PresentationJob
    12.             {
    13.                 Buffer = _bufferSystem.CreateCommandBuffer(),
    14.                 PendingActions = pendingActions,
    15.                 Reader = _presentationStream.AsReader(),
    16.                 ProgressFromEntity = GetComponentDataFromEntity<Progress>(true)
    17.             }.Schedule(inputDeps);
    18.  
    19.             _bufferSystem.AddJobHandleForProducer(inputDeps);
    20.  
    21.             return inputDeps;
    22.         }

    I am creating native stream in OnCreate() as this:

    _presentationStream = new NativeStream(1, Allocator.Persistent);
     
  4. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    4,264
    If this is the only system that touches that NativeStream, then I don't see anything wrong. The next thing I would look at is the stack trace. That should tell you which job is the offender. You can also change Schedule to Run on either or both jobs and see if that helps.
     
  5. swejk

    swejk

    Joined:
    Dec 22, 2013
    Posts:
    20
    Here is the test which throws
    ArgumentException: BeginForEachIndex can only be called once for the same index (0).

    Code (CSharp):
    1.        [Test]
    2.         public void TestReadWrite()
    3.         {
    4.             var stream = new NativeStream(1, Allocator.TempJob);
    5.             var writer = stream.AsWriter();
    6.             writer.BeginForEachIndex(0);
    7.             writer.Write(1);
    8.             writer.Write(1.5f);
    9.             writer.EndForEachIndex();
    10.             var reader = stream.AsReader();
    11.            
    12.             reader.BeginForEachIndex(0);
    13.             var intVal1 = reader.Read<int>();
    14.             var floatVal1 = reader.Read<float>();
    15.             reader.EndForEachIndex();
    16.  
    17.             writer.BeginForEachIndex(0);
    18.             writer.Write(2);
    19.             writer.Write(2.5f);
    20.             writer.EndForEachIndex();
    21.  
    22.             reader.BeginForEachIndex(0);
    23.             var intVal2 = reader.Read<int>();
    24.             var floatVal2 = reader.Read<float>();
    25.             reader.EndForEachIndex();
    26.            
    27.             Assert.True(1 == intVal1);
    28.             Assert.True(1.5f == floatVal1);
    29.            
    30.             Assert.True(2 == intVal2);
    31.             Assert.True(2.5f == floatVal2);
    32.  
    33.             stream.Dispose();
    34.         }

    Looks like NativeStream cant do what I was assuming. Test shows me that I cannot read/write from the same index multiple times. I thought that the index is there just to enable parallel read/writes.
    I will use multiple queues, one for each event type and one extra queue with headers as a alternative.
     
  6. DreamingImLatios

    DreamingImLatios

    Joined:
    Jun 3, 2017
    Posts:
    4,264
    Is it throwing on the second writer.BeginForEachIndex(0)? If so, that makes sense. You can't write to the same index multiple times.

    I guess I also never noticed that NativeStream doesn't have a Clear() method. So if you want to write to the stream every frame, you have to make the stream a tempJob and make a new one every frame.