capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

serialize-async-test.c++ (11462B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #ifndef _GNU_SOURCE
     23 #define _GNU_SOURCE
     24 #endif
     25 
     26 #if _WIN32
     27 #include <kj/win32-api-version.h>
     28 #endif
     29 
     30 #include "serialize-async.h"
     31 #include "serialize.h"
     32 #include <kj/debug.h>
     33 #include <kj/thread.h>
     34 #include <stdlib.h>
     35 #include <kj/miniposix.h>
     36 #include "test-util.h"
     37 #include <kj/compat/gtest.h>
     38 
     39 #if _WIN32
     40 #include <winsock2.h>
     41 #include <kj/windows-sanity.h>
     42 namespace kj {
     43   namespace _ {
     44     int win32Socketpair(SOCKET socks[2]);
     45   }
     46 }
     47 #else
     48 #include <sys/socket.h>
     49 #endif
     50 
     51 namespace capnp {
     52 namespace _ {  // private
     53 namespace {
     54 
     // Blocks the calling thread for roughly five milliseconds, using whichever
     // sleep primitive the target platform provides.
     inline void delay() {
     #if _WIN32
       Sleep(5);
     #else
       usleep(5000);
     #endif
     }
     60 
     61 class FragmentingOutputStream: public kj::OutputStream {
     62 public:
     63   FragmentingOutputStream(kj::OutputStream& inner): inner(inner) {}
     64 
     65   void write(const void* buffer, size_t size) override {
     66     while (size > 0) {
     67       delay();
     68       size_t n = rand() % size + 1;
     69       inner.write(buffer, n);
     70       buffer = reinterpret_cast<const byte*>(buffer) + n;
     71       size -= n;
     72     }
     73   }
     74 
     75 private:
     76   kj::OutputStream& inner;
     77 };
     78 
     79 class TestMessageBuilder: public MallocMessageBuilder {
     80   // A MessageBuilder that tries to allocate an exact number of total segments, by allocating
     81   // minimum-size segments until it reaches the number, then allocating one large segment to
     82   // finish.
     83 
     84 public:
     85   explicit TestMessageBuilder(uint desiredSegmentCount)
     86       : MallocMessageBuilder(0, AllocationStrategy::FIXED_SIZE),
     87         desiredSegmentCount(desiredSegmentCount) {}
     88   ~TestMessageBuilder() {
     89     EXPECT_EQ(0u, desiredSegmentCount);
     90   }
     91 
     92   kj::ArrayPtr<word> allocateSegment(uint minimumSize) override {
     93     if (desiredSegmentCount <= 1) {
     94       if (desiredSegmentCount < 1) {
     95         ADD_FAILURE() << "Allocated more segments than desired.";
     96       } else {
     97         --desiredSegmentCount;
     98       }
     99       return MallocMessageBuilder::allocateSegment(8192);
    100     } else {
    101       --desiredSegmentCount;
    102       return MallocMessageBuilder::allocateSegment(minimumSize);
    103     }
    104   }
    105 
    106 private:
    107   uint desiredSegmentCount;
    108 };
    109 
    110 class PipeWithSmallBuffer {
    111 public:
    112 #ifdef _WIN32
    113 #define KJ_SOCKCALL KJ_WINSOCK
    114 #ifndef SHUT_WR
    115 #define SHUT_WR SD_SEND
    116 #endif
    117 #define socketpair(family, type, flags, fds) kj::_::win32Socketpair(fds)
    118 #else
    119 #define KJ_SOCKCALL KJ_SYSCALL
    120 #endif
    121 
    122   PipeWithSmallBuffer() {
    123     // Use a socketpair rather than a pipe so that we can set the buffer size extremely small.
    124     KJ_SOCKCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    125 
    126     KJ_SOCKCALL(shutdown(fds[0], SHUT_WR));
    127     // Note:  OSX reports ENOTCONN if we also try to shutdown(fds[1], SHUT_RD).
    128 
    129     // Request that the buffer size be as small as possible, to force the event loop to kick in.
    130     // FUN STUFF:
    131     // - On Linux, the kernel rounds up to the smallest size it permits, so we can ask for a size of
    132     //   zero.
    133     // - On OSX, the kernel reports EINVAL on zero, but will dutifully use a 1-byte buffer if we
    134     //   set the size to 1.  This tends to cause stack overflows due to ridiculously long promise
    135     //   chains.
    136     // - Cygwin will apparently actually use a buffer size of 0 and therefore block forever waiting
    137     //   for buffer space.
    138     // - GNU HURD throws ENOPROTOOPT for SO_RCVBUF. Apparently, technically, a Unix domain socket
    139     //   has only one buffer, and it's controlled via SO_SNDBUF on the other end. OK, we'll ignore
    140     //   errors on SO_RCVBUF, then.
    141     //
    142     // Anyway, we now use 127 to avoid these issues (but also to screw around with non-word-boundary
    143     // writes).
    144     uint small = 127;
    145     setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (const char*)&small, sizeof(small));
    146     KJ_SOCKCALL(setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (const char*)&small, sizeof(small)));
    147   }
    148   ~PipeWithSmallBuffer() {
    149 #if _WIN32
    150     closesocket(fds[0]);
    151     closesocket(fds[1]);
    152 #else
    153     close(fds[0]);
    154     close(fds[1]);
    155 #endif
    156   }
    157 
    158   inline int operator[](uint index) { return fds[index]; }
    159 
    160 private:
    161 #ifdef _WIN32
    162   SOCKET fds[2];
    163 #else
    164   int fds[2];
    165 #endif
    166 };
    167 
    168 #if _WIN32
    169 // Sockets on win32 are not file descriptors. Ugh.
    170 //
    171 // TODO(cleanup): Maybe put these somewhere reusable? kj/io.h is inappropriate since we don't
    172 //   really want to link against winsock.
    173 
    174 class SocketOutputStream: public kj::OutputStream {
    175 public:
    176   explicit SocketOutputStream(SOCKET fd): fd(fd) {}
    177 
    178   void write(const void* buffer, size_t size) override {
    179     const char* ptr = reinterpret_cast<const char*>(buffer);
    180     while (size > 0) {
    181       kj::miniposix::ssize_t n;
    182       KJ_SOCKCALL(n = send(fd, ptr, size, 0));
    183       size -= n;
    184       ptr += n;
    185     }
    186   }
    187 
    188 private:
    189   SOCKET fd;
    190 };
    191 
    192 class SocketInputStream: public kj::InputStream {
    193 public:
    194   explicit SocketInputStream(SOCKET fd): fd(fd) {}
    195 
    196   size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    197     char* ptr = reinterpret_cast<char*>(buffer);
    198     size_t total = 0;
    199     while (total < minBytes) {
    200       kj::miniposix::ssize_t n;
    201       KJ_SOCKCALL(n = recv(fd, ptr, maxBytes, 0));
    202       total += n;
    203       maxBytes -= n;
    204       ptr += n;
    205     }
    206     return total;
    207   }
    208 
    209 private:
    210   SOCKET fd;
    211 };
    212 #else  // _WIN32
    213 typedef kj::FdOutputStream SocketOutputStream;
    214 typedef kj::FdInputStream SocketInputStream;
    215 #endif  // _WIN32, else
    216 
    217 TEST(SerializeAsyncTest, ParseAsync) {
    218   PipeWithSmallBuffer fds;
    219   auto ioContext = kj::setupAsyncIo();
    220   auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
    221   SocketOutputStream rawOutput(fds[1]);
    222   FragmentingOutputStream output(rawOutput);
    223 
    224   TestMessageBuilder message(1);
    225   initTestMessage(message.getRoot<TestAllTypes>());
    226 
    227   kj::Thread thread([&]() {
    228     writeMessage(output, message);
    229   });
    230 
    231   auto received = readMessage(*input).wait(ioContext.waitScope);
    232 
    233   checkTestMessage(received->getRoot<TestAllTypes>());
    234 }
    235 
    236 TEST(SerializeAsyncTest, ParseAsyncOddSegmentCount) {
    237   PipeWithSmallBuffer fds;
    238   auto ioContext = kj::setupAsyncIo();
    239   auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
    240   SocketOutputStream rawOutput(fds[1]);
    241   FragmentingOutputStream output(rawOutput);
    242 
    243   TestMessageBuilder message(7);
    244   initTestMessage(message.getRoot<TestAllTypes>());
    245 
    246   kj::Thread thread([&]() {
    247     writeMessage(output, message);
    248   });
    249 
    250   auto received = readMessage(*input).wait(ioContext.waitScope);
    251 
    252   checkTestMessage(received->getRoot<TestAllTypes>());
    253 }
    254 
    255 TEST(SerializeAsyncTest, ParseAsyncEvenSegmentCount) {
    256   PipeWithSmallBuffer fds;
    257   auto ioContext = kj::setupAsyncIo();
    258   auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
    259   SocketOutputStream rawOutput(fds[1]);
    260   FragmentingOutputStream output(rawOutput);
    261 
    262   TestMessageBuilder message(10);
    263   initTestMessage(message.getRoot<TestAllTypes>());
    264 
    265   kj::Thread thread([&]() {
    266     writeMessage(output, message);
    267   });
    268 
    269   auto received = readMessage(*input).wait(ioContext.waitScope);
    270 
    271   checkTestMessage(received->getRoot<TestAllTypes>());
    272 }
    273 
    274 TEST(SerializeAsyncTest, WriteAsync) {
    275   PipeWithSmallBuffer fds;
    276   auto ioContext = kj::setupAsyncIo();
    277   auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
    278 
    279   TestMessageBuilder message(1);
    280   auto root = message.getRoot<TestAllTypes>();
    281   auto list = root.initStructList(16);
    282   for (auto element: list) {
    283     initTestMessage(element);
    284   }
    285 
    286   kj::Thread thread([&]() {
    287     SocketInputStream input(fds[0]);
    288     InputStreamMessageReader reader(input);
    289     auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    290     EXPECT_EQ(list.size(), listReader.size());
    291     for (auto element: listReader) {
    292       checkTestMessage(element);
    293     }
    294   });
    295 
    296   writeMessage(*output, message).wait(ioContext.waitScope);
    297 }
    298 
    299 TEST(SerializeAsyncTest, WriteAsyncOddSegmentCount) {
    300   PipeWithSmallBuffer fds;
    301   auto ioContext = kj::setupAsyncIo();
    302   auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
    303 
    304   TestMessageBuilder message(7);
    305   auto root = message.getRoot<TestAllTypes>();
    306   auto list = root.initStructList(16);
    307   for (auto element: list) {
    308     initTestMessage(element);
    309   }
    310 
    311   kj::Thread thread([&]() {
    312     SocketInputStream input(fds[0]);
    313     InputStreamMessageReader reader(input);
    314     auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    315     EXPECT_EQ(list.size(), listReader.size());
    316     for (auto element: listReader) {
    317       checkTestMessage(element);
    318     }
    319   });
    320 
    321   writeMessage(*output, message).wait(ioContext.waitScope);
    322 }
    323 
    324 TEST(SerializeAsyncTest, WriteAsyncEvenSegmentCount) {
    325   PipeWithSmallBuffer fds;
    326   auto ioContext = kj::setupAsyncIo();
    327   auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
    328 
    329   TestMessageBuilder message(10);
    330   auto root = message.getRoot<TestAllTypes>();
    331   auto list = root.initStructList(16);
    332   for (auto element: list) {
    333     initTestMessage(element);
    334   }
    335 
    336   kj::Thread thread([&]() {
    337     SocketInputStream input(fds[0]);
    338     InputStreamMessageReader reader(input);
    339     auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    340     EXPECT_EQ(list.size(), listReader.size());
    341     for (auto element: listReader) {
    342       checkTestMessage(element);
    343     }
    344   });
    345 
    346   writeMessage(*output, message).wait(ioContext.waitScope);
    347 }
    348 
    349 TEST(SerializeAsyncTest, WriteMultipleMessagesAsync) {
    350   PipeWithSmallBuffer fds;
    351   auto ioContext = kj::setupAsyncIo();
    352   auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
    353 
    354   const int numMessages = 5;
    355   const int baseListSize = 16;
    356   auto messages = kj::heapArrayBuilder<TestMessageBuilder>(numMessages);
    357   for (int i = 0; i < numMessages; ++i) {
    358     messages.add(i+1);
    359     auto root = messages[i].getRoot<TestAllTypes>();
    360     auto list = root.initStructList(baseListSize+i);
    361     for (auto element: list) {
    362       initTestMessage(element);
    363     }
    364   }
    365 
    366   kj::Thread thread([&]() {
    367     SocketInputStream input(fds[0]);
    368     for (int i = 0; i < numMessages; ++i) {
    369       InputStreamMessageReader reader(input);
    370       auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    371       EXPECT_EQ(baseListSize+i, listReader.size());
    372       for (auto element: listReader) {
    373         checkTestMessage(element);
    374       }
    375     }
    376   });
    377 
    378   auto msgs = kj::heapArray<capnp::MessageBuilder*>(numMessages);
    379   for (int i = 0; i < numMessages; ++i) {
    380     msgs[i] = &messages[i];
    381   }
    382   writeMessages(*output, msgs).wait(ioContext.waitScope);
    383 }
    384 
    385 }  // namespace
    386 }  // namespace _ (private)
    387 }  // namespace capnp