capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

capnproto-common.h (15160B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS)
     25 #pragma GCC system_header
     26 #endif
     27 
     28 #include "common.h"
     29 #include <capnp/serialize.h>
     30 #include <capnp/serialize-packed.h>
     31 #include <kj/debug.h>
     32 #if HAVE_SNAPPY
     33 #include <capnp/serialize-snappy.h>
     34 #endif  // HAVE_SNAPPY
     35 #include <thread>
     36 
     37 namespace capnp {
     38 namespace benchmark {
     39 namespace capnp {
     40 
     41 class CountingOutputStream: public kj::FdOutputStream {
     42 public:
     43   CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {}
     44 
     45   uint64_t throughput;
     46 
     47   void write(const void* buffer, size_t size) override {
     48     FdOutputStream::write(buffer, size);
     49     throughput += size;
     50   }
     51 
     52   void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
     53     FdOutputStream::write(pieces);
     54     for (auto& piece: pieces) {
     55       throughput += piece.size();
     56     }
     57   }
     58 };
     59 
     60 // =======================================================================================
     61 
     62 struct Uncompressed {
     63   typedef kj::FdInputStream& BufferedInput;
     64   typedef InputStreamMessageReader MessageReader;
     65 
     66   class ArrayMessageReader: public FlatArrayMessageReader {
     67   public:
     68     ArrayMessageReader(kj::ArrayPtr<const byte> array,
     69                        ReaderOptions options = ReaderOptions(),
     70                        kj::ArrayPtr<word> scratchSpace = nullptr)
     71       : FlatArrayMessageReader(kj::arrayPtr(
     72           reinterpret_cast<const word*>(array.begin()),
     73           reinterpret_cast<const word*>(array.end())), options) {}
     74   };
     75 
     76   static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
     77     writeMessage(output, builder);
     78   }
     79 };
     80 
     81 struct Packed {
     82   typedef kj::BufferedInputStreamWrapper BufferedInput;
     83   typedef PackedMessageReader MessageReader;
     84 
     85   class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader {
     86   public:
     87     ArrayMessageReader(kj::ArrayPtr<const byte> array,
     88                        ReaderOptions options = ReaderOptions(),
     89                        kj::ArrayPtr<word> scratchSpace = nullptr)
     90       : ArrayInputStream(array),
     91         PackedMessageReader(*this, options, scratchSpace) {}
     92   };
     93 
     94   static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
     95     writePackedMessage(output, builder);
     96   }
     97 
     98   static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) {
     99     writePackedMessage(output, builder);
    100   }
    101 };
    102 
    103 #if HAVE_SNAPPY
    104 static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE];
    105 static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE];
    106 static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE];
    107 
    108 struct SnappyCompressed {
    109   typedef BufferedInputStreamWrapper BufferedInput;
    110   typedef SnappyPackedMessageReader MessageReader;
    111 
    112   class ArrayMessageReader: private ArrayInputStream, public SnappyPackedMessageReader {
    113   public:
    114     ArrayMessageReader(kj::ArrayPtr<const byte> array,
    115                        ReaderOptions options = ReaderOptions(),
    116                        kj::ArrayPtr<word> scratchSpace = nullptr)
    117       : ArrayInputStream(array),
    118         SnappyPackedMessageReader(static_cast<ArrayInputStream&>(*this), options, scratchSpace,
    119                                   kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {}
    120   };
    121 
    122   static inline void write(OutputStream& output, MessageBuilder& builder) {
    123     writeSnappyPackedMessage(output, builder,
    124         kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE),
    125         kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE));
    126   }
    127 };
    128 #endif  // HAVE_SNAPPY
    129 
    130 // =======================================================================================
    131 
    132 struct NoScratch {
    133   struct ScratchSpace {};
    134 
    135   template <typename Compression>
    136   class MessageReader: public Compression::MessageReader {
    137   public:
    138     inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
    139         : Compression::MessageReader(input) {}
    140   };
    141 
    142   template <typename Compression>
    143   class ArrayMessageReader: public Compression::ArrayMessageReader {
    144   public:
    145     inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
    146         : Compression::ArrayMessageReader(input) {}
    147   };
    148 
    149   class MessageBuilder: public MallocMessageBuilder {
    150   public:
    151     inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {}
    152   };
    153 
    154   class ObjectSizeCounter {
    155   public:
    156     ObjectSizeCounter(uint64_t iters): counter(0) {}
    157 
    158     template <typename RequestBuilder, typename ResponseBuilder>
    159     void add(RequestBuilder& request, ResponseBuilder& response) {
    160       for (auto segment: request.getSegmentsForOutput()) {
    161         counter += segment.size() * sizeof(word);
    162       }
    163       for (auto segment: response.getSegmentsForOutput()) {
    164         counter += segment.size() * sizeof(word);
    165       }
    166     }
    167 
    168     uint64_t get() { return counter; }
    169 
    170   private:
    171     uint64_t counter;
    172   };
    173 };
    174 
    175 constexpr size_t SCRATCH_SIZE = 128 * 1024;
    176 word scratchSpace[6 * SCRATCH_SIZE];
    177 int scratchCounter = 0;
    178 
    179 struct UseScratch {
    180   struct ScratchSpace {
    181     word* words;
    182 
    183     ScratchSpace() {
    184       KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once.");
    185       words = scratchSpace + scratchCounter++ * SCRATCH_SIZE;
    186     }
    187     ~ScratchSpace() noexcept {
    188       --scratchCounter;
    189     }
    190   };
    191 
    192   template <typename Compression>
    193   class MessageReader: public Compression::MessageReader {
    194   public:
    195     inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
    196         : Compression::MessageReader(
    197             input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
    198   };
    199 
    200   template <typename Compression>
    201   class ArrayMessageReader: public Compression::ArrayMessageReader {
    202   public:
    203     inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
    204         : Compression::ArrayMessageReader(
    205             input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
    206   };
    207 
    208   class MessageBuilder: public MallocMessageBuilder {
    209   public:
    210     inline MessageBuilder(ScratchSpace& scratch)
    211         : MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
    212   };
    213 
    214   class ObjectSizeCounter {
    215   public:
    216     ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {}
    217 
    218     template <typename RequestBuilder, typename ResponseBuilder>
    219     void add(RequestBuilder& request, ResponseBuilder& response) {
    220       size_t counter = 0;
    221       for (auto segment: request.getSegmentsForOutput()) {
    222         counter += segment.size() * sizeof(word);
    223       }
    224       for (auto segment: response.getSegmentsForOutput()) {
    225         counter += segment.size() * sizeof(word);
    226       }
    227       maxSize = std::max(counter, maxSize);
    228     }
    229 
    230     uint64_t get() { return iters * maxSize; }
    231 
    232   private:
    233     uint64_t iters;
    234     size_t maxSize;
    235   };
    236 };
    237 
    238 // =======================================================================================
    239 
    240 template <typename TestCase, typename ReuseStrategy, typename Compression>
    241 struct BenchmarkMethods {
    242   static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
    243     kj::FdInputStream inputStream(inputFd);
    244     typename Compression::BufferedInput bufferedInput(inputStream);
    245 
    246     CountingOutputStream output(outputFd);
    247     typename ReuseStrategy::ScratchSpace builderScratch;
    248     typename ReuseStrategy::ScratchSpace readerScratch;
    249 
    250     for (; iters > 0; --iters) {
    251       typename TestCase::Expectation expected;
    252       {
    253         typename ReuseStrategy::MessageBuilder builder(builderScratch);
    254         expected = TestCase::setupRequest(
    255             builder.template initRoot<typename TestCase::Request>());
    256         Compression::write(output, builder);
    257       }
    258 
    259       {
    260         typename ReuseStrategy::template MessageReader<Compression> reader(
    261             bufferedInput, readerScratch);
    262         if (!TestCase::checkResponse(
    263             reader.template getRoot<typename TestCase::Response>(), expected)) {
    264           throw std::logic_error("Incorrect response.");
    265         }
    266       }
    267     }
    268 
    269     return output.throughput;
    270   }
    271 
    272   static uint64_t asyncClientSender(
    273       int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
    274       uint64_t iters) {
    275     CountingOutputStream output(outputFd);
    276     typename ReuseStrategy::ScratchSpace scratch;
    277 
    278     for (; iters > 0; --iters) {
    279       typename ReuseStrategy::MessageBuilder builder(scratch);
    280       expectations->post(TestCase::setupRequest(
    281           builder.template initRoot<typename TestCase::Request>()));
    282       Compression::write(output, builder);
    283     }
    284 
    285     return output.throughput;
    286   }
    287 
    288   static void asyncClientReceiver(
    289       int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
    290       uint64_t iters) {
    291     kj::FdInputStream inputStream(inputFd);
    292     typename Compression::BufferedInput bufferedInput(inputStream);
    293 
    294     typename ReuseStrategy::ScratchSpace scratch;
    295 
    296     for (; iters > 0; --iters) {
    297       typename TestCase::Expectation expected = expectations->next();
    298       typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
    299       if (!TestCase::checkResponse(
    300           reader.template getRoot<typename TestCase::Response>(), expected)) {
    301         throw std::logic_error("Incorrect response.");
    302       }
    303     }
    304   }
    305 
    306   static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
    307     ProducerConsumerQueue<typename TestCase::Expectation> expectations;
    308     std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
    309     uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
    310     receiverThread.join();
    311     return throughput;
    312   }
    313 
    314   static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
    315     kj::FdInputStream inputStream(inputFd);
    316     typename Compression::BufferedInput bufferedInput(inputStream);
    317 
    318     CountingOutputStream output(outputFd);
    319     typename ReuseStrategy::ScratchSpace builderScratch;
    320     typename ReuseStrategy::ScratchSpace readerScratch;
    321 
    322     for (; iters > 0; --iters) {
    323       typename ReuseStrategy::MessageBuilder builder(builderScratch);
    324       typename ReuseStrategy::template MessageReader<Compression> reader(
    325           bufferedInput, readerScratch);
    326       TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
    327                               builder.template initRoot<typename TestCase::Response>());
    328       Compression::write(output, builder);
    329     }
    330 
    331     return output.throughput;
    332   }
    333 
    334   static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
    335     typename ReuseStrategy::ScratchSpace requestScratch;
    336     typename ReuseStrategy::ScratchSpace responseScratch;
    337 
    338     typename ReuseStrategy::ObjectSizeCounter counter(iters);
    339 
    340     for (; iters > 0; --iters) {
    341       typename ReuseStrategy::MessageBuilder requestMessage(requestScratch);
    342       auto request = requestMessage.template initRoot<typename TestCase::Request>();
    343       typename TestCase::Expectation expected = TestCase::setupRequest(request);
    344 
    345       typename ReuseStrategy::MessageBuilder responseMessage(responseScratch);
    346       auto response = responseMessage.template initRoot<typename TestCase::Response>();
    347       TestCase::handleRequest(request.asReader(), response);
    348 
    349       if (!TestCase::checkResponse(response.asReader(), expected)) {
    350         throw std::logic_error("Incorrect response.");
    351       }
    352 
    353       if (countObjectSize) {
    354         counter.add(requestMessage, responseMessage);
    355       }
    356     }
    357 
    358     return counter.get();
    359   }
    360 
    361   static uint64_t passByBytes(uint64_t iters) {
    362     uint64_t throughput = 0;
    363     typename ReuseStrategy::ScratchSpace clientRequestScratch;
    364     UseScratch::ScratchSpace requestBytesScratch;
    365     typename ReuseStrategy::ScratchSpace serverRequestScratch;
    366     typename ReuseStrategy::ScratchSpace serverResponseScratch;
    367     UseScratch::ScratchSpace responseBytesScratch;
    368     typename ReuseStrategy::ScratchSpace clientResponseScratch;
    369 
    370     for (; iters > 0; --iters) {
    371       typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch);
    372       typename TestCase::Expectation expected = TestCase::setupRequest(
    373           requestBuilder.template initRoot<typename TestCase::Request>());
    374 
    375       kj::ArrayOutputStream requestOutput(kj::arrayPtr(
    376           reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word)));
    377       Compression::write(requestOutput, requestBuilder);
    378       throughput += requestOutput.getArray().size();
    379       typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader(
    380           requestOutput.getArray(), serverRequestScratch);
    381 
    382       typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch);
    383       TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
    384                               responseBuilder.template initRoot<typename TestCase::Response>());
    385 
    386       kj::ArrayOutputStream responseOutput(
    387           kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words),
    388                        SCRATCH_SIZE * sizeof(word)));
    389       Compression::write(responseOutput, responseBuilder);
    390       throughput += responseOutput.getArray().size();
    391       typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader(
    392           responseOutput.getArray(), clientResponseScratch);
    393 
    394       if (!TestCase::checkResponse(
    395           responseReader.template getRoot<typename TestCase::Response>(), expected)) {
    396         throw std::logic_error("Incorrect response.");
    397       }
    398      }
    399 
    400     return throughput;
    401   }
    402 };
    403 
    404 struct BenchmarkTypes {
    405   typedef capnp::Uncompressed Uncompressed;
    406   typedef capnp::Packed Packed;
    407 #if HAVE_SNAPPY
    408   typedef capnp::SnappyCompressed SnappyCompressed;
    409 #endif  // HAVE_SNAPPY
    410 
    411   typedef capnp::UseScratch ReusableResources;
    412   typedef capnp::NoScratch SingleUseResources;
    413 
    414   template <typename TestCase, typename ReuseStrategy, typename Compression>
    415   struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
    416 };
    417 
    418 }  // namespace capnp
    419 }  // namespace benchmark
    420 }  // namespace capnp