1 #include "quakedef.h"
\r
2 #include "taskqueue.h"
\r
4 cvar_t taskqueue_maxthreads = {CVAR_CLIENT | CVAR_SERVER | CVAR_SAVE, "taskqueue_maxthreads", "32", "how many threads to use for executing tasks"};
\r
6 typedef struct taskqueue_state_thread_s
\r
10 taskqueue_state_thread_t;
\r
12 typedef struct taskqueue_state_s
\r
15 taskqueue_state_thread_t threads[1024];
\r
18 Thread_SpinLock command_lock;
\r
22 // doubly linked list - enqueue pushes to list.prev, dequeue pops from list.next
\r
23 taskqueue_task_t list;
\r
27 static taskqueue_state_t taskqueue_state;
\r
29 void TaskQueue_Init(void)
\r
31 Cvar_RegisterVariable(&taskqueue_maxthreads);
\r
32 // initialize the doubly-linked list header
\r
33 taskqueue_state.list.next = &taskqueue_state.list;
\r
34 taskqueue_state.list.prev = &taskqueue_state.list;
\r
37 void TaskQueue_Shutdown(void)
\r
39 if (taskqueue_state.numthreads)
\r
40 TaskQueue_Frame(true);
\r
43 static taskqueue_task_t *TaskQueue_GetPending(void)
\r
45 taskqueue_task_t *t = NULL;
\r
46 if (taskqueue_state.list.next != &taskqueue_state.list)
\r
48 // pop from list.next
\r
49 t = taskqueue_state.list.next;
\r
50 t->next->prev = t->prev;
\r
51 t->prev->next = t->next;
\r
52 t->prev = t->next = NULL;
\r
57 static void TaskQueue_ExecuteTask(taskqueue_task_t *t)
\r
59 // see if t is waiting on something
\r
60 if (t->preceding && t->preceding->done == 0)
\r
66 // FIXME: don't use mutex
\r
67 // FIXME: this is basically fibers but less featureful - context switching for yield is not implemented
\r
68 static int TaskQueue_ThreadFunc(void *d)
\r
73 taskqueue_task_t *t = NULL;
\r
74 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
75 quit = taskqueue_state.threads_quit != 0;
\r
76 t = TaskQueue_GetPending();
\r
77 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
79 TaskQueue_ExecuteTask(t);
\r
86 void TaskQueue_Execute(qboolean force)
\r
88 // if we have no threads to run the tasks, just start executing them now
\r
89 if (taskqueue_state.numthreads == 0 || force)
\r
93 taskqueue_task_t *t = NULL;
\r
94 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
95 t = TaskQueue_GetPending();
\r
96 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
99 TaskQueue_ExecuteTask(t);
\r
104 void TaskQueue_Enqueue(int numtasks, taskqueue_task_t *tasks)
\r
107 // try not to spinlock for a long time by breaking up large enqueues
\r
108 while (numtasks > 64)
\r
110 TaskQueue_Enqueue(64, tasks);
\r
114 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
115 for (i = 0; i < numtasks; i++)
\r
117 taskqueue_task_t *t = &tasks[i];
\r
118 // push to list.prev
\r
119 t->next = &taskqueue_state.list;
\r
120 t->prev = taskqueue_state.list.prev;
\r
124 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
127 // if the task can not be completed due yet to preconditions, just enqueue it again...
\r
128 void TaskQueue_Yield(taskqueue_task_t *t)
\r
131 TaskQueue_Enqueue(1, t);
\r
134 qboolean TaskQueue_IsDone(taskqueue_task_t *t)
\r
136 return !t->done != 0;
\r
139 void TaskQueue_WaitForTaskDone(taskqueue_task_t *t)
\r
141 qboolean done = false;
\r
144 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
145 done = t->done != 0;
\r
146 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
147 // if there are no threads, just execute the tasks immediately
\r
148 if (!done && taskqueue_state.numthreads == 0)
\r
149 TaskQueue_Execute(true);
\r
153 void TaskQueue_Frame(qboolean shutdown)
\r
155 int numthreads = shutdown ? 0 : bound(0, taskqueue_maxthreads.integer, sizeof(taskqueue_state.threads) / sizeof(taskqueue_state.threads[0]));
\r
156 #ifdef THREADDISABLE
\r
159 if (taskqueue_state.numthreads != numthreads)
\r
162 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
163 taskqueue_state.threads_quit = 1;
\r
164 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
165 for (i = 0; i < taskqueue_state.numthreads; i++)
\r
167 if (taskqueue_state.threads[i].handle)
\r
168 Thread_WaitThread(taskqueue_state.threads[i].handle, 0);
\r
169 taskqueue_state.threads[i].handle = NULL;
\r
171 Thread_AtomicLock(&taskqueue_state.command_lock);
\r
172 taskqueue_state.threads_quit = 0;
\r
173 Thread_AtomicUnlock(&taskqueue_state.command_lock);
\r
174 taskqueue_state.numthreads = numthreads;
\r
175 for (i = 0; i < taskqueue_state.numthreads; i++)
\r
176 taskqueue_state.threads[i].handle = Thread_CreateThread(TaskQueue_ThreadFunc, &taskqueue_state.threads[i]);
\r
177 // if there are still pending tasks (e.g. no threads), execute them on main thread now
\r
178 TaskQueue_Execute(true);
\r
182 void TaskQueue_Setup(taskqueue_task_t *t, taskqueue_task_t *preceding, void(*func)(taskqueue_task_t *), size_t i0, size_t i1, void *p0, void *p1)
\r
184 memset(t, 0, sizeof(*t));
\r
185 t->preceding = preceding;
\r
193 void TaskQueue_Task_CheckTasksDone(taskqueue_task_t *t)
\r
195 size_t numtasks = t->i[0];
\r
196 taskqueue_task_t *tasks = t->p[0];
\r
197 while (numtasks > 0)
\r
199 // check the last task first as it's usually going to be the last to finish, so we do the least work by checking it first
\r
200 if (!tasks[numtasks - 1].done)
\r
202 // update our partial progress, then yield to another pending task.
\r
203 t->i[0] = numtasks;
\r
204 TaskQueue_Yield(t);
\r