capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

gzip.h (4224B)


      1 // Copyright (c) 2017 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include <kj/io.h>
     25 #include <kj/async-io.h>
     26 #include <zlib.h>
     27 
     28 namespace kj {
     29 
     30 namespace _ {  // private
     31 
     32 class GzipOutputContext final {
     33 public:
     34   GzipOutputContext(kj::Maybe<int> compressionLevel);
     35   ~GzipOutputContext() noexcept(false);
     36   KJ_DISALLOW_COPY(GzipOutputContext);
     37 
     38   void setInput(const void* in, size_t size);
     39   kj::Tuple<bool, kj::ArrayPtr<const byte>> pumpOnce(int flush);
     40 
     41 private:
     42   bool compressing;
     43   z_stream ctx = {};
     44   byte buffer[4096];
     45 
     46   [[noreturn]] void fail(int result);
     47 };
     48 
     49 }  // namespace _ (private)
     50 
     51 class GzipInputStream final: public InputStream {
     52 public:
     53   GzipInputStream(InputStream& inner);
     54   ~GzipInputStream() noexcept(false);
     55   KJ_DISALLOW_COPY(GzipInputStream);
     56 
     57   size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
     58 
     59 private:
     60   InputStream& inner;
     61   z_stream ctx = {};
     62   bool atValidEndpoint = false;
     63 
     64   byte buffer[4096];
     65 
     66   size_t readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead);
     67 };
     68 
     69 class GzipOutputStream final: public OutputStream {
     70 public:
     71   enum { DECOMPRESS };
     72 
     73   GzipOutputStream(OutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
     74   GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS));
     75   ~GzipOutputStream() noexcept(false);
     76   KJ_DISALLOW_COPY(GzipOutputStream);
     77 
     78   void write(const void* buffer, size_t size) override;
     79   using OutputStream::write;
     80 
     81   inline void flush() {
     82     pump(Z_SYNC_FLUSH);
     83   }
     84 
     85 private:
     86   OutputStream& inner;
     87   _::GzipOutputContext ctx;
     88 
     89   void pump(int flush);
     90 };
     91 
     92 class GzipAsyncInputStream final: public AsyncInputStream {
     93 public:
     94   GzipAsyncInputStream(AsyncInputStream& inner);
     95   ~GzipAsyncInputStream() noexcept(false);
     96   KJ_DISALLOW_COPY(GzipAsyncInputStream);
     97 
     98   Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
     99 
    100 private:
    101   AsyncInputStream& inner;
    102   z_stream ctx = {};
    103   bool atValidEndpoint = false;
    104 
    105   byte buffer[4096];
    106 
    107   Promise<size_t> readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead);
    108 };
    109 
    110 class GzipAsyncOutputStream final: public AsyncOutputStream {
    111 public:
    112   enum { DECOMPRESS };
    113 
    114   GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
    115   GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS));
    116   KJ_DISALLOW_COPY(GzipAsyncOutputStream);
    117 
    118   Promise<void> write(const void* buffer, size_t size) override;
    119   Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
    120 
    121   Promise<void> whenWriteDisconnected() override { return inner.whenWriteDisconnected(); }
    122 
    123   inline Promise<void> flush() {
    124     return pump(Z_SYNC_FLUSH);
    125   }
    126   // Call if you need to flush a stream at an arbitrary data point.
    127 
    128   Promise<void> end() {
    129     return pump(Z_FINISH);
    130   }
    131   // Must call to flush and finish the stream, since some data may be buffered.
    132   //
    133   // TODO(cleanup): This should be a virtual method on AsyncOutputStream.
    134 
    135 private:
    136   AsyncOutputStream& inner;
    137   _::GzipOutputContext ctx;
    138 
    139   kj::Promise<void> pump(int flush);
    140 };
    141 
    142 }  // namespace kj