capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

protobuf-common.h (12347B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "common.h"
     23 #include <google/protobuf/io/zero_copy_stream_impl.h>
     24 #include <google/protobuf/io/coded_stream.h>
     25 #include <thread>
     26 #if HAVE_SNAPPY
     27 #include <snappy/snappy.h>
     28 #include <snappy/snappy-sinksource.h>
     29 #endif  // HAVE_SNAPPY
     30 
     31 namespace capnp {
     32 namespace benchmark {
     33 namespace protobuf {
     34 
     35 // =======================================================================================
     36 
     37 struct SingleUseMessages {
     38   template <typename MessageType>
     39   struct Message {
     40     struct Reusable {};
     41     struct SingleUse: public MessageType {
     42       inline SingleUse(Reusable&) {}
     43     };
     44   };
     45 
     46   struct ReusableString {};
     47   struct SingleUseString: std::string {
     48     inline SingleUseString(ReusableString&) {}
     49   };
     50 
     51   template <typename MessageType>
     52   static inline void doneWith(MessageType& message) {
     53     // Don't clear -- single-use.
     54   }
     55 };
     56 
     57 struct ReusableMessages {
     58   template <typename MessageType>
     59   struct Message {
     60     struct Reusable: public MessageType {};
     61     typedef MessageType& SingleUse;
     62   };
     63 
     64   typedef std::string ReusableString;
     65   typedef std::string& SingleUseString;
     66 
     67   template <typename MessageType>
     68   static inline void doneWith(MessageType& message) {
     69     message.Clear();
     70   }
     71 };
     72 
     73 // =======================================================================================
     74 // The protobuf Java library defines a format for writing multiple protobufs to a stream, in which
     75 // each message is prefixed by a varint size.  This was never added to the C++ library.  It's easy
     76 // to do naively, but tricky to implement without accidentally losing various optimizations.  These
     77 // two functions should be optimal.
     78 
     79 struct Uncompressed {
     80   typedef google::protobuf::io::FileInputStream InputStream;
     81   typedef google::protobuf::io::FileOutputStream OutputStream;
     82 
     83   static uint64_t write(const google::protobuf::MessageLite& message,
     84                         google::protobuf::io::FileOutputStream* rawOutput) {
     85     google::protobuf::io::CodedOutputStream output(rawOutput);
     86     const int size = message.ByteSize();
     87     output.WriteVarint32(size);
     88     uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
     89     if (buffer != NULL) {
     90       message.SerializeWithCachedSizesToArray(buffer);
     91     } else {
     92       message.SerializeWithCachedSizes(&output);
     93       if (output.HadError()) {
     94         throw OsException(rawOutput->GetErrno());
     95       }
     96     }
     97 
     98     return size;
     99   }
    100 
    101   static void read(google::protobuf::io::ZeroCopyInputStream* rawInput,
    102                    google::protobuf::MessageLite* message) {
    103     google::protobuf::io::CodedInputStream input(rawInput);
    104     uint32_t size;
    105     GOOGLE_CHECK(input.ReadVarint32(&size));
    106 
    107     auto limit = input.PushLimit(size);
    108 
    109     GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) &&
    110                  input.ConsumedEntireMessage());
    111 
    112     input.PopLimit(limit);
    113   }
    114 
    115   static void flush(google::protobuf::io::FileOutputStream* output) {
    116     if (!output->Flush()) throw OsException(output->GetErrno());
    117   }
    118 };
    119 
    120 // =======================================================================================
    121 // The Snappy interface is really obnoxious.  I gave up here and am just reading/writing flat
    122 // arrays in some static scratch space.  This probably gives protobufs an edge that it doesn't
    123 // deserve.
    124 
    125 #if HAVE_SNAPPY
    126 
    127 static char scratch[1 << 20];
    128 static char scratch2[1 << 20];
    129 
    130 struct SnappyCompressed {
    131   typedef int InputStream;
    132   typedef int OutputStream;
    133 
    134   static uint64_t write(const google::protobuf::MessageLite& message, int* output) {
    135     size_t size = message.ByteSize();
    136     GOOGLE_CHECK_LE(size, sizeof(scratch));
    137 
    138     message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch));
    139 
    140     size_t compressedSize = 0;
    141     snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize);
    142     uint32_t tag = compressedSize;
    143     memcpy(scratch2, &tag, sizeof(tag));
    144 
    145     writeAll(*output, scratch2, compressedSize + sizeof(tag));
    146     return compressedSize + sizeof(tag);
    147   }
    148 
    149   static void read(int* input, google::protobuf::MessageLite* message) {
    150     uint32_t size;
    151     readAll(*input, &size, sizeof(size));
    152     readAll(*input, scratch, size);
    153 
    154     size_t uncompressedSize;
    155     GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize));
    156     GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2));
    157 
    158     GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize));
    159   }
    160 
    161   static void flush(OutputStream*) {}
    162 };
    163 
    164 #endif  // HAVE_SNAPPY
    165 
    166 // =======================================================================================
    167 
    168 #define REUSABLE(type) \
    169   typename ReuseStrategy::template Message<typename TestCase::type>::Reusable
    170 #define SINGLE_USE(type) \
    171   typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse
    172 
    173 template <typename TestCase, typename ReuseStrategy, typename Compression>
    174 struct BenchmarkMethods {
    175   static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
    176     uint64_t throughput = 0;
    177 
    178     typename Compression::OutputStream output(outputFd);
    179     typename Compression::InputStream input(inputFd);
    180 
    181     REUSABLE(Request) reusableRequest;
    182     REUSABLE(Response) reusableResponse;
    183 
    184     for (; iters > 0; --iters) {
    185       SINGLE_USE(Request) request(reusableRequest);
    186       typename TestCase::Expectation expected = TestCase::setupRequest(&request);
    187       throughput += Compression::write(request, &output);
    188       Compression::flush(&output);
    189       ReuseStrategy::doneWith(request);
    190 
    191       SINGLE_USE(Response) response(reusableResponse);
    192       Compression::read(&input, &response);
    193       if (!TestCase::checkResponse(response, expected)) {
    194         throw std::logic_error("Incorrect response.");
    195       }
    196       ReuseStrategy::doneWith(response);
    197     }
    198 
    199     return throughput;
    200   }
    201 
    202   static uint64_t asyncClientSender(
    203       int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
    204       uint64_t iters) {
    205     uint64_t throughput = 0;
    206 
    207     typename Compression::OutputStream output(outputFd);
    208     REUSABLE(Request) reusableRequest;
    209 
    210     for (; iters > 0; --iters) {
    211       SINGLE_USE(Request) request(reusableRequest);
    212       expectations->post(TestCase::setupRequest(&request));
    213       throughput += Compression::write(request, &output);
    214       Compression::flush(&output);
    215       ReuseStrategy::doneWith(request);
    216     }
    217 
    218     return throughput;
    219   }
    220 
    221   static void asyncClientReceiver(
    222       int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
    223       uint64_t iters) {
    224     typename Compression::InputStream input(inputFd);
    225     REUSABLE(Response) reusableResponse;
    226 
    227     for (; iters > 0; --iters) {
    228       typename TestCase::Expectation expected = expectations->next();
    229       SINGLE_USE(Response) response(reusableResponse);
    230       Compression::read(&input, &response);
    231       if (!TestCase::checkResponse(response, expected)) {
    232         throw std::logic_error("Incorrect response.");
    233       }
    234       ReuseStrategy::doneWith(response);
    235     }
    236   }
    237 
    238   static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
    239     ProducerConsumerQueue<typename TestCase::Expectation> expectations;
    240     std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
    241     uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
    242     receiverThread.join();
    243 
    244     return throughput;
    245   }
    246 
    247   static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
    248     uint64_t throughput = 0;
    249 
    250     typename Compression::OutputStream output(outputFd);
    251     typename Compression::InputStream input(inputFd);
    252 
    253     REUSABLE(Request) reusableRequest;
    254     REUSABLE(Response) reusableResponse;
    255 
    256     for (; iters > 0; --iters) {
    257       SINGLE_USE(Request) request(reusableRequest);
    258       Compression::read(&input, &request);
    259 
    260       SINGLE_USE(Response) response(reusableResponse);
    261       TestCase::handleRequest(request, &response);
    262       ReuseStrategy::doneWith(request);
    263 
    264       throughput += Compression::write(response, &output);
    265       Compression::flush(&output);
    266       ReuseStrategy::doneWith(response);
    267     }
    268 
    269     return throughput;
    270   }
    271 
    272   static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
    273     uint64_t throughput = 0;
    274 
    275     REUSABLE(Request) reusableRequest;
    276     REUSABLE(Response) reusableResponse;
    277 
    278     for (; iters > 0; --iters) {
    279       SINGLE_USE(Request) request(reusableRequest);
    280       typename TestCase::Expectation expected = TestCase::setupRequest(&request);
    281 
    282       SINGLE_USE(Response) response(reusableResponse);
    283       TestCase::handleRequest(request, &response);
    284       ReuseStrategy::doneWith(request);
    285       if (!TestCase::checkResponse(response, expected)) {
    286         throw std::logic_error("Incorrect response.");
    287       }
    288       ReuseStrategy::doneWith(response);
    289 
    290       if (countObjectSize) {
    291         throughput += request.SpaceUsed();
    292         throughput += response.SpaceUsed();
    293       }
    294     }
    295 
    296     return throughput;
    297   }
    298 
    299   static uint64_t passByBytes(uint64_t iters) {
    300     uint64_t throughput = 0;
    301 
    302     REUSABLE(Request) reusableClientRequest;
    303     REUSABLE(Request) reusableServerRequest;
    304     REUSABLE(Response) reusableServerResponse;
    305     REUSABLE(Response) reusableClientResponse;
    306     typename ReuseStrategy::ReusableString reusableRequestString, reusableResponseString;
    307 
    308     for (; iters > 0; --iters) {
    309       SINGLE_USE(Request) clientRequest(reusableClientRequest);
    310       typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest);
    311 
    312       typename ReuseStrategy::SingleUseString requestString(reusableRequestString);
    313       clientRequest.SerializePartialToString(&requestString);
    314       throughput += requestString.size();
    315       ReuseStrategy::doneWith(clientRequest);
    316 
    317       SINGLE_USE(Request) serverRequest(reusableServerRequest);
    318       serverRequest.ParsePartialFromString(requestString);
    319 
    320       SINGLE_USE(Response) serverResponse(reusableServerResponse);
    321       TestCase::handleRequest(serverRequest, &serverResponse);
    322       ReuseStrategy::doneWith(serverRequest);
    323 
    324       typename ReuseStrategy::SingleUseString responseString(reusableResponseString);
    325       serverResponse.SerializePartialToString(&responseString);
    326       throughput += responseString.size();
    327       ReuseStrategy::doneWith(serverResponse);
    328 
    329       SINGLE_USE(Response) clientResponse(reusableClientResponse);
    330       clientResponse.ParsePartialFromString(responseString);
    331 
    332       if (!TestCase::checkResponse(clientResponse, expected)) {
    333         throw std::logic_error("Incorrect response.");
    334       }
    335       ReuseStrategy::doneWith(clientResponse);
    336     }
    337 
    338     return throughput;
    339   }
    340 };
    341 
    342 struct BenchmarkTypes {
    343   typedef protobuf::Uncompressed Uncompressed;
    344   typedef protobuf::Uncompressed Packed;
    345 #if HAVE_SNAPPY
    346   typedef protobuf::SnappyCompressed SnappyCompressed;
    347 #endif  // HAVE_SNAPPY
    348 
    349   typedef protobuf::ReusableMessages ReusableResources;
    350   typedef protobuf::SingleUseMessages SingleUseResources;
    351 
    352   template <typename TestCase, typename ReuseStrategy, typename Compression>
    353   struct BenchmarkMethods
    354       : public protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
    355 };
    356 
    357 }  // namespace protobuf
    358 }  // namespace benchmark
    359 }  // namespace capnp