gzip.c++ (8749B)
// Copyright (c) 2017 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if KJ_HAS_ZLIB

#include "gzip.h"
#include <kj/debug.h>

namespace kj {

namespace {

using ZlibCount = decltype(z_stream::avail_in);
// Integer type zlib uses for `avail_in` / `avail_out`. It may be narrower than size_t, so a
// size_t byte count must never be assigned to those fields without clamping.

constexpr size_t MAX_ZLIB_CHUNK = static_cast<ZlibCount>(-1);
// Largest byte count that can be handed to zlib in a single deflate()/inflate() call.

inline size_t zlibChunkSize(size_t size) {
  // Returns `size`, limited to what fits in a z_stream byte counter.
  return size < MAX_ZLIB_CHUNK ? size : MAX_ZLIB_CHUNK;
}

}  // namespace

namespace _ {  // private

GzipOutputContext::GzipOutputContext(kj::Maybe<int> compressionLevel) {
  // Initializes the zlib stream for compression when `compressionLevel` is non-null, and for
  // decompression when it is null. Reports an initialization error through fail().

  int initResult;

  KJ_IF_MAYBE(level, compressionLevel) {
    compressing = true;
    initResult =
      deflateInit2(&ctx, *level, Z_DEFLATED,
                   15 + 16,  // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
                   8,        // memLevel = 8 (the default)
                   Z_DEFAULT_STRATEGY);
  } else {
    compressing = false;
    initResult = inflateInit2(&ctx, 15 + 16);
  }

  if (initResult != Z_OK) {
    fail(initResult);
  }
}

GzipOutputContext::~GzipOutputContext() noexcept(false) {
  // Releases the zlib state using the call that matches how the stream was initialized.
  compressing ? deflateEnd(&ctx) : inflateEnd(&ctx);
}

void GzipOutputContext::setInput(const void* in, size_t size) {
  // Points zlib at the next span of input to be consumed by subsequent pumpOnce() calls.
  //
  // `size` must fit in zlib's byte counter; callers with larger inputs have to split them.
  // Rejecting an oversized span here is preferable to silently truncating the count, which
  // would drop the tail of the caller's data.

  KJ_REQUIRE(size <= MAX_ZLIB_CHUNK, "gzip input chunk is too large for zlib", size);
  ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
  ctx.avail_in = static_cast<ZlibCount>(size);
}

kj::Tuple<bool, kj::ArrayPtr<const byte>> GzipOutputContext::pumpOnce(int flush) {
  // Runs one deflate()/inflate() step into the internal buffer.
  //
  // Returns a tuple of:
  // - whether zlib returned Z_OK, i.e. whether the caller should pump again, and
  // - the bytes produced by this step (a view into the internal buffer, possibly empty).
  //
  // Any zlib result other than Z_OK, Z_BUF_ERROR or Z_STREAM_END is reported through fail().

  ctx.next_out = buffer;
  ctx.avail_out = sizeof(buffer);

  auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush);
  if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) {
    fail(result);
  }

  // - Z_STREAM_END means we have finished the stream successfully.
  // - Z_BUF_ERROR means we didn't have any more input to process
  //   (but still have to make a call to write to potentially flush data).
  return kj::tuple(result == Z_OK, kj::arrayPtr(buffer, sizeof(buffer) - ctx.avail_out));
}

void GzipOutputContext::fail(int result) {
  // Raises a KJ_FAIL_REQUIRE failure describing a zlib error. zlib's own message is used when
  // it provided one; otherwise the numeric result code is attached.

  auto header = compressing ? "gzip compression failed" : "gzip decompression failed";
  if (ctx.msg == nullptr) {
    KJ_FAIL_REQUIRE(header, result);
  } else {
    KJ_FAIL_REQUIRE(header, ctx.msg);
  }
}

}  // namespace _ (private)

GzipInputStream::GzipInputStream(InputStream& inner)
    : inner(inner) {
  // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
  KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
}

GzipInputStream::~GzipInputStream() noexcept(false) {
  inflateEnd(&ctx);
}

size_t GzipInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
  // Decompresses between `minBytes` and `maxBytes` bytes into `out`, returning the number of
  // bytes produced. Fewer than `minBytes` are returned only when the inner stream ends at a
  // valid gzip endpoint.

  if (maxBytes == 0) return size_t(0);

  return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
}

size_t GzipInputStream::readImpl(
    byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
  // Core of tryRead(). `alreadyRead` is the number of bytes produced before this call and is
  // included in the returned total.
  //
  // Implemented as a loop rather than by recursion so that satisfying a large `minBytes` from
  // many small refills of the input buffer does not grow the stack.

  for (;;) {
    if (ctx.avail_in == 0) {
      // zlib has consumed all buffered compressed data; refill from the inner stream.
      size_t amount = inner.tryRead(buffer, 1, sizeof(buffer));
      if (amount == 0) {
        if (!atValidEndpoint) {
          KJ_FAIL_REQUIRE("gzip compressed stream ended prematurely");
        }
        return alreadyRead;
      } else {
        ctx.next_in = buffer;
        ctx.avail_in = amount;
      }
    }

    // Offer zlib at most as much output space as its counter can represent, and measure the
    // bytes produced against that same value. Using `maxBytes` directly would be truncated on
    // assignment for huge buffers and yield a bogus byte count.
    const size_t chunk = zlibChunkSize(maxBytes);
    ctx.next_out = out;
    ctx.avail_out = static_cast<ZlibCount>(chunk);

    auto inflateResult = inflate(&ctx, Z_NO_FLUSH);
    atValidEndpoint = inflateResult == Z_STREAM_END;
    if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) {
      if (atValidEndpoint && ctx.avail_in > 0) {
        // There's more data available. Assume start of new content.
        KJ_ASSERT(inflateReset(&ctx) == Z_OK);
      }

      size_t n = chunk - ctx.avail_out;
      if (n >= minBytes) {
        return n + alreadyRead;
      }

      // Not enough yet: advance past what was produced and go around again.
      out += n;
      minBytes -= n;
      maxBytes -= n;
      alreadyRead += n;
    } else {
      if (ctx.msg == nullptr) {
        KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult);
      } else {
        KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg);
      }
    }
  }
}

// =======================================================================================

GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel)
    : inner(inner), ctx(compressionLevel) {}
// Compressing stream: data passed to write() is gzip-compressed and written to `inner`.

GzipOutputStream::GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS))
    : inner(inner), ctx(nullptr) {}
// Decompressing stream: the context is created without a compression level, which selects
// inflate mode.

GzipOutputStream::~GzipOutputStream() noexcept(false) {
  // Finishes the zlib stream and writes any remaining output to `inner`.
  pump(Z_FINISH);
}

void GzipOutputStream::write(const void* in, size_t size) {
  // Feeds `size` bytes at `in` through zlib and writes whatever output is produced to `inner`.
  //
  // Input is handed over in pieces that fit zlib's byte counter so that very large writes are
  // processed completely. The do/while keeps the behavior of pumping once even for an empty
  // write.

  auto ptr = reinterpret_cast<const byte*>(in);
  do {
    const size_t chunk = zlibChunkSize(size);
    ctx.setInput(ptr, chunk);
    pump(Z_NO_FLUSH);
    ptr += chunk;
    size -= chunk;
  } while (size > 0);
}

void GzipOutputStream::pump(int flush) {
  // Repeatedly steps zlib with the given flush mode, forwarding each non-empty output chunk to
  // `inner`, until pumpOnce() reports that it should not be called again.

  bool ok;
  do {
    auto result = ctx.pumpOnce(flush);
    ok = get<0>(result);
    auto chunk = get<1>(result);
    if (chunk.size() > 0) {
      inner.write(chunk.begin(), chunk.size());
    }
  } while (ok);
}

// =======================================================================================

GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner)
    : inner(inner) {
  // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
  KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
}

GzipAsyncInputStream::~GzipAsyncInputStream() noexcept(false) {
  inflateEnd(&ctx);
}

Promise<size_t> GzipAsyncInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
  // Asynchronous counterpart of GzipInputStream::tryRead(): resolves to the number of
  // decompressed bytes placed in `out`.

  if (maxBytes == 0) return size_t(0);

  return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
}

Promise<size_t> GzipAsyncInputStream::readImpl(
    byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
  // Core of tryRead(). `alreadyRead` is the number of bytes produced before this call and is
  // included in the resolved total.

  if (ctx.avail_in == 0) {
    // zlib has consumed all buffered compressed data; refill from the inner stream and resume
    // once the read completes.
    return inner.tryRead(buffer, 1, sizeof(buffer))
        .then([this,out,minBytes,maxBytes,alreadyRead](size_t amount) -> Promise<size_t> {
      if (amount == 0) {
        if (!atValidEndpoint) {
          return KJ_EXCEPTION(DISCONNECTED, "gzip compressed stream ended prematurely");
        }
        return alreadyRead;
      } else {
        ctx.next_in = buffer;
        ctx.avail_in = amount;
        return readImpl(out, minBytes, maxBytes, alreadyRead);
      }
    });
  }

  // Offer zlib at most as much output space as its counter can represent, and measure the
  // bytes produced against that same value. Using `maxBytes` directly would be truncated on
  // assignment for huge buffers and yield a bogus byte count.
  const size_t chunk = zlibChunkSize(maxBytes);
  ctx.next_out = out;
  ctx.avail_out = static_cast<ZlibCount>(chunk);

  auto inflateResult = inflate(&ctx, Z_NO_FLUSH);
  atValidEndpoint = inflateResult == Z_STREAM_END;
  if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) {
    if (atValidEndpoint && ctx.avail_in > 0) {
      // There's more data available. Assume start of new content.
      KJ_ASSERT(inflateReset(&ctx) == Z_OK);
    }

    size_t n = chunk - ctx.avail_out;
    if (n >= minBytes) {
      return n + alreadyRead;
    } else {
      return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n);
    }
  } else {
    if (ctx.msg == nullptr) {
      KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult);
    } else {
      KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg);
    }
  }
}

// =======================================================================================

GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel)
    : inner(inner), ctx(compressionLevel) {}
// Compressing stream: data passed to write() is gzip-compressed and written to `inner`.

GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS))
    : inner(inner), ctx(nullptr) {}
// Decompressing stream: the context is created without a compression level, which selects
// inflate mode.

Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
  // Feeds `size` bytes at `in` through zlib and writes whatever output is produced to `inner`.
  //
  // Only as much input as fits zlib's byte counter is handed over at once; any remainder is
  // written by a chained call after the first part has been pumped, so that very large writes
  // are processed completely.

  const size_t chunk = zlibChunkSize(size);
  ctx.setInput(in, chunk);
  auto promise = pump(Z_NO_FLUSH);

  if (chunk < size) {
    auto rest = reinterpret_cast<const byte*>(in) + chunk;
    size_t remaining = size - chunk;
    promise = promise.then([this, rest, remaining]() {
      return write(rest, remaining);
    });
  }
  return promise;
}

Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
  // Writes each piece in order, starting the next one only after the previous has completed.

  if (pieces.size() == 0) return kj::READY_NOW;
  return write(pieces[0].begin(), pieces[0].size())
      .then([this,pieces]() {
    return write(pieces.slice(1, pieces.size()));
  });
}

kj::Promise<void> GzipAsyncOutputStream::pump(int flush) {
  // Steps zlib with the given flush mode and forwards the produced chunk to `inner`, repeating
  // (after each write completes) for as long as pumpOnce() reports that it should be called
  // again.

  auto result = ctx.pumpOnce(flush);
  auto ok = get<0>(result);
  auto chunk = get<1>(result);

  if (chunk.size() == 0) {
    if (ok) {
      return pump(flush);
    } else {
      return kj::READY_NOW;
    }
  } else {
    auto promise = inner.write(chunk.begin(), chunk.size());
    if (ok) {
      promise = promise.then([this, flush]() { return pump(flush); });
    }
    return promise;
  }
}

}  // namespace kj

#endif  // KJ_HAS_ZLIB