2 * Copyright (c) Meta Platforms, Inc. and affiliates.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
11 #include "ErrorHolder.h"
14 #include "utils/Buffer.h"
15 #include "utils/Range.h"
16 #include "utils/ResourcePool.h"
17 #include "utils/ThreadPool.h"
18 #include "utils/WorkQueue.h"
19 #define ZSTD_STATIC_LINKING_ONLY
20 #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
21 * and uses deprecated functions
24 #undef ZSTD_STATIC_LINKING_ONLY
32 * Runs pzstd with `options` and returns the number of bytes written.
33 * An error occurred if `errorHandler.hasError()`.
35 * @param options The pzstd options to use for (de)compression
36 * @returns 0 upon success and non-zero on failure.
38 int pzstdMain(const Options& options);
42 SharedState(const Options& options) : log(options.verbosity) {
43 if (!options.decompress) {
44 auto parameters = options.determineParameters();
45 cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
46 [this, parameters]() -> ZSTD_CStream* {
47 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
48 auto zcs = ZSTD_createCStream();
50 auto err = ZSTD_initCStream_advanced(
51 zcs, nullptr, 0, parameters, 0);
52 if (ZSTD_isError(err)) {
53 ZSTD_freeCStream(zcs);
59 [](ZSTD_CStream *zcs) {
60 ZSTD_freeCStream(zcs);
63 dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
64 [this]() -> ZSTD_DStream* {
65 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
66 auto zds = ZSTD_createDStream();
68 auto err = ZSTD_initDStream(zds);
69 if (ZSTD_isError(err)) {
70 ZSTD_freeDStream(zds);
76 [](ZSTD_DStream *zds) {
77 ZSTD_freeDStream(zds);
83 // The resource pools have references to this, so destroy them first.
89 ErrorHolder errorHolder;
90 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
91 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
95 * Streams input from `fd`, breaks input up into chunks, and compresses each
96 * chunk independently. Output of each chunk gets streamed to a queue, and
97 * the output queues get put into `chunks` in order.
99 * @param state The shared state
100 * @param chunks Each compression jobs output queue gets `pushed()` here
101 * as soon as it is available
102 * @param executor The thread pool to run compression jobs in
103 * @param fd The input file descriptor
104 * @param size The size of the input file if known, 0 otherwise
105 * @param numThreads The number of threads in the thread pool
106 * @param parameters The zstd parameters to use for compression
107 * @returns The number of bytes read from the file
109 std::uint64_t asyncCompressChunks(
111 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
112 ThreadPool& executor,
115 std::size_t numThreads,
116 ZSTD_parameters parameters);
119 * Streams input from `fd`. If pzstd headers are available it breaks the input
120 * up into independent frames. It sends each frame to an independent
121 * decompression job. Output of each frame gets streamed to a queue, and
122 * the output queues get put into `frames` in order.
124 * @param state The shared state
125 * @param frames Each decompression jobs output queue gets `pushed()` here
126 * as soon as it is available
127 * @param executor The thread pool to run compression jobs in
128 * @param fd The input file descriptor
129 * @returns The number of bytes read from the file
131 std::uint64_t asyncDecompressFrames(
133 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
134 ThreadPool& executor,
138 * Streams input in from each queue in `outs` in order, and writes the data to
141 * @param state The shared state
142 * @param outs A queue of output queues, one for each
143 * (de)compression job.
144 * @param outputFd The file descriptor to write to
145 * @param decompress Are we decompressing?
146 * @returns The number of bytes written
148 std::uint64_t writeFile(
150 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,