Commit | Line | Data |
---|---|---|
3719602c PC |
1 | /* |
2 | * Copyright (c) 2010-2020 The RetroArch team | |
3 | * Copyright (c) 2017 John Schember <john@nachtimwald.com> | |
4 | * | |
5 | * --------------------------------------------------------------------------------------- | |
6 | * The following license statement only applies to this file (tpool.c). | |
7 | * --------------------------------------------------------------------------------------- | |
8 | * | |
9 | * Permission is hereby granted, free of charge, to any person obtaining a copy | |
10 | * of this software and associated documentation files (the "Software"), to deal | |
11 | * in the Software without restriction, including without limitation the rights | |
12 | * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
13 | * copies of the Software, and to permit persons to whom the Software is | |
14 | * furnished to do so, subject to the following conditions: | |
15 | * | |
16 | * The above copyright notice and this permission notice shall be included in | |
17 | * all copies or substantial portions of the Software. | |
18 | * | |
19 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
20 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
21 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
22 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
23 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
24 | * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
25 | * THE SOFTWARE | |
26 | */ | |
27 | ||
28 | #include <stdlib.h> | |
29 | #include <boolean.h> | |
30 | ||
31 | #include <rthreads/rthreads.h> | |
32 | #include <rthreads/tpool.h> | |
33 | ||
34 | /* Work object which will sit in a queue | |
35 | * waiting for the pool to process it. | |
36 | * | |
37 | * It is a singly linked list acting as a FIFO queue. */ | |
38 | struct tpool_work | |
39 | { | |
40 | thread_func_t func; /* Function to be called. */ | |
41 | void *arg; /* Data to be passed to func. */ | |
42 | struct tpool_work *next; /* Next work item in the queue. */ | |
43 | }; | |
44 | typedef struct tpool_work tpool_work_t; | |
45 | ||
46 | struct tpool | |
47 | { | |
48 | tpool_work_t *work_first; /* First work item in the work queue. */ | |
49 | tpool_work_t *work_last; /* Last work item in the work queue. */ | |
50 | slock_t *work_mutex; /* Mutex protecting inserting and removing work from the work queue. */ | |
51 | scond_t *work_cond; /* Conditional to signal when there is work to process. */ | |
52 | scond_t *working_cond; /* Conditional to signal when there is no work processing. | |
53 | This will also signal when there are no threads running. */ | |
54 | size_t working_cnt; /* The number of threads processing work (Not waiting for work). */ | |
55 | size_t thread_cnt; /* Total number of threads within the pool. */ | |
56 | bool stop; /* Marker to tell the work threads to exit. */ | |
57 | }; | |
58 | ||
59 | static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) | |
60 | { | |
61 | tpool_work_t *work; | |
62 | ||
63 | if (!func) | |
64 | return NULL; | |
65 | ||
66 | work = (tpool_work_t*)calloc(1, sizeof(*work)); | |
67 | work->func = func; | |
68 | work->arg = arg; | |
69 | work->next = NULL; | |
70 | return work; | |
71 | } | |
72 | ||
73 | static void tpool_work_destroy(tpool_work_t *work) | |
74 | { | |
75 | if (work) | |
76 | free(work); | |
77 | } | |
78 | ||
79 | /* Pull the first work item out of the queue. */ | |
80 | static tpool_work_t *tpool_work_get(tpool_t *tp) | |
81 | { | |
82 | tpool_work_t *work; | |
83 | ||
84 | if (!tp) | |
85 | return NULL; | |
86 | ||
87 | work = tp->work_first; | |
88 | if (!work) | |
89 | return NULL; | |
90 | ||
91 | if (!work->next) | |
92 | { | |
93 | tp->work_first = NULL; | |
94 | tp->work_last = NULL; | |
95 | } | |
96 | else | |
97 | tp->work_first = work->next; | |
98 | ||
99 | return work; | |
100 | } | |
101 | ||
102 | static void tpool_worker(void *arg) | |
103 | { | |
104 | tpool_work_t *work = NULL; | |
105 | tpool_t *tp = (tpool_t*)arg; | |
106 | ||
107 | for (;;) | |
108 | { | |
109 | slock_lock(tp->work_mutex); | |
110 | /* Keep running until told to stop. */ | |
111 | if (tp->stop) | |
112 | break; | |
113 | ||
114 | /* If there is no work in the queue wait in the conditional until | |
115 | * there is work to take. */ | |
116 | if (!tp->work_first) | |
117 | scond_wait(tp->work_cond, tp->work_mutex); | |
118 | ||
119 | /* Try to pull work from the queue. */ | |
120 | work = tpool_work_get(tp); | |
121 | tp->working_cnt++; | |
122 | slock_unlock(tp->work_mutex); | |
123 | ||
124 | /* Call the work function and let it process. | |
125 | * | |
126 | * work can legitimately be NULL. Since multiple threads from the pool | |
127 | * will wake when there is work, a thread might not get any work. 1 | |
128 | * piece of work and 2 threads, both will wake but with 1 only work 1 | |
129 | * will get the work and the other won't. | |
130 | * | |
131 | * working_cnt has been increment and work could be NULL. While it's | |
132 | * not true there is work processing the thread is considered working | |
133 | * because it's not waiting in the conditional. Pedantic but... | |
134 | */ | |
135 | if (work) | |
136 | { | |
137 | work->func(work->arg); | |
138 | tpool_work_destroy(work); | |
139 | } | |
140 | ||
141 | slock_lock(tp->work_mutex); | |
142 | tp->working_cnt--; | |
143 | /* Since we're in a lock no work can be added or removed form the queue. | |
144 | * Also, the working_cnt can't be changed (except the thread holding the lock). | |
145 | * At this point if there isn't any work processing and if there is no work | |
146 | * signal this is the case. */ | |
147 | if (!tp->stop && tp->working_cnt == 0 && !tp->work_first) | |
148 | scond_signal(tp->working_cond); | |
149 | slock_unlock(tp->work_mutex); | |
150 | } | |
151 | ||
152 | tp->thread_cnt--; | |
153 | if (tp->thread_cnt == 0) | |
154 | scond_signal(tp->working_cond); | |
155 | slock_unlock(tp->work_mutex); | |
156 | } | |
157 | ||
158 | tpool_t *tpool_create(size_t num) | |
159 | { | |
160 | tpool_t *tp; | |
161 | sthread_t *thread; | |
162 | size_t i; | |
163 | ||
164 | if (num == 0) | |
165 | num = 2; | |
166 | ||
167 | tp = (tpool_t*)calloc(1, sizeof(*tp)); | |
168 | tp->thread_cnt = num; | |
169 | ||
170 | tp->work_mutex = slock_new(); | |
171 | tp->work_cond = scond_new(); | |
172 | tp->working_cond = scond_new(); | |
173 | ||
174 | tp->work_first = NULL; | |
175 | tp->work_last = NULL; | |
176 | ||
177 | /* Create the requested number of thread and detach them. */ | |
178 | for (i = 0; i < num; i++) | |
179 | { | |
180 | thread = sthread_create(tpool_worker, tp); | |
181 | sthread_detach(thread); | |
182 | } | |
183 | ||
184 | return tp; | |
185 | } | |
186 | ||
187 | void tpool_destroy(tpool_t *tp) | |
188 | { | |
189 | tpool_work_t *work; | |
190 | tpool_work_t *work2; | |
191 | ||
192 | if (!tp) | |
193 | return; | |
194 | ||
195 | /* Take all work out of the queue and destroy it. */ | |
196 | slock_lock(tp->work_mutex); | |
197 | work = tp->work_first; | |
198 | while (work) | |
199 | { | |
200 | work2 = work->next; | |
201 | tpool_work_destroy(work); | |
202 | work = work2; | |
203 | } | |
204 | ||
205 | /* Tell the worker threads to stop. */ | |
206 | tp->stop = true; | |
207 | scond_broadcast(tp->work_cond); | |
208 | slock_unlock(tp->work_mutex); | |
209 | ||
210 | /* Wait for all threads to stop. */ | |
211 | tpool_wait(tp); | |
212 | ||
213 | slock_free(tp->work_mutex); | |
214 | scond_free(tp->work_cond); | |
215 | scond_free(tp->working_cond); | |
216 | ||
217 | free(tp); | |
218 | } | |
219 | ||
220 | bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg) | |
221 | { | |
222 | tpool_work_t *work; | |
223 | ||
224 | if (!tp) | |
225 | return false; | |
226 | ||
227 | work = tpool_work_create(func, arg); | |
228 | if (!work) | |
229 | return false; | |
230 | ||
231 | slock_lock(tp->work_mutex); | |
232 | if (!tp->work_first) | |
233 | { | |
234 | tp->work_first = work; | |
235 | tp->work_last = tp->work_first; | |
236 | } | |
237 | else | |
238 | { | |
239 | tp->work_last->next = work; | |
240 | tp->work_last = work; | |
241 | } | |
242 | ||
243 | scond_broadcast(tp->work_cond); | |
244 | slock_unlock(tp->work_mutex); | |
245 | ||
246 | return true; | |
247 | } | |
248 | ||
249 | void tpool_wait(tpool_t *tp) | |
250 | { | |
251 | if (!tp) | |
252 | return; | |
253 | ||
254 | slock_lock(tp->work_mutex); | |
255 | ||
256 | for (;;) | |
257 | { | |
258 | /* working_cond is dual use. It signals when we're not stopping but the | |
259 | * working_cnt is 0 indicating there isn't any work processing. If we | |
260 | * are stopping it will trigger when there aren't any threads running. */ | |
261 | if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) | |
262 | scond_wait(tp->working_cond, tp->work_mutex); | |
263 | else | |
264 | break; | |
265 | } | |
266 | ||
267 | slock_unlock(tp->work_mutex); | |
268 | } |