From: havoc Date: Thu, 23 Jan 2020 08:19:11 +0000 (+0000) Subject: Redesigned TaskQueue to have a queue and distributor model so that threads can keep... X-Git-Url: http://de.git.xonotic.org/?p=xonotic%2Fdarkplaces.git;a=commitdiff_plain;h=749072a8f60b88ef37448e224e83944b7096e909;ds=inline Redesigned TaskQueue to have a queue and distributor model so that threads can keep their own queues of work to do without locking to check for more work. Tasks are not executed until TaskQueue_WaitForTaskDone calls TaskQueue_DistributeTasks. Added a fast path to TaskQueue_DistributeTasks for tasks that are waiting on other tasks to finish first, they are simply dequeued and re-enqueued immediately, and tend to just live on the distributor queue. TaskQueue thread count is now dynamically adjusted based on tasks being queued per frame, and has a minimum and maximum thread count. TaskQueue threads now sleep when idle, this may not be ideal, but did substantially lower the cpu usage so that it doesn't make other applications relatively unresponsive like it did before. Removed taskqueue_task_t->started field and some other unused fields. git-svn-id: svn://svn.icculus.org/twilight/trunk/darkplaces@12513 d7cf8633-e32d-0410-b094-e92efae38249 --- diff --git a/r_shadow.c b/r_shadow.c index 12941412..3d14ab03 100644 --- a/r_shadow.c +++ b/r_shadow.c @@ -1948,8 +1948,6 @@ static void R_Shadow_BounceGrid_AssignPhotons_Task(taskqueue_task_t *t) randomseed_t randomseed; vec3_t baseshotcolor; - t->started = 1; - normalphotonscaling = 1.0f / max(0.0000001f, r_shadow_bouncegrid_state.settings.energyperphoton); for (lightindex = 0;lightindex < range2;lightindex++) { @@ -2333,7 +2331,6 @@ static void R_Shadow_BounceGrid_Slice(int zi) static void R_Shadow_BounceGrid_Slice_Task(taskqueue_task_t *t) { - t->started = 1; R_Shadow_BounceGrid_Slice((int)t->i[0]); t->done = 1; } @@ -2347,7 +2344,6 @@ static void R_Shadow_BounceGrid_EnqueueSlices_Task(taskqueue_task_t *t) TaskQueue_Yield(t); return; } - t->started = 1; slices = r_shadow_bouncegrid_state.resolution[2] - 2; for (i = 0; i < slices; i++) TaskQueue_Setup(r_shadow_bouncegrid_state.slices_tasks + i, NULL, R_Shadow_BounceGrid_Slice_Task, i + 1, 0, NULL, NULL); @@ -2393,7 +2389,6 @@ static void R_Shadow_BounceGrid_BlurPixels_Task(taskqueue_task_t *t) { float *pixels[4]; unsigned int resolution[3]; - t->started = 1; if (r_shadow_bouncegrid_state.settings.blur) { VectorCopy(r_shadow_bouncegrid_state.resolution, resolution); @@ -2593,7 +2588,6 @@ static void R_Shadow_BounceGrid_ConvertPixelsAndUpload(void) void R_Shadow_BounceGrid_ClearTex_Task(taskqueue_task_t *t) { - t->started = 1; memset(r_shadow_bouncegrid_state.highpixels, 0, r_shadow_bouncegrid_state.numpixels * sizeof(float[4])); t->done = 1; } @@ -2730,7 +2724,6 @@ static void R_Shadow_BounceGrid_TracePhotons_Shot(r_shadow_bouncegrid_photon_t * static void R_Shadow_BounceGrid_TracePhotons_ShotTask(taskqueue_task_t *t) { r_shadow_bouncegrid_photon_t *p = (r_shadow_bouncegrid_photon_t *)t->p[0]; - t->started = 1; R_Shadow_BounceGrid_TracePhotons_Shot(p, r_shadow_bouncegrid_state.settings.maxbounce, p->start, p->end, p->color, p->bounceminimumintensity2, p->startrefractiveindex); t->done = 1; } @@ -2738,7 +2731,6 @@ static void R_Shadow_BounceGrid_TracePhotons_ShotTask(taskqueue_task_t *t) static void R_Shadow_BounceGrid_EnqueuePhotons_Task(taskqueue_task_t *t) { int i; - t->started = 1; for (i = 0; i < r_shadow_bouncegrid_state.numphotons; i++) TaskQueue_Setup(r_shadow_bouncegrid_state.photons_tasks + i, NULL, R_Shadow_BounceGrid_TracePhotons_ShotTask, 0, 0, r_shadow_bouncegrid_state.photons + i, NULL); TaskQueue_Setup(&r_shadow_bouncegrid_state.photons_done_task, NULL, TaskQueue_Task_CheckTasksDone, r_shadow_bouncegrid_state.numphotons, 0, r_shadow_bouncegrid_state.photons_tasks, NULL); diff --git a/taskqueue.c b/taskqueue.c index 710a40e2..35a6c753 100644 --- a/taskqueue.c +++ b/taskqueue.c @@ -1,26 +1,50 @@ #include "quakedef.h" #include "taskqueue.h" -cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "4", "how many threads to use for executing tasks"}; +cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"}; +cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"}; +cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"}; + +#define MAXTHREADS 1024 +#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need +#define THREADTASKS 256 // thread can hold this many tasks in its own queue +#define THREADBATCH 64 // thread will run this many tasks before checking status again +#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do typedef struct taskqueue_state_thread_s { void *handle; + unsigned int quit; + unsigned int thread_index; + unsigned int tasks_completed; + + unsigned int enqueueposition; + unsigned int dequeueposition; + taskqueue_task_t *queue[THREADTASKS]; } taskqueue_state_thread_t; typedef struct taskqueue_state_s { + // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue + unsigned int enqueuethread; int numthreads; - taskqueue_state_thread_t threads[1024]; + taskqueue_state_thread_t threads[MAXTHREADS]; - // command + // synchronization point for enqueue and some other memory access Thread_SpinLock command_lock; - int threads_quit; + // distributor queue (not assigned to threads yet, or waiting on other tasks) + unsigned int queue_enqueueposition; + unsigned int queue_dequeueposition; + unsigned int queue_size; + taskqueue_task_t **queue_data; - // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next - taskqueue_task_t list; + // metrics to balance workload vs cpu resources + unsigned int tasks_recentframesindex; + unsigned int tasks_recentframes[RECENTFRAMES]; + unsigned int tasks_thisframe; + unsigned int tasks_averageperframe; } taskqueue_state_t; @@ -28,10 +52,9 @@ static taskqueue_state_t taskqueue_state; void TaskQueue_Init(void) { + Cvar_RegisterVariable(&taskqueue_minthreads); Cvar_RegisterVariable(&taskqueue_maxthreads); - // initialize the doubly-linked list header - taskqueue_state.list.next = &taskqueue_state.list; - taskqueue_state.list.prev = &taskqueue_state.list; + Cvar_RegisterVariable(&taskqueue_tasksperthread); } void TaskQueue_Shutdown(void) @@ -40,20 +63,6 @@ void TaskQueue_Shutdown(void) TaskQueue_Frame(true); } -static taskqueue_task_t *TaskQueue_GetPending(void) -{ - taskqueue_task_t *t = NULL; - if (taskqueue_state.list.next != &taskqueue_state.list) - { - // pop from list.next - t = taskqueue_state.list.next; - t->next->prev = t->prev; - t->prev->next = t->next; - t->prev = t->next = NULL; - } - return t; -} - static void TaskQueue_ExecuteTask(taskqueue_task_t *t) { // see if t is waiting on something @@ -67,59 +76,55 @@ static void TaskQueue_ExecuteTask(taskqueue_task_t *t) // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented static int TaskQueue_ThreadFunc(void *d) { + taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d; + unsigned int sleepcounter = 0; for (;;) { qboolean quit; - taskqueue_task_t *t = NULL; + while (s->dequeueposition != s->enqueueposition) + { + taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS]; + TaskQueue_ExecuteTask(t); + // when we advance, also clear the pointer for good measure + s->queue[s->dequeueposition++ % THREADTASKS] = NULL; + sleepcounter = 0; + } Thread_AtomicLock(&taskqueue_state.command_lock); - quit = taskqueue_state.threads_quit != 0; - t = TaskQueue_GetPending(); + quit = s->quit != 0; Thread_AtomicUnlock(&taskqueue_state.command_lock); - if (t) - TaskQueue_ExecuteTask(t); - else if (quit) + if (quit) break; + sleepcounter++; + if (sleepcounter >= THREADSLEEPCOUNT) + Sys_Sleep(1000); + sleepcounter = 0; } return 0; } -void TaskQueue_Execute(qboolean force) -{ - // if we have no threads to run the tasks, just start executing them now - if (taskqueue_state.numthreads == 0 || force) - { - for (;;) - { - taskqueue_task_t *t = NULL; - Thread_AtomicLock(&taskqueue_state.command_lock); - t = TaskQueue_GetPending(); - Thread_AtomicUnlock(&taskqueue_state.command_lock); - if (!t) - break; - TaskQueue_ExecuteTask(t); - } - } -} - void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks) { int i; - // try not to spinlock for a long time by breaking up large enqueues - while (numtasks > 64) + Thread_AtomicLock(&taskqueue_state.command_lock); + if (taskqueue_state.queue_size < + (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) + + taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks) { - TaskQueue_Enqueue(64, tasks); - tasks += 64; - numtasks -= 64; + // we have to grow the queue... + unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2; + if (newsize < 1024) + newsize = 1024; + taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize); + taskqueue_state.queue_size = newsize; } - Thread_AtomicLock(&taskqueue_state.command_lock); for (i = 0; i < numtasks; i++) { - taskqueue_task_t *t = &tasks[i]; - // push to list.prev - t->next = &taskqueue_state.list; - t->prev = taskqueue_state.list.prev; - t->next->prev = t; - t->prev->next = t; + if (tasks[i].yieldcount == 0) + taskqueue_state.tasks_thisframe++; + taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i]; + taskqueue_state.queue_enqueueposition++; + if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_enqueueposition = 0; } Thread_AtomicUnlock(&taskqueue_state.command_lock); } @@ -136,31 +141,109 @@ qboolean TaskQueue_IsDone(taskqueue_task_t *t) return !t->done != 0; } +void TaskQueue_DistributeTasks(void) +{ + Thread_AtomicLock(&taskqueue_state.command_lock); + if (taskqueue_state.numthreads > 0) + { + unsigned int attempts = taskqueue_state.numthreads; + while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition) + { + taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; + if (t->preceding && t->preceding->done == 0) + { + // task is waiting on something + // first dequeue it properly + taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + // now put it back in the distributor queue - we know there is room because we just made room + taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t; + taskqueue_state.queue_enqueueposition++; + if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_enqueueposition = 0; + // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks + } + else + { + taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread]; + if (s->enqueueposition - s->dequeueposition < THREADTASKS) + { + // add the task to the thread's queue + s->queue[(s->enqueueposition++) % THREADTASKS] = t; + // since we succeeded in assigning the task, advance the distributor queue + taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + // refresh our attempt counter because we did manage to assign something to a thread + attempts = taskqueue_state.numthreads; + } + } + } + } + Thread_AtomicUnlock(&taskqueue_state.command_lock); + // execute one pending task on the distributor queue, this matters if numthreads is 0 + if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition) + { + taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition]; + taskqueue_state.queue_dequeueposition++; + if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size) + taskqueue_state.queue_dequeueposition = 0; + if (t) + TaskQueue_ExecuteTask(t); + } +} + void TaskQueue_WaitForTaskDone(taskqueue_task_t *t) { qboolean done = false; - while (!done) + for (;;) { Thread_AtomicLock(&taskqueue_state.command_lock); done = t->done != 0; Thread_AtomicUnlock(&taskqueue_state.command_lock); - // if there are no threads, just execute the tasks immediately - if (!done && taskqueue_state.numthreads == 0) - TaskQueue_Execute(true); + if (done) + break; + TaskQueue_DistributeTasks(); } } void TaskQueue_Frame(qboolean shutdown) { - int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0])); + int i; + unsigned long long int avg; + int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS); + int numthreads = maxthreads; + int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000); #ifdef THREADDISABLE numthreads = 0; #endif - if (taskqueue_state.numthreads != numthreads) + + Thread_AtomicLock(&taskqueue_state.command_lock); + taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES; + taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe; + taskqueue_state.tasks_thisframe = 0; + avg = 0; + for (i = 0; i < RECENTFRAMES; i++) + avg += taskqueue_state.tasks_recentframes[i]; + taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES; + Thread_AtomicUnlock(&taskqueue_state.command_lock); + + numthreads = taskqueue_state.tasks_averageperframe / tasksperthread; + numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer); + + if (shutdown) + numthreads = 0; + + // check if we need to close some threads + if (taskqueue_state.numthreads > numthreads) { - int i; + // tell extra threads to quit Thread_AtomicLock(&taskqueue_state.command_lock); - taskqueue_state.threads_quit = 1; + for (i = numthreads; i < taskqueue_state.numthreads; i++) + taskqueue_state.threads[i].quit = 1; Thread_AtomicUnlock(&taskqueue_state.command_lock); for (i = numthreads; i < taskqueue_state.numthreads; i++) { @@ -168,15 +251,32 @@ void TaskQueue_Frame(qboolean shutdown) Thread_WaitThread(taskqueue_state.threads[i].handle, 0); taskqueue_state.threads[i].handle = NULL; } + // okay we're at the new state now + taskqueue_state.numthreads = numthreads; + } + + // check if we need to start more threads + if (taskqueue_state.numthreads < numthreads) + { + // make sure we're not telling new threads to just quit on startup Thread_AtomicLock(&taskqueue_state.command_lock); - taskqueue_state.threads_quit = 0; + for (i = taskqueue_state.numthreads; i < numthreads; i++) + taskqueue_state.threads[i].quit = 0; Thread_AtomicUnlock(&taskqueue_state.command_lock); + + // start new threads for (i = taskqueue_state.numthreads; i < numthreads; i++) + { + taskqueue_state.threads[i].thread_index = i; taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]); + } + + // okay we're at the new state now taskqueue_state.numthreads = numthreads; - // if there are still pending tasks (e.g. no threads), execute them on main thread now - TaskQueue_Execute(true); } + + // just for good measure, distribute any pending tasks that span across frames + TaskQueue_DistributeTasks(); } void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1) @@ -201,11 +301,12 @@ void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t) { // update our partial progress, then yield to another pending task. t->i[0] = numtasks; + // set our preceding task to one of the ones we are watching for + t->preceding = &tasks[numtasks - 1]; TaskQueue_Yield(t); return; } numtasks--; } - t->started = 1; t->done = 1; } diff --git a/taskqueue.h b/taskqueue.h index 454057ac..c7a53d98 100644 --- a/taskqueue.h +++ b/taskqueue.h @@ -7,41 +7,32 @@ typedef struct taskqueue_task_s { - // doubly linked list - struct taskqueue_task_s * volatile prev; - struct taskqueue_task_s * volatile next; - - // if not NULL, this task must be done before this one will dequeue (faster than simply Yielding immediately) + // if not NULL, this task must be done before this one will dequeue (faster than simply calling TaskQueue_Yield immediately) struct taskqueue_task_s *preceding; - // see TaskQueue_IsDone() to use proper atomics to poll done status - volatile int started; + // use TaskQueue_IsDone() to poll done status volatile int done; // function to call, and parameters for it to use void(*func)(struct taskqueue_task_s *task); - void *p[4]; - size_t i[4]; + // general purpose parameters + void *p[2]; + size_t i[2]; - // stats: - unsigned int yieldcount; // number of times this task has been requeued + unsigned int yieldcount; // number of times this task has been requeued - each task counts only once for purposes of tasksperthread averaging } taskqueue_task_t; -// immediately execute any pending tasks if threading is disabled (or if force is true) -// TRY NOT TO USE THIS IF POSSIBLE - poll task->done instead. -void TaskQueue_Execute(qboolean force); - -// queue the tasks to be executed, or executes them immediately if threading is disabled. +// queue the tasks to be executed, but does not start them (until TaskQueue_WaitforTaskDone is called) void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks); // if the task can not be completed due yet to preconditions, just enqueue it again... void TaskQueue_Yield(taskqueue_task_t *t); -// polls for status of task and returns the result immediately - use this instead of checking ->done directly, as this uses atomics +// polls for status of task and returns the result, does not cause tasks to be executed (see TaskQueue_WaitForTaskDone for that) qboolean TaskQueue_IsDone(taskqueue_task_t *t); -// polls for status of task and waits for it to be done +// triggers execution of queued tasks, and waits for the specified task to be done void TaskQueue_WaitForTaskDone(taskqueue_task_t *t); // convenience function for setting up a task structure. Does not do the Enqueue, just fills in the struct.