capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-io-test.c++ (94798B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if _WIN32
     23 // Request Vista-level APIs.
     24 #include "win32-api-version.h"
     25 #elif !defined(_GNU_SOURCE)
     26 #define _GNU_SOURCE
     27 #endif
     28 
     29 #include "async-io.h"
     30 #include "async-io-internal.h"
     31 #include "debug.h"
     32 #include "io.h"
     33 #include "miniposix.h"
     34 #include <kj/compat/gtest.h>
     35 #include <kj/time.h>
     36 #include <sys/types.h>
     37 #if _WIN32
     38 #include <ws2tcpip.h>
     39 #include "windows-sanity.h"
     40 #define inet_pton InetPtonA
     41 #define inet_ntop InetNtopA
     42 #else
     43 #include <netdb.h>
     44 #include <unistd.h>
     45 #include <fcntl.h>
     46 #include <sys/socket.h>
     47 #include <arpa/inet.h>
     48 #include <netinet/in.h>
     49 #endif
     50 
     51 namespace kj {
     52 namespace {
     53 
     54 TEST(AsyncIo, SimpleNetwork) {
     55   auto ioContext = setupAsyncIo();
     56   auto& network = ioContext.provider->getNetwork();
     57 
     58   Own<ConnectionReceiver> listener;
     59   Own<AsyncIoStream> server;
     60   Own<AsyncIoStream> client;
     61 
     62   char receiveBuffer[4];
     63 
     64   auto port = newPromiseAndFulfiller<uint>();
     65 
     66   port.promise.then([&](uint portnum) {
     67     return network.parseAddress("localhost", portnum);
     68   }).then([&](Own<NetworkAddress>&& result) {
     69     return result->connect();
     70   }).then([&](Own<AsyncIoStream>&& result) {
     71     client = kj::mv(result);
     72     return client->write("foo", 3);
     73   }).detach([](kj::Exception&& exception) {
     74     KJ_FAIL_EXPECT(exception);
     75   });
     76 
     77   kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
     78     listener = result->listen();
     79     port.fulfiller->fulfill(listener->getPort());
     80     return listener->accept();
     81   }).then([&](Own<AsyncIoStream>&& result) {
     82     server = kj::mv(result);
     83     return server->tryRead(receiveBuffer, 3, 4);
     84   }).then([&](size_t n) {
     85     EXPECT_EQ(3u, n);
     86     return heapString(receiveBuffer, n);
     87   }).wait(ioContext.waitScope);
     88 
     89   EXPECT_EQ("foo", result);
     90 }
     91 
     92 #if !_WIN32  // TODO(0.10): Implement NetworkPeerIdentity for Win32.
     93 TEST(AsyncIo, SimpleNetworkAuthentication) {
     94   auto ioContext = setupAsyncIo();
     95   auto& network = ioContext.provider->getNetwork();
     96 
     97   Own<ConnectionReceiver> listener;
     98   Own<AsyncIoStream> server;
     99   Own<AsyncIoStream> client;
    100 
    101   char receiveBuffer[4];
    102 
    103   auto port = newPromiseAndFulfiller<uint>();
    104 
    105   port.promise.then([&](uint portnum) {
    106     return network.parseAddress("localhost", portnum);
    107   }).then([&](Own<NetworkAddress>&& addr) {
    108     auto promise = addr->connectAuthenticated();
    109     return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable {
    110       auto id = result.peerIdentity.downcast<NetworkPeerIdentity>();
    111 
    112       // `addr` was resolved from `localhost` and may contain multiple addresses, but
    113       // result.peerIdentity tells us the specific address that was used. So it should be one
    114       // of the ones on the list, but only one.
    115       KJ_EXPECT(strstr(addr->toString().cStr(), id->getAddress().toString().cStr()) != nullptr);
    116       KJ_EXPECT(id->getAddress().toString().findFirst(',') == nullptr);
    117 
    118       client = kj::mv(result.stream);
    119 
    120       // `id` should match client->getpeername().
    121       union {
    122         struct sockaddr generic;
    123         struct sockaddr_in ip4;
    124         struct sockaddr_in6 ip6;
    125       } rawAddr;
    126       uint len = sizeof(rawAddr);
    127       client->getpeername(&rawAddr.generic, &len);
    128       auto peername = network.getSockaddr(&rawAddr.generic, len);
    129       KJ_EXPECT(id->toString() == peername->toString());
    130 
    131       return client->write("foo", 3);
    132     });
    133   }).detach([](kj::Exception&& exception) {
    134     KJ_FAIL_EXPECT(exception);
    135   });
    136 
    137   kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
    138     listener = result->listen();
    139     port.fulfiller->fulfill(listener->getPort());
    140     return listener->acceptAuthenticated();
    141   }).then([&](AuthenticatedStream result) {
    142     auto id = result.peerIdentity.downcast<NetworkPeerIdentity>();
    143     server = kj::mv(result.stream);
    144 
    145     // `id` should match server->getpeername().
    146     union {
    147       struct sockaddr generic;
    148       struct sockaddr_in ip4;
    149       struct sockaddr_in6 ip6;
    150     } addr;
    151     uint len = sizeof(addr);
    152     server->getpeername(&addr.generic, &len);
    153     auto peername = network.getSockaddr(&addr.generic, len);
    154     KJ_EXPECT(id->toString() == peername->toString());
    155 
    156     return server->tryRead(receiveBuffer, 3, 4);
    157   }).then([&](size_t n) {
    158     EXPECT_EQ(3u, n);
    159     return heapString(receiveBuffer, n);
    160   }).wait(ioContext.waitScope);
    161 
    162   EXPECT_EQ("foo", result);
    163 }
    164 #endif
    165 
    166 #if !_WIN32 && !__CYGWIN__  // TODO(someday): Debug why this deadlocks on Cygwin.
    167 
    168 #if __ANDROID__
    169 #define TMPDIR "/data/local/tmp"
    170 #else
    171 #define TMPDIR "/tmp"
    172 #endif
    173 
    174 TEST(AsyncIo, UnixSocket) {
    175   auto ioContext = setupAsyncIo();
    176   auto& network = ioContext.provider->getNetwork();
    177 
    178   auto path = kj::str(TMPDIR "/kj-async-io-test.", getpid());
    179   KJ_DEFER(unlink(path.cStr()));
    180 
    181   Own<ConnectionReceiver> listener;
    182   Own<AsyncIoStream> server;
    183   Own<AsyncIoStream> client;
    184 
    185   char receiveBuffer[4];
    186 
    187   auto ready = newPromiseAndFulfiller<void>();
    188 
    189   ready.promise.then([&]() {
    190     return network.parseAddress(kj::str("unix:", path));
    191   }).then([&](Own<NetworkAddress>&& addr) {
    192     auto promise = addr->connectAuthenticated();
    193     return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable {
    194       auto id = result.peerIdentity.downcast<LocalPeerIdentity>();
    195       auto creds = id->getCredentials();
    196       KJ_IF_MAYBE(p, creds.pid) {
    197         KJ_EXPECT(*p == getpid());
    198 #if __linux__ || __APPLE__
    199       } else {
    200         KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null PID");
    201 #endif
    202       }
    203       KJ_IF_MAYBE(u, creds.uid) {
    204         KJ_EXPECT(*u == getuid());
    205       } else {
    206         KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null UID");
    207       }
    208 
    209       client = kj::mv(result.stream);
    210       return client->write("foo", 3);
    211     });
    212   }).detach([](kj::Exception&& exception) {
    213     KJ_FAIL_EXPECT(exception);
    214   });
    215 
    216   kj::String result = network.parseAddress(kj::str("unix:", path))
    217       .then([&](Own<NetworkAddress>&& result) {
    218     listener = result->listen();
    219     ready.fulfiller->fulfill();
    220     return listener->acceptAuthenticated();
    221   }).then([&](AuthenticatedStream result) {
    222     auto id = result.peerIdentity.downcast<LocalPeerIdentity>();
    223     auto creds = id->getCredentials();
    224     KJ_IF_MAYBE(p, creds.pid) {
    225       KJ_EXPECT(*p == getpid());
    226 #if __linux__ || __APPLE__
    227     } else {
    228       KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null PID");
    229 #endif
    230     }
    231     KJ_IF_MAYBE(u, creds.uid) {
    232       KJ_EXPECT(*u == getuid());
    233     } else {
    234       KJ_FAIL_EXPECT("LocalPeerIdentity for unix socket had null UID");
    235     }
    236 
    237     server = kj::mv(result.stream);
    238     return server->tryRead(receiveBuffer, 3, 4);
    239   }).then([&](size_t n) {
    240     EXPECT_EQ(3u, n);
    241     return heapString(receiveBuffer, n);
    242   }).wait(ioContext.waitScope);
    243 
    244   EXPECT_EQ("foo", result);
    245 }
    246 
    247 TEST(AsyncIo, AncillaryMessageHandlerNoMsg) {
    248   auto ioContext = setupAsyncIo();
    249   auto& network = ioContext.provider->getNetwork();
    250 
    251   Own<ConnectionReceiver> listener;
    252   Own<AsyncIoStream> server;
    253   Own<AsyncIoStream> client;
    254 
    255   char receiveBuffer[4];
    256 
    257   bool clientHandlerCalled = false;
    258   kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> clientHandler =
    259     [&](kj::ArrayPtr<AncillaryMessage>) {
    260     clientHandlerCalled = true;
    261   };
    262   bool serverHandlerCalled = false;
    263   kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> serverHandler =
    264     [&](kj::ArrayPtr<AncillaryMessage>) {
    265     serverHandlerCalled = true;
    266   };
    267 
    268   auto port = newPromiseAndFulfiller<uint>();
    269 
    270   port.promise.then([&](uint portnum) {
    271     return network.parseAddress("localhost", portnum);
    272   }).then([&](Own<NetworkAddress>&& addr) {
    273     auto promise = addr->connectAuthenticated();
    274     return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable {
    275       client = kj::mv(result.stream);
    276       client->registerAncillaryMessageHandler(kj::mv(clientHandler));
    277       return client->write("foo", 3);
    278     });
    279   }).detach([](kj::Exception&& exception) {
    280     KJ_FAIL_EXPECT(exception);
    281   });
    282 
    283   kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
    284     listener = result->listen();
    285     port.fulfiller->fulfill(listener->getPort());
    286     return listener->acceptAuthenticated();
    287   }).then([&](AuthenticatedStream result) {
    288     server = kj::mv(result.stream);
    289     server->registerAncillaryMessageHandler(kj::mv(serverHandler));
    290     return server->tryRead(receiveBuffer, 3, 4);
    291   }).then([&](size_t n) {
    292     EXPECT_EQ(3u, n);
    293     return heapString(receiveBuffer, n);
    294   }).wait(ioContext.waitScope);
    295 
    296   EXPECT_EQ("foo", result);
    297   EXPECT_FALSE(clientHandlerCalled);
    298   EXPECT_FALSE(serverHandlerCalled);
    299 }
    300 #endif
    301 
    302 // This test uses SO_TIMESTAMP on a SOCK_STREAM, which is only supported by Linux. Ideally we'd
    303 // rewrite the test to use some other message type that is widely supported on streams. But for
    304 // now we just limit the test to Linux. Also, it doesn't work on Android for some reason, and it
    305 // isn't worth investigating, so we skip it there.
    306 #if __linux__ && !__ANDROID__
    307 TEST(AsyncIo, AncillaryMessageHandler) {
    308   auto ioContext = setupAsyncIo();
    309   auto& network = ioContext.provider->getNetwork();
    310 
    311   Own<ConnectionReceiver> listener;
    312   Own<AsyncIoStream> server;
    313   Own<AsyncIoStream> client;
    314 
    315   char receiveBuffer[4];
    316 
    317   bool clientHandlerCalled = false;
    318   kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> clientHandler =
    319     [&](kj::ArrayPtr<AncillaryMessage>) {
    320     clientHandlerCalled = true;
    321   };
    322   bool serverHandlerCalled = false;
    323   kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> serverHandler =
    324     [&](kj::ArrayPtr<AncillaryMessage> msgs) {
    325     serverHandlerCalled = true;
    326     EXPECT_EQ(1, msgs.size());
    327     EXPECT_EQ(SOL_SOCKET, msgs[0].getLevel());
    328     EXPECT_EQ(SO_TIMESTAMP, msgs[0].getType());
    329   };
    330 
    331   auto port = newPromiseAndFulfiller<uint>();
    332 
    333   port.promise.then([&](uint portnum) {
    334     return network.parseAddress("localhost", portnum);
    335   }).then([&](Own<NetworkAddress>&& addr) {
    336     auto promise = addr->connectAuthenticated();
    337     return promise.then([&,addr=kj::mv(addr)](AuthenticatedStream result) mutable {
    338       client = kj::mv(result.stream);
    339       client->registerAncillaryMessageHandler(kj::mv(clientHandler));
    340       return client->write("foo", 3);
    341     });
    342   }).detach([](kj::Exception&& exception) {
    343     KJ_FAIL_EXPECT(exception);
    344   });
    345 
    346   kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
    347     listener = result->listen();
    348     // Register interest in having the timestamp delivered via cmsg on each recvmsg.
    349     int yes = 1;
    350     listener->setsockopt(SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes));
    351     port.fulfiller->fulfill(listener->getPort());
    352     return listener->acceptAuthenticated();
    353   }).then([&](AuthenticatedStream result) {
    354     server = kj::mv(result.stream);
    355     server->registerAncillaryMessageHandler(kj::mv(serverHandler));
    356     return server->tryRead(receiveBuffer, 3, 4);
    357   }).then([&](size_t n) {
    358     EXPECT_EQ(3u, n);
    359     return heapString(receiveBuffer, n);
    360   }).wait(ioContext.waitScope);
    361 
    362   EXPECT_EQ("foo", result);
    363   EXPECT_FALSE(clientHandlerCalled);
    364   EXPECT_TRUE(serverHandlerCalled);
    365 }
    366 #endif
    367 
    368 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) {
    369   return network.parseAddress(text, portHint).wait(waitScope)->toString();
    370 }
    371 
    372 bool systemSupportsAddress(StringPtr addr, StringPtr service = nullptr) {
    373   // Can getaddrinfo() parse this addresses? This is only true if the address family (e.g., ipv6)
    374   // is configured on at least one interface. (The loopback interface usually has both ipv4 and
    375   // ipv6 configured, but not always.)
    376   struct addrinfo* list;
    377   int status = getaddrinfo(
    378       addr.cStr(), service == nullptr ? nullptr : service.cStr(), nullptr, &list);
    379   if (status == 0) {
    380     freeaddrinfo(list);
    381     return true;
    382   } else {
    383     return false;
    384   }
    385 }
    386 
    387 TEST(AsyncIo, AddressParsing) {
    388   auto ioContext = setupAsyncIo();
    389   auto& w = ioContext.waitScope;
    390   auto& network = ioContext.provider->getNetwork();
    391 
    392   EXPECT_EQ("*:0", tryParse(w, network, "*"));
    393   EXPECT_EQ("*:123", tryParse(w, network, "*:123"));
    394   EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0"));
    395   EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678));
    396 
    397 #if !_WIN32
    398   EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz"));
    399   EXPECT_EQ("unix-abstract:foo/bar/baz", tryParse(w, network, "unix-abstract:foo/bar/baz"));
    400 #endif
    401 
    402   // We can parse services by name...
    403   //
    404   // For some reason, Android and some various Linux distros do not support service names.
    405   if (systemSupportsAddress("1.2.3.4", "http")) {
    406     EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678));
    407     EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678));
    408   } else {
    409     KJ_LOG(WARNING, "system does not support resolving service names on ipv4; skipping tests");
    410   }
    411 
    412   // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any
    413   // interfaces.
    414   if (systemSupportsAddress("::")) {
    415     EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123));
    416     EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432));
    417     if (systemSupportsAddress("12ab:cd::34", "http")) {
    418       EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678));
    419       EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678));
    420     } else {
    421       KJ_LOG(WARNING, "system does not support resolving service names on ipv6; skipping tests");
    422     }
    423   } else {
    424     KJ_LOG(WARNING, "system does not support ipv6; skipping tests");
    425   }
    426 
    427   // It would be nice to test DNS lookup here but the test would not be very hermetic.  Even
    428   // localhost can map to different addresses depending on whether IPv6 is enabled.  We do
    429   // connect to "localhost" in a different test, though.
    430 }
    431 
    432 TEST(AsyncIo, OneWayPipe) {
    433   auto ioContext = setupAsyncIo();
    434 
    435   auto pipe = ioContext.provider->newOneWayPipe();
    436   char receiveBuffer[4];
    437 
    438   pipe.out->write("foo", 3).detach([](kj::Exception&& exception) {
    439     KJ_FAIL_EXPECT(exception);
    440   });
    441 
    442   kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) {
    443     EXPECT_EQ(3u, n);
    444     return heapString(receiveBuffer, n);
    445   }).wait(ioContext.waitScope);
    446 
    447   EXPECT_EQ("foo", result);
    448 }
    449 
    450 TEST(AsyncIo, TwoWayPipe) {
    451   auto ioContext = setupAsyncIo();
    452 
    453   auto pipe = ioContext.provider->newTwoWayPipe();
    454   char receiveBuffer1[4];
    455   char receiveBuffer2[4];
    456 
    457   auto promise = pipe.ends[0]->write("foo", 3).then([&]() {
    458     return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
    459   }).then([&](size_t n) {
    460     EXPECT_EQ(3u, n);
    461     return heapString(receiveBuffer1, n);
    462   });
    463 
    464   kj::String result = pipe.ends[1]->write("bar", 3).then([&]() {
    465     return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
    466   }).then([&](size_t n) {
    467     EXPECT_EQ(3u, n);
    468     return heapString(receiveBuffer2, n);
    469   }).wait(ioContext.waitScope);
    470 
    471   kj::String result2 = promise.wait(ioContext.waitScope);
    472 
    473   EXPECT_EQ("foo", result);
    474   EXPECT_EQ("bar", result2);
    475 }
    476 
    477 TEST(AsyncIo, InMemoryCapabilityPipe) {
    478   EventLoop loop;
    479   WaitScope waitScope(loop);
    480 
    481   auto pipe = newCapabilityPipe();
    482   auto pipe2 = newCapabilityPipe();
    483   char receiveBuffer1[4];
    484   char receiveBuffer2[4];
    485 
    486   // Expect to receive a stream, then read "foo" from it, then write "bar" to it.
    487   Own<AsyncCapabilityStream> receivedStream;
    488   auto promise = pipe2.ends[1]->receiveStream()
    489       .then([&](Own<AsyncCapabilityStream> stream) {
    490     receivedStream = kj::mv(stream);
    491     return receivedStream->tryRead(receiveBuffer2, 3, 4);
    492   }).then([&](size_t n) {
    493     EXPECT_EQ(3u, n);
    494     return receivedStream->write("bar", 3).then([&receiveBuffer2,n]() {
    495       return heapString(receiveBuffer2, n);
    496     });
    497   });
    498 
    499   // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar"
    500   // from it.
    501   kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1]))
    502       .then([&]() {
    503     return pipe.ends[0]->write("foo", 3);
    504   }).then([&]() {
    505     return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
    506   }).then([&](size_t n) {
    507     EXPECT_EQ(3u, n);
    508     return heapString(receiveBuffer1, n);
    509   }).wait(waitScope);
    510 
    511   kj::String result2 = promise.wait(waitScope);
    512 
    513   EXPECT_EQ("bar", result);
    514   EXPECT_EQ("foo", result2);
    515 }
    516 
    517 #if !_WIN32 && !__CYGWIN__
    518 TEST(AsyncIo, CapabilityPipe) {
    519   auto ioContext = setupAsyncIo();
    520 
    521   auto pipe = ioContext.provider->newCapabilityPipe();
    522   auto pipe2 = ioContext.provider->newCapabilityPipe();
    523   char receiveBuffer1[4];
    524   char receiveBuffer2[4];
    525 
    526   // Expect to receive a stream, then write "bar" to it, then receive "foo" from it.
    527   Own<AsyncCapabilityStream> receivedStream;
    528   auto promise = pipe2.ends[1]->receiveStream()
    529       .then([&](Own<AsyncCapabilityStream> stream) {
    530     receivedStream = kj::mv(stream);
    531     return receivedStream->write("bar", 3);
    532   }).then([&]() {
    533     return receivedStream->tryRead(receiveBuffer2, 3, 4);
    534   }).then([&](size_t n) {
    535     EXPECT_EQ(3u, n);
    536     return heapString(receiveBuffer2, n);
    537   });
    538 
    539   // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar"
    540   // from it.
    541   kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1]))
    542       .then([&]() {
    543     return pipe.ends[0]->write("foo", 3);
    544   }).then([&]() {
    545     return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
    546   }).then([&](size_t n) {
    547     EXPECT_EQ(3u, n);
    548     return heapString(receiveBuffer1, n);
    549   }).wait(ioContext.waitScope);
    550 
    551   kj::String result2 = promise.wait(ioContext.waitScope);
    552 
    553   EXPECT_EQ("bar", result);
    554   EXPECT_EQ("foo", result2);
    555 }
    556 
    557 TEST(AsyncIo, CapabilityPipeBlockedSendStream) {
    558   // Check for a bug that existed at one point where if a sendStream() call couldn't complete
    559   // immediately, it would fail.
    560 
    561   auto io = setupAsyncIo();
    562 
    563   auto pipe = io.provider->newCapabilityPipe();
    564 
    565   Promise<void> promise = nullptr;
    566   Own<AsyncIoStream> endpoint1;
    567   uint nonBlockedCount = 0;
    568   for (;;) {
    569     auto pipe2 = io.provider->newCapabilityPipe();
    570     promise = pipe.ends[0]->sendStream(kj::mv(pipe2.ends[0]));
    571     if (promise.poll(io.waitScope)) {
    572       // Send completed immediately, because there was enough space in the stream.
    573       ++nonBlockedCount;
    574       promise.wait(io.waitScope);
    575     } else {
    576       // Send blocked! Let's continue with this promise then!
    577       endpoint1 = kj::mv(pipe2.ends[1]);
    578       break;
    579     }
    580   }
    581 
    582   for (uint i KJ_UNUSED: kj::zeroTo(nonBlockedCount)) {
    583     // Receive and ignore all the streams that were sent without blocking.
    584     pipe.ends[1]->receiveStream().wait(io.waitScope);
    585   }
    586 
    587   // Now that write that blocked should have been able to complete.
    588   promise.wait(io.waitScope);
    589 
    590   // Now get the one that blocked.
    591   auto endpoint2 = pipe.ends[1]->receiveStream().wait(io.waitScope);
    592 
    593   endpoint1->write("foo", 3).wait(io.waitScope);
    594   endpoint1->shutdownWrite();
    595   KJ_EXPECT(endpoint2->readAllText().wait(io.waitScope) == "foo");
    596 }
    597 
    598 TEST(AsyncIo, CapabilityPipeMultiStreamMessage) {
    599   auto ioContext = setupAsyncIo();
    600 
    601   auto pipe = ioContext.provider->newCapabilityPipe();
    602   auto pipe2 = ioContext.provider->newCapabilityPipe();
    603   auto pipe3 = ioContext.provider->newCapabilityPipe();
    604 
    605   auto streams = heapArrayBuilder<Own<AsyncCapabilityStream>>(2);
    606   streams.add(kj::mv(pipe2.ends[0]));
    607   streams.add(kj::mv(pipe3.ends[0]));
    608 
    609   ArrayPtr<const byte> secondBuf = "bar"_kj.asBytes();
    610   pipe.ends[0]->writeWithStreams("foo"_kj.asBytes(), arrayPtr(&secondBuf, 1), streams.finish())
    611       .wait(ioContext.waitScope);
    612 
    613   char receiveBuffer[7];
    614   Own<AsyncCapabilityStream> receiveStreams[3];
    615   auto result = pipe.ends[1]->tryReadWithStreams(receiveBuffer, 6, 7, receiveStreams, 3)
    616       .wait(ioContext.waitScope);
    617 
    618   KJ_EXPECT(result.byteCount == 6);
    619   receiveBuffer[6] = '\0';
    620   KJ_EXPECT(kj::StringPtr(receiveBuffer) == "foobar");
    621 
    622   KJ_ASSERT(result.capCount == 2);
    623 
    624   receiveStreams[0]->write("baz", 3).wait(ioContext.waitScope);
    625   receiveStreams[0] = nullptr;
    626   KJ_EXPECT(pipe2.ends[1]->readAllText().wait(ioContext.waitScope) == "baz");
    627 
    628   pipe3.ends[1]->write("qux", 3).wait(ioContext.waitScope);
    629   pipe3.ends[1] = nullptr;
    630   KJ_EXPECT(receiveStreams[1]->readAllText().wait(ioContext.waitScope) == "qux");
    631 }
    632 
    633 TEST(AsyncIo, ScmRightsTruncatedOdd) {
    634   // Test that if we send two FDs over a unix socket, but the receiving end only receives one, we
    635   // don't leak the other FD.
    636 
    637   auto io = setupAsyncIo();
    638 
    639   auto capPipe = io.provider->newCapabilityPipe();
    640 
    641   int pipeFds[2];
    642   KJ_SYSCALL(miniposix::pipe(pipeFds));
    643   kj::AutoCloseFd in1(pipeFds[0]);
    644   kj::AutoCloseFd out1(pipeFds[1]);
    645 
    646   KJ_SYSCALL(miniposix::pipe(pipeFds));
    647   kj::AutoCloseFd in2(pipeFds[0]);
    648   kj::AutoCloseFd out2(pipeFds[1]);
    649 
    650   {
    651     AutoCloseFd sendFds[2] = { kj::mv(out1), kj::mv(out2) };
    652     capPipe.ends[0]->writeWithFds("foo"_kj.asBytes(), nullptr, sendFds).wait(io.waitScope);
    653   }
    654 
    655   {
    656     char buffer[4];
    657     AutoCloseFd fdBuffer[1];
    658     auto result = capPipe.ends[1]->tryReadWithFds(buffer, 3, 3, fdBuffer, 1).wait(io.waitScope);
    659     KJ_ASSERT(result.capCount == 1);
    660     kj::FdOutputStream(fdBuffer[0].get()).write("bar", 3);
    661   }
    662 
    663   // We want to carefully verify that out1 and out2 were closed, without deadlocking if they
    664   // weren't. So we manually set nonblocking mode and then issue read()s.
    665   KJ_SYSCALL(fcntl(in1, F_SETFL, O_NONBLOCK));
    666   KJ_SYSCALL(fcntl(in2, F_SETFL, O_NONBLOCK));
    667 
    668   char buffer[4];
    669   ssize_t n;
    670 
    671   // First we read "bar" from in1.
    672   KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4));
    673   KJ_ASSERT(n == 3);
    674   buffer[3] = '\0';
    675   KJ_ASSERT(kj::StringPtr(buffer) == "bar");
    676 
    677   // Now it should be EOF.
    678   KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4));
    679   if (n < 0) {
    680     KJ_FAIL_ASSERT("out1 was not closed");
    681   }
    682   KJ_ASSERT(n == 0);
    683 
    684   // Second pipe should have been closed implicitly because we didn't provide space to receive it.
    685   KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4));
    686   if (n < 0) {
    687     KJ_FAIL_ASSERT("out2 was not closed. This could indicate that your operating system kernel is "
    688         "buggy and leaks file descriptors when an SCM_RIGHTS message is truncated. FreeBSD was "
    689         "known to do this until late 2018, while MacOS still has this bug as of this writing in "
    690         "2019. However, KJ works around the problem on those platforms. You need to enable the "
    691         "same work-around for your OS -- search for 'SCM_RIGHTS' in src/kj/async-io-unix.c++.");
    692   }
    693   KJ_ASSERT(n == 0);
    694 }
    695 
    696 #if !__aarch64__
    697 // This test fails under qemu-user, probably due to a bug in qemu's syscall emulation rather than
    698 // a bug in the kernel. We don't have a good way to detect qemu so we just skip the test on aarch64
    699 // in general.
    700 
    701 TEST(AsyncIo, ScmRightsTruncatedEven) {
    702   // Test that if we send three FDs over a unix socket, but the receiving end only receives two, we
    703   // don't leak the third FD. This is different from the send-two-receive-one case in that
    704   // CMSG_SPACE() on many systems rounds up such that there is always space for an even number of
    705   // FDs. In that case the other test only verifies that our userspace code to close unwanted FDs
    706   // is correct, whereas *this* test really verifies that the *kernel* properly closes truncated
    707   // FDs.
    708 
    709   auto io = setupAsyncIo();
    710 
    711   auto capPipe = io.provider->newCapabilityPipe();
    712 
    713   int pipeFds[2];
    714   KJ_SYSCALL(miniposix::pipe(pipeFds));
    715   kj::AutoCloseFd in1(pipeFds[0]);
    716   kj::AutoCloseFd out1(pipeFds[1]);
    717 
    718   KJ_SYSCALL(miniposix::pipe(pipeFds));
    719   kj::AutoCloseFd in2(pipeFds[0]);
    720   kj::AutoCloseFd out2(pipeFds[1]);
    721 
    722   KJ_SYSCALL(miniposix::pipe(pipeFds));
    723   kj::AutoCloseFd in3(pipeFds[0]);
    724   kj::AutoCloseFd out3(pipeFds[1]);
    725 
    726   {
    727     AutoCloseFd sendFds[3] = { kj::mv(out1), kj::mv(out2), kj::mv(out3) };
    728     capPipe.ends[0]->writeWithFds("foo"_kj.asBytes(), nullptr, sendFds).wait(io.waitScope);
    729   }
    730 
    731   {
    732     char buffer[4];
    733     AutoCloseFd fdBuffer[2];
    734     auto result = capPipe.ends[1]->tryReadWithFds(buffer, 3, 3, fdBuffer, 2).wait(io.waitScope);
    735     KJ_ASSERT(result.capCount == 2);
    736     kj::FdOutputStream(fdBuffer[0].get()).write("bar", 3);
    737     kj::FdOutputStream(fdBuffer[1].get()).write("baz", 3);
    738   }
    739 
    740   // We want to carefully verify that out1, out2, and out3 were closed, without deadlocking if they
    741   // weren't. So we manually set nonblocking mode and then issue read()s.
    742   KJ_SYSCALL(fcntl(in1, F_SETFL, O_NONBLOCK));
    743   KJ_SYSCALL(fcntl(in2, F_SETFL, O_NONBLOCK));
    744   KJ_SYSCALL(fcntl(in3, F_SETFL, O_NONBLOCK));
    745 
    746   char buffer[4];
    747   ssize_t n;
    748 
    749   // First we read "bar" from in1.
    750   KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4));
    751   KJ_ASSERT(n == 3);
    752   buffer[3] = '\0';
    753   KJ_ASSERT(kj::StringPtr(buffer) == "bar");
    754 
    755   // Now it should be EOF.
    756   KJ_NONBLOCKING_SYSCALL(n = read(in1, buffer, 4));
    757   if (n < 0) {
    758     KJ_FAIL_ASSERT("out1 was not closed");
    759   }
    760   KJ_ASSERT(n == 0);
    761 
    762   // Next we read "baz" from in2.
    763   KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4));
    764   KJ_ASSERT(n == 3);
    765   buffer[3] = '\0';
    766   KJ_ASSERT(kj::StringPtr(buffer) == "baz");
    767 
    768   // Now it should be EOF.
    769   KJ_NONBLOCKING_SYSCALL(n = read(in2, buffer, 4));
    770   if (n < 0) {
    771     KJ_FAIL_ASSERT("out2 was not closed");
    772   }
    773   KJ_ASSERT(n == 0);
    774 
    775   // Third pipe should have been closed implicitly because we didn't provide space to receive it.
    776   KJ_NONBLOCKING_SYSCALL(n = read(in3, buffer, 4));
    777   if (n < 0) {
    778     KJ_FAIL_ASSERT("out3 was not closed. This could indicate that your operating system kernel is "
    779         "buggy and leaks file descriptors when an SCM_RIGHTS message is truncated. FreeBSD was "
    780         "known to do this until late 2018, while MacOS still has this bug as of this writing in "
    781         "2019. However, KJ works around the problem on those platforms. You need to enable the "
    782         "same work-around for your OS -- search for 'SCM_RIGHTS' in src/kj/async-io-unix.c++.");
    783   }
    784   KJ_ASSERT(n == 0);
    785 }
    786 
    787 #endif  // !__aarch64__
    788 
    789 #endif  // !_WIN32 && !__CYGWIN__
    790 
    791 TEST(AsyncIo, PipeThread) {
    792   auto ioContext = setupAsyncIo();
    793 
    794   auto pipeThread = ioContext.provider->newPipeThread(
    795       [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
    796     char buf[4];
    797     stream.write("foo", 3).wait(waitScope);
    798     EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
    799     EXPECT_EQ("bar", heapString(buf, 3));
    800 
    801     // Expect disconnect.
    802     EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope));
    803   });
    804 
    805   char buf[4];
    806   pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
    807   EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
    808   EXPECT_EQ("foo", heapString(buf, 3));
    809 }
    810 
    811 TEST(AsyncIo, PipeThreadDisconnects) {
    812   // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting.
    813 
    814   auto ioContext = setupAsyncIo();
    815 
    816   auto pipeThread = ioContext.provider->newPipeThread(
    817       [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
    818     char buf[4];
    819     stream.write("foo", 3).wait(waitScope);
    820     EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
    821     EXPECT_EQ("bar", heapString(buf, 3));
    822   });
    823 
    824   char buf[4];
    825   EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
    826   EXPECT_EQ("foo", heapString(buf, 3));
    827 
    828   pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
    829 
    830   // Expect disconnect.
    831   EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope));
    832 }
    833 
    834 TEST(AsyncIo, Timeouts) {
    835   auto ioContext = setupAsyncIo();
    836 
    837   Timer& timer = ioContext.provider->getTimer();
    838 
    839   auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE));
    840   auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123));
    841 
    842   EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; })
    843       .wait(ioContext.waitScope));
    844   EXPECT_EQ(123, promise2.wait(ioContext.waitScope));
    845 }
    846 
    847 #if !_WIN32  // datagrams not implemented on win32 yet
    848 
    849 bool isMsgTruncBroken() {
    850   // Detect if the kernel fails to set MSG_TRUNC on recvmsg(). This seems to be the case at least
    851   // when running an arm64 binary under qemu.
    852 
    853   int fd;
    854   KJ_SYSCALL(fd = socket(AF_INET, SOCK_DGRAM, 0));
    855   KJ_DEFER(close(fd));
    856 
    857   struct sockaddr_in addr;
    858   memset(&addr, 0, sizeof(addr));
    859   addr.sin_family = AF_INET;
    860   addr.sin_addr.s_addr = htonl(0x7f000001);
    861   KJ_SYSCALL(bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)));
    862 
    863   // Read back the assigned port.
    864   socklen_t len = sizeof(addr);
    865   KJ_SYSCALL(getsockname(fd, reinterpret_cast<struct sockaddr*>(&addr), &len));
    866   KJ_ASSERT(len == sizeof(addr));
    867 
    868   const char* message = "foobar";
    869   KJ_SYSCALL(sendto(fd, message, strlen(message), 0,
    870       reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)));
    871 
    872   char buf[4];
    873   struct iovec iov;
    874   iov.iov_base = buf;
    875   iov.iov_len = 3;
    876   struct msghdr msg;
    877   memset(&msg, 0, sizeof(msg));
    878   msg.msg_iov = &iov;
    879   msg.msg_iovlen = 1;
    880   ssize_t n;
    881   KJ_SYSCALL(n = recvmsg(fd, &msg, 0));
    882   KJ_ASSERT(n == 3);
    883 
    884   buf[3] = 0;
    885   KJ_ASSERT(kj::StringPtr(buf) == "foo");
    886 
    887   return (msg.msg_flags & MSG_TRUNC) == 0;
    888 }
    889 
    890 TEST(AsyncIo, Udp) {
    891   bool msgTruncBroken = isMsgTruncBroken();
    892 
    893   auto ioContext = setupAsyncIo();
    894 
    895   auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope);
    896 
    897   auto port1 = addr->bindDatagramPort();
    898   auto port2 = addr->bindDatagramPort();
    899 
    900   auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort())
    901       .wait(ioContext.waitScope);
    902   auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort())
    903       .wait(ioContext.waitScope);
    904 
    905   Own<NetworkAddress> receivedAddr;
    906 
    907   {
    908     // Send a message and receive it.
    909     EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope));
    910     auto receiver = port2->makeReceiver();
    911 
    912     receiver->receive().wait(ioContext.waitScope);
    913     {
    914       auto content = receiver->getContent();
    915       EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
    916       EXPECT_FALSE(content.isTruncated);
    917     }
    918     receivedAddr = receiver->getSource().clone();
    919     EXPECT_EQ(addr1->toString(), receivedAddr->toString());
    920     {
    921       auto ancillary = receiver->getAncillary();
    922       EXPECT_EQ(0, ancillary.value.size());
    923       EXPECT_FALSE(ancillary.isTruncated);
    924     }
    925 
    926     // Receive a second message with the same receiver.
    927     {
    928       auto promise = receiver->receive();  // This time, start receiving before sending
    929       EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope));
    930       promise.wait(ioContext.waitScope);
    931       auto content = receiver->getContent();
    932       EXPECT_EQ("barbaz", kj::heapString(content.value.asChars()));
    933       EXPECT_FALSE(content.isTruncated);
    934     }
    935   }
    936 
    937   DatagramReceiver::Capacity capacity;
    938   capacity.content = 8;
    939   capacity.ancillary = 1024;
    940 
    941   {
    942     // Send a reply that will be truncated.
    943     EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope));
    944     auto recv1 = port1->makeReceiver(capacity);
    945 
    946     recv1->receive().wait(ioContext.waitScope);
    947     {
    948       auto content = recv1->getContent();
    949       EXPECT_EQ("01234567", kj::heapString(content.value.asChars()));
    950       EXPECT_TRUE(content.isTruncated || msgTruncBroken);
    951     }
    952     EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
    953     {
    954       auto ancillary = recv1->getAncillary();
    955       EXPECT_EQ(0, ancillary.value.size());
    956       EXPECT_FALSE(ancillary.isTruncated);
    957     }
    958 
    959 #if defined(IP_PKTINFO) && !__CYGWIN__ && !__aarch64__
    960     // Set IP_PKTINFO header and try to receive it.
    961     //
    962     // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html
    963     // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7.
    964     //
    965     // Doesn't work when running arm64 binaries under QEMU -- in fact, it crashes QEMU. We don't
    966     // have a good way to test if we're under QEMU so we just skip this test on aarch64.
    967     int one = 1;
    968     port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
    969 
    970     EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope));
    971 
    972     recv1->receive().wait(ioContext.waitScope);
    973     {
    974       auto content = recv1->getContent();
    975       EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
    976       EXPECT_FALSE(content.isTruncated);
    977     }
    978     EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
    979     {
    980       auto ancillary = recv1->getAncillary();
    981       EXPECT_FALSE(ancillary.isTruncated);
    982       ASSERT_EQ(1, ancillary.value.size());
    983 
    984       auto message = ancillary.value[0];
    985       EXPECT_EQ(IPPROTO_IP, message.getLevel());
    986       EXPECT_EQ(IP_PKTINFO, message.getType());
    987       EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size());
    988       auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>());
    989       EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr);  // 127.0.0.1
    990     }
    991 
    992     // See what happens if there's not quite enough space for in_pktinfo.
    993     capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8;
    994     recv1 = port1->makeReceiver(capacity);
    995 
    996     EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope));
    997 
    998     recv1->receive().wait(ioContext.waitScope);
    999     {
   1000       auto content = recv1->getContent();
   1001       EXPECT_EQ("bar", kj::heapString(content.value.asChars()));
   1002       EXPECT_FALSE(content.isTruncated);
   1003     }
   1004     EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
   1005     {
   1006       auto ancillary = recv1->getAncillary();
   1007       EXPECT_TRUE(ancillary.isTruncated || msgTruncBroken);
   1008 
   1009       // We might get a message, but it will be truncated.
   1010       if (ancillary.value.size() != 0) {
   1011         EXPECT_EQ(1, ancillary.value.size());
   1012 
   1013         auto message = ancillary.value[0];
   1014         EXPECT_EQ(IPPROTO_IP, message.getLevel());
   1015         EXPECT_EQ(IP_PKTINFO, message.getType());
   1016 
   1017         EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr);
   1018         EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo));
   1019       }
   1020     }
   1021 
   1022 #if __APPLE__
   1023 // On MacOS, `CMSG_SPACE(0)` triggers a bogus warning.
   1024 #pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
   1025 #endif
   1026     // See what happens if there's not enough space even for the cmsghdr.
   1027     capacity.ancillary = CMSG_SPACE(0) - 8;
   1028     recv1 = port1->makeReceiver(capacity);
   1029 
   1030     EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope));
   1031 
   1032     recv1->receive().wait(ioContext.waitScope);
   1033     {
   1034       auto content = recv1->getContent();
   1035       EXPECT_EQ("baz", kj::heapString(content.value.asChars()));
   1036       EXPECT_FALSE(content.isTruncated);
   1037     }
   1038     EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
   1039     {
   1040       auto ancillary = recv1->getAncillary();
   1041       EXPECT_TRUE(ancillary.isTruncated);
   1042       EXPECT_EQ(0, ancillary.value.size());
   1043     }
   1044 #endif
   1045   }
   1046 }
   1047 
   1048 #endif  // !_WIN32
   1049 
   1050 #ifdef __linux__  // Abstract unix sockets are only supported on Linux
   1051 
   1052 TEST(AsyncIo, AbstractUnixSocket) {
   1053   auto ioContext = setupAsyncIo();
   1054   auto& network = ioContext.provider->getNetwork();
   1055   auto elapsedSinceEpoch = systemPreciseMonotonicClock().now() - kj::origin<TimePoint>();
   1056   auto address = kj::str("unix-abstract:foo", getpid(), elapsedSinceEpoch / kj::NANOSECONDS);
   1057 
   1058   Own<NetworkAddress> addr = network.parseAddress(address).wait(ioContext.waitScope);
   1059 
   1060   Own<ConnectionReceiver> listener = addr->listen();
   1061   // chdir proves no filesystem dependence. Test fails for regular unix socket
   1062   // but passes for abstract unix socket.
   1063   int originalDirFd;
   1064   KJ_SYSCALL(originalDirFd = open(".", O_RDONLY | O_DIRECTORY | O_CLOEXEC));
   1065   KJ_DEFER(close(originalDirFd));
   1066   KJ_SYSCALL(chdir("/"));
   1067   KJ_DEFER(KJ_SYSCALL(fchdir(originalDirFd)));
   1068 
   1069   addr->connect().attach(kj::mv(listener)).wait(ioContext.waitScope);
   1070 }
   1071 
   1072 #endif  // __linux__
   1073 
   1074 KJ_TEST("CIDR parsing") {
   1075   KJ_EXPECT(_::CidrRange("1.2.3.4/16").toString() == "1.2.0.0/16");
   1076   KJ_EXPECT(_::CidrRange("1.2.255.4/18").toString() == "1.2.192.0/18");
   1077   KJ_EXPECT(_::CidrRange("1234::abcd:ffff:ffff/98").toString() == "1234::abcd:c000:0/98");
   1078 
   1079   KJ_EXPECT(_::CidrRange::inet4({1,2,255,4}, 18).toString() == "1.2.192.0/18");
   1080   KJ_EXPECT(_::CidrRange::inet6({0x1234, 0x5678}, {0xabcd, 0xffff, 0xffff}, 98).toString() ==
   1081             "1234:5678::abcd:c000:0/98");
   1082 
   1083   union {
   1084     struct sockaddr addr;
   1085     struct sockaddr_in addr4;
   1086     struct sockaddr_in6 addr6;
   1087   };
   1088   memset(&addr6, 0, sizeof(addr6));
   1089 
   1090   {
   1091     addr4.sin_family = AF_INET;
   1092     addr4.sin_addr.s_addr = htonl(0x0102dfff);
   1093     KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr));
   1094     KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr));
   1095     KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr));
   1096     KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr));
   1097     KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr));
   1098     KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr));
   1099     KJ_EXPECT(!_::CidrRange("::/0").matches(&addr));
   1100   }
   1101 
   1102   {
   1103     addr4.sin_family = AF_INET6;
   1104     byte bytes[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
   1105     memcpy(addr6.sin6_addr.s6_addr, bytes, 16);
   1106     KJ_EXPECT(_::CidrRange("0102:03ff::/24").matches(&addr));
   1107     KJ_EXPECT(!_::CidrRange("0102:02ff::/24").matches(&addr));
   1108     KJ_EXPECT(_::CidrRange("0102:02ff::/23").matches(&addr));
   1109     KJ_EXPECT(_::CidrRange("0102:0304:0506:0708:090a:0b0c:0d0e:0f10/128").matches(&addr));
   1110     KJ_EXPECT(_::CidrRange("::/0").matches(&addr));
   1111     KJ_EXPECT(!_::CidrRange("0.0.0.0/0").matches(&addr));
   1112   }
   1113 
   1114   {
   1115     addr4.sin_family = AF_INET6;
   1116     inet_pton(AF_INET6, "::ffff:1.2.223.255", &addr6.sin6_addr);
   1117     KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr));
   1118     KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr));
   1119     KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr));
   1120     KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr));
   1121     KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr));
   1122     KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr));
   1123     KJ_EXPECT(_::CidrRange("::/0").matches(&addr));
   1124   }
   1125 }
   1126 
   1127 bool allowed4(_::NetworkFilter& filter, StringPtr addrStr) {
   1128   struct sockaddr_in addr;
   1129   memset(&addr, 0, sizeof(addr));
   1130   addr.sin_family = AF_INET;
   1131   inet_pton(AF_INET, addrStr.cStr(), &addr.sin_addr);
   1132   return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
   1133 }
   1134 
   1135 bool allowed6(_::NetworkFilter& filter, StringPtr addrStr) {
   1136   struct sockaddr_in6 addr;
   1137   memset(&addr, 0, sizeof(addr));
   1138   addr.sin6_family = AF_INET6;
   1139   inet_pton(AF_INET6, addrStr.cStr(), &addr.sin6_addr);
   1140   return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
   1141 }
   1142 
   1143 KJ_TEST("NetworkFilter") {
   1144   _::NetworkFilter base;
   1145 
   1146   KJ_EXPECT(allowed4(base, "8.8.8.8"));
   1147   KJ_EXPECT(!allowed4(base, "240.1.2.3"));
   1148 
   1149   {
   1150     _::NetworkFilter filter({"public"}, {}, base);
   1151 
   1152     KJ_EXPECT(allowed4(filter, "8.8.8.8"));
   1153     KJ_EXPECT(!allowed4(filter, "240.1.2.3"));
   1154 
   1155     KJ_EXPECT(!allowed4(filter, "192.168.0.1"));
   1156     KJ_EXPECT(!allowed4(filter, "10.1.2.3"));
   1157     KJ_EXPECT(!allowed4(filter, "127.0.0.1"));
   1158     KJ_EXPECT(!allowed4(filter, "0.0.0.0"));
   1159 
   1160     KJ_EXPECT(allowed6(filter, "2400:cb00:2048:1::c629:d7a2"));
   1161     KJ_EXPECT(!allowed6(filter, "fc00::1234"));
   1162     KJ_EXPECT(!allowed6(filter, "::1"));
   1163     KJ_EXPECT(!allowed6(filter, "::"));
   1164   }
   1165 
   1166   {
   1167     _::NetworkFilter filter({"private"}, {"local"}, base);
   1168 
   1169     KJ_EXPECT(!allowed4(filter, "8.8.8.8"));
   1170     KJ_EXPECT(!allowed4(filter, "240.1.2.3"));
   1171 
   1172     KJ_EXPECT(allowed4(filter, "192.168.0.1"));
   1173     KJ_EXPECT(allowed4(filter, "10.1.2.3"));
   1174     KJ_EXPECT(!allowed4(filter, "127.0.0.1"));
   1175     KJ_EXPECT(!allowed4(filter, "0.0.0.0"));
   1176 
   1177     KJ_EXPECT(!allowed6(filter, "2400:cb00:2048:1::c629:d7a2"));
   1178     KJ_EXPECT(allowed6(filter, "fc00::1234"));
   1179     KJ_EXPECT(!allowed6(filter, "::1"));
   1180     KJ_EXPECT(!allowed6(filter, "::"));
   1181   }
   1182 
   1183   {
   1184     _::NetworkFilter filter({"1.0.0.0/8", "1.2.3.0/24"}, {"1.2.0.0/16", "1.2.3.4/32"}, base);
   1185 
   1186     KJ_EXPECT(!allowed4(filter, "8.8.8.8"));
   1187     KJ_EXPECT(!allowed4(filter, "240.1.2.3"));
   1188 
   1189     KJ_EXPECT(allowed4(filter, "1.0.0.1"));
   1190     KJ_EXPECT(!allowed4(filter, "1.2.2.1"));
   1191     KJ_EXPECT(allowed4(filter, "1.2.3.1"));
   1192     KJ_EXPECT(!allowed4(filter, "1.2.3.4"));
   1193   }
   1194 }
   1195 
   1196 KJ_TEST("Network::restrictPeers()") {
   1197   auto ioContext = setupAsyncIo();
   1198   auto& w = ioContext.waitScope;
   1199   auto& network = ioContext.provider->getNetwork();
   1200   auto restrictedNetwork = network.restrictPeers({"public"});
   1201 
   1202   KJ_EXPECT(tryParse(w, *restrictedNetwork, "8.8.8.8") == "8.8.8.8:0");
   1203 #if !_WIN32
   1204   KJ_EXPECT_THROW_MESSAGE("restrictPeers", tryParse(w, *restrictedNetwork, "unix:/foo"));
   1205 #endif
   1206 
   1207   auto addr = restrictedNetwork->parseAddress("127.0.0.1").wait(w);
   1208 
   1209   auto listener = addr->listen();
   1210   auto acceptTask = listener->accept()
   1211       .then([](kj::Own<kj::AsyncIoStream>) {
   1212     KJ_FAIL_EXPECT("should not have received connection");
   1213   }).eagerlyEvaluate(nullptr);
   1214 
   1215   KJ_EXPECT_THROW_MESSAGE("restrictPeers", addr->connect().wait(w));
   1216 
   1217   // We can connect to the listener but the connection will be immediately closed.
   1218   auto addr2 = network.parseAddress("127.0.0.1", listener->getPort()).wait(w);
   1219   auto conn = addr2->connect().wait(w);
   1220   KJ_EXPECT(conn->readAllText().wait(w) == "");
   1221 }
   1222 
   1223 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
   1224   if (expected.size() == 0) return kj::READY_NOW;
   1225 
   1226   auto buffer = kj::heapArray<char>(expected.size());
   1227 
   1228   auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
   1229   return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
   1230     if (amount == 0) {
   1231       KJ_FAIL_ASSERT("expected data never sent", expected);
   1232     }
   1233 
   1234     auto actual = buffer.slice(0, amount);
   1235     if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
   1236       KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
   1237     }
   1238 
   1239     return expectRead(in, expected.slice(amount));
   1240   }));
   1241 }
   1242 
   1243 class MockAsyncInputStream final: public AsyncInputStream {
   1244 public:
   1245   MockAsyncInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize)
   1246       : bytes(bytes), blockSize(blockSize) {}
   1247 
   1248   kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
   1249     // Clamp max read to blockSize.
   1250     size_t n = kj::min(blockSize, maxBytes);
   1251 
   1252     // Unless that's less than minBytes -- in which case, use minBytes.
   1253     n = kj::max(n, minBytes);
   1254 
   1255     // But also don't read more data than we have.
   1256     n = kj::min(n, bytes.size());
   1257 
   1258     memcpy(buffer, bytes.begin(), n);
   1259     bytes = bytes.slice(n, bytes.size());
   1260     return n;
   1261   }
   1262 
   1263 private:
   1264   kj::ArrayPtr<const byte> bytes;
   1265   size_t blockSize;
   1266 };
   1267 
   1268 KJ_TEST("AsyncInputStream::readAllText() / readAllBytes()") {
   1269   kj::EventLoop loop;
   1270   WaitScope ws(loop);
   1271 
   1272   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   1273   size_t inputSizes[] = { 0, 1, 256, 4096, 8191, 8192, 8193, 10000, bigText.size() };
   1274   size_t blockSizes[] = { 1, 4, 256, 4096, 8192, bigText.size() };
   1275   uint64_t limits[] = {
   1276     0, 1, 256,
   1277     bigText.size() / 2,
   1278     bigText.size() - 1,
   1279     bigText.size(),
   1280     bigText.size() + 1,
   1281     kj::maxValue
   1282   };
   1283 
   1284   for (size_t inputSize: inputSizes) {
   1285     for (size_t blockSize: blockSizes) {
   1286       for (uint64_t limit: limits) {
   1287         KJ_CONTEXT(inputSize, blockSize, limit);
   1288         auto textSlice = bigText.asBytes().slice(0, inputSize);
   1289         auto readAllText = [&]() {
   1290           MockAsyncInputStream input(textSlice, blockSize);
   1291           return input.readAllText(limit).wait(ws);
   1292         };
   1293         auto readAllBytes = [&]() {
   1294           MockAsyncInputStream input(textSlice, blockSize);
   1295           return input.readAllBytes(limit).wait(ws);
   1296         };
   1297         if (limit > inputSize) {
   1298           KJ_EXPECT(readAllText().asBytes() == textSlice);
   1299           KJ_EXPECT(readAllBytes() == textSlice);
   1300         } else {
   1301           KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllText());
   1302           KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllBytes());
   1303         }
   1304       }
   1305     }
   1306   }
   1307 }
   1308 
   1309 KJ_TEST("Userland pipe") {
   1310   kj::EventLoop loop;
   1311   WaitScope ws(loop);
   1312 
   1313   auto pipe = newOneWayPipe();
   1314 
   1315   auto promise = pipe.out->write("foo", 3);
   1316   KJ_EXPECT(!promise.poll(ws));
   1317 
   1318   char buf[4];
   1319   KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3);
   1320   buf[3] = '\0';
   1321   KJ_EXPECT(buf == "foo"_kj);
   1322 
   1323   promise.wait(ws);
   1324 
   1325   auto promise2 = pipe.in->readAllText();
   1326   KJ_EXPECT(!promise2.poll(ws));
   1327 
   1328   pipe.out = nullptr;
   1329   KJ_EXPECT(promise2.wait(ws) == "");
   1330 }
   1331 
   1332 KJ_TEST("Userland pipe cancel write") {
   1333   kj::EventLoop loop;
   1334   WaitScope ws(loop);
   1335 
   1336   auto pipe = newOneWayPipe();
   1337 
   1338   auto promise = pipe.out->write("foobar", 6);
   1339   KJ_EXPECT(!promise.poll(ws));
   1340 
   1341   expectRead(*pipe.in, "foo").wait(ws);
   1342   KJ_EXPECT(!promise.poll(ws));
   1343   promise = nullptr;
   1344 
   1345   promise = pipe.out->write("baz", 3);
   1346   expectRead(*pipe.in, "baz").wait(ws);
   1347   promise.wait(ws);
   1348 
   1349   pipe.out = nullptr;
   1350   KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
   1351 }
   1352 
   1353 KJ_TEST("Userland pipe cancel read") {
   1354   kj::EventLoop loop;
   1355   WaitScope ws(loop);
   1356 
   1357   auto pipe = newOneWayPipe();
   1358 
   1359   auto writeOp = pipe.out->write("foo", 3);
   1360   auto readOp = expectRead(*pipe.in, "foobar");
   1361   writeOp.wait(ws);
   1362   KJ_EXPECT(!readOp.poll(ws));
   1363   readOp = nullptr;
   1364 
   1365   auto writeOp2 = pipe.out->write("baz", 3);
   1366   expectRead(*pipe.in, "baz").wait(ws);
   1367 }
   1368 
   1369 KJ_TEST("Userland pipe pumpTo") {
   1370   kj::EventLoop loop;
   1371   WaitScope ws(loop);
   1372 
   1373   auto pipe = newOneWayPipe();
   1374   auto pipe2 = newOneWayPipe();
   1375   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1376 
   1377   auto promise = pipe.out->write("foo", 3);
   1378   KJ_EXPECT(!promise.poll(ws));
   1379 
   1380   expectRead(*pipe2.in, "foo").wait(ws);
   1381 
   1382   promise.wait(ws);
   1383 
   1384   auto promise2 = pipe2.in->readAllText();
   1385   KJ_EXPECT(!promise2.poll(ws));
   1386 
   1387   pipe.out = nullptr;
   1388   KJ_EXPECT(pumpPromise.wait(ws) == 3);
   1389 }
   1390 
   1391 KJ_TEST("Userland pipe tryPumpFrom") {
   1392   kj::EventLoop loop;
   1393   WaitScope ws(loop);
   1394 
   1395   auto pipe = newOneWayPipe();
   1396   auto pipe2 = newOneWayPipe();
   1397   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1398 
   1399   auto promise = pipe.out->write("foo", 3);
   1400   KJ_EXPECT(!promise.poll(ws));
   1401 
   1402   expectRead(*pipe2.in, "foo").wait(ws);
   1403 
   1404   promise.wait(ws);
   1405 
   1406   auto promise2 = pipe2.in->readAllText();
   1407   KJ_EXPECT(!promise2.poll(ws));
   1408 
   1409   pipe.out = nullptr;
   1410   KJ_EXPECT(!promise2.poll(ws));
   1411   KJ_EXPECT(pumpPromise.wait(ws) == 3);
   1412 }
   1413 
   1414 KJ_TEST("Userland pipe pumpTo cancel") {
   1415   kj::EventLoop loop;
   1416   WaitScope ws(loop);
   1417 
   1418   auto pipe = newOneWayPipe();
   1419   auto pipe2 = newOneWayPipe();
   1420   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1421 
   1422   auto promise = pipe.out->write("foobar", 3);
   1423   KJ_EXPECT(!promise.poll(ws));
   1424 
   1425   expectRead(*pipe2.in, "foo").wait(ws);
   1426 
   1427   // Cancel pump.
   1428   pumpPromise = nullptr;
   1429 
   1430   auto promise3 = pipe2.out->write("baz", 3);
   1431   expectRead(*pipe2.in, "baz").wait(ws);
   1432 }
   1433 
   1434 KJ_TEST("Userland pipe tryPumpFrom cancel") {
   1435   kj::EventLoop loop;
   1436   WaitScope ws(loop);
   1437 
   1438   auto pipe = newOneWayPipe();
   1439   auto pipe2 = newOneWayPipe();
   1440   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1441 
   1442   auto promise = pipe.out->write("foobar", 3);
   1443   KJ_EXPECT(!promise.poll(ws));
   1444 
   1445   expectRead(*pipe2.in, "foo").wait(ws);
   1446 
   1447   // Cancel pump.
   1448   pumpPromise = nullptr;
   1449 
   1450   auto promise3 = pipe2.out->write("baz", 3);
   1451   expectRead(*pipe2.in, "baz").wait(ws);
   1452 }
   1453 
   1454 KJ_TEST("Userland pipe with limit") {
   1455   kj::EventLoop loop;
   1456   WaitScope ws(loop);
   1457 
   1458   auto pipe = newOneWayPipe(6);
   1459 
   1460   {
   1461     auto promise = pipe.out->write("foo", 3);
   1462     KJ_EXPECT(!promise.poll(ws));
   1463     expectRead(*pipe.in, "foo").wait(ws);
   1464     promise.wait(ws);
   1465   }
   1466 
   1467   {
   1468     auto promise = pipe.in->readAllText();
   1469     KJ_EXPECT(!promise.poll(ws));
   1470     auto promise2 = pipe.out->write("barbaz", 6);
   1471     KJ_EXPECT(promise.wait(ws) == "bar");
   1472     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
   1473   }
   1474 
   1475   // Further writes throw and reads return EOF.
   1476   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   1477       "abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
   1478   KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
   1479 }
   1480 
   1481 KJ_TEST("Userland pipe pumpTo with limit") {
   1482   kj::EventLoop loop;
   1483   WaitScope ws(loop);
   1484 
   1485   auto pipe = newOneWayPipe(6);
   1486   auto pipe2 = newOneWayPipe();
   1487   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1488 
   1489   {
   1490     auto promise = pipe.out->write("foo", 3);
   1491     KJ_EXPECT(!promise.poll(ws));
   1492     expectRead(*pipe2.in, "foo").wait(ws);
   1493     promise.wait(ws);
   1494   }
   1495 
   1496   {
   1497     auto promise = expectRead(*pipe2.in, "bar");
   1498     KJ_EXPECT(!promise.poll(ws));
   1499     auto promise2 = pipe.out->write("barbaz", 6);
   1500     promise.wait(ws);
   1501     pumpPromise.wait(ws);
   1502     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
   1503   }
   1504 
   1505   // Further writes throw.
   1506   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   1507       "abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
   1508 }
   1509 
   1510 KJ_TEST("Userland pipe pump into zero-limited pipe, no data to pump") {
   1511   kj::EventLoop loop;
   1512   WaitScope ws(loop);
   1513 
   1514   auto pipe = newOneWayPipe();
   1515   auto pipe2 = newOneWayPipe(uint64_t(0));
   1516   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1517 
   1518   expectRead(*pipe2.in, "");
   1519   pipe.out = nullptr;
   1520   KJ_EXPECT(pumpPromise.wait(ws) == 0);
   1521 }
   1522 
   1523 KJ_TEST("Userland pipe pump into zero-limited pipe, data is pumped") {
   1524   kj::EventLoop loop;
   1525   WaitScope ws(loop);
   1526 
   1527   auto pipe = newOneWayPipe();
   1528   auto pipe2 = newOneWayPipe(uint64_t(0));
   1529   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1530 
   1531   expectRead(*pipe2.in, "");
   1532   auto writePromise = pipe.out->write("foo", 3);
   1533   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("abortRead() has been called", pumpPromise.wait(ws));
   1534 }
   1535 
   1536 KJ_TEST("Userland pipe gather write") {
   1537   kj::EventLoop loop;
   1538   WaitScope ws(loop);
   1539 
   1540   auto pipe = newOneWayPipe();
   1541 
   1542   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1543   auto promise = pipe.out->write(parts);
   1544   KJ_EXPECT(!promise.poll(ws));
   1545   expectRead(*pipe.in, "foobar").wait(ws);
   1546   promise.wait(ws);
   1547 
   1548   auto promise2 = pipe.in->readAllText();
   1549   KJ_EXPECT(!promise2.poll(ws));
   1550 
   1551   pipe.out = nullptr;
   1552   KJ_EXPECT(promise2.wait(ws) == "");
   1553 }
   1554 
   1555 KJ_TEST("Userland pipe gather write split on buffer boundary") {
   1556   kj::EventLoop loop;
   1557   WaitScope ws(loop);
   1558 
   1559   auto pipe = newOneWayPipe();
   1560 
   1561   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1562   auto promise = pipe.out->write(parts);
   1563   KJ_EXPECT(!promise.poll(ws));
   1564   expectRead(*pipe.in, "foo").wait(ws);
   1565   expectRead(*pipe.in, "bar").wait(ws);
   1566   promise.wait(ws);
   1567 
   1568   auto promise2 = pipe.in->readAllText();
   1569   KJ_EXPECT(!promise2.poll(ws));
   1570 
   1571   pipe.out = nullptr;
   1572   KJ_EXPECT(promise2.wait(ws) == "");
   1573 }
   1574 
   1575 KJ_TEST("Userland pipe gather write split mid-first-buffer") {
   1576   kj::EventLoop loop;
   1577   WaitScope ws(loop);
   1578 
   1579   auto pipe = newOneWayPipe();
   1580 
   1581   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1582   auto promise = pipe.out->write(parts);
   1583   KJ_EXPECT(!promise.poll(ws));
   1584   expectRead(*pipe.in, "fo").wait(ws);
   1585   expectRead(*pipe.in, "obar").wait(ws);
   1586   promise.wait(ws);
   1587 
   1588   auto promise2 = pipe.in->readAllText();
   1589   KJ_EXPECT(!promise2.poll(ws));
   1590 
   1591   pipe.out = nullptr;
   1592   KJ_EXPECT(promise2.wait(ws) == "");
   1593 }
   1594 
   1595 KJ_TEST("Userland pipe gather write split mid-second-buffer") {
   1596   kj::EventLoop loop;
   1597   WaitScope ws(loop);
   1598 
   1599   auto pipe = newOneWayPipe();
   1600 
   1601   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1602   auto promise = pipe.out->write(parts);
   1603   KJ_EXPECT(!promise.poll(ws));
   1604   expectRead(*pipe.in, "foob").wait(ws);
   1605   expectRead(*pipe.in, "ar").wait(ws);
   1606   promise.wait(ws);
   1607 
   1608   auto promise2 = pipe.in->readAllText();
   1609   KJ_EXPECT(!promise2.poll(ws));
   1610 
   1611   pipe.out = nullptr;
   1612   KJ_EXPECT(promise2.wait(ws) == "");
   1613 }
   1614 
   1615 KJ_TEST("Userland pipe gather write pump") {
   1616   kj::EventLoop loop;
   1617   WaitScope ws(loop);
   1618 
   1619   auto pipe = newOneWayPipe();
   1620   auto pipe2 = newOneWayPipe();
   1621   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1622 
   1623   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1624   auto promise = pipe.out->write(parts);
   1625   KJ_EXPECT(!promise.poll(ws));
   1626   expectRead(*pipe2.in, "foobar").wait(ws);
   1627   promise.wait(ws);
   1628 
   1629   pipe.out = nullptr;
   1630   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1631 }
   1632 
   1633 KJ_TEST("Userland pipe gather write pump split on buffer boundary") {
   1634   kj::EventLoop loop;
   1635   WaitScope ws(loop);
   1636 
   1637   auto pipe = newOneWayPipe();
   1638   auto pipe2 = newOneWayPipe();
   1639   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1640 
   1641   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1642   auto promise = pipe.out->write(parts);
   1643   KJ_EXPECT(!promise.poll(ws));
   1644   expectRead(*pipe2.in, "foo").wait(ws);
   1645   expectRead(*pipe2.in, "bar").wait(ws);
   1646   promise.wait(ws);
   1647 
   1648   pipe.out = nullptr;
   1649   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1650 }
   1651 
   1652 KJ_TEST("Userland pipe gather write pump split mid-first-buffer") {
   1653   kj::EventLoop loop;
   1654   WaitScope ws(loop);
   1655 
   1656   auto pipe = newOneWayPipe();
   1657   auto pipe2 = newOneWayPipe();
   1658   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1659 
   1660   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1661   auto promise = pipe.out->write(parts);
   1662   KJ_EXPECT(!promise.poll(ws));
   1663   expectRead(*pipe2.in, "fo").wait(ws);
   1664   expectRead(*pipe2.in, "obar").wait(ws);
   1665   promise.wait(ws);
   1666 
   1667   pipe.out = nullptr;
   1668   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1669 }
   1670 
   1671 KJ_TEST("Userland pipe gather write pump split mid-second-buffer") {
   1672   kj::EventLoop loop;
   1673   WaitScope ws(loop);
   1674 
   1675   auto pipe = newOneWayPipe();
   1676   auto pipe2 = newOneWayPipe();
   1677   auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
   1678 
   1679   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1680   auto promise = pipe.out->write(parts);
   1681   KJ_EXPECT(!promise.poll(ws));
   1682   expectRead(*pipe2.in, "foob").wait(ws);
   1683   expectRead(*pipe2.in, "ar").wait(ws);
   1684   promise.wait(ws);
   1685 
   1686   pipe.out = nullptr;
   1687   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1688 }
   1689 
   1690 KJ_TEST("Userland pipe gather write split pump on buffer boundary") {
   1691   kj::EventLoop loop;
   1692   WaitScope ws(loop);
   1693 
   1694   auto pipe = newOneWayPipe();
   1695   auto pipe2 = newOneWayPipe();
   1696   auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3)
   1697       .then([&](uint64_t i) {
   1698     KJ_EXPECT(i == 3);
   1699     return pipe.in->pumpTo(*pipe2.out, 3);
   1700   });
   1701 
   1702   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1703   auto promise = pipe.out->write(parts);
   1704   KJ_EXPECT(!promise.poll(ws));
   1705   expectRead(*pipe2.in, "foobar").wait(ws);
   1706   promise.wait(ws);
   1707 
   1708   pipe.out = nullptr;
   1709   KJ_EXPECT(pumpPromise.wait(ws) == 3);
   1710 }
   1711 
   1712 KJ_TEST("Userland pipe gather write split pump mid-first-buffer") {
   1713   kj::EventLoop loop;
   1714   WaitScope ws(loop);
   1715 
   1716   auto pipe = newOneWayPipe();
   1717   auto pipe2 = newOneWayPipe();
   1718   auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2)
   1719       .then([&](uint64_t i) {
   1720     KJ_EXPECT(i == 2);
   1721     return pipe.in->pumpTo(*pipe2.out, 4);
   1722   });
   1723 
   1724   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1725   auto promise = pipe.out->write(parts);
   1726   KJ_EXPECT(!promise.poll(ws));
   1727   expectRead(*pipe2.in, "foobar").wait(ws);
   1728   promise.wait(ws);
   1729 
   1730   pipe.out = nullptr;
   1731   KJ_EXPECT(pumpPromise.wait(ws) == 4);
   1732 }
   1733 
   1734 KJ_TEST("Userland pipe gather write split pump mid-second-buffer") {
   1735   kj::EventLoop loop;
   1736   WaitScope ws(loop);
   1737 
   1738   auto pipe = newOneWayPipe();
   1739   auto pipe2 = newOneWayPipe();
   1740   auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4)
   1741       .then([&](uint64_t i) {
   1742     KJ_EXPECT(i == 4);
   1743     return pipe.in->pumpTo(*pipe2.out, 2);
   1744   });
   1745 
   1746   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1747   auto promise = pipe.out->write(parts);
   1748   KJ_EXPECT(!promise.poll(ws));
   1749   expectRead(*pipe2.in, "foobar").wait(ws);
   1750   promise.wait(ws);
   1751 
   1752   pipe.out = nullptr;
   1753   KJ_EXPECT(pumpPromise.wait(ws) == 2);
   1754 }
   1755 
   1756 KJ_TEST("Userland pipe gather write pumpFrom") {
   1757   kj::EventLoop loop;
   1758   WaitScope ws(loop);
   1759 
   1760   auto pipe = newOneWayPipe();
   1761   auto pipe2 = newOneWayPipe();
   1762   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1763 
   1764   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1765   auto promise = pipe.out->write(parts);
   1766   KJ_EXPECT(!promise.poll(ws));
   1767   expectRead(*pipe2.in, "foobar").wait(ws);
   1768   promise.wait(ws);
   1769 
   1770   pipe.out = nullptr;
   1771   char c;
   1772   auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
   1773   eofPromise.poll(ws);  // force pump to notice EOF
   1774   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1775   pipe2.out = nullptr;
   1776   KJ_EXPECT(eofPromise.wait(ws) == 0);
   1777 }
   1778 
   1779 KJ_TEST("Userland pipe gather write pumpFrom split on buffer boundary") {
   1780   kj::EventLoop loop;
   1781   WaitScope ws(loop);
   1782 
   1783   auto pipe = newOneWayPipe();
   1784   auto pipe2 = newOneWayPipe();
   1785   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1786 
   1787   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1788   auto promise = pipe.out->write(parts);
   1789   KJ_EXPECT(!promise.poll(ws));
   1790   expectRead(*pipe2.in, "foo").wait(ws);
   1791   expectRead(*pipe2.in, "bar").wait(ws);
   1792   promise.wait(ws);
   1793 
   1794   pipe.out = nullptr;
   1795   char c;
   1796   auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
   1797   eofPromise.poll(ws);  // force pump to notice EOF
   1798   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1799   pipe2.out = nullptr;
   1800   KJ_EXPECT(eofPromise.wait(ws) == 0);
   1801 }
   1802 
   1803 KJ_TEST("Userland pipe gather write pumpFrom split mid-first-buffer") {
   1804   kj::EventLoop loop;
   1805   WaitScope ws(loop);
   1806 
   1807   auto pipe = newOneWayPipe();
   1808   auto pipe2 = newOneWayPipe();
   1809   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1810 
   1811   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1812   auto promise = pipe.out->write(parts);
   1813   KJ_EXPECT(!promise.poll(ws));
   1814   expectRead(*pipe2.in, "fo").wait(ws);
   1815   expectRead(*pipe2.in, "obar").wait(ws);
   1816   promise.wait(ws);
   1817 
   1818   pipe.out = nullptr;
   1819   char c;
   1820   auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
   1821   eofPromise.poll(ws);  // force pump to notice EOF
   1822   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1823   pipe2.out = nullptr;
   1824   KJ_EXPECT(eofPromise.wait(ws) == 0);
   1825 }
   1826 
   1827 KJ_TEST("Userland pipe gather write pumpFrom split mid-second-buffer") {
   1828   kj::EventLoop loop;
   1829   WaitScope ws(loop);
   1830 
   1831   auto pipe = newOneWayPipe();
   1832   auto pipe2 = newOneWayPipe();
   1833   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1834 
   1835   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1836   auto promise = pipe.out->write(parts);
   1837   KJ_EXPECT(!promise.poll(ws));
   1838   expectRead(*pipe2.in, "foob").wait(ws);
   1839   expectRead(*pipe2.in, "ar").wait(ws);
   1840   promise.wait(ws);
   1841 
   1842   pipe.out = nullptr;
   1843   char c;
   1844   auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
   1845   eofPromise.poll(ws);  // force pump to notice EOF
   1846   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1847   pipe2.out = nullptr;
   1848   KJ_EXPECT(eofPromise.wait(ws) == 0);
   1849 }
   1850 
   1851 KJ_TEST("Userland pipe gather write split pumpFrom on buffer boundary") {
   1852   kj::EventLoop loop;
   1853   WaitScope ws(loop);
   1854 
   1855   auto pipe = newOneWayPipe();
   1856   auto pipe2 = newOneWayPipe();
   1857   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3))
   1858       .then([&](uint64_t i) {
   1859     KJ_EXPECT(i == 3);
   1860     return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3));
   1861   });
   1862 
   1863   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1864   auto promise = pipe.out->write(parts);
   1865   KJ_EXPECT(!promise.poll(ws));
   1866   expectRead(*pipe2.in, "foobar").wait(ws);
   1867   promise.wait(ws);
   1868 
   1869   pipe.out = nullptr;
   1870   KJ_EXPECT(pumpPromise.wait(ws) == 3);
   1871 }
   1872 
   1873 KJ_TEST("Userland pipe gather write split pumpFrom mid-first-buffer") {
   1874   kj::EventLoop loop;
   1875   WaitScope ws(loop);
   1876 
   1877   auto pipe = newOneWayPipe();
   1878   auto pipe2 = newOneWayPipe();
   1879   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2))
   1880       .then([&](uint64_t i) {
   1881     KJ_EXPECT(i == 2);
   1882     return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4));
   1883   });
   1884 
   1885   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1886   auto promise = pipe.out->write(parts);
   1887   KJ_EXPECT(!promise.poll(ws));
   1888   expectRead(*pipe2.in, "foobar").wait(ws);
   1889   promise.wait(ws);
   1890 
   1891   pipe.out = nullptr;
   1892   KJ_EXPECT(pumpPromise.wait(ws) == 4);
   1893 }
   1894 
   1895 KJ_TEST("Userland pipe gather write split pumpFrom mid-second-buffer") {
   1896   kj::EventLoop loop;
   1897   WaitScope ws(loop);
   1898 
   1899   auto pipe = newOneWayPipe();
   1900   auto pipe2 = newOneWayPipe();
   1901   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4))
   1902       .then([&](uint64_t i) {
   1903     KJ_EXPECT(i == 4);
   1904     return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2));
   1905   });
   1906 
   1907   ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
   1908   auto promise = pipe.out->write(parts);
   1909   KJ_EXPECT(!promise.poll(ws));
   1910   expectRead(*pipe2.in, "foobar").wait(ws);
   1911   promise.wait(ws);
   1912 
   1913   pipe.out = nullptr;
   1914   KJ_EXPECT(pumpPromise.wait(ws) == 2);
   1915 }
   1916 
   1917 KJ_TEST("Userland pipe pumpTo less than write amount") {
   1918   kj::EventLoop loop;
   1919   WaitScope ws(loop);
   1920 
   1921   auto pipe = newOneWayPipe();
   1922   auto pipe2 = newOneWayPipe();
   1923   auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
   1924 
   1925   auto pieces = kj::heapArray<ArrayPtr<const byte>>(2);
   1926   byte a[1] = { 'a' };
   1927   byte b[1] = { 'b' };
   1928   pieces[0] = arrayPtr(a, 1);
   1929   pieces[1] = arrayPtr(b, 1);
   1930 
   1931   auto writePromise = pipe.out->write(pieces);
   1932   KJ_EXPECT(!writePromise.poll(ws));
   1933 
   1934   expectRead(*pipe2.in, "a").wait(ws);
   1935   KJ_EXPECT(pumpPromise.wait(ws) == 1);
   1936   KJ_EXPECT(!writePromise.poll(ws));
   1937 
   1938   pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
   1939 
   1940   expectRead(*pipe2.in, "b").wait(ws);
   1941   KJ_EXPECT(pumpPromise.wait(ws) == 1);
   1942   writePromise.wait(ws);
   1943 }
   1944 
   1945 KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
   1946   kj::EventLoop loop;
   1947   WaitScope ws(loop);
   1948 
   1949   auto pipe = newOneWayPipe();
   1950   auto pipe2 = newOneWayPipe();
   1951   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1952 
   1953   auto promise = pipe.out->write("foobar", 6);
   1954   KJ_EXPECT(!promise.poll(ws));
   1955   expectRead(*pipe2.in, "foobar").wait(ws);
   1956   promise.wait(ws);
   1957 
   1958   KJ_EXPECT(!pumpPromise.poll(ws));
   1959   pipe.out = nullptr;
   1960   pipe2.in = nullptr;  // force pump to notice EOF
   1961   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1962   pipe2.out = nullptr;
   1963 }
   1964 
   1965 KJ_TEST("Userland pipe EOF fulfills pumpFrom promise") {
   1966   kj::EventLoop loop;
   1967   WaitScope ws(loop);
   1968 
   1969   auto pipe = newOneWayPipe();
   1970   auto pipe2 = newOneWayPipe();
   1971   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   1972 
   1973   auto writePromise = pipe.out->write("foobar", 6);
   1974   KJ_EXPECT(!writePromise.poll(ws));
   1975   auto pipe3 = newOneWayPipe();
   1976   auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out);
   1977   KJ_EXPECT(!pumpPromise2.poll(ws));
   1978   expectRead(*pipe3.in, "foobar").wait(ws);
   1979   writePromise.wait(ws);
   1980 
   1981   KJ_EXPECT(!pumpPromise.poll(ws));
   1982   pipe.out = nullptr;
   1983   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   1984 
   1985   KJ_EXPECT(!pumpPromise2.poll(ws));
   1986   pipe2.out = nullptr;
   1987   KJ_EXPECT(pumpPromise2.wait(ws) == 6);
   1988 }
   1989 
   1990 KJ_TEST("Userland pipe tryPumpFrom to pumpTo for same amount fulfills simultaneously") {
   1991   kj::EventLoop loop;
   1992   WaitScope ws(loop);
   1993 
   1994   auto pipe = newOneWayPipe();
   1995   auto pipe2 = newOneWayPipe();
   1996   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 6));
   1997 
   1998   auto writePromise = pipe.out->write("foobar", 6);
   1999   KJ_EXPECT(!writePromise.poll(ws));
   2000   auto pipe3 = newOneWayPipe();
   2001   auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out, 6);
   2002   KJ_EXPECT(!pumpPromise2.poll(ws));
   2003   expectRead(*pipe3.in, "foobar").wait(ws);
   2004   writePromise.wait(ws);
   2005 
   2006   KJ_EXPECT(pumpPromise.wait(ws) == 6);
   2007   KJ_EXPECT(pumpPromise2.wait(ws) == 6);
   2008 }
   2009 
   2010 KJ_TEST("Userland pipe multi-part write doesn't quit early") {
   2011   kj::EventLoop loop;
   2012   WaitScope ws(loop);
   2013 
   2014   auto pipe = newOneWayPipe();
   2015 
   2016   auto readPromise = expectRead(*pipe.in, "foo");
   2017 
   2018   kj::ArrayPtr<const byte> pieces[2] = { "foobar"_kj.asBytes(), "baz"_kj.asBytes() };
   2019   auto writePromise = pipe.out->write(pieces);
   2020 
   2021   readPromise.wait(ws);
   2022   KJ_EXPECT(!writePromise.poll(ws));
   2023   expectRead(*pipe.in, "bar").wait(ws);
   2024   KJ_EXPECT(!writePromise.poll(ws));
   2025   expectRead(*pipe.in, "baz").wait(ws);
   2026   writePromise.wait(ws);
   2027 }
   2028 
   2029 KJ_TEST("Userland pipe BlockedRead gets empty tryPumpFrom") {
   2030   kj::EventLoop loop;
   2031   WaitScope ws(loop);
   2032 
   2033   auto pipe = newOneWayPipe();
   2034   auto pipe2 = newOneWayPipe();
   2035 
   2036   // First start a read from the back end.
   2037   char buffer[4];
   2038   auto readPromise = pipe2.in->tryRead(buffer, 1, 4);
   2039 
   2040   // Now arrange a pump between the pipes, using tryPumpFrom().
   2041   auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
   2042 
   2043   // Disconnect the front pipe, causing EOF on the pump.
   2044   pipe.out = nullptr;
   2045 
   2046   // The pump should have produced zero bytes.
   2047   KJ_EXPECT(pumpPromise.wait(ws) == 0);
   2048 
   2049   // The read is incomplete.
   2050   KJ_EXPECT(!readPromise.poll(ws));
   2051 
   2052   // A subsequent write() completes the read.
   2053   pipe2.out->write("foo", 3).wait(ws);
   2054   KJ_EXPECT(readPromise.wait(ws) == 3);
   2055   buffer[3] = '\0';
   2056   KJ_EXPECT(kj::StringPtr(buffer, 3) == "foo");
   2057 }
   2058 
   2059 constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
   2060 // AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
   2061 
   2062 KJ_TEST("Userland tee") {
   2063   kj::EventLoop loop;
   2064   WaitScope ws(loop);
   2065 
   2066   auto pipe = newOneWayPipe();
   2067   auto tee = newTee(kj::mv(pipe.in));
   2068   auto left = kj::mv(tee.branches[0]);
   2069   auto right = kj::mv(tee.branches[1]);
   2070 
   2071   auto writePromise = pipe.out->write("foobar", 6);
   2072 
   2073   expectRead(*left, "foobar").wait(ws);
   2074   writePromise.wait(ws);
   2075   expectRead(*right, "foobar").wait(ws);
   2076 }
   2077 
   2078 KJ_TEST("Userland nested tee") {
   2079   kj::EventLoop loop;
   2080   WaitScope ws(loop);
   2081 
   2082   auto pipe = newOneWayPipe();
   2083   auto tee = newTee(kj::mv(pipe.in));
   2084   auto left = kj::mv(tee.branches[0]);
   2085   auto right = kj::mv(tee.branches[1]);
   2086 
   2087   auto tee2 = newTee(kj::mv(right));
   2088   auto rightLeft = kj::mv(tee2.branches[0]);
   2089   auto rightRight = kj::mv(tee2.branches[1]);
   2090 
   2091   auto writePromise = pipe.out->write("foobar", 6);
   2092 
   2093   expectRead(*left, "foobar").wait(ws);
   2094   writePromise.wait(ws);
   2095   expectRead(*rightLeft, "foobar").wait(ws);
   2096   expectRead(*rightRight, "foo").wait(ws);
   2097 
   2098   auto tee3 = newTee(kj::mv(rightRight));
   2099   auto rightRightLeft = kj::mv(tee3.branches[0]);
   2100   auto rightRightRight = kj::mv(tee3.branches[1]);
   2101   expectRead(*rightRightLeft, "bar").wait(ws);
   2102   expectRead(*rightRightRight, "b").wait(ws);
   2103 
   2104   auto tee4 = newTee(kj::mv(rightRightRight));
   2105   auto rightRightRightLeft = kj::mv(tee4.branches[0]);
   2106   auto rightRightRightRight = kj::mv(tee4.branches[1]);
   2107   expectRead(*rightRightRightLeft, "ar").wait(ws);
   2108   expectRead(*rightRightRightRight, "ar").wait(ws);
   2109 }
   2110 
   2111 KJ_TEST("Userland tee concurrent read") {
   2112   kj::EventLoop loop;
   2113   WaitScope ws(loop);
   2114 
   2115   auto pipe = newOneWayPipe();
   2116   auto tee = newTee(kj::mv(pipe.in));
   2117   auto left = kj::mv(tee.branches[0]);
   2118   auto right = kj::mv(tee.branches[1]);
   2119 
   2120   uint8_t leftBuf[6] = { 0 };
   2121   uint8_t rightBuf[6] = { 0 };
   2122   auto leftPromise = left->tryRead(leftBuf, 6, 6);
   2123   auto rightPromise = right->tryRead(rightBuf, 6, 6);
   2124   KJ_EXPECT(!leftPromise.poll(ws));
   2125   KJ_EXPECT(!rightPromise.poll(ws));
   2126 
   2127   pipe.out->write("foobar", 6).wait(ws);
   2128 
   2129   KJ_EXPECT(leftPromise.wait(ws) == 6);
   2130   KJ_EXPECT(rightPromise.wait(ws) == 6);
   2131 
   2132   KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
   2133   KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
   2134 }
   2135 
   2136 KJ_TEST("Userland tee cancel and restart read") {
   2137   kj::EventLoop loop;
   2138   WaitScope ws(loop);
   2139 
   2140   auto pipe = newOneWayPipe();
   2141   auto tee = newTee(kj::mv(pipe.in));
   2142   auto left = kj::mv(tee.branches[0]);
   2143   auto right = kj::mv(tee.branches[1]);
   2144 
   2145   auto writePromise = pipe.out->write("foobar", 6);
   2146 
   2147   {
   2148     // Initiate a read and immediately cancel it.
   2149     uint8_t buf[6] = { 0 };
   2150     auto promise = left->tryRead(buf, 6, 6);
   2151   }
   2152 
   2153   // Subsequent reads still see the full data.
   2154   expectRead(*left, "foobar").wait(ws);
   2155   writePromise.wait(ws);
   2156   expectRead(*right, "foobar").wait(ws);
   2157 }
   2158 
   2159 KJ_TEST("Userland tee cancel read and destroy branch then read other branch") {
   2160   kj::EventLoop loop;
   2161   WaitScope ws(loop);
   2162 
   2163   auto pipe = newOneWayPipe();
   2164   auto tee = newTee(kj::mv(pipe.in));
   2165   auto left = kj::mv(tee.branches[0]);
   2166   auto right = kj::mv(tee.branches[1]);
   2167 
   2168   auto writePromise = pipe.out->write("foobar", 6);
   2169 
   2170   {
   2171     // Initiate a read and immediately cancel it.
   2172     uint8_t buf[6] = { 0 };
   2173     auto promise = left->tryRead(buf, 6, 6);
   2174   }
   2175 
   2176   // And destroy the branch for good measure.
   2177   left = nullptr;
   2178 
   2179   // Subsequent reads on the other branch still see the full data.
   2180   expectRead(*right, "foobar").wait(ws);
   2181   writePromise.wait(ws);
   2182 }
   2183 
   2184 KJ_TEST("Userland tee subsequent other-branch reads are READY_NOW") {
   2185   kj::EventLoop loop;
   2186   WaitScope ws(loop);
   2187 
   2188   auto pipe = newOneWayPipe();
   2189   auto tee = newTee(kj::mv(pipe.in));
   2190   auto left = kj::mv(tee.branches[0]);
   2191   auto right = kj::mv(tee.branches[1]);
   2192 
   2193   uint8_t leftBuf[6] = { 0 };
   2194   auto leftPromise = left->tryRead(leftBuf, 6, 6);
   2195   // This is the first read, so there should NOT be buffered data.
   2196   KJ_EXPECT(!leftPromise.poll(ws));
   2197   pipe.out->write("foobar", 6).wait(ws);
   2198   leftPromise.wait(ws);
   2199   KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
   2200 
   2201   uint8_t rightBuf[6] = { 0 };
   2202   auto rightPromise = right->tryRead(rightBuf, 6, 6);
   2203   // The left read promise was fulfilled, so there SHOULD be buffered data.
   2204   KJ_EXPECT(rightPromise.poll(ws));
   2205   rightPromise.wait(ws);
   2206   KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
   2207 }
   2208 
   2209 KJ_TEST("Userland tee read EOF propagation") {
   2210   kj::EventLoop loop;
   2211   WaitScope ws(loop);
   2212 
   2213   auto pipe = newOneWayPipe();
   2214   auto writePromise = pipe.out->write("foobar", 6);
   2215   auto tee = newTee(mv(pipe.in));
   2216   auto left = kj::mv(tee.branches[0]);
   2217   auto right = kj::mv(tee.branches[1]);
   2218 
   2219   // Lengthless pipe, so ...
   2220   KJ_EXPECT(left->tryGetLength() == nullptr);
   2221   KJ_EXPECT(right->tryGetLength() == nullptr);
   2222 
   2223   uint8_t leftBuf[7] = { 0 };
   2224   auto leftPromise = left->tryRead(leftBuf, size(leftBuf), size(leftBuf));
   2225   writePromise.wait(ws);
   2226   // Destroying the output side should force a short read.
   2227   pipe.out = nullptr;
   2228 
   2229   KJ_EXPECT(leftPromise.wait(ws) == 6);
   2230   KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
   2231 
   2232   // And we should see a short read here, too.
   2233   uint8_t rightBuf[7] = { 0 };
   2234   auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf));
   2235   KJ_EXPECT(rightPromise.wait(ws) == 6);
   2236   KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
   2237 
   2238   // Further reads should all be short.
   2239   KJ_EXPECT(left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws) == 0);
   2240   KJ_EXPECT(right->tryRead(rightBuf, 1, size(rightBuf)).wait(ws) == 0);
   2241 }
   2242 
   2243 KJ_TEST("Userland tee read exception propagation") {
   2244   kj::EventLoop loop;
   2245   WaitScope ws(loop);
   2246 
   2247   // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
   2248   // ended prematurely" exception when we destroy the output side early.
   2249   auto pipe = newOneWayPipe(7);
   2250   auto writePromise = pipe.out->write("foobar", 6);
   2251   auto tee = newTee(mv(pipe.in));
   2252   auto left = kj::mv(tee.branches[0]);
   2253   auto right = kj::mv(tee.branches[1]);
   2254 
   2255   // Test tryGetLength() while we're at it.
   2256   KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 7);
   2257   KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7);
   2258 
   2259   uint8_t leftBuf[7] = { 0 };
   2260   auto leftPromise = left->tryRead(leftBuf, 6, size(leftBuf));
   2261   writePromise.wait(ws);
   2262   // Destroying the output side should force a fulfillment of the read (since we reached minBytes).
   2263   pipe.out = nullptr;
   2264   KJ_EXPECT(leftPromise.wait(ws) == 6);
   2265   KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
   2266 
   2267   // The next read sees the exception.
   2268   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
   2269       left->tryRead(leftBuf, 1, size(leftBuf)).ignoreResult().wait(ws));
   2270 
   2271   // Test tryGetLength() here -- the unread branch still sees the original length value.
   2272   KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 1);
   2273   KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7);
   2274 
   2275   // We should see the buffered data on the other side, even though we don't reach our minBytes.
   2276   uint8_t rightBuf[7] = { 0 };
   2277   auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf));
   2278   KJ_EXPECT(rightPromise.wait(ws) == 6);
   2279   KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
   2280   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
   2281       right->tryRead(rightBuf, 1, size(leftBuf)).ignoreResult().wait(ws));
   2282 
   2283   // Further reads should all see the exception again.
   2284   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
   2285       left->tryRead(leftBuf, 1, size(leftBuf)).ignoreResult().wait(ws));
   2286   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
   2287       right->tryRead(rightBuf, 1, size(leftBuf)).ignoreResult().wait(ws));
   2288 }
   2289 
   2290 KJ_TEST("Userland tee read exception propagation w/ data loss") {
   2291   kj::EventLoop loop;
   2292   WaitScope ws(loop);
   2293 
   2294   // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
   2295   // ended prematurely" exception once the pipe sees a short read.
   2296   auto pipe = newOneWayPipe(7);
   2297   auto writePromise = pipe.out->write("foobar", 6);
   2298   auto tee = newTee(mv(pipe.in));
   2299   auto left = kj::mv(tee.branches[0]);
   2300   auto right = kj::mv(tee.branches[1]);
   2301 
   2302   uint8_t leftBuf[7] = { 0 };
   2303   auto leftPromise = left->tryRead(leftBuf, 7, 7);
   2304   writePromise.wait(ws);
   2305   // Destroying the output side should force an exception, since we didn't reach our minBytes.
   2306   pipe.out = nullptr;
   2307   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2308       "pipe ended prematurely", leftPromise.ignoreResult().wait(ws));
   2309 
   2310   // And we should see a short read here, too. In fact, we shouldn't see anything: the short read
   2311   // above read all of the pipe's data, but then failed to buffer it because it encountered an
   2312   // exception. It buffered the exception, instead.
   2313   uint8_t rightBuf[7] = { 0 };
   2314   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
   2315       right->tryRead(rightBuf, 1, 1).ignoreResult().wait(ws));
   2316 }
   2317 
   2318 KJ_TEST("Userland tee read into different buffer sizes") {
   2319   kj::EventLoop loop;
   2320   WaitScope ws(loop);
   2321 
   2322   auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11));
   2323   auto left = kj::mv(tee.branches[0]);
   2324   auto right = kj::mv(tee.branches[1]);
   2325 
   2326   uint8_t leftBuf[5] = { 0 };
   2327   uint8_t rightBuf[11] = { 0 };
   2328 
   2329   auto leftPromise = left->tryRead(leftBuf, 5, 5);
   2330   auto rightPromise = right->tryRead(rightBuf, 11, 11);
   2331 
   2332   KJ_EXPECT(leftPromise.wait(ws) == 5);
   2333   KJ_EXPECT(rightPromise.wait(ws) == 11);
   2334 }
   2335 
   2336 KJ_TEST("Userland tee reads see max(minBytes...) and min(maxBytes...)") {
   2337   kj::EventLoop loop;
   2338   WaitScope ws(loop);
   2339 
   2340   auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11));
   2341   auto left = kj::mv(tee.branches[0]);
   2342   auto right = kj::mv(tee.branches[1]);
   2343 
   2344   {
   2345     uint8_t leftBuf[5] = { 0 };
   2346     uint8_t rightBuf[11] = { 0 };
   2347 
   2348     // Subrange of another range. The smaller maxBytes should win.
   2349     auto leftPromise = left->tryRead(leftBuf, 3, 5);
   2350     auto rightPromise = right->tryRead(rightBuf, 1, 11);
   2351 
   2352     KJ_EXPECT(leftPromise.wait(ws) == 5);
   2353     KJ_EXPECT(rightPromise.wait(ws) == 5);
   2354   }
   2355 
   2356   {
   2357     uint8_t leftBuf[5] = { 0 };
   2358     uint8_t rightBuf[11] = { 0 };
   2359 
   2360     // Disjoint ranges. The larger minBytes should win.
   2361     auto leftPromise = left->tryRead(leftBuf, 3, 5);
   2362     auto rightPromise = right->tryRead(rightBuf, 6, 11);
   2363 
   2364     KJ_EXPECT(leftPromise.wait(ws) == 5);
   2365     KJ_EXPECT(rightPromise.wait(ws) == 6);
   2366 
   2367     KJ_EXPECT(left->tryRead(leftBuf, 1, 2).wait(ws) == 1);
   2368   }
   2369 }
   2370 
   2371 KJ_TEST("Userland tee read stress test") {
   2372   kj::EventLoop loop;
   2373   WaitScope ws(loop);
   2374 
   2375   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   2376 
   2377   auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
   2378   auto left = kj::mv(tee.branches[0]);
   2379   auto right = kj::mv(tee.branches[1]);
   2380 
   2381   auto leftBuffer = heapArray<byte>(bigText.size());
   2382 
   2383   {
   2384     auto leftSlice = leftBuffer.slice(0, leftBuffer.size());
   2385     while (leftSlice.size() > 0) {
   2386       for (size_t blockSize: { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59 }) {
   2387         if (leftSlice.size() == 0) break;
   2388         auto maxBytes = min(blockSize, leftSlice.size());
   2389         auto amount = left->tryRead(leftSlice.begin(), 1, maxBytes).wait(ws);
   2390         leftSlice = leftSlice.slice(amount, leftSlice.size());
   2391       }
   2392     }
   2393   }
   2394 
   2395   KJ_EXPECT(memcmp(leftBuffer.begin(), bigText.begin(), leftBuffer.size()) == 0);
   2396   KJ_EXPECT(right->readAllText().wait(ws) == bigText);
   2397 }
   2398 
   2399 KJ_TEST("Userland tee pump") {
   2400   kj::EventLoop loop;
   2401   WaitScope ws(loop);
   2402 
   2403   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   2404 
   2405   auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
   2406   auto left = kj::mv(tee.branches[0]);
   2407   auto right = kj::mv(tee.branches[1]);
   2408 
   2409   auto leftPipe = newOneWayPipe();
   2410   auto rightPipe = newOneWayPipe();
   2411 
   2412   auto leftPumpPromise = left->pumpTo(*leftPipe.out, 7);
   2413   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2414 
   2415   auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2416   // Neither are ready yet, because the left pump's backpressure has blocked the AsyncTee's pull
   2417   // loop until we read from leftPipe.
   2418   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2419   KJ_EXPECT(!rightPumpPromise.poll(ws));
   2420 
   2421   expectRead(*leftPipe.in, "foo bar").wait(ws);
   2422   KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
   2423   KJ_EXPECT(!rightPumpPromise.poll(ws));
   2424 
   2425   // We should be able to read up to how far the left side pumped, and beyond. The left side will
   2426   // now have data in its buffer.
   2427   expectRead(*rightPipe.in, "foo bar baz,foo bar baz,foo").wait(ws);
   2428 
   2429   // Consume the left side buffer.
   2430   expectRead(*left, " baz,foo bar").wait(ws);
   2431 
   2432   // We can destroy the left branch entirely and the right branch will still see all data.
   2433   left = nullptr;
   2434   KJ_EXPECT(!rightPumpPromise.poll(ws));
   2435   auto allTextPromise = rightPipe.in->readAllText();
   2436   KJ_EXPECT(rightPumpPromise.wait(ws) == bigText.size());
   2437   // Need to force an EOF in the right pipe to check the result.
   2438   rightPipe.out = nullptr;
   2439   KJ_EXPECT(allTextPromise.wait(ws) == bigText.slice(27));
   2440 }
   2441 
   2442 KJ_TEST("Userland tee pump slows down reads") {
   2443   kj::EventLoop loop;
   2444   WaitScope ws(loop);
   2445 
   2446   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   2447 
   2448   auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
   2449   auto left = kj::mv(tee.branches[0]);
   2450   auto right = kj::mv(tee.branches[1]);
   2451 
   2452   auto leftPipe = newOneWayPipe();
   2453   auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2454   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2455 
   2456   // The left pump will cause some data to be buffered on the right branch, which we can read.
   2457   auto rightExpectation0 = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE));
   2458   expectRead(*right, rightExpectation0).wait(ws);
   2459 
   2460   // But the next right branch read is blocked by the left pipe's backpressure.
   2461   auto rightExpectation1 = kj::str(bigText.slice(TEE_MAX_CHUNK_SIZE, TEE_MAX_CHUNK_SIZE + 10));
   2462   auto rightPromise = expectRead(*right, rightExpectation1);
   2463   KJ_EXPECT(!rightPromise.poll(ws));
   2464 
   2465   // The right branch read finishes when we relieve the pressure in the left pipe.
   2466   auto allTextPromise = leftPipe.in->readAllText();
   2467   rightPromise.wait(ws);
   2468   KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size());
   2469   leftPipe.out = nullptr;
   2470   KJ_EXPECT(allTextPromise.wait(ws) == bigText);
   2471 }
   2472 
   2473 KJ_TEST("Userland tee pump EOF propagation") {
   2474   kj::EventLoop loop;
   2475   WaitScope ws(loop);
   2476 
   2477   {
   2478     // EOF encountered by two pump operations.
   2479     auto pipe = newOneWayPipe();
   2480     auto writePromise = pipe.out->write("foo bar", 7);
   2481     auto tee = newTee(mv(pipe.in));
   2482     auto left = kj::mv(tee.branches[0]);
   2483     auto right = kj::mv(tee.branches[1]);
   2484 
   2485     auto leftPipe = newOneWayPipe();
   2486     auto rightPipe = newOneWayPipe();
   2487 
   2488     // Pump the first bit, and block.
   2489 
   2490     auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2491     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2492     auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2493     writePromise.wait(ws);
   2494     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2495     KJ_EXPECT(!rightPumpPromise.poll(ws));
   2496 
   2497     // Induce an EOF. We should see it propagated to both pump promises.
   2498 
   2499     pipe.out = nullptr;
   2500 
   2501     // Relieve backpressure.
   2502     auto leftAllPromise = leftPipe.in->readAllText();
   2503     auto rightAllPromise = rightPipe.in->readAllText();
   2504     KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
   2505     KJ_EXPECT(rightPumpPromise.wait(ws) == 7);
   2506 
   2507     // Make sure we got the data on the pipes that were being pumped to.
   2508     KJ_EXPECT(!leftAllPromise.poll(ws));
   2509     KJ_EXPECT(!rightAllPromise.poll(ws));
   2510     leftPipe.out = nullptr;
   2511     rightPipe.out = nullptr;
   2512     KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
   2513     KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar");
   2514   }
   2515 
   2516   {
   2517     // EOF encountered by a read and pump operation.
   2518     auto pipe = newOneWayPipe();
   2519     auto writePromise = pipe.out->write("foo bar", 7);
   2520     auto tee = newTee(mv(pipe.in));
   2521     auto left = kj::mv(tee.branches[0]);
   2522     auto right = kj::mv(tee.branches[1]);
   2523 
   2524     auto leftPipe = newOneWayPipe();
   2525     auto rightPipe = newOneWayPipe();
   2526 
   2527     // Pump one branch, read another.
   2528 
   2529     auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2530     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2531     expectRead(*right, "foo bar").wait(ws);
   2532     writePromise.wait(ws);
   2533     uint8_t dummy = 0;
   2534     auto rightReadPromise = right->tryRead(&dummy, 1, 1);
   2535 
   2536     // Induce an EOF. We should see it propagated to both the read and pump promises.
   2537 
   2538     pipe.out = nullptr;
   2539 
   2540     // Relieve backpressure in the tee to see the EOF.
   2541     auto leftAllPromise = leftPipe.in->readAllText();
   2542     KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
   2543     KJ_EXPECT(rightReadPromise.wait(ws) == 0);
   2544 
   2545     // Make sure we got the data on the pipe that was being pumped to.
   2546     KJ_EXPECT(!leftAllPromise.poll(ws));
   2547     leftPipe.out = nullptr;
   2548     KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
   2549   }
   2550 }
   2551 
   2552 KJ_TEST("Userland tee pump EOF on chunk boundary") {
   2553   kj::EventLoop loop;
   2554   WaitScope ws(loop);
   2555 
   2556   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   2557 
   2558   // Conjure an EOF right on the boundary of the tee's internal chunk.
   2559   auto chunkText = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE));
   2560   auto tee = newTee(heap<MockAsyncInputStream>(chunkText.asBytes(), chunkText.size()));
   2561   auto left = kj::mv(tee.branches[0]);
   2562   auto right = kj::mv(tee.branches[1]);
   2563 
   2564   auto leftPipe = newOneWayPipe();
   2565   auto rightPipe = newOneWayPipe();
   2566 
   2567   auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2568   auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2569   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2570   KJ_EXPECT(!rightPumpPromise.poll(ws));
   2571 
   2572   auto leftAllPromise = leftPipe.in->readAllText();
   2573   auto rightAllPromise = rightPipe.in->readAllText();
   2574 
   2575   // The pumps should see the EOF and stop.
   2576   KJ_EXPECT(leftPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE);
   2577   KJ_EXPECT(rightPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE);
   2578 
   2579   // Verify that we saw the data on the other end of the destination pipes.
   2580   leftPipe.out = nullptr;
   2581   rightPipe.out = nullptr;
   2582   KJ_EXPECT(leftAllPromise.wait(ws) == chunkText);
   2583   KJ_EXPECT(rightAllPromise.wait(ws) == chunkText);
   2584 }
   2585 
   2586 KJ_TEST("Userland tee pump read exception propagation") {
   2587   kj::EventLoop loop;
   2588   WaitScope ws(loop);
   2589 
   2590   {
   2591     // Exception encountered by two pump operations.
   2592     auto pipe = newOneWayPipe(14);
   2593     auto writePromise = pipe.out->write("foo bar", 7);
   2594     auto tee = newTee(mv(pipe.in));
   2595     auto left = kj::mv(tee.branches[0]);
   2596     auto right = kj::mv(tee.branches[1]);
   2597 
   2598     auto leftPipe = newOneWayPipe();
   2599     auto rightPipe = newOneWayPipe();
   2600 
   2601     // Pump the first bit, and block.
   2602 
   2603     auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2604     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2605     auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2606     writePromise.wait(ws);
   2607     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2608     KJ_EXPECT(!rightPumpPromise.poll(ws));
   2609 
   2610     // Induce a read exception. We should see it propagated to both pump promises.
   2611 
   2612     pipe.out = nullptr;
   2613 
   2614     // Both promises must exist before the backpressure in the tee is relieved, and the tee pull
   2615     // loop actually sees the exception.
   2616     auto leftAllPromise = leftPipe.in->readAllText();
   2617     auto rightAllPromise = rightPipe.in->readAllText();
   2618     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2619         "pipe ended prematurely", leftPumpPromise.ignoreResult().wait(ws));
   2620     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2621         "pipe ended prematurely", rightPumpPromise.ignoreResult().wait(ws));
   2622 
   2623     // Make sure we got the data on the destination pipes.
   2624     KJ_EXPECT(!leftAllPromise.poll(ws));
   2625     KJ_EXPECT(!rightAllPromise.poll(ws));
   2626     leftPipe.out = nullptr;
   2627     rightPipe.out = nullptr;
   2628     KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
   2629     KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar");
   2630   }
   2631 
   2632   {
   2633     // Exception encountered by a read and pump operation.
   2634     auto pipe = newOneWayPipe(14);
   2635     auto writePromise = pipe.out->write("foo bar", 7);
   2636     auto tee = newTee(mv(pipe.in));
   2637     auto left = kj::mv(tee.branches[0]);
   2638     auto right = kj::mv(tee.branches[1]);
   2639 
   2640     auto leftPipe = newOneWayPipe();
   2641     auto rightPipe = newOneWayPipe();
   2642 
   2643     // Pump one branch, read another.
   2644 
   2645     auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2646     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2647     expectRead(*right, "foo bar").wait(ws);
   2648     writePromise.wait(ws);
   2649     uint8_t dummy = 0;
   2650     auto rightReadPromise = right->tryRead(&dummy, 1, 1);
   2651 
   2652     // Induce a read exception. We should see it propagated to both the read and pump promises.
   2653 
   2654     pipe.out = nullptr;
   2655 
   2656     // Relieve backpressure in the tee to see the exceptions.
   2657     auto leftAllPromise = leftPipe.in->readAllText();
   2658     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2659         "pipe ended prematurely", leftPumpPromise.ignoreResult().wait(ws));
   2660     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2661         "pipe ended prematurely", rightReadPromise.ignoreResult().wait(ws));
   2662 
   2663     // Make sure we got the data on the destination pipe.
   2664     KJ_EXPECT(!leftAllPromise.poll(ws));
   2665     leftPipe.out = nullptr;
   2666     KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
   2667   }
   2668 }
   2669 
   2670 KJ_TEST("Userland tee pump write exception propagation") {
   2671   kj::EventLoop loop;
   2672   WaitScope ws(loop);
   2673 
   2674   auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
   2675 
   2676   auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
   2677   auto left = kj::mv(tee.branches[0]);
   2678   auto right = kj::mv(tee.branches[1]);
   2679 
   2680   // Set up two pumps and let them block.
   2681   auto leftPipe = newOneWayPipe();
   2682   auto rightPipe = newOneWayPipe();
   2683   auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2684   auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2685   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2686   KJ_EXPECT(!rightPumpPromise.poll(ws));
   2687 
   2688   // Induce a write exception in the right branch pump. It should propagate to the right pump
   2689   // promise.
   2690   rightPipe.in = nullptr;
   2691   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
   2692       "read end of pipe was aborted", rightPumpPromise.ignoreResult().wait(ws));
   2693 
   2694   // The left pump promise does not see the right branch's write exception.
   2695   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2696   auto allTextPromise = leftPipe.in->readAllText();
   2697   KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size());
   2698   leftPipe.out = nullptr;
   2699   KJ_EXPECT(allTextPromise.wait(ws) == bigText);
   2700 }
   2701 
   2702 KJ_TEST("Userland tee pump cancellation implies write cancellation") {
   2703   kj::EventLoop loop;
   2704   WaitScope ws(loop);
   2705 
   2706   auto text = "foo bar baz"_kj;
   2707 
   2708   auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()));
   2709   auto left = kj::mv(tee.branches[0]);
   2710   auto right = kj::mv(tee.branches[1]);
   2711 
   2712   auto leftPipe = newOneWayPipe();
   2713   auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2714 
   2715   // Arrange to block the left pump on its write operation.
   2716   expectRead(*right, "foo ").wait(ws);
   2717   KJ_EXPECT(!leftPumpPromise.poll(ws));
   2718 
   2719   // Then cancel the pump, while it's still blocked.
   2720   leftPumpPromise = nullptr;
   2721   // It should cancel its write operations, so it should now be safe to destroy the output stream to
   2722   // which it was pumping.
   2723   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   2724     leftPipe.out = nullptr;
   2725   })) {
   2726     KJ_FAIL_EXPECT("write promises were not canceled", exception);
   2727   }
   2728 }
   2729 
   2730 KJ_TEST("Userland tee buffer size limit") {
   2731   kj::EventLoop loop;
   2732   WaitScope ws(loop);
   2733 
   2734   auto text = "foo bar baz"_kj;
   2735 
   2736   {
   2737     // We can carefully read data to stay under our ridiculously low limit.
   2738 
   2739     auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
   2740     auto left = kj::mv(tee.branches[0]);
   2741     auto right = kj::mv(tee.branches[1]);
   2742 
   2743     expectRead(*left, "fo").wait(ws);
   2744     expectRead(*right, "foo ").wait(ws);
   2745     expectRead(*left, "o ba").wait(ws);
   2746     expectRead(*right, "bar ").wait(ws);
   2747     expectRead(*left, "r ba").wait(ws);
   2748     expectRead(*right, "baz").wait(ws);
   2749     expectRead(*left, "z").wait(ws);
   2750   }
   2751 
   2752   {
   2753     // Exceeding the limit causes both branches to see the exception after exhausting their buffers.
   2754 
   2755     auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
   2756     auto left = kj::mv(tee.branches[0]);
   2757     auto right = kj::mv(tee.branches[1]);
   2758 
   2759     expectRead(*left, "fo").wait(ws);
   2760     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
   2761         expectRead(*left, "o").wait(ws));
   2762     expectRead(*right, "fo").wait(ws);
   2763     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
   2764         expectRead(*right, "o").wait(ws));
   2765   }
   2766 
   2767   {
   2768     // We guarantee that two pumps started simultaneously will never exceed our buffer size limit.
   2769 
   2770     auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
   2771     auto left = kj::mv(tee.branches[0]);
   2772     auto right = kj::mv(tee.branches[1]);
   2773     auto leftPipe = kj::newOneWayPipe();
   2774     auto rightPipe = kj::newOneWayPipe();
   2775 
   2776     auto leftPumpPromise = left->pumpTo(*leftPipe.out);
   2777     auto rightPumpPromise = right->pumpTo(*rightPipe.out);
   2778     KJ_EXPECT(!leftPumpPromise.poll(ws));
   2779     KJ_EXPECT(!rightPumpPromise.poll(ws));
   2780 
   2781     uint8_t leftBuf[11] = { 0 };
   2782     uint8_t rightBuf[11] = { 0 };
   2783 
   2784     // The first read on the left pipe will succeed.
   2785     auto leftPromise = leftPipe.in->tryRead(leftBuf, 1, 11);
   2786     KJ_EXPECT(leftPromise.wait(ws) == 2);
   2787     KJ_EXPECT(memcmp(leftBuf, text.begin(), 2) == 0);
   2788 
   2789     // But the second will block until we relieve pressure on the right pipe.
   2790     leftPromise = leftPipe.in->tryRead(leftBuf + 2, 1, 9);
   2791     KJ_EXPECT(!leftPromise.poll(ws));
   2792 
   2793     // Relieve the right pipe pressure ...
   2794     auto rightPromise = rightPipe.in->tryRead(rightBuf, 1, 11);
   2795     KJ_EXPECT(rightPromise.wait(ws) == 2);
   2796     KJ_EXPECT(memcmp(rightBuf, text.begin(), 2) == 0);
   2797 
   2798     // Now the second left pipe read will complete.
   2799     KJ_EXPECT(leftPromise.wait(ws) == 2);
   2800     KJ_EXPECT(memcmp(leftBuf, text.begin(), 4) == 0);
   2801 
   2802     // Leapfrog the left branch with the right. There should be 2 bytes in the buffer, so we can
   2803     // demand a total of 4.
   2804     rightPromise = rightPipe.in->tryRead(rightBuf + 2, 4, 9);
   2805     KJ_EXPECT(rightPromise.wait(ws) == 4);
   2806     KJ_EXPECT(memcmp(rightBuf, text.begin(), 6) == 0);
   2807 
   2808     // Leapfrog the right with the left. We demand the entire rest of the stream, so this should
   2809     // block. Note that a regular read for this amount on one of the tee branches directly would
   2810     // exceed our buffer size limit, but this one does not, because we have the pipe to regulate
   2811     // backpressure for us.
   2812     leftPromise = leftPipe.in->tryRead(leftBuf + 4, 7, 7);
   2813     KJ_EXPECT(!leftPromise.poll(ws));
   2814 
   2815     // Ask for the entire rest of the stream on the right branch and wrap things up.
   2816     rightPromise = rightPipe.in->tryRead(rightBuf + 6, 5, 5);
   2817 
   2818     KJ_EXPECT(leftPromise.wait(ws) == 7);
   2819     KJ_EXPECT(memcmp(leftBuf, text.begin(), 11) == 0);
   2820 
   2821     KJ_EXPECT(rightPromise.wait(ws) == 5);
   2822     KJ_EXPECT(memcmp(rightBuf, text.begin(), 11) == 0);
   2823   }
   2824 }
   2825 
   2826 KJ_TEST("Userspace OneWayPipe whenWriteDisconnected()") {
   2827   kj::EventLoop loop;
   2828   WaitScope ws(loop);
   2829 
   2830   auto pipe = newOneWayPipe();
   2831 
   2832   auto abortedPromise = pipe.out->whenWriteDisconnected();
   2833   KJ_ASSERT(!abortedPromise.poll(ws));
   2834 
   2835   pipe.in = nullptr;
   2836 
   2837   KJ_ASSERT(abortedPromise.poll(ws));
   2838   abortedPromise.wait(ws);
   2839 }
   2840 
   2841 KJ_TEST("Userspace TwoWayPipe whenWriteDisconnected()") {
   2842   kj::EventLoop loop;
   2843   WaitScope ws(loop);
   2844 
   2845   auto pipe = newTwoWayPipe();
   2846 
   2847   auto abortedPromise = pipe.ends[0]->whenWriteDisconnected();
   2848   KJ_ASSERT(!abortedPromise.poll(ws));
   2849 
   2850   pipe.ends[1] = nullptr;
   2851 
   2852   KJ_ASSERT(abortedPromise.poll(ws));
   2853   abortedPromise.wait(ws);
   2854 }
   2855 
   2856 #if !_WIN32  // We don't currently support detecting disconnect with IOCP.
   2857 #if !__CYGWIN__  // TODO(someday): Figure out why whenWriteDisconnected() doesn't work on Cygwin.
   2858 
   2859 KJ_TEST("OS OneWayPipe whenWriteDisconnected()") {
   2860   auto io = setupAsyncIo();
   2861 
   2862   auto pipe = io.provider->newOneWayPipe();
   2863 
   2864   pipe.out->write("foo", 3).wait(io.waitScope);
   2865   auto abortedPromise = pipe.out->whenWriteDisconnected();
   2866   KJ_ASSERT(!abortedPromise.poll(io.waitScope));
   2867 
   2868   pipe.in = nullptr;
   2869 
   2870   KJ_ASSERT(abortedPromise.poll(io.waitScope));
   2871   abortedPromise.wait(io.waitScope);
   2872 }
   2873 
   2874 KJ_TEST("OS TwoWayPipe whenWriteDisconnected()") {
   2875   auto io = setupAsyncIo();
   2876 
   2877   auto pipe = io.provider->newTwoWayPipe();
   2878 
   2879   pipe.ends[0]->write("foo", 3).wait(io.waitScope);
   2880   pipe.ends[1]->write("bar", 3).wait(io.waitScope);
   2881 
   2882   auto abortedPromise = pipe.ends[0]->whenWriteDisconnected();
   2883   KJ_ASSERT(!abortedPromise.poll(io.waitScope));
   2884 
   2885   pipe.ends[1] = nullptr;
   2886 
   2887   KJ_ASSERT(abortedPromise.poll(io.waitScope));
   2888   abortedPromise.wait(io.waitScope);
   2889 
   2890   char buffer[4];
   2891   KJ_ASSERT(pipe.ends[0]->tryRead(&buffer, 3, 3).wait(io.waitScope) == 3);
   2892   buffer[3] = '\0';
   2893   KJ_EXPECT(buffer == "bar"_kj);
   2894 
   2895   // Note: Reading any further in pipe.ends[0] would throw "connection reset".
   2896 }
   2897 
   2898 KJ_TEST("import socket FD that's already broken") {
   2899   auto io = setupAsyncIo();
   2900 
   2901   int fds[2];
   2902   KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
   2903   KJ_SYSCALL(write(fds[1], "foo", 3));
   2904   KJ_SYSCALL(close(fds[1]));
   2905 
   2906   auto stream = io.lowLevelProvider->wrapSocketFd(fds[0], LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
   2907 
   2908   auto abortedPromise = stream->whenWriteDisconnected();
   2909   KJ_ASSERT(abortedPromise.poll(io.waitScope));
   2910   abortedPromise.wait(io.waitScope);
   2911 
   2912   char buffer[4];
   2913   KJ_ASSERT(stream->tryRead(&buffer, sizeof(buffer), sizeof(buffer)).wait(io.waitScope) == 3);
   2914   buffer[3] = '\0';
   2915   KJ_EXPECT(buffer == "foo"_kj);
   2916 }
   2917 
   2918 #endif  // !__CYGWIN__
   2919 #endif  // !_WIN32
   2920 
   2921 }  // namespace
   2922 }  // namespace kj