648db22b |
1 | /* |
2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under both the BSD-style license (found in the |
6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
7 | * in the COPYING file in the root directory of this source tree). |
8 | * You may select, at your option, one of the above-listed licenses. |
9 | */ |
10 | |
11 | /* |
12 | * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously. |
13 | * The current implementation relies on having one thread that reads and one that |
14 | * writes. |
15 | * Each IO pool supports up to `MAX_IO_JOBS` jobs that can be enqueued for work, but they |
16 | * are performed serially by the appropriate worker thread. |
17 | * Most systems expose better primitives to perform asynchronous IO, such as |
18 | * io_uring on newer linux systems. The API is built in such a way that in the |
19 | * future we could replace the threads with better solutions when available. |
20 | */ |
21 | |
22 | #ifndef ZSTD_FILEIO_ASYNCIO_H |
23 | #define ZSTD_FILEIO_ASYNCIO_H |
24 | |
25 | #if defined (__cplusplus) |
26 | extern "C" { |
27 | #endif |
28 | |
29 | #include "../lib/common/mem.h" /* U32, U64 */ |
30 | #include "fileio_types.h" |
31 | #include "platform.h" |
32 | #include "util.h" |
33 | #include "../lib/common/pool.h" |
34 | #include "../lib/common/threading.h" |
35 | |
36 | #define MAX_IO_JOBS (10) /* Maximum number of jobs a single IO pool can hold; sizes the job arrays below. */ |
37 | |
38 | typedef struct { /* Common IO pool state, embedded as `base` in both ReadPoolCtx_t and WritePoolCtx_t. */ |
39 | /* These struct fields should be set only on creation and not changed afterwards */ |
40 | POOL_ctx* threadPool; /* Worker pool that executes queued jobs when running asynchronously. */ |
41 | int threadPoolActive; /* NOTE(review): AIO_*Pool_setAsync can (de)activate async mode after creation, so this flag may change despite the note above - confirm. */ |
42 | int totalIoJobs; /* Number of jobs owned by the pool; must not exceed MAX_IO_JOBS, the capacity of availableJobs. */ |
43 | const FIO_prefs_t* prefs; /* Preferences passed to AIO_*Pool_create(). */ |
44 | POOL_function poolFunction; /* Presumably the read or write job routine handed to threadPool - TODO confirm. */ |
45 | |
46 | /* The file this pool currently reads from or writes to; change it only by using the provided utility functions (AIO_*Pool_setFile / AIO_*Pool_closeFile). */ |
47 | FILE* file; |
48 | |
49 | /* The availableJobs and availableJobsCount fields are accessed by both the main and worker threads and should |
50 | * only be mutated after locking ioJobsMutex */ |
51 | ZSTD_pthread_mutex_t ioJobsMutex; |
52 | void* availableJobs[MAX_IO_JOBS]; /* Jobs that are neither acquired nor queued; presumably IOJob_t* - TODO confirm. */ |
53 | int availableJobsCount; /* Number of jobs currently stored in availableJobs. */ |
54 | size_t jobBufferSize; /* Size of each job's buffer; presumably the bufferSize given to AIO_*Pool_create - TODO confirm. */ |
55 | } IOPoolCtx_t; |
56 | |
57 | typedef struct { /* Read pool: common IO pool state plus state for the file currently being read. */ |
58 | IOPoolCtx_t base; |
59 | |
60 | /* State regarding the currently read file */ |
61 | int reachedEof; /* Presumably nonzero once the end of the file has been reached - TODO confirm. */ |
62 | U64 nextReadOffset; /* Presumably the file offset of the next read to enqueue - TODO confirm. */ |
63 | U64 waitingOnOffset; /* Presumably the file offset of the next completed job to consume - TODO confirm. */ |
64 | |
65 | /* We may hold an IOJob object while we actively expose its buffer. */ |
66 | void *currentJobHeld; |
67 | |
68 | /* Coalesce buffer is used to join two buffers in cases where we need to read more bytes than are left in |
69 | * the first of them. Shouldn't be accessed from outside of utility functions. */ |
70 | U8 *coalesceBuffer; |
71 | |
72 | /* Read buffer can be used by consumer code; take care when copying this pointer aside, as it might |
73 | * change when consuming / refilling the buffer. */ |
74 | U8 *srcBuffer; |
75 | size_t srcBufferLoaded; /* Number of bytes currently loaded at srcBuffer (updated by AIO_ReadPool_consumeBytes / AIO_ReadPool_fillBuffer). */ |
76 | |
77 | /* We need to know which tasks completed so we can use their buffers when their time comes. |
78 | * Should only be accessed after locking base.ioJobsMutex. */ |
79 | void* completedJobs[MAX_IO_JOBS]; |
80 | int completedJobsCount; /* Number of jobs currently stored in completedJobs. */ |
81 | ZSTD_pthread_cond_t jobCompletedCond; /* Presumably signaled when a job is added to completedJobs - TODO confirm. */ |
82 | } ReadPoolCtx_t; |
83 | |
84 | typedef struct { /* Write pool: common IO pool state plus write-specific state. */ |
85 | IOPoolCtx_t base; |
86 | unsigned storedSkips; /* NOTE(review): presumably bytes skipped by sparse writes and not yet materialized (see AIO_WritePool_sparseWriteEnd) - confirm. */ |
87 | } WritePoolCtx_t; |
88 | |
89 | typedef struct { /* A single IO job: a data buffer plus the file and offset it applies to. */ |
90 | /* These fields are automatically set and shouldn't be changed by non-WritePool code. */ |
91 | void *ctx; /* Presumably the pool that owns this job - TODO confirm. */ |
92 | FILE* file; /* File this job operates on. */ |
93 | void *buffer; /* Job data buffer. */ |
94 | size_t bufferSize; /* Capacity of buffer, in bytes. */ |
95 | |
96 | /* This field should be set before a job is queued for execution and should contain the number |
97 | * of bytes to write from the buffer. */ |
98 | size_t usedBufferSize; |
99 | U64 offset; /* Presumably the file offset this job starts at - TODO confirm. */ |
100 | } IOJob_t; |
101 | |
102 | /* AIO_supported: |
103 | * Returns 1 if AsyncIO is supported on the system, 0 otherwise. NOTE(review): when 0, pools presumably perform IO synchronously - confirm. */ |
104 | int AIO_supported(void); |
105 | |
106 | |
107 | /* AIO_WritePool_releaseIoJob: |
108 | * Releases an acquired job back to the pool. Doesn't execute the job. */ |
109 | void AIO_WritePool_releaseIoJob(IOJob_t *job); |
110 | |
111 | /* AIO_WritePool_acquireJob: |
112 | * Returns an available write job to be used for a future write; hand it back with AIO_WritePool_releaseIoJob or queue it with AIO_WritePool_enqueueAndReacquireWriteJob. */ |
113 | IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx); |
114 | |
115 | /* AIO_WritePool_enqueueAndReacquireWriteJob: |
116 | * Enqueues a write job for execution and acquires a new one. |
117 | * On return, `*job` points to the newly acquired job. |
118 | * Make sure to set `usedBufferSize` to the wanted length before the call. |
119 | * The queued job must not be used after queueing it. */ |
120 | void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job); |
121 | |
122 | /* AIO_WritePool_sparseWriteEnd: |
123 | * Ends sparse writes to the current file. |
124 | * Waits for all currently queued write jobs to complete before executing. */ |
125 | void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx); |
126 | |
127 | /* AIO_WritePool_setFile: |
128 | * Sets the destination file for future writes in the pool. |
129 | * Requires completion of all queued write jobs and release of all otherwise acquired jobs. |
130 | * Also requires ending of sparse write if a previous file was used in sparse mode. */ |
131 | void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); |
132 | |
133 | /* AIO_WritePool_getFile: |
134 | * Returns the file the writePool is currently set to write to (NULL after AIO_WritePool_closeFile). */ |
135 | FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx); |
136 | |
137 | /* AIO_WritePool_closeFile: |
138 | * Ends sparse write, closes the writePool's current file and sets the file to NULL. |
139 | * Requires completion of all queued write jobs and release of all otherwise acquired jobs. NOTE(review): presumably returns 0 on success (fclose-like) - confirm. */ |
140 | int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); |
141 | |
142 | /* AIO_WritePool_create: |
143 | * Allocates and initializes a new write pool, including its jobs. |
144 | * bufferSize should be set to the maximal number of bytes we want to write at a time. */ |
145 | WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize); |
146 | |
147 | /* AIO_WritePool_free: |
148 | * Frees and releases a writePool and its resources. Closes destination file. */ |
149 | void AIO_WritePool_free(WritePoolCtx_t* ctx); |
150 | |
151 | /* AIO_WritePool_setAsync: |
152 | * Allows (de)activating async mode, to be used when the expected overhead |
153 | * of asyncio costs more than the expected gains. */ |
154 | void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async); |
155 | |
156 | /* AIO_ReadPool_create: |
157 | * Allocates and initializes a new readPool, including its jobs. |
158 | * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used |
159 | * as our basic read size. */ |
160 | ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); |
161 | |
162 | /* AIO_ReadPool_free: |
163 | * Frees and releases a readPool and its resources. Closes source file. */ |
164 | void AIO_ReadPool_free(ReadPoolCtx_t* ctx); |
165 | |
166 | /* AIO_ReadPool_setAsync: |
167 | * Allows (de)activating async mode, to be used when the expected overhead |
168 | * of asyncio costs more than the expected gains. */ |
169 | void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async); |
170 | |
171 | /* AIO_ReadPool_consumeBytes: |
172 | * Consumes n bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ |
173 | void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n); |
174 | |
175 | /* AIO_ReadPool_fillBuffer: |
176 | * Makes sure srcBuffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize). |
177 | * Returns once srcBuffer has at least n bytes loaded or the end of the file has been reached. |
178 | * Return value is the number of bytes added to the buffer. |
179 | * Note that srcBuffer might have up to 2 times bufferSize bytes. */ |
180 | size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n); |
181 | |
182 | /* AIO_ReadPool_consumeAndRefill: |
183 | * Consumes the current buffer and refills it with bufferSize bytes. NOTE(review): presumably returns the number of bytes loaded, as with fillBuffer - confirm. */ |
184 | size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx); |
185 | |
186 | /* AIO_ReadPool_setFile: |
187 | * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL. |
188 | * Waits for all currently enqueued tasks to complete if a previous file was set. */ |
189 | void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file); |
190 | |
191 | /* AIO_ReadPool_getFile: |
192 | * Returns the current file set for the read pool. */ |
193 | FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx); |
194 | |
195 | /* AIO_ReadPool_closeFile: |
196 | * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. NOTE(review): presumably returns 0 on success (fclose-like) - confirm. */ |
197 | int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx); |
198 | |
199 | #if defined (__cplusplus) |
200 | } |
201 | #endif |
202 | |
203 | #endif /* ZSTD_FILEIO_ASYNCIO_H */ |