Question Multithreading simulation

Discussion in 'C# Job System' started by Green11001, May 26, 2023.

  1. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    For the record, I know nothing about how multithreading works beyond a basic understanding of what it is, so please explain things in simple terms.

    So I have some machine learning (NEAT/genetic algorithm) agents whose setup and training I coded manually. However, everything is quite slow, as there are a hundred or so agents training. These agents each have a generic neural network (so a lot of math), and might do some raycasts each physics update, along with moving around and interacting with an environment. The agents and their respective environments are independent from each other for the most part, so there is no need to allow interactions. This seems like the right place to stick each agent and that agent's environment into a separate thread, but I have no idea how to go about doing this.
     
  2. Kurt-Dekker

    Kurt-Dekker

    Joined:
    Mar 16, 2013
    Posts:
    38,745
    Last edited: May 26, 2023
  3. spiney199

    spiney199

    Joined:
    Feb 11, 2021
    Posts:
    7,935
  4. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    Since this is a pretty technical post, perhaps someone can move it to the jobs forum for me?

    I'm looking at the job system, but I'm not entirely sure I understand it.
    So would I create a parallel job in some sort of SimulationMaster class and have it run FixedUpdate on each of the agents?

    Something like this:
    Code (CSharp):
    public class SimulationMaster {
      NativeArray<Agents> agents;
      public void Reset(){
        // delete all the agents and clear the nativearray
      }
      public void CreateAgents(){
        // Instantiate agents from a prefab and add them to nativearray
      }
      void FixedUpdate(){
        // Create new parallel job with the agents nativearray
        // Run fixedupdate (or rather a function named something similar like run or smth) on each agent in the parallel job struct execute function (im not sure how this works)
      }
    }
     
  5. Ryiah

    Ryiah

    Joined:
    Oct 11, 2012
    Posts:
    21,204
    I'd love to see a screenshot of what you're seeing because as far as I'm aware it's not stated anywhere in the task manager, and even if it were the limitation you're likely thinking of is not a limitation with the processor but a limitation with the way Windows allocates memory for processes.

    https://stackoverflow.com/questions...mber-of-threads-a-process-can-have-in-windows
    https://superuser.com/questions/1469924/maximum-threads-limit-per-process-in-windows-10

    On Windows, each thread has 1 MB of stack space reserved for it in virtual address space by default. On 32-bit systems that means the most you can have is roughly 2,000 threads (2 GB of virtual space / 1 MB per stack = 2,048). On 64-bit systems the limit is far beyond that. In the second link someone was able to hit 270,000 threads.

    Getting back to the hardware, the CPU has a far lower limit than that. In the screenshot below I have 32 logical processors or threads (32 threads = 2 simultaneous threads per core * 16 cores). My CPU can't do more than 32 threads on its own but the OS has an abstraction on top that allows it to handle as many threads as can be fit into memory.

    upload_2023-5-26_10-32-30.png
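
    For reference, the logical processor count mentioned above can also be read from .NET directly. This is a minimal sketch, assuming the goal is to cap Parallel.For at that count. WorkerLimits is a hypothetical helper name; Environment.ProcessorCount and ParallelOptions.MaxDegreeOfParallelism are standard .NET members.

```csharp
using System;
using System.Threading.Tasks;

public static class WorkerLimits
{
    // Number of logical processors available to the current process.
    public static int LogicalProcessors => Environment.ProcessorCount;

    // Options that keep Parallel.For from running more iterations at once
    // than there are logical processors.
    public static ParallelOptions CapAtLogicalProcessors()
    {
        return new ParallelOptions { MaxDegreeOfParallelism = LogicalProcessors };
    }
}
```

    The returned options can be passed as the third argument of Parallel.For(from, to, options, body).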
     
    Last edited: May 26, 2023
  6. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    I don't know how on topic this is, but I have 6 cores and 12 logical processors, with a number around 4000 below the "Threads" text in the task manager. It should be enough to handle whatever I'm doing.
     
  7. Ryiah

    Ryiah

    Joined:
    Oct 11, 2012
    Posts:
    21,204
    You basically create threads, pass them code that they execute, and then use the results. I've never worked with the Unity JobSystem but I have worked with .NET Threads. You have to be careful, as Unity's APIs are not thread safe, meaning you will run into all kinds of bizarre behaviour if you try to call them outside the main thread.

    Here's a quick example of how I approach threading in Unity. Parallel.For is a for loop that executes across multiple threads. I start by (a) collecting data for the calculations, then (b) run the calculations, and (c) create an action that holds the API calls to be used with that data. Then in LateUpdate() I execute all of these actions.

    Code (csharp):
    using System.Collections.Concurrent;
    using System.Threading.Tasks; // Task and Parallel live in this namespace
    using UnityEngine;

    public class ExampleBehaviour : MonoBehaviour
    {
        // Thread-safe queue of Unity API calls to run later on the main thread
        private ConcurrentQueue<System.Action> actions = new ConcurrentQueue<System.Action>();

        private void Update()
        {
            // (a) set up data required for the task (ie by pulling from any necessary unity apis)

            Task.Run(() =>
            {
                Parallel.For(0, 100, (i) =>
                {
                    // (b) perform some math calculations

                    actions.Enqueue(() =>
                    {
                        // (c) set up the call to the required unity apis
                    });
                });
            });
        }

        private void LateUpdate()
        {
            // Back on the main thread: run whatever actions have been queued so far
            while (actions.TryDequeue(out var action))
            {
                action.Invoke();
            }
        }
    }
     
  8. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    I looked at the job system for a while and got nowhere, because I tried to make the native array with my neural networks as T and apparently it only accepts structs, which I don't see how I will get around.
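
    One possible way around the struct-only restriction, sketched under assumptions: store a network's numbers in one flat array of floats instead of an object. The layout below (a single fully connected layer, weights followed by biases) and all names are hypothetical. NEAT topologies are not layered, so a real version would also need an array describing the connections. A plain float[] is used so the snippet runs outside Unity; the same indexing would carry over to a NativeArray<float>.

```csharp
using System;

// Hypothetical flat layout for one fully connected layer:
// parameters[o * inputCount + i] is the weight from input i to output o,
// followed by outputCount bias values.
public static class FlatNetwork
{
    public static float[] Create(int inputCount, int outputCount)
    {
        return new float[inputCount * outputCount + outputCount];
    }

    public static void Forward(float[] parameters, int inputCount, int outputCount,
                               float[] inputs, float[] outputs)
    {
        int biasOffset = inputCount * outputCount;
        for (int o = 0; o < outputCount; o++)
        {
            // Start from the bias, then add the weighted inputs.
            float sum = parameters[biasOffset + o];
            for (int i = 0; i < inputCount; i++)
            {
                sum += parameters[o * inputCount + i] * inputs[i];
            }
            outputs[o] = Sigmoid(sum);
        }
    }

    private static float Sigmoid(float x)
    {
        return 1.0f / (1.0f + (float)Math.Exp(-x));
    }
}
```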

    I don't quite understand the solution above. First, I would prefer it to be called in FixedUpdate (especially since if I speed up the timescale, Update won't get called more often), as the AI should update along with the physics. Is this just not preferred? Second, this is a really stupid question, but what kind of objects are Task and Parallel? I'm trying to find documentation for your code, but I can't seem to dig up a similar example.
     
  9. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    Where does it say that?
    I see
    "The following example executes up to 100 iterations of a loop in parallel. Each iteration pauses for a random interval from 1 to 1,000 milliseconds."
    But that's because they intentionally call sleep on the thread.
     
  10. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    If I were to split it into tasks, then each fixedupdate call I would need:
    1. To do a series of raycasts, or some other way to get information from the environment, and store it into a double array as input
    2. Using the input from 1, I need to perform a forward pass on an object that represents a neural network, and get the result. The result is passed to the object so the object can say, move around or something.
    3. Repeat for 100 or so agents acting in the environment.
    4. Simulate physics for each agent and each environment (if there are multiple, otherwise every agent would be stacked on top of each other but not interacting, to save space)
     
  11. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    Annoyingly enough, I can't seem to raycast in the thread. I don't know if there is a solution for this, but I'll just ignore raycasting for now (although it does take up a significant amount of resources) and focus on the neural network maths...
     
  12. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    How would queuing up the raycasts help? I'd still be doing the same number of raycasts, which is like 300-600 every FixedUpdate...
    Also, I'm definitely doing something wrong, but using Parallel makes my code run 5x slower according to the profiler.
    Code (CSharp):
    if (multiThreaded == false){
        foreach (DodgeAgent d in agents){
            if (d != null){
                float[] outputs = m.GetOutputs(d.ID, d.process());
                d.interpret(outputs);
            }
        }
    } else {
        List<float[]> inputs = new List<float[]>();
        for (int i = 0; i < agents.Count; i++){
            if (agents[i] != null){
                inputs.Add(agents[i].process());
            } else {
                inputs.Add(new float[]{});
            }
        }
        int len = agents.Count;
        var Result = Parallel.For(0, len, (i) => {
            if (agents[i] != null){
                float[] outputs = m.GetOutputs(agents[i].ID, inputs[i]);
                agents[i].interpret(outputs);
            }
        });
    }
    (multiThreaded = true)
    Ignore the redundancy, I haven't cleaned up my scripts

    This is run on every other FixedUpdate call.
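
    A sketch of one possible restructuring of the loop above, assuming that interpret touches Unity objects and therefore belongs on the main thread: only the math runs inside Parallel.For, each iteration writes to its own slot of a preallocated array, and the results are applied afterwards in an ordinary loop. ParallelForwardPass, forward and apply are hypothetical stand-ins for m.GetOutputs and interpret, not part of the code above.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelForwardPass
{
    // inputs[i] is gathered on the calling thread beforehand (null = no agent).
    // forward is the pure-math step; it must not touch shared mutable state.
    // apply runs on the calling thread after all of the math has finished.
    public static void Run(float[][] inputs,
                           Func<int, float[], float[]> forward,
                           Action<int, float[]> apply)
    {
        var outputs = new float[inputs.Length][];

        // Each iteration writes only to outputs[i], so no locking is needed.
        Parallel.For(0, inputs.Length, i =>
        {
            if (inputs[i] != null)
            {
                outputs[i] = forward(i, inputs[i]);
            }
        });

        // Parallel.For has returned, so we are back on the calling thread:
        // this is the place for anything that needs engine API calls.
        for (int i = 0; i < outputs.Length; i++)
        {
            if (outputs[i] != null)
            {
                apply(i, outputs[i]);
            }
        }
    }
}
```

    Whether this is faster than the single-threaded loop still depends on how heavy the forward step is compared with the cost of scheduling the work.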
     
  13. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    On a side note, I was thinking of splitting the entire simulation into chunks and offloading each onto a different CPU core or something, and the main thread would simply wait until all those chunks are done simulating. This would save the time needed to constantly create and delete threads. However, I can't see a way to get Unity physics to cooperate like this.
     
  14. Ryiah

    Ryiah

    Joined:
    Oct 11, 2012
    Posts:
    21,204
    Here's a larger example that spins cubes. Just be aware that the performance of this example is going to be worse than spinning them in a single thread thanks to the overhead. If you had intensive calculations performed before spinning them it would be faster.

    Code (csharp):
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Unity.Collections;
    using UnityEngine;

    public class MultiThreadCubeSpin : MonoBehaviour
    {
        [SerializeField] private GameObject prefab;
        private GameObject[] cubes = new GameObject[10000];

        private Task task;
        private ConcurrentQueue<System.Action> actionQueue = new ConcurrentQueue<System.Action>();

        private void Start()
        {
            var spacing = 33.0f;

            for (var cubeIndex = 0; cubeIndex < cubes.Length; cubeIndex++)
            {
                var position = new Vector3(Random.Range(-spacing, spacing), Random.Range(-spacing, spacing), Random.Range(-spacing, spacing));
                cubes[cubeIndex] = Instantiate(prefab, position, Quaternion.identity);
            }
        }

        private void Update()
        {
            // Time can't be accessed from a thread
            var deltaTime = Time.deltaTime;

            // Transform and its rotation data can't be accessed from a thread
            var rotations = new Quaternion[cubes.Length];
            for (var cubeIndex = 0; cubeIndex < cubes.Length; cubeIndex++)
            {
                rotations[cubeIndex] = cubes[cubeIndex].transform.rotation;
            }

            // Parallel.For is synchronous so we need the task to make it async
            task = Task.Run(() =>
            {
                Parallel.For(0, cubes.Length, (cubeIndex) =>
                {
                    // Generate the rotation info
                    var euler = rotations[cubeIndex].eulerAngles;
                    var rotation = Quaternion.Euler(euler.x, euler.y + 250.0f * deltaTime, euler.z);

                    // Create the action to be performed
                    actionQueue.Enqueue(() =>
                    {
                        cubes[cubeIndex].transform.rotation = rotation;
                    });
                });
            });
        }

        private void LateUpdate()
        {
            // Block the main thread until the async task has completed
            task.GetAwaiter().GetResult();

            // Execute all of the actions
            while (actionQueue.TryDequeue(out var action))
            {
                action.Invoke();
            }
        }
    }

    Edit: Half an hour, and some communication with ChatGPT, later I was able to duplicate the behaviour using the JobSystem. My approach using ConcurrentQueue and Parallel.For takes 48 milliseconds while this just takes 22.

    Code (csharp):
    using System.Collections;
    using System.Collections.Generic;
    using Unity.Collections;
    using Unity.Jobs;
    using UnityEngine;

    public class JobSystemCubeSpin : MonoBehaviour
    {
        [SerializeField] private GameObject prefab;

        private GameObject[] cubes;
        private NativeArray<Quaternion> rotations;

        private JobHandle jobHandle;

        struct CubeSpinJob : IJobParallelFor
        {
            public float deltaTime;
            public NativeArray<Quaternion> rotations;

            public void Execute(int i)
            {
                var euler = rotations[i].eulerAngles;
                rotations[i] = Quaternion.Euler(euler.x, euler.y + 250.0f * deltaTime, euler.z);
            }
        }

        private void Start()
        {
            var spacing = 33.0f;

            cubes = new GameObject[10000];
            rotations = new NativeArray<Quaternion>(cubes.Length, Allocator.Persistent);

            for (var cubeIndex = 0; cubeIndex < cubes.Length; cubeIndex++)
            {
                var position = new Vector3(Random.Range(-spacing, spacing), Random.Range(-spacing, spacing), Random.Range(-spacing, spacing));
                rotations[cubeIndex] = Quaternion.identity;
                cubes[cubeIndex] = Instantiate(prefab, position, rotations[cubeIndex]);
            }
        }

        private void Update()
        {
            var job = new CubeSpinJob()
            {
                deltaTime = Time.deltaTime,
                rotations = rotations
            };

            jobHandle = job.Schedule(cubes.Length, 64);
        }

        private void LateUpdate()
        {
            jobHandle.Complete();

            for (var cubeIndex = 0; cubeIndex < cubes.Length; ++cubeIndex)
            {
                cubes[cubeIndex].transform.rotation = rotations[cubeIndex];
            }
        }

        private void OnDestroy()
        {
            rotations.Dispose();
        }
    }

    For a complete comparison here's the single threaded code that I used as a base for all of the above code. Time for this to execute is 25 milliseconds so even with overhead the JobSystem is still faster.

    Code (csharp):
    using System.Collections;
    using System.Collections.Generic;
    using UnityEngine;

    public class SingleThreadCubeSpin : MonoBehaviour
    {
        [SerializeField] private GameObject prefab;

        private GameObject[] cubes = new GameObject[10000];

        private void Start()
        {
            var spacing = 33.0f;

            for (var cubeIndex = 0; cubeIndex < cubes.Length; cubeIndex++)
            {
                var position = new Vector3(Random.Range(-spacing, spacing), Random.Range(-spacing, spacing), Random.Range(-spacing, spacing));
                cubes[cubeIndex] = Instantiate(prefab, position, Quaternion.identity);
            }
        }

        private void Update()
        {
            for (var cubeIndex = 0; cubeIndex < cubes.Length; cubeIndex++)
            {
                var euler = cubes[cubeIndex].transform.rotation.eulerAngles;
                var rotation = Quaternion.Euler(euler.x, euler.y + 250.0f * Time.deltaTime, euler.z);

                cubes[cubeIndex].transform.rotation = rotation;
            }
        }
    }

    You can change Update() to FixedUpdate() but that's only going to affect when they're started. Also just in case you've not seen it yet I'm going to link the explanation of how FixedUpdate() works as it's not exactly doing what most people think it's doing.

    https://forum.unity.com/threads/the-truth-about-fixedupdate.231637/

    Task.Run() executes code asynchronously. We need to do that as Parallel.For() will block the main thread until it's finished executing and we want to spend the time it takes everything else to Update() to perform the task. Once we've reached LateUpdate() the task is forced to finish before we continue.

    You're not locked into that approach though. With some changes you could have the code update the cubes after one or more frames have passed giving yourself a much larger amount of time to perform your math, or even split the updates across multiple frames executing some of them each frame.
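
    As a rough illustration of splitting the updates across multiple frames, here is a minimal sketch of a helper that processes a fixed-size slice of items per call. FrameSlicer and its members are hypothetical names, it uses plain .NET with no Unity types, and it is only one of several ways to do this.

```csharp
using System;

// Hypothetical helper: processes a fixed-size slice of items per call,
// so one full pass over all items is spread across several frames.
public sealed class FrameSlicer
{
    private readonly int itemCount;
    private readonly int itemsPerFrame;
    private int next;

    public FrameSlicer(int itemCount, int itemsPerFrame)
    {
        if (itemCount < 0) throw new ArgumentOutOfRangeException(nameof(itemCount));
        if (itemsPerFrame <= 0) throw new ArgumentOutOfRangeException(nameof(itemsPerFrame));
        this.itemCount = itemCount;
        this.itemsPerFrame = itemsPerFrame;
    }

    // Call once per frame (for example from Update). Returns true when
    // this call finished a full pass over all items.
    public bool Step(Action<int> process)
    {
        int end = Math.Min(next + itemsPerFrame, itemCount);
        for (int i = next; i < end; i++)
        {
            process(i);
        }
        next = end;

        if (next >= itemCount)
        {
            next = 0; // start the next pass from the beginning
            return true;
        }
        return false;
    }
}
```

    With 10,000 cubes and 2,500 items per frame, for example, each cube would be touched once every four frames.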

    https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task
    https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.for
     
    Last edited: May 26, 2023
  15. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    You are totally right, I'm stupid. Well, I can either set simulationmode to update, or manual simulate.

    If I go with manual simulate and simply simulate more times per update(), how would I have it parallel more in lateupdate?


    can I simply move this code to the bottom of update, below Physics2D.simulate()?
     
  16. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    I think it's working now, although admittedly the speedup I found was negligible.
     
  17. Green11001

    Green11001

    Joined:
    Apr 14, 2018
    Posts:
    397
    Hm, I ordered the code in update as follows -> Parallel.For -> Physics Update -> GetResult
    I can't see any speedup at all though. Is this because putting the physics update between the two multithreaded steps in Update doesn't leave enough time for the task to finish, or does it just not work if I put GetResult in Update?
    Or are the math operations I'm doing just too small, so that creating the threads is what takes the longest time? I'm not sure how to tell, although I feel like the math should be taking a while (it's 200 agents, each doing 200 summations and 20 sigmoids).
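
    One way to tell whether thread overhead dominates is to time the math on its own, outside Unity. The sketch below assumes the workload described above (200 agents, each doing 200 additions and 20 sigmoids) and uses made-up input values. OverheadCheck and its methods are hypothetical names, and the timings will vary by machine.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class OverheadCheck
{
    // Stand-in for one agent's forward pass: 200 additions, 20 sigmoids.
    public static float AgentWork(int agentIndex)
    {
        float sum = 0f;
        for (int i = 0; i < 200; i++)
        {
            sum += (agentIndex + i) * 0.001f;
        }

        float result = 0f;
        for (int s = 0; s < 20; s++)
        {
            result += 1.0f / (1.0f + (float)Math.Exp(-(sum + s)));
        }
        return result;
    }

    // Runs every agent on the calling thread and returns the elapsed time.
    public static TimeSpan TimeSequential(float[] results)
    {
        var watch = Stopwatch.StartNew();
        for (int i = 0; i < results.Length; i++)
        {
            results[i] = AgentWork(i);
        }
        watch.Stop();
        return watch.Elapsed;
    }

    // Runs the same work through Parallel.For and returns the elapsed time.
    public static TimeSpan TimeParallel(float[] results)
    {
        var watch = Stopwatch.StartNew();
        Parallel.For(0, results.Length, i => results[i] = AgentWork(i));
        watch.Stop();
        return watch.Elapsed;
    }
}
```

    Calling TimeSequential(new float[200]) and TimeParallel(new float[200]) a few times each (the first calls include warm-up cost) gives a rough comparison. If the parallel version is not clearly faster, the per-agent work is probably too small to pay for the scheduling.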
     
  18. icauroboros

    icauroboros

    Joined:
    Apr 30, 2021
    Posts:
    168
    By Burst-compiling the job and using TransformAccessArray instead of iterating on the managed side, this code can be much faster: 5x faster on 100_000 cubes in my case.

    Code (CSharp):
    using Unity.Burst;
    using UnityEngine;
    using Unity.Jobs;
    using Unity.Mathematics;
    using UnityEngine.Jobs;
    using Random = UnityEngine.Random;

    public class JobSystemCubeSpinV2 : MonoBehaviour
    {
        [SerializeField] private int _cubeCount = 100_000;
        [SerializeField] private GameObject _prefab;

        private JobHandle _jobHandle;
        private TransformAccessArray _transformAccessArray;

        // Burst compile our job
        // IJobParallelForTransform is a faster way to access transforms
        [BurstCompile]
        private struct CubeSpinJob : IJobParallelForTransform
        {
            // We can't access Time.deltaTime in jobs
            public float DeltaTime;
            public void Execute(int i, TransformAccess trs)
            {
                // Use the mathematics package instead of Vectors to benefit from the Burst compiler.
                // Quaternion.eulerAngles is in degrees, but quaternion.Euler expects radians.
                float3 euler = math.radians((float3)trs.rotation.eulerAngles);
                trs.rotation = quaternion.Euler(euler.x, euler.y + math.radians(250.0f) * DeltaTime, euler.z);
            }
        }

        private void Start()
        {
            var spacing = 33.0f;

            _transformAccessArray = new TransformAccessArray(_cubeCount);

            for (var cubeIndex = 0; cubeIndex < _cubeCount; cubeIndex++)
            {
                var position = new Vector3(Random.Range(-spacing, spacing), Random.Range(-spacing, spacing), Random.Range(-spacing, spacing));
                var cube = Instantiate(_prefab, position, Quaternion.identity);

                _transformAccessArray.Add(cube.transform);
            }
        }

        private void Update()
        {
            var job = new CubeSpinJob()
            {
                DeltaTime = Time.deltaTime,
            };

            // This is our current bottleneck, since we schedule a huge number of jobs with little to no calculation
            _jobHandle = job.Schedule(_transformAccessArray);
        }

        private void LateUpdate()
        {
            // It's good to put some time between scheduling and completing the job
            _jobHandle.Complete();
        }

        private void OnDestroy()
        {
            // TransformAccessArray needs to be disposed manually
            _transformAccessArray.Dispose();
        }
    }
     