648db22b |
1 | /* |
2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
3 | * All rights reserved. |
4 | * |
5 | * This source code is licensed under both the BSD-style license (found in the |
6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
7 | * in the COPYING file in the root directory of this source tree). |
8 | */ |
9 | #pragma once |
10 | |
11 | #include "ErrorHolder.h" |
12 | #include "Logging.h" |
13 | #include "Options.h" |
14 | #include "utils/Buffer.h" |
15 | #include "utils/Range.h" |
16 | #include "utils/ResourcePool.h" |
17 | #include "utils/ThreadPool.h" |
18 | #include "utils/WorkQueue.h" |
19 | #define ZSTD_STATIC_LINKING_ONLY |
20 | #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated |
21 | * and uses deprecated functions |
22 | */ |
23 | #include "zstd.h" |
24 | #undef ZSTD_STATIC_LINKING_ONLY |
25 | |
26 | #include <cstddef> |
27 | #include <cstdint> |
28 | #include <memory> |
29 | |
30 | namespace pzstd { |
/**
 * Runs pzstd with `options`.
 *
 * NOTE(review): this comment previously also said the function "returns the
 * number of bytes written" and that an error occurred if
 * `errorHandler.hasError()`. Neither matches the declaration below (an `int`
 * status and no `errorHandler` parameter) -- confirm against the definition.
 *
 * @param options   The pzstd options to use for (de)compression
 * @returns         0 upon success and non-zero on failure.
 */
int pzstdMain(const Options& options);
39 | |
40 | class SharedState { |
41 | public: |
42 | SharedState(const Options& options) : log(options.verbosity) { |
43 | if (!options.decompress) { |
44 | auto parameters = options.determineParameters(); |
45 | cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ |
46 | [this, parameters]() -> ZSTD_CStream* { |
47 | this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); |
48 | auto zcs = ZSTD_createCStream(); |
49 | if (zcs) { |
50 | auto err = ZSTD_initCStream_advanced( |
51 | zcs, nullptr, 0, parameters, 0); |
52 | if (ZSTD_isError(err)) { |
53 | ZSTD_freeCStream(zcs); |
54 | return nullptr; |
55 | } |
56 | } |
57 | return zcs; |
58 | }, |
59 | [](ZSTD_CStream *zcs) { |
60 | ZSTD_freeCStream(zcs); |
61 | }}); |
62 | } else { |
63 | dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ |
64 | [this]() -> ZSTD_DStream* { |
65 | this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); |
66 | auto zds = ZSTD_createDStream(); |
67 | if (zds) { |
68 | auto err = ZSTD_initDStream(zds); |
69 | if (ZSTD_isError(err)) { |
70 | ZSTD_freeDStream(zds); |
71 | return nullptr; |
72 | } |
73 | } |
74 | return zds; |
75 | }, |
76 | [](ZSTD_DStream *zds) { |
77 | ZSTD_freeDStream(zds); |
78 | }}); |
79 | } |
80 | } |
81 | |
82 | ~SharedState() { |
83 | // The resource pools have references to this, so destroy them first. |
84 | cStreamPool.reset(); |
85 | dStreamPool.reset(); |
86 | } |
87 | |
88 | Logger log; |
89 | ErrorHolder errorHolder; |
90 | std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; |
91 | std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; |
92 | }; |
93 | |
/**
 * Streams input from `fd`, breaks the input up into chunks, and compresses
 * each chunk independently. The output of each chunk is streamed to a queue,
 * and the output queues are put into `chunks` in order.
 *
 * @param state         The shared state
 * @param chunks        Each compression job's output queue is pushed here
 *                      as soon as it is available
 * @param executor      The thread pool to run compression jobs in
 * @param fd            The input file stream (a `FILE*`)
 * @param size          The size of the input file if known, 0 otherwise
 * @param numThreads    The number of threads in the thread pool
 * @param parameters    The zstd parameters to use for compression
 * @returns             The number of bytes read from the file
 */
std::uint64_t asyncCompressChunks(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
    ThreadPool& executor,
    FILE* fd,
    std::uintmax_t size,
    std::size_t numThreads,
    ZSTD_parameters parameters);
117 | |
/**
 * Streams input from `fd`. If pzstd headers are available, it breaks the
 * input up into independent frames and sends each frame to an independent
 * decompression job. The output of each frame is streamed to a queue, and
 * the output queues are put into `frames` in order.
 *
 * @param state         The shared state
 * @param frames        Each decompression job's output queue is pushed here
 *                      as soon as it is available
 * @param executor      The thread pool to run decompression jobs in
 * @param fd            The input file stream (a `FILE*`)
 * @returns             The number of bytes read from the file
 */
std::uint64_t asyncDecompressFrames(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
    ThreadPool& executor,
    FILE* fd);
136 | |
/**
 * Reads the data from each queue in `outs`, in order, and writes it to
 * `outputFd`.
 *
 * @param state         The shared state
 * @param outs          A queue of output queues, one for each
 *                      (de)compression job.
 * @param outputFd      The file stream to write to (a `FILE*`)
 * @param decompress    True when decompressing, false when compressing
 * @returns             The number of bytes written
 */
std::uint64_t writeFile(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
    FILE* outputFd,
    bool decompress);
153 | } |