648db22b |
1 | /* |
2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under both the BSD-style license (found in the |
6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
7 | * in the COPYING file in the root directory of this source tree). |
8 | */ |
9 | #include "utils/Buffer.h" |
10 | #include "utils/WorkQueue.h" |
11 | |
12 | #include <gtest/gtest.h> |
13 | #include <iostream> |
14 | #include <memory> |
15 | #include <mutex> |
16 | #include <thread> |
17 | #include <vector> |
18 | |
19 | using namespace pzstd; |
20 | |
21 | namespace { |
22 | struct Popper { |
23 | WorkQueue<int>* queue; |
24 | int* results; |
25 | std::mutex* mutex; |
26 | |
27 | void operator()() { |
28 | int result; |
29 | while (queue->pop(result)) { |
30 | std::lock_guard<std::mutex> lock(*mutex); |
31 | results[result] = result; |
32 | } |
33 | } |
34 | }; |
35 | } |
36 | |
37 | TEST(WorkQueue, SingleThreaded) { |
38 | WorkQueue<int> queue; |
39 | int result; |
40 | |
41 | queue.push(5); |
42 | EXPECT_TRUE(queue.pop(result)); |
43 | EXPECT_EQ(5, result); |
44 | |
45 | queue.push(1); |
46 | queue.push(2); |
47 | EXPECT_TRUE(queue.pop(result)); |
48 | EXPECT_EQ(1, result); |
49 | EXPECT_TRUE(queue.pop(result)); |
50 | EXPECT_EQ(2, result); |
51 | |
52 | queue.push(1); |
53 | queue.push(2); |
54 | queue.finish(); |
55 | EXPECT_TRUE(queue.pop(result)); |
56 | EXPECT_EQ(1, result); |
57 | EXPECT_TRUE(queue.pop(result)); |
58 | EXPECT_EQ(2, result); |
59 | EXPECT_FALSE(queue.pop(result)); |
60 | |
61 | queue.waitUntilFinished(); |
62 | } |
63 | |
64 | TEST(WorkQueue, SPSC) { |
65 | WorkQueue<int> queue; |
66 | const int max = 100; |
67 | |
68 | for (int i = 0; i < 10; ++i) { |
69 | queue.push(int{i}); |
70 | } |
71 | |
72 | std::thread thread([ &queue, max ] { |
73 | int result; |
74 | for (int i = 0;; ++i) { |
75 | if (!queue.pop(result)) { |
76 | EXPECT_EQ(i, max); |
77 | break; |
78 | } |
79 | EXPECT_EQ(i, result); |
80 | } |
81 | }); |
82 | |
83 | std::this_thread::yield(); |
84 | for (int i = 10; i < max; ++i) { |
85 | queue.push(int{i}); |
86 | } |
87 | queue.finish(); |
88 | |
89 | thread.join(); |
90 | } |
91 | |
92 | TEST(WorkQueue, SPMC) { |
93 | WorkQueue<int> queue; |
94 | std::vector<int> results(50, -1); |
95 | std::mutex mutex; |
96 | std::vector<std::thread> threads; |
97 | for (int i = 0; i < 5; ++i) { |
98 | threads.emplace_back(Popper{&queue, results.data(), &mutex}); |
99 | } |
100 | |
101 | for (int i = 0; i < 50; ++i) { |
102 | queue.push(int{i}); |
103 | } |
104 | queue.finish(); |
105 | |
106 | for (auto& thread : threads) { |
107 | thread.join(); |
108 | } |
109 | |
110 | for (int i = 0; i < 50; ++i) { |
111 | EXPECT_EQ(i, results[i]); |
112 | } |
113 | } |
114 | |
115 | TEST(WorkQueue, MPMC) { |
116 | WorkQueue<int> queue; |
117 | std::vector<int> results(100, -1); |
118 | std::mutex mutex; |
119 | std::vector<std::thread> popperThreads; |
120 | for (int i = 0; i < 4; ++i) { |
121 | popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); |
122 | } |
123 | |
124 | std::vector<std::thread> pusherThreads; |
125 | for (int i = 0; i < 2; ++i) { |
126 | auto min = i * 50; |
127 | auto max = (i + 1) * 50; |
128 | pusherThreads.emplace_back( |
129 | [ &queue, min, max ] { |
130 | for (int i = min; i < max; ++i) { |
131 | queue.push(int{i}); |
132 | } |
133 | }); |
134 | } |
135 | |
136 | for (auto& thread : pusherThreads) { |
137 | thread.join(); |
138 | } |
139 | queue.finish(); |
140 | |
141 | for (auto& thread : popperThreads) { |
142 | thread.join(); |
143 | } |
144 | |
145 | for (int i = 0; i < 100; ++i) { |
146 | EXPECT_EQ(i, results[i]); |
147 | } |
148 | } |
149 | |
150 | TEST(WorkQueue, BoundedSizeWorks) { |
151 | WorkQueue<int> queue(1); |
152 | int result; |
153 | queue.push(5); |
154 | queue.pop(result); |
155 | queue.push(5); |
156 | queue.pop(result); |
157 | queue.push(5); |
158 | queue.finish(); |
159 | queue.pop(result); |
160 | EXPECT_EQ(5, result); |
161 | } |
162 | |
163 | TEST(WorkQueue, BoundedSizePushAfterFinish) { |
164 | WorkQueue<int> queue(1); |
165 | int result; |
166 | queue.push(5); |
167 | std::thread pusher([&queue] { |
168 | queue.push(6); |
169 | }); |
170 | // Dirtily try and make sure that pusher has run. |
171 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
172 | queue.finish(); |
173 | EXPECT_TRUE(queue.pop(result)); |
174 | EXPECT_EQ(5, result); |
175 | EXPECT_FALSE(queue.pop(result)); |
176 | |
177 | pusher.join(); |
178 | } |
179 | |
180 | TEST(WorkQueue, SetMaxSize) { |
181 | WorkQueue<int> queue(2); |
182 | int result; |
183 | queue.push(5); |
184 | queue.push(6); |
185 | queue.setMaxSize(1); |
186 | std::thread pusher([&queue] { |
187 | queue.push(7); |
188 | }); |
189 | // Dirtily try and make sure that pusher has run. |
190 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
191 | queue.finish(); |
192 | EXPECT_TRUE(queue.pop(result)); |
193 | EXPECT_EQ(5, result); |
194 | EXPECT_TRUE(queue.pop(result)); |
195 | EXPECT_EQ(6, result); |
196 | EXPECT_FALSE(queue.pop(result)); |
197 | |
198 | pusher.join(); |
199 | } |
200 | |
201 | TEST(WorkQueue, BoundedSizeMPMC) { |
202 | WorkQueue<int> queue(10); |
203 | std::vector<int> results(200, -1); |
204 | std::mutex mutex; |
205 | std::cerr << "Creating popperThreads" << std::endl; |
206 | std::vector<std::thread> popperThreads; |
207 | for (int i = 0; i < 4; ++i) { |
208 | popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); |
209 | } |
210 | |
211 | std::cerr << "Creating pusherThreads" << std::endl; |
212 | std::vector<std::thread> pusherThreads; |
213 | for (int i = 0; i < 2; ++i) { |
214 | auto min = i * 100; |
215 | auto max = (i + 1) * 100; |
216 | pusherThreads.emplace_back( |
217 | [ &queue, min, max ] { |
218 | for (int i = min; i < max; ++i) { |
219 | queue.push(int{i}); |
220 | } |
221 | }); |
222 | } |
223 | |
224 | std::cerr << "Joining pusherThreads" << std::endl; |
225 | for (auto& thread : pusherThreads) { |
226 | thread.join(); |
227 | } |
228 | std::cerr << "Finishing queue" << std::endl; |
229 | queue.finish(); |
230 | |
231 | std::cerr << "Joining popperThreads" << std::endl; |
232 | for (auto& thread : popperThreads) { |
233 | thread.join(); |
234 | } |
235 | |
236 | std::cerr << "Inspecting results" << std::endl; |
237 | for (int i = 0; i < 200; ++i) { |
238 | EXPECT_EQ(i, results[i]); |
239 | } |
240 | } |
241 | |
242 | TEST(WorkQueue, FailedPush) { |
243 | WorkQueue<std::unique_ptr<int>> queue; |
244 | std::unique_ptr<int> x(new int{5}); |
245 | EXPECT_TRUE(queue.push(std::move(x))); |
246 | EXPECT_EQ(nullptr, x); |
247 | queue.finish(); |
248 | x.reset(new int{6}); |
249 | EXPECT_FALSE(queue.push(std::move(x))); |
250 | EXPECT_NE(nullptr, x); |
251 | EXPECT_EQ(6, *x); |
252 | } |
253 | |
254 | TEST(BufferWorkQueue, SizeCalculatedCorrectly) { |
255 | { |
256 | BufferWorkQueue queue; |
257 | queue.finish(); |
258 | EXPECT_EQ(0, queue.size()); |
259 | } |
260 | { |
261 | BufferWorkQueue queue; |
262 | queue.push(Buffer(10)); |
263 | queue.finish(); |
264 | EXPECT_EQ(10, queue.size()); |
265 | } |
266 | { |
267 | BufferWorkQueue queue; |
268 | queue.push(Buffer(10)); |
269 | queue.push(Buffer(5)); |
270 | queue.finish(); |
271 | EXPECT_EQ(15, queue.size()); |
272 | } |
273 | { |
274 | BufferWorkQueue queue; |
275 | queue.push(Buffer(10)); |
276 | queue.push(Buffer(5)); |
277 | queue.finish(); |
278 | Buffer buffer; |
279 | queue.pop(buffer); |
280 | EXPECT_EQ(5, queue.size()); |
281 | } |
282 | } |