]> de.git.xonotic.org Git - xonotic/darkplaces.git/blobdiff - taskqueue.c
Redesigned TaskQueue to have a queue and distributor model so that threads can keep...
[xonotic/darkplaces.git] / taskqueue.c
index 710a40e2ef4671f18af5681b9992c2aaea5277fb..35a6c753d691378a87c30aba381ecf891399a152 100644 (file)
@@ -1,26 +1,50 @@
 #include "quakedef.h"\r
 #include "taskqueue.h"\r
 \r
-cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "4", "how many threads to use for executing tasks"};\r
+cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};\r
+cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};\r
+cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};\r
+\r
+#define MAXTHREADS 1024\r
+#define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need\r
+#define THREADTASKS 256 // thread can hold this many tasks in its own queue\r
+#define THREADBATCH 64 // thread will run this many tasks before checking status again\r
+#define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do\r
 \r
 typedef struct taskqueue_state_thread_s\r
 {\r
        void *handle;\r
+       unsigned int quit;\r
+       unsigned int thread_index;\r
+       unsigned int tasks_completed;\r
+\r
+       unsigned int enqueueposition;\r
+       unsigned int dequeueposition;\r
+       taskqueue_task_t *queue[THREADTASKS];\r
 }\r
 taskqueue_state_thread_t;\r
 \r
 typedef struct taskqueue_state_s\r
 {\r
+       // TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue\r
+       unsigned int enqueuethread;\r
        int numthreads;\r
-       taskqueue_state_thread_t threads[1024];\r
+       taskqueue_state_thread_t threads[MAXTHREADS];\r
 \r
-       // command \r
+       // synchronization point for enqueue and some other memory access\r
        Thread_SpinLock command_lock;\r
 \r
-       int threads_quit;\r
+       // distributor queue (not assigned to threads yet, or waiting on other tasks)\r
+       unsigned int queue_enqueueposition;\r
+       unsigned int queue_dequeueposition;\r
+       unsigned int queue_size;\r
+       taskqueue_task_t **queue_data;\r
 \r
-       // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next\r
-       taskqueue_task_t list;\r
+       // metrics to balance workload vs cpu resources\r
+       unsigned int tasks_recentframesindex;\r
+       unsigned int tasks_recentframes[RECENTFRAMES];\r
+       unsigned int tasks_thisframe;\r
+       unsigned int tasks_averageperframe;\r
 }\r
 taskqueue_state_t;\r
 \r
@@ -28,10 +52,9 @@ static taskqueue_state_t taskqueue_state;
 \r
 void TaskQueue_Init(void)\r
 {\r
+       Cvar_RegisterVariable(&taskqueue_minthreads);\r
        Cvar_RegisterVariable(&taskqueue_maxthreads);\r
-       // initialize the doubly-linked list header\r
-       taskqueue_state.list.next = &taskqueue_state.list;\r
-       taskqueue_state.list.prev = &taskqueue_state.list;\r
+       Cvar_RegisterVariable(&taskqueue_tasksperthread);\r
 }\r
 \r
 void TaskQueue_Shutdown(void)\r
@@ -40,20 +63,6 @@ void TaskQueue_Shutdown(void)
                TaskQueue_Frame(true);\r
 }\r
 \r
-static taskqueue_task_t *TaskQueue_GetPending(void)\r
-{\r
-       taskqueue_task_t *t = NULL;\r
-       if (taskqueue_state.list.next != &taskqueue_state.list)\r
-       {\r
-               // pop from list.next\r
-               t = taskqueue_state.list.next;\r
-               t->next->prev = t->prev;\r
-               t->prev->next = t->next;\r
-               t->prev = t->next = NULL;\r
-       }\r
-       return t;\r
-}\r
-\r
 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
 {\r
        // see if t is waiting on something\r
@@ -67,59 +76,55 @@ static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
 static int TaskQueue_ThreadFunc(void *d)\r
 {\r
+       taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;\r
+       unsigned int sleepcounter = 0;\r
        for (;;)\r
        {\r
                qboolean quit;\r
-               taskqueue_task_t *t = NULL;\r
+               while (s->dequeueposition != s->enqueueposition)\r
+               {\r
+                       taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];\r
+                       TaskQueue_ExecuteTask(t);\r
+                       // when we advance, also clear the pointer for good measure\r
+                       s->queue[s->dequeueposition++ % THREADTASKS] = NULL;\r
+                       sleepcounter = 0;\r
+               }\r
                Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               quit = taskqueue_state.threads_quit != 0;\r
-               t = TaskQueue_GetPending();\r
+               quit = s->quit != 0;\r
                Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-               if (t)\r
-                       TaskQueue_ExecuteTask(t);\r
-               else if (quit)\r
+               if (quit)\r
                        break;\r
+               sleepcounter++;\r
+               if (sleepcounter >= THREADSLEEPCOUNT)\r
+                       Sys_Sleep(1000);\r
+               sleepcounter = 0;\r
        }\r
        return 0;\r
 }\r
 \r
-void TaskQueue_Execute(qboolean force)\r
-{\r
-       // if we have no threads to run the tasks, just start executing them now\r
-       if (taskqueue_state.numthreads == 0 || force)\r
-       {\r
-               for (;;)\r
-               {\r
-                       taskqueue_task_t *t = NULL;\r
-                       Thread_AtomicLock(&taskqueue_state.command_lock);\r
-                       t = TaskQueue_GetPending();\r
-                       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-                       if (!t)\r
-                               break;\r
-                       TaskQueue_ExecuteTask(t);\r
-               }\r
-       }\r
-}\r
-\r
 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
 {\r
        int i;\r
-       // try not to spinlock for a long time by breaking up large enqueues\r
-       while (numtasks > 64)\r
+       Thread_AtomicLock(&taskqueue_state.command_lock);\r
+       if (taskqueue_state.queue_size <\r
+               (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +\r
+               taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)\r
        {\r
-               TaskQueue_Enqueue(64, tasks);\r
-               tasks += 64;\r
-               numtasks -= 64;\r
+               // we have to grow the queue...\r
+               unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;\r
+               if (newsize < 1024)\r
+                       newsize = 1024;\r
+               taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);\r
+               taskqueue_state.queue_size = newsize;\r
        }\r
-       Thread_AtomicLock(&taskqueue_state.command_lock);\r
        for (i = 0; i < numtasks; i++)\r
        {\r
-               taskqueue_task_t *t = &tasks[i];\r
-               // push to list.prev\r
-               t->next = &taskqueue_state.list;\r
-               t->prev = taskqueue_state.list.prev;\r
-               t->next->prev = t;\r
-               t->prev->next = t;\r
+               if (tasks[i].yieldcount == 0)\r
+                       taskqueue_state.tasks_thisframe++;\r
+               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];\r
+               taskqueue_state.queue_enqueueposition++;\r
+               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
+                       taskqueue_state.queue_enqueueposition = 0;\r
        }\r
        Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
 }\r
@@ -136,31 +141,109 @@ qboolean TaskQueue_IsDone(taskqueue_task_t *t)
        return !t->done != 0;\r
 }\r
 \r
+void TaskQueue_DistributeTasks(void)\r
+{\r
+       Thread_AtomicLock(&taskqueue_state.command_lock);\r
+       if (taskqueue_state.numthreads > 0)\r
+       {\r
+               unsigned int attempts = taskqueue_state.numthreads;\r
+               while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)\r
+               {\r
+                       taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
+                       if (t->preceding && t->preceding->done == 0)\r
+                       {\r
+                               // task is waiting on something\r
+                               // first dequeue it properly\r
+                               taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
+                               taskqueue_state.queue_dequeueposition++;\r
+                               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
+                                       taskqueue_state.queue_dequeueposition = 0;\r
+                               // now put it back in the distributor queue - we know there is room because we just made room\r
+                               taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;\r
+                               taskqueue_state.queue_enqueueposition++;\r
+                               if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
+                                       taskqueue_state.queue_enqueueposition = 0;\r
+                               // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks\r
+                       }\r
+                       else\r
+                       {\r
+                               taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];\r
+                               if (s->enqueueposition - s->dequeueposition < THREADTASKS)\r
+                               {\r
+                                       // add the task to the thread's queue\r
+                                       s->queue[(s->enqueueposition++) % THREADTASKS] = t;\r
+                                       // since we succeeded in assigning the task, advance the distributor queue\r
+                                       taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
+                                       taskqueue_state.queue_dequeueposition++;\r
+                                       if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
+                                               taskqueue_state.queue_dequeueposition = 0;\r
+                                       // refresh our attempt counter because we did manage to assign something to a thread\r
+                                       attempts = taskqueue_state.numthreads;\r
+                               }\r
+                       }\r
+               }\r
+       }\r
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+       // execute one pending task on the distributor queue, this matters if numthreads is 0\r
+       if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)\r
+       {\r
+               taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
+               taskqueue_state.queue_dequeueposition++;\r
+               if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
+                       taskqueue_state.queue_dequeueposition = 0;\r
+               if (t)\r
+                       TaskQueue_ExecuteTask(t);\r
+       }\r
+}\r
+\r
 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
 {\r
        qboolean done = false;\r
-       while (!done)\r
+       for (;;)\r
        {\r
                Thread_AtomicLock(&taskqueue_state.command_lock);\r
                done = t->done != 0;\r
                Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
-               // if there are no threads, just execute the tasks immediately\r
-               if (!done && taskqueue_state.numthreads == 0)\r
-                       TaskQueue_Execute(true);\r
+               if (done)\r
+                       break;\r
+               TaskQueue_DistributeTasks();\r
        }\r
 }\r
 \r
 void TaskQueue_Frame(qboolean shutdown)\r
 {\r
-       int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));\r
+       int i;\r
+       unsigned long long int avg;\r
+       int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);\r
+       int numthreads = maxthreads;\r
+       int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);\r
 #ifdef THREADDISABLE\r
        numthreads = 0;\r
 #endif\r
-       if (taskqueue_state.numthreads != numthreads)\r
+\r
+       Thread_AtomicLock(&taskqueue_state.command_lock);\r
+       taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;\r
+       taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;\r
+       taskqueue_state.tasks_thisframe = 0;\r
+       avg = 0;\r
+       for (i = 0; i < RECENTFRAMES; i++)\r
+               avg += taskqueue_state.tasks_recentframes[i];\r
+       taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;\r
+       Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+\r
+       numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;\r
+       numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);\r
+\r
+       if (shutdown)\r
+               numthreads = 0;\r
+\r
+       // check if we need to close some threads\r
+       if (taskqueue_state.numthreads > numthreads)\r
        {\r
-               int i;\r
+               // tell extra threads to quit\r
                Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               taskqueue_state.threads_quit = 1;\r
+               for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
+                       taskqueue_state.threads[i].quit = 1;\r
                Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
                for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
                {\r
@@ -168,15 +251,32 @@ void TaskQueue_Frame(qboolean shutdown)
                                Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
                        taskqueue_state.threads[i].handle = NULL;\r
                }\r
+               // okay we're at the new state now\r
+               taskqueue_state.numthreads = numthreads;\r
+       }\r
+\r
+       // check if we need to start more threads\r
+       if (taskqueue_state.numthreads < numthreads)\r
+       {\r
+               // make sure we're not telling new threads to just quit on startup\r
                Thread_AtomicLock(&taskqueue_state.command_lock);\r
-               taskqueue_state.threads_quit = 0;\r
+               for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
+                       taskqueue_state.threads[i].quit = 0;\r
                Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
+\r
+               // start new threads\r
                for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
+               {\r
+                       taskqueue_state.threads[i].thread_index = i;\r
                        taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
+               }\r
+\r
+               // okay we're at the new state now\r
                taskqueue_state.numthreads = numthreads;\r
-               // if there are still pending tasks (e.g. no threads), execute them on main thread now\r
-               TaskQueue_Execute(true);\r
        }\r
+\r
+       // just for good measure, distribute any pending tasks that span across frames\r
+       TaskQueue_DistributeTasks();\r
 }\r
 \r
 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
@@ -201,11 +301,12 @@ void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
                {\r
                        // update our partial progress, then yield to another pending task.\r
                        t->i[0] = numtasks;\r
+                       // set our preceding task to one of the ones we are watching for\r
+                       t->preceding = &tasks[numtasks - 1];\r
                        TaskQueue_Yield(t);\r
                        return;\r
                }\r
                numtasks--;\r
        }\r
-       t->started = 1;\r
        t->done = 1;\r
 }\r