drc: handle upto 64k page size
[pcsx_rearmed.git] / deps / libretro-common / rthreads / tpool.c
1 /*
2  * Copyright (c) 2010-2020 The RetroArch team
3  * Copyright (c) 2017 John Schember <john@nachtimwald.com>
4  *
5  * ---------------------------------------------------------------------------------------
6  * The following license statement only applies to this file (tpool.c).
7  * ---------------------------------------------------------------------------------------
8  * 
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  * 
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  * 
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25  * THE SOFTWARE
26  */
27
28 #include <stdlib.h>
29 #include <boolean.h>
30
31 #include <rthreads/rthreads.h>
32 #include <rthreads/tpool.h>
33
34 /* Work object which will sit in a queue
35  * waiting for the pool to process it.
36  *
37  * It is a singly linked list acting as a FIFO queue. */
38 struct tpool_work
39 {
40    thread_func_t      func;  /* Function to be called. */
41    void              *arg;   /* Data to be passed to func. */
42    struct tpool_work *next;  /* Next work item in the queue. */
43 };
44 typedef struct tpool_work tpool_work_t;
45
46 struct tpool
47 {
48    tpool_work_t    *work_first;   /* First work item in the work queue. */
49    tpool_work_t    *work_last;    /* Last work item in the work queue. */
50    slock_t         *work_mutex;   /* Mutex protecting inserting and removing work from the work queue. */
51    scond_t         *work_cond;    /* Conditional to signal when there is work to process. */
52    scond_t         *working_cond; /* Conditional to signal when there is no work processing.
53                                        This will also signal when there are no threads running. */
54    size_t           working_cnt;  /* The number of threads processing work (Not waiting for work). */
55    size_t           thread_cnt;   /* Total number of threads within the pool. */
56    bool             stop;         /* Marker to tell the work threads to exit. */
57 };
58
59 static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
60 {
61    tpool_work_t *work;
62
63    if (!func)
64       return NULL;
65
66    work       = (tpool_work_t*)calloc(1, sizeof(*work));
67    work->func = func;
68    work->arg  = arg;
69    work->next = NULL;
70    return work;
71 }
72
73 static void tpool_work_destroy(tpool_work_t *work)
74 {
75    if (work)
76       free(work);
77 }
78
79 /* Pull the first work item out of the queue. */
80 static tpool_work_t *tpool_work_get(tpool_t *tp)
81 {
82    tpool_work_t *work;
83
84    if (!tp)
85       return NULL;
86
87    work = tp->work_first;
88    if (!work)
89       return NULL;
90
91    if (!work->next)
92    {
93       tp->work_first = NULL;
94       tp->work_last  = NULL;
95    }
96    else
97       tp->work_first = work->next;
98
99    return work;
100 }
101
102 static void tpool_worker(void *arg)
103 {
104    tpool_work_t *work = NULL;
105    tpool_t      *tp   = (tpool_t*)arg;
106
107    for (;;)
108    {
109       slock_lock(tp->work_mutex);
110       /* Keep running until told to stop. */
111       if (tp->stop)
112          break;
113
114       /* If there is no work in the queue wait in the conditional until
115        * there is work to take. */
116       if (!tp->work_first)
117          scond_wait(tp->work_cond, tp->work_mutex);
118
119       /* Try to pull work from the queue. */
120       work = tpool_work_get(tp);
121       tp->working_cnt++;
122       slock_unlock(tp->work_mutex);
123
124       /* Call the work function and let it process.
125        *
126        * work can legitimately be NULL. Since multiple threads from the pool
127        * will wake when there is work, a thread might not get any work. 1
128        * piece of work and 2 threads, both will wake but with 1 only work 1
129        * will get the work and the other won't.
130        *
131        * working_cnt has been increment and work could be NULL. While it's
132        * not true there is work processing the thread is considered working
133        * because it's not waiting in the conditional. Pedantic but...
134        */
135       if (work)
136       {
137          work->func(work->arg);
138          tpool_work_destroy(work);
139       }
140
141       slock_lock(tp->work_mutex);
142       tp->working_cnt--;
143       /* Since we're in a lock no work can be added or removed form the queue.
144        * Also, the working_cnt can't be changed (except the thread holding the lock).
145        * At this point if there isn't any work processing and if there is no work
146        * signal this is the case. */
147       if (!tp->stop && tp->working_cnt == 0 && !tp->work_first)
148          scond_signal(tp->working_cond);
149       slock_unlock(tp->work_mutex);
150    }
151
152    tp->thread_cnt--;
153    if (tp->thread_cnt == 0)
154       scond_signal(tp->working_cond);
155    slock_unlock(tp->work_mutex);
156 }
157
158 tpool_t *tpool_create(size_t num)
159 {
160    tpool_t   *tp;
161    sthread_t *thread;
162    size_t     i;
163
164    if (num == 0)
165       num = 2;
166
167    tp               = (tpool_t*)calloc(1, sizeof(*tp));
168    tp->thread_cnt   = num;
169
170    tp->work_mutex   = slock_new();
171    tp->work_cond    = scond_new();
172    tp->working_cond = scond_new();
173
174    tp->work_first   = NULL;
175    tp->work_last    = NULL;
176
177    /* Create the requested number of thread and detach them. */
178    for (i = 0; i < num; i++)
179    {
180       thread = sthread_create(tpool_worker, tp);
181       sthread_detach(thread);
182    }
183
184    return tp;
185 }
186
187 void tpool_destroy(tpool_t *tp)
188 {
189    tpool_work_t *work;
190    tpool_work_t *work2;
191
192    if (!tp)
193       return;
194
195    /* Take all work out of the queue and destroy it. */
196    slock_lock(tp->work_mutex);
197    work = tp->work_first;
198    while (work)
199    {
200       work2 = work->next;
201       tpool_work_destroy(work);
202       work = work2;
203    }
204
205    /* Tell the worker threads to stop. */
206    tp->stop = true;
207    scond_broadcast(tp->work_cond);
208    slock_unlock(tp->work_mutex);
209
210    /* Wait for all threads to stop. */
211    tpool_wait(tp);
212
213    slock_free(tp->work_mutex);
214    scond_free(tp->work_cond);
215    scond_free(tp->working_cond);
216
217    free(tp);
218 }
219
220 bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg)
221 {
222    tpool_work_t *work;
223
224    if (!tp)
225       return false;
226
227    work = tpool_work_create(func, arg);
228    if (!work)
229       return false;
230
231    slock_lock(tp->work_mutex);
232    if (!tp->work_first)
233    {
234       tp->work_first      = work;
235       tp->work_last       = tp->work_first;
236    }
237    else
238    {
239       tp->work_last->next = work;
240       tp->work_last       = work;
241    }
242
243    scond_broadcast(tp->work_cond);
244    slock_unlock(tp->work_mutex);
245
246    return true;
247 }
248
249 void tpool_wait(tpool_t *tp)
250 {
251    if (!tp)
252       return;
253
254    slock_lock(tp->work_mutex);
255
256    for (;;)
257    {
258       /* working_cond is dual use. It signals when we're not stopping but the
259        * working_cnt is 0 indicating there isn't any work processing. If we
260        * are stopping it will trigger when there aren't any threads running. */
261       if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0))
262          scond_wait(tp->working_cond, tp->work_mutex);
263       else
264          break;
265    }
266
267    slock_unlock(tp->work_mutex);
268 }