2 * Copyright (c) Meta Platforms, Inc. and affiliates.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
12 #include <gtest/gtest.h>
19 using namespace pzstd;
// Consumer helper (fragment; the enclosing declaration is not visible here).
// Holds a pointer to a WorkQueue<int> and drains it: each value popped is
// recorded at results[value] while holding *mutex.
// NOTE(review): results is indexed by the popped value, so it presumably must
// be sized to cover every value pushed — confirm against the callers.
23 WorkQueue<int>* queue;
// Keep consuming for as long as pop() reports success.
29 while (queue->pop(result)) {
// Serialize writes to the shared results storage across consumers.
30 std::lock_guard<std::mutex> lock(*mutex);
31 results[result] = result;
// Exercises WorkQueue from a single thread. The visible checks expect five
// pop() calls to succeed, a sixth pop() to fail, and then call
// waitUntilFinished().
// NOTE(review): the push()/finish() calls that feed these pops are not visible
// in this excerpt — confirm their placement against the full file.
37 TEST(WorkQueue, SingleThreaded) {
42 EXPECT_TRUE(queue.pop(result));
47 EXPECT_TRUE(queue.pop(result));
49 EXPECT_TRUE(queue.pop(result));
55 EXPECT_TRUE(queue.pop(result));
57 EXPECT_TRUE(queue.pop(result));
// Once drained, pop() is expected to report failure.
59 EXPECT_FALSE(queue.pop(result));
61 queue.waitUntilFinished();
// Single-producer / single-consumer test: one std::thread pops from the queue
// while the test body runs two loops covering [0, 10) and [10, max).
// NOTE(review): the loop bodies are not visible here — presumably they push i
// onto the queue; confirm against the full file.
64 TEST(WorkQueue, SPSC) {
// First batch, issued before the consumer thread is created.
68 for (int i = 0; i < 10; ++i) {
// Consumer: captures the queue by reference and max by value.
72 std::thread thread([ &queue, max ] {
// Unbounded counter loop; the visible exit condition is a failed pop().
74 for (int i = 0;; ++i) {
75 if (!queue.pop(result)) {
// Give the consumer thread a chance to run before the second batch.
83 std::this_thread::yield();
84 for (int i = 10; i < max; ++i) {
// Single-producer / multi-consumer test: five Popper threads share one queue
// and one results vector, and the test finally expects results[i] == i for
// all 50 slots.
92 TEST(WorkQueue, SPMC) {
// 50 slots pre-filled with -1 so an unwritten slot fails the final check.
94 std::vector<int> results(50, -1);
96 std::vector<std::thread> threads;
// Start the five consumers.
97 for (int i = 0; i < 5; ++i) {
98 threads.emplace_back(Popper{&queue, results.data(), &mutex});
// Producer loop over [0, 50) (body not visible in this excerpt).
101 for (int i = 0; i < 50; ++i) {
106 for (auto& thread : threads) {
// Every slot is expected to have been overwritten with its own index.
110 for (int i = 0; i < 50; ++i) {
111 EXPECT_EQ(i, results[i]);
// Multi-producer / multi-consumer test: four Popper threads and two producer
// threads share one queue; the test finally expects results[i] == i for all
// 100 slots.
115 TEST(WorkQueue, MPMC) {
116 WorkQueue<int> queue;
// 100 slots pre-filled with -1 so an unwritten slot fails the final check.
117 std::vector<int> results(100, -1);
119 std::vector<std::thread> popperThreads;
// Start the four consumers.
120 for (int i = 0; i < 4; ++i) {
121 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
124 std::vector<std::thread> pusherThreads;
// Start the two producers, each handling its own [min, max) range.
125 for (int i = 0; i < 2; ++i) {
// NOTE(review): the definition of min is not visible — presumably i * 50.
127 auto max = (i + 1) * 50;
128 pusherThreads.emplace_back(
// min and max are captured by value, so each producer keeps its own range.
129 [ &queue, min, max ] {
130 for (int i = min; i < max; ++i) {
136 for (auto& thread : pusherThreads) {
141 for (auto& thread : popperThreads) {
// Every slot is expected to have been overwritten with its own index.
145 for (int i = 0; i < 100; ++i) {
146 EXPECT_EQ(i, results[i]);
// Checks a WorkQueue constructed with the argument 1; the visible assertion
// expects the value read into result to be 5.
// NOTE(review): the push/pop calls are not visible in this excerpt.
150 TEST(WorkQueue, BoundedSizeWorks) {
151 WorkQueue<int> queue(1);
160 EXPECT_EQ(5, result);
// Checks a WorkQueue constructed with the argument 1 while a separate pusher
// thread uses the same queue: exactly one pop() is expected to succeed
// (yielding 5) and the next is expected to fail.
// NOTE(review): the pusher body and any finish() call are not visible here.
163 TEST(WorkQueue, BoundedSizePushAfterFinish) {
164 WorkQueue<int> queue(1);
167 std::thread pusher([&queue] {
170 // Dirtily try and make sure that pusher has run.
// NOTE(review): a fixed 1-second sleep is a timing heuristic, not a
// synchronization guarantee; it also adds a second to the test's runtime.
171 std::this_thread::sleep_for(std::chrono::seconds(1));
173 EXPECT_TRUE(queue.pop(result));
174 EXPECT_EQ(5, result);
// No further element is expected to be available.
175 EXPECT_FALSE(queue.pop(result));
// Checks a WorkQueue constructed with the argument 2 while a separate pusher
// thread uses the same queue: two pops are expected to succeed (yielding 5
// then 6) and a third is expected to fail.
// NOTE(review): the call that changes the max size, suggested by the test
// name, is not visible in this excerpt.
180 TEST(WorkQueue, SetMaxSize) {
181 WorkQueue<int> queue(2);
186 std::thread pusher([&queue] {
189 // Dirtily try and make sure that pusher has run.
// NOTE(review): a fixed 1-second sleep is a timing heuristic, not a
// synchronization guarantee; it also adds a second to the test's runtime.
190 std::this_thread::sleep_for(std::chrono::seconds(1));
192 EXPECT_TRUE(queue.pop(result));
193 EXPECT_EQ(5, result);
194 EXPECT_TRUE(queue.pop(result));
195 EXPECT_EQ(6, result);
// No further element is expected to be available.
196 EXPECT_FALSE(queue.pop(result));
// Multi-producer / multi-consumer test on a WorkQueue constructed with the
// argument 10: four Popper threads and two producer threads share the queue,
// and the test finally expects results[i] == i for all 200 slots. Each phase
// is announced on std::cerr.
201 TEST(WorkQueue, BoundedSizeMPMC) {
202 WorkQueue<int> queue(10);
// 200 slots pre-filled with -1 so an unwritten slot fails the final check.
203 std::vector<int> results(200, -1);
205 std::cerr << "Creating popperThreads" << std::endl;
206 std::vector<std::thread> popperThreads;
// Start the four consumers.
207 for (int i = 0; i < 4; ++i) {
208 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
211 std::cerr << "Creating pusherThreads" << std::endl;
212 std::vector<std::thread> pusherThreads;
// Start the two producers, each handling its own [min, max) range.
213 for (int i = 0; i < 2; ++i) {
// NOTE(review): the definition of min is not visible — presumably i * 100.
215 auto max = (i + 1) * 100;
216 pusherThreads.emplace_back(
// min and max are captured by value, so each producer keeps its own range.
217 [ &queue, min, max ] {
218 for (int i = min; i < max; ++i) {
224 std::cerr << "Joining pusherThreads" << std::endl;
225 for (auto& thread : pusherThreads) {
228 std::cerr << "Finishing queue" << std::endl;
231 std::cerr << "Joining popperThreads" << std::endl;
232 for (auto& thread : popperThreads) {
236 std::cerr << "Inspecting results" << std::endl;
// Every slot is expected to have been overwritten with its own index.
237 for (int i = 0; i < 200; ++i) {
238 EXPECT_EQ(i, results[i]);
// Checks what happens to a move-only argument passed to push(): after a
// successful push the source unique_ptr is expected to be null, and after a
// failed push it is expected to still be non-null.
// NOTE(review): the statements between the two pushes are not visible here —
// presumably x is re-assigned and the queue is finished; confirm.
242 TEST(WorkQueue, FailedPush) {
243 WorkQueue<std::unique_ptr<int>> queue;
244 std::unique_ptr<int> x(new int{5});
245 EXPECT_TRUE(queue.push(std::move(x)));
// Successful push: x is expected to be empty afterwards.
246 EXPECT_EQ(nullptr, x);
249 EXPECT_FALSE(queue.push(std::move(x)));
// Failed push: x is expected to still hold a value.
250 EXPECT_NE(nullptr, x);
// Checks BufferWorkQueue::size() across four separately declared queues:
// nothing pushed -> 0, Buffer(10) -> 10, Buffer(10) + Buffer(5) -> 15, and a
// final case expecting 5 after pushing Buffer(10) and Buffer(5).
// NOTE(review): the expectations suggest size() sums the sizes of the queued
// buffers; the statements between the last pushes and the final check are not
// visible (presumably the first buffer is popped) — confirm.
254 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
256 BufferWorkQueue queue;
258 EXPECT_EQ(0, queue.size());
261 BufferWorkQueue queue;
262 queue.push(Buffer(10));
264 EXPECT_EQ(10, queue.size());
267 BufferWorkQueue queue;
268 queue.push(Buffer(10));
269 queue.push(Buffer(5));
271 EXPECT_EQ(15, queue.size());
274 BufferWorkQueue queue;
275 queue.push(Buffer(10));
276 queue.push(Buffer(5));
280 EXPECT_EQ(5, queue.size());