1 /* Copyright (C) 2010-2020 The RetroArch team
3 * ---------------------------------------------------------------------------------------
4 * The following license statement only applies to this file (task_queue.c).
5 * ---------------------------------------------------------------------------------------
7 * Permission is hereby granted, free of charge,
8 * to any person obtaining a copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation the rights to
10 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
11 * and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
16 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 #include <queues/task_queue.h>
29 #include <features/features_cpu.h>
32 #include <rthreads/rthreads.h>
41 struct retro_task_impl
43 retro_task_queue_msg_t msg_push;
44 void (*push_running)(retro_task_t *);
45 void (*cancel)(void *);
47 void (*wait)(retro_task_condition_fn_t, void *);
49 bool (*find)(retro_task_finder_t, void*);
50 void (*retrieve)(task_retriever_data_t *data);
55 /* TODO/FIXME - static globals */
56 static retro_task_queue_msg_t msg_push_bak = NULL;
57 static task_queue_t tasks_running = {NULL, NULL};
58 static task_queue_t tasks_finished = {NULL, NULL};
60 static struct retro_task_impl *impl_current = NULL;
61 static bool task_threaded_enable = false;
64 static uintptr_t main_thread_id = 0;
65 static slock_t *running_lock = NULL;
66 static slock_t *finished_lock = NULL;
67 static slock_t *property_lock = NULL;
68 static slock_t *queue_lock = NULL;
69 static scond_t *worker_cond = NULL;
70 static sthread_t *worker_thread = NULL;
71 static bool worker_continue = true;
72 /* use running_lock when touching it */
75 static void task_queue_msg_push(retro_task_t *task,
76 unsigned prio, unsigned duration,
77 bool flush, const char *fmt, ...)
85 vsnprintf(buf, sizeof(buf), fmt, ap);
88 if (impl_current->msg_push)
89 impl_current->msg_push(task, buf, prio, duration, flush);
92 static void task_queue_push_progress(retro_task_t *task)
95 /* msg_push callback interacts directly with the task properties (particularly title).
96 * make sure another thread doesn't modify them while rendering
98 slock_lock(property_lock);
101 if (task->title && !task->mute)
106 task_queue_msg_push(task, 1, 60, true, "%s: %s",
107 "Task failed", task->title);
109 task_queue_msg_push(task, 1, 60, false, "100%%: %s", task->title);
113 if (task->progress >= 0 && task->progress <= 100)
114 task_queue_msg_push(task, 1, 60, true, "%i%%: %s",
115 task->progress, task->title);
117 task_queue_msg_push(task, 1, 60, false, "%s...", task->title);
120 if (task->progress_cb)
121 task->progress_cb(task);
125 slock_unlock(property_lock);
129 static void task_queue_put(task_queue_t *queue, retro_task_t *task)
135 /* Make sure to insert in order - the queue is
136 * sorted by 'when' so items that aren't scheduled
137 * to run immediately are at the back of the queue.
138 * Items with the same 'when' are inserted after
139 * all the other items with the same 'when'.
140 * This primarily affects items with a 'when' of 0.
144 if (queue->back->when > task->when)
146 retro_task_t** prev = &queue->front;
147 while (*prev && (*prev)->when <= task->when)
148 prev = &((*prev)->next);
155 queue->back->next = task;
164 static retro_task_t *task_queue_get(task_queue_t *queue)
166 retro_task_t *task = queue->front;
170 queue->front = task->next;
177 static void retro_task_internal_gather(void)
179 retro_task_t *task = NULL;
180 while ((task = task_queue_get(&tasks_finished)))
182 task_queue_push_progress(task);
185 task->callback(task, task->task_data, task->user_data, task->error);
200 static void retro_task_regular_push_running(retro_task_t *task)
202 task_queue_put(&tasks_running, task);
205 static void retro_task_regular_cancel(void *task)
207 retro_task_t *t = (retro_task_t*)task;
211 static void retro_task_regular_gather(void)
213 retro_task_t *task = NULL;
214 retro_task_t *queue = NULL;
215 retro_task_t *next = NULL;
217 while ((task = task_queue_get(&tasks_running)))
223 for (task = queue; task; task = next)
227 if (!task->when || task->when < cpu_features_get_time_usec())
231 task_queue_push_progress(task);
235 task_queue_put(&tasks_finished, task);
237 retro_task_regular_push_running(task);
240 retro_task_internal_gather();
243 static void retro_task_regular_wait(retro_task_condition_fn_t cond, void* data)
245 while ((tasks_running.front && !tasks_running.front->when) && (!cond || cond(data)))
246 retro_task_regular_gather();
249 static void retro_task_regular_reset(void)
251 retro_task_t *task = tasks_running.front;
253 for (; task; task = task->next)
254 task->cancelled = true;
257 static void retro_task_regular_init(void) { }
258 static void retro_task_regular_deinit(void) { }
260 static bool retro_task_regular_find(retro_task_finder_t func, void *user_data)
262 retro_task_t *task = tasks_running.front;
264 for (; task; task = task->next)
266 if (func(task, user_data))
273 static void retro_task_regular_retrieve(task_retriever_data_t *data)
275 retro_task_t *task = NULL;
276 task_retriever_info_t *tail = NULL;
278 /* Parse all running tasks and handle matching handlers */
279 for (task = tasks_running.front; task != NULL; task = task->next)
281 task_retriever_info_t *info = NULL;
282 if (task->handler != data->handler)
285 /* Create new link */
286 info = (task_retriever_info_t*)
287 malloc(sizeof(task_retriever_info_t));
288 info->data = malloc(data->element_size);
291 /* Call retriever function and fill info-specific data */
292 if (!data->func(task, info->data))
299 /* Add link to list */
318 static struct retro_task_impl impl_regular = {
320 retro_task_regular_push_running,
321 retro_task_regular_cancel,
322 retro_task_regular_reset,
323 retro_task_regular_wait,
324 retro_task_regular_gather,
325 retro_task_regular_find,
326 retro_task_regular_retrieve,
327 retro_task_regular_init,
328 retro_task_regular_deinit
333 /* 'queue_lock' must be held for the duration of this function */
334 static void task_queue_remove(task_queue_t *queue, retro_task_t *task)
336 retro_task_t *t = NULL;
337 retro_task_t *front = queue->front;
339 /* Remove first element if needed */
342 queue->front = task->next;
343 if (queue->back == task) /* if only element, also update back */
354 /* Remove task and update queue */
357 t->next = task->next;
360 /* When removing the tail of the queue, update the tail pointer */
361 if (queue->back == task)
363 if (queue->back == task)
369 /* Update iterator */
374 static void retro_task_threaded_push_running(retro_task_t *task)
376 slock_lock(running_lock);
377 slock_lock(queue_lock);
378 task_queue_put(&tasks_running, task);
379 scond_signal(worker_cond);
380 slock_unlock(queue_lock);
381 slock_unlock(running_lock);
384 static void retro_task_threaded_cancel(void *task)
388 slock_lock(running_lock);
390 for (t = tasks_running.front; t; t = t->next)
399 slock_unlock(running_lock);
402 static void retro_task_threaded_gather(void)
404 retro_task_t *task = NULL;
406 slock_lock(running_lock);
407 for (task = tasks_running.front; task; task = task->next)
408 task_queue_push_progress(task);
409 slock_unlock(running_lock);
411 slock_lock(finished_lock);
412 retro_task_internal_gather();
413 slock_unlock(finished_lock);
416 static void retro_task_threaded_wait(retro_task_condition_fn_t cond, void* data)
422 retro_task_threaded_gather();
424 slock_lock(running_lock);
425 wait = (tasks_running.front && !tasks_running.front->when);
426 slock_unlock(running_lock);
430 slock_lock(finished_lock);
431 wait = (tasks_finished.front && !tasks_finished.front->when);
432 slock_unlock(finished_lock);
434 } while (wait && (!cond || cond(data)));
437 static void retro_task_threaded_reset(void)
439 retro_task_t *task = NULL;
441 slock_lock(running_lock);
442 for (task = tasks_running.front; task; task = task->next)
443 task->cancelled = true;
444 slock_unlock(running_lock);
447 static bool retro_task_threaded_find(
448 retro_task_finder_t func, void *user_data)
450 retro_task_t *task = NULL;
453 slock_lock(running_lock);
454 for (task = tasks_running.front; task; task = task->next)
456 if (func(task, user_data))
462 slock_unlock(running_lock);
467 static void retro_task_threaded_retrieve(task_retriever_data_t *data)
469 /* Protect access to running tasks */
470 slock_lock(running_lock);
472 /* Call regular retrieve function */
473 retro_task_regular_retrieve(data);
475 /* Release access to running tasks */
476 slock_unlock(running_lock);
479 static void threaded_worker(void *userdata)
485 retro_task_t *task = NULL;
486 bool finished = false;
488 if (!worker_continue)
489 break; /* should we keep running until all tasks finished? */
491 slock_lock(running_lock);
493 /* Get first task to run */
494 if (!(task = tasks_running.front))
496 scond_wait(worker_cond, running_lock);
497 slock_unlock(running_lock);
503 retro_time_t now = cpu_features_get_time_usec();
504 retro_time_t delay = task->when - now - 500; /* allow half a millisecond for context switching */
507 scond_wait_timeout(worker_cond, running_lock, delay);
508 slock_unlock(running_lock);
513 slock_unlock(running_lock);
517 slock_lock(property_lock);
518 finished = task->finished;
519 slock_unlock(property_lock);
524 /* Move the task to the back of the queue */
525 /* mimics retro_task_threaded_push_running,
526 * but also includes a task_queue_remove */
527 slock_lock(running_lock);
528 slock_lock(queue_lock);
530 /* do nothing if only item in queue */
533 task_queue_remove(&tasks_running, task);
534 task_queue_put(&tasks_running, task);
535 scond_signal(worker_cond);
537 slock_unlock(queue_lock);
538 slock_unlock(running_lock);
542 /* Remove task from running queue */
543 slock_lock(running_lock);
544 slock_lock(queue_lock);
545 task_queue_remove(&tasks_running, task);
546 slock_unlock(queue_lock);
547 slock_unlock(running_lock);
549 /* Add task to finished queue */
550 slock_lock(finished_lock);
551 task_queue_put(&tasks_finished, task);
552 slock_unlock(finished_lock);
557 static void retro_task_threaded_init(void)
559 running_lock = slock_new();
560 finished_lock = slock_new();
561 property_lock = slock_new();
562 queue_lock = slock_new();
563 worker_cond = scond_new();
565 slock_lock(running_lock);
566 worker_continue = true;
567 slock_unlock(running_lock);
569 worker_thread = sthread_create(threaded_worker, NULL);
572 static void retro_task_threaded_deinit(void)
574 slock_lock(running_lock);
575 worker_continue = false;
576 scond_signal(worker_cond);
577 slock_unlock(running_lock);
579 sthread_join(worker_thread);
581 scond_free(worker_cond);
582 slock_free(running_lock);
583 slock_free(finished_lock);
584 slock_free(property_lock);
585 slock_free(queue_lock);
587 worker_thread = NULL;
590 finished_lock = NULL;
591 property_lock = NULL;
595 static struct retro_task_impl impl_threaded = {
597 retro_task_threaded_push_running,
598 retro_task_threaded_cancel,
599 retro_task_threaded_reset,
600 retro_task_threaded_wait,
601 retro_task_threaded_gather,
602 retro_task_threaded_find,
603 retro_task_threaded_retrieve,
604 retro_task_threaded_init,
605 retro_task_threaded_deinit
609 /* Deinitializes the task system.
610 * This deinitializes the task system.
611 * The tasks that are running at
612 * the moment will stay on hold */
613 void task_queue_deinit(void)
616 impl_current->deinit();
620 void task_queue_init(bool threaded, retro_task_queue_msg_t msg_push)
622 impl_current = &impl_regular;
624 main_thread_id = sthread_get_current_thread_id();
627 task_threaded_enable = true;
628 impl_current = &impl_threaded;
632 msg_push_bak = msg_push;
634 impl_current->msg_push = msg_push;
635 impl_current->init();
638 void task_queue_set_threaded(void)
640 task_threaded_enable = true;
643 void task_queue_unset_threaded(void)
645 task_threaded_enable = false;
648 bool task_queue_is_threaded(void)
650 return task_threaded_enable;
653 bool task_queue_find(task_finder_data_t *find_data)
655 return impl_current->find(find_data->func, find_data->userdata);
658 void task_queue_retrieve(task_retriever_data_t *data)
660 impl_current->retrieve(data);
663 void task_queue_check(void)
666 bool current_threaded = (impl_current == &impl_threaded);
667 bool want_threaded = task_threaded_enable;
669 if (want_threaded != current_threaded)
673 task_queue_init(want_threaded, msg_push_bak);
676 impl_current->gather();
679 bool task_queue_push(retro_task_t *task)
681 /* Ignore this task if a related one is already running */
682 if (task->type == TASK_TYPE_BLOCKING)
684 retro_task_t *running = NULL;
688 slock_lock(queue_lock);
690 running = tasks_running.front;
692 for (; running; running = running->next)
694 if (running->type == TASK_TYPE_BLOCKING)
702 slock_unlock(queue_lock);
705 /* skip this task, user must try again later */
710 /* The lack of NULL checks in the following functions
711 * is proposital to ensure correct control flow by the users. */
712 impl_current->push_running(task);
717 void task_queue_wait(retro_task_condition_fn_t cond, void* data)
719 impl_current->wait(cond, data);
722 void task_queue_reset(void)
724 impl_current->reset();
728 * Signals a task to end without waiting for
730 void task_queue_cancel_task(void *task)
732 impl_current->cancel(task);
735 void *task_queue_retriever_info_next(task_retriever_info_t **link)
739 /* Grab data and move to next link */
742 data = (*link)->data;
743 *link = (*link)->next;
749 void task_queue_retriever_info_free(task_retriever_info_t *list)
751 task_retriever_info_t *info;
753 /* Free links including retriever-specific data */
763 bool task_is_on_main_thread(void)
766 return sthread_get_current_thread_id() == main_thread_id;
772 void task_set_finished(retro_task_t *task, bool finished)
775 slock_lock(property_lock);
777 task->finished = finished;
779 slock_unlock(property_lock);
783 void task_set_mute(retro_task_t *task, bool mute)
786 slock_lock(property_lock);
790 slock_unlock(property_lock);
794 void task_set_error(retro_task_t *task, char *error)
797 slock_lock(property_lock);
801 slock_unlock(property_lock);
805 void task_set_progress(retro_task_t *task, int8_t progress)
808 slock_lock(property_lock);
810 task->progress = progress;
812 slock_unlock(property_lock);
816 void task_set_title(retro_task_t *task, char *title)
819 slock_lock(property_lock);
823 slock_unlock(property_lock);
827 void task_set_data(retro_task_t *task, void *data)
830 slock_lock(running_lock);
832 task->task_data = data;
834 slock_unlock(running_lock);
838 void task_set_cancelled(retro_task_t *task, bool cancelled)
841 slock_lock(running_lock);
843 task->cancelled = cancelled;
845 slock_unlock(running_lock);
849 void task_free_title(retro_task_t *task)
852 slock_lock(property_lock);
858 slock_unlock(property_lock);
862 void* task_get_data(retro_task_t *task)
867 slock_lock(running_lock);
869 data = task->task_data;
871 slock_unlock(running_lock);
877 bool task_get_cancelled(retro_task_t *task)
879 bool cancelled = false;
882 slock_lock(running_lock);
884 cancelled = task->cancelled;
886 slock_unlock(running_lock);
892 bool task_get_finished(retro_task_t *task)
894 bool finished = false;
897 slock_lock(property_lock);
899 finished = task->finished;
901 slock_unlock(property_lock);
907 bool task_get_mute(retro_task_t *task)
912 slock_lock(property_lock);
916 slock_unlock(property_lock);
922 char* task_get_error(retro_task_t *task)
927 slock_lock(property_lock);
931 slock_unlock(property_lock);
937 int8_t task_get_progress(retro_task_t *task)
942 slock_lock(property_lock);
944 progress = task->progress;
946 slock_unlock(property_lock);
952 char* task_get_title(retro_task_t *task)
957 slock_lock(property_lock);
961 slock_unlock(property_lock);
967 retro_task_t *task_init(void)
969 /* TODO/FIXME - static local global */
970 static uint32_t task_count = 0;
971 retro_task_t *task = (retro_task_t*)malloc(sizeof(*task));
976 task->handler = NULL;
977 task->callback = NULL;
978 task->cleanup = NULL;
979 task->finished = false;
980 task->cancelled = false;
982 task->task_data = NULL;
983 task->user_data = NULL;
987 task->progress_cb = NULL;
989 task->type = TASK_TYPE_NONE;
990 task->ident = task_count++;
991 task->frontend_userdata = NULL;
992 task->alternative_look = false;