Search Unity

  1. Megacity Metro Demo now available. Download now.
    Dismiss Notice
  2. Unity support for visionOS is now available. Learn more in our blog post.
    Dismiss Notice

[FREE] A simple thread pool I made in C#

Discussion in 'Scripting' started by imaginaryhuman, Jan 22, 2017.

  1. imaginaryhuman

    imaginaryhuman

    Joined:
    Mar 21, 2010
    Posts:
    5,834
    I made a simple-ish thread pool in C#. You can use it freely.

    It basically maintains a number of threads and re-uses them. It uses a very simple circular array as a job queue. See GetNewJob(), AssignJob() and ExecuteJobs(). Read the notes at the top how to use it.

    The Start() function contains one line to initialize the pool and one like to run a simple test. The test will max out your cpu for about 10 seconds or so.

    It's able to check periodically if all jobs are done... which is not real efficient or punctual, but it's sort of the only way since I can't really rely on Join() to wait for threads to end... they might end on their own beforehand so calling Join() on a closed thread hangs it. So it has to poll (in milliseconds increments) to see if jobs are done.

    To assign functions you need to create a function which takes an object. The object will be cast from a JobData class. GetNewJob() gets access to a JobData instance to modify, into which you can set your function and any parameter data as fields. This is passed into the thread. Then the function will be called and the JobData will be passed to it, which it can then use to get parameter data into the function. Feel free to expand the JobData fields as needed (for all functions). You can assign multiple jobs before executing, and can stop the threads from running the new jobs, using the ThreadsCanSeeNewJobs() function (false to stop them, true to make them aware). You could theoretically keep adding jobs (until queue is full) while other jobs are running and either let the threads discover the jobs or, if they ended and didn't see the jobs, signal them to go check by calling ExecuteJobs() again.

    Let me know if you spot any bugs. I'm pretty happy with it so far. Props to the C# crowd on the web for the main principles and some basic sourcecode.

    I've designed it to exclude WEBGL since that platform doesn't support threads. If you target some other platform with no threading you might want to set the appropriate compiler directives. It *should* run single-threaded when there is no threading support (untested).

    [edit]... I found a couple of bugs and it wasn't keeping track of the oldest active job properly, nor was it reporting the amount of free space properly, so I had to recode some bits and add a few new functions. New version is below.[/edit]

    Code (CSharp):
    1. using System.Collections;
    2. using System.Collections.Generic;
    3. using UnityEngine;
    4.  
    5. #if !UNITY_WEBGL
    6. using System.Threading;
    7. #endif
    8.  
    9. namespace TurboCharger{
    10.  
    11.     public class ThreadPool : MonoBehaviour {
    12.  
    13.         //Threads will search for jobs automatically, and if there are none, or if LookForNewJobs==false, they will go to sleep. They will not wake up until an ExecuteJobs() is called from somewhere.
    14.         //LookForNewJubs must == true for new jobs to be visible to the threads, and you also must ExecuteJobs() at some point unless you are assuming that threads will see your jobs when they are done working.
    15.         //Its possibly they will finish jobs before your new jobs are assigned and visible, so they may not necessarily run them automatically. If in doubt, call ExecuteJobs() after assigning work and making jobs visible.
    16.  
    17.         //Note also that regardless of the sequence in which you assign jobs, although they begin running in exact sequence due to the fact that each thread has to lock access to the threadpool to get the 'next' job,
    18.         //how much cpu time is given to each thread or how the thread interacts with other parts of the o/s may vary, e.g. calls to Debug.Log() might print 'out of sequence' messages. Generally speaking
    19.         //jobs will execute in approximate order, with some random jitter. It also depends on the work to be done and how the o/s assigns cpu time.
    20.         //You can at least be certain that more recently added 'new jobs' will absolutely NOT be executed until older jobs are completed, and at least one thread has no older jobs left to run
    21.  
    22.         //Basic usage:
    23.         //1) Upon Start(), threadool will be auto-initialized. Otherwise call InitializeThreadPool() with number of threads and max amount of jobs that can be queued at one time
    24.         //2) Call GetNewJob() to get reference to 'empty' JobData object (which will contain old data) which you can edit the fields of
    25.         //3) Modify fields of JobData object, store the function you want to be run, and any parameter data
    26.         //4) Call AssignJob() to 'lock in' the job and make the queue advance to the next empty job. Optionally execute the job now (which will also execute any other pending jobs).
    27.         //NB: After you call AssignJob(), any other references you have to the JobData object should be considered obsolete. Do not use them. If you try to re-use it later, you will break the threadpool. Always use GetNewJob()
    28.         //5) Optionally you can assign multiple jobs before running them. First do GetNewJob(), edit the job, AssignJob(false), for each of the jobs.... then at the end make one call to ExecuteJobs()
    29.         //NB: Threads may constantly be on the lookout for new jobs except when there are no jobs left, at which point they go to sleep and will only be woken to look for work by AssignJob(true) or ExecuteJobs()
    30.         //But if threads are already awake/working on older jobs, and you AssignJob(false), the job may instantly or soon be automatically executed because threads look for new work when they finish a job.
    31.         //If you prefer, you can stop threads looking for NEW jobs, with ThreadsSeeNewJobs(false); They will continue running existing jobs till done then go to sleep. You can then GetNewJob(), edit, and AssignJob()
    32.         //for several jobs, and then finally ThreadsSeeNewJobs(true) to allow them to be seen, and ExecuteJobs() to signal them to wake up and check for jobs.
    33.  
    34.         #if !UNITY_WEBGL
    35.         public Thread[] Threads;                            //Array of actual threads
    36.         #endif
    37.  
    38.         public int ThreadCount=0;                            //Maximum/total number of threads in the threadpool
    39.         public JobData[] Jobs;                                //Circular Array as a queue of temporary job data to be used by threadpool when it claims ownership of a job, containing the function to execute, variables/parameters to pass etc. Allows us to add new jobs onto the virtual 'end' while keeping job data in-place until it's done running, so that threads can pass the JobData class instance to the function being run without having to copy field data
    40.         public int MaximumJobs=0;                            //How many jobs can be stored in the Jobs array
    41.         public int JobsWaiting=0;                            //How many jobs are waiting to be consumed by the pool
    42.         public int JobsRunning=0;                            //How many jobs are currently still running
    43.         public int NextEmptyJobIndex=0;                        //Index of the next empty slot in the Jobs array, where we can set up a new job. Since Jobs is circular, the index wraps around to 0 when =Length
    44.         public int NextJobToRun=0;                            //Index of the next job that's set up ready to be run by a thread
    45.         public int OldestJob=0;                                //Index of the oldest job, in order to keep track of empty space and know where the space ends
    46.         public int JobsAvailable=0;                            //How many jobs are available for assigning new tasks
    47.         public bool LookForNewJobs=true;                    //Whether threads should be allowed to get new jobs. Until this is true, new jobs cannot be run, but running jobs will continue.
    48.  
    49.         public int Default_MaximumPoolSize=16;                //Default number of real threads in the theadpool - a good size is AT LEAST the number of CPU cores, or that number x2. Make sure to account for 'hyperthreading cores' as well which run 2 threads each (e.g. quad core reports as 8 cores, so try 16 threads). If the thread is stalling or waiting a lot or not using a lot of cpu time, you might want more threads to keep the pool busy.
    50.         public int Default_MaximumJobs=4096;                //Default size of job queue - can't create more jobs than this at once. GetNewJob() will return null if there are none free. Or see GetJobSpace() and WaitForJobSpace().
    51.         public bool Default_LookForNewJobs=true;            //Default for whether to look for new jobs or not
    52.  
    53.         readonly object ThreadLocker = new object();        //A lock to use to isolate shared code/data in each thread and to give us the ability to send a signal to each thread to check for jobs
    54.         public delegate void Job(object obj);                //Define the function pointer format for the job function - accepts a JobData object cast as an object
    55.  
    56.         void Start(){
    57.             //Startup initialization
    58.             Debug.Log("Starting");
    59.             InitializeThreadPool(Default_MaximumPoolSize,Default_MaximumJobs);    //Create a threadpool if there isn't one set up yet
    60.             Test();
    61.         }
    62.  
    63.         public void Test(){
    64.             //Experiment to test if the threadpool is working
    65.             int index;
    66.             for (index=0;index<1000;index++){                //Add 1000 jobs
    67.                 WaitForJobSpace(1,0);                        //Make sure we have room, in case the queue size is smaller than the number of jobs we're going to add. Don't need this if you know there is enough room for the number of jobs you're adding. You could just do a WaitForJobSpace(1000) outside the loop
    68.                 JobData MyJob = GetNewJob();                //Get a new job to edit
    69.                 MyJob.JobToRun = TestFunction;                //The function we want to run on the threadpool
    70.                 MyJob.Text1 = "Hello World";                //The parameter for the function
    71.                 AssignJob(true);                            //Assign it and start executing it right now while we're still adding jobs - could just wait till all jobs are assigned but if queue size is < number of jobs being added, we will be stuck
    72.             }
    73.             ExecuteJobs();                                    //Run the jobs
    74.             //FinishAllJobs(1);                                //Finish all jobs and wait for them to be done
    75.             //Debug.Log("All Jobs are Done");
    76.             CloseThreadPool();                                //Finish jobs and then close the pool
    77.             Debug.Log("Pool is closed");
    78.         }
    79.  
    80.         public void TestFunction(object obj){
    81.             //Test function to print a string parameter and do 10 million calculations
    82.             JobData MyJob = (JobData)obj;
    83.             Debug.Log(MyJob.Text1+" Job: "+MyJob.JobIndex);    //Say "Hello World"
    84.             int index;
    85.             int total=0;
    86.             for (index=0;index<1000000;index++){
    87.                 total++;                                    //Do work
    88.             }
    89.         }
    90.  
    91.         public ThreadPool(){
    92.             //Constructor which will get called if someone does like: ThreadPool myPool = new ThreadPool();
    93.             //With no parameters, a default-size threadpool will be created
    94.             InitializeThreadPool(Default_MaximumPoolSize,Default_MaximumJobs);    //Create a threadpool
    95.         }
    96.  
    97.         public ThreadPool(int PoolSize){
    98.             //Constructor which will get called if someone does like: ThreadPool myPool = new ThreadPool(16);
    99.             //PoolSize parameter defines how many threads to allocate
    100.             //Default size of job queue will be created
    101.             InitializeThreadPool(PoolSize,Default_MaximumJobs);    //Create a pool
    102.         }
    103.  
    104.         public ThreadPool(int PoolSize, int MaxJobs){
    105.             //Constructor which will get called if someone does like: ThreadPool myPool = new ThreadPool(16,1024);
    106.             //PoolSize parameter defines how many threads to allocate
    107.             //MaxJobs defines what size job queue to create - maximum number of simultaneous job entries
    108.             InitializeThreadPool(PoolSize,MaxJobs);    //Create a threadpool if there isn't one set up yet
    109.         }
    110.  
    111.         ~ThreadPool(){
    112.             //Destructor called automatically when object is being destroyed - you cannot call this manually
    113.             //To shut down the threadpool manaully call CloseThreadPool()
    114.             CloseThreadPool();                                //Shut down the threadpool if it hasn't been done already. This will only finish jobs/close threads if Threads[] is != null
    115.         }
    116.  
    117.         public void InitializeThreadPool(int PoolSize,int MaxJobs){
    118.             //Initialize the threadpool to a given number of threads
    119.             //This will erase any existing jobs or threads, which is dangerous if threads are still running jobs
    120.             //PoolSize is how many threads to allocate
    121.             //MaxJobs is how many jobs can be queued at one time - make sure there is enough room in the circular array to store currently executing jobs (for as many threads as there are in the pool) + room for new jobs (and at least enough space for new jobs 1 per thread so that we can close all the threads)
    122.  
    123.             //Create room for storing jobs
    124.             if (MaxJobs<PoolSize*2){
    125.                 MaxJobs=PoolSize*2;                            //Make sure it's at least big enough to store 1 currently running job per thread + 1 yet-to-run job per thread so that threads can be closed - this won't necessarily guarantee that there's enough space since the active job zone of the queue can be fragmented with randoly finished jobs
    126.             }
    127.             Jobs=new JobData[MaxJobs];                        //Room for data for each job
    128.             int index;
    129.             for (index=0;index<MaxJobs;index++){
    130.                 Jobs[index]=new JobData();                    //Create a re-useable job data object which we can store new jobs in, and the reals thread can read for job info and parameters to pass to their functions
    131.                 Jobs[index].JobIndex=index;                    //Store the index of each job in case functions want to refer to it
    132.             }
    133.             JobsWaiting=0;                                    //No jobs to run yet
    134.             JobsRunning=0;                                    //How many jobs are currently running
    135.             NextEmptyJobIndex=0;                            //Start at the beginning
    136.             NextJobToRun=0;                                    //Next job to run is currently at end of Jobs array because there are no jobs yet, but if we do that, then when we assign a job it will be job 0 and nextjobtorun will be the previous job, so we need to fake it to 0 here assuming there are no jobs running yet so that the thread will get the first job. The first call to AssignJob will put a job at 0 and not change 'nextjob' so the next job to run will be at 0
    137.             OldestJob=0;                                    //The oldest job is at 0 because there are no jobs yet
    138.             JobsAvailable=MaxJobs;                            //All jobs are now available
    139.             MaximumJobs=MaxJobs;                            //Largest number of jobs that can be stored at once
    140.             LookForNewJobs=Default_LookForNewJobs;            //Whether to allow threads to get new jobs                          
    141.  
    142.             //Create real threads on platforms that support them
    143.             #if !UNITY_WEBGL
    144.             Threads = new Thread[PoolSize];                    //Room to store real threads
    145.             for (index=0;index<PoolSize;index++){
    146.                 Threads[index]=new Thread(JobProcessor);    //Create a new real thread for the threadpool to use, each will run a JobProcessor function which will look for and process jobs
    147.                 Threads[index].Start(this);                    //Start the thread running. It will look for jobs. We pass in 'this' threadpool class instance so that all JobProcessors can access and share the threadpool data
    148.             }
    149.             ThreadCount=PoolSize;                            //We now have this many threads waiting for work
    150.             #else
    151.             ThreadCount=0;                                    //No threads supported!
    152.             #endif
    153.         }
    154.  
    155.         public void ThreadsSeeNewJobs(bool SeeJobs){
    156.             //Whether the threads can see and theefore consume new jobs from the queue (at some point after they are added), or whether to ignore them
    157.             //Calling this with 'true' will allow the threads to start working on jobs, after having finished their current job or after a call to Execute() which wakes them up from sleeping
    158.             //Just because threads are allowed to see new jobs doesn't mean they will run them yet
    159.             //Call this with 'false' allows you to let threads continue running their current jobs, but then go to sleep while you load a batch of new jobs with GetNewJob() and AssignJob().
    160.             //When you are then done assigning jobs you can call ThreadsSeeNewJobs(true) followed by EcecuteJobs()
    161.             //This allows you to 'delay' the running of new jobs but not interrupt existing jobs
    162.             //Default is that new jobs are always seen automatically
    163.             #if !UNITY_WEBGL
    164.             lock (ThreadLocker){                            //Get exclusive access
    165.             #endif
    166.                 LookForNewJobs=SeeJobs;                        //Set it
    167.             #if !UNITY_WEBGL
    168.             }
    169.             #endif
    170.         }
    171.  
    172.         public JobData GetNewJob(){
    173.             //Get the JobData object for a new job, so that you can modify the data to assign a function, set parameters etc
    174.             //This doesn't need to be locked because no threads will touch empty jobs
    175.             //Will return null if there are no empty jobs available, so check for ==null after calling
    176.             #if !UNITY_WEBGL
    177.             lock (ThreadLocker){                            //Get exclusive access
    178.             #endif
    179.                 if (JobsAvailable>0){                        //Fail if the next job is still running or we've used all our jobs up
    180.                     return Jobs[NextEmptyJobIndex];            //Return reference to the next job that can be assigned
    181.                 } else {
    182.                     return null;                            //No job available
    183.                 }
    184.             #if !UNITY_WEBGL
    185.             }
    186.             #endif
    187.         }
    188.  
    189.         public bool AssignJob(bool ExecuteNow=false){
    190.             //Now that the JobData has been filled in by the user, advance the list of jobs to the next job
    191.             //This does not execute the jobs unless ExecuteNow==true. Either threads will find the jobs automatically when they're done with their current jobs, or you will need to call ExecuteJobs()
    192.             //Return true or false whether it was possible to assign the job. Will only be false if there is no more job space left for this job to be assigned (all jobs are busy or queued)
    193.             #if !UNITY_WEBGL
    194.             lock (ThreadLocker){                            //Get exclusive access
    195.             #endif
    196.                 if (JobsAvailable==0){                        //Is there room left? This was checked in GetNewJob() but just to be sure we don't overwrite an active job
    197.                     return false;                            //Can't assign the job, no space left in the queue
    198.                 }
    199.                 JobsWaiting++;                                //One more job is now waiting
    200.                 JobsAvailable--;                            //One less job available
    201.                 NextEmptyJobIndex++;                        //Next job
    202.                 if (NextEmptyJobIndex==MaximumJobs){
    203.                     NextEmptyJobIndex=0;                    //Warp around circular queue Job array
    204.                 }
    205.             #if !UNITY_WEBGL
    206.             }
    207.             #endif
    208.             if (ExecuteNow==true){
    209.                 ExecuteJobs();                                //Run the job now (and any other pending jobs) - signal threads to look for jobs
    210.             }
    211.             return true;                                    //This job was successfully assigned and possibly executed
    212.         }
    213.          
    214.         public void ExecuteJobs(){
    215.             //Trigger any waiting real threads to wake up and check for work, since we just loaded some new jobs into the Jobs array
    216.             //If threading is disabled, the jobs will be executed in sequence on the main thread before returning from this function, otherwise they will run in parallel
    217.             //Note that LookForNewJobs must ==true otherwise the awoken threads will not see any work to do -- see ThreadsSeeNewJobs(true);
    218.  
    219.             #if !UNITY_WEBGL
    220.             //Run on real threads
    221.             lock (ThreadLocker){                            //Obtain exclusive access to the threadpool data
    222.                 Monitor.PulseAll(ThreadLocker);                //Trigger suspended threads to check for jobs
    223.             }
    224.             #else
    225.             //Run on main thread only
    226.             if (JobsWaiting>0){
    227.                 int index;
    228.                 int jobindex=NextJobToRun;                    //Start with this job
    229.                 JobData ThisJob;                            //Its data
    230.                 int jobcount=JobsWaiting;                    //Need to record copy of this here because JobsWaiting will change in loop
    231.                 for (index=0;index<jobcount;index++){        //Go through all open jobs
    232.                     //Get the job
    233.                     ThisJob = Jobs[NextJobToRun];            //Get the data object
    234.                     ThisJob.JobRunning=true;                //Job is running
    235.                     JobsRunning++;                            //One more job is running
    236.                     JobsWaiting--;                            //One less job to do
    237.                     NextJobToRun++;                            //Next job
    238.                     if (NextJobToRun==MaximumJobs){            //At end of queue
    239.                         NextJobToRun=0;                        //Wrap around circular queue array
    240.                     }  
    241.                     //Check if this job is a dummy job asking us to close a thread
    242.                     if (ThisJob.JobToRun==null){
    243.                         //Skip it;        return;                //ThreadPool asked us to close this thread so 'return' from the JobProcessor function, which means the thread is done processing, which will close the thread (for threadless version, don't 'return')
    244.                     } else {
    245.                         //Otherwise can execute
    246.                         ThisJob.JobToRun(ThisJob);            //Call the function and pass parameter data to it
    247.                         //Finish up
    248.                         ThisJob.JobRunning=false;            //This job is done
    249.                         JobsRunning--;                        //One less job is running (not necessarily sequentually organized)
    250.                         if (ThisJob.JobIndex==OldestJob){    //Check if this job is the oldest job, because if so then we need to now scan to see how many jobs after us also have ended and move the OldestJob marker to the oldest unfinished job
    251.                             //Find the oldest active job
    252.                             int index=OldestJob;            //Current oldest position in the circular queue
    253.                             int maxjobs=MaximumJobs;        //For speed
    254.                             bool finished=false;            //Not done yet
    255.                             while (finished==false){
    256.                                 if (Jobs[index].JobRunning==false){
    257.                                     //This job is not running, go to the next one
    258.                                     JobsAvailable++;        //One more job has become available
    259.                                     index++;                //Next job to check
    260.                                     if (index==maxjobs){    //If past the right end of the queue
    261.                                         index=0;            //Wrap to the left
    262.                                     }
    263.                                     if ((index==NextJobToRun) || (index==NextEmptyJobIndex)){    //Did we get to the next non-running job that's actually a new job that hasn't been run yet? Or are there no new jobs and we just arrived at the next non-running empty job?
    264.                                         finished=true;        //We're done
    265.                                         OldestJob=index;    //This 'new' non-running job is (or will be) the OldestJob
    266.                                     }
    267.                                 } else {
    268.                                     //This job is running, so its the new oldest
    269.                                     finished=true;
    270.                                     OldestJob=index;        //This index is the oldest active job
    271.                                 }
    272.                             }
    273.                         }      
    274.                     }
    275.                 }
    276.             }
    277.             #endif
    278.         }
    279.  
    280.         public void FinishAllJobs(int MillisecsPerCheck=1){
    281.             //Execute all assigned jobs and then wait for them all to finish before returning
    282.             //MillisecsPerCheck is how many millisecs to wait between checks, 0 will cooperatively multitask but may check more often, otherwise checking every 1 millisec may waste time if jobs are done soon after a check
    283.             if (MillisecsPerCheck<0){
    284.                 MillisecsPerCheck=0;                        //Demand positive milliseconds
    285.             }
    286.             ThreadsSeeNewJobs(true);                        //Make sure the jobs will be seen if user for some reason hid jobs and didn't execute them yet
    287.             ExecuteJobs();                                    //Run the jobs
    288.             WaitForJobSpace(MaximumJobs,MillisecsPerCheck);        //Check every 1ms until there are absolutely no jobs left or running. We could wait until only the termination jobs are left (=ThreadCount).
    289.         }
    290.  
    291.         public void WaitForJobSpace(int SpaceNeeded=1, int MillisecsPerCheck=1){
    292.             //Since we keep threads alive and re-use them, the notion of waiting for jobs to be done can't be checked by using Join() unless the threads are going to permanently close
    293.             //Therefore we need to periodically check how many jobs are left, but we don't want to use a lot of cpu time constantly checking
    294.             //SpaceNeeded is how many jobs should be available for assignment before we return. If =MaximumJobs it will wait for all jobs to finish, or <MaximumJobs it will wait until there are that many jobs available
    295.             //e.g. if you need room for 16 jobs, and you have a Jobs queue size of 100 jobs, then waiting for 100-16=84 so it will wait until there are only 84 jobs assigned, leaving room for 16 more
    296.             //MillisecsPerCheck is how many milliseconds to wait between checks. Note that a lot of work can be done in 1 millisecond, so this could potentially waste time where the main thread is waiting unnecessarily
    297.             //But provided the job queue has plenty of busy work for threads to perform, hopefully most of the processing time is taken doing actual work and only >1 millisecond wasted
    298.             //Note then that each time you call this with a MillisecsPerCheck=1, you could be wasting up to 1 full millsecond of cpu time if all the jobs are done already
    299.             //Minimum time between checks is 0 millisecond. If you wait for 0 milliseconds, multitasking will hand over cpu time to the threads without a delay, but this will likely mean more frequent checks
    300.             //being performed by the main thread, which will reduce performance. However, the checks are small so giving up cpu timeslice quickly may still allow lots of work to be done
    301.             //If you want to immediately check how many jobs are left to run, call GetJobsRunning(), or how much space there is for new jobs call GetJobSpace().
    302.             //Note if you call with a high MillisecsPerCheck, there will be no way to break out of this until the timeout has expired, freezing Unity, because it puts the main thread to sleep
    303.             //Remember that a 1 millisecond wait means 1000 checks per second, or rather ~16 checks per frame at 60hz. If you feel you are wasting up to 1 millsecond of valuable cpu time, use MillisecsPerCheck=0
    304.             if (MillisecsPerCheck<0){
    305.                 MillisecsPerCheck=0;                        //Demand positive milliseconds
    306.             }
    307.             if (SpaceNeeded<0){
    308.                 SpaceNeeded=0;                                //Demand positive space left
    309.             }
    310.             bool done=false;
    311.             while (done==false){
    312.                 #if !UNITY_WEBGL
    313.                 lock(ThreadLocker){                            //Get exclusive access
    314.                 #endif
    315.                     if (JobsAvailable>=SpaceNeeded){        //Get current actual space available for jobs, is it enough?
    316.                         done=true;                            //We're at our threshold, there is enough room for the number of needed new jobs, can return now
    317.                     }
    318.                 #if !UNITY_WEBGL
    319.                 }
    320.                 #endif
    321.                 if (done==false){                            //If there are still too many jobs left...
    322.                     Thread.Sleep(MillisecsPerCheck);        //Go to sleep for a while/cooperatively multitask, then check how much space is left
    323.                 }
    324.             }
    325.         }
    326.  
    327.         private int GetJobsRunning(){
    328.             //Return number of jobs that are currently still running. This isn't necessarily the same as the amount of space available for new jobs because there is fragmentation in the
    329.             //block of currently running/recently finished jobs.
    330.             //This only tells you how many jobs are actually being processed, not how much space is left for new jobs. For that, use GetJobSpace()
    331.             #if !UNITY_WEBGL
    332.             lock(ThreadLocker){                                //Get exclusive access
    333.             #endif
    334.                 return JobsRunning;                            //Return number of running jobs
    335.             #if !UNITY_WEBGL
    336.             }
    337.             #endif
    338.         }
    339.  
    340.         private int GetJobSpace(){
    341.             //Return how much space is left for new jobs. Unlike WaitForJobSpace this does not wait for a certain amount of space to become available, it just tells you how much space there currently is
    342.             #if !UNITY_WEBGL
    343.             lock(ThreadLocker){                                //Get exclusive access
    344.             #endif
    345.                 return JobsAvailable;                        //Return number of available jobs
    346.             #if !UNITY_WEBGL
    347.             }
    348.             #endif
    349.         }
    350.                      
    351.         public void CloseThreadPool(){
    352.             //Signal all threads to shut down by setting up as many dummy jobs as there are threads, so that each thread will grab 1 job and then close
    353.             //Even if there are jobs left to run you can call this and it will allow all remaining jobs to execute, even ones which were not seen yet, and only then it will shut down the system
    354.             //Note that if you call this inside of Unity Editor, and the functions you're excecuting contain something that requires Unity to respond (e.g. Debug.Log()), and you enter
    355.             //into a Join() state with the threads to wait for them to end, this main thread will not be able to return control to Unity and it won't be able to process the editor updates,
    356.             //which may result in a deadlock, freeze or crash. Same would be true if your functions depend on something like this from plugins or other behaviors that might cause a deadlock.
    357.             //However, removal of Debug.Log() or anything else causing Unity to hang will allow this to work. Threads will process and work will be done to the end, then closed.
    358.             #if !UNITY_WEBGL
    359.             if (Threads != null){                            //Only try to shut down jobs/threads if we still have real threads created
    360.                 //There may still be jobs running. If there aren't enough free jobs, 1 per thread, to tell them to close, we will have to wait somehow until enough jobs are free
    361.                 WaitForJobSpace(ThreadCount,1);                    //Check every 1ms until there are at least enough jobs left to finish
    362.                 int index;
    363.                 JobData MyJob;
    364.                 for (index=0;index<ThreadCount;index++){
    365.                     MyJob=GetNewJob();                            //Get a new job to modify
    366.                     MyJob.JobToRun=null;                        //Set job to have no function, which will signal to thread to close permanently
    367.                     AssignJob(false);                            //Next job
    368.                 }
    369.                 //At this point there may still be jobs left to run besides the termination jobs, since we didn't wait for JobsLeft=0, so we need to wait here before doing Join()s
    370.                 FinishAllJobs(1);                                //Execute all remaining jobs and wait for them to finish
    371.                 //It'd be more efficient to do Join() on each thread to wait until they are all done, but its possible they may already have exited due to our null functions, and trying to Join() a thread that is already closed will make it hang
    372.                 //for (index=0;index<ThreadCount;index++){
    373.                 //    if (Threads[index].IsAlive==true){
    374.                 //        Threads[index].Join();                    //Join main thread's program flow to the thread, which basically means wait until the thread exits and closes before continuing, so we will wait for all threads to exit
    375.                 //    }
    376.                 //}
    377.             }
    378.             ThreadCount=0;                                    //No threads left
    379.             JobsWaiting=0;                                    //No jobs left
    380.             JobsAvailable=0;                                //No jobs available
    381.             MaximumJobs=0;                                    //No jobs
    382.             OldestJob=0;                                    //Oldest is first
    383.             NextJobToRun=0;                                    //Next job is 0
    384.             NextEmptyJobIndex=0;                            //Next new job is 0
    385.             Jobs=null;                                        //Release the job array
    386.             Threads=null;                                    //Release the threads
    387.             #endif
    388.         }
    389.  
    390.         void JobProcessor(object obj){
    391.             //This function is run by each individual real thread and looks for jobs to do in the array of jobs
    392.             //An instance of the threadpool class itself should be passed in, cast as an object, so that we can get access to the threadpool's state/data and interact with the jobs
    393.             //Use Thread.CurrentThread.ManagedThreadId to get the ID of this thread if you want it, then maybe assign it to a spare parameter in the JobData to make it accessible to the job function
    394.  
    395.             ThreadPool ThePool=(ThreadPool)obj;                //Cast the object back to a threadpool. This is now one of many references to the pool, so we will have to establish a lock or mutex to get safe access
    396.             JobData ThisJob;                                //Its data
    397.  
    398.             //Run forever, or until a 'null' job function is found which triggers the thread to permanently close
    399.             while (true){                                    //Continue processing forever until the exit state is reached - exit state is when we create a job with a 'StopThread' flag set to true
    400.                 lock (ThreadLocker){                        //Obtain an exclusive lock before running the enclosed code so no other threads can interfere
    401.                     //Wait for a new job
    402.                     while ((ThePool.JobsWaiting==0) || (ThePool.LookForNewJobs==false)){    //If there are no jobs waiting, or there are jobs waiting but we're not allowed to look for new jobs yet, then we'll put this thread on hold and wait to be signalled
    403.                         Monitor.Wait(ThreadLocker);            //No jobs available to consume, so cancel this lock temporarily and wait for a signal from the ThreadPool that there are new jobs available
    404.                     }
    405.  
    406.                     //Get the job
    407.                     ThisJob = ThePool.Jobs[ThePool.NextJobToRun];    //Get the data object
    408.                     ThisJob.JobRunning=true;                //Job is running
    409.                     ThePool.JobsRunning++;                    //One more job is running
    410.                     ThePool.JobsWaiting--;                    //One less job to do
    411.                     ThePool.NextJobToRun++;                    //Next job
    412.                     if (ThePool.NextJobToRun==ThePool.MaximumJobs){    //At end of queue
    413.                         ThePool.NextJobToRun=0;                //Wrap around circular queue array
    414.                     }
    415.                 }
    416.  
    417.                 //Execute the function
    418.                 if (ThisJob.JobToRun != null){                //Don't run the function if it's a null/terminate function
    419.                     ThisJob.JobToRun(ThisJob);                //Call the function and pass parameter data to it
    420.                 }
    421.  
    422.                 //Finish up
    423.                 lock (ThreadLocker){
    424.                     ThisJob.JobRunning=false;                //This job is done
    425.                     ThePool.JobsRunning--;                    //One less job is running (not necessarily sequentually organized)
    426.                     if (ThisJob.JobIndex==ThePool.OldestJob){    //Check if this job is the oldest job, because if so then we need to now scan to see how many jobs after us also have ended and move the OldestJob marker to the oldest unfinished job
    427.                         //Find the oldest active job
    428.                         int index=ThePool.OldestJob;        //Current oldest position in the circular queue
    429.                         int maxjobs=ThePool.MaximumJobs;    //For speed
    430.                         bool finished=false;                //Not done yet
    431.                         while (finished==false){
    432.                             if (ThePool.Jobs[index].JobRunning==false){
    433.                                 //This job is not running, go to the next one
    434.                                 ThePool.JobsAvailable++;    //One more job has become available
    435.                                 index++;                    //Next job to check
    436.                                 if (index==maxjobs){        //If past the right end of the queue
    437.                                     index=0;                //Wrap to the left
    438.                                 }
    439.                                 if ((index==NextJobToRun) || (index==NextEmptyJobIndex)){    //Did we get to the next non-running job that's actually a new job that hasn't been run yet? Or are there no new jobs and we just arrived at the next non-running empty job?
    440.                                     finished=true;            //We're done
    441.                                     ThePool.OldestJob=index;    //This 'new' non-running job is (or will be) the OldestJob
    442.                                 }
    443.                             } else {
    444.                                 //This job is running, so its the new oldest
    445.                                 finished=true;
    446.                                 ThePool.OldestJob=index;    //This index is the oldest active job
    447.                             }
    448.                         }
    449.                     }
    450.                     if (ThisJob.JobToRun==null){
    451.                         return;                                //This was a termination job, close the thread permanently!
    452.                     }
    453.                 }
    454.             }
    455.  
    456.         }
    457.  
    458.         public class JobData{
    459.             //Data associated with a job function to be consumed by the threadpool
    460.             //This class is nested inside the ThreadPool class to allow is to share the delegate definition of 'Job'
    461.  
    462.             //Internal fields
    463.             public bool JobRunning=false;                //Whether this is a running job that some thread has claimed ownership of. If so, you should not modify this job yet. DO NOT MANUALLY CHANGE THIS, only threads should change this internally
    464.             public int JobIndex=0;                        //Index of this job, in case functions wish to use this - DO NOT MANUALLY CHANGE THIS it will be assigned automatically. Also used to identify which index a job is at when figuring out if its the oldest job
    465.  
    466.             //User fields
    467.             public Job JobToRun;                        //A function to be run by the thread. Set this to null to tell the thread to close itself permanently. Since this is a delegate, you can do JobToRun+=extrafunction as many times as you like to add multiple function calls, provided they will all share the same parameter data
    468.             public int Integer1;                        //Function parameter data in the form of integers - you only need to set the ones that the function will use
    469.             public int Integer2;
    470.             public int Integer3;
    471.             public int Float1;                            //Function parameter data in the form of floats - you only need to set the ones that the function will use
    472.             public int Float2;
    473.             public int Float3;
    474.             public string Text1;                        //Text parameter data in the form of strings - you only need to set the ones that the function will use
    475.         }
    476.     }
    477. }
     
    Last edited: Jan 23, 2017
    yuliyF, akareactor and BAIZOR like this.
  2. bartofzo

    bartofzo

    Joined:
    Mar 16, 2017
    Posts:
    150
    Thanks for this code. I gave it a try today. Works the first time around, but then when I stop my project and start again. Unity hangs... any idea?

    EDIT: also when I attatch the script to a gameobject in a scene... after say a minute or so Unity freezes...
     
  3. imaginaryhuman

    imaginaryhuman

    Joined:
    Mar 21, 2010
    Posts:
    5,834
    I saw that happening a lot when something was not right, ie some bug. I crashed unity many many times. For me it seemed to run without that issue eventually once I figured out what was going on. But this stuff is pretty difficult to program correctly, and no warranties on whether the code is stable or safe or not.... just providing it as-is for anyone that wants to try it or can point out any problems with it. I did make a more recent version with possibly some bug fixes and extra features but I don't want to share that at the moment. To my knowledge the above version was working ok but maybe there's something I missed? What I generally had to do was start to comment out some stuff and test it and see if doing so fixed the hang, to try to narrow down where the problem was. Most hangs and freezes were usually something to do with some code not being properly locked or things being locked at the wrong time or a deadlock or something like that. It's not easy to debug.
     
    Last edited: Mar 20, 2017
  4. Feeble1

    Feeble1

    Joined:
    Jan 20, 2014
    Posts:
    19
    I know it has been a while, but I'm curious about your implementation of a thread pool. Have you been working with it? Did it prove stable?
     
  5. imaginaryhuman

    imaginaryhuman

    Joined:
    Mar 21, 2010
    Posts:
    5,834
    Unity is about to come out with new threaded jobs in 2018.1 which is probably a far more reliable and safe way to go about this. I believe my system worked and I took it further than the code I posted above, but temporarily shelved that project. I was always a bit nervous about whether I was doing it exactly right and avoiding crashes because one wrong move and the whole editor froze. So I am kind of waiting to use Unity's new system which I think will be a lot safer and probably easier. The reason I made my own was I wanted to be able to run 'programs' of commands (arrays of function delegates).
     
  6. Feeble1

    Feeble1

    Joined:
    Jan 20, 2014
    Posts:
    19
    I can absolutely understand your nervousness. I am experiencing a lot of freezes myself. It is certainly a result of something that I have done, but I'm not always able to find my errors. I wish there was a better debugger for threads.

    You have made a neat little pool, and I think I've learned something from it, thank you!
     
  7. imaginaryhuman

    imaginaryhuman

    Joined:
    Mar 21, 2010
    Posts:
    5,834
    Yah it was a good learning exercise. I built it because I wanted to do a lot of pixel processing per-frame and needed all the beef of multi core.
     
  8. imaginaryhuman

    imaginaryhuman

    Joined:
    Mar 21, 2010
    Posts:
    5,834
    i recommend using unity's c# job system rather than this ... it's likely much more stable and efficient.
     
    atalantus likes this.