capnproto-common.h (15160B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS) 25 #pragma GCC system_header 26 #endif 27 28 #include "common.h" 29 #include <capnp/serialize.h> 30 #include <capnp/serialize-packed.h> 31 #include <kj/debug.h> 32 #if HAVE_SNAPPY 33 #include <capnp/serialize-snappy.h> 34 #endif // HAVE_SNAPPY 35 #include <thread> 36 37 namespace capnp { 38 namespace benchmark { 39 namespace capnp { 40 41 class CountingOutputStream: public kj::FdOutputStream { 42 public: 43 CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {} 44 45 uint64_t throughput; 46 47 void write(const void* buffer, size_t size) override { 48 FdOutputStream::write(buffer, size); 49 throughput += size; 50 } 51 52 void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { 53 FdOutputStream::write(pieces); 54 for (auto& piece: pieces) { 55 throughput += piece.size(); 56 } 57 } 58 }; 59 60 // ======================================================================================= 61 62 struct Uncompressed { 63 typedef kj::FdInputStream& BufferedInput; 64 typedef InputStreamMessageReader MessageReader; 65 66 class ArrayMessageReader: public FlatArrayMessageReader { 67 public: 68 ArrayMessageReader(kj::ArrayPtr<const byte> array, 69 ReaderOptions options = ReaderOptions(), 70 kj::ArrayPtr<word> scratchSpace = nullptr) 71 : FlatArrayMessageReader(kj::arrayPtr( 72 reinterpret_cast<const word*>(array.begin()), 73 reinterpret_cast<const word*>(array.end())), options) {} 74 }; 75 76 static inline void write(kj::OutputStream& output, MessageBuilder& builder) { 77 writeMessage(output, builder); 78 } 79 }; 80 81 struct Packed { 82 typedef kj::BufferedInputStreamWrapper BufferedInput; 83 typedef PackedMessageReader MessageReader; 84 85 class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader { 86 public: 87 ArrayMessageReader(kj::ArrayPtr<const byte> array, 88 ReaderOptions options = ReaderOptions(), 89 kj::ArrayPtr<word> scratchSpace = nullptr) 90 : ArrayInputStream(array), 91 PackedMessageReader(*this, options, scratchSpace) {} 92 }; 93 94 static inline void write(kj::OutputStream& output, MessageBuilder& builder) { 95 writePackedMessage(output, builder); 96 } 97 98 static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) { 99 writePackedMessage(output, builder); 100 } 101 }; 102 103 #if HAVE_SNAPPY 104 static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE]; 105 static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE]; 106 static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE]; 107 108 struct SnappyCompressed { 109 typedef BufferedInputStreamWrapper BufferedInput; 110 typedef SnappyPackedMessageReader MessageReader; 111 112 class ArrayMessageReader: private ArrayInputStream, public SnappyPackedMessageReader { 113 public: 114 ArrayMessageReader(kj::ArrayPtr<const byte> array, 115 ReaderOptions options = ReaderOptions(), 116 kj::ArrayPtr<word> scratchSpace = nullptr) 117 : ArrayInputStream(array), 118 SnappyPackedMessageReader(static_cast<ArrayInputStream&>(*this), options, scratchSpace, 119 kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {} 120 }; 121 122 static inline void write(OutputStream& output, MessageBuilder& builder) { 123 writeSnappyPackedMessage(output, builder, 124 kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE), 125 kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE)); 126 } 127 }; 128 #endif // HAVE_SNAPPY 129 130 // ======================================================================================= 131 132 struct NoScratch { 133 struct ScratchSpace {}; 134 135 template <typename Compression> 136 class MessageReader: public Compression::MessageReader { 137 public: 138 inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch) 139 : Compression::MessageReader(input) {} 140 }; 141 142 template <typename Compression> 143 class ArrayMessageReader: public Compression::ArrayMessageReader { 144 public: 145 inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch) 146 : Compression::ArrayMessageReader(input) {} 147 }; 148 149 class MessageBuilder: public MallocMessageBuilder { 150 public: 151 inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {} 152 }; 153 154 class ObjectSizeCounter { 155 public: 156 ObjectSizeCounter(uint64_t iters): counter(0) {} 157 158 template <typename RequestBuilder, typename ResponseBuilder> 159 void add(RequestBuilder& request, ResponseBuilder& response) { 160 for (auto segment: request.getSegmentsForOutput()) { 161 counter += segment.size() * sizeof(word); 162 } 163 for (auto segment: response.getSegmentsForOutput()) { 164 counter += segment.size() * sizeof(word); 165 } 166 } 167 168 uint64_t get() { return counter; } 169 170 private: 171 uint64_t counter; 172 }; 173 }; 174 175 constexpr size_t SCRATCH_SIZE = 128 * 1024; 176 word scratchSpace[6 * SCRATCH_SIZE]; 177 int scratchCounter = 0; 178 179 struct UseScratch { 180 struct ScratchSpace { 181 word* words; 182 183 ScratchSpace() { 184 KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once."); 185 words = scratchSpace + scratchCounter++ * SCRATCH_SIZE; 186 } 187 ~ScratchSpace() noexcept { 188 --scratchCounter; 189 } 190 }; 191 192 template <typename Compression> 193 class MessageReader: public Compression::MessageReader { 194 public: 195 inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch) 196 : Compression::MessageReader( 197 input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} 198 }; 199 200 template <typename Compression> 201 class ArrayMessageReader: public Compression::ArrayMessageReader { 202 public: 203 inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch) 204 : Compression::ArrayMessageReader( 205 input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} 206 }; 207 208 class MessageBuilder: public MallocMessageBuilder { 209 public: 210 inline MessageBuilder(ScratchSpace& scratch) 211 : MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} 212 }; 213 214 class ObjectSizeCounter { 215 public: 216 ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {} 217 218 template <typename RequestBuilder, typename ResponseBuilder> 219 void add(RequestBuilder& request, ResponseBuilder& response) { 220 size_t counter = 0; 221 for (auto segment: request.getSegmentsForOutput()) { 222 counter += segment.size() * sizeof(word); 223 } 224 for (auto segment: response.getSegmentsForOutput()) { 225 counter += segment.size() * sizeof(word); 226 } 227 maxSize = std::max(counter, maxSize); 228 } 229 230 uint64_t get() { return iters * maxSize; } 231 232 private: 233 uint64_t iters; 234 size_t maxSize; 235 }; 236 }; 237 238 // ======================================================================================= 239 240 template <typename TestCase, typename ReuseStrategy, typename Compression> 241 struct BenchmarkMethods { 242 static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) { 243 kj::FdInputStream inputStream(inputFd); 244 typename Compression::BufferedInput bufferedInput(inputStream); 245 246 CountingOutputStream output(outputFd); 247 typename ReuseStrategy::ScratchSpace builderScratch; 248 typename ReuseStrategy::ScratchSpace readerScratch; 249 250 for (; iters > 0; --iters) { 251 typename TestCase::Expectation expected; 252 { 253 typename ReuseStrategy::MessageBuilder builder(builderScratch); 254 expected = TestCase::setupRequest( 255 builder.template initRoot<typename TestCase::Request>()); 256 Compression::write(output, builder); 257 } 258 259 { 260 typename ReuseStrategy::template MessageReader<Compression> reader( 261 bufferedInput, readerScratch); 262 if (!TestCase::checkResponse( 263 reader.template getRoot<typename TestCase::Response>(), expected)) { 264 throw std::logic_error("Incorrect response."); 265 } 266 } 267 } 268 269 return output.throughput; 270 } 271 272 static uint64_t asyncClientSender( 273 int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, 274 uint64_t iters) { 275 CountingOutputStream output(outputFd); 276 typename ReuseStrategy::ScratchSpace scratch; 277 278 for (; iters > 0; --iters) { 279 typename ReuseStrategy::MessageBuilder builder(scratch); 280 expectations->post(TestCase::setupRequest( 281 builder.template initRoot<typename TestCase::Request>())); 282 Compression::write(output, builder); 283 } 284 285 return output.throughput; 286 } 287 288 static void asyncClientReceiver( 289 int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, 290 uint64_t iters) { 291 kj::FdInputStream inputStream(inputFd); 292 typename Compression::BufferedInput bufferedInput(inputStream); 293 294 typename ReuseStrategy::ScratchSpace scratch; 295 296 for (; iters > 0; --iters) { 297 typename TestCase::Expectation expected = expectations->next(); 298 typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch); 299 if (!TestCase::checkResponse( 300 reader.template getRoot<typename TestCase::Response>(), expected)) { 301 throw std::logic_error("Incorrect response."); 302 } 303 } 304 } 305 306 static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) { 307 ProducerConsumerQueue<typename TestCase::Expectation> expectations; 308 std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters); 309 uint64_t throughput = asyncClientSender(outputFd, &expectations, iters); 310 receiverThread.join(); 311 return throughput; 312 } 313 314 static uint64_t server(int inputFd, int outputFd, uint64_t iters) { 315 kj::FdInputStream inputStream(inputFd); 316 typename Compression::BufferedInput bufferedInput(inputStream); 317 318 CountingOutputStream output(outputFd); 319 typename ReuseStrategy::ScratchSpace builderScratch; 320 typename ReuseStrategy::ScratchSpace readerScratch; 321 322 for (; iters > 0; --iters) { 323 typename ReuseStrategy::MessageBuilder builder(builderScratch); 324 typename ReuseStrategy::template MessageReader<Compression> reader( 325 bufferedInput, readerScratch); 326 TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(), 327 builder.template initRoot<typename TestCase::Response>()); 328 Compression::write(output, builder); 329 } 330 331 return output.throughput; 332 } 333 334 static uint64_t passByObject(uint64_t iters, bool countObjectSize) { 335 typename ReuseStrategy::ScratchSpace requestScratch; 336 typename ReuseStrategy::ScratchSpace responseScratch; 337 338 typename ReuseStrategy::ObjectSizeCounter counter(iters); 339 340 for (; iters > 0; --iters) { 341 typename ReuseStrategy::MessageBuilder requestMessage(requestScratch); 342 auto request = requestMessage.template initRoot<typename TestCase::Request>(); 343 typename TestCase::Expectation expected = TestCase::setupRequest(request); 344 345 typename ReuseStrategy::MessageBuilder responseMessage(responseScratch); 346 auto response = responseMessage.template initRoot<typename TestCase::Response>(); 347 TestCase::handleRequest(request.asReader(), response); 348 349 if (!TestCase::checkResponse(response.asReader(), expected)) { 350 throw std::logic_error("Incorrect response."); 351 } 352 353 if (countObjectSize) { 354 counter.add(requestMessage, responseMessage); 355 } 356 } 357 358 return counter.get(); 359 } 360 361 static uint64_t passByBytes(uint64_t iters) { 362 uint64_t throughput = 0; 363 typename ReuseStrategy::ScratchSpace clientRequestScratch; 364 UseScratch::ScratchSpace requestBytesScratch; 365 typename ReuseStrategy::ScratchSpace serverRequestScratch; 366 typename ReuseStrategy::ScratchSpace serverResponseScratch; 367 UseScratch::ScratchSpace responseBytesScratch; 368 typename ReuseStrategy::ScratchSpace clientResponseScratch; 369 370 for (; iters > 0; --iters) { 371 typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch); 372 typename TestCase::Expectation expected = TestCase::setupRequest( 373 requestBuilder.template initRoot<typename TestCase::Request>()); 374 375 kj::ArrayOutputStream requestOutput(kj::arrayPtr( 376 reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word))); 377 Compression::write(requestOutput, requestBuilder); 378 throughput += requestOutput.getArray().size(); 379 typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader( 380 requestOutput.getArray(), serverRequestScratch); 381 382 typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch); 383 TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(), 384 responseBuilder.template initRoot<typename TestCase::Response>()); 385 386 kj::ArrayOutputStream responseOutput( 387 kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words), 388 SCRATCH_SIZE * sizeof(word))); 389 Compression::write(responseOutput, responseBuilder); 390 throughput += responseOutput.getArray().size(); 391 typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader( 392 responseOutput.getArray(), clientResponseScratch); 393 394 if (!TestCase::checkResponse( 395 responseReader.template getRoot<typename TestCase::Response>(), expected)) { 396 throw std::logic_error("Incorrect response."); 397 } 398 } 399 400 return throughput; 401 } 402 }; 403 404 struct BenchmarkTypes { 405 typedef capnp::Uncompressed Uncompressed; 406 typedef capnp::Packed Packed; 407 #if HAVE_SNAPPY 408 typedef capnp::SnappyCompressed SnappyCompressed; 409 #endif // HAVE_SNAPPY 410 411 typedef capnp::UseScratch ReusableResources; 412 typedef capnp::NoScratch SingleUseResources; 413 414 template <typename TestCase, typename ReuseStrategy, typename Compression> 415 struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {}; 416 }; 417 418 } // namespace capnp 419 } // namespace benchmark 420 } // namespace capnp