2 * Copyright (c) Meta Platforms, Inc. and affiliates.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
11 #include "utils/Buffer.h"
16 #include <condition_variable>
24 /// Thread-safe work queue with an optional maximum size (unbounded by default).
27 // Protects all member variable access
29 std::condition_variable readerCv_;
30 std::condition_variable writerCv_;
31 std::condition_variable finishCv_;
37 // The caller must hold `mutex_` when calling this function
42 return queue_.size() >= maxSize_;
47 * Constructs an empty work queue with an optional max size.
48 * If `maxSize == 0` the queue size is unbounded.
50 * @param maxSize The maximum allowed size of the work queue.
52 WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} // Starts with `done_` false; `maxSize` defaults to 0 and is stored in `maxSize_`.
55 * Push an item onto the work queue. Notify a single thread that work is
56 * available. If `finish()` has been called, do nothing and return false.
57 * If `push()` returns false, then `item` has not been moved from.
59 * @param item Item to push onto the queue.
60 * @returns True upon success, false if `finish()` has been called. An
61 * item was pushed iff `push()` returns true.
65 std::unique_lock<std::mutex> lock(mutex_);
66 while (full() && !done_) {
72 queue_.push(std::move(item));
74 readerCv_.notify_one();
79 * Attempts to pop an item off the work queue. It will block until data is
80 * available or `finish()` has been called.
82 * @param[out] item If `pop` returns `true`, it contains the popped item.
83 * If `pop` returns `false`, it is unmodified.
84 * @returns True upon success. False if the queue is empty and
85 * `finish()` has been called.
89 std::unique_lock<std::mutex> lock(mutex_);
90 while (queue_.empty() && !done_) {
97 item = std::move(queue_.front());
100 writerCv_.notify_one();
105 * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
107 * @param maxSize The new maximum queue size.
109 void setMaxSize(std::size_t maxSize) {
111 std::lock_guard<std::mutex> lock(mutex_);
114 writerCv_.notify_all();
118 * Promise that `push()` won't be called again, so once the queue is empty
119 * there will never be any more work.
123 std::lock_guard<std::mutex> lock(mutex_);
127 readerCv_.notify_all();
128 writerCv_.notify_all();
129 finishCv_.notify_all();
132 /// Blocks until `finish()` has been called (but the queue may not be empty).
133 void waitUntilFinished() {
134 std::unique_lock<std::mutex> lock(mutex_);
136 finishCv_.wait(lock);
141 /// Work queue for `Buffer`s that knows the total number of bytes in the queue.
142 class BufferWorkQueue {
143 WorkQueue<Buffer> queue_;
144 std::atomic<std::size_t> size_;
147 BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {} // Passes `maxSize` to the underlying `WorkQueue<Buffer>` and starts the `size_` counter at 0.
149 void push(Buffer buffer) {
150 size_.fetch_add(buffer.size());
151 queue_.push(std::move(buffer));
154 bool pop(Buffer& buffer) {
155 bool result = queue_.pop(buffer);
157 size_.fetch_sub(buffer.size());
162 void setMaxSize(std::size_t maxSize) {
163 queue_.setMaxSize(maxSize);
171 * Blocks until `finish()` has been called.
173 * @returns The total number of bytes of all the `Buffer`s currently in the
177 queue_.waitUntilFinished();