git subrepo pull (merge) --force deps/libchdr
[pcsx_rearmed.git] / deps / libchdr / deps / zstd-1.5.5 / programs / fileio_asyncio.c
CommitLineData
648db22b 1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 * You may select, at your option, one of the above-listed licenses.
9 */
10
11#include "platform.h"
12#include <stdio.h> /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
13#include <stdlib.h> /* malloc, free */
14#include <assert.h>
15#include <errno.h> /* errno */
16
17#if defined (_MSC_VER)
18# include <sys/stat.h>
19# include <io.h>
20#endif
21
22#include "fileio_asyncio.h"
23#include "fileio_common.h"
24
25/* **********************************************************************
26 * Sparse write
27 ************************************************************************/
28
29/** AIO_fwriteSparse() :
30* @return : storedSkips,
31* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
32static unsigned
33AIO_fwriteSparse(FILE* file,
34 const void* buffer, size_t bufferSize,
35 const FIO_prefs_t* const prefs,
36 unsigned storedSkips)
37{
38 const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
39 size_t bufferSizeT = bufferSize / sizeof(size_t);
40 const size_t* const bufferTEnd = bufferT + bufferSizeT;
41 const size_t* ptrT = bufferT;
42 static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
43
44 if (prefs->testMode) return 0; /* do not output anything in test mode */
45
46 if (!prefs->sparseFileSupport) { /* normal write */
47 size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
48 if (sizeCheck != bufferSize)
49 EXM_THROW(70, "Write error : cannot write block : %s",
50 strerror(errno));
51 return 0;
52 }
53
54 /* avoid int overflow */
55 if (storedSkips > 1 GB) {
56 if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
57 EXM_THROW(91, "1 GB skip error (sparse file support)");
58 storedSkips -= 1 GB;
59 }
60
61 while (ptrT < bufferTEnd) {
62 size_t nb0T;
63
64 /* adjust last segment if < 32 KB */
65 size_t seg0SizeT = segmentSizeT;
66 if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
67 bufferSizeT -= seg0SizeT;
68
69 /* count leading zeroes */
70 for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
71 storedSkips += (unsigned)(nb0T * sizeof(size_t));
72
73 if (nb0T != seg0SizeT) { /* not all 0s */
74 size_t const nbNon0ST = seg0SizeT - nb0T;
75 /* skip leading zeros */
76 if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
77 EXM_THROW(92, "Sparse skip error ; try --no-sparse");
78 storedSkips = 0;
79 /* write the rest */
80 if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
81 EXM_THROW(93, "Write error : cannot write block : %s",
82 strerror(errno));
83 }
84 ptrT += seg0SizeT;
85 }
86
87 { static size_t const maskT = sizeof(size_t)-1;
88 if (bufferSize & maskT) {
89 /* size not multiple of sizeof(size_t) : implies end of block */
90 const char* const restStart = (const char*)bufferTEnd;
91 const char* restPtr = restStart;
92 const char* const restEnd = (const char*)buffer + bufferSize;
93 assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
94 for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
95 storedSkips += (unsigned) (restPtr - restStart);
96 if (restPtr != restEnd) {
97 /* not all remaining bytes are 0 */
98 size_t const restSize = (size_t)(restEnd - restPtr);
99 if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
100 EXM_THROW(92, "Sparse skip error ; try --no-sparse");
101 if (fwrite(restPtr, 1, restSize, file) != restSize)
102 EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
103 strerror(errno));
104 storedSkips = 0;
105 } } }
106
107 return storedSkips;
108}
109
110static void
111AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
112{
113 if (prefs->testMode) assert(storedSkips == 0);
114 if (storedSkips>0) {
115 assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
116 (void)prefs; /* assert can be disabled, in which case prefs becomes unused */
117 if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
118 EXM_THROW(69, "Final skip error (sparse file support)");
119 /* last zero must be explicitly written,
120 * so that skipped ones get implicitly translated as zero by FS */
121 { const char lastZeroByte[1] = { 0 };
122 if (fwrite(lastZeroByte, 1, 1, file) != 1)
123 EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
124 } }
125}
126
127
128/* **********************************************************************
129 * AsyncIO functionality
130 ************************************************************************/
131
132/* AIO_supported:
133 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
134int AIO_supported(void) {
135#ifdef ZSTD_MULTITHREAD
136 return 1;
137#else
138 return 0;
139#endif
140}
141
142/* ***********************************
143 * Generic IoPool implementation
144 *************************************/
145
146static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
147 IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
148 void* const buffer = malloc(bufferSize);
149 if(!job || !buffer)
150 EXM_THROW(101, "Allocation error : not enough memory");
151 job->buffer = buffer;
152 job->bufferSize = bufferSize;
153 job->usedBufferSize = 0;
154 job->file = NULL;
155 job->ctx = ctx;
156 job->offset = 0;
157 return job;
158}
159
160
161/* AIO_IOPool_createThreadPool:
162 * Creates a thread pool and a mutex for threaded IO pool.
163 * Displays warning if asyncio is requested but MT isn't available. */
164static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
165 ctx->threadPool = NULL;
166 ctx->threadPoolActive = 0;
167 if(prefs->asyncIO) {
168 if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
169 EXM_THROW(102,"Failed creating ioJobsMutex mutex");
170 /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
171 * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
172 assert(MAX_IO_JOBS >= 2);
173 ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
174 ctx->threadPoolActive = 1;
175 if (!ctx->threadPool)
176 EXM_THROW(104, "Failed creating I/O thread pool");
177 }
178}
179
180/* AIO_IOPool_init:
181 * Allocates and sets and a new I/O thread pool including its included availableJobs. */
182static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
183 int i;
184 AIO_IOPool_createThreadPool(ctx, prefs);
185 ctx->prefs = prefs;
186 ctx->poolFunction = poolFunction;
187 ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
188 ctx->availableJobsCount = ctx->totalIoJobs;
189 for(i=0; i < ctx->availableJobsCount; i++) {
190 ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
191 }
192 ctx->jobBufferSize = bufferSize;
193 ctx->file = NULL;
194}
195
196
197/* AIO_IOPool_threadPoolActive:
198 * Check if current operation uses thread pool.
199 * Note that in some cases we have a thread pool initialized but choose not to use it. */
200static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
201 return ctx->threadPool && ctx->threadPoolActive;
202}
203
204
205/* AIO_IOPool_lockJobsMutex:
206 * Locks the IO jobs mutex if threading is active */
207static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
208 if(AIO_IOPool_threadPoolActive(ctx))
209 ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
210}
211
212/* AIO_IOPool_unlockJobsMutex:
213 * Unlocks the IO jobs mutex if threading is active */
214static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
215 if(AIO_IOPool_threadPoolActive(ctx))
216 ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
217}
218
219/* AIO_IOPool_releaseIoJob:
220 * Releases an acquired job back to the pool. Doesn't execute the job. */
221static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
222 IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
223 AIO_IOPool_lockJobsMutex(ctx);
224 assert(ctx->availableJobsCount < ctx->totalIoJobs);
225 ctx->availableJobs[ctx->availableJobsCount++] = job;
226 AIO_IOPool_unlockJobsMutex(ctx);
227}
228
229/* AIO_IOPool_join:
230 * Waits for all tasks in the pool to finish executing. */
231static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
232 if(AIO_IOPool_threadPoolActive(ctx))
233 POOL_joinJobs(ctx->threadPool);
234}
235
236/* AIO_IOPool_setThreaded:
237 * Allows (de)activating threaded mode, to be used when the expected overhead
238 * of threading costs more than the expected gains. */
239static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
240 assert(threaded == 0 || threaded == 1);
241 assert(ctx != NULL);
242 if(ctx->threadPoolActive != threaded) {
243 AIO_IOPool_join(ctx);
244 ctx->threadPoolActive = threaded;
245 }
246}
247
248/* AIO_IOPool_free:
249 * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
250static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
251 int i;
252 if(ctx->threadPool) {
253 /* Make sure we finish all tasks and then free the resources */
254 AIO_IOPool_join(ctx);
255 /* Make sure we are not leaking availableJobs */
256 assert(ctx->availableJobsCount == ctx->totalIoJobs);
257 POOL_free(ctx->threadPool);
258 ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
259 }
260 assert(ctx->file == NULL);
261 for(i=0; i<ctx->availableJobsCount; i++) {
262 IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
263 free(job->buffer);
264 free(job);
265 }
266}
267
268/* AIO_IOPool_acquireJob:
269 * Returns an available io job to be used for a future io. */
270static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
271 IOJob_t *job;
272 assert(ctx->file != NULL || ctx->prefs->testMode);
273 AIO_IOPool_lockJobsMutex(ctx);
274 assert(ctx->availableJobsCount > 0);
275 job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
276 AIO_IOPool_unlockJobsMutex(ctx);
277 job->usedBufferSize = 0;
278 job->file = ctx->file;
279 job->offset = 0;
280 return job;
281}
282
283
284/* AIO_IOPool_setFile:
285 * Sets the destination file for future files in the pool.
286 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
287static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
288 assert(ctx!=NULL);
289 AIO_IOPool_join(ctx);
290 assert(ctx->availableJobsCount == ctx->totalIoJobs);
291 ctx->file = file;
292}
293
294static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
295 return ctx->file;
296}
297
298/* AIO_IOPool_enqueueJob:
299 * Enqueues an io job for execution.
300 * The queued job shouldn't be used directly after queueing it. */
301static void AIO_IOPool_enqueueJob(IOJob_t* job) {
302 IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
303 if(AIO_IOPool_threadPoolActive(ctx))
304 POOL_add(ctx->threadPool, ctx->poolFunction, job);
305 else
306 ctx->poolFunction(job);
307}
308
309/* ***********************************
310 * WritePool implementation
311 *************************************/
312
313/* AIO_WritePool_acquireJob:
314 * Returns an available write job to be used for a future write. */
315IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
316 return AIO_IOPool_acquireJob(&ctx->base);
317}
318
319/* AIO_WritePool_enqueueAndReacquireWriteJob:
320 * Queues a write job for execution and acquires a new one.
321 * After execution `job`'s pointed value would change to the newly acquired job.
322 * Make sure to set `usedBufferSize` to the wanted length before call.
323 * The queued job shouldn't be used directly after queueing it. */
324void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
325 AIO_IOPool_enqueueJob(*job);
326 *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
327}
328
329/* AIO_WritePool_sparseWriteEnd:
330 * Ends sparse writes to the current file.
331 * Blocks on completion of all current write jobs before executing. */
332void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
333 assert(ctx != NULL);
334 AIO_IOPool_join(&ctx->base);
335 AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
336 ctx->storedSkips = 0;
337}
338
339/* AIO_WritePool_setFile:
340 * Sets the destination file for future writes in the pool.
341 * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
342 * Also requires ending of sparse write if a previous file was used in sparse mode. */
343void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
344 AIO_IOPool_setFile(&ctx->base, file);
345 assert(ctx->storedSkips == 0);
346}
347
348/* AIO_WritePool_getFile:
349 * Returns the file the writePool is currently set to write to. */
350FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
351 return AIO_IOPool_getFile(&ctx->base);
352}
353
354/* AIO_WritePool_releaseIoJob:
355 * Releases an acquired job back to the pool. Doesn't execute the job. */
356void AIO_WritePool_releaseIoJob(IOJob_t* job) {
357 AIO_IOPool_releaseIoJob(job);
358}
359
360/* AIO_WritePool_closeFile:
361 * Ends sparse write and closes the writePool's current file and sets the file to NULL.
362 * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
363int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
364 FILE* const dstFile = ctx->base.file;
365 assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
366 AIO_WritePool_sparseWriteEnd(ctx);
367 AIO_IOPool_setFile(&ctx->base, NULL);
368 return fclose(dstFile);
369}
370
371/* AIO_WritePool_executeWriteJob:
372 * Executes a write job synchronously. Can be used as a function for a thread pool. */
373static void AIO_WritePool_executeWriteJob(void* opaque){
374 IOJob_t* const job = (IOJob_t*) opaque;
375 WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
376 ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
377 AIO_IOPool_releaseIoJob(job);
378}
379
380/* AIO_WritePool_create:
381 * Allocates and sets and a new write pool including its included jobs. */
382WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
383 WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
384 if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
385 AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
386 ctx->storedSkips = 0;
387 return ctx;
388}
389
390/* AIO_WritePool_free:
391 * Frees and releases a writePool and its resources. Closes destination file if needs to. */
392void AIO_WritePool_free(WritePoolCtx_t* ctx) {
393 /* Make sure we finish all tasks and then free the resources */
394 if(AIO_WritePool_getFile(ctx))
395 AIO_WritePool_closeFile(ctx);
396 AIO_IOPool_destroy(&ctx->base);
397 assert(ctx->storedSkips==0);
398 free(ctx);
399}
400
401/* AIO_WritePool_setAsync:
402 * Allows (de)activating async mode, to be used when the expected overhead
403 * of asyncio costs more than the expected gains. */
404void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
405 AIO_IOPool_setThreaded(&ctx->base, async);
406}
407
408
409/* ***********************************
410 * ReadPool implementation
411 *************************************/
412static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
413 int i;
414 for(i=0; i<ctx->completedJobsCount; i++) {
415 IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
416 AIO_IOPool_releaseIoJob(job);
417 }
418 ctx->completedJobsCount = 0;
419}
420
421static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
422 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
423 AIO_IOPool_lockJobsMutex(&ctx->base);
424 assert(ctx->completedJobsCount < MAX_IO_JOBS);
425 ctx->completedJobs[ctx->completedJobsCount++] = job;
426 if(AIO_IOPool_threadPoolActive(&ctx->base)) {
427 ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
428 }
429 AIO_IOPool_unlockJobsMutex(&ctx->base);
430}
431
432/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
433 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
434 * if job wasn't found returns NULL.
435 * IMPORTANT: assumes ioJobsMutex is locked. */
436static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
437 IOJob_t *job = NULL;
438 int i;
439 /* This implementation goes through all completed jobs and looks for the one matching the next offset.
440 * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
441 * reads to be completed in order) this implementation was chosen as it better fits other asyncio
442 * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
443 for (i=0; i<ctx->completedJobsCount; i++) {
444 job = (IOJob_t *) ctx->completedJobs[i];
445 if (job->offset == ctx->waitingOnOffset) {
446 ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
447 return job;
448 }
449 }
450 return NULL;
451}
452
453/* AIO_ReadPool_numReadsInFlight:
454 * Returns the number of IO read jobs currently in flight. */
455static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
456 const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
457 return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
458}
459
460/* AIO_ReadPool_getNextCompletedJob:
461 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
462 * Would block. */
463static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
464 IOJob_t *job = NULL;
465 AIO_IOPool_lockJobsMutex(&ctx->base);
466
467 job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
468
469 /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
470 while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
471 assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
472 ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
473 job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
474 }
475
476 if(job) {
477 assert(job->offset == ctx->waitingOnOffset);
478 ctx->waitingOnOffset += job->usedBufferSize;
479 }
480
481 AIO_IOPool_unlockJobsMutex(&ctx->base);
482 return job;
483}
484
485
486/* AIO_ReadPool_executeReadJob:
487 * Executes a read job synchronously. Can be used as a function for a thread pool. */
488static void AIO_ReadPool_executeReadJob(void* opaque){
489 IOJob_t* const job = (IOJob_t*) opaque;
490 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
491 if(ctx->reachedEof) {
492 job->usedBufferSize = 0;
493 AIO_ReadPool_addJobToCompleted(job);
494 return;
495 }
496 job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
497 if(job->usedBufferSize < job->bufferSize) {
498 if(ferror(job->file)) {
499 EXM_THROW(37, "Read error");
500 } else if(feof(job->file)) {
501 ctx->reachedEof = 1;
502 } else {
503 EXM_THROW(37, "Unexpected short read");
504 }
505 }
506 AIO_ReadPool_addJobToCompleted(job);
507}
508
509static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
510 IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
511 job->offset = ctx->nextReadOffset;
512 ctx->nextReadOffset += job->bufferSize;
513 AIO_IOPool_enqueueJob(job);
514}
515
516static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
517 int i;
518 for (i = 0; i < ctx->base.availableJobsCount; i++) {
519 AIO_ReadPool_enqueueRead(ctx);
520 }
521}
522
523/* AIO_ReadPool_setFile:
524 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
525 * Waits for all current enqueued tasks to complete if a previous file was set. */
526void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
527 assert(ctx!=NULL);
528 AIO_IOPool_join(&ctx->base);
529 AIO_ReadPool_releaseAllCompletedJobs(ctx);
530 if (ctx->currentJobHeld) {
531 AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
532 ctx->currentJobHeld = NULL;
533 }
534 AIO_IOPool_setFile(&ctx->base, file);
535 ctx->nextReadOffset = 0;
536 ctx->waitingOnOffset = 0;
537 ctx->srcBuffer = ctx->coalesceBuffer;
538 ctx->srcBufferLoaded = 0;
539 ctx->reachedEof = 0;
540 if(file != NULL)
541 AIO_ReadPool_startReading(ctx);
542}
543
544/* AIO_ReadPool_create:
545 * Allocates and sets and a new readPool including its included jobs.
546 * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
547 * as our basic read size. */
548ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
549 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
550 if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
551 AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
552
553 ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
554 ctx->srcBuffer = ctx->coalesceBuffer;
555 ctx->srcBufferLoaded = 0;
556 ctx->completedJobsCount = 0;
557 ctx->currentJobHeld = NULL;
558
559 if(ctx->base.threadPool)
560 if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
561 EXM_THROW(103,"Failed creating jobCompletedCond cond");
562
563 return ctx;
564}
565
566/* AIO_ReadPool_free:
567 * Frees and releases a readPool and its resources. Closes source file. */
568void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
569 if(AIO_ReadPool_getFile(ctx))
570 AIO_ReadPool_closeFile(ctx);
571 if(ctx->base.threadPool)
572 ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
573 AIO_IOPool_destroy(&ctx->base);
574 free(ctx->coalesceBuffer);
575 free(ctx);
576}
577
578/* AIO_ReadPool_consumeBytes:
579 * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
580void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
581 assert(n <= ctx->srcBufferLoaded);
582 ctx->srcBufferLoaded -= n;
583 ctx->srcBuffer += n;
584}
585
586/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
587 * Release the current held job and get the next one, returns NULL if no next job available. */
588static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
589 if (ctx->currentJobHeld) {
590 AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
591 ctx->currentJobHeld = NULL;
592 AIO_ReadPool_enqueueRead(ctx);
593 }
594 ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
595 return (IOJob_t*) ctx->currentJobHeld;
596}
597
598/* AIO_ReadPool_fillBuffer:
599 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
600 * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
601 * Return value is the number of bytes added to the buffer.
602 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
603size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
604 IOJob_t *job;
605 int useCoalesce = 0;
606 if(n > ctx->base.jobBufferSize)
607 n = ctx->base.jobBufferSize;
608
609 /* We are good, don't read anything */
610 if (ctx->srcBufferLoaded >= n)
611 return 0;
612
613 /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
614 * and coalesce the remaining bytes with the next job's buffer */
615 if (ctx->srcBufferLoaded > 0) {
616 useCoalesce = 1;
617 memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
618 ctx->srcBuffer = ctx->coalesceBuffer;
619 }
620
621 /* Read the next chunk */
622 job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
623 if(!job)
624 return 0;
625 if(useCoalesce) {
626 assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
627 memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
628 ctx->srcBufferLoaded += job->usedBufferSize;
629 }
630 else {
631 ctx->srcBuffer = (U8 *) job->buffer;
632 ctx->srcBufferLoaded = job->usedBufferSize;
633 }
634 return job->usedBufferSize;
635}
636
637/* AIO_ReadPool_consumeAndRefill:
638 * Consumes the current buffer and refills it with bufferSize bytes. */
639size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
640 AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
641 return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
642}
643
644/* AIO_ReadPool_getFile:
645 * Returns the current file set for the read pool. */
646FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
647 return AIO_IOPool_getFile(&ctx->base);
648}
649
650/* AIO_ReadPool_closeFile:
651 * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
652int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
653 FILE* const file = AIO_ReadPool_getFile(ctx);
654 AIO_ReadPool_setFile(ctx, NULL);
655 return fclose(file);
656}
657
658/* AIO_ReadPool_setAsync:
659 * Allows (de)activating async mode, to be used when the expected overhead
660 * of asyncio costs more than the expected gains. */
661void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
662 AIO_IOPool_setThreaded(&ctx->base, async);
663}