cdrom: adjust timing
[pcsx_rearmed.git] / deps / libretro-common / rthreads / tpool.c
CommitLineData
3719602c
PC
1/*
2 * Copyright (c) 2010-2020 The RetroArch team
3 * Copyright (c) 2017 John Schember <john@nachtimwald.com>
4 *
5 * ---------------------------------------------------------------------------------------
6 * The following license statement only applies to this file (tpool.c).
7 * ---------------------------------------------------------------------------------------
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining a copy
10 * of this software and associated documentation files (the "Software"), to deal
11 * in the Software without restriction, including without limitation the rights
12 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 * copies of the Software, and to permit persons to whom the Software is
14 * furnished to do so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 * THE SOFTWARE
26 */
27
28#include <stdlib.h>
29#include <boolean.h>
30
31#include <rthreads/rthreads.h>
32#include <rthreads/tpool.h>
33
34/* Work object which will sit in a queue
35 * waiting for the pool to process it.
36 *
37 * It is a singly linked list acting as a FIFO queue. */
38struct tpool_work
39{
40 thread_func_t func; /* Function to be called. */
41 void *arg; /* Data to be passed to func. */
42 struct tpool_work *next; /* Next work item in the queue. */
43};
44typedef struct tpool_work tpool_work_t;
45
46struct tpool
47{
48 tpool_work_t *work_first; /* First work item in the work queue. */
49 tpool_work_t *work_last; /* Last work item in the work queue. */
50 slock_t *work_mutex; /* Mutex protecting inserting and removing work from the work queue. */
51 scond_t *work_cond; /* Conditional to signal when there is work to process. */
52 scond_t *working_cond; /* Conditional to signal when there is no work processing.
53 This will also signal when there are no threads running. */
54 size_t working_cnt; /* The number of threads processing work (Not waiting for work). */
55 size_t thread_cnt; /* Total number of threads within the pool. */
56 bool stop; /* Marker to tell the work threads to exit. */
57};
58
59static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
60{
61 tpool_work_t *work;
62
63 if (!func)
64 return NULL;
65
66 work = (tpool_work_t*)calloc(1, sizeof(*work));
67 work->func = func;
68 work->arg = arg;
69 work->next = NULL;
70 return work;
71}
72
73static void tpool_work_destroy(tpool_work_t *work)
74{
75 if (work)
76 free(work);
77}
78
79/* Pull the first work item out of the queue. */
80static tpool_work_t *tpool_work_get(tpool_t *tp)
81{
82 tpool_work_t *work;
83
84 if (!tp)
85 return NULL;
86
87 work = tp->work_first;
88 if (!work)
89 return NULL;
90
91 if (!work->next)
92 {
93 tp->work_first = NULL;
94 tp->work_last = NULL;
95 }
96 else
97 tp->work_first = work->next;
98
99 return work;
100}
101
102static void tpool_worker(void *arg)
103{
104 tpool_work_t *work = NULL;
105 tpool_t *tp = (tpool_t*)arg;
106
107 for (;;)
108 {
109 slock_lock(tp->work_mutex);
110 /* Keep running until told to stop. */
111 if (tp->stop)
112 break;
113
114 /* If there is no work in the queue wait in the conditional until
115 * there is work to take. */
116 if (!tp->work_first)
117 scond_wait(tp->work_cond, tp->work_mutex);
118
119 /* Try to pull work from the queue. */
120 work = tpool_work_get(tp);
121 tp->working_cnt++;
122 slock_unlock(tp->work_mutex);
123
124 /* Call the work function and let it process.
125 *
126 * work can legitimately be NULL. Since multiple threads from the pool
127 * will wake when there is work, a thread might not get any work. 1
128 * piece of work and 2 threads, both will wake but with 1 only work 1
129 * will get the work and the other won't.
130 *
131 * working_cnt has been increment and work could be NULL. While it's
132 * not true there is work processing the thread is considered working
133 * because it's not waiting in the conditional. Pedantic but...
134 */
135 if (work)
136 {
137 work->func(work->arg);
138 tpool_work_destroy(work);
139 }
140
141 slock_lock(tp->work_mutex);
142 tp->working_cnt--;
143 /* Since we're in a lock no work can be added or removed form the queue.
144 * Also, the working_cnt can't be changed (except the thread holding the lock).
145 * At this point if there isn't any work processing and if there is no work
146 * signal this is the case. */
147 if (!tp->stop && tp->working_cnt == 0 && !tp->work_first)
148 scond_signal(tp->working_cond);
149 slock_unlock(tp->work_mutex);
150 }
151
152 tp->thread_cnt--;
153 if (tp->thread_cnt == 0)
154 scond_signal(tp->working_cond);
155 slock_unlock(tp->work_mutex);
156}
157
158tpool_t *tpool_create(size_t num)
159{
160 tpool_t *tp;
161 sthread_t *thread;
162 size_t i;
163
164 if (num == 0)
165 num = 2;
166
167 tp = (tpool_t*)calloc(1, sizeof(*tp));
168 tp->thread_cnt = num;
169
170 tp->work_mutex = slock_new();
171 tp->work_cond = scond_new();
172 tp->working_cond = scond_new();
173
174 tp->work_first = NULL;
175 tp->work_last = NULL;
176
177 /* Create the requested number of thread and detach them. */
178 for (i = 0; i < num; i++)
179 {
180 thread = sthread_create(tpool_worker, tp);
181 sthread_detach(thread);
182 }
183
184 return tp;
185}
186
187void tpool_destroy(tpool_t *tp)
188{
189 tpool_work_t *work;
190 tpool_work_t *work2;
191
192 if (!tp)
193 return;
194
195 /* Take all work out of the queue and destroy it. */
196 slock_lock(tp->work_mutex);
197 work = tp->work_first;
198 while (work)
199 {
200 work2 = work->next;
201 tpool_work_destroy(work);
202 work = work2;
203 }
204
205 /* Tell the worker threads to stop. */
206 tp->stop = true;
207 scond_broadcast(tp->work_cond);
208 slock_unlock(tp->work_mutex);
209
210 /* Wait for all threads to stop. */
211 tpool_wait(tp);
212
213 slock_free(tp->work_mutex);
214 scond_free(tp->work_cond);
215 scond_free(tp->working_cond);
216
217 free(tp);
218}
219
220bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg)
221{
222 tpool_work_t *work;
223
224 if (!tp)
225 return false;
226
227 work = tpool_work_create(func, arg);
228 if (!work)
229 return false;
230
231 slock_lock(tp->work_mutex);
232 if (!tp->work_first)
233 {
234 tp->work_first = work;
235 tp->work_last = tp->work_first;
236 }
237 else
238 {
239 tp->work_last->next = work;
240 tp->work_last = work;
241 }
242
243 scond_broadcast(tp->work_cond);
244 slock_unlock(tp->work_mutex);
245
246 return true;
247}
248
249void tpool_wait(tpool_t *tp)
250{
251 if (!tp)
252 return;
253
254 slock_lock(tp->work_mutex);
255
256 for (;;)
257 {
258 /* working_cond is dual use. It signals when we're not stopping but the
259 * working_cnt is 0 indicating there isn't any work processing. If we
260 * are stopping it will trigger when there aren't any threads running. */
261 if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0))
262 scond_wait(tp->working_cond, tp->work_mutex);
263 else
264 break;
265 }
266
267 slock_unlock(tp->work_mutex);
268}