capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

gzip.c++ (8749B)


      1 // Copyright (c) 2017 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if KJ_HAS_ZLIB
     23 
     24 #include "gzip.h"
     25 #include <kj/debug.h>
     26 
     27 namespace kj {
     28 
     29 namespace _ {  // private
     30 
     31 GzipOutputContext::GzipOutputContext(kj::Maybe<int> compressionLevel) {
     32   int initResult;
     33 
     34   KJ_IF_MAYBE(level, compressionLevel) {
     35     compressing = true;
     36     initResult =
     37       deflateInit2(&ctx, *level, Z_DEFLATED,
     38                    15 + 16,  // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
     39                    8,        // memLevel = 8 (the default)
     40                    Z_DEFAULT_STRATEGY);
     41   } else {
     42     compressing = false;
     43     initResult = inflateInit2(&ctx, 15 + 16);
     44   }
     45 
     46   if (initResult != Z_OK) {
     47     fail(initResult);
     48   }
     49 }
     50 
     51 GzipOutputContext::~GzipOutputContext() noexcept(false) {
     52   compressing ? deflateEnd(&ctx) : inflateEnd(&ctx);
     53 }
     54 
     55 void GzipOutputContext::setInput(const void* in, size_t size) {
     56   ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
     57   ctx.avail_in = size;
     58 }
     59 
     60 kj::Tuple<bool, kj::ArrayPtr<const byte>> GzipOutputContext::pumpOnce(int flush) {
     61   ctx.next_out = buffer;
     62   ctx.avail_out = sizeof(buffer);
     63 
     64   auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush);
     65   if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) {
     66     fail(result);
     67   }
     68 
     69   // - Z_STREAM_END means we have finished the stream successfully.
     70   // - Z_BUF_ERROR means we didn't have any more input to process
     71   //   (but still have to make a call to write to potentially flush data).
     72   return kj::tuple(result == Z_OK, kj::arrayPtr(buffer, sizeof(buffer) - ctx.avail_out));
     73 }
     74 
     75 void GzipOutputContext::fail(int result) {
     76   auto header = compressing ? "gzip compression failed" : "gzip decompression failed";
     77   if (ctx.msg == nullptr) {
     78     KJ_FAIL_REQUIRE(header, result);
     79   } else {
     80     KJ_FAIL_REQUIRE(header, ctx.msg);
     81   }
     82 }
     83 
     84 }  // namespace _ (private)
     85 
     86 GzipInputStream::GzipInputStream(InputStream& inner)
     87     : inner(inner) {
     88   // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
     89   KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
     90 }
     91 
     92 GzipInputStream::~GzipInputStream() noexcept(false) {
     93   inflateEnd(&ctx);
     94 }
     95 
     96 size_t GzipInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
     97   if (maxBytes == 0) return size_t(0);
     98 
     99   return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
    100 }
    101 
    102 size_t GzipInputStream::readImpl(
    103     byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
    104   if (ctx.avail_in == 0) {
    105     size_t amount = inner.tryRead(buffer, 1, sizeof(buffer));
    106     if (amount == 0) {
    107       if (!atValidEndpoint) {
    108         KJ_FAIL_REQUIRE("gzip compressed stream ended prematurely");
    109       }
    110       return alreadyRead;
    111     } else {
    112       ctx.next_in = buffer;
    113       ctx.avail_in = amount;
    114     }
    115   }
    116 
    117   ctx.next_out = reinterpret_cast<byte*>(out);
    118   ctx.avail_out = maxBytes;
    119 
    120   auto inflateResult = inflate(&ctx, Z_NO_FLUSH);
    121   atValidEndpoint = inflateResult == Z_STREAM_END;
    122   if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) {
    123     if (atValidEndpoint && ctx.avail_in > 0) {
    124       // There's more data available. Assume start of new content.
    125       KJ_ASSERT(inflateReset(&ctx) == Z_OK);
    126     }
    127 
    128     size_t n = maxBytes - ctx.avail_out;
    129     if (n >= minBytes) {
    130       return n + alreadyRead;
    131     } else {
    132       return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n);
    133     }
    134   } else {
    135     if (ctx.msg == nullptr) {
    136       KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult);
    137     } else {
    138       KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg);
    139     }
    140   }
    141 }
    142 
    143 // =======================================================================================
    144 
    145 GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel)
    146     : inner(inner), ctx(compressionLevel) {}
    147 
    148 GzipOutputStream::GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS))
    149     : inner(inner), ctx(nullptr) {}
    150 
    151 GzipOutputStream::~GzipOutputStream() noexcept(false) {
    152   pump(Z_FINISH);
    153 }
    154 
    155 void GzipOutputStream::write(const void* in, size_t size) {
    156   ctx.setInput(in, size);
    157   pump(Z_NO_FLUSH);
    158 }
    159 
    160 void GzipOutputStream::pump(int flush) {
    161   bool ok;
    162   do {
    163     auto result = ctx.pumpOnce(flush);
    164     ok = get<0>(result);
    165     auto chunk = get<1>(result);
    166     if (chunk.size() > 0) {
    167       inner.write(chunk.begin(), chunk.size());
    168     }
    169   } while (ok);
    170 }
    171 
    172 // =======================================================================================
    173 
    174 GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner)
    175     : inner(inner) {
    176   // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
    177   KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
    178 }
    179 
    180 GzipAsyncInputStream::~GzipAsyncInputStream() noexcept(false) {
    181   inflateEnd(&ctx);
    182 }
    183 
    184 Promise<size_t> GzipAsyncInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
    185   if (maxBytes == 0) return size_t(0);
    186 
    187   return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
    188 }
    189 
    190 Promise<size_t> GzipAsyncInputStream::readImpl(
    191     byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
    192   if (ctx.avail_in == 0) {
    193     return inner.tryRead(buffer, 1, sizeof(buffer))
    194         .then([this,out,minBytes,maxBytes,alreadyRead](size_t amount) -> Promise<size_t> {
    195       if (amount == 0) {
    196         if (!atValidEndpoint) {
    197           return KJ_EXCEPTION(DISCONNECTED, "gzip compressed stream ended prematurely");
    198         }
    199         return alreadyRead;
    200       } else {
    201         ctx.next_in = buffer;
    202         ctx.avail_in = amount;
    203         return readImpl(out, minBytes, maxBytes, alreadyRead);
    204       }
    205     });
    206   }
    207 
    208   ctx.next_out = reinterpret_cast<byte*>(out);
    209   ctx.avail_out = maxBytes;
    210 
    211   auto inflateResult = inflate(&ctx, Z_NO_FLUSH);
    212   atValidEndpoint = inflateResult == Z_STREAM_END;
    213   if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) {
    214     if (atValidEndpoint && ctx.avail_in > 0) {
    215       // There's more data available. Assume start of new content.
    216       KJ_ASSERT(inflateReset(&ctx) == Z_OK);
    217     }
    218 
    219     size_t n = maxBytes - ctx.avail_out;
    220     if (n >= minBytes) {
    221       return n + alreadyRead;
    222     } else {
    223       return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n);
    224     }
    225   } else {
    226     if (ctx.msg == nullptr) {
    227       KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult);
    228     } else {
    229       KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg);
    230     }
    231   }
    232 }
    233 
    234 // =======================================================================================
    235 
    236 GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel)
    237     : inner(inner), ctx(compressionLevel) {}
    238 
    239 GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS))
    240     : inner(inner), ctx(nullptr) {}
    241 
    242 Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
    243   ctx.setInput(in, size);
    244   return pump(Z_NO_FLUSH);
    245 }
    246 
    247 Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
    248   if (pieces.size() == 0) return kj::READY_NOW;
    249   return write(pieces[0].begin(), pieces[0].size())
    250       .then([this,pieces]() {
    251     return write(pieces.slice(1, pieces.size()));
    252   });
    253 }
    254 
    255 kj::Promise<void> GzipAsyncOutputStream::pump(int flush) {
    256   auto result = ctx.pumpOnce(flush);
    257   auto ok = get<0>(result);
    258   auto chunk = get<1>(result);
    259 
    260   if (chunk.size() == 0) {
    261     if (ok) {
    262       return pump(flush);
    263     } else {
    264       return kj::READY_NOW;
    265     }
    266   } else {
    267     auto promise = inner.write(chunk.begin(), chunk.size());
    268     if (ok) {
    269       promise = promise.then([this, flush]() { return pump(flush); });
    270     }
    271     return promise;
    272   }
    273 }
    274 
    275 }  // namespace kj
    276 
    277 #endif  // KJ_HAS_ZLIB