async-io-test.c++ (94798B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #if _WIN32 23 // Request Vista-level APIs. 
24 #include "win32-api-version.h" 25 #elif !defined(_GNU_SOURCE) 26 #define _GNU_SOURCE 27 #endif 28 29 #include "async-io.h" 30 #include "async-io-internal.h" 31 #include "debug.h" 32 #include "io.h" 33 #include "miniposix.h" 34 #include <kj/compat/gtest.h> 35 #include <kj/time.h> 36 #include <sys/types.h> 37 #if _WIN32 38 #include <ws2tcpip.h> 39 #include "windows-sanity.h" 40 #define inet_pton InetPtonA 41 #define inet_ntop InetNtopA 42 #else 43 #include <netdb.h> 44 #include <unistd.h> 45 #include <fcntl.h> 46 #include <sys/socket.h> 47 #include <arpa/inet.h> 48 #include <netinet/in.h> 49 #endif 50 51 namespace kj { 52 namespace { 53 54 TEST(AsyncIo, SimpleNetwork) { 55 auto ioContext = setupAsyncIo(); 56 auto& network = ioContext.provider->getNetwork(); 57 58 Own<ConnectionReceiver> listener; 59 Own<AsyncIoStream> server; 60 Own<AsyncIoStream> client; 61 62 char receiveBuffer[4]; 63 64 auto port = newPromiseAndFulfiller<uint>(); 65 66 port.promise.then([&](uint portnum) { 67 return network.parseAddress("localhost", portnum); 68 }).then([&](Own<NetworkAddress>&& result) { 69 return result->connect(); 70 }).then([&](Own<AsyncIoStream>&& result) { 71 client = kj::mv(result); 72 return client->write("foo", 3); 73 }).detach([](kj::Exception&& exception) { 74 KJ_FAIL_EXPECT(exception); 75 }); 76 77 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { 78 listener = result->listen(); 79 port.fulfiller->fulfill(listener->getPort()); 80 return listener->accept(); 81 }).then([&](Own<AsyncIoStream>&& result) { 82 server = kj::mv(result); 83 return server->tryRead(receiveBuffer, 3, 4); 84 }).then([&](size_t n) { 85 EXPECT_EQ(3u, n); 86 return heapString(receiveBuffer, n); 87 }).wait(ioContext.waitScope); 88 89 EXPECT_EQ("foo", result); 90 } 91 92 #if !_WIN32 // TODO(0.10): Implement NetworkPeerIdentity for Win32. 
93 TEST(AsyncIo, SimpleNetworkAuthentication) { 94 auto ioContext = setupAsyncIo(); 95 auto& network = ioContext.provider->getNetwork(); 96 97 Own<ConnectionReceiver> listener; 98 Own<AsyncIoStream> server; 99 Own<AsyncIoStream> client; 100 101 char receiveBuffer[4]; 102 103 auto port = newPromiseAndFulfiller<uint>(); 104 105 port.promise.then([&](uint portnum) { 106 return network.parseAddress("localhost", portnum); 107 }).then([&](Own<NetworkAddress>&& addr) { 108 auto promise = addr->connectAuthenticated(); 109 return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable { 110 auto id = result.peerIdentity.downcast<NetworkPeerIdentity>(); 111 112 // `addr` was resolved from `localhost` and may contain multiple addresses, but 113 // result.peerIdentity tells us the specific address that was used. So it should be one 114 // of the ones on the list, but only one. 115 KJ_EXPECT(strstr(addr->toString().cStr(), id->getAddress().toString().cStr()) != nullptr); 116 KJ_EXPECT(id->getAddress().toString().findFirst(',') == nullptr); 117 118 client = kj::mv(result.stream); 119 120 // `id` should match client->getpeername(). 
121 union { 122 struct sockaddr generic; 123 struct sockaddr_in ip4; 124 struct sockaddr_in6 ip6; 125 } rawAddr; 126 uint len = sizeof(rawAddr); 127 client->getpeername(&rawAddr.generic, &len); 128 auto peername = network.getSockaddr(&rawAddr.generic, len); 129 KJ_EXPECT(id->toString() == peername->toString()); 130 131 return client->write("foo", 3); 132 }); 133 }).detach([](kj::Exception&& exception) { 134 KJ_FAIL_EXPECT(exception); 135 }); 136 137 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { 138 listener = result->listen(); 139 port.fulfiller->fulfill(listener->getPort()); 140 return listener->acceptAuthenticated(); 141 }).then([&](AuthenticatedStream result) { 142 auto id = result.peerIdentity.downcast<NetworkPeerIdentity>(); 143 server = kj::mv(result.stream); 144 145 // `id` should match server->getpeername(). 146 union { 147 struct sockaddr generic; 148 struct sockaddr_in ip4; 149 struct sockaddr_in6 ip6; 150 } addr; 151 uint len = sizeof(addr); 152 server->getpeername(&addr.generic, &len); 153 auto peername = network.getSockaddr(&addr.generic, len); 154 KJ_EXPECT(id->toString() == peername->toString()); 155 156 return server->tryRead(receiveBuffer, 3, 4); 157 }).then([&](size_t n) { 158 EXPECT_EQ(3u, n); 159 return heapString(receiveBuffer, n); 160 }).wait(ioContext.waitScope); 161 162 EXPECT_EQ("foo", result); 163 } 164 #endif 165 166 #if !_WIN32 && !__CYGWIN__ // TODO(someday): Debug why this deadlocks on Cygwin. 
167 168 #if __ANDROID__ 169 #define TMPDIR "/data/local/tmp" 170 #else 171 #define TMPDIR "/tmp" 172 #endif 173 174 TEST(AsyncIo, UnixSocket) { 175 auto ioContext = setupAsyncIo(); 176 auto& network = ioContext.provider->getNetwork(); 177 178 auto path = kj::str(TMPDIR "/kj-async-io-test.", getpid()); 179 KJ_DEFER(unlink(path.cStr())); 180 181 Own<ConnectionReceiver> listener; 182 Own<AsyncIoStream> server; 183 Own<AsyncIoStream> client; 184 185 char receiveBuffer[4]; 186 187 auto ready = newPromiseAndFulfiller<void>(); 188 189 ready.promise.then([&]() { 190 return network.parseAddress(kj::str("unix:", path)); 191 }).then([&](Own<NetworkAddress>&& addr) { 192 auto promise = addr->connectAuthenticated(); 193 return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable { 194 auto id = result.peerIdentity.downcast<LocalPeerIdentity>(); 195 auto creds = id->getCredentials(); 196 KJ_IF_MAYBE(p, creds.pid) { 197 KJ_EXPECT(*p == getpid()); 198 #if __linux__ || __APPLE__ 199 } else { 200 KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null PID"); 201 #endif 202 } 203 KJ_IF_MAYBE(u, creds.uid) { 204 KJ_EXPECT(*u == getuid()); 205 } else { 206 KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null UID"); 207 } 208 209 client = kj::mv(result.stream); 210 return client->write("foo", 3); 211 }); 212 }).detach([](kj::Exception&& exception) { 213 KJ_FAIL_EXPECT(exception); 214 }); 215 216 kj::String result = network.parseAddress(kj::str("unix:", path)) 217 .then([&](Own<NetworkAddress>&& result) { 218 listener = result->listen(); 219 ready.fulfiller->fulfill(); 220 return listener->acceptAuthenticated(); 221 }).then([&](AuthenticatedStream result) { 222 auto id = result.peerIdentity.downcast<LocalPeerIdentity>(); 223 auto creds = id->getCredentials(); 224 KJ_IF_MAYBE(p, creds.pid) { 225 KJ_EXPECT(*p == getpid()); 226 #if __linux__ || __APPLE__ 227 } else { 228 KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null PID"); 229 #endif 230 } 231 
KJ_IF_MAYBE(u, creds.uid) { 232 KJ_EXPECT(*u == getuid()); 233 } else { 234 KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null UID"); 235 } 236 237 server = kj::mv(result.stream); 238 return server->tryRead(receiveBuffer, 3, 4); 239 }).then([&](size_t n) { 240 EXPECT_EQ(3u, n); 241 return heapString(receiveBuffer, n); 242 }).wait(ioContext.waitScope); 243 244 EXPECT_EQ("foo", result); 245 } 246 247 TEST(AsyncIo, AncillaryMessageHandlerNoMsg) { 248 auto ioContext = setupAsyncIo(); 249 auto& network = ioContext.provider->getNetwork(); 250 251 Own<ConnectionReceiver> listener; 252 Own<AsyncIoStream> server; 253 Own<AsyncIoStream> client; 254 255 char receiveBuffer[4]; 256 257 bool clientHandlerCalled = false; 258 kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> clientHandler = 259 [&](kj::ArrayPtr<AncillaryMessage>) { 260 clientHandlerCalled = true; 261 }; 262 bool serverHandlerCalled = false; 263 kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> serverHandler = 264 [&](kj::ArrayPtr<AncillaryMessage>) { 265 serverHandlerCalled = true; 266 }; 267 268 auto port = newPromiseAndFulfiller<uint>(); 269 270 port.promise.then([&](uint portnum) { 271 return network.parseAddress("localhost", portnum); 272 }).then([&](Own<NetworkAddress>&& addr) { 273 auto promise = addr->connectAuthenticated(); 274 return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable { 275 client = kj::mv(result.stream); 276 client->registerAncillaryMessageHandler(kj::mv(clientHandler)); 277 return client->write("foo", 3); 278 }); 279 }).detach([](kj::Exception&& exception) { 280 KJ_FAIL_EXPECT(exception); 281 }); 282 283 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { 284 listener = result->listen(); 285 port.fulfiller->fulfill(listener->getPort()); 286 return listener->acceptAuthenticated(); 287 }).then([&](AuthenticatedStream result) { 288 server = kj::mv(result.stream); 289 
server->registerAncillaryMessageHandler(kj::mv(serverHandler)); 290 return server->tryRead(receiveBuffer, 3, 4); 291 }).then([&](size_t n) { 292 EXPECT_EQ(3u, n); 293 return heapString(receiveBuffer, n); 294 }).wait(ioContext.waitScope); 295 296 EXPECT_EQ("foo", result); 297 EXPECT_FALSE(clientHandlerCalled); 298 EXPECT_FALSE(serverHandlerCalled); 299 } 300 #endif 301 302 // This test uses SO_TIMESTAMP on a SOCK_STREAM, which is only supported by Linux. Ideally we'd 303 // rewrite the test to use some other message type that is widely supported on streams. But for 304 // now we just limit the test to Linux. Also, it doesn't work on Android for some reason, and it 305 // isn't worth investigating, so we skip it there. 306 #if __linux__ && !__ANDROID__ 307 TEST(AsyncIo, AncillaryMessageHandler) { 308 auto ioContext = setupAsyncIo(); 309 auto& network = ioContext.provider->getNetwork(); 310 311 Own<ConnectionReceiver> listener; 312 Own<AsyncIoStream> server; 313 Own<AsyncIoStream> client; 314 315 char receiveBuffer[4]; 316 317 bool clientHandlerCalled = false; 318 kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> clientHandler = 319 [&](kj::ArrayPtr<AncillaryMessage>) { 320 clientHandlerCalled = true; 321 }; 322 bool serverHandlerCalled = false; 323 kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> serverHandler = 324 [&](kj::ArrayPtr<AncillaryMessage> msgs) { 325 serverHandlerCalled = true; 326 EXPECT_EQ(1, msgs.size()); 327 EXPECT_EQ(SOL_SOCKET, msgs[0].getLevel()); 328 EXPECT_EQ(SO_TIMESTAMP, msgs[0].getType()); 329 }; 330 331 auto port = newPromiseAndFulfiller<uint>(); 332 333 port.promise.then([&](uint portnum) { 334 return network.parseAddress("localhost", portnum); 335 }).then([&](Own<NetworkAddress>&& addr) { 336 auto promise = addr->connectAuthenticated(); 337 return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable { 338 client = kj::mv(result.stream); 339 client->registerAncillaryMessageHandler(kj::mv(clientHandler)); 340 return 
client->write("foo", 3); 341 }); 342 }).detach([](kj::Exception&& exception) { 343 KJ_FAIL_EXPECT(exception); 344 }); 345 346 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { 347 listener = result->listen(); 348 // Register interest in having the timestamp delivered via cmsg on each recvmsg. 349 int yes = 1; 350 listener->setsockopt(SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes)); 351 port.fulfiller->fulfill(listener->getPort()); 352 return listener->acceptAuthenticated(); 353 }).then([&](AuthenticatedStream result) { 354 server = kj::mv(result.stream); 355 server->registerAncillaryMessageHandler(kj::mv(serverHandler)); 356 return server->tryRead(receiveBuffer, 3, 4); 357 }).then([&](size_t n) { 358 EXPECT_EQ(3u, n); 359 return heapString(receiveBuffer, n); 360 }).wait(ioContext.waitScope); 361 362 EXPECT_EQ("foo", result); 363 EXPECT_FALSE(clientHandlerCalled); 364 EXPECT_TRUE(serverHandlerCalled); 365 } 366 #endif 367 368 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) { 369 return network.parseAddress(text, portHint).wait(waitScope)->toString(); 370 } 371 372 bool systemSupportsAddress(StringPtr addr, StringPtr service = nullptr) { 373 // Can getaddrinfo() parse this addresses? This is only true if the address family (e.g., ipv6) 374 // is configured on at least one interface. (The loopback interface usually has both ipv4 and 375 // ipv6 configured, but not always.) 376 struct addrinfo* list; 377 int status = getaddrinfo( 378 addr.cStr(), service == nullptr ? 
nullptr : service.cStr(), nullptr, &list); 379 if (status == 0) { 380 freeaddrinfo(list); 381 return true; 382 } else { 383 return false; 384 } 385 } 386 387 TEST(AsyncIo, AddressParsing) { 388 auto ioContext = setupAsyncIo(); 389 auto& w = ioContext.waitScope; 390 auto& network = ioContext.provider->getNetwork(); 391 392 EXPECT_EQ("*:0", tryParse(w, network, "*")); 393 EXPECT_EQ("*:123", tryParse(w, network, "*:123")); 394 EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0")); 395 EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678)); 396 397 #if !_WIN32 398 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz")); 399 EXPECT_EQ("unix-abstract:foo/bar/baz", tryParse(w, network, "unix-abstract:foo/bar/baz")); 400 #endif 401 402 // We can parse services by name... 403 // 404 // For some reason, Android and some various Linux distros do not support service names. 405 if (systemSupportsAddress("1.2.3.4", "http")) { 406 EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678)); 407 EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678)); 408 } else { 409 KJ_LOG(WARNING, "system does not support resolving service names on ipv4; skipping tests"); 410 } 411 412 // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any 413 // interfaces. 
414 if (systemSupportsAddress("::")) { 415 EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123)); 416 EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432)); 417 if (systemSupportsAddress("12ab:cd::34", "http")) { 418 EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678)); 419 EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678)); 420 } else { 421 KJ_LOG(WARNING, "system does not support resolving service names on ipv6; skipping tests"); 422 } 423 } else { 424 KJ_LOG(WARNING, "system does not support ipv6; skipping tests"); 425 } 426 427 // It would be nice to test DNS lookup here but the test would not be very hermetic. Even 428 // localhost can map to different addresses depending on whether IPv6 is enabled. We do 429 // connect to "localhost" in a different test, though. 430 } 431 432 TEST(AsyncIo, OneWayPipe) { 433 auto ioContext = setupAsyncIo(); 434 435 auto pipe = ioContext.provider->newOneWayPipe(); 436 char receiveBuffer[4]; 437 438 pipe.out->write("foo", 3).detach([](kj::Exception&& exception) { 439 KJ_FAIL_EXPECT(exception); 440 }); 441 442 kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) { 443 EXPECT_EQ(3u, n); 444 return heapString(receiveBuffer, n); 445 }).wait(ioContext.waitScope); 446 447 EXPECT_EQ("foo", result); 448 } 449 450 TEST(AsyncIo, TwoWayPipe) { 451 auto ioContext = setupAsyncIo(); 452 453 auto pipe = ioContext.provider->newTwoWayPipe(); 454 char receiveBuffer1[4]; 455 char receiveBuffer2[4]; 456 457 auto promise = pipe.ends[0]->write("foo", 3).then([&]() { 458 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); 459 }).then([&](size_t n) { 460 EXPECT_EQ(3u, n); 461 return heapString(receiveBuffer1, n); 462 }); 463 464 kj::String result = pipe.ends[1]->write("bar", 3).then([&]() { 465 return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4); 466 }).then([&](size_t n) { 467 EXPECT_EQ(3u, n); 468 return heapString(receiveBuffer2, n); 469 
}).wait(ioContext.waitScope); 470 471 kj::String result2 = promise.wait(ioContext.waitScope); 472 473 EXPECT_EQ("foo", result); 474 EXPECT_EQ("bar", result2); 475 } 476 477 TEST(AsyncIo, InMemoryCapabilityPipe) { 478 EventLoop loop; 479 WaitScope waitScope(loop); 480 481 auto pipe = newCapabilityPipe(); 482 auto pipe2 = newCapabilityPipe(); 483 char receiveBuffer1[4]; 484 char receiveBuffer2[4]; 485 486 // Expect to receive a stream, then read "foo" from it, then write "bar" to it. 487 Own<AsyncCapabilityStream> receivedStream; 488 auto promise = pipe2.ends[1]->receiveStream() 489 .then([&](Own<AsyncCapabilityStream> stream) { 490 receivedStream = kj::mv(stream); 491 return receivedStream->tryRead(receiveBuffer2, 3, 4); 492 }).then([&](size_t n) { 493 EXPECT_EQ(3u, n); 494 return receivedStream->write("bar", 3).then([&receiveBuffer2,n]() { 495 return heapString(receiveBuffer2, n); 496 }); 497 }); 498 499 // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar" 500 // from it. 501 kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1])) 502 .then([&]() { 503 return pipe.ends[0]->write("foo", 3); 504 }).then([&]() { 505 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); 506 }).then([&](size_t n) { 507 EXPECT_EQ(3u, n); 508 return heapString(receiveBuffer1, n); 509 }).wait(waitScope); 510 511 kj::String result2 = promise.wait(waitScope); 512 513 EXPECT_EQ("bar", result); 514 EXPECT_EQ("foo", result2); 515 } 516 517 #if !_WIN32 && !__CYGWIN__ 518 TEST(AsyncIo, CapabilityPipe) { 519 auto ioContext = setupAsyncIo(); 520 521 auto pipe = ioContext.provider->newCapabilityPipe(); 522 auto pipe2 = ioContext.provider->newCapabilityPipe(); 523 char receiveBuffer1[4]; 524 char receiveBuffer2[4]; 525 526 // Expect to receive a stream, then write "bar" to it, then receive "foo" from it. 
527 Own<AsyncCapabilityStream> receivedStream; 528 auto promise = pipe2.ends[1]->receiveStream() 529 .then([&](Own<AsyncCapabilityStream> stream) { 530 receivedStream = kj::mv(stream); 531 return receivedStream->write("bar", 3); 532 }).then([&]() { 533 return receivedStream->tryRead(receiveBuffer2, 3, 4); 534 }).then([&](size_t n) { 535 EXPECT_EQ(3u, n); 536 return heapString(receiveBuffer2, n); 537 }); 538 539 // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar" 540 // from it. 541 kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1])) 542 .then([&]() { 543 return pipe.ends[0]->write("foo", 3); 544 }).then([&]() { 545 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); 546 }).then([&](size_t n) { 547 EXPECT_EQ(3u, n); 548 return heapString(receiveBuffer1, n); 549 }).wait(ioContext.waitScope); 550 551 kj::String result2 = promise.wait(ioContext.waitScope); 552 553 EXPECT_EQ("bar", result); 554 EXPECT_EQ("foo", result2); 555 } 556 557 TEST(AsyncIo, CapabilityPipeBlockedSendStream) { 558 // Check for a bug that existed at one point where if a sendStream() call couldn't complete 559 // immediately, it would fail. 560 561 auto io = setupAsyncIo(); 562 563 auto pipe = io.provider->newCapabilityPipe(); 564 565 Promise<void> promise = nullptr; 566 Own<AsyncIoStream> endpoint1; 567 uint nonBlockedCount = 0; 568 for (;;) { 569 auto pipe2 = io.provider->newCapabilityPipe(); 570 promise = pipe.ends[0]->sendStream(kj::mv(pipe2.ends[0])); 571 if (promise.poll(io.waitScope)) { 572 // Send completed immediately, because there was enough space in the stream. 573 ++nonBlockedCount; 574 promise.wait(io.waitScope); 575 } else { 576 // Send blocked! Let's continue with this promise then! 577 endpoint1 = kj::mv(pipe2.ends[1]); 578 break; 579 } 580 } 581 582 for (uint i KJ_UNUSED: kj::zeroTo(nonBlockedCount)) { 583 // Receive and ignore all the streams that were sent without blocking. 
584 pipe.ends[1]->receiveStream().wait(io.waitScope); 585 } 586 587 // Now that write that blocked should have been able to complete. 588 promise.wait(io.waitScope); 589 590 // Now get the one that blocked. 591 auto endpoint2 = pipe.ends[1]->receiveStream().wait(io.waitScope); 592 593 endpoint1->write("foo", 3).wait(io.waitScope); 594 endpoint1->shutdownWrite(); 595 KJ_EXPECT(endpoint2->readAllText().wait(io.waitScope) == "foo"); 596 } 597 598 TEST(AsyncIo, CapabilityPipeMultiStreamMessage) { 599 auto ioContext = setupAsyncIo(); 600 601 auto pipe = ioContext.provider->newCapabilityPipe(); 602 auto pipe2 = ioContext.provider->newCapabilityPipe(); 603 auto pipe3 = ioContext.provider->newCapabilityPipe(); 604 605 auto streams = heapArrayBuilder<Own<AsyncCapabilityStream>>(2); 606 streams.add(kj::mv(pipe2.ends[0])); 607 streams.add(kj::mv(pipe3.ends[0])); 608 609 ArrayPtr<const byte> secondBuf = "bar"_kj.asBytes(); 610 pipe.ends[0]->writeWithStreams("foo"_kj.asBytes(), arrayPtr(&secondBuf, 1), streams.finish()) 611 .wait(ioContext.waitScope); 612 613 char receiveBuffer[7]; 614 Own<AsyncCapabilityStream> receiveStreams[3]; 615 auto result = pipe.ends[1]->tryReadWithStreams(receiveBuffer, 6, 7, receiveStreams, 3) 616 .wait(ioContext.waitScope); 617 618 KJ_EXPECT(result.byteCount == 6); 619 receiveBuffer[6] = '\0'; 620 KJ_EXPECT(kj::StringPtr(receiveBuffer) == "foobar"); 621 622 KJ_ASSERT(result.capCount == 2); 623 624 receiveStreams[0]->write("baz", 3).wait(ioContext.waitScope); 625 receiveStreams[0] = nullptr; 626 KJ_EXPECT(pipe2.ends[1]->readAllText().wait(ioContext.waitScope) == "baz"); 627 628 pipe3.ends[1]->write("qux", 3).wait(ioContext.waitScope); 629 pipe3.ends[1] = nullptr; 630 KJ_EXPECT(receiveStreams[1]->readAllText().wait(ioContext.waitScope) == "qux"); 631 } 632 633 TEST(AsyncIo, ScmRightsTruncatedOdd) { 634 // Test that if we send two FDs over a unix socket, but the receiving end only receives one, we 635 // don't leak the other FD. 
636 637 auto io = setupAsyncIo(); 638 639 auto capPipe = io.provider->newCapabilityPipe(); 640 641 int pipeFds[2]; 642 KJ_SYSCALL(miniposix::pipe(pipeFds)); 643 kj::AutoCloseFd in1(pipeFds[0]); 644 kj::AutoCloseFd out1(pipeFds[1]); 645 646 KJ_SYSCALL(miniposix::pipe(pipeFds)); 647 kj::AutoCloseFd in2(pipeFds[0]); 648 kj::AutoCloseFd out2(pipeFds[1]); 649 650 { 651 AutoCloseFd sendFds[2] = { kj::mv(out1), kj::mv(out2) }; 652 capPipe.ends[0]->writeWithFds("foo"_kj.asBytes(), nullptr, sendFds).wait(io.waitScope); 653 } 654 655 { 656 char buffer[4]; 657 AutoCloseFd fdBuffer[1]; 658 auto result = capPipe.ends[1]->tryReadWithFds(buffer, 3, 3, fdBuffer, 1).wait(io.waitScope); 659 KJ_ASSERT(result.capCount == 1); 660 kj::FdOutputStream(fdBuffer[0].get()).write("bar", 3); 661 } 662 663 // We want to carefully verify that out1 and out2 were closed, without deadlocking if they 664 // weren't. So we manually set nonblocking mode and then issue read()s. 665 KJ_SYSCALL(fcntl(in1, F_SETFL, O_NONBLOCK)); 666 KJ_SYSCALL(fcntl(in2, F_SETFL, O_NONBLOCK)); 667 668 char buffer[4]; 669 ssize_t n; 670 671 // First we read "bar" from in1. 672 KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4)); 673 KJ_ASSERT(n == 3); 674 buffer[3] = '\0'; 675 KJ_ASSERT(kj::StringPtr(buffer) == "bar"); 676 677 // Now it should be EOF. 678 KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4)); 679 if (n < 0) { 680 KJ_FAIL_ASSERT("out1 was not closed"); 681 } 682 KJ_ASSERT(n == 0); 683 684 // Second pipe should have been closed implicitly because we didn't provide space to receive it. 685 KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4)); 686 if (n < 0) { 687 KJ_FAIL_ASSERT("out2 was not closed. This could indicate that your operating system kernel is " 688 "buggy and leaks file descriptors when an SCM_RIGHTS message is truncated. FreeBSD was " 689 "known to do this until late 2018, while MacOS still has this bug as of this writing in " 690 "2019. However, KJ works around the problem on those platforms. 
You need to enable the " 691 "same work-around for your OS -- search for 'SCM_RIGHTS' in src/kj/async-io-unix.c++."); 692 } 693 KJ_ASSERT(n == 0); 694 } 695 696 #if !__aarch64__ 697 // This test fails under qemu-user, probably due to a bug in qemu's syscall emulation rather than 698 // a bug in the kernel. We don't have a good way to detect qemu so we just skip the test on aarch64 699 // in general. 700 701 TEST(AsyncIo, ScmRightsTruncatedEven) { 702 // Test that if we send three FDs over a unix socket, but the receiving end only receives two, we 703 // don't leak the third FD. This is different from the send-two-receive-one case in that 704 // CMSG_SPACE() on many systems rounds up such that there is always space for an even number of 705 // FDs. In that case the other test only verifies that our userspace code to close unwanted FDs 706 // is correct, whereas *this* test really verifies that the *kernel* properly closes truncated 707 // FDs. 708 709 auto io = setupAsyncIo(); 710 711 auto capPipe = io.provider->newCapabilityPipe(); 712 713 int pipeFds[2]; 714 KJ_SYSCALL(miniposix::pipe(pipeFds)); 715 kj::AutoCloseFd in1(pipeFds[0]); 716 kj::AutoCloseFd out1(pipeFds[1]); 717 718 KJ_SYSCALL(miniposix::pipe(pipeFds)); 719 kj::AutoCloseFd in2(pipeFds[0]); 720 kj::AutoCloseFd out2(pipeFds[1]); 721 722 KJ_SYSCALL(miniposix::pipe(pipeFds)); 723 kj::AutoCloseFd in3(pipeFds[0]); 724 kj::AutoCloseFd out3(pipeFds[1]); 725 726 { 727 AutoCloseFd sendFds[3] = { kj::mv(out1), kj::mv(out2), kj::mv(out3) }; 728 capPipe.ends[0]->writeWithFds("foo"_kj.asBytes(), nullptr, sendFds).wait(io.waitScope); 729 } 730 731 { 732 char buffer[4]; 733 AutoCloseFd fdBuffer[2]; 734 auto result = capPipe.ends[1]->tryReadWithFds(buffer, 3, 3, fdBuffer, 2).wait(io.waitScope); 735 KJ_ASSERT(result.capCount == 2); 736 kj::FdOutputStream(fdBuffer[0].get()).write("bar", 3); 737 kj::FdOutputStream(fdBuffer[1].get()).write("baz", 3); 738 } 739 740 // We want to carefully verify that out1, out2, and out3 
were closed, without deadlocking if they 741 // weren't. So we manually set nonblocking mode and then issue read()s. 742 KJ_SYSCALL(fcntl(in1, F_SETFL, O_NONBLOCK)); 743 KJ_SYSCALL(fcntl(in2, F_SETFL, O_NONBLOCK)); 744 KJ_SYSCALL(fcntl(in3, F_SETFL, O_NONBLOCK)); 745 746 char buffer[4]; 747 ssize_t n; 748 749 // First we read "bar" from in1. 750 KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4)); 751 KJ_ASSERT(n == 3); 752 buffer[3] = '\0'; 753 KJ_ASSERT(kj::StringPtr(buffer) == "bar"); 754 755 // Now it should be EOF. 756 KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4)); 757 if (n < 0) { 758 KJ_FAIL_ASSERT("out1 was not closed"); 759 } 760 KJ_ASSERT(n == 0); 761 762 // Next we read "baz" from in2. 763 KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4)); 764 KJ_ASSERT(n == 3); 765 buffer[3] = '\0'; 766 KJ_ASSERT(kj::StringPtr(buffer) == "baz"); 767 768 // Now it should be EOF. 769 KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4)); 770 if (n < 0) { 771 KJ_FAIL_ASSERT("out2 was not closed"); 772 } 773 KJ_ASSERT(n == 0); 774 775 // Third pipe should have been closed implicitly because we didn't provide space to receive it. 776 KJ_NONBLOCKING_SYSCALL(n = read(in3, buffer, 4)); 777 if (n < 0) { 778 KJ_FAIL_ASSERT("out3 was not closed. This could indicate that your operating system kernel is " 779 "buggy and leaks file descriptors when an SCM_RIGHTS message is truncated. FreeBSD was " 780 "known to do this until late 2018, while MacOS still has this bug as of this writing in " 781 "2019. However, KJ works around the problem on those platforms. 
You need to enable the " 782 "same work-around for your OS -- search for 'SCM_RIGHTS' in src/kj/async-io-unix.c++."); 783 } 784 KJ_ASSERT(n == 0); 785 } 786 787 #endif // !__aarch64__ 788 789 #endif // !_WIN32 && !__CYGWIN__ 790 791 TEST(AsyncIo, PipeThread) { 792 auto ioContext = setupAsyncIo(); 793 794 auto pipeThread = ioContext.provider->newPipeThread( 795 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { 796 char buf[4]; 797 stream.write("foo", 3).wait(waitScope); 798 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); 799 EXPECT_EQ("bar", heapString(buf, 3)); 800 801 // Expect disconnect. 802 EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope)); 803 }); 804 805 char buf[4]; 806 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); 807 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); 808 EXPECT_EQ("foo", heapString(buf, 3)); 809 } 810 811 TEST(AsyncIo, PipeThreadDisconnects) { 812 // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting. 813 814 auto ioContext = setupAsyncIo(); 815 816 auto pipeThread = ioContext.provider->newPipeThread( 817 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { 818 char buf[4]; 819 stream.write("foo", 3).wait(waitScope); 820 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); 821 EXPECT_EQ("bar", heapString(buf, 3)); 822 }); 823 824 char buf[4]; 825 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); 826 EXPECT_EQ("foo", heapString(buf, 3)); 827 828 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); 829 830 // Expect disconnect. 
831 EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope)); 832 } 833 834 TEST(AsyncIo, Timeouts) { 835 auto ioContext = setupAsyncIo(); 836 837 Timer& timer = ioContext.provider->getTimer(); 838 839 auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE)); 840 auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123)); 841 842 EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; }) 843 .wait(ioContext.waitScope)); 844 EXPECT_EQ(123, promise2.wait(ioContext.waitScope)); 845 } 846 847 #if !_WIN32 // datagrams not implemented on win32 yet 848 849 bool isMsgTruncBroken() { 850 // Detect if the kernel fails to set MSG_TRUNC on recvmsg(). This seems to be the case at least 851 // when running an arm64 binary under qemu. 852 853 int fd; 854 KJ_SYSCALL(fd = socket(AF_INET, SOCK_DGRAM, 0)); 855 KJ_DEFER(close(fd)); 856 857 struct sockaddr_in addr; 858 memset(&addr, 0, sizeof(addr)); 859 addr.sin_family = AF_INET; 860 addr.sin_addr.s_addr = htonl(0x7f000001); 861 KJ_SYSCALL(bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); 862 863 // Read back the assigned port. 
864 socklen_t len = sizeof(addr); 865 KJ_SYSCALL(getsockname(fd, reinterpret_cast<struct sockaddr*>(&addr), &len)); 866 KJ_ASSERT(len == sizeof(addr)); 867 868 const char* message = "foobar"; 869 KJ_SYSCALL(sendto(fd, message, strlen(message), 0, 870 reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); 871 872 char buf[4]; 873 struct iovec iov; 874 iov.iov_base = buf; 875 iov.iov_len = 3; 876 struct msghdr msg; 877 memset(&msg, 0, sizeof(msg)); 878 msg.msg_iov = &iov; 879 msg.msg_iovlen = 1; 880 ssize_t n; 881 KJ_SYSCALL(n = recvmsg(fd, &msg, 0)); 882 KJ_ASSERT(n == 3); 883 884 buf[3] = 0; 885 KJ_ASSERT(kj::StringPtr(buf) == "foo"); 886 887 return (msg.msg_flags & MSG_TRUNC) == 0; 888 } 889 890 TEST(AsyncIo, Udp) { 891 bool msgTruncBroken = isMsgTruncBroken(); 892 893 auto ioContext = setupAsyncIo(); 894 895 auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope); 896 897 auto port1 = addr->bindDatagramPort(); 898 auto port2 = addr->bindDatagramPort(); 899 900 auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort()) 901 .wait(ioContext.waitScope); 902 auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort()) 903 .wait(ioContext.waitScope); 904 905 Own<NetworkAddress> receivedAddr; 906 907 { 908 // Send a message and receive it. 
909 EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope)); 910 auto receiver = port2->makeReceiver(); 911 912 receiver->receive().wait(ioContext.waitScope); 913 { 914 auto content = receiver->getContent(); 915 EXPECT_EQ("foo", kj::heapString(content.value.asChars())); 916 EXPECT_FALSE(content.isTruncated); 917 } 918 receivedAddr = receiver->getSource().clone(); 919 EXPECT_EQ(addr1->toString(), receivedAddr->toString()); 920 { 921 auto ancillary = receiver->getAncillary(); 922 EXPECT_EQ(0, ancillary.value.size()); 923 EXPECT_FALSE(ancillary.isTruncated); 924 } 925 926 // Receive a second message with the same receiver. 927 { 928 auto promise = receiver->receive(); // This time, start receiving before sending 929 EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope)); 930 promise.wait(ioContext.waitScope); 931 auto content = receiver->getContent(); 932 EXPECT_EQ("barbaz", kj::heapString(content.value.asChars())); 933 EXPECT_FALSE(content.isTruncated); 934 } 935 } 936 937 DatagramReceiver::Capacity capacity; 938 capacity.content = 8; 939 capacity.ancillary = 1024; 940 941 { 942 // Send a reply that will be truncated. 943 EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope)); 944 auto recv1 = port1->makeReceiver(capacity); 945 946 recv1->receive().wait(ioContext.waitScope); 947 { 948 auto content = recv1->getContent(); 949 EXPECT_EQ("01234567", kj::heapString(content.value.asChars())); 950 EXPECT_TRUE(content.isTruncated || msgTruncBroken); 951 } 952 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); 953 { 954 auto ancillary = recv1->getAncillary(); 955 EXPECT_EQ(0, ancillary.value.size()); 956 EXPECT_FALSE(ancillary.isTruncated); 957 } 958 959 #if defined(IP_PKTINFO) && !__CYGWIN__ && !__aarch64__ 960 // Set IP_PKTINFO header and try to receive it. 
961 // 962 // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html 963 // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7. 964 // 965 // Doesn't work when running arm64 binaries under QEMU -- in fact, it crashes QEMU. We don't 966 // have a good way to test if we're under QEMU so we just skip this test on aarch64. 967 int one = 1; 968 port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one)); 969 970 EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope)); 971 972 recv1->receive().wait(ioContext.waitScope); 973 { 974 auto content = recv1->getContent(); 975 EXPECT_EQ("foo", kj::heapString(content.value.asChars())); 976 EXPECT_FALSE(content.isTruncated); 977 } 978 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); 979 { 980 auto ancillary = recv1->getAncillary(); 981 EXPECT_FALSE(ancillary.isTruncated); 982 ASSERT_EQ(1, ancillary.value.size()); 983 984 auto message = ancillary.value[0]; 985 EXPECT_EQ(IPPROTO_IP, message.getLevel()); 986 EXPECT_EQ(IP_PKTINFO, message.getType()); 987 EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size()); 988 auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>()); 989 EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1 990 } 991 992 // See what happens if there's not quite enough space for in_pktinfo. 
993 capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8; 994 recv1 = port1->makeReceiver(capacity); 995 996 EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope)); 997 998 recv1->receive().wait(ioContext.waitScope); 999 { 1000 auto content = recv1->getContent(); 1001 EXPECT_EQ("bar", kj::heapString(content.value.asChars())); 1002 EXPECT_FALSE(content.isTruncated); 1003 } 1004 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); 1005 { 1006 auto ancillary = recv1->getAncillary(); 1007 EXPECT_TRUE(ancillary.isTruncated || msgTruncBroken); 1008 1009 // We might get a message, but it will be truncated. 1010 if (ancillary.value.size() != 0) { 1011 EXPECT_EQ(1, ancillary.value.size()); 1012 1013 auto message = ancillary.value[0]; 1014 EXPECT_EQ(IPPROTO_IP, message.getLevel()); 1015 EXPECT_EQ(IP_PKTINFO, message.getType()); 1016 1017 EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr); 1018 EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo)); 1019 } 1020 } 1021 1022 #if __APPLE__ 1023 // On MacOS, `CMSG_SPACE(0)` triggers a bogus warning. 1024 #pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic" 1025 #endif 1026 // See what happens if there's not enough space even for the cmsghdr. 
1027 capacity.ancillary = CMSG_SPACE(0) - 8; 1028 recv1 = port1->makeReceiver(capacity); 1029 1030 EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope)); 1031 1032 recv1->receive().wait(ioContext.waitScope); 1033 { 1034 auto content = recv1->getContent(); 1035 EXPECT_EQ("baz", kj::heapString(content.value.asChars())); 1036 EXPECT_FALSE(content.isTruncated); 1037 } 1038 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); 1039 { 1040 auto ancillary = recv1->getAncillary(); 1041 EXPECT_TRUE(ancillary.isTruncated); 1042 EXPECT_EQ(0, ancillary.value.size()); 1043 } 1044 #endif 1045 } 1046 } 1047 1048 #endif // !_WIN32 1049 1050 #ifdef __linux__ // Abstract unix sockets are only supported on Linux 1051 1052 TEST(AsyncIo, AbstractUnixSocket) { 1053 auto ioContext = setupAsyncIo(); 1054 auto& network = ioContext.provider->getNetwork(); 1055 auto elapsedSinceEpoch = systemPreciseMonotonicClock().now() - kj::origin<TimePoint>(); 1056 auto address = kj::str("unix-abstract:foo", getpid(), elapsedSinceEpoch / kj::NANOSECONDS); 1057 1058 Own<NetworkAddress> addr = network.parseAddress(address).wait(ioContext.waitScope); 1059 1060 Own<ConnectionReceiver> listener = addr->listen(); 1061 // chdir proves no filesystem dependence. Test fails for regular unix socket 1062 // but passes for abstract unix socket. 
1063 int originalDirFd; 1064 KJ_SYSCALL(originalDirFd = open(".", O_RDONLY | O_DIRECTORY | O_CLOEXEC)); 1065 KJ_DEFER(close(originalDirFd)); 1066 KJ_SYSCALL(chdir("/")); 1067 KJ_DEFER(KJ_SYSCALL(fchdir(originalDirFd))); 1068 1069 addr->connect().attach(kj::mv(listener)).wait(ioContext.waitScope); 1070 } 1071 1072 #endif // __linux__ 1073 1074 KJ_TEST("CIDR parsing") { 1075 KJ_EXPECT(_::CidrRange("1.2.3.4/16").toString() == "1.2.0.0/16"); 1076 KJ_EXPECT(_::CidrRange("1.2.255.4/18").toString() == "1.2.192.0/18"); 1077 KJ_EXPECT(_::CidrRange("1234::abcd:ffff:ffff/98").toString() == "1234::abcd:c000:0/98"); 1078 1079 KJ_EXPECT(_::CidrRange::inet4({1,2,255,4}, 18).toString() == "1.2.192.0/18"); 1080 KJ_EXPECT(_::CidrRange::inet6({0x1234, 0x5678}, {0xabcd, 0xffff, 0xffff}, 98).toString() == 1081 "1234:5678::abcd:c000:0/98"); 1082 1083 union { 1084 struct sockaddr addr; 1085 struct sockaddr_in addr4; 1086 struct sockaddr_in6 addr6; 1087 }; 1088 memset(&addr6, 0, sizeof(addr6)); 1089 1090 { 1091 addr4.sin_family = AF_INET; 1092 addr4.sin_addr.s_addr = htonl(0x0102dfff); 1093 KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); 1094 KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); 1095 KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); 1096 KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); 1097 KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); 1098 KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); 1099 KJ_EXPECT(!_::CidrRange("::/0").matches(&addr)); 1100 } 1101 1102 { 1103 addr4.sin_family = AF_INET6; 1104 byte bytes[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; 1105 memcpy(addr6.sin6_addr.s6_addr, bytes, 16); 1106 KJ_EXPECT(_::CidrRange("0102:03ff::/24").matches(&addr)); 1107 KJ_EXPECT(!_::CidrRange("0102:02ff::/24").matches(&addr)); 1108 KJ_EXPECT(_::CidrRange("0102:02ff::/23").matches(&addr)); 1109 KJ_EXPECT(_::CidrRange("0102:0304:0506:0708:090a:0b0c:0d0e:0f10/128").matches(&addr)); 1110 
KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); 1111 KJ_EXPECT(!_::CidrRange("0.0.0.0/0").matches(&addr)); 1112 } 1113 1114 { 1115 addr4.sin_family = AF_INET6; 1116 inet_pton(AF_INET6, "::ffff:1.2.223.255", &addr6.sin6_addr); 1117 KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); 1118 KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); 1119 KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); 1120 KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); 1121 KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); 1122 KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); 1123 KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); 1124 } 1125 } 1126 1127 bool allowed4(_::NetworkFilter& filter, StringPtr addrStr) { 1128 struct sockaddr_in addr; 1129 memset(&addr, 0, sizeof(addr)); 1130 addr.sin_family = AF_INET; 1131 inet_pton(AF_INET, addrStr.cStr(), &addr.sin_addr); 1132 return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); 1133 } 1134 1135 bool allowed6(_::NetworkFilter& filter, StringPtr addrStr) { 1136 struct sockaddr_in6 addr; 1137 memset(&addr, 0, sizeof(addr)); 1138 addr.sin6_family = AF_INET6; 1139 inet_pton(AF_INET6, addrStr.cStr(), &addr.sin6_addr); 1140 return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); 1141 } 1142 1143 KJ_TEST("NetworkFilter") { 1144 _::NetworkFilter base; 1145 1146 KJ_EXPECT(allowed4(base, "8.8.8.8")); 1147 KJ_EXPECT(!allowed4(base, "240.1.2.3")); 1148 1149 { 1150 _::NetworkFilter filter({"public"}, {}, base); 1151 1152 KJ_EXPECT(allowed4(filter, "8.8.8.8")); 1153 KJ_EXPECT(!allowed4(filter, "240.1.2.3")); 1154 1155 KJ_EXPECT(!allowed4(filter, "192.168.0.1")); 1156 KJ_EXPECT(!allowed4(filter, "10.1.2.3")); 1157 KJ_EXPECT(!allowed4(filter, "127.0.0.1")); 1158 KJ_EXPECT(!allowed4(filter, "0.0.0.0")); 1159 1160 KJ_EXPECT(allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); 1161 KJ_EXPECT(!allowed6(filter, "fc00::1234")); 1162 KJ_EXPECT(!allowed6(filter, "::1")); 1163 
KJ_EXPECT(!allowed6(filter, "::")); 1164 } 1165 1166 { 1167 _::NetworkFilter filter({"private"}, {"local"}, base); 1168 1169 KJ_EXPECT(!allowed4(filter, "8.8.8.8")); 1170 KJ_EXPECT(!allowed4(filter, "240.1.2.3")); 1171 1172 KJ_EXPECT(allowed4(filter, "192.168.0.1")); 1173 KJ_EXPECT(allowed4(filter, "10.1.2.3")); 1174 KJ_EXPECT(!allowed4(filter, "127.0.0.1")); 1175 KJ_EXPECT(!allowed4(filter, "0.0.0.0")); 1176 1177 KJ_EXPECT(!allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); 1178 KJ_EXPECT(allowed6(filter, "fc00::1234")); 1179 KJ_EXPECT(!allowed6(filter, "::1")); 1180 KJ_EXPECT(!allowed6(filter, "::")); 1181 } 1182 1183 { 1184 _::NetworkFilter filter({"1.0.0.0/8", "1.2.3.0/24"}, {"1.2.0.0/16", "1.2.3.4/32"}, base); 1185 1186 KJ_EXPECT(!allowed4(filter, "8.8.8.8")); 1187 KJ_EXPECT(!allowed4(filter, "240.1.2.3")); 1188 1189 KJ_EXPECT(allowed4(filter, "1.0.0.1")); 1190 KJ_EXPECT(!allowed4(filter, "1.2.2.1")); 1191 KJ_EXPECT(allowed4(filter, "1.2.3.1")); 1192 KJ_EXPECT(!allowed4(filter, "1.2.3.4")); 1193 } 1194 } 1195 1196 KJ_TEST("Network::restrictPeers()") { 1197 auto ioContext = setupAsyncIo(); 1198 auto& w = ioContext.waitScope; 1199 auto& network = ioContext.provider->getNetwork(); 1200 auto restrictedNetwork = network.restrictPeers({"public"}); 1201 1202 KJ_EXPECT(tryParse(w, *restrictedNetwork, "8.8.8.8") == "8.8.8.8:0"); 1203 #if !_WIN32 1204 KJ_EXPECT_THROW_MESSAGE("restrictPeers", tryParse(w, *restrictedNetwork, "unix:/foo")); 1205 #endif 1206 1207 auto addr = restrictedNetwork->parseAddress("127.0.0.1").wait(w); 1208 1209 auto listener = addr->listen(); 1210 auto acceptTask = listener->accept() 1211 .then([](kj::Own<kj::AsyncIoStream>) { 1212 KJ_FAIL_EXPECT("should not have received connection"); 1213 }).eagerlyEvaluate(nullptr); 1214 1215 KJ_EXPECT_THROW_MESSAGE("restrictPeers", addr->connect().wait(w)); 1216 1217 // We can connect to the listener but the connection will be immediately closed. 
1218 auto addr2 = network.parseAddress("127.0.0.1", listener->getPort()).wait(w); 1219 auto conn = addr2->connect().wait(w); 1220 KJ_EXPECT(conn->readAllText().wait(w) == ""); 1221 } 1222 1223 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { 1224 if (expected.size() == 0) return kj::READY_NOW; 1225 1226 auto buffer = kj::heapArray<char>(expected.size()); 1227 1228 auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); 1229 return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { 1230 if (amount == 0) { 1231 KJ_FAIL_ASSERT("expected data never sent", expected); 1232 } 1233 1234 auto actual = buffer.slice(0, amount); 1235 if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { 1236 KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); 1237 } 1238 1239 return expectRead(in, expected.slice(amount)); 1240 })); 1241 } 1242 1243 class MockAsyncInputStream final: public AsyncInputStream { 1244 public: 1245 MockAsyncInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize) 1246 : bytes(bytes), blockSize(blockSize) {} 1247 1248 kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 1249 // Clamp max read to blockSize. 1250 size_t n = kj::min(blockSize, maxBytes); 1251 1252 // Unless that's less than minBytes -- in which case, use minBytes. 1253 n = kj::max(n, minBytes); 1254 1255 // But also don't read more data than we have. 
1256 n = kj::min(n, bytes.size()); 1257 1258 memcpy(buffer, bytes.begin(), n); 1259 bytes = bytes.slice(n, bytes.size()); 1260 return n; 1261 } 1262 1263 private: 1264 kj::ArrayPtr<const byte> bytes; 1265 size_t blockSize; 1266 }; 1267 1268 KJ_TEST("AsyncInputStream::readAllText() / readAllBytes()") { 1269 kj::EventLoop loop; 1270 WaitScope ws(loop); 1271 1272 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 1273 size_t inputSizes[] = { 0, 1, 256, 4096, 8191, 8192, 8193, 10000, bigText.size() }; 1274 size_t blockSizes[] = { 1, 4, 256, 4096, 8192, bigText.size() }; 1275 uint64_t limits[] = { 1276 0, 1, 256, 1277 bigText.size() / 2, 1278 bigText.size() - 1, 1279 bigText.size(), 1280 bigText.size() + 1, 1281 kj::maxValue 1282 }; 1283 1284 for (size_t inputSize: inputSizes) { 1285 for (size_t blockSize: blockSizes) { 1286 for (uint64_t limit: limits) { 1287 KJ_CONTEXT(inputSize, blockSize, limit); 1288 auto textSlice = bigText.asBytes().slice(0, inputSize); 1289 auto readAllText = [&]() { 1290 MockAsyncInputStream input(textSlice, blockSize); 1291 return input.readAllText(limit).wait(ws); 1292 }; 1293 auto readAllBytes = [&]() { 1294 MockAsyncInputStream input(textSlice, blockSize); 1295 return input.readAllBytes(limit).wait(ws); 1296 }; 1297 if (limit > inputSize) { 1298 KJ_EXPECT(readAllText().asBytes() == textSlice); 1299 KJ_EXPECT(readAllBytes() == textSlice); 1300 } else { 1301 KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllText()); 1302 KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllBytes()); 1303 } 1304 } 1305 } 1306 } 1307 } 1308 1309 KJ_TEST("Userland pipe") { 1310 kj::EventLoop loop; 1311 WaitScope ws(loop); 1312 1313 auto pipe = newOneWayPipe(); 1314 1315 auto promise = pipe.out->write("foo", 3); 1316 KJ_EXPECT(!promise.poll(ws)); 1317 1318 char buf[4]; 1319 KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3); 1320 buf[3] = '\0'; 1321 KJ_EXPECT(buf == "foo"_kj); 1322 1323 promise.wait(ws); 1324 1325 auto promise2 
= pipe.in->readAllText(); 1326 KJ_EXPECT(!promise2.poll(ws)); 1327 1328 pipe.out = nullptr; 1329 KJ_EXPECT(promise2.wait(ws) == ""); 1330 } 1331 1332 KJ_TEST("Userland pipe cancel write") { 1333 kj::EventLoop loop; 1334 WaitScope ws(loop); 1335 1336 auto pipe = newOneWayPipe(); 1337 1338 auto promise = pipe.out->write("foobar", 6); 1339 KJ_EXPECT(!promise.poll(ws)); 1340 1341 expectRead(*pipe.in, "foo").wait(ws); 1342 KJ_EXPECT(!promise.poll(ws)); 1343 promise = nullptr; 1344 1345 promise = pipe.out->write("baz", 3); 1346 expectRead(*pipe.in, "baz").wait(ws); 1347 promise.wait(ws); 1348 1349 pipe.out = nullptr; 1350 KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); 1351 } 1352 1353 KJ_TEST("Userland pipe cancel read") { 1354 kj::EventLoop loop; 1355 WaitScope ws(loop); 1356 1357 auto pipe = newOneWayPipe(); 1358 1359 auto writeOp = pipe.out->write("foo", 3); 1360 auto readOp = expectRead(*pipe.in, "foobar"); 1361 writeOp.wait(ws); 1362 KJ_EXPECT(!readOp.poll(ws)); 1363 readOp = nullptr; 1364 1365 auto writeOp2 = pipe.out->write("baz", 3); 1366 expectRead(*pipe.in, "baz").wait(ws); 1367 } 1368 1369 KJ_TEST("Userland pipe pumpTo") { 1370 kj::EventLoop loop; 1371 WaitScope ws(loop); 1372 1373 auto pipe = newOneWayPipe(); 1374 auto pipe2 = newOneWayPipe(); 1375 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1376 1377 auto promise = pipe.out->write("foo", 3); 1378 KJ_EXPECT(!promise.poll(ws)); 1379 1380 expectRead(*pipe2.in, "foo").wait(ws); 1381 1382 promise.wait(ws); 1383 1384 auto promise2 = pipe2.in->readAllText(); 1385 KJ_EXPECT(!promise2.poll(ws)); 1386 1387 pipe.out = nullptr; 1388 KJ_EXPECT(pumpPromise.wait(ws) == 3); 1389 } 1390 1391 KJ_TEST("Userland pipe tryPumpFrom") { 1392 kj::EventLoop loop; 1393 WaitScope ws(loop); 1394 1395 auto pipe = newOneWayPipe(); 1396 auto pipe2 = newOneWayPipe(); 1397 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1398 1399 auto promise = pipe.out->write("foo", 3); 1400 KJ_EXPECT(!promise.poll(ws)); 1401 
1402 expectRead(*pipe2.in, "foo").wait(ws); 1403 1404 promise.wait(ws); 1405 1406 auto promise2 = pipe2.in->readAllText(); 1407 KJ_EXPECT(!promise2.poll(ws)); 1408 1409 pipe.out = nullptr; 1410 KJ_EXPECT(!promise2.poll(ws)); 1411 KJ_EXPECT(pumpPromise.wait(ws) == 3); 1412 } 1413 1414 KJ_TEST("Userland pipe pumpTo cancel") { 1415 kj::EventLoop loop; 1416 WaitScope ws(loop); 1417 1418 auto pipe = newOneWayPipe(); 1419 auto pipe2 = newOneWayPipe(); 1420 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1421 1422 auto promise = pipe.out->write("foobar", 3); 1423 KJ_EXPECT(!promise.poll(ws)); 1424 1425 expectRead(*pipe2.in, "foo").wait(ws); 1426 1427 // Cancel pump. 1428 pumpPromise = nullptr; 1429 1430 auto promise3 = pipe2.out->write("baz", 3); 1431 expectRead(*pipe2.in, "baz").wait(ws); 1432 } 1433 1434 KJ_TEST("Userland pipe tryPumpFrom cancel") { 1435 kj::EventLoop loop; 1436 WaitScope ws(loop); 1437 1438 auto pipe = newOneWayPipe(); 1439 auto pipe2 = newOneWayPipe(); 1440 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1441 1442 auto promise = pipe.out->write("foobar", 3); 1443 KJ_EXPECT(!promise.poll(ws)); 1444 1445 expectRead(*pipe2.in, "foo").wait(ws); 1446 1447 // Cancel pump. 
1448 pumpPromise = nullptr; 1449 1450 auto promise3 = pipe2.out->write("baz", 3); 1451 expectRead(*pipe2.in, "baz").wait(ws); 1452 } 1453 1454 KJ_TEST("Userland pipe with limit") { 1455 kj::EventLoop loop; 1456 WaitScope ws(loop); 1457 1458 auto pipe = newOneWayPipe(6); 1459 1460 { 1461 auto promise = pipe.out->write("foo", 3); 1462 KJ_EXPECT(!promise.poll(ws)); 1463 expectRead(*pipe.in, "foo").wait(ws); 1464 promise.wait(ws); 1465 } 1466 1467 { 1468 auto promise = pipe.in->readAllText(); 1469 KJ_EXPECT(!promise.poll(ws)); 1470 auto promise2 = pipe.out->write("barbaz", 6); 1471 KJ_EXPECT(promise.wait(ws) == "bar"); 1472 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); 1473 } 1474 1475 // Further writes throw and reads return EOF. 1476 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 1477 "abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); 1478 KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); 1479 } 1480 1481 KJ_TEST("Userland pipe pumpTo with limit") { 1482 kj::EventLoop loop; 1483 WaitScope ws(loop); 1484 1485 auto pipe = newOneWayPipe(6); 1486 auto pipe2 = newOneWayPipe(); 1487 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1488 1489 { 1490 auto promise = pipe.out->write("foo", 3); 1491 KJ_EXPECT(!promise.poll(ws)); 1492 expectRead(*pipe2.in, "foo").wait(ws); 1493 promise.wait(ws); 1494 } 1495 1496 { 1497 auto promise = expectRead(*pipe2.in, "bar"); 1498 KJ_EXPECT(!promise.poll(ws)); 1499 auto promise2 = pipe.out->write("barbaz", 6); 1500 promise.wait(ws); 1501 pumpPromise.wait(ws); 1502 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); 1503 } 1504 1505 // Further writes throw. 
1506 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 1507 "abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); 1508 } 1509 1510 KJ_TEST("Userland pipe pump into zero-limited pipe, no data to pump") { 1511 kj::EventLoop loop; 1512 WaitScope ws(loop); 1513 1514 auto pipe = newOneWayPipe(); 1515 auto pipe2 = newOneWayPipe(uint64_t(0)); 1516 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1517 1518 expectRead(*pipe2.in, ""); 1519 pipe.out = nullptr; 1520 KJ_EXPECT(pumpPromise.wait(ws) == 0); 1521 } 1522 1523 KJ_TEST("Userland pipe pump into zero-limited pipe, data is pumped") { 1524 kj::EventLoop loop; 1525 WaitScope ws(loop); 1526 1527 auto pipe = newOneWayPipe(); 1528 auto pipe2 = newOneWayPipe(uint64_t(0)); 1529 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1530 1531 expectRead(*pipe2.in, ""); 1532 auto writePromise = pipe.out->write("foo", 3); 1533 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("abortRead() has been called", pumpPromise.wait(ws)); 1534 } 1535 1536 KJ_TEST("Userland pipe gather write") { 1537 kj::EventLoop loop; 1538 WaitScope ws(loop); 1539 1540 auto pipe = newOneWayPipe(); 1541 1542 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1543 auto promise = pipe.out->write(parts); 1544 KJ_EXPECT(!promise.poll(ws)); 1545 expectRead(*pipe.in, "foobar").wait(ws); 1546 promise.wait(ws); 1547 1548 auto promise2 = pipe.in->readAllText(); 1549 KJ_EXPECT(!promise2.poll(ws)); 1550 1551 pipe.out = nullptr; 1552 KJ_EXPECT(promise2.wait(ws) == ""); 1553 } 1554 1555 KJ_TEST("Userland pipe gather write split on buffer boundary") { 1556 kj::EventLoop loop; 1557 WaitScope ws(loop); 1558 1559 auto pipe = newOneWayPipe(); 1560 1561 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1562 auto promise = pipe.out->write(parts); 1563 KJ_EXPECT(!promise.poll(ws)); 1564 expectRead(*pipe.in, "foo").wait(ws); 1565 expectRead(*pipe.in, "bar").wait(ws); 1566 promise.wait(ws); 1567 1568 auto 
promise2 = pipe.in->readAllText(); 1569 KJ_EXPECT(!promise2.poll(ws)); 1570 1571 pipe.out = nullptr; 1572 KJ_EXPECT(promise2.wait(ws) == ""); 1573 } 1574 1575 KJ_TEST("Userland pipe gather write split mid-first-buffer") { 1576 kj::EventLoop loop; 1577 WaitScope ws(loop); 1578 1579 auto pipe = newOneWayPipe(); 1580 1581 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1582 auto promise = pipe.out->write(parts); 1583 KJ_EXPECT(!promise.poll(ws)); 1584 expectRead(*pipe.in, "fo").wait(ws); 1585 expectRead(*pipe.in, "obar").wait(ws); 1586 promise.wait(ws); 1587 1588 auto promise2 = pipe.in->readAllText(); 1589 KJ_EXPECT(!promise2.poll(ws)); 1590 1591 pipe.out = nullptr; 1592 KJ_EXPECT(promise2.wait(ws) == ""); 1593 } 1594 1595 KJ_TEST("Userland pipe gather write split mid-second-buffer") { 1596 kj::EventLoop loop; 1597 WaitScope ws(loop); 1598 1599 auto pipe = newOneWayPipe(); 1600 1601 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1602 auto promise = pipe.out->write(parts); 1603 KJ_EXPECT(!promise.poll(ws)); 1604 expectRead(*pipe.in, "foob").wait(ws); 1605 expectRead(*pipe.in, "ar").wait(ws); 1606 promise.wait(ws); 1607 1608 auto promise2 = pipe.in->readAllText(); 1609 KJ_EXPECT(!promise2.poll(ws)); 1610 1611 pipe.out = nullptr; 1612 KJ_EXPECT(promise2.wait(ws) == ""); 1613 } 1614 1615 KJ_TEST("Userland pipe gather write pump") { 1616 kj::EventLoop loop; 1617 WaitScope ws(loop); 1618 1619 auto pipe = newOneWayPipe(); 1620 auto pipe2 = newOneWayPipe(); 1621 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1622 1623 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1624 auto promise = pipe.out->write(parts); 1625 KJ_EXPECT(!promise.poll(ws)); 1626 expectRead(*pipe2.in, "foobar").wait(ws); 1627 promise.wait(ws); 1628 1629 pipe.out = nullptr; 1630 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1631 } 1632 1633 KJ_TEST("Userland pipe gather write pump split on buffer boundary") { 1634 kj::EventLoop loop; 
1635 WaitScope ws(loop); 1636 1637 auto pipe = newOneWayPipe(); 1638 auto pipe2 = newOneWayPipe(); 1639 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1640 1641 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1642 auto promise = pipe.out->write(parts); 1643 KJ_EXPECT(!promise.poll(ws)); 1644 expectRead(*pipe2.in, "foo").wait(ws); 1645 expectRead(*pipe2.in, "bar").wait(ws); 1646 promise.wait(ws); 1647 1648 pipe.out = nullptr; 1649 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1650 } 1651 1652 KJ_TEST("Userland pipe gather write pump split mid-first-buffer") { 1653 kj::EventLoop loop; 1654 WaitScope ws(loop); 1655 1656 auto pipe = newOneWayPipe(); 1657 auto pipe2 = newOneWayPipe(); 1658 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1659 1660 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1661 auto promise = pipe.out->write(parts); 1662 KJ_EXPECT(!promise.poll(ws)); 1663 expectRead(*pipe2.in, "fo").wait(ws); 1664 expectRead(*pipe2.in, "obar").wait(ws); 1665 promise.wait(ws); 1666 1667 pipe.out = nullptr; 1668 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1669 } 1670 1671 KJ_TEST("Userland pipe gather write pump split mid-second-buffer") { 1672 kj::EventLoop loop; 1673 WaitScope ws(loop); 1674 1675 auto pipe = newOneWayPipe(); 1676 auto pipe2 = newOneWayPipe(); 1677 auto pumpPromise = pipe.in->pumpTo(*pipe2.out); 1678 1679 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1680 auto promise = pipe.out->write(parts); 1681 KJ_EXPECT(!promise.poll(ws)); 1682 expectRead(*pipe2.in, "foob").wait(ws); 1683 expectRead(*pipe2.in, "ar").wait(ws); 1684 promise.wait(ws); 1685 1686 pipe.out = nullptr; 1687 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1688 } 1689 1690 KJ_TEST("Userland pipe gather write split pump on buffer boundary") { 1691 kj::EventLoop loop; 1692 WaitScope ws(loop); 1693 1694 auto pipe = newOneWayPipe(); 1695 auto pipe2 = newOneWayPipe(); 1696 auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3) 1697 
.then([&](uint64_t i) { 1698 KJ_EXPECT(i == 3); 1699 return pipe.in->pumpTo(*pipe2.out, 3); 1700 }); 1701 1702 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1703 auto promise = pipe.out->write(parts); 1704 KJ_EXPECT(!promise.poll(ws)); 1705 expectRead(*pipe2.in, "foobar").wait(ws); 1706 promise.wait(ws); 1707 1708 pipe.out = nullptr; 1709 KJ_EXPECT(pumpPromise.wait(ws) == 3); 1710 } 1711 1712 KJ_TEST("Userland pipe gather write split pump mid-first-buffer") { 1713 kj::EventLoop loop; 1714 WaitScope ws(loop); 1715 1716 auto pipe = newOneWayPipe(); 1717 auto pipe2 = newOneWayPipe(); 1718 auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2) 1719 .then([&](uint64_t i) { 1720 KJ_EXPECT(i == 2); 1721 return pipe.in->pumpTo(*pipe2.out, 4); 1722 }); 1723 1724 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1725 auto promise = pipe.out->write(parts); 1726 KJ_EXPECT(!promise.poll(ws)); 1727 expectRead(*pipe2.in, "foobar").wait(ws); 1728 promise.wait(ws); 1729 1730 pipe.out = nullptr; 1731 KJ_EXPECT(pumpPromise.wait(ws) == 4); 1732 } 1733 1734 KJ_TEST("Userland pipe gather write split pump mid-second-buffer") { 1735 kj::EventLoop loop; 1736 WaitScope ws(loop); 1737 1738 auto pipe = newOneWayPipe(); 1739 auto pipe2 = newOneWayPipe(); 1740 auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4) 1741 .then([&](uint64_t i) { 1742 KJ_EXPECT(i == 4); 1743 return pipe.in->pumpTo(*pipe2.out, 2); 1744 }); 1745 1746 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1747 auto promise = pipe.out->write(parts); 1748 KJ_EXPECT(!promise.poll(ws)); 1749 expectRead(*pipe2.in, "foobar").wait(ws); 1750 promise.wait(ws); 1751 1752 pipe.out = nullptr; 1753 KJ_EXPECT(pumpPromise.wait(ws) == 2); 1754 } 1755 1756 KJ_TEST("Userland pipe gather write pumpFrom") { 1757 kj::EventLoop loop; 1758 WaitScope ws(loop); 1759 1760 auto pipe = newOneWayPipe(); 1761 auto pipe2 = newOneWayPipe(); 1762 auto pumpPromise = 
KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1763 1764 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1765 auto promise = pipe.out->write(parts); 1766 KJ_EXPECT(!promise.poll(ws)); 1767 expectRead(*pipe2.in, "foobar").wait(ws); 1768 promise.wait(ws); 1769 1770 pipe.out = nullptr; 1771 char c; 1772 auto eofPromise = pipe2.in->tryRead(&c, 1, 1); 1773 eofPromise.poll(ws); // force pump to notice EOF 1774 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1775 pipe2.out = nullptr; 1776 KJ_EXPECT(eofPromise.wait(ws) == 0); 1777 } 1778 1779 KJ_TEST("Userland pipe gather write pumpFrom split on buffer boundary") { 1780 kj::EventLoop loop; 1781 WaitScope ws(loop); 1782 1783 auto pipe = newOneWayPipe(); 1784 auto pipe2 = newOneWayPipe(); 1785 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1786 1787 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1788 auto promise = pipe.out->write(parts); 1789 KJ_EXPECT(!promise.poll(ws)); 1790 expectRead(*pipe2.in, "foo").wait(ws); 1791 expectRead(*pipe2.in, "bar").wait(ws); 1792 promise.wait(ws); 1793 1794 pipe.out = nullptr; 1795 char c; 1796 auto eofPromise = pipe2.in->tryRead(&c, 1, 1); 1797 eofPromise.poll(ws); // force pump to notice EOF 1798 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1799 pipe2.out = nullptr; 1800 KJ_EXPECT(eofPromise.wait(ws) == 0); 1801 } 1802 1803 KJ_TEST("Userland pipe gather write pumpFrom split mid-first-buffer") { 1804 kj::EventLoop loop; 1805 WaitScope ws(loop); 1806 1807 auto pipe = newOneWayPipe(); 1808 auto pipe2 = newOneWayPipe(); 1809 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1810 1811 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1812 auto promise = pipe.out->write(parts); 1813 KJ_EXPECT(!promise.poll(ws)); 1814 expectRead(*pipe2.in, "fo").wait(ws); 1815 expectRead(*pipe2.in, "obar").wait(ws); 1816 promise.wait(ws); 1817 1818 pipe.out = nullptr; 1819 char c; 1820 auto 
eofPromise = pipe2.in->tryRead(&c, 1, 1); 1821 eofPromise.poll(ws); // force pump to notice EOF 1822 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1823 pipe2.out = nullptr; 1824 KJ_EXPECT(eofPromise.wait(ws) == 0); 1825 } 1826 1827 KJ_TEST("Userland pipe gather write pumpFrom split mid-second-buffer") { 1828 kj::EventLoop loop; 1829 WaitScope ws(loop); 1830 1831 auto pipe = newOneWayPipe(); 1832 auto pipe2 = newOneWayPipe(); 1833 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1834 1835 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1836 auto promise = pipe.out->write(parts); 1837 KJ_EXPECT(!promise.poll(ws)); 1838 expectRead(*pipe2.in, "foob").wait(ws); 1839 expectRead(*pipe2.in, "ar").wait(ws); 1840 promise.wait(ws); 1841 1842 pipe.out = nullptr; 1843 char c; 1844 auto eofPromise = pipe2.in->tryRead(&c, 1, 1); 1845 eofPromise.poll(ws); // force pump to notice EOF 1846 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1847 pipe2.out = nullptr; 1848 KJ_EXPECT(eofPromise.wait(ws) == 0); 1849 } 1850 1851 KJ_TEST("Userland pipe gather write split pumpFrom on buffer boundary") { 1852 kj::EventLoop loop; 1853 WaitScope ws(loop); 1854 1855 auto pipe = newOneWayPipe(); 1856 auto pipe2 = newOneWayPipe(); 1857 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3)) 1858 .then([&](uint64_t i) { 1859 KJ_EXPECT(i == 3); 1860 return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3)); 1861 }); 1862 1863 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1864 auto promise = pipe.out->write(parts); 1865 KJ_EXPECT(!promise.poll(ws)); 1866 expectRead(*pipe2.in, "foobar").wait(ws); 1867 promise.wait(ws); 1868 1869 pipe.out = nullptr; 1870 KJ_EXPECT(pumpPromise.wait(ws) == 3); 1871 } 1872 1873 KJ_TEST("Userland pipe gather write split pumpFrom mid-first-buffer") { 1874 kj::EventLoop loop; 1875 WaitScope ws(loop); 1876 1877 auto pipe = newOneWayPipe(); 1878 auto pipe2 = newOneWayPipe(); 1879 auto pumpPromise 
= KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2)) 1880 .then([&](uint64_t i) { 1881 KJ_EXPECT(i == 2); 1882 return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4)); 1883 }); 1884 1885 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1886 auto promise = pipe.out->write(parts); 1887 KJ_EXPECT(!promise.poll(ws)); 1888 expectRead(*pipe2.in, "foobar").wait(ws); 1889 promise.wait(ws); 1890 1891 pipe.out = nullptr; 1892 KJ_EXPECT(pumpPromise.wait(ws) == 4); 1893 } 1894 1895 KJ_TEST("Userland pipe gather write split pumpFrom mid-second-buffer") { 1896 kj::EventLoop loop; 1897 WaitScope ws(loop); 1898 1899 auto pipe = newOneWayPipe(); 1900 auto pipe2 = newOneWayPipe(); 1901 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4)) 1902 .then([&](uint64_t i) { 1903 KJ_EXPECT(i == 4); 1904 return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2)); 1905 }); 1906 1907 ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; 1908 auto promise = pipe.out->write(parts); 1909 KJ_EXPECT(!promise.poll(ws)); 1910 expectRead(*pipe2.in, "foobar").wait(ws); 1911 promise.wait(ws); 1912 1913 pipe.out = nullptr; 1914 KJ_EXPECT(pumpPromise.wait(ws) == 2); 1915 } 1916 1917 KJ_TEST("Userland pipe pumpTo less than write amount") { 1918 kj::EventLoop loop; 1919 WaitScope ws(loop); 1920 1921 auto pipe = newOneWayPipe(); 1922 auto pipe2 = newOneWayPipe(); 1923 auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1); 1924 1925 auto pieces = kj::heapArray<ArrayPtr<const byte>>(2); 1926 byte a[1] = { 'a' }; 1927 byte b[1] = { 'b' }; 1928 pieces[0] = arrayPtr(a, 1); 1929 pieces[1] = arrayPtr(b, 1); 1930 1931 auto writePromise = pipe.out->write(pieces); 1932 KJ_EXPECT(!writePromise.poll(ws)); 1933 1934 expectRead(*pipe2.in, "a").wait(ws); 1935 KJ_EXPECT(pumpPromise.wait(ws) == 1); 1936 KJ_EXPECT(!writePromise.poll(ws)); 1937 1938 pumpPromise = pipe.in->pumpTo(*pipe2.out, 1); 1939 1940 expectRead(*pipe2.in, "b").wait(ws); 1941 
KJ_EXPECT(pumpPromise.wait(ws) == 1); 1942 writePromise.wait(ws); 1943 } 1944 1945 KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") { 1946 kj::EventLoop loop; 1947 WaitScope ws(loop); 1948 1949 auto pipe = newOneWayPipe(); 1950 auto pipe2 = newOneWayPipe(); 1951 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1952 1953 auto promise = pipe.out->write("foobar", 6); 1954 KJ_EXPECT(!promise.poll(ws)); 1955 expectRead(*pipe2.in, "foobar").wait(ws); 1956 promise.wait(ws); 1957 1958 KJ_EXPECT(!pumpPromise.poll(ws)); 1959 pipe.out = nullptr; 1960 pipe2.in = nullptr; // force pump to notice EOF 1961 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1962 pipe2.out = nullptr; 1963 } 1964 1965 KJ_TEST("Userland pipe EOF fulfills pumpFrom promise") { 1966 kj::EventLoop loop; 1967 WaitScope ws(loop); 1968 1969 auto pipe = newOneWayPipe(); 1970 auto pipe2 = newOneWayPipe(); 1971 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 1972 1973 auto writePromise = pipe.out->write("foobar", 6); 1974 KJ_EXPECT(!writePromise.poll(ws)); 1975 auto pipe3 = newOneWayPipe(); 1976 auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out); 1977 KJ_EXPECT(!pumpPromise2.poll(ws)); 1978 expectRead(*pipe3.in, "foobar").wait(ws); 1979 writePromise.wait(ws); 1980 1981 KJ_EXPECT(!pumpPromise.poll(ws)); 1982 pipe.out = nullptr; 1983 KJ_EXPECT(pumpPromise.wait(ws) == 6); 1984 1985 KJ_EXPECT(!pumpPromise2.poll(ws)); 1986 pipe2.out = nullptr; 1987 KJ_EXPECT(pumpPromise2.wait(ws) == 6); 1988 } 1989 1990 KJ_TEST("Userland pipe tryPumpFrom to pumpTo for same amount fulfills simultaneously") { 1991 kj::EventLoop loop; 1992 WaitScope ws(loop); 1993 1994 auto pipe = newOneWayPipe(); 1995 auto pipe2 = newOneWayPipe(); 1996 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 6)); 1997 1998 auto writePromise = pipe.out->write("foobar", 6); 1999 KJ_EXPECT(!writePromise.poll(ws)); 2000 auto pipe3 = newOneWayPipe(); 2001 auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out, 6); 
2002 KJ_EXPECT(!pumpPromise2.poll(ws)); 2003 expectRead(*pipe3.in, "foobar").wait(ws); 2004 writePromise.wait(ws); 2005 2006 KJ_EXPECT(pumpPromise.wait(ws) == 6); 2007 KJ_EXPECT(pumpPromise2.wait(ws) == 6); 2008 } 2009 2010 KJ_TEST("Userland pipe multi-part write doesn't quit early") { 2011 kj::EventLoop loop; 2012 WaitScope ws(loop); 2013 2014 auto pipe = newOneWayPipe(); 2015 2016 auto readPromise = expectRead(*pipe.in, "foo"); 2017 2018 kj::ArrayPtr<const byte> pieces[2] = { "foobar"_kj.asBytes(), "baz"_kj.asBytes() }; 2019 auto writePromise = pipe.out->write(pieces); 2020 2021 readPromise.wait(ws); 2022 KJ_EXPECT(!writePromise.poll(ws)); 2023 expectRead(*pipe.in, "bar").wait(ws); 2024 KJ_EXPECT(!writePromise.poll(ws)); 2025 expectRead(*pipe.in, "baz").wait(ws); 2026 writePromise.wait(ws); 2027 } 2028 2029 KJ_TEST("Userland pipe BlockedRead gets empty tryPumpFrom") { 2030 kj::EventLoop loop; 2031 WaitScope ws(loop); 2032 2033 auto pipe = newOneWayPipe(); 2034 auto pipe2 = newOneWayPipe(); 2035 2036 // First start a read from the back end. 2037 char buffer[4]; 2038 auto readPromise = pipe2.in->tryRead(buffer, 1, 4); 2039 2040 // Now arrange a pump between the pipes, using tryPumpFrom(). 2041 auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); 2042 2043 // Disconnect the front pipe, causing EOF on the pump. 2044 pipe.out = nullptr; 2045 2046 // The pump should have produced zero bytes. 2047 KJ_EXPECT(pumpPromise.wait(ws) == 0); 2048 2049 // The read is incomplete. 2050 KJ_EXPECT(!readPromise.poll(ws)); 2051 2052 // A subsequent write() completes the read. 
2053 pipe2.out->write("foo", 3).wait(ws); 2054 KJ_EXPECT(readPromise.wait(ws) == 3); 2055 buffer[3] = '\0'; 2056 KJ_EXPECT(kj::StringPtr(buffer, 3) == "foo"); 2057 } 2058 2059 constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14; 2060 // AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing 2061 2062 KJ_TEST("Userland tee") { 2063 kj::EventLoop loop; 2064 WaitScope ws(loop); 2065 2066 auto pipe = newOneWayPipe(); 2067 auto tee = newTee(kj::mv(pipe.in)); 2068 auto left = kj::mv(tee.branches[0]); 2069 auto right = kj::mv(tee.branches[1]); 2070 2071 auto writePromise = pipe.out->write("foobar", 6); 2072 2073 expectRead(*left, "foobar").wait(ws); 2074 writePromise.wait(ws); 2075 expectRead(*right, "foobar").wait(ws); 2076 } 2077 2078 KJ_TEST("Userland nested tee") { 2079 kj::EventLoop loop; 2080 WaitScope ws(loop); 2081 2082 auto pipe = newOneWayPipe(); 2083 auto tee = newTee(kj::mv(pipe.in)); 2084 auto left = kj::mv(tee.branches[0]); 2085 auto right = kj::mv(tee.branches[1]); 2086 2087 auto tee2 = newTee(kj::mv(right)); 2088 auto rightLeft = kj::mv(tee2.branches[0]); 2089 auto rightRight = kj::mv(tee2.branches[1]); 2090 2091 auto writePromise = pipe.out->write("foobar", 6); 2092 2093 expectRead(*left, "foobar").wait(ws); 2094 writePromise.wait(ws); 2095 expectRead(*rightLeft, "foobar").wait(ws); 2096 expectRead(*rightRight, "foo").wait(ws); 2097 2098 auto tee3 = newTee(kj::mv(rightRight)); 2099 auto rightRightLeft = kj::mv(tee3.branches[0]); 2100 auto rightRightRight = kj::mv(tee3.branches[1]); 2101 expectRead(*rightRightLeft, "bar").wait(ws); 2102 expectRead(*rightRightRight, "b").wait(ws); 2103 2104 auto tee4 = newTee(kj::mv(rightRightRight)); 2105 auto rightRightRightLeft = kj::mv(tee4.branches[0]); 2106 auto rightRightRightRight = kj::mv(tee4.branches[1]); 2107 expectRead(*rightRightRightLeft, "ar").wait(ws); 2108 expectRead(*rightRightRightRight, "ar").wait(ws); 2109 } 2110 2111 KJ_TEST("Userland tee concurrent read") { 2112 kj::EventLoop loop; 2113 WaitScope ws(loop); 
2114 2115 auto pipe = newOneWayPipe(); 2116 auto tee = newTee(kj::mv(pipe.in)); 2117 auto left = kj::mv(tee.branches[0]); 2118 auto right = kj::mv(tee.branches[1]); 2119 2120 uint8_t leftBuf[6] = { 0 }; 2121 uint8_t rightBuf[6] = { 0 }; 2122 auto leftPromise = left->tryRead(leftBuf, 6, 6); 2123 auto rightPromise = right->tryRead(rightBuf, 6, 6); 2124 KJ_EXPECT(!leftPromise.poll(ws)); 2125 KJ_EXPECT(!rightPromise.poll(ws)); 2126 2127 pipe.out->write("foobar", 6).wait(ws); 2128 2129 KJ_EXPECT(leftPromise.wait(ws) == 6); 2130 KJ_EXPECT(rightPromise.wait(ws) == 6); 2131 2132 KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); 2133 KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); 2134 } 2135 2136 KJ_TEST("Userland tee cancel and restart read") { 2137 kj::EventLoop loop; 2138 WaitScope ws(loop); 2139 2140 auto pipe = newOneWayPipe(); 2141 auto tee = newTee(kj::mv(pipe.in)); 2142 auto left = kj::mv(tee.branches[0]); 2143 auto right = kj::mv(tee.branches[1]); 2144 2145 auto writePromise = pipe.out->write("foobar", 6); 2146 2147 { 2148 // Initiate a read and immediately cancel it. 2149 uint8_t buf[6] = { 0 }; 2150 auto promise = left->tryRead(buf, 6, 6); 2151 } 2152 2153 // Subsequent reads still see the full data. 2154 expectRead(*left, "foobar").wait(ws); 2155 writePromise.wait(ws); 2156 expectRead(*right, "foobar").wait(ws); 2157 } 2158 2159 KJ_TEST("Userland tee cancel read and destroy branch then read other branch") { 2160 kj::EventLoop loop; 2161 WaitScope ws(loop); 2162 2163 auto pipe = newOneWayPipe(); 2164 auto tee = newTee(kj::mv(pipe.in)); 2165 auto left = kj::mv(tee.branches[0]); 2166 auto right = kj::mv(tee.branches[1]); 2167 2168 auto writePromise = pipe.out->write("foobar", 6); 2169 2170 { 2171 // Initiate a read and immediately cancel it. 2172 uint8_t buf[6] = { 0 }; 2173 auto promise = left->tryRead(buf, 6, 6); 2174 } 2175 2176 // And destroy the branch for good measure. 2177 left = nullptr; 2178 2179 // Subsequent reads on the other branch still see the full data. 
2180 expectRead(*right, "foobar").wait(ws); 2181 writePromise.wait(ws); 2182 } 2183 2184 KJ_TEST("Userland tee subsequent other-branch reads are READY_NOW") { 2185 kj::EventLoop loop; 2186 WaitScope ws(loop); 2187 2188 auto pipe = newOneWayPipe(); 2189 auto tee = newTee(kj::mv(pipe.in)); 2190 auto left = kj::mv(tee.branches[0]); 2191 auto right = kj::mv(tee.branches[1]); 2192 2193 uint8_t leftBuf[6] = { 0 }; 2194 auto leftPromise = left->tryRead(leftBuf, 6, 6); 2195 // This is the first read, so there should NOT be buffered data. 2196 KJ_EXPECT(!leftPromise.poll(ws)); 2197 pipe.out->write("foobar", 6).wait(ws); 2198 leftPromise.wait(ws); 2199 KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); 2200 2201 uint8_t rightBuf[6] = { 0 }; 2202 auto rightPromise = right->tryRead(rightBuf, 6, 6); 2203 // The left read promise was fulfilled, so there SHOULD be buffered data. 2204 KJ_EXPECT(rightPromise.poll(ws)); 2205 rightPromise.wait(ws); 2206 KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); 2207 } 2208 2209 KJ_TEST("Userland tee read EOF propagation") { 2210 kj::EventLoop loop; 2211 WaitScope ws(loop); 2212 2213 auto pipe = newOneWayPipe(); 2214 auto writePromise = pipe.out->write("foobar", 6); 2215 auto tee = newTee(mv(pipe.in)); 2216 auto left = kj::mv(tee.branches[0]); 2217 auto right = kj::mv(tee.branches[1]); 2218 2219 // Lengthless pipe, so ... 2220 KJ_EXPECT(left->tryGetLength() == nullptr); 2221 KJ_EXPECT(right->tryGetLength() == nullptr); 2222 2223 uint8_t leftBuf[7] = { 0 }; 2224 auto leftPromise = left->tryRead(leftBuf, size(leftBuf), size(leftBuf)); 2225 writePromise.wait(ws); 2226 // Destroying the output side should force a short read. 2227 pipe.out = nullptr; 2228 2229 KJ_EXPECT(leftPromise.wait(ws) == 6); 2230 KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); 2231 2232 // And we should see a short read here, too. 
2233 uint8_t rightBuf[7] = { 0 }; 2234 auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf)); 2235 KJ_EXPECT(rightPromise.wait(ws) == 6); 2236 KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); 2237 2238 // Further reads should all be short. 2239 KJ_EXPECT(left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws) == 0); 2240 KJ_EXPECT(right->tryRead(rightBuf, 1, size(rightBuf)).wait(ws) == 0); 2241 } 2242 2243 KJ_TEST("Userland tee read exception propagation") { 2244 kj::EventLoop loop; 2245 WaitScope ws(loop); 2246 2247 // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe 2248 // ended prematurely" exception when we destroy the output side early. 2249 auto pipe = newOneWayPipe(7); 2250 auto writePromise = pipe.out->write("foobar", 6); 2251 auto tee = newTee(mv(pipe.in)); 2252 auto left = kj::mv(tee.branches[0]); 2253 auto right = kj::mv(tee.branches[1]); 2254 2255 // Test tryGetLength() while we're at it. 2256 KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 7); 2257 KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7); 2258 2259 uint8_t leftBuf[7] = { 0 }; 2260 auto leftPromise = left->tryRead(leftBuf, 6, size(leftBuf)); 2261 writePromise.wait(ws); 2262 // Destroying the output side should force a fulfillment of the read (since we reached minBytes). 2263 pipe.out = nullptr; 2264 KJ_EXPECT(leftPromise.wait(ws) == 6); 2265 KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); 2266 2267 // The next read sees the exception. 2268 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", 2269 left->tryRead(leftBuf, 1, size(leftBuf)).ignoreResult().wait(ws)); 2270 2271 // Test tryGetLength() here -- the unread branch still sees the original length value. 2272 KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 1); 2273 KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7); 2274 2275 // We should see the buffered data on the other side, even though we don't reach our minBytes. 
2276 uint8_t rightBuf[7] = { 0 }; 2277 auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf)); 2278 KJ_EXPECT(rightPromise.wait(ws) == 6); 2279 KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); 2280 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", 2281 right->tryRead(rightBuf, 1, size(leftBuf)).ignoreResult().wait(ws)); 2282 2283 // Further reads should all see the exception again. 2284 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", 2285 left->tryRead(leftBuf, 1, size(leftBuf)).ignoreResult().wait(ws)); 2286 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", 2287 right->tryRead(rightBuf, 1, size(leftBuf)).ignoreResult().wait(ws)); 2288 } 2289 2290 KJ_TEST("Userland tee read exception propagation w/ data loss") { 2291 kj::EventLoop loop; 2292 WaitScope ws(loop); 2293 2294 // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe 2295 // ended prematurely" exception once the pipe sees a short read. 2296 auto pipe = newOneWayPipe(7); 2297 auto writePromise = pipe.out->write("foobar", 6); 2298 auto tee = newTee(mv(pipe.in)); 2299 auto left = kj::mv(tee.branches[0]); 2300 auto right = kj::mv(tee.branches[1]); 2301 2302 uint8_t leftBuf[7] = { 0 }; 2303 auto leftPromise = left->tryRead(leftBuf, 7, 7); 2304 writePromise.wait(ws); 2305 // Destroying the output side should force an exception, since we didn't reach our minBytes. 2306 pipe.out = nullptr; 2307 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2308 "pipe ended prematurely", leftPromise.ignoreResult().wait(ws)); 2309 2310 // And we should see a short read here, too. In fact, we shouldn't see anything: the short read 2311 // above read all of the pipe's data, but then failed to buffer it because it encountered an 2312 // exception. It buffered the exception, instead. 
2313 uint8_t rightBuf[7] = { 0 }; 2314 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", 2315 right->tryRead(rightBuf, 1, 1).ignoreResult().wait(ws)); 2316 } 2317 2318 KJ_TEST("Userland tee read into different buffer sizes") { 2319 kj::EventLoop loop; 2320 WaitScope ws(loop); 2321 2322 auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11)); 2323 auto left = kj::mv(tee.branches[0]); 2324 auto right = kj::mv(tee.branches[1]); 2325 2326 uint8_t leftBuf[5] = { 0 }; 2327 uint8_t rightBuf[11] = { 0 }; 2328 2329 auto leftPromise = left->tryRead(leftBuf, 5, 5); 2330 auto rightPromise = right->tryRead(rightBuf, 11, 11); 2331 2332 KJ_EXPECT(leftPromise.wait(ws) == 5); 2333 KJ_EXPECT(rightPromise.wait(ws) == 11); 2334 } 2335 2336 KJ_TEST("Userland tee reads see max(minBytes...) and min(maxBytes...)") { 2337 kj::EventLoop loop; 2338 WaitScope ws(loop); 2339 2340 auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11)); 2341 auto left = kj::mv(tee.branches[0]); 2342 auto right = kj::mv(tee.branches[1]); 2343 2344 { 2345 uint8_t leftBuf[5] = { 0 }; 2346 uint8_t rightBuf[11] = { 0 }; 2347 2348 // Subrange of another range. The smaller maxBytes should win. 2349 auto leftPromise = left->tryRead(leftBuf, 3, 5); 2350 auto rightPromise = right->tryRead(rightBuf, 1, 11); 2351 2352 KJ_EXPECT(leftPromise.wait(ws) == 5); 2353 KJ_EXPECT(rightPromise.wait(ws) == 5); 2354 } 2355 2356 { 2357 uint8_t leftBuf[5] = { 0 }; 2358 uint8_t rightBuf[11] = { 0 }; 2359 2360 // Disjoint ranges. The larger minBytes should win. 
2361 auto leftPromise = left->tryRead(leftBuf, 3, 5); 2362 auto rightPromise = right->tryRead(rightBuf, 6, 11); 2363 2364 KJ_EXPECT(leftPromise.wait(ws) == 5); 2365 KJ_EXPECT(rightPromise.wait(ws) == 6); 2366 2367 KJ_EXPECT(left->tryRead(leftBuf, 1, 2).wait(ws) == 1); 2368 } 2369 } 2370 2371 KJ_TEST("Userland tee read stress test") { 2372 kj::EventLoop loop; 2373 WaitScope ws(loop); 2374 2375 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 2376 2377 auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); 2378 auto left = kj::mv(tee.branches[0]); 2379 auto right = kj::mv(tee.branches[1]); 2380 2381 auto leftBuffer = heapArray<byte>(bigText.size()); 2382 2383 { 2384 auto leftSlice = leftBuffer.slice(0, leftBuffer.size()); 2385 while (leftSlice.size() > 0) { 2386 for (size_t blockSize: { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59 }) { 2387 if (leftSlice.size() == 0) break; 2388 auto maxBytes = min(blockSize, leftSlice.size()); 2389 auto amount = left->tryRead(leftSlice.begin(), 1, maxBytes).wait(ws); 2390 leftSlice = leftSlice.slice(amount, leftSlice.size()); 2391 } 2392 } 2393 } 2394 2395 KJ_EXPECT(memcmp(leftBuffer.begin(), bigText.begin(), leftBuffer.size()) == 0); 2396 KJ_EXPECT(right->readAllText().wait(ws) == bigText); 2397 } 2398 2399 KJ_TEST("Userland tee pump") { 2400 kj::EventLoop loop; 2401 WaitScope ws(loop); 2402 2403 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 2404 2405 auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); 2406 auto left = kj::mv(tee.branches[0]); 2407 auto right = kj::mv(tee.branches[1]); 2408 2409 auto leftPipe = newOneWayPipe(); 2410 auto rightPipe = newOneWayPipe(); 2411 2412 auto leftPumpPromise = left->pumpTo(*leftPipe.out, 7); 2413 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2414 2415 auto rightPumpPromise = right->pumpTo(*rightPipe.out); 2416 // Neither are ready yet, because the left pump's backpressure has blocked the 
AsyncTee's pull 2417 // loop until we read from leftPipe. 2418 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2419 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2420 2421 expectRead(*leftPipe.in, "foo bar").wait(ws); 2422 KJ_EXPECT(leftPumpPromise.wait(ws) == 7); 2423 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2424 2425 // We should be able to read up to how far the left side pumped, and beyond. The left side will 2426 // now have data in its buffer. 2427 expectRead(*rightPipe.in, "foo bar baz,foo bar baz,foo").wait(ws); 2428 2429 // Consume the left side buffer. 2430 expectRead(*left, " baz,foo bar").wait(ws); 2431 2432 // We can destroy the left branch entirely and the right branch will still see all data. 2433 left = nullptr; 2434 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2435 auto allTextPromise = rightPipe.in->readAllText(); 2436 KJ_EXPECT(rightPumpPromise.wait(ws) == bigText.size()); 2437 // Need to force an EOF in the right pipe to check the result. 2438 rightPipe.out = nullptr; 2439 KJ_EXPECT(allTextPromise.wait(ws) == bigText.slice(27)); 2440 } 2441 2442 KJ_TEST("Userland tee pump slows down reads") { 2443 kj::EventLoop loop; 2444 WaitScope ws(loop); 2445 2446 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 2447 2448 auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); 2449 auto left = kj::mv(tee.branches[0]); 2450 auto right = kj::mv(tee.branches[1]); 2451 2452 auto leftPipe = newOneWayPipe(); 2453 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2454 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2455 2456 // The left pump will cause some data to be buffered on the right branch, which we can read. 2457 auto rightExpectation0 = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE)); 2458 expectRead(*right, rightExpectation0).wait(ws); 2459 2460 // But the next right branch read is blocked by the left pipe's backpressure. 
2461 auto rightExpectation1 = kj::str(bigText.slice(TEE_MAX_CHUNK_SIZE, TEE_MAX_CHUNK_SIZE + 10)); 2462 auto rightPromise = expectRead(*right, rightExpectation1); 2463 KJ_EXPECT(!rightPromise.poll(ws)); 2464 2465 // The right branch read finishes when we relieve the pressure in the left pipe. 2466 auto allTextPromise = leftPipe.in->readAllText(); 2467 rightPromise.wait(ws); 2468 KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size()); 2469 leftPipe.out = nullptr; 2470 KJ_EXPECT(allTextPromise.wait(ws) == bigText); 2471 } 2472 2473 KJ_TEST("Userland tee pump EOF propagation") { 2474 kj::EventLoop loop; 2475 WaitScope ws(loop); 2476 2477 { 2478 // EOF encountered by two pump operations. 2479 auto pipe = newOneWayPipe(); 2480 auto writePromise = pipe.out->write("foo bar", 7); 2481 auto tee = newTee(mv(pipe.in)); 2482 auto left = kj::mv(tee.branches[0]); 2483 auto right = kj::mv(tee.branches[1]); 2484 2485 auto leftPipe = newOneWayPipe(); 2486 auto rightPipe = newOneWayPipe(); 2487 2488 // Pump the first bit, and block. 2489 2490 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2491 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2492 auto rightPumpPromise = right->pumpTo(*rightPipe.out); 2493 writePromise.wait(ws); 2494 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2495 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2496 2497 // Induce an EOF. We should see it propagated to both pump promises. 2498 2499 pipe.out = nullptr; 2500 2501 // Relieve backpressure. 2502 auto leftAllPromise = leftPipe.in->readAllText(); 2503 auto rightAllPromise = rightPipe.in->readAllText(); 2504 KJ_EXPECT(leftPumpPromise.wait(ws) == 7); 2505 KJ_EXPECT(rightPumpPromise.wait(ws) == 7); 2506 2507 // Make sure we got the data on the pipes that were being pumped to. 
2508 KJ_EXPECT(!leftAllPromise.poll(ws)); 2509 KJ_EXPECT(!rightAllPromise.poll(ws)); 2510 leftPipe.out = nullptr; 2511 rightPipe.out = nullptr; 2512 KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); 2513 KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar"); 2514 } 2515 2516 { 2517 // EOF encountered by a read and pump operation. 2518 auto pipe = newOneWayPipe(); 2519 auto writePromise = pipe.out->write("foo bar", 7); 2520 auto tee = newTee(mv(pipe.in)); 2521 auto left = kj::mv(tee.branches[0]); 2522 auto right = kj::mv(tee.branches[1]); 2523 2524 auto leftPipe = newOneWayPipe(); 2525 auto rightPipe = newOneWayPipe(); 2526 2527 // Pump one branch, read another. 2528 2529 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2530 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2531 expectRead(*right, "foo bar").wait(ws); 2532 writePromise.wait(ws); 2533 uint8_t dummy = 0; 2534 auto rightReadPromise = right->tryRead(&dummy, 1, 1); 2535 2536 // Induce an EOF. We should see it propagated to both the read and pump promises. 2537 2538 pipe.out = nullptr; 2539 2540 // Relieve backpressure in the tee to see the EOF. 2541 auto leftAllPromise = leftPipe.in->readAllText(); 2542 KJ_EXPECT(leftPumpPromise.wait(ws) == 7); 2543 KJ_EXPECT(rightReadPromise.wait(ws) == 0); 2544 2545 // Make sure we got the data on the pipe that was being pumped to. 2546 KJ_EXPECT(!leftAllPromise.poll(ws)); 2547 leftPipe.out = nullptr; 2548 KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); 2549 } 2550 } 2551 2552 KJ_TEST("Userland tee pump EOF on chunk boundary") { 2553 kj::EventLoop loop; 2554 WaitScope ws(loop); 2555 2556 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 2557 2558 // Conjure an EOF right on the boundary of the tee's internal chunk. 
2559 auto chunkText = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE)); 2560 auto tee = newTee(heap<MockAsyncInputStream>(chunkText.asBytes(), chunkText.size())); 2561 auto left = kj::mv(tee.branches[0]); 2562 auto right = kj::mv(tee.branches[1]); 2563 2564 auto leftPipe = newOneWayPipe(); 2565 auto rightPipe = newOneWayPipe(); 2566 2567 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2568 auto rightPumpPromise = right->pumpTo(*rightPipe.out); 2569 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2570 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2571 2572 auto leftAllPromise = leftPipe.in->readAllText(); 2573 auto rightAllPromise = rightPipe.in->readAllText(); 2574 2575 // The pumps should see the EOF and stop. 2576 KJ_EXPECT(leftPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE); 2577 KJ_EXPECT(rightPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE); 2578 2579 // Verify that we saw the data on the other end of the destination pipes. 2580 leftPipe.out = nullptr; 2581 rightPipe.out = nullptr; 2582 KJ_EXPECT(leftAllPromise.wait(ws) == chunkText); 2583 KJ_EXPECT(rightAllPromise.wait(ws) == chunkText); 2584 } 2585 2586 KJ_TEST("Userland tee pump read exception propagation") { 2587 kj::EventLoop loop; 2588 WaitScope ws(loop); 2589 2590 { 2591 // Exception encountered by two pump operations. 2592 auto pipe = newOneWayPipe(14); 2593 auto writePromise = pipe.out->write("foo bar", 7); 2594 auto tee = newTee(mv(pipe.in)); 2595 auto left = kj::mv(tee.branches[0]); 2596 auto right = kj::mv(tee.branches[1]); 2597 2598 auto leftPipe = newOneWayPipe(); 2599 auto rightPipe = newOneWayPipe(); 2600 2601 // Pump the first bit, and block. 2602 2603 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2604 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2605 auto rightPumpPromise = right->pumpTo(*rightPipe.out); 2606 writePromise.wait(ws); 2607 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2608 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2609 2610 // Induce a read exception. We should see it propagated to both pump promises. 
2611 2612 pipe.out = nullptr; 2613 2614 // Both promises must exist before the backpressure in the tee is relieved, and the tee pull 2615 // loop actually sees the exception. 2616 auto leftAllPromise = leftPipe.in->readAllText(); 2617 auto rightAllPromise = rightPipe.in->readAllText(); 2618 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2619 "pipe ended prematurely", leftPumpPromise.ignoreResult().wait(ws)); 2620 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2621 "pipe ended prematurely", rightPumpPromise.ignoreResult().wait(ws)); 2622 2623 // Make sure we got the data on the destination pipes. 2624 KJ_EXPECT(!leftAllPromise.poll(ws)); 2625 KJ_EXPECT(!rightAllPromise.poll(ws)); 2626 leftPipe.out = nullptr; 2627 rightPipe.out = nullptr; 2628 KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); 2629 KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar"); 2630 } 2631 2632 { 2633 // Exception encountered by a read and pump operation. 2634 auto pipe = newOneWayPipe(14); 2635 auto writePromise = pipe.out->write("foo bar", 7); 2636 auto tee = newTee(mv(pipe.in)); 2637 auto left = kj::mv(tee.branches[0]); 2638 auto right = kj::mv(tee.branches[1]); 2639 2640 auto leftPipe = newOneWayPipe(); 2641 auto rightPipe = newOneWayPipe(); 2642 2643 // Pump one branch, read another. 2644 2645 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2646 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2647 expectRead(*right, "foo bar").wait(ws); 2648 writePromise.wait(ws); 2649 uint8_t dummy = 0; 2650 auto rightReadPromise = right->tryRead(&dummy, 1, 1); 2651 2652 // Induce a read exception. We should see it propagated to both the read and pump promises. 2653 2654 pipe.out = nullptr; 2655 2656 // Relieve backpressure in the tee to see the exceptions. 
2657 auto leftAllPromise = leftPipe.in->readAllText(); 2658 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2659 "pipe ended prematurely", leftPumpPromise.ignoreResult().wait(ws)); 2660 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2661 "pipe ended prematurely", rightReadPromise.ignoreResult().wait(ws)); 2662 2663 // Make sure we got the data on the destination pipe. 2664 KJ_EXPECT(!leftAllPromise.poll(ws)); 2665 leftPipe.out = nullptr; 2666 KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); 2667 } 2668 } 2669 2670 KJ_TEST("Userland tee pump write exception propagation") { 2671 kj::EventLoop loop; 2672 WaitScope ws(loop); 2673 2674 auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); 2675 2676 auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); 2677 auto left = kj::mv(tee.branches[0]); 2678 auto right = kj::mv(tee.branches[1]); 2679 2680 // Set up two pumps and let them block. 2681 auto leftPipe = newOneWayPipe(); 2682 auto rightPipe = newOneWayPipe(); 2683 auto leftPumpPromise = left->pumpTo(*leftPipe.out); 2684 auto rightPumpPromise = right->pumpTo(*rightPipe.out); 2685 KJ_EXPECT(!leftPumpPromise.poll(ws)); 2686 KJ_EXPECT(!rightPumpPromise.poll(ws)); 2687 2688 // Induce a write exception in the right branch pump. It should propagate to the right pump 2689 // promise. 2690 rightPipe.in = nullptr; 2691 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 2692 "read end of pipe was aborted", rightPumpPromise.ignoreResult().wait(ws)); 2693 2694 // The left pump promise does not see the right branch's write exception. 
KJ_EXPECT(!leftPumpPromise.poll(ws));
  auto allTextPromise = leftPipe.in->readAllText();
  KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size());
  leftPipe.out = nullptr;
  KJ_EXPECT(allTextPromise.wait(ws) == bigText);
}

// Verifies that dropping the promise returned by a tee branch's pumpTo() while the pump is
// still pending leaves the destination stream in a state where it can be destroyed without
// throwing. The test treats any exception from that destruction as evidence that the pump's
// write promises were not canceled.
KJ_TEST("Userland tee pump cancellation implies write cancellation") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto text = "foo bar baz"_kj;

  auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()));
  auto left = kj::mv(tee.branches[0]);
  auto right = kj::mv(tee.branches[1]);

  // The left branch pumps into a pipe whose read end (leftPipe.in) is never read in this test.
  auto leftPipe = newOneWayPipe();
  auto leftPumpPromise = left->pumpTo(*leftPipe.out);

  // Arrange to block the left pump on its write operation.
  expectRead(*right, "foo ").wait(ws);
  KJ_EXPECT(!leftPumpPromise.poll(ws));

  // Then cancel the pump, while it's still blocked.
  leftPumpPromise = nullptr;
  // It should cancel its write operations, so it should now be safe to destroy the output stream to
  // which it was pumping.
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    leftPipe.out = nullptr;
  })) {
    KJ_FAIL_EXPECT("write promises were not canceled", exception);
  }
}

// Exercises a tee constructed with a second argument of 2 (presumably the buffer size limit in
// bytes -- this matches the expectations below; confirm against newTee()'s declaration).
// Three independent scenarios are run, each on a fresh tee over the same 11-byte text:
//   1. interleaved reads that never let one branch get more than 2 bytes ahead of the other;
//   2. reads that try to go further ahead and are expected to fail on both branches;
//   3. two concurrent pumps into pipes, with reads on the pipes driving progress.
KJ_TEST("Userland tee buffer size limit") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto text = "foo bar baz"_kj;

  {
    // We can carefully read data to stay under our ridiculously low limit.

    auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
    auto left = kj::mv(tee.branches[0]);
    auto right = kj::mv(tee.branches[1]);

    // The reads alternate between branches; after each one the branch that just read is at most
    // 2 bytes ahead of the other (2/0, 2/4, 6/4, 6/8, 10/8, 10/11, 11/11 for left/right).
    expectRead(*left, "fo").wait(ws);
    expectRead(*right, "foo ").wait(ws);
    expectRead(*left, "o ba").wait(ws);
    expectRead(*right, "bar ").wait(ws);
    expectRead(*left, "r ba").wait(ws);
    expectRead(*right, "baz").wait(ws);
    expectRead(*left, "z").wait(ws);
  }

  {
    // Exceeding the limit causes both branches to see the exception after exhausting their buffers.

    auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
    auto left = kj::mv(tee.branches[0]);
    auto right = kj::mv(tee.branches[1]);

    // Left consumes 2 bytes while right has consumed none; asking left for one more byte is
    // expected to raise the limit error.
    expectRead(*left, "fo").wait(ws);
    KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
        expectRead(*left, "o").wait(ws));
    // Right can still read the same first 2 bytes, and then is expected to hit the same error.
    expectRead(*right, "fo").wait(ws);
    KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
        expectRead(*right, "o").wait(ws));
  }

  {
    // We guarantee that two pumps started simultaneously will never exceed our buffer size limit.

    auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
    auto left = kj::mv(tee.branches[0]);
    auto right = kj::mv(tee.branches[1]);
    auto leftPipe = kj::newOneWayPipe();
    auto rightPipe = kj::newOneWayPipe();

    // Start both pumps. No read has been issued on either pipe yet, and neither pump is expected
    // to have completed.
    auto leftPumpPromise = left->pumpTo(*leftPipe.out);
    auto rightPumpPromise = right->pumpTo(*rightPipe.out);
    KJ_EXPECT(!leftPumpPromise.poll(ws));
    KJ_EXPECT(!rightPumpPromise.poll(ws));

    // Receive buffers sized to hold the whole 11-byte text; each is filled incrementally below
    // and compared against a growing prefix of `text`.
    uint8_t leftBuf[11] = { 0 };
    uint8_t rightBuf[11] = { 0 };

    // The first read on the left pipe will succeed.
    auto leftPromise = leftPipe.in->tryRead(leftBuf, 1, 11);
    KJ_EXPECT(leftPromise.wait(ws) == 2);
    KJ_EXPECT(memcmp(leftBuf, text.begin(), 2) == 0);

    // But the second will block until we relieve pressure on the right pipe.
    leftPromise = leftPipe.in->tryRead(leftBuf + 2, 1, 9);
    KJ_EXPECT(!leftPromise.poll(ws));

    // Relieve the right pipe pressure ...
    auto rightPromise = rightPipe.in->tryRead(rightBuf, 1, 11);
    KJ_EXPECT(rightPromise.wait(ws) == 2);
    KJ_EXPECT(memcmp(rightBuf, text.begin(), 2) == 0);

    // Now the second left pipe read will complete.
    KJ_EXPECT(leftPromise.wait(ws) == 2);
    KJ_EXPECT(memcmp(leftBuf, text.begin(), 4) == 0);

    // Leapfrog the left branch with the right. There should be 2 bytes in the buffer, so we can
    // demand a total of 4.
    rightPromise = rightPipe.in->tryRead(rightBuf + 2, 4, 9);
    KJ_EXPECT(rightPromise.wait(ws) == 4);
    KJ_EXPECT(memcmp(rightBuf, text.begin(), 6) == 0);

    // Leapfrog the right with the left. We demand the entire rest of the stream, so this should
    // block. Note that a regular read for this amount on one of the tee branches directly would
    // exceed our buffer size limit, but this one does not, because we have the pipe to regulate
    // backpressure for us.
    leftPromise = leftPipe.in->tryRead(leftBuf + 4, 7, 7);
    KJ_EXPECT(!leftPromise.poll(ws));

    // Ask for the entire rest of the stream on the right branch and wrap things up.
    rightPromise = rightPipe.in->tryRead(rightBuf + 6, 5, 5);

    // Both outstanding reads are now expected to finish, leaving each buffer equal to the full
    // 11-byte text.
    KJ_EXPECT(leftPromise.wait(ws) == 7);
    KJ_EXPECT(memcmp(leftBuf, text.begin(), 11) == 0);

    KJ_EXPECT(rightPromise.wait(ws) == 5);
    KJ_EXPECT(memcmp(rightBuf, text.begin(), 11) == 0);
  }
}

// Checks that whenWriteDisconnected() on the write end of a pipe from newOneWayPipe() stays
// pending while the read end exists and becomes ready once the read end is destroyed.
KJ_TEST("Userspace OneWayPipe whenWriteDisconnected()") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();

  auto abortedPromise = pipe.out->whenWriteDisconnected();
  KJ_ASSERT(!abortedPromise.poll(ws));

  // Destroy the read end; this is the event the promise is expected to report.
  pipe.in = nullptr;

  KJ_ASSERT(abortedPromise.poll(ws));
  // Consume the result; presumably this would throw if the promise had been rejected rather than
  // fulfilled -- confirm against Promise::wait().
  abortedPromise.wait(ws);
}

// Same check as the one-way case, but for newTwoWayPipe(): whenWriteDisconnected() on ends[0]
// stays pending until ends[1] is destroyed.
KJ_TEST("Userspace TwoWayPipe whenWriteDisconnected()") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newTwoWayPipe();

  auto abortedPromise = pipe.ends[0]->whenWriteDisconnected();
  KJ_ASSERT(!abortedPromise.poll(ws));

  // Destroy the opposite end of the pipe.
  pipe.ends[1] = nullptr;

  KJ_ASSERT(abortedPromise.poll(ws));
  abortedPromise.wait(ws);
}

#if !_WIN32  // We don't currently support detecting disconnect with IOCP.
#if !__CYGWIN__  // TODO(someday): Figure out why whenWriteDisconnected() doesn't work on Cygwin.
2858 2859 KJ_TEST("OS OneWayPipe whenWriteDisconnected()") { 2860 auto io = setupAsyncIo(); 2861 2862 auto pipe = io.provider->newOneWayPipe(); 2863 2864 pipe.out->write("foo", 3).wait(io.waitScope); 2865 auto abortedPromise = pipe.out->whenWriteDisconnected(); 2866 KJ_ASSERT(!abortedPromise.poll(io.waitScope)); 2867 2868 pipe.in = nullptr; 2869 2870 KJ_ASSERT(abortedPromise.poll(io.waitScope)); 2871 abortedPromise.wait(io.waitScope); 2872 } 2873 2874 KJ_TEST("OS TwoWayPipe whenWriteDisconnected()") { 2875 auto io = setupAsyncIo(); 2876 2877 auto pipe = io.provider->newTwoWayPipe(); 2878 2879 pipe.ends[0]->write("foo", 3).wait(io.waitScope); 2880 pipe.ends[1]->write("bar", 3).wait(io.waitScope); 2881 2882 auto abortedPromise = pipe.ends[0]->whenWriteDisconnected(); 2883 KJ_ASSERT(!abortedPromise.poll(io.waitScope)); 2884 2885 pipe.ends[1] = nullptr; 2886 2887 KJ_ASSERT(abortedPromise.poll(io.waitScope)); 2888 abortedPromise.wait(io.waitScope); 2889 2890 char buffer[4]; 2891 KJ_ASSERT(pipe.ends[0]->tryRead(&buffer, 3, 3).wait(io.waitScope) == 3); 2892 buffer[3] = '\0'; 2893 KJ_EXPECT(buffer == "bar"_kj); 2894 2895 // Note: Reading any further in pipe.ends[0] would throw "connection reset". 
2896 } 2897 2898 KJ_TEST("import socket FD that's already broken") { 2899 auto io = setupAsyncIo(); 2900 2901 int fds[2]; 2902 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); 2903 KJ_SYSCALL(write(fds[1], "foo", 3)); 2904 KJ_SYSCALL(close(fds[1])); 2905 2906 auto stream = io.lowLevelProvider->wrapSocketFd(fds[0], LowLevelAsyncIoProvider::TAKE_OWNERSHIP); 2907 2908 auto abortedPromise = stream->whenWriteDisconnected(); 2909 KJ_ASSERT(abortedPromise.poll(io.waitScope)); 2910 abortedPromise.wait(io.waitScope); 2911 2912 char buffer[4]; 2913 KJ_ASSERT(stream->tryRead(&buffer, sizeof(buffer), sizeof(buffer)).wait(io.waitScope) == 3); 2914 buffer[3] = '\0'; 2915 KJ_EXPECT(buffer == "foo"_kj); 2916 } 2917 2918 #endif // !__CYGWIN__ 2919 #endif // !_WIN32 2920 2921 } // namespace 2922 } // namespace kj