protobuf-common.h (12347B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include "common.h" 23 #include <google/protobuf/io/zero_copy_stream_impl.h> 24 #include <google/protobuf/io/coded_stream.h> 25 #include <thread> 26 #if HAVE_SNAPPY 27 #include <snappy/snappy.h> 28 #include <snappy/snappy-sinksource.h> 29 #endif // HAVE_SNAPPY 30 31 namespace capnp { 32 namespace benchmark { 33 namespace protobuf { 34 35 // ======================================================================================= 36 37 struct SingleUseMessages { 38 template <typename MessageType> 39 struct Message { 40 struct Reusable {}; 41 struct SingleUse: public MessageType { 42 inline SingleUse(Reusable&) {} 43 }; 44 }; 45 46 struct ReusableString {}; 47 struct SingleUseString: std::string { 48 inline SingleUseString(ReusableString&) {} 49 }; 50 51 template <typename MessageType> 52 static inline void doneWith(MessageType& message) { 53 // Don't clear -- single-use. 54 } 55 }; 56 57 struct ReusableMessages { 58 template <typename MessageType> 59 struct Message { 60 struct Reusable: public MessageType {}; 61 typedef MessageType& SingleUse; 62 }; 63 64 typedef std::string ReusableString; 65 typedef std::string& SingleUseString; 66 67 template <typename MessageType> 68 static inline void doneWith(MessageType& message) { 69 message.Clear(); 70 } 71 }; 72 73 // ======================================================================================= 74 // The protobuf Java library defines a format for writing multiple protobufs to a stream, in which 75 // each message is prefixed by a varint size. This was never added to the C++ library. It's easy 76 // to do naively, but tricky to implement without accidentally losing various optimizations. These 77 // two functions should be optimal. 78 79 struct Uncompressed { 80 typedef google::protobuf::io::FileInputStream InputStream; 81 typedef google::protobuf::io::FileOutputStream OutputStream; 82 83 static uint64_t write(const google::protobuf::MessageLite& message, 84 google::protobuf::io::FileOutputStream* rawOutput) { 85 google::protobuf::io::CodedOutputStream output(rawOutput); 86 const int size = message.ByteSize(); 87 output.WriteVarint32(size); 88 uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); 89 if (buffer != NULL) { 90 message.SerializeWithCachedSizesToArray(buffer); 91 } else { 92 message.SerializeWithCachedSizes(&output); 93 if (output.HadError()) { 94 throw OsException(rawOutput->GetErrno()); 95 } 96 } 97 98 return size; 99 } 100 101 static void read(google::protobuf::io::ZeroCopyInputStream* rawInput, 102 google::protobuf::MessageLite* message) { 103 google::protobuf::io::CodedInputStream input(rawInput); 104 uint32_t size; 105 GOOGLE_CHECK(input.ReadVarint32(&size)); 106 107 auto limit = input.PushLimit(size); 108 109 GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) && 110 input.ConsumedEntireMessage()); 111 112 input.PopLimit(limit); 113 } 114 115 static void flush(google::protobuf::io::FileOutputStream* output) { 116 if (!output->Flush()) throw OsException(output->GetErrno()); 117 } 118 }; 119 120 // ======================================================================================= 121 // The Snappy interface is really obnoxious. I gave up here and am just reading/writing flat 122 // arrays in some static scratch space. This probably gives protobufs an edge that it doesn't 123 // deserve. 124 125 #if HAVE_SNAPPY 126 127 static char scratch[1 << 20]; 128 static char scratch2[1 << 20]; 129 130 struct SnappyCompressed { 131 typedef int InputStream; 132 typedef int OutputStream; 133 134 static uint64_t write(const google::protobuf::MessageLite& message, int* output) { 135 size_t size = message.ByteSize(); 136 GOOGLE_CHECK_LE(size, sizeof(scratch)); 137 138 message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch)); 139 140 size_t compressedSize = 0; 141 snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize); 142 uint32_t tag = compressedSize; 143 memcpy(scratch2, &tag, sizeof(tag)); 144 145 writeAll(*output, scratch2, compressedSize + sizeof(tag)); 146 return compressedSize + sizeof(tag); 147 } 148 149 static void read(int* input, google::protobuf::MessageLite* message) { 150 uint32_t size; 151 readAll(*input, &size, sizeof(size)); 152 readAll(*input, scratch, size); 153 154 size_t uncompressedSize; 155 GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize)); 156 GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2)); 157 158 GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize)); 159 } 160 161 static void flush(OutputStream*) {} 162 }; 163 164 #endif // HAVE_SNAPPY 165 166 // ======================================================================================= 167 168 #define REUSABLE(type) \ 169 typename ReuseStrategy::template Message<typename TestCase::type>::Reusable 170 #define SINGLE_USE(type) \ 171 typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse 172 173 template <typename TestCase, typename ReuseStrategy, typename Compression> 174 struct BenchmarkMethods { 175 static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) { 176 uint64_t throughput = 0; 177 178 typename Compression::OutputStream output(outputFd); 179 typename Compression::InputStream input(inputFd); 180 181 REUSABLE(Request) reusableRequest; 182 REUSABLE(Response) reusableResponse; 183 184 for (; iters > 0; --iters) { 185 SINGLE_USE(Request) request(reusableRequest); 186 typename TestCase::Expectation expected = TestCase::setupRequest(&request); 187 throughput += Compression::write(request, &output); 188 Compression::flush(&output); 189 ReuseStrategy::doneWith(request); 190 191 SINGLE_USE(Response) response(reusableResponse); 192 Compression::read(&input, &response); 193 if (!TestCase::checkResponse(response, expected)) { 194 throw std::logic_error("Incorrect response."); 195 } 196 ReuseStrategy::doneWith(response); 197 } 198 199 return throughput; 200 } 201 202 static uint64_t asyncClientSender( 203 int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, 204 uint64_t iters) { 205 uint64_t throughput = 0; 206 207 typename Compression::OutputStream output(outputFd); 208 REUSABLE(Request) reusableRequest; 209 210 for (; iters > 0; --iters) { 211 SINGLE_USE(Request) request(reusableRequest); 212 expectations->post(TestCase::setupRequest(&request)); 213 throughput += Compression::write(request, &output); 214 Compression::flush(&output); 215 ReuseStrategy::doneWith(request); 216 } 217 218 return throughput; 219 } 220 221 static void asyncClientReceiver( 222 int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, 223 uint64_t iters) { 224 typename Compression::InputStream input(inputFd); 225 REUSABLE(Response) reusableResponse; 226 227 for (; iters > 0; --iters) { 228 typename TestCase::Expectation expected = expectations->next(); 229 SINGLE_USE(Response) response(reusableResponse); 230 Compression::read(&input, &response); 231 if (!TestCase::checkResponse(response, expected)) { 232 throw std::logic_error("Incorrect response."); 233 } 234 ReuseStrategy::doneWith(response); 235 } 236 } 237 238 static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) { 239 ProducerConsumerQueue<typename TestCase::Expectation> expectations; 240 std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters); 241 uint64_t throughput = asyncClientSender(outputFd, &expectations, iters); 242 receiverThread.join(); 243 244 return throughput; 245 } 246 247 static uint64_t server(int inputFd, int outputFd, uint64_t iters) { 248 uint64_t throughput = 0; 249 250 typename Compression::OutputStream output(outputFd); 251 typename Compression::InputStream input(inputFd); 252 253 REUSABLE(Request) reusableRequest; 254 REUSABLE(Response) reusableResponse; 255 256 for (; iters > 0; --iters) { 257 SINGLE_USE(Request) request(reusableRequest); 258 Compression::read(&input, &request); 259 260 SINGLE_USE(Response) response(reusableResponse); 261 TestCase::handleRequest(request, &response); 262 ReuseStrategy::doneWith(request); 263 264 throughput += Compression::write(response, &output); 265 Compression::flush(&output); 266 ReuseStrategy::doneWith(response); 267 } 268 269 return throughput; 270 } 271 272 static uint64_t passByObject(uint64_t iters, bool countObjectSize) { 273 uint64_t throughput = 0; 274 275 REUSABLE(Request) reusableRequest; 276 REUSABLE(Response) reusableResponse; 277 278 for (; iters > 0; --iters) { 279 SINGLE_USE(Request) request(reusableRequest); 280 typename TestCase::Expectation expected = TestCase::setupRequest(&request); 281 282 SINGLE_USE(Response) response(reusableResponse); 283 TestCase::handleRequest(request, &response); 284 ReuseStrategy::doneWith(request); 285 if (!TestCase::checkResponse(response, expected)) { 286 throw std::logic_error("Incorrect response."); 287 } 288 ReuseStrategy::doneWith(response); 289 290 if (countObjectSize) { 291 throughput += request.SpaceUsed(); 292 throughput += response.SpaceUsed(); 293 } 294 } 295 296 return throughput; 297 } 298 299 static uint64_t passByBytes(uint64_t iters) { 300 uint64_t throughput = 0; 301 302 REUSABLE(Request) reusableClientRequest; 303 REUSABLE(Request) reusableServerRequest; 304 REUSABLE(Response) reusableServerResponse; 305 REUSABLE(Response) reusableClientResponse; 306 typename ReuseStrategy::ReusableString reusableRequestString, reusableResponseString; 307 308 for (; iters > 0; --iters) { 309 SINGLE_USE(Request) clientRequest(reusableClientRequest); 310 typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest); 311 312 typename ReuseStrategy::SingleUseString requestString(reusableRequestString); 313 clientRequest.SerializePartialToString(&requestString); 314 throughput += requestString.size(); 315 ReuseStrategy::doneWith(clientRequest); 316 317 SINGLE_USE(Request) serverRequest(reusableServerRequest); 318 serverRequest.ParsePartialFromString(requestString); 319 320 SINGLE_USE(Response) serverResponse(reusableServerResponse); 321 TestCase::handleRequest(serverRequest, &serverResponse); 322 ReuseStrategy::doneWith(serverRequest); 323 324 typename ReuseStrategy::SingleUseString responseString(reusableResponseString); 325 serverResponse.SerializePartialToString(&responseString); 326 throughput += responseString.size(); 327 ReuseStrategy::doneWith(serverResponse); 328 329 SINGLE_USE(Response) clientResponse(reusableClientResponse); 330 clientResponse.ParsePartialFromString(responseString); 331 332 if (!TestCase::checkResponse(clientResponse, expected)) { 333 throw std::logic_error("Incorrect response."); 334 } 335 ReuseStrategy::doneWith(clientResponse); 336 } 337 338 return throughput; 339 } 340 }; 341 342 struct BenchmarkTypes { 343 typedef protobuf::Uncompressed Uncompressed; 344 typedef protobuf::Uncompressed Packed; 345 #if HAVE_SNAPPY 346 typedef protobuf::SnappyCompressed SnappyCompressed; 347 #endif // HAVE_SNAPPY 348 349 typedef protobuf::ReusableMessages ReusableResources; 350 typedef protobuf::SingleUseMessages SingleUseResources; 351 352 template <typename TestCase, typename ReuseStrategy, typename Compression> 353 struct BenchmarkMethods 354 : public protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {}; 355 }; 356 357 } // namespace protobuf 358 } // namespace benchmark 359 } // namespace capnp