serialize-async-test.c++ (11462B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #ifndef _GNU_SOURCE 23 #define _GNU_SOURCE 24 #endif 25 26 #if _WIN32 27 #include <kj/win32-api-version.h> 28 #endif 29 30 #include "serialize-async.h" 31 #include "serialize.h" 32 #include <kj/debug.h> 33 #include <kj/thread.h> 34 #include <stdlib.h> 35 #include <kj/miniposix.h> 36 #include "test-util.h" 37 #include <kj/compat/gtest.h> 38 39 #if _WIN32 40 #include <winsock2.h> 41 #include <kj/windows-sanity.h> 42 namespace kj { 43 namespace _ { 44 int win32Socketpair(SOCKET socks[2]); 45 } 46 } 47 #else 48 #include <sys/socket.h> 49 #endif 50 51 namespace capnp { 52 namespace _ { // private 53 namespace { 54 55 #if _WIN32 56 inline void delay() { Sleep(5); } 57 #else 58 inline void delay() { usleep(5000); } 59 #endif 60 61 class FragmentingOutputStream: public kj::OutputStream { 62 public: 63 FragmentingOutputStream(kj::OutputStream& inner): inner(inner) {} 64 65 void write(const void* buffer, size_t size) override { 66 while (size > 0) { 67 delay(); 68 size_t n = rand() % size + 1; 69 inner.write(buffer, n); 70 buffer = reinterpret_cast<const byte*>(buffer) + n; 71 size -= n; 72 } 73 } 74 75 private: 76 kj::OutputStream& inner; 77 }; 78 79 class TestMessageBuilder: public MallocMessageBuilder { 80 // A MessageBuilder that tries to allocate an exact number of total segments, by allocating 81 // minimum-size segments until it reaches the number, then allocating one large segment to 82 // finish. 83 84 public: 85 explicit TestMessageBuilder(uint desiredSegmentCount) 86 : MallocMessageBuilder(0, AllocationStrategy::FIXED_SIZE), 87 desiredSegmentCount(desiredSegmentCount) {} 88 ~TestMessageBuilder() { 89 EXPECT_EQ(0u, desiredSegmentCount); 90 } 91 92 kj::ArrayPtr<word> allocateSegment(uint minimumSize) override { 93 if (desiredSegmentCount <= 1) { 94 if (desiredSegmentCount < 1) { 95 ADD_FAILURE() << "Allocated more segments than desired."; 96 } else { 97 --desiredSegmentCount; 98 } 99 return MallocMessageBuilder::allocateSegment(8192); 100 } else { 101 --desiredSegmentCount; 102 return MallocMessageBuilder::allocateSegment(minimumSize); 103 } 104 } 105 106 private: 107 uint desiredSegmentCount; 108 }; 109 110 class PipeWithSmallBuffer { 111 public: 112 #ifdef _WIN32 113 #define KJ_SOCKCALL KJ_WINSOCK 114 #ifndef SHUT_WR 115 #define SHUT_WR SD_SEND 116 #endif 117 #define socketpair(family, type, flags, fds) kj::_::win32Socketpair(fds) 118 #else 119 #define KJ_SOCKCALL KJ_SYSCALL 120 #endif 121 122 PipeWithSmallBuffer() { 123 // Use a socketpair rather than a pipe so that we can set the buffer size extremely small. 124 KJ_SOCKCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); 125 126 KJ_SOCKCALL(shutdown(fds[0], SHUT_WR)); 127 // Note: OSX reports ENOTCONN if we also try to shutdown(fds[1], SHUT_RD). 128 129 // Request that the buffer size be as small as possible, to force the event loop to kick in. 130 // FUN STUFF: 131 // - On Linux, the kernel rounds up to the smallest size it permits, so we can ask for a size of 132 // zero. 133 // - On OSX, the kernel reports EINVAL on zero, but will dutifully use a 1-byte buffer if we 134 // set the size to 1. This tends to cause stack overflows due to ridiculously long promise 135 // chains. 136 // - Cygwin will apparently actually use a buffer size of 0 and therefore block forever waiting 137 // for buffer space. 138 // - GNU HURD throws ENOPROTOOPT for SO_RCVBUF. Apparently, technically, a Unix domain socket 139 // has only one buffer, and it's controlled via SO_SNDBUF on the other end. OK, we'll ignore 140 // errors on SO_RCVBUF, then. 141 // 142 // Anyway, we now use 127 to avoid these issues (but also to screw around with non-word-boundary 143 // writes). 144 uint small = 127; 145 setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (const char*)&small, sizeof(small)); 146 KJ_SOCKCALL(setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (const char*)&small, sizeof(small))); 147 } 148 ~PipeWithSmallBuffer() { 149 #if _WIN32 150 closesocket(fds[0]); 151 closesocket(fds[1]); 152 #else 153 close(fds[0]); 154 close(fds[1]); 155 #endif 156 } 157 158 inline int operator[](uint index) { return fds[index]; } 159 160 private: 161 #ifdef _WIN32 162 SOCKET fds[2]; 163 #else 164 int fds[2]; 165 #endif 166 }; 167 168 #if _WIN32 169 // Sockets on win32 are not file descriptors. Ugh. 170 // 171 // TODO(cleanup): Maybe put these somewhere reusable? kj/io.h is inappropriate since we don't 172 // really want to link against winsock. 173 174 class SocketOutputStream: public kj::OutputStream { 175 public: 176 explicit SocketOutputStream(SOCKET fd): fd(fd) {} 177 178 void write(const void* buffer, size_t size) override { 179 const char* ptr = reinterpret_cast<const char*>(buffer); 180 while (size > 0) { 181 kj::miniposix::ssize_t n; 182 KJ_SOCKCALL(n = send(fd, ptr, size, 0)); 183 size -= n; 184 ptr += n; 185 } 186 } 187 188 private: 189 SOCKET fd; 190 }; 191 192 class SocketInputStream: public kj::InputStream { 193 public: 194 explicit SocketInputStream(SOCKET fd): fd(fd) {} 195 196 size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 197 char* ptr = reinterpret_cast<char*>(buffer); 198 size_t total = 0; 199 while (total < minBytes) { 200 kj::miniposix::ssize_t n; 201 KJ_SOCKCALL(n = recv(fd, ptr, maxBytes, 0)); 202 total += n; 203 maxBytes -= n; 204 ptr += n; 205 } 206 return total; 207 } 208 209 private: 210 SOCKET fd; 211 }; 212 #else // _WIN32 213 typedef kj::FdOutputStream SocketOutputStream; 214 typedef kj::FdInputStream SocketInputStream; 215 #endif // _WIN32, else 216 217 TEST(SerializeAsyncTest, ParseAsync) { 218 PipeWithSmallBuffer fds; 219 auto ioContext = kj::setupAsyncIo(); 220 auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]); 221 SocketOutputStream rawOutput(fds[1]); 222 FragmentingOutputStream output(rawOutput); 223 224 TestMessageBuilder message(1); 225 initTestMessage(message.getRoot<TestAllTypes>()); 226 227 kj::Thread thread([&]() { 228 writeMessage(output, message); 229 }); 230 231 auto received = readMessage(*input).wait(ioContext.waitScope); 232 233 checkTestMessage(received->getRoot<TestAllTypes>()); 234 } 235 236 TEST(SerializeAsyncTest, ParseAsyncOddSegmentCount) { 237 PipeWithSmallBuffer fds; 238 auto ioContext = kj::setupAsyncIo(); 239 auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]); 240 SocketOutputStream rawOutput(fds[1]); 241 FragmentingOutputStream output(rawOutput); 242 243 TestMessageBuilder message(7); 244 initTestMessage(message.getRoot<TestAllTypes>()); 245 246 kj::Thread thread([&]() { 247 writeMessage(output, message); 248 }); 249 250 auto received = readMessage(*input).wait(ioContext.waitScope); 251 252 checkTestMessage(received->getRoot<TestAllTypes>()); 253 } 254 255 TEST(SerializeAsyncTest, ParseAsyncEvenSegmentCount) { 256 PipeWithSmallBuffer fds; 257 auto ioContext = kj::setupAsyncIo(); 258 auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]); 259 SocketOutputStream rawOutput(fds[1]); 260 FragmentingOutputStream output(rawOutput); 261 262 TestMessageBuilder message(10); 263 initTestMessage(message.getRoot<TestAllTypes>()); 264 265 kj::Thread thread([&]() { 266 writeMessage(output, message); 267 }); 268 269 auto received = readMessage(*input).wait(ioContext.waitScope); 270 271 checkTestMessage(received->getRoot<TestAllTypes>()); 272 } 273 274 TEST(SerializeAsyncTest, WriteAsync) { 275 PipeWithSmallBuffer fds; 276 auto ioContext = kj::setupAsyncIo(); 277 auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]); 278 279 TestMessageBuilder message(1); 280 auto root = message.getRoot<TestAllTypes>(); 281 auto list = root.initStructList(16); 282 for (auto element: list) { 283 initTestMessage(element); 284 } 285 286 kj::Thread thread([&]() { 287 SocketInputStream input(fds[0]); 288 InputStreamMessageReader reader(input); 289 auto listReader = reader.getRoot<TestAllTypes>().getStructList(); 290 EXPECT_EQ(list.size(), listReader.size()); 291 for (auto element: listReader) { 292 checkTestMessage(element); 293 } 294 }); 295 296 writeMessage(*output, message).wait(ioContext.waitScope); 297 } 298 299 TEST(SerializeAsyncTest, WriteAsyncOddSegmentCount) { 300 PipeWithSmallBuffer fds; 301 auto ioContext = kj::setupAsyncIo(); 302 auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]); 303 304 TestMessageBuilder message(7); 305 auto root = message.getRoot<TestAllTypes>(); 306 auto list = root.initStructList(16); 307 for (auto element: list) { 308 initTestMessage(element); 309 } 310 311 kj::Thread thread([&]() { 312 SocketInputStream input(fds[0]); 313 InputStreamMessageReader reader(input); 314 auto listReader = reader.getRoot<TestAllTypes>().getStructList(); 315 EXPECT_EQ(list.size(), listReader.size()); 316 for (auto element: listReader) { 317 checkTestMessage(element); 318 } 319 }); 320 321 writeMessage(*output, message).wait(ioContext.waitScope); 322 } 323 324 TEST(SerializeAsyncTest, WriteAsyncEvenSegmentCount) { 325 PipeWithSmallBuffer fds; 326 auto ioContext = kj::setupAsyncIo(); 327 auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]); 328 329 TestMessageBuilder message(10); 330 auto root = message.getRoot<TestAllTypes>(); 331 auto list = root.initStructList(16); 332 for (auto element: list) { 333 initTestMessage(element); 334 } 335 336 kj::Thread thread([&]() { 337 SocketInputStream input(fds[0]); 338 InputStreamMessageReader reader(input); 339 auto listReader = reader.getRoot<TestAllTypes>().getStructList(); 340 EXPECT_EQ(list.size(), listReader.size()); 341 for (auto element: listReader) { 342 checkTestMessage(element); 343 } 344 }); 345 346 writeMessage(*output, message).wait(ioContext.waitScope); 347 } 348 349 TEST(SerializeAsyncTest, WriteMultipleMessagesAsync) { 350 PipeWithSmallBuffer fds; 351 auto ioContext = kj::setupAsyncIo(); 352 auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]); 353 354 const int numMessages = 5; 355 const int baseListSize = 16; 356 auto messages = kj::heapArrayBuilder<TestMessageBuilder>(numMessages); 357 for (int i = 0; i < numMessages; ++i) { 358 messages.add(i+1); 359 auto root = messages[i].getRoot<TestAllTypes>(); 360 auto list = root.initStructList(baseListSize+i); 361 for (auto element: list) { 362 initTestMessage(element); 363 } 364 } 365 366 kj::Thread thread([&]() { 367 SocketInputStream input(fds[0]); 368 for (int i = 0; i < numMessages; ++i) { 369 InputStreamMessageReader reader(input); 370 auto listReader = reader.getRoot<TestAllTypes>().getStructList(); 371 EXPECT_EQ(baseListSize+i, listReader.size()); 372 for (auto element: listReader) { 373 checkTestMessage(element); 374 } 375 } 376 }); 377 378 auto msgs = kj::heapArray<capnp::MessageBuilder*>(numMessages); 379 for (int i = 0; i < numMessages; ++i) { 380 msgs[i] = &messages[i]; 381 } 382 writeMessages(*output, msgs).wait(ioContext.waitScope); 383 } 384 385 } // namespace 386 } // namespace _ (private) 387 } // namespace capnp