git subrepo pull (merge) --force deps/libchdr
[pcsx_rearmed.git] / deps / libchdr / deps / zstd-1.5.5 / programs / fileio_asyncio.c
1 /*
2  * Copyright (c) Meta Platforms, Inc. and affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10
11 #include "platform.h"
12 #include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
13 #include <stdlib.h>     /* malloc, free */
14 #include <assert.h>
15 #include <errno.h>      /* errno */
16
17 #if defined (_MSC_VER)
18 #  include <sys/stat.h>
19 #  include <io.h>
20 #endif
21
22 #include "fileio_asyncio.h"
23 #include "fileio_common.h"
24
25 /* **********************************************************************
26  *  Sparse write
27  ************************************************************************/
28
29 /** AIO_fwriteSparse() :
30 *  @return : storedSkips,
31 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
32 static unsigned
33 AIO_fwriteSparse(FILE* file,
34                  const void* buffer, size_t bufferSize,
35                  const FIO_prefs_t* const prefs,
36                  unsigned storedSkips)
37 {
38     const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
39     size_t bufferSizeT = bufferSize / sizeof(size_t);
40     const size_t* const bufferTEnd = bufferT + bufferSizeT;
41     const size_t* ptrT = bufferT;
42     static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */
43
44     if (prefs->testMode) return 0;  /* do not output anything in test mode */
45
46     if (!prefs->sparseFileSupport) {  /* normal write */
47         size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
48         if (sizeCheck != bufferSize)
49             EXM_THROW(70, "Write error : cannot write block : %s",
50                       strerror(errno));
51         return 0;
52     }
53
54     /* avoid int overflow */
55     if (storedSkips > 1 GB) {
56         if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
57         EXM_THROW(91, "1 GB skip error (sparse file support)");
58         storedSkips -= 1 GB;
59     }
60
61     while (ptrT < bufferTEnd) {
62         size_t nb0T;
63
64         /* adjust last segment if < 32 KB */
65         size_t seg0SizeT = segmentSizeT;
66         if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
67         bufferSizeT -= seg0SizeT;
68
69         /* count leading zeroes */
70         for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
71         storedSkips += (unsigned)(nb0T * sizeof(size_t));
72
73         if (nb0T != seg0SizeT) {   /* not all 0s */
74             size_t const nbNon0ST = seg0SizeT - nb0T;
75             /* skip leading zeros */
76             if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
77                 EXM_THROW(92, "Sparse skip error ; try --no-sparse");
78             storedSkips = 0;
79             /* write the rest */
80             if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
81                 EXM_THROW(93, "Write error : cannot write block : %s",
82                           strerror(errno));
83         }
84         ptrT += seg0SizeT;
85     }
86
87     {   static size_t const maskT = sizeof(size_t)-1;
88         if (bufferSize & maskT) {
89             /* size not multiple of sizeof(size_t) : implies end of block */
90             const char* const restStart = (const char*)bufferTEnd;
91             const char* restPtr = restStart;
92             const char* const restEnd = (const char*)buffer + bufferSize;
93             assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
94             for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
95             storedSkips += (unsigned) (restPtr - restStart);
96             if (restPtr != restEnd) {
97                 /* not all remaining bytes are 0 */
98                 size_t const restSize = (size_t)(restEnd - restPtr);
99                 if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
100                     EXM_THROW(92, "Sparse skip error ; try --no-sparse");
101                 if (fwrite(restPtr, 1, restSize, file) != restSize)
102                     EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
103                               strerror(errno));
104                 storedSkips = 0;
105             }   }   }
106
107     return storedSkips;
108 }
109
110 static void
111 AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
112 {
113     if (prefs->testMode) assert(storedSkips == 0);
114     if (storedSkips>0) {
115         assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
116         (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
117         if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
118             EXM_THROW(69, "Final skip error (sparse file support)");
119         /* last zero must be explicitly written,
120          * so that skipped ones get implicitly translated as zero by FS */
121         {   const char lastZeroByte[1] = { 0 };
122             if (fwrite(lastZeroByte, 1, 1, file) != 1)
123                 EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
124         }   }
125 }
126
127
128 /* **********************************************************************
129  *  AsyncIO functionality
130  ************************************************************************/
131
132 /* AIO_supported:
133  * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
134 int AIO_supported(void) {
135 #ifdef ZSTD_MULTITHREAD
136     return 1;
137 #else
138     return 0;
139 #endif
140 }
141
142 /* ***********************************
143  *  Generic IoPool implementation
144  *************************************/
145
146 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
147     IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
148     void* const buffer = malloc(bufferSize);
149     if(!job || !buffer)
150         EXM_THROW(101, "Allocation error : not enough memory");
151     job->buffer = buffer;
152     job->bufferSize = bufferSize;
153     job->usedBufferSize = 0;
154     job->file = NULL;
155     job->ctx = ctx;
156     job->offset = 0;
157     return job;
158 }
159
160
161 /* AIO_IOPool_createThreadPool:
162  * Creates a thread pool and a mutex for threaded IO pool.
163  * Displays warning if asyncio is requested but MT isn't available. */
164 static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
165     ctx->threadPool = NULL;
166     ctx->threadPoolActive = 0;
167     if(prefs->asyncIO) {
168         if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
169             EXM_THROW(102,"Failed creating ioJobsMutex mutex");
170         /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
171          * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
172         assert(MAX_IO_JOBS >= 2);
173         ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
174         ctx->threadPoolActive = 1;
175         if (!ctx->threadPool)
176             EXM_THROW(104, "Failed creating I/O thread pool");
177     }
178 }
179
180 /* AIO_IOPool_init:
181  * Allocates and sets and a new I/O thread pool including its included availableJobs. */
182 static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
183     int i;
184     AIO_IOPool_createThreadPool(ctx, prefs);
185     ctx->prefs = prefs;
186     ctx->poolFunction = poolFunction;
187     ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
188     ctx->availableJobsCount = ctx->totalIoJobs;
189     for(i=0; i < ctx->availableJobsCount; i++) {
190         ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
191     }
192     ctx->jobBufferSize = bufferSize;
193     ctx->file = NULL;
194 }
195
196
197 /* AIO_IOPool_threadPoolActive:
198  * Check if current operation uses thread pool.
199  * Note that in some cases we have a thread pool initialized but choose not to use it. */
200 static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
201     return ctx->threadPool && ctx->threadPoolActive;
202 }
203
204
205 /* AIO_IOPool_lockJobsMutex:
206  * Locks the IO jobs mutex if threading is active */
207 static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
208     if(AIO_IOPool_threadPoolActive(ctx))
209         ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
210 }
211
212 /* AIO_IOPool_unlockJobsMutex:
213  * Unlocks the IO jobs mutex if threading is active */
214 static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
215     if(AIO_IOPool_threadPoolActive(ctx))
216         ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
217 }
218
219 /* AIO_IOPool_releaseIoJob:
220  * Releases an acquired job back to the pool. Doesn't execute the job. */
221 static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
222     IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
223     AIO_IOPool_lockJobsMutex(ctx);
224     assert(ctx->availableJobsCount < ctx->totalIoJobs);
225     ctx->availableJobs[ctx->availableJobsCount++] = job;
226     AIO_IOPool_unlockJobsMutex(ctx);
227 }
228
229 /* AIO_IOPool_join:
230  * Waits for all tasks in the pool to finish executing. */
231 static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
232     if(AIO_IOPool_threadPoolActive(ctx))
233         POOL_joinJobs(ctx->threadPool);
234 }
235
236 /* AIO_IOPool_setThreaded:
237  * Allows (de)activating threaded mode, to be used when the expected overhead
238  * of threading costs more than the expected gains. */
239 static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
240     assert(threaded == 0 || threaded == 1);
241     assert(ctx != NULL);
242     if(ctx->threadPoolActive != threaded) {
243         AIO_IOPool_join(ctx);
244         ctx->threadPoolActive = threaded;
245     }
246 }
247
248 /* AIO_IOPool_free:
249  * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
250 static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
251     int i;
252     if(ctx->threadPool) {
253         /* Make sure we finish all tasks and then free the resources */
254         AIO_IOPool_join(ctx);
255         /* Make sure we are not leaking availableJobs */
256         assert(ctx->availableJobsCount == ctx->totalIoJobs);
257         POOL_free(ctx->threadPool);
258         ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
259     }
260     assert(ctx->file == NULL);
261     for(i=0; i<ctx->availableJobsCount; i++) {
262         IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
263         free(job->buffer);
264         free(job);
265     }
266 }
267
268 /* AIO_IOPool_acquireJob:
269  * Returns an available io job to be used for a future io. */
270 static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
271     IOJob_t *job;
272     assert(ctx->file != NULL || ctx->prefs->testMode);
273     AIO_IOPool_lockJobsMutex(ctx);
274     assert(ctx->availableJobsCount > 0);
275     job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
276     AIO_IOPool_unlockJobsMutex(ctx);
277     job->usedBufferSize = 0;
278     job->file = ctx->file;
279     job->offset = 0;
280     return job;
281 }
282
283
284 /* AIO_IOPool_setFile:
285  * Sets the destination file for future files in the pool.
286  * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
287 static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
288     assert(ctx!=NULL);
289     AIO_IOPool_join(ctx);
290     assert(ctx->availableJobsCount == ctx->totalIoJobs);
291     ctx->file = file;
292 }
293
294 static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
295     return ctx->file;
296 }
297
298 /* AIO_IOPool_enqueueJob:
299  * Enqueues an io job for execution.
300  * The queued job shouldn't be used directly after queueing it. */
301 static void AIO_IOPool_enqueueJob(IOJob_t* job) {
302     IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
303     if(AIO_IOPool_threadPoolActive(ctx))
304         POOL_add(ctx->threadPool, ctx->poolFunction, job);
305     else
306         ctx->poolFunction(job);
307 }
308
309 /* ***********************************
310  *  WritePool implementation
311  *************************************/
312
313 /* AIO_WritePool_acquireJob:
314  * Returns an available write job to be used for a future write. */
315 IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
316     return AIO_IOPool_acquireJob(&ctx->base);
317 }
318
319 /* AIO_WritePool_enqueueAndReacquireWriteJob:
320  * Queues a write job for execution and acquires a new one.
321  * After execution `job`'s pointed value would change to the newly acquired job.
322  * Make sure to set `usedBufferSize` to the wanted length before call.
323  * The queued job shouldn't be used directly after queueing it. */
324 void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
325     AIO_IOPool_enqueueJob(*job);
326     *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
327 }
328
329 /* AIO_WritePool_sparseWriteEnd:
330  * Ends sparse writes to the current file.
331  * Blocks on completion of all current write jobs before executing. */
332 void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
333     assert(ctx != NULL);
334     AIO_IOPool_join(&ctx->base);
335     AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
336     ctx->storedSkips = 0;
337 }
338
339 /* AIO_WritePool_setFile:
340  * Sets the destination file for future writes in the pool.
341  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
342  * Also requires ending of sparse write if a previous file was used in sparse mode. */
343 void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
344     AIO_IOPool_setFile(&ctx->base, file);
345     assert(ctx->storedSkips == 0);
346 }
347
348 /* AIO_WritePool_getFile:
349  * Returns the file the writePool is currently set to write to. */
350 FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
351     return AIO_IOPool_getFile(&ctx->base);
352 }
353
354 /* AIO_WritePool_releaseIoJob:
355  * Releases an acquired job back to the pool. Doesn't execute the job. */
356 void AIO_WritePool_releaseIoJob(IOJob_t* job) {
357     AIO_IOPool_releaseIoJob(job);
358 }
359
360 /* AIO_WritePool_closeFile:
361  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
362  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
363 int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
364     FILE* const dstFile = ctx->base.file;
365     assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
366     AIO_WritePool_sparseWriteEnd(ctx);
367     AIO_IOPool_setFile(&ctx->base, NULL);
368     return fclose(dstFile);
369 }
370
371 /* AIO_WritePool_executeWriteJob:
372  * Executes a write job synchronously. Can be used as a function for a thread pool. */
373 static void AIO_WritePool_executeWriteJob(void* opaque){
374     IOJob_t* const job = (IOJob_t*) opaque;
375     WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
376     ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
377     AIO_IOPool_releaseIoJob(job);
378 }
379
380 /* AIO_WritePool_create:
381  * Allocates and sets and a new write pool including its included jobs. */
382 WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
383     WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
384     if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
385     AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
386     ctx->storedSkips = 0;
387     return ctx;
388 }
389
390 /* AIO_WritePool_free:
391  * Frees and releases a writePool and its resources. Closes destination file if needs to. */
392 void AIO_WritePool_free(WritePoolCtx_t* ctx) {
393     /* Make sure we finish all tasks and then free the resources */
394     if(AIO_WritePool_getFile(ctx))
395         AIO_WritePool_closeFile(ctx);
396     AIO_IOPool_destroy(&ctx->base);
397     assert(ctx->storedSkips==0);
398     free(ctx);
399 }
400
401 /* AIO_WritePool_setAsync:
402  * Allows (de)activating async mode, to be used when the expected overhead
403  * of asyncio costs more than the expected gains. */
404 void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
405     AIO_IOPool_setThreaded(&ctx->base, async);
406 }
407
408
409 /* ***********************************
410  *  ReadPool implementation
411  *************************************/
412 static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
413     int i;
414     for(i=0; i<ctx->completedJobsCount; i++) {
415         IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
416         AIO_IOPool_releaseIoJob(job);
417     }
418     ctx->completedJobsCount = 0;
419 }
420
421 static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
422     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
423     AIO_IOPool_lockJobsMutex(&ctx->base);
424     assert(ctx->completedJobsCount < MAX_IO_JOBS);
425     ctx->completedJobs[ctx->completedJobsCount++] = job;
426     if(AIO_IOPool_threadPoolActive(&ctx->base)) {
427         ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
428     }
429     AIO_IOPool_unlockJobsMutex(&ctx->base);
430 }
431
432 /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
433  * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
434  * if job wasn't found returns NULL.
435  * IMPORTANT: assumes ioJobsMutex is locked. */
436 static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
437     IOJob_t *job = NULL;
438     int i;
439     /* This implementation goes through all completed jobs and looks for the one matching the next offset.
440      * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
441      * reads to be completed in order) this implementation was chosen as it better fits other asyncio
442      * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
443     for (i=0; i<ctx->completedJobsCount; i++) {
444         job = (IOJob_t *) ctx->completedJobs[i];
445         if (job->offset == ctx->waitingOnOffset) {
446             ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
447             return job;
448         }
449     }
450     return NULL;
451 }
452
453 /* AIO_ReadPool_numReadsInFlight:
454  * Returns the number of IO read jobs currently in flight. */
455 static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
456     const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
457     return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
458 }
459
460 /* AIO_ReadPool_getNextCompletedJob:
461  * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
462  * Would block. */
463 static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
464     IOJob_t *job = NULL;
465     AIO_IOPool_lockJobsMutex(&ctx->base);
466
467     job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
468
469     /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
470     while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
471         assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
472         ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
473         job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
474     }
475
476     if(job) {
477         assert(job->offset == ctx->waitingOnOffset);
478         ctx->waitingOnOffset += job->usedBufferSize;
479     }
480
481     AIO_IOPool_unlockJobsMutex(&ctx->base);
482     return job;
483 }
484
485
486 /* AIO_ReadPool_executeReadJob:
487  * Executes a read job synchronously. Can be used as a function for a thread pool. */
488 static void AIO_ReadPool_executeReadJob(void* opaque){
489     IOJob_t* const job = (IOJob_t*) opaque;
490     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
491     if(ctx->reachedEof) {
492         job->usedBufferSize = 0;
493         AIO_ReadPool_addJobToCompleted(job);
494         return;
495     }
496     job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
497     if(job->usedBufferSize < job->bufferSize) {
498         if(ferror(job->file)) {
499             EXM_THROW(37, "Read error");
500         } else if(feof(job->file)) {
501             ctx->reachedEof = 1;
502         } else {
503             EXM_THROW(37, "Unexpected short read");
504         }
505     }
506     AIO_ReadPool_addJobToCompleted(job);
507 }
508
509 static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
510     IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
511     job->offset = ctx->nextReadOffset;
512     ctx->nextReadOffset += job->bufferSize;
513     AIO_IOPool_enqueueJob(job);
514 }
515
516 static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
517     int i;
518     for (i = 0; i < ctx->base.availableJobsCount; i++) {
519         AIO_ReadPool_enqueueRead(ctx);
520     }
521 }
522
523 /* AIO_ReadPool_setFile:
524  * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
525  * Waits for all current enqueued tasks to complete if a previous file was set. */
526 void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
527     assert(ctx!=NULL);
528     AIO_IOPool_join(&ctx->base);
529     AIO_ReadPool_releaseAllCompletedJobs(ctx);
530     if (ctx->currentJobHeld) {
531         AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
532         ctx->currentJobHeld = NULL;
533     }
534     AIO_IOPool_setFile(&ctx->base, file);
535     ctx->nextReadOffset = 0;
536     ctx->waitingOnOffset = 0;
537     ctx->srcBuffer = ctx->coalesceBuffer;
538     ctx->srcBufferLoaded = 0;
539     ctx->reachedEof = 0;
540     if(file != NULL)
541         AIO_ReadPool_startReading(ctx);
542 }
543
544 /* AIO_ReadPool_create:
545  * Allocates and sets and a new readPool including its included jobs.
546  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
547  * as our basic read size. */
548 ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
549     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
550     if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
551     AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
552
553     ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
554     ctx->srcBuffer = ctx->coalesceBuffer;
555     ctx->srcBufferLoaded = 0;
556     ctx->completedJobsCount = 0;
557     ctx->currentJobHeld = NULL;
558
559     if(ctx->base.threadPool)
560         if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
561             EXM_THROW(103,"Failed creating jobCompletedCond cond");
562
563     return ctx;
564 }
565
566 /* AIO_ReadPool_free:
567  * Frees and releases a readPool and its resources. Closes source file. */
568 void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
569     if(AIO_ReadPool_getFile(ctx))
570         AIO_ReadPool_closeFile(ctx);
571     if(ctx->base.threadPool)
572         ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
573     AIO_IOPool_destroy(&ctx->base);
574     free(ctx->coalesceBuffer);
575     free(ctx);
576 }
577
578 /* AIO_ReadPool_consumeBytes:
579  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
580 void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
581     assert(n <= ctx->srcBufferLoaded);
582     ctx->srcBufferLoaded -= n;
583     ctx->srcBuffer += n;
584 }
585
586 /* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
587  * Release the current held job and get the next one, returns NULL if no next job available. */
588 static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
589     if (ctx->currentJobHeld) {
590         AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
591         ctx->currentJobHeld = NULL;
592         AIO_ReadPool_enqueueRead(ctx);
593     }
594     ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
595     return (IOJob_t*) ctx->currentJobHeld;
596 }
597
598 /* AIO_ReadPool_fillBuffer:
599  * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
600  * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
601  * Return value is the number of bytes added to the buffer.
602  * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
603 size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
604     IOJob_t *job;
605     int useCoalesce = 0;
606     if(n > ctx->base.jobBufferSize)
607         n = ctx->base.jobBufferSize;
608
609     /* We are good, don't read anything */
610     if (ctx->srcBufferLoaded >= n)
611         return 0;
612
613     /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
614      * and coalesce the remaining bytes with the next job's buffer */
615     if (ctx->srcBufferLoaded > 0) {
616         useCoalesce = 1;
617         memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
618         ctx->srcBuffer = ctx->coalesceBuffer;
619     }
620
621     /* Read the next chunk */
622     job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
623     if(!job)
624         return 0;
625     if(useCoalesce) {
626         assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
627         memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
628         ctx->srcBufferLoaded += job->usedBufferSize;
629     }
630     else {
631         ctx->srcBuffer = (U8 *) job->buffer;
632         ctx->srcBufferLoaded = job->usedBufferSize;
633     }
634     return job->usedBufferSize;
635 }
636
637 /* AIO_ReadPool_consumeAndRefill:
638  * Consumes the current buffer and refills it with bufferSize bytes. */
639 size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
640     AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
641     return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
642 }
643
644 /* AIO_ReadPool_getFile:
645  * Returns the current file set for the read pool. */
646 FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
647     return AIO_IOPool_getFile(&ctx->base);
648 }
649
650 /* AIO_ReadPool_closeFile:
651  * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
652 int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
653     FILE* const file = AIO_ReadPool_getFile(ctx);
654     AIO_ReadPool_setFile(ctx, NULL);
655     return fclose(file);
656 }
657
658 /* AIO_ReadPool_setAsync:
659  * Allows (de)activating async mode, to be used when the expected overhead
660  * of asyncio costs more than the expected gains. */
661 void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
662     AIO_IOPool_setThreaded(&ctx->base, async);
663 }