/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>   /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>  /* malloc, free */
#include <assert.h>
#include <errno.h>   /* errno */

#if defined (_MSC_VER)
# include <sys/stat.h>
# include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
 *  @return : storedSkips,
 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed to be malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
    }   }   }

    return storedSkips;
}

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped ones get implicitly translated as zero by FS */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
    }   }
}
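
/* Note on the sparse technique above (illustrative sketch, not used by this file):
 * seeking forward and writing later leaves a "hole" that the filesystem reads
 * back as zeroes, without allocating disk blocks. A minimal standalone
 * equivalent, assuming `f` is a FILE* opened for writing (`zeroRunLength` and
 * `nonZeroByte` are placeholder names):
 *
 *     LONG_SEEK(f, zeroRunLength, SEEK_CUR);  // skip a run of zeroes: becomes a hole
 *     fwrite(&nonZeroByte, 1, 1, f);          // a real write materializes data after the hole
 *
 * AIO_fwriteSparse() applies this per 32 KB segment, accumulating pending skips
 * in `storedSkips`, and AIO_fwriteSparseEnd() writes the final zero byte
 * explicitly so the file ends at the correct length. */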


/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
    IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for the threaded IO pool. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O thread pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}


/* AIO_IOPool_threadPoolActive:
 * Check if current operation uses thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}

/* AIO_IOPool_destroy:
 * Releases a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available IO job to be used for a future IO operation. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t *job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_setFile:
 * Sets the destination file for future IO in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}

/* AIO_IOPool_enqueueJob:
 * Enqueues an io job for execution.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}

/* ***********************************
 *  WritePool implementation
 *************************************/

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After this call, `*job` points to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}
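
/* Typical write-pool usage, as a hedged sketch (placeholder names `writeCtx`,
 * `data` and `dataSize`; the real call sites live elsewhere in the zstd programs):
 *
 *     IOJob_t* wJob = AIO_WritePool_acquireJob(writeCtx);
 *     memcpy(wJob->buffer, data, dataSize);              // fill the job's buffer
 *     wJob->usedBufferSize = dataSize;                   // must be set before enqueueing
 *     AIO_WritePool_enqueueAndReacquireWriteJob(&wJob);  // queue the write, get a fresh job
 *     AIO_WritePool_releaseIoJob(wJob);                  // release the last, unused job
 */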

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse writes, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}


/* ***********************************
 *  ReadPool implementation
 *************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i=0; i<ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * returns NULL if no such job was found.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide guarantees regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block until such a job completes. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL);  /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}


/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}
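
/* Typical read-pool usage, as a hedged sketch (placeholder names `readCtx`,
 * `toRead` and `consume()`; the real call sites live elsewhere in the zstd programs):
 *
 *     AIO_ReadPool_fillBuffer(readCtx, toRead);                      // ensure >= toRead bytes loaded, or EOF
 *     consume(readCtx->srcBuffer, readCtx->srcBufferLoaded);         // process the loaded bytes
 *     AIO_ReadPool_consumeBytes(readCtx, readCtx->srcBufferLoaded);  // mark them consumed
 */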

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}