| 1 | /* |
| 2 | * Copyright (c) Meta Platforms, Inc. and affiliates. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * This source code is licensed under both the BSD-style license (found in the |
| 6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
| 7 | * in the COPYING file in the root directory of this source tree). |
| 8 | */ |
| 9 | #pragma once |
| 10 | |
| 11 | #include "ErrorHolder.h" |
| 12 | #include "Logging.h" |
| 13 | #include "Options.h" |
| 14 | #include "utils/Buffer.h" |
| 15 | #include "utils/Range.h" |
| 16 | #include "utils/ResourcePool.h" |
| 17 | #include "utils/ThreadPool.h" |
| 18 | #include "utils/WorkQueue.h" |
| 19 | #define ZSTD_STATIC_LINKING_ONLY |
| 20 | #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated |
| 21 | * and uses deprecated functions |
| 22 | */ |
| 23 | #include "zstd.h" |
| 24 | #undef ZSTD_STATIC_LINKING_ONLY |
| 25 | |
| 26 | #include <cstddef> |
| 27 | #include <cstdint> |
| 28 | #include <memory> |
| 29 | |
| 30 | namespace pzstd { |
/**
 * Runs pzstd with `options`.
 *
 * The result is a status code, not a byte count.
 *
 * @param options  The pzstd options to use for (de)compression
 * @returns        0 upon success and non-zero on failure.
 */
int pzstdMain(const Options& options);
| 39 | |
| 40 | class SharedState { |
| 41 | public: |
| 42 | SharedState(const Options& options) : log(options.verbosity) { |
| 43 | if (!options.decompress) { |
| 44 | auto parameters = options.determineParameters(); |
| 45 | cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ |
| 46 | [this, parameters]() -> ZSTD_CStream* { |
| 47 | this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); |
| 48 | auto zcs = ZSTD_createCStream(); |
| 49 | if (zcs) { |
| 50 | auto err = ZSTD_initCStream_advanced( |
| 51 | zcs, nullptr, 0, parameters, 0); |
| 52 | if (ZSTD_isError(err)) { |
| 53 | ZSTD_freeCStream(zcs); |
| 54 | return nullptr; |
| 55 | } |
| 56 | } |
| 57 | return zcs; |
| 58 | }, |
| 59 | [](ZSTD_CStream *zcs) { |
| 60 | ZSTD_freeCStream(zcs); |
| 61 | }}); |
| 62 | } else { |
| 63 | dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ |
| 64 | [this]() -> ZSTD_DStream* { |
| 65 | this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); |
| 66 | auto zds = ZSTD_createDStream(); |
| 67 | if (zds) { |
| 68 | auto err = ZSTD_initDStream(zds); |
| 69 | if (ZSTD_isError(err)) { |
| 70 | ZSTD_freeDStream(zds); |
| 71 | return nullptr; |
| 72 | } |
| 73 | } |
| 74 | return zds; |
| 75 | }, |
| 76 | [](ZSTD_DStream *zds) { |
| 77 | ZSTD_freeDStream(zds); |
| 78 | }}); |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | ~SharedState() { |
| 83 | // The resource pools have references to this, so destroy them first. |
| 84 | cStreamPool.reset(); |
| 85 | dStreamPool.reset(); |
| 86 | } |
| 87 | |
| 88 | Logger log; |
| 89 | ErrorHolder errorHolder; |
| 90 | std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; |
| 91 | std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; |
| 92 | }; |
| 93 | |
/**
 * Streams input from `fd`, breaks input up into chunks, and compresses each
 * chunk independently.  Output of each chunk gets streamed to a queue, and
 * the output queues get put into `chunks` in order.
 *
 * @param state       The shared state
 * @param chunks      Each compression job's output queue gets `push()`ed here
 *                    as soon as it is available
 * @param executor    The thread pool to run compression jobs in
 * @param fd          The input file stream (`FILE*`)
 * @param size        The size of the input file if known, 0 otherwise
 * @param numThreads  The number of threads in the thread pool
 * @param parameters  The zstd parameters to use for compression
 * @returns           The number of bytes read from the file
 */
std::uint64_t asyncCompressChunks(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
    ThreadPool& executor,
    FILE* fd,
    std::uintmax_t size,
    std::size_t numThreads,
    ZSTD_parameters parameters);
| 117 | |
/**
 * Streams input from `fd`.  If pzstd headers are available it breaks the input
 * up into independent frames.  It sends each frame to an independent
 * decompression job.  Output of each frame gets streamed to a queue, and
 * the output queues get put into `frames` in order.
 *
 * @param state       The shared state
 * @param frames      Each decompression job's output queue gets `push()`ed
 *                    here as soon as it is available
 * @param executor    The thread pool to run decompression jobs in
 * @param fd          The input file stream (`FILE*`)
 * @returns           The number of bytes read from the file
 */
std::uint64_t asyncDecompressFrames(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
    ThreadPool& executor,
    FILE* fd);
| 136 | |
/**
 * Reads the data from each queue in `outs`, in order, and writes it to
 * `outputFd`.
 *
 * @param state       The shared state
 * @param outs        A queue of output queues, one for each
 *                    (de)compression job.
 * @param outputFd    The output file stream (`FILE*`) to write to
 * @param decompress  Are we decompressing?
 * @returns           The number of bytes written
 */
std::uint64_t writeFile(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
    FILE* outputFd,
    bool decompress);
| 153 | } |