648db22b |
1 | /* |
2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under both the BSD-style license (found in the |
6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
7 | * in the COPYING file in the root directory of this source tree). |
8 | * You may select, at your option, one of the above-listed licenses. |
9 | */ |
10 | |
11 | |
12 | #include "pool.h" |
13 | #include "threading.h" |
14 | #include "util.h" |
15 | #include "timefn.h" |
16 | #include <stddef.h> |
17 | #include <stdio.h> |
18 | |
/* Minimal assertion helpers for the test functions in this file.
 * A failed check makes the ENCLOSING FUNCTION return 1, so these macros may
 * only be used inside functions that return int.
 * Note that nothing is cleaned up on the failure path: the return is immediate. */
#define ASSERT_TRUE(p)      \
    do {                    \
        if (p) {            \
            /* check ok */  \
        } else {            \
            return 1;       \
        }                   \
    } while (0)
#define ASSERT_FALSE(p)      ASSERT_TRUE(!(p))
#define ASSERT_EQ(lhs, rhs)  ASSERT_FALSE((lhs) != (rhs))
27 | |
28 | struct data { |
29 | ZSTD_pthread_mutex_t mutex; |
30 | unsigned data[16]; |
31 | size_t i; |
32 | }; |
33 | |
34 | static void fn(void *opaque) |
35 | { |
36 | struct data *data = (struct data *)opaque; |
37 | ZSTD_pthread_mutex_lock(&data->mutex); |
38 | data->data[data->i] = (unsigned)(data->i); |
39 | ++data->i; |
40 | ZSTD_pthread_mutex_unlock(&data->mutex); |
41 | } |
42 | |
43 | static int testOrder(size_t numThreads, size_t queueSize) |
44 | { |
45 | struct data data; |
46 | POOL_ctx* const ctx = POOL_create(numThreads, queueSize); |
47 | ASSERT_TRUE(ctx); |
48 | data.i = 0; |
49 | ASSERT_FALSE(ZSTD_pthread_mutex_init(&data.mutex, NULL)); |
50 | { size_t i; |
51 | for (i = 0; i < 16; ++i) { |
52 | POOL_add(ctx, &fn, &data); |
53 | } |
54 | } |
55 | POOL_free(ctx); |
56 | ASSERT_EQ(16, data.i); |
57 | { size_t i; |
58 | for (i = 0; i < data.i; ++i) { |
59 | ASSERT_EQ(i, data.data[i]); |
60 | } |
61 | } |
62 | ZSTD_pthread_mutex_destroy(&data.mutex); |
63 | return 0; |
64 | } |
65 | |
66 | |
67 | /* --- test deadlocks --- */ |
68 | |
/* Pool job used by testWait(): sleeps for 1 ms and never touches `opaque`. */
static void waitFn(void *opaque)
{
    UTIL_sleepMilli(1);
    (void)opaque;   /* unused on purpose */
}
73 | |
74 | /* Tests for deadlock */ |
75 | static int testWait(size_t numThreads, size_t queueSize) { |
76 | struct data data; |
77 | POOL_ctx* const ctx = POOL_create(numThreads, queueSize); |
78 | ASSERT_TRUE(ctx); |
79 | { size_t i; |
80 | for (i = 0; i < 16; ++i) { |
81 | POOL_add(ctx, &waitFn, &data); |
82 | } |
83 | } |
84 | POOL_free(ctx); |
85 | return 0; |
86 | } |
87 | |
88 | |
89 | /* --- test POOL_resize() --- */ |
90 | |
91 | typedef struct { |
92 | ZSTD_pthread_mutex_t mut; |
93 | int countdown; |
94 | int val; |
95 | int max; |
96 | ZSTD_pthread_cond_t cond; |
97 | } poolTest_t; |
98 | |
99 | static void waitLongFn(void *opaque) { |
100 | poolTest_t* const test = (poolTest_t*) opaque; |
101 | ZSTD_pthread_mutex_lock(&test->mut); |
102 | test->val++; |
103 | if (test->val > test->max) |
104 | test->max = test->val; |
105 | ZSTD_pthread_mutex_unlock(&test->mut); |
106 | |
107 | UTIL_sleepMilli(10); |
108 | |
109 | ZSTD_pthread_mutex_lock(&test->mut); |
110 | test->val--; |
111 | test->countdown--; |
112 | if (test->countdown == 0) |
113 | ZSTD_pthread_cond_signal(&test->cond); |
114 | ZSTD_pthread_mutex_unlock(&test->mut); |
115 | } |
116 | |
117 | static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test) |
118 | { |
119 | int const nbWaits = 16; |
120 | |
121 | test.countdown = nbWaits; |
122 | test.val = 0; |
123 | test.max = 0; |
124 | |
125 | { int i; |
126 | for (i=0; i<nbWaits; i++) |
127 | POOL_add(ctx, &waitLongFn, &test); |
128 | } |
129 | ZSTD_pthread_mutex_lock(&test.mut); |
130 | while (test.countdown > 0) |
131 | ZSTD_pthread_cond_wait(&test.cond, &test.mut); |
132 | ASSERT_EQ(test.val, 0); |
133 | ASSERT_EQ(test.max, 4); |
134 | ZSTD_pthread_mutex_unlock(&test.mut); |
135 | |
136 | ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 ); |
137 | test.countdown = nbWaits; |
138 | test.val = 0; |
139 | test.max = 0; |
140 | { int i; |
141 | for (i=0; i<nbWaits; i++) |
142 | POOL_add(ctx, &waitLongFn, &test); |
143 | } |
144 | ZSTD_pthread_mutex_lock(&test.mut); |
145 | while (test.countdown > 0) |
146 | ZSTD_pthread_cond_wait(&test.cond, &test.mut); |
147 | ASSERT_EQ(test.val, 0); |
148 | ASSERT_EQ(test.max, 2); |
149 | ZSTD_pthread_mutex_unlock(&test.mut); |
150 | |
151 | return 0; |
152 | } |
153 | |
154 | static int testThreadReduction(void) { |
155 | int result; |
156 | poolTest_t test; |
157 | POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/); |
158 | |
159 | ASSERT_TRUE(ctx); |
160 | |
161 | memset(&test, 0, sizeof(test)); |
162 | ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) ); |
163 | ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) ); |
164 | |
165 | result = testThreadReduction_internal(ctx, test); |
166 | |
167 | ZSTD_pthread_mutex_destroy(&test.mut); |
168 | ZSTD_pthread_cond_destroy(&test.cond); |
169 | POOL_free(ctx); |
170 | |
171 | return result; |
172 | } |
173 | |
174 | |
175 | /* --- test abrupt ending --- */ |
176 | |
177 | typedef struct { |
178 | ZSTD_pthread_mutex_t mut; |
179 | int val; |
180 | } abruptEndCanary_t; |
181 | |
182 | static void waitIncFn(void *opaque) { |
183 | abruptEndCanary_t* test = (abruptEndCanary_t*) opaque; |
184 | UTIL_sleepMilli(10); |
185 | ZSTD_pthread_mutex_lock(&test->mut); |
186 | test->val = test->val + 1; |
187 | ZSTD_pthread_mutex_unlock(&test->mut); |
188 | } |
189 | |
190 | static int testAbruptEnding_internal(abruptEndCanary_t test) |
191 | { |
192 | int const nbWaits = 16; |
193 | |
194 | POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/); |
195 | ASSERT_TRUE(ctx); |
196 | test.val = 0; |
197 | |
198 | { int i; |
199 | for (i=0; i<nbWaits; i++) |
200 | POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */ |
201 | } |
202 | ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */ |
203 | |
204 | POOL_free(ctx); /* must finish all jobs in queue before giving back control */ |
205 | ASSERT_EQ(test.val, nbWaits); |
206 | return 0; |
207 | } |
208 | |
209 | static int testAbruptEnding(void) { |
210 | int result; |
211 | abruptEndCanary_t test; |
212 | |
213 | memset(&test, 0, sizeof(test)); |
214 | ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) ); |
215 | |
216 | result = testAbruptEnding_internal(test); |
217 | |
218 | ZSTD_pthread_mutex_destroy(&test.mut); |
219 | return result; |
220 | } |
221 | |
222 | |
223 | |
224 | /* --- test launcher --- */ |
225 | |
/* Test launcher.
 * Runs, in order: the zero-thread creation check, testOrder() and testWait()
 * for every (numThreads in 1..4, queueSize in 0..2) combination, the thread
 * reduction test, and the abrupt-ending test.
 * Command-line arguments are ignored.
 * @return : 0 if every test passes, 1 as soon as one fails. */
int main(int argc, const char **argv)
{
    size_t nbThreads;

    (void)argc;
    (void)argv;

    /* A pool with no worker thread must be refused. */
    if (POOL_create(0, 1) != NULL) {
        printf("FAIL: should not create POOL with 0 threads\n");
        return 1;
    }

    for (nbThreads = 1; nbThreads <= 4; ++nbThreads) {
        size_t qSize;
        for (qSize = 0; qSize <= 2; ++qSize) {
            printf("queueSize==%u, numThreads=%u \n",
                   (unsigned)qSize, (unsigned)nbThreads);

            if (testOrder(nbThreads, qSize) != 0) {
                printf("FAIL: testOrder\n");
                return 1;
            }
            printf("SUCCESS: testOrder\n");

            if (testWait(nbThreads, qSize) != 0) {
                printf("FAIL: testWait\n");
                return 1;
            }
            printf("SUCCESS: testWait\n");
        }
    }

    if (testThreadReduction() != 0) {
        printf("FAIL: thread reduction not effective \n");
        return 1;
    }
    printf("SUCCESS: thread reduction effective \n");

    if (testAbruptEnding() != 0) {
        printf("FAIL: jobs in queue not completed on early end \n");
        return 1;
    }
    printf("SUCCESS: all jobs in queue completed on early end \n");

    printf("PASS: all POOL tests\n");

    return 0;
}