Redesigned TaskQueue to have a queue and distributor model so that threads can keep...
[xonotic/darkplaces.git] / taskqueue.c
1 #include "quakedef.h"\r
2 #include "taskqueue.h"\r
3 \r
4 cvar_t taskqueue_minthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_minthreads", "4", "minimum number of threads to keep active for executing tasks"};\r
5 cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "maximum number of threads to start up as needed based on task count"};\r
6 cvar_t taskqueue_tasksperthread = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_tasksperthread", "4000", "expected amount of work that a single thread can do in a frame - the number of threads being used depends on the average workload in recent frames"};\r
7 \r
8 #define MAXTHREADS 1024\r
9 #define RECENTFRAMES 64 // averaging thread activity over this many frames to decide how many threads we need\r
10 #define THREADTASKS 256 // thread can hold this many tasks in its own queue\r
11 #define THREADBATCH 64 // thread will run this many tasks before checking status again\r
12 #define THREADSLEEPCOUNT 1000 // thread will sleep for a little while if it checks this many times and has no work to do\r
13 \r
// per-worker-thread state: the distributor fills this thread's private ring
// buffer of tasks and the worker drains it
typedef struct taskqueue_state_thread_s
{
	void *handle;                 // thread handle from Thread_CreateThread, NULL when not running
	unsigned int quit;            // set to 1 (under command_lock) to ask this thread to exit
	unsigned int thread_index;    // index of this thread in taskqueue_state.threads[]
	unsigned int tasks_completed; // NOTE(review): not updated anywhere in this file - presumably informational; confirm

	// ring buffer of assigned tasks; positions increment without bound and are
	// taken modulo THREADTASKS on access - equal positions mean the queue is empty
	unsigned int enqueueposition;
	unsigned int dequeueposition;
	taskqueue_task_t *queue[THREADTASKS];
}
taskqueue_state_thread_t;
26 \r
// global task queue state (single instance: taskqueue_state)
typedef struct taskqueue_state_s
{
	// TaskQueue_DistributeTasks cycles through the threads when assigning, each has its own queue
	unsigned int enqueuethread; // index of the thread that receives the next assigned task
	int numthreads;             // number of worker threads currently running
	taskqueue_state_thread_t threads[MAXTHREADS];

	// synchronization point for enqueue and some other memory access
	Thread_SpinLock command_lock;

	// distributor queue (not assigned to threads yet, or waiting on other tasks)
	// ring buffer of task pointers: positions wrap at queue_size, equal positions mean empty
	unsigned int queue_enqueueposition;
	unsigned int queue_dequeueposition;
	unsigned int queue_size;       // capacity of queue_data, grown on demand by TaskQueue_Enqueue
	taskqueue_task_t **queue_data; // heap-allocated (zonemempool) array of queued task pointers

	// metrics to balance workload vs cpu resources
	unsigned int tasks_recentframesindex;          // circular index into tasks_recentframes
	unsigned int tasks_recentframes[RECENTFRAMES]; // per-frame task counts over the last RECENTFRAMES frames
	unsigned int tasks_thisframe;                  // tasks enqueued since the last TaskQueue_Frame call
	unsigned int tasks_averageperframe;            // rolling average used to choose the thread count
}
taskqueue_state_t;
50 \r
51 static taskqueue_state_t taskqueue_state;\r
52 \r
53 void TaskQueue_Init(void)\r
54 {\r
55         Cvar_RegisterVariable(&taskqueue_minthreads);\r
56         Cvar_RegisterVariable(&taskqueue_maxthreads);\r
57         Cvar_RegisterVariable(&taskqueue_tasksperthread);\r
58 }\r
59 \r
60 void TaskQueue_Shutdown(void)\r
61 {\r
62         if (taskqueue_state.numthreads)\r
63                 TaskQueue_Frame(true);\r
64 }\r
65 \r
66 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)\r
67 {\r
68         // see if t is waiting on something\r
69         if (t->preceding && t->preceding->done == 0)\r
70                 TaskQueue_Yield(t);\r
71         else\r
72                 t->func(t);\r
73 }\r
74 \r
75 // FIXME: don't use mutex\r
76 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented\r
77 static int TaskQueue_ThreadFunc(void *d)\r
78 {\r
79         taskqueue_state_thread_t *s = (taskqueue_state_thread_t *)d;\r
80         unsigned int sleepcounter = 0;\r
81         for (;;)\r
82         {\r
83                 qboolean quit;\r
84                 while (s->dequeueposition != s->enqueueposition)\r
85                 {\r
86                         taskqueue_task_t *t = s->queue[s->dequeueposition % THREADTASKS];\r
87                         TaskQueue_ExecuteTask(t);\r
88                         // when we advance, also clear the pointer for good measure\r
89                         s->queue[s->dequeueposition++ % THREADTASKS] = NULL;\r
90                         sleepcounter = 0;\r
91                 }\r
92                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
93                 quit = s->quit != 0;\r
94                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
95                 if (quit)\r
96                         break;\r
97                 sleepcounter++;\r
98                 if (sleepcounter >= THREADSLEEPCOUNT)\r
99                         Sys_Sleep(1000);\r
100                 sleepcounter = 0;\r
101         }\r
102         return 0;\r
103 }\r
104 \r
105 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)\r
106 {\r
107         int i;\r
108         Thread_AtomicLock(&taskqueue_state.command_lock);\r
109         if (taskqueue_state.queue_size <\r
110                 (taskqueue_state.queue_enqueueposition < taskqueue_state.queue_dequeueposition ? taskqueue_state.queue_size : 0) +\r
111                 taskqueue_state.queue_enqueueposition - taskqueue_state.queue_dequeueposition + numtasks)\r
112         {\r
113                 // we have to grow the queue...\r
114                 unsigned int newsize = (taskqueue_state.queue_size + numtasks) * 2;\r
115                 if (newsize < 1024)\r
116                         newsize = 1024;\r
117                 taskqueue_state.queue_data = Mem_Realloc(zonemempool, taskqueue_state.queue_data, sizeof(*taskqueue_state.queue_data) * newsize);\r
118                 taskqueue_state.queue_size = newsize;\r
119         }\r
120         for (i = 0; i < numtasks; i++)\r
121         {\r
122                 if (tasks[i].yieldcount == 0)\r
123                         taskqueue_state.tasks_thisframe++;\r
124                 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = &tasks[i];\r
125                 taskqueue_state.queue_enqueueposition++;\r
126                 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
127                         taskqueue_state.queue_enqueueposition = 0;\r
128         }\r
129         Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
130 }\r
131 \r
132 // if the task can not be completed due yet to preconditions, just enqueue it again...\r
133 void TaskQueue_Yield(taskqueue_task_t *t)\r
134 {\r
135         t->yieldcount++;\r
136         TaskQueue_Enqueue(1, t);\r
137 }\r
138 \r
139 qboolean TaskQueue_IsDone(taskqueue_task_t *t)\r
140 {\r
141         return !t->done != 0;\r
142 }\r
143 \r
144 void TaskQueue_DistributeTasks(void)\r
145 {\r
146         Thread_AtomicLock(&taskqueue_state.command_lock);\r
147         if (taskqueue_state.numthreads > 0)\r
148         {\r
149                 unsigned int attempts = taskqueue_state.numthreads;\r
150                 while (attempts-- > 0 && taskqueue_state.queue_enqueueposition != taskqueue_state.queue_dequeueposition)\r
151                 {\r
152                         taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
153                         if (t->preceding && t->preceding->done == 0)\r
154                         {\r
155                                 // task is waiting on something\r
156                                 // first dequeue it properly\r
157                                 taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
158                                 taskqueue_state.queue_dequeueposition++;\r
159                                 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
160                                         taskqueue_state.queue_dequeueposition = 0;\r
161                                 // now put it back in the distributor queue - we know there is room because we just made room\r
162                                 taskqueue_state.queue_data[taskqueue_state.queue_enqueueposition] = t;\r
163                                 taskqueue_state.queue_enqueueposition++;\r
164                                 if (taskqueue_state.queue_enqueueposition >= taskqueue_state.queue_size)\r
165                                         taskqueue_state.queue_enqueueposition = 0;\r
166                                 // we do not refresh the attempt counter here to avoid deadlock - quite often the only things sitting in the distributor queue are waiting on other tasks\r
167                         }\r
168                         else\r
169                         {\r
170                                 taskqueue_state_thread_t *s = &taskqueue_state.threads[taskqueue_state.enqueuethread];\r
171                                 if (s->enqueueposition - s->dequeueposition < THREADTASKS)\r
172                                 {\r
173                                         // add the task to the thread's queue\r
174                                         s->queue[(s->enqueueposition++) % THREADTASKS] = t;\r
175                                         // since we succeeded in assigning the task, advance the distributor queue\r
176                                         taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition] = NULL;\r
177                                         taskqueue_state.queue_dequeueposition++;\r
178                                         if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
179                                                 taskqueue_state.queue_dequeueposition = 0;\r
180                                         // refresh our attempt counter because we did manage to assign something to a thread\r
181                                         attempts = taskqueue_state.numthreads;\r
182                                 }\r
183                         }\r
184                 }\r
185         }\r
186         Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
187         // execute one pending task on the distributor queue, this matters if numthreads is 0\r
188         if (taskqueue_state.queue_dequeueposition != taskqueue_state.queue_enqueueposition)\r
189         {\r
190                 taskqueue_task_t *t = taskqueue_state.queue_data[taskqueue_state.queue_dequeueposition];\r
191                 taskqueue_state.queue_dequeueposition++;\r
192                 if (taskqueue_state.queue_dequeueposition >= taskqueue_state.queue_size)\r
193                         taskqueue_state.queue_dequeueposition = 0;\r
194                 if (t)\r
195                         TaskQueue_ExecuteTask(t);\r
196         }\r
197 }\r
198 \r
199 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)\r
200 {\r
201         qboolean done = false;\r
202         for (;;)\r
203         {\r
204                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
205                 done = t->done != 0;\r
206                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
207                 if (done)\r
208                         break;\r
209                 TaskQueue_DistributeTasks();\r
210         }\r
211 }\r
212 \r
213 void TaskQueue_Frame(qboolean shutdown)\r
214 {\r
215         int i;\r
216         unsigned long long int avg;\r
217         int maxthreads = bound(0, taskqueue_maxthreads.integer, MAXTHREADS);\r
218         int numthreads = maxthreads;\r
219         int tasksperthread = bound(10, taskqueue_tasksperthread.integer, 100000);\r
220 #ifdef THREADDISABLE\r
221         numthreads = 0;\r
222 #endif\r
223 \r
224         Thread_AtomicLock(&taskqueue_state.command_lock);\r
225         taskqueue_state.tasks_recentframesindex = (taskqueue_state.tasks_recentframesindex + 1) % RECENTFRAMES;\r
226         taskqueue_state.tasks_recentframes[taskqueue_state.tasks_recentframesindex] = taskqueue_state.tasks_thisframe;\r
227         taskqueue_state.tasks_thisframe = 0;\r
228         avg = 0;\r
229         for (i = 0; i < RECENTFRAMES; i++)\r
230                 avg += taskqueue_state.tasks_recentframes[i];\r
231         taskqueue_state.tasks_averageperframe = avg / RECENTFRAMES;\r
232         Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
233 \r
234         numthreads = taskqueue_state.tasks_averageperframe / tasksperthread;\r
235         numthreads = bound(taskqueue_minthreads.integer, numthreads, taskqueue_maxthreads.integer);\r
236 \r
237         if (shutdown)\r
238                 numthreads = 0;\r
239 \r
240         // check if we need to close some threads\r
241         if (taskqueue_state.numthreads > numthreads)\r
242         {\r
243                 // tell extra threads to quit\r
244                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
245                 for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
246                         taskqueue_state.threads[i].quit = 1;\r
247                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
248                 for (i = numthreads; i < taskqueue_state.numthreads; i++)\r
249                 {\r
250                         if (taskqueue_state.threads[i].handle)\r
251                                 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);\r
252                         taskqueue_state.threads[i].handle = NULL;\r
253                 }\r
254                 // okay we're at the new state now\r
255                 taskqueue_state.numthreads = numthreads;\r
256         }\r
257 \r
258         // check if we need to start more threads\r
259         if (taskqueue_state.numthreads < numthreads)\r
260         {\r
261                 // make sure we're not telling new threads to just quit on startup\r
262                 Thread_AtomicLock(&taskqueue_state.command_lock);\r
263                 for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
264                         taskqueue_state.threads[i].quit = 0;\r
265                 Thread_AtomicUnlock(&taskqueue_state.command_lock);\r
266 \r
267                 // start new threads\r
268                 for (i = taskqueue_state.numthreads; i < numthreads; i++)\r
269                 {\r
270                         taskqueue_state.threads[i].thread_index = i;\r
271                         taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);\r
272                 }\r
273 \r
274                 // okay we're at the new state now\r
275                 taskqueue_state.numthreads = numthreads;\r
276         }\r
277 \r
278         // just for good measure, distribute any pending tasks that span across frames\r
279         TaskQueue_DistributeTasks();\r
280 }\r
281 \r
282 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)\r
283 {\r
284         memset(t, 0, sizeof(*t));\r
285         t->preceding = preceding;\r
286         t->func = func;\r
287         t->i[0] = i0;\r
288         t->i[1] = i1;\r
289         t->p[0] = p0;\r
290         t->p[1] = p1;\r
291 }\r
292 \r
293 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)\r
294 {\r
295         size_t numtasks = t->i[0];\r
296         taskqueue_task_t *tasks = t->p[0];\r
297         while (numtasks > 0)\r
298         {\r
299                 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first\r
300                 if (!tasks[numtasks - 1].done)\r
301                 {\r
302                         // update our partial progress, then yield to another pending task.\r
303                         t->i[0] = numtasks;\r
304                         // set our preceding task to one of the ones we are watching for\r
305                         t->preceding = &tasks[numtasks - 1];\r
306                         TaskQueue_Yield(t);\r
307                         return;\r
308                 }\r
309                 numtasks--;\r
310         }\r
311         t->done = 1;\r
312 }\r