git subrepo pull (merge) --force deps/libchdr
[pcsx_rearmed.git] / deps / libchdr / deps / zstd-1.5.5 / contrib / pzstd / utils / test / WorkQueueTest.cpp
CommitLineData
648db22b 1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 */
9#include "utils/Buffer.h"
10#include "utils/WorkQueue.h"
11
12#include <gtest/gtest.h>
13#include <iostream>
14#include <memory>
15#include <mutex>
16#include <thread>
17#include <vector>
18
19using namespace pzstd;
20
21namespace {
22struct Popper {
23 WorkQueue<int>* queue;
24 int* results;
25 std::mutex* mutex;
26
27 void operator()() {
28 int result;
29 while (queue->pop(result)) {
30 std::lock_guard<std::mutex> lock(*mutex);
31 results[result] = result;
32 }
33 }
34};
35}
36
37TEST(WorkQueue, SingleThreaded) {
38 WorkQueue<int> queue;
39 int result;
40
41 queue.push(5);
42 EXPECT_TRUE(queue.pop(result));
43 EXPECT_EQ(5, result);
44
45 queue.push(1);
46 queue.push(2);
47 EXPECT_TRUE(queue.pop(result));
48 EXPECT_EQ(1, result);
49 EXPECT_TRUE(queue.pop(result));
50 EXPECT_EQ(2, result);
51
52 queue.push(1);
53 queue.push(2);
54 queue.finish();
55 EXPECT_TRUE(queue.pop(result));
56 EXPECT_EQ(1, result);
57 EXPECT_TRUE(queue.pop(result));
58 EXPECT_EQ(2, result);
59 EXPECT_FALSE(queue.pop(result));
60
61 queue.waitUntilFinished();
62}
63
64TEST(WorkQueue, SPSC) {
65 WorkQueue<int> queue;
66 const int max = 100;
67
68 for (int i = 0; i < 10; ++i) {
69 queue.push(int{i});
70 }
71
72 std::thread thread([ &queue, max ] {
73 int result;
74 for (int i = 0;; ++i) {
75 if (!queue.pop(result)) {
76 EXPECT_EQ(i, max);
77 break;
78 }
79 EXPECT_EQ(i, result);
80 }
81 });
82
83 std::this_thread::yield();
84 for (int i = 10; i < max; ++i) {
85 queue.push(int{i});
86 }
87 queue.finish();
88
89 thread.join();
90}
91
92TEST(WorkQueue, SPMC) {
93 WorkQueue<int> queue;
94 std::vector<int> results(50, -1);
95 std::mutex mutex;
96 std::vector<std::thread> threads;
97 for (int i = 0; i < 5; ++i) {
98 threads.emplace_back(Popper{&queue, results.data(), &mutex});
99 }
100
101 for (int i = 0; i < 50; ++i) {
102 queue.push(int{i});
103 }
104 queue.finish();
105
106 for (auto& thread : threads) {
107 thread.join();
108 }
109
110 for (int i = 0; i < 50; ++i) {
111 EXPECT_EQ(i, results[i]);
112 }
113}
114
115TEST(WorkQueue, MPMC) {
116 WorkQueue<int> queue;
117 std::vector<int> results(100, -1);
118 std::mutex mutex;
119 std::vector<std::thread> popperThreads;
120 for (int i = 0; i < 4; ++i) {
121 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
122 }
123
124 std::vector<std::thread> pusherThreads;
125 for (int i = 0; i < 2; ++i) {
126 auto min = i * 50;
127 auto max = (i + 1) * 50;
128 pusherThreads.emplace_back(
129 [ &queue, min, max ] {
130 for (int i = min; i < max; ++i) {
131 queue.push(int{i});
132 }
133 });
134 }
135
136 for (auto& thread : pusherThreads) {
137 thread.join();
138 }
139 queue.finish();
140
141 for (auto& thread : popperThreads) {
142 thread.join();
143 }
144
145 for (int i = 0; i < 100; ++i) {
146 EXPECT_EQ(i, results[i]);
147 }
148}
149
150TEST(WorkQueue, BoundedSizeWorks) {
151 WorkQueue<int> queue(1);
152 int result;
153 queue.push(5);
154 queue.pop(result);
155 queue.push(5);
156 queue.pop(result);
157 queue.push(5);
158 queue.finish();
159 queue.pop(result);
160 EXPECT_EQ(5, result);
161}
162
163TEST(WorkQueue, BoundedSizePushAfterFinish) {
164 WorkQueue<int> queue(1);
165 int result;
166 queue.push(5);
167 std::thread pusher([&queue] {
168 queue.push(6);
169 });
170 // Dirtily try and make sure that pusher has run.
171 std::this_thread::sleep_for(std::chrono::seconds(1));
172 queue.finish();
173 EXPECT_TRUE(queue.pop(result));
174 EXPECT_EQ(5, result);
175 EXPECT_FALSE(queue.pop(result));
176
177 pusher.join();
178}
179
180TEST(WorkQueue, SetMaxSize) {
181 WorkQueue<int> queue(2);
182 int result;
183 queue.push(5);
184 queue.push(6);
185 queue.setMaxSize(1);
186 std::thread pusher([&queue] {
187 queue.push(7);
188 });
189 // Dirtily try and make sure that pusher has run.
190 std::this_thread::sleep_for(std::chrono::seconds(1));
191 queue.finish();
192 EXPECT_TRUE(queue.pop(result));
193 EXPECT_EQ(5, result);
194 EXPECT_TRUE(queue.pop(result));
195 EXPECT_EQ(6, result);
196 EXPECT_FALSE(queue.pop(result));
197
198 pusher.join();
199}
200
201TEST(WorkQueue, BoundedSizeMPMC) {
202 WorkQueue<int> queue(10);
203 std::vector<int> results(200, -1);
204 std::mutex mutex;
205 std::cerr << "Creating popperThreads" << std::endl;
206 std::vector<std::thread> popperThreads;
207 for (int i = 0; i < 4; ++i) {
208 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
209 }
210
211 std::cerr << "Creating pusherThreads" << std::endl;
212 std::vector<std::thread> pusherThreads;
213 for (int i = 0; i < 2; ++i) {
214 auto min = i * 100;
215 auto max = (i + 1) * 100;
216 pusherThreads.emplace_back(
217 [ &queue, min, max ] {
218 for (int i = min; i < max; ++i) {
219 queue.push(int{i});
220 }
221 });
222 }
223
224 std::cerr << "Joining pusherThreads" << std::endl;
225 for (auto& thread : pusherThreads) {
226 thread.join();
227 }
228 std::cerr << "Finishing queue" << std::endl;
229 queue.finish();
230
231 std::cerr << "Joining popperThreads" << std::endl;
232 for (auto& thread : popperThreads) {
233 thread.join();
234 }
235
236 std::cerr << "Inspecting results" << std::endl;
237 for (int i = 0; i < 200; ++i) {
238 EXPECT_EQ(i, results[i]);
239 }
240}
241
242TEST(WorkQueue, FailedPush) {
243 WorkQueue<std::unique_ptr<int>> queue;
244 std::unique_ptr<int> x(new int{5});
245 EXPECT_TRUE(queue.push(std::move(x)));
246 EXPECT_EQ(nullptr, x);
247 queue.finish();
248 x.reset(new int{6});
249 EXPECT_FALSE(queue.push(std::move(x)));
250 EXPECT_NE(nullptr, x);
251 EXPECT_EQ(6, *x);
252}
253
254TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
255 {
256 BufferWorkQueue queue;
257 queue.finish();
258 EXPECT_EQ(0, queue.size());
259 }
260 {
261 BufferWorkQueue queue;
262 queue.push(Buffer(10));
263 queue.finish();
264 EXPECT_EQ(10, queue.size());
265 }
266 {
267 BufferWorkQueue queue;
268 queue.push(Buffer(10));
269 queue.push(Buffer(5));
270 queue.finish();
271 EXPECT_EQ(15, queue.size());
272 }
273 {
274 BufferWorkQueue queue;
275 queue.push(Buffer(10));
276 queue.push(Buffer(5));
277 queue.finish();
278 Buffer buffer;
279 queue.pop(buffer);
280 EXPECT_EQ(5, queue.size());
281 }
282}