capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-io-win32.c++ (43554B)


      1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if _WIN32
     23 // For Unix implementation, see async-io-unix.c++.
     24 
     25 // Request Vista-level APIs.
     26 #include "win32-api-version.h"
     27 
     28 #include "async-io.h"
     29 #include "async-io-internal.h"
     30 #include "async-win32.h"
     31 #include "debug.h"
     32 #include "thread.h"
     33 #include "io.h"
     34 #include "vector.h"
     35 #include <set>
     36 
     37 #include <winsock2.h>
     38 #include <ws2ipdef.h>
     39 #include <ws2tcpip.h>
     40 #include <mswsock.h>
     41 #include <stdlib.h>
     42 
     43 #ifndef IPV6_V6ONLY
     44 // MinGW's headers are missing this.
     45 #define IPV6_V6ONLY 27
     46 #endif
     47 
     48 namespace kj {
     49 
     50 namespace _ {  // private
     51 
     52 struct WinsockInitializer {
     53   WinsockInitializer() {
     54     WSADATA dontcare;
     55     int result = WSAStartup(MAKEWORD(2, 2), &dontcare);
     56     if (result != 0) {
     57       KJ_FAIL_WIN32("WSAStartup()", result);
     58     }
     59   }
     60 };
     61 
     62 void initWinsockOnce() {
     63   static WinsockInitializer initializer;
     64 }
     65 
     66 int win32Socketpair(SOCKET socks[2]) {
     67   // This function from: https://github.com/ncm/selectable-socketpair/blob/master/socketpair.c
     68   //
     69   // Copyright notice:
     70   //
     71   //    Copyright 2007, 2010 by Nathan C. Myers <ncm@cantrip.org>
     72   //    Redistribution and use in source and binary forms, with or without modification,
     73   //    are permitted provided that the following conditions are met:
     74   //
     75   //        Redistributions of source code must retain the above copyright notice, this
     76   //        list of conditions and the following disclaimer.
     77   //
     78   //        Redistributions in binary form must reproduce the above copyright notice,
     79   //        this list of conditions and the following disclaimer in the documentation
     80   //        and/or other materials provided with the distribution.
     81   //
     82   //        The name of the author must not be used to endorse or promote products
     83   //        derived from this software without specific prior written permission.
     84   //
     85   //    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
     86   //    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     87   //    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
     88   //    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
     89   //    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     90   //    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
     91   //    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
     92   //    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
     93   //    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     94   //    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     95 
     96   // Note: This function is called from some Cap'n Proto unit tests, despite not having a public
     97   //   header declaration.
     98   // TODO(cleanup): Consider putting this somewhere public? Note that since it depends on Winsock,
     99   //   it needs to be in the kj-async library.
    100 
    101   initWinsockOnce();
    102 
    103   union {
    104     struct sockaddr_in inaddr;
    105     struct sockaddr addr;
    106   } a;
    107   SOCKET listener;
    108   int e;
    109   socklen_t addrlen = sizeof(a.inaddr);
    110   int reuse = 1;
    111 
    112   if (socks == 0) {
    113     WSASetLastError(WSAEINVAL);
    114     return SOCKET_ERROR;
    115   }
    116   socks[0] = socks[1] = -1;
    117 
    118   listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    119   if (listener == -1)
    120     return SOCKET_ERROR;
    121 
    122   memset(&a, 0, sizeof(a));
    123   a.inaddr.sin_family = AF_INET;
    124   a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    125   a.inaddr.sin_port = 0;
    126 
    127   for (;;) {
    128     if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
    129            (char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
    130       break;
    131     if  (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
    132       break;
    133 
    134     memset(&a, 0, sizeof(a));
    135     if  (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
    136       break;
    137     // win32 getsockname may only set the port number, p=0.0005.
    138     // ( http://msdn.microsoft.com/library/ms738543.aspx ):
    139     a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    140     a.inaddr.sin_family = AF_INET;
    141 
    142     if (listen(listener, 1) == SOCKET_ERROR)
    143       break;
    144 
    145     socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    146     if (socks[0] == -1)
    147       break;
    148     if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
    149       break;
    150 
    151   retryAccept:
    152     socks[1] = accept(listener, NULL, NULL);
    153     if (socks[1] == -1)
    154       break;
    155 
    156     // Verify that the client is actually us and not someone else who raced to connect first.
    157     // (This check added by Kenton for security.)
    158     union {
    159       struct sockaddr_in inaddr;
    160       struct sockaddr addr;
    161     } b, c;
    162     socklen_t bAddrlen = sizeof(b.inaddr);
    163     socklen_t cAddrlen = sizeof(b.inaddr);
    164     if (getpeername(socks[1], &b.addr, &bAddrlen) == SOCKET_ERROR)
    165       break;
    166     if (getsockname(socks[0], &c.addr, &cAddrlen) == SOCKET_ERROR)
    167       break;
    168     if (bAddrlen != cAddrlen || memcmp(&b.addr, &c.addr, bAddrlen) != 0) {
    169       // Someone raced to connect first. Ignore.
    170       closesocket(socks[1]);
    171       goto retryAccept;
    172     }
    173 
    174     closesocket(listener);
    175     return 0;
    176   }
    177 
    178   e = WSAGetLastError();
    179   closesocket(listener);
    180   closesocket(socks[0]);
    181   closesocket(socks[1]);
    182   WSASetLastError(e);
    183   socks[0] = socks[1] = -1;
    184   return SOCKET_ERROR;
    185 }
    186 
    187 }  // namespace _
    188 
    189 namespace {
    190 
    191 // =======================================================================================
    192 
    193 static constexpr uint NEW_FD_FLAGS = LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
    194 
    195 class OwnedFd {
    196 public:
    197   OwnedFd(SOCKET fd, uint flags): fd(fd), flags(flags) {
    198     // TODO(perf): Maybe use SetFileCompletionNotificationModes() to tell Windows not to bother
    199     //   delivering an event when the operation completes inline. Not currently implemented on
    200     //   Wine, though.
    201   }
    202 
    203   ~OwnedFd() noexcept(false) {
    204     if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
    205       KJ_WINSOCK(closesocket(fd)) { break; }
    206     }
    207   }
    208 
    209 protected:
    210   SOCKET fd;
    211 
    212 private:
    213   uint flags;
    214 };
    215 
    216 // =======================================================================================
    217 
    218 class AsyncStreamFd: public OwnedFd, public AsyncIoStream {
    219 public:
    220   AsyncStreamFd(Win32EventPort& eventPort, SOCKET fd, uint flags)
    221       : OwnedFd(fd, flags),
    222         observer(eventPort.observeIo(reinterpret_cast<HANDLE>(fd))) {}
    223   virtual ~AsyncStreamFd() noexcept(false) {}
    224 
    225   Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    226     return tryRead(buffer, minBytes, maxBytes).then([=](size_t result) {
    227       KJ_REQUIRE(result >= minBytes, "Premature EOF") {
    228         // Pretend we read zeros from the input.
    229         memset(reinterpret_cast<byte*>(buffer) + result, 0, minBytes - result);
    230         return minBytes;
    231       }
    232       return result;
    233     });
    234   }
    235 
    236   Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    237     auto bufs = heapArray<WSABUF>(1);
    238     bufs[0].buf = reinterpret_cast<char*>(buffer);
    239     bufs[0].len = maxBytes;
    240 
    241     ArrayPtr<WSABUF> ref = bufs;
    242     return tryReadInternal(ref, minBytes, 0).attach(kj::mv(bufs));
    243   }
    244 
    245   Promise<void> write(const void* buffer, size_t size) override {
    246     auto bufs = heapArray<WSABUF>(1);
    247     bufs[0].buf = const_cast<char*>(reinterpret_cast<const char*>(buffer));
    248     bufs[0].len = size;
    249 
    250     ArrayPtr<WSABUF> ref = bufs;
    251     return writeInternal(ref).attach(kj::mv(bufs));
    252   }
    253 
    254   Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    255     auto bufs = heapArray<WSABUF>(pieces.size());
    256     for (auto i: kj::indices(pieces)) {
    257       bufs[i].buf = const_cast<char*>(pieces[i].asChars().begin());
    258       bufs[i].len = pieces[i].size();
    259     }
    260 
    261     ArrayPtr<WSABUF> ref = bufs;
    262     return writeInternal(ref).attach(kj::mv(bufs));
    263   }
    264 
    265   kj::Promise<void> connect(const struct sockaddr* addr, uint addrlen) {
    266     // In order to connect asynchronously, we need the ConnectEx() function. Apparently, we have
    267     // to query the socket for it dynamically, I guess because of the insanity in which winsock
    268     // can be implemented in userspace and old implementations may not support it.
    269     GUID guid = WSAID_CONNECTEX;
    270     LPFN_CONNECTEX connectEx = nullptr;
    271     DWORD n = 0;
    272     KJ_WINSOCK(WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
    273                         &connectEx, sizeof(connectEx), &n, NULL, NULL)) {
    274       goto fail;  // avoid memory leak due to compiler bugs
    275     }
    276     if (false) {
    277     fail:
    278       return kj::READY_NOW;
    279     }
    280 
    281     // OK, phew, we now have our ConnectEx function pointer. Call it.
    282     auto op = observer->newOperation(0);
    283 
    284     if (!connectEx(fd, addr, addrlen, NULL, 0, NULL, op->getOverlapped())) {
    285       DWORD error = WSAGetLastError();
    286       if (error != ERROR_IO_PENDING) {
    287         KJ_FAIL_WIN32("ConnectEx()", error) { break; }
    288         return kj::READY_NOW;
    289       }
    290     }
    291 
    292     return op->onComplete().then([this](Win32EventPort::IoResult result) {
    293       if (result.errorCode != ERROR_SUCCESS) {
    294         KJ_FAIL_WIN32("ConnectEx()", result.errorCode) { return; }
    295       }
    296 
    297       // Enable shutdown() to work.
    298       setsockopt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
    299     });
    300   }
    301 
    302   Promise<void> whenWriteDisconnected() override {
    303     // Windows IOCP does not provide a direct, documented way to detect when the socket disconnects
    304     // without actually doing a read or write. However, there is an undocoumented-but-stable
    305     // ioctl called IOCTL_AFD_POLL which can be used for this purpose. In fact, select() is
    306     // implemented in terms of this ioctl -- performed synchronously -- but it's entirely possible
    307     // to put only one socket into the list and perform the ioctl asynchronously. Here's the
    308     // source code for select() in Windows 2000 (not sure how this became public...):
    309     //
    310     //     https://github.com/pustladi/Windows-2000/blob/661d000d50637ed6fab2329d30e31775046588a9/private/net/sockets/winsock2/wsp/msafd/select.c#L59-L655
    311     //
    312     // And here's an interesting discussion: https://github.com/python-trio/trio/issues/52
    313     //
    314     // TODO(someday): Implement this with IOCTL_AFD_POLL. For now I'm leaving it unimplemented
    315     //   because I added this method for a Linux-only use case.
    316     return NEVER_DONE;
    317   }
    318 
    319   void shutdownWrite() override {
    320     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    321     // Win32AsyncIoProvider interface.
    322     KJ_WINSOCK(shutdown(fd, SD_SEND));
    323   }
    324 
    325   void abortRead() override {
    326     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    327     // Win32AsyncIoProvider interface.
    328     KJ_WINSOCK(shutdown(fd, SD_RECEIVE));
    329   }
    330 
    331   void getsockopt(int level, int option, void* value, uint* length) override {
    332     socklen_t socklen = *length;
    333     KJ_WINSOCK(::getsockopt(fd, level, option,
    334                             reinterpret_cast<char*>(value), &socklen));
    335     *length = socklen;
    336   }
    337 
    338   void setsockopt(int level, int option, const void* value, uint length) override {
    339     KJ_WINSOCK(::setsockopt(fd, level, option,
    340                             reinterpret_cast<const char*>(value), length));
    341   }
    342 
    343   void getsockname(struct sockaddr* addr, uint* length) override {
    344     socklen_t socklen = *length;
    345     KJ_WINSOCK(::getsockname(fd, addr, &socklen));
    346     *length = socklen;
    347   }
    348 
    349   void getpeername(struct sockaddr* addr, uint* length) override {
    350     socklen_t socklen = *length;
    351     KJ_WINSOCK(::getpeername(fd, addr, &socklen));
    352     *length = socklen;
    353   }
    354 
    355 private:
    356   Own<Win32EventPort::IoObserver> observer;
    357 
    358   Promise<size_t> tryReadInternal(ArrayPtr<WSABUF> bufs, size_t minBytes, size_t alreadyRead) {
    359     // `bufs` will remain valid until the promise completes and may be freely modified.
    360     //
    361     // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes
    362     // and buffer have already been adjusted to account for them, but this count must be included
    363     // in the final return value.
    364 
    365     auto op = observer->newOperation(0);
    366 
    367     DWORD flags = 0;
    368     if (WSARecv(fd, bufs.begin(), bufs.size(), NULL, &flags,
    369                 op->getOverlapped(), NULL) == SOCKET_ERROR) {
    370       DWORD error = WSAGetLastError();
    371       if (error != WSA_IO_PENDING) {
    372         KJ_FAIL_WIN32("WSARecv()", error) { break; }
    373         return alreadyRead;
    374       }
    375     }
    376 
    377     return op->onComplete()
    378         .then([this,KJ_CPCAP(bufs),minBytes,alreadyRead](Win32IocpEventPort::IoResult result) mutable
    379               -> Promise<size_t> {
    380       if (result.errorCode != ERROR_SUCCESS) {
    381         if (alreadyRead > 0) {
    382           // Report what we already read.
    383           return alreadyRead;
    384         } else {
    385           KJ_FAIL_WIN32("WSARecv()", result.errorCode) { break; }
    386           return size_t(0);
    387         }
    388       }
    389 
    390       if (result.bytesTransferred == 0) {
    391         return alreadyRead;
    392       }
    393 
    394       alreadyRead += result.bytesTransferred;
    395       if (result.bytesTransferred >= minBytes) {
    396         // We can stop here.
    397         return alreadyRead;
    398       }
    399       minBytes -= result.bytesTransferred;
    400 
    401       while (result.bytesTransferred >= bufs[0].len) {
    402         result.bytesTransferred -= bufs[0].len;
    403         bufs = bufs.slice(1, bufs.size());
    404       }
    405 
    406       if (result.bytesTransferred > 0) {
    407         bufs[0].buf += result.bytesTransferred;
    408         bufs[0].len -= result.bytesTransferred;
    409       }
    410 
    411       return tryReadInternal(bufs, minBytes, alreadyRead);
    412     }).attach(kj::mv(bufs));
    413   }
    414 
    415   Promise<void> writeInternal(ArrayPtr<WSABUF> bufs) {
    416     // `bufs` will remain valid until the promise completes and may be freely modified.
    417 
    418     auto op = observer->newOperation(0);
    419 
    420     if (WSASend(fd, bufs.begin(), bufs.size(), NULL, 0,
    421                 op->getOverlapped(), NULL) == SOCKET_ERROR) {
    422       DWORD error = WSAGetLastError();
    423       if (error != WSA_IO_PENDING) {
    424         KJ_FAIL_WIN32("WSASend()", error) { break; }
    425         return kj::READY_NOW;
    426       }
    427     }
    428 
    429     return op->onComplete()
    430         .then([this,KJ_CPCAP(bufs)](Win32IocpEventPort::IoResult result) mutable -> Promise<void> {
    431       if (result.errorCode != ERROR_SUCCESS) {
    432         KJ_FAIL_WIN32("WSASend()", result.errorCode) { break; }
    433         return kj::READY_NOW;
    434       }
    435 
    436       while (bufs.size() > 0 && result.bytesTransferred >= bufs[0].len) {
    437         result.bytesTransferred -= bufs[0].len;
    438         bufs = bufs.slice(1, bufs.size());
    439       }
    440 
    441       if (result.bytesTransferred > 0) {
    442         bufs[0].buf += result.bytesTransferred;
    443         bufs[0].len -= result.bytesTransferred;
    444       }
    445 
    446       if (bufs.size() > 0) {
    447         return writeInternal(bufs);
    448       } else {
    449         return kj::READY_NOW;
    450       }
    451     }).attach(kj::mv(bufs));
    452   }
    453 };
    454 
    455 // =======================================================================================
    456 
    457 class SocketAddress {
    458 public:
    459   SocketAddress(const void* sockaddr, uint len): addrlen(len) {
    460     KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
    461     memcpy(&addr.generic, sockaddr, len);
    462   }
    463 
    464   bool operator<(const SocketAddress& other) const {
    465     // So we can use std::set<SocketAddress>...  see DNS lookup code.
    466 
    467     if (wildcard < other.wildcard) return true;
    468     if (wildcard > other.wildcard) return false;
    469 
    470     if (addrlen < other.addrlen) return true;
    471     if (addrlen > other.addrlen) return false;
    472 
    473     return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
    474   }
    475 
    476   const struct sockaddr* getRaw() const { return &addr.generic; }
    477   int getRawSize() const { return addrlen; }
    478 
    479   SOCKET socket(int type) const {
    480     bool isStream = type == SOCK_STREAM;
    481 
    482     SOCKET result = ::socket(addr.generic.sa_family, type, 0);
    483 
    484     if (result == INVALID_SOCKET) {
    485       KJ_FAIL_WIN32("WSASocket()", WSAGetLastError()) { return INVALID_SOCKET; }
    486     }
    487 
    488     if (isStream && (addr.generic.sa_family == AF_INET ||
    489                      addr.generic.sa_family == AF_INET6)) {
    490       // TODO(perf):  As a hack for the 0.4 release we are always setting
    491       //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
    492       //   RPC protocol.  Later, we should extend the interface to provide more
    493       //   control over this.  Perhaps write() should have a flag which
    494       //   specifies whether to pass MSG_MORE.
    495       BOOL one = TRUE;
    496       KJ_WINSOCK(setsockopt(result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
    497     }
    498 
    499     return result;
    500   }
    501 
    502   void bind(SOCKET sockfd) const {
    503     if (wildcard) {
    504       // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket.  (The
    505       // default value of this option varies across platforms.)
    506       DWORD value = 0;
    507       KJ_WINSOCK(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY,
    508                             reinterpret_cast<char*>(&value), sizeof(value)));
    509     }
    510 
    511     KJ_WINSOCK(::bind(sockfd, &addr.generic, addrlen), toString());
    512   }
    513 
    514   uint getPort() const {
    515     switch (addr.generic.sa_family) {
    516       case AF_INET: return ntohs(addr.inet4.sin_port);
    517       case AF_INET6: return ntohs(addr.inet6.sin6_port);
    518       default: return 0;
    519     }
    520   }
    521 
    522   String toString() const {
    523     if (wildcard) {
    524       return str("*:", getPort());
    525     }
    526 
    527     switch (addr.generic.sa_family) {
    528       case AF_INET: {
    529         char buffer[16];
    530         if (InetNtopA(addr.inet4.sin_family, const_cast<struct in_addr*>(&addr.inet4.sin_addr),
    531                       buffer, sizeof(buffer)) == nullptr) {
    532           KJ_FAIL_WIN32("InetNtop", WSAGetLastError()) { break; }
    533           return heapString("(inet_ntop error)");
    534         }
    535         return str(buffer, ':', ntohs(addr.inet4.sin_port));
    536       }
    537       case AF_INET6: {
    538         char buffer[46];
    539         if (InetNtopA(addr.inet6.sin6_family, const_cast<struct in6_addr*>(&addr.inet6.sin6_addr),
    540                       buffer, sizeof(buffer)) == nullptr) {
    541           KJ_FAIL_WIN32("InetNtop", WSAGetLastError()) { break; }
    542           return heapString("(inet_ntop error)");
    543         }
    544         return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
    545       }
    546       default:
    547         return str("(unknown address family ", addr.generic.sa_family, ")");
    548     }
    549   }
    550 
    551   static Promise<Array<SocketAddress>> lookupHost(
    552       LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
    553       _::NetworkFilter& filter);
    554   // Perform a DNS lookup.
    555 
    556   static Promise<Array<SocketAddress>> parse(
    557       LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
    558     // TODO(someday):  Allow commas in `str`.
    559 
    560     SocketAddress result;
    561 
    562     // Try to separate the address and port.
    563     ArrayPtr<const char> addrPart;
    564     Maybe<StringPtr> portPart;
    565 
    566     int af;
    567 
    568     if (str.startsWith("[")) {
    569       // Address starts with a bracket, which is a common way to write an ip6 address with a port,
    570       // since without brackets around the address part, the port looks like another segment of
    571       // the address.
    572       af = AF_INET6;
    573       size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
    574           "Unclosed '[' in address string.", str);
    575 
    576       addrPart = str.slice(1, closeBracket);
    577       if (str.size() > closeBracket + 1) {
    578         KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
    579                    "Expected port suffix after ']'.", str);
    580         portPart = str.slice(closeBracket + 2);
    581       }
    582     } else {
    583       KJ_IF_MAYBE(colon, str.findFirst(':')) {
    584         if (str.slice(*colon + 1).findFirst(':') == nullptr) {
    585           // There is exactly one colon and no brackets, so it must be an ip4 address with port.
    586           af = AF_INET;
    587           addrPart = str.slice(0, *colon);
    588           portPart = str.slice(*colon + 1);
    589         } else {
    590           // There are two or more colons and no brackets, so the whole thing must be an ip6
    591           // address with no port.
    592           af = AF_INET6;
    593           addrPart = str;
    594         }
    595       } else {
    596         // No colons, so it must be an ip4 address without port.
    597         af = AF_INET;
    598         addrPart = str;
    599       }
    600     }
    601 
    602     // Parse the port.
    603     unsigned long port;
    604     KJ_IF_MAYBE(portText, portPart) {
    605       char* endptr;
    606       port = strtoul(portText->cStr(), &endptr, 0);
    607       if (portText->size() == 0 || *endptr != '\0') {
    608         // Not a number.  Maybe it's a service name.  Fall back to DNS.
    609         return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
    610                           filter);
    611       }
    612       KJ_REQUIRE(port < 65536, "Port number too large.");
    613     } else {
    614       port = portHint;
    615     }
    616 
    617     // Check for wildcard.
    618     if (addrPart.size() == 1 && addrPart[0] == '*') {
    619       result.wildcard = true;
    620       // Create an ip6 socket and set IPV6_V6ONLY to 0 later.
    621       result.addrlen = sizeof(addr.inet6);
    622       result.addr.inet6.sin6_family = AF_INET6;
    623       result.addr.inet6.sin6_port = htons(port);
    624       auto array = kj::heapArrayBuilder<SocketAddress>(1);
    625       array.add(result);
    626       return array.finish();
    627     }
    628 
    629     void* addrTarget;
    630     if (af == AF_INET6) {
    631       result.addrlen = sizeof(addr.inet6);
    632       result.addr.inet6.sin6_family = AF_INET6;
    633       result.addr.inet6.sin6_port = htons(port);
    634       addrTarget = &result.addr.inet6.sin6_addr;
    635     } else {
    636       result.addrlen = sizeof(addr.inet4);
    637       result.addr.inet4.sin_family = AF_INET;
    638       result.addr.inet4.sin_port = htons(port);
    639       addrTarget = &result.addr.inet4.sin_addr;
    640     }
    641 
    642     char buffer[64];
    643     if (addrPart.size() < sizeof(buffer) - 1) {
    644       // addrPart is not necessarily NUL-terminated so we have to make a copy.  :(
    645       memcpy(buffer, addrPart.begin(), addrPart.size());
    646       buffer[addrPart.size()] = '\0';
    647 
    648       // OK, parse it!
    649       switch (InetPtonA(af, buffer, addrTarget)) {
    650         case 1: {
    651           // success.
    652           if (!result.parseAllowedBy(filter)) {
    653             KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
    654             return Array<SocketAddress>();
    655           }
    656 
    657           auto array = kj::heapArrayBuilder<SocketAddress>(1);
    658           array.add(result);
    659           return array.finish();
    660         }
    661         case 0:
    662           // It's apparently not a simple address...  fall back to DNS.
    663           break;
    664         default:
    665           KJ_FAIL_WIN32("InetPton", WSAGetLastError(), af, addrPart);
    666       }
    667     }
    668 
    669     return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
    670   }
    671 
    672   static SocketAddress getLocalAddress(SOCKET sockfd) {
    673     SocketAddress result;
    674     result.addrlen = sizeof(addr);
    675     KJ_WINSOCK(getsockname(sockfd, &result.addr.generic, &result.addrlen));
    676     return result;
    677   }
    678 
    679   static SocketAddress getPeerAddress(SOCKET sockfd) {
    680     SocketAddress result;
    681     result.addrlen = sizeof(addr);
    682     KJ_WINSOCK(getpeername(sockfd, &result.addr.generic, &result.addrlen));
    683     return result;
    684   }
    685 
    686   bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
    687     return filter.shouldAllow(&addr.generic, addrlen);
    688   }
    689 
    690   bool parseAllowedBy(_::NetworkFilter& filter) {
    691     return filter.shouldAllowParse(&addr.generic, addrlen);
    692   }
    693 
    694   static SocketAddress getWildcardForFamily(int family) {
    695     SocketAddress result;
    696     switch (family) {
    697       case AF_INET:
    698         result.addrlen = sizeof(addr.inet4);
    699         result.addr.inet4.sin_family = AF_INET;
    700         return result;
    701       case AF_INET6:
    702         result.addrlen = sizeof(addr.inet6);
    703         result.addr.inet6.sin6_family = AF_INET6;
    704         return result;
    705       default:
    706         KJ_FAIL_REQUIRE("unknown address family", family);
    707     }
    708   }
    709 
    710 private:
    711   SocketAddress(): addrlen(0) {
    712     memset(&addr, 0, sizeof(addr));
    713   }
    714 
    715   socklen_t addrlen;
    716   bool wildcard = false;
    717   union {
    718     struct sockaddr generic;
    719     struct sockaddr_in inet4;
    720     struct sockaddr_in6 inet6;
    721     struct sockaddr_storage storage;
    722   } addr;
    723 
    724   struct LookupParams;
    725   class LookupReader;
    726 };
    727 
    728 class SocketAddress::LookupReader {
    729   // Reads SocketAddresses off of a pipe coming from another thread that is performing
    730   // getaddrinfo.
    731 
    732 public:
    733   LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
    734                _::NetworkFilter& filter)
    735       : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}
    736 
    737   ~LookupReader() {
    738     if (thread) thread->detach();
    739   }
    740 
    741   Promise<Array<SocketAddress>> read() {
    742     return input->tryRead(&current, sizeof(current), sizeof(current)).then(
    743         [this](size_t n) -> Promise<Array<SocketAddress>> {
    744       if (n < sizeof(current)) {
    745         thread = nullptr;
    746         // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
    747         // anyway.
    748         KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
    749         return addresses.releaseAsArray();
    750       } else {
    751         // getaddrinfo() can return multiple copies of the same address for several reasons.
    752         // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
    753         // it may return two copies of the same address, one for each type, unless it explicitly
    754         // knows that the service name given is specific to one type.  But we can't tell it a type,
    755         // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
    756         // while the user specified a UDP service name then they'll get a resolution error which
    757         // is lame.  (At least, I think that's how it works.)
    758         //
    759         // So we instead resort to de-duping results.
    760         if (alreadySeen.insert(current).second) {
    761           if (current.parseAllowedBy(filter)) {
    762             addresses.add(current);
    763           }
    764         }
    765         return read();
    766       }
    767     });
    768   }
    769 
    770 private:
    771   kj::Own<Thread> thread;
    772   kj::Own<AsyncInputStream> input;
    773   _::NetworkFilter& filter;
    774   SocketAddress current;
    775   kj::Vector<SocketAddress> addresses;
    776   std::set<SocketAddress> alreadySeen;
    777 };
    778 
    779 struct SocketAddress::LookupParams {
    780   kj::String host;
    781   kj::String service;
    782 };
    783 
    784 Promise<Array<SocketAddress>> SocketAddress::lookupHost(
    785     LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
    786     _::NetworkFilter& filter) {
    787   // This shitty function spawns a thread to run getaddrinfo().  Unfortunately, getaddrinfo() is
    788   // the only cross-platform DNS API and it is blocking.
    789   //
    790   // TODO(perf): Use GetAddrInfoEx(). But there are problems:
    791   // - Not implemented in Wine.
    792   // - Doesn't seem compatible with I/O completion ports, in particular because it's not associated
    793   //   with a handle. Could signal completion as an APC instead, but that requires the IOCP code
    794   //   to use GetQueuedCompletionStatusEx() which it doesn't right now because it's not available
    795   //   in Wine.
    796   // - Requires Unicode, for some reason. Only GetAddrInfoExW() supports async, according to the
    797   //   docs. Never mind that DNS itself is ASCII...
    798 
    799   SOCKET fds[2];
    800   KJ_WINSOCK(_::win32Socketpair(fds));
    801 
    802   auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
    803 
    804   int outFd = fds[1];
    805 
    806   LookupParams params = { kj::mv(host), kj::mv(service) };
    807 
    808   auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
    809     KJ_DEFER(closesocket(outFd));
    810 
    811     struct addrinfo* list;
    812     int status = getaddrinfo(
    813         params.host == "*" ? nullptr : params.host.cStr(),
    814         params.service == nullptr ? nullptr : params.service.cStr(),
    815         nullptr, &list);
    816     if (status == 0) {
    817       KJ_DEFER(freeaddrinfo(list));
    818 
    819       struct addrinfo* cur = list;
    820       while (cur != nullptr) {
    821         if (params.service == nullptr) {
    822           switch (cur->ai_addr->sa_family) {
    823             case AF_INET:
    824               ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
    825               break;
    826             case AF_INET6:
    827               ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
    828               break;
    829             default:
    830               break;
    831           }
    832         }
    833 
    834         SocketAddress addr;
    835         memset(&addr, 0, sizeof(addr));  // mollify valgrind
    836         if (params.host == "*") {
    837           // Set up a wildcard SocketAddress.  Only use the port number returned by getaddrinfo().
    838           addr.wildcard = true;
    839           addr.addrlen = sizeof(addr.addr.inet6);
    840           addr.addr.inet6.sin6_family = AF_INET6;
    841           switch (cur->ai_addr->sa_family) {
    842             case AF_INET:
    843               addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
    844               break;
    845             case AF_INET6:
    846               addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
    847               break;
    848             default:
    849               addr.addr.inet6.sin6_port = portHint;
    850               break;
    851           }
    852         } else {
    853           addr.addrlen = cur->ai_addrlen;
    854           memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
    855         }
    856         KJ_ASSERT_CAN_MEMCPY(SocketAddress);
    857 
    858         const char* data = reinterpret_cast<const char*>(&addr);
    859         size_t size = sizeof(addr);
    860         while (size > 0) {
    861           int n;
    862           KJ_WINSOCK(n = send(outFd, data, size, 0));
    863           data += n;
    864           size -= n;
    865         }
    866 
    867         cur = cur->ai_next;
    868       }
    869     } else {
    870       KJ_FAIL_WIN32("getaddrinfo()", status, params.host, params.service) {
    871         return;
    872       }
    873     }
    874   }));
    875 
    876   auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
    877   return reader->read().attach(kj::mv(reader));
    878 }
    879 
    880 // =======================================================================================
    881 
    882 class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFd {
    883 public:
    884   FdConnectionReceiver(Win32EventPort& eventPort, SOCKET fd,
    885                        LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
    886       : OwnedFd(fd, flags), eventPort(eventPort), filter(filter),
    887         observer(eventPort.observeIo(reinterpret_cast<HANDLE>(fd))),
    888         address(SocketAddress::getLocalAddress(fd)) {
    889     // In order to accept asynchronously, we need the AcceptEx() function. Apparently, we have
    890     // to query the socket for it dynamically, I guess because of the insanity in which winsock
    891     // can be implemented in userspace and old implementations may not support it.
    892     GUID guid = WSAID_ACCEPTEX;
    893     DWORD n = 0;
    894     KJ_WINSOCK(WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
    895                         &acceptEx, sizeof(acceptEx), &n, NULL, NULL)) {
    896       acceptEx = nullptr;
    897       return;
    898     }
    899   }
    900 
    901   Promise<Own<AsyncIoStream>> accept() override {
    902     SOCKET newFd = address.socket(SOCK_STREAM);
    903     KJ_ASSERT(newFd != INVALID_SOCKET);
    904     auto result = heap<AsyncStreamFd>(eventPort, newFd, NEW_FD_FLAGS);
    905 
    906     auto scratch = heapArray<byte>(256);
    907     DWORD dummy;
    908     auto op = observer->newOperation(0);
    909     if (!acceptEx(fd, newFd, scratch.begin(), 0, 128, 128, &dummy, op->getOverlapped())) {
    910       DWORD error = WSAGetLastError();
    911       if (error != ERROR_IO_PENDING) {
    912         KJ_FAIL_WIN32("AcceptEx()", error) { break; }
    913         return Own<AsyncIoStream>(kj::mv(result));  // dummy, won't be used
    914       }
    915     }
    916 
    917     return op->onComplete().then(mvCapture(result, mvCapture(scratch,
    918         [this,newFd]
    919         (Array<byte> scratch, Own<AsyncIoStream> stream, Win32EventPort::IoResult ioResult)
    920         -> Promise<Own<AsyncIoStream>> {
    921       if (ioResult.errorCode != ERROR_SUCCESS) {
    922         KJ_FAIL_WIN32("AcceptEx()", ioResult.errorCode) { break; }
    923       } else {
    924         SOCKET me = fd;
    925         stream->setsockopt(SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
    926                            reinterpret_cast<char*>(&me), sizeof(me));
    927       }
    928 
    929       // Supposedly, AcceptEx() places the local and peer addresses into the buffer (which we've
    930       // named `scratch`). However, the format in which it writes these is undocumented, and
    931       // doesn't even match between native Windows and WINE. Apparently it is useless. I don't know
    932       // why they require the buffer to have space for it in the first place. We'll need to call
    933       // getpeername() to get the address.
    934       auto addr = SocketAddress::getPeerAddress(newFd);
    935       if (addr.allowedBy(filter)) {
    936         return kj::mv(stream);
    937       } else {
    938         return accept();
    939       }
    940     })));
    941   }
    942 
    943   uint getPort() override {
    944     return address.getPort();
    945   }
    946 
    947   void getsockopt(int level, int option, void* value, uint* length) override {
    948     socklen_t socklen = *length;
    949     KJ_WINSOCK(::getsockopt(fd, level, option,
    950                             reinterpret_cast<char*>(value), &socklen));
    951     *length = socklen;
    952   }
    953   void setsockopt(int level, int option, const void* value, uint length) override {
    954     KJ_WINSOCK(::setsockopt(fd, level, option,
    955                             reinterpret_cast<const char*>(value), length));
    956   }
    957   void getsockname(struct sockaddr* addr, uint* length) override {
    958     socklen_t socklen = *length;
    959     KJ_WINSOCK(::getsockname(fd, addr, &socklen));
    960     *length = socklen;
    961   }
    962 
    963 public:
    964   Win32EventPort& eventPort;
    965   LowLevelAsyncIoProvider::NetworkFilter& filter;
    966   Own<Win32EventPort::IoObserver> observer;
    967   LPFN_ACCEPTEX acceptEx = nullptr;
    968   SocketAddress address;
    969 };
    970 
    971 // TODO(someday): DatagramPortImpl
    972 
    973 class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
    974 public:
    975   LowLevelAsyncIoProviderImpl()
    976       : eventLoop(eventPort), waitScope(eventLoop) {}
    977 
    978   inline WaitScope& getWaitScope() { return waitScope; }
    979 
    980   Own<AsyncInputStream> wrapInputFd(SOCKET fd, uint flags = 0) override {
    981     return heap<AsyncStreamFd>(eventPort, fd, flags);
    982   }
    983   Own<AsyncOutputStream> wrapOutputFd(SOCKET fd, uint flags = 0) override {
    984     return heap<AsyncStreamFd>(eventPort, fd, flags);
    985   }
    986   Own<AsyncIoStream> wrapSocketFd(SOCKET fd, uint flags = 0) override {
    987     return heap<AsyncStreamFd>(eventPort, fd, flags);
    988   }
    989   Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
    990       SOCKET fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
    991     auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
    992 
    993     // ConnectEx requires that the socket be bound, for some reason. Bind to an arbitrary port.
    994     SocketAddress::getWildcardForFamily(addr->sa_family).bind(fd);
    995 
    996     auto connected = result->connect(addr, addrlen);
    997     return connected.then(kj::mvCapture(result, [](Own<AsyncIoStream>&& result) {
    998       return kj::mv(result);
    999     }));
   1000   }
   1001   Own<ConnectionReceiver> wrapListenSocketFd(
   1002       SOCKET fd, NetworkFilter& filter, uint flags = 0) override {
   1003     return heap<FdConnectionReceiver>(eventPort, fd, filter, flags);
   1004   }
   1005 
   1006   Timer& getTimer() override { return eventPort.getTimer(); }
   1007 
   1008   Win32EventPort& getEventPort() { return eventPort; }
   1009 
   1010 private:
   1011   Win32IocpEventPort eventPort;
   1012   EventLoop eventLoop;
   1013   WaitScope waitScope;
   1014 };
   1015 
   1016 // =======================================================================================
   1017 
   1018 class NetworkAddressImpl final: public NetworkAddress {
   1019 public:
   1020   NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
   1021                      LowLevelAsyncIoProvider::NetworkFilter& filter,
   1022                      Array<SocketAddress> addrs)
   1023       : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}
   1024 
   1025   Promise<Own<AsyncIoStream>> connect() override {
   1026     auto addrsCopy = heapArray(addrs.asPtr());
   1027     auto promise = connectImpl(lowLevel, filter, addrsCopy);
   1028     return promise.attach(kj::mv(addrsCopy));
   1029   }
   1030 
   1031   Own<ConnectionReceiver> listen() override {
   1032     if (addrs.size() > 1) {
   1033       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
   1034           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
   1035           "in the future.", addrs[0].toString());
   1036     }
   1037 
   1038     int fd = addrs[0].socket(SOCK_STREAM);
   1039 
   1040     {
   1041       KJ_ON_SCOPE_FAILURE(closesocket(fd));
   1042 
   1043       // We always enable SO_REUSEADDR because having to take your server down for five minutes
   1044       // before it can restart really sucks.
   1045       int optval = 1;
   1046       KJ_WINSOCK(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
   1047                             reinterpret_cast<char*>(&optval), sizeof(optval)));
   1048 
   1049       addrs[0].bind(fd);
   1050 
   1051       // TODO(someday):  Let queue size be specified explicitly in string addresses.
   1052       KJ_WINSOCK(::listen(fd, SOMAXCONN));
   1053     }
   1054 
   1055     return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
   1056   }
   1057 
   1058   Own<DatagramPort> bindDatagramPort() override {
   1059     if (addrs.size() > 1) {
   1060       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
   1061           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
   1062           "in the future.", addrs[0].toString());
   1063     }
   1064 
   1065     int fd = addrs[0].socket(SOCK_DGRAM);
   1066 
   1067     {
   1068       KJ_ON_SCOPE_FAILURE(closesocket(fd));
   1069 
   1070       // We always enable SO_REUSEADDR because having to take your server down for five minutes
   1071       // before it can restart really sucks.
   1072       int optval = 1;
   1073       KJ_WINSOCK(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
   1074                             reinterpret_cast<char*>(&optval), sizeof(optval)));
   1075 
   1076       addrs[0].bind(fd);
   1077     }
   1078 
   1079     return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
   1080   }
   1081 
   1082   Own<NetworkAddress> clone() override {
   1083     return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
   1084   }
   1085 
   1086   String toString() override {
   1087     return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
   1088   }
   1089 
   1090   const SocketAddress& chooseOneAddress() {
   1091     KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
   1092     return addrs[counter++ % addrs.size()];
   1093   }
   1094 
   1095 private:
   1096   LowLevelAsyncIoProvider& lowLevel;
   1097   LowLevelAsyncIoProvider::NetworkFilter& filter;
   1098   Array<SocketAddress> addrs;
   1099   uint counter = 0;
   1100 
   1101   static Promise<Own<AsyncIoStream>> connectImpl(
   1102       LowLevelAsyncIoProvider& lowLevel,
   1103       LowLevelAsyncIoProvider::NetworkFilter& filter,
   1104       ArrayPtr<SocketAddress> addrs) {
   1105     KJ_ASSERT(addrs.size() > 0);
   1106 
   1107     int fd = addrs[0].socket(SOCK_STREAM);
   1108 
   1109     return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
   1110       if (!addrs[0].allowedBy(filter)) {
   1111         return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
   1112       } else {
   1113         return lowLevel.wrapConnectingSocketFd(
   1114             fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
   1115       }
   1116     }).then([](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> {
   1117       // Success, pass along.
   1118       return kj::mv(stream);
   1119     }, [&lowLevel,&filter,KJ_CPCAP(addrs)](Exception&& exception) mutable
   1120         -> Promise<Own<AsyncIoStream>> {
   1121       // Connect failed.
   1122       if (addrs.size() > 1) {
   1123         // Try the next address instead.
   1124         return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()));
   1125       } else {
   1126         // No more addresses to try, so propagate the exception.
   1127         return kj::mv(exception);
   1128       }
   1129     });
   1130   }
   1131 };
   1132 
   1133 class SocketNetwork final: public Network {
   1134 public:
   1135   explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
   1136   explicit SocketNetwork(SocketNetwork& parent,
   1137                          kj::ArrayPtr<const kj::StringPtr> allow,
   1138                          kj::ArrayPtr<const kj::StringPtr> deny)
   1139       : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}
   1140 
   1141   Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
   1142     return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
   1143       return SocketAddress::parse(lowLevel, addr, portHint, filter);
   1144     })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
   1145       return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
   1146     });
   1147   }
   1148 
   1149   Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
   1150     auto array = kj::heapArrayBuilder<SocketAddress>(1);
   1151     array.add(SocketAddress(sockaddr, len));
   1152     KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
   1153     return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
   1154   }
   1155 
   1156   Own<Network> restrictPeers(
   1157       kj::ArrayPtr<const kj::StringPtr> allow,
   1158       kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
   1159     return heap<SocketNetwork>(*this, allow, deny);
   1160   }
   1161 
   1162 private:
   1163   LowLevelAsyncIoProvider& lowLevel;
   1164   _::NetworkFilter filter;
   1165 };
   1166 
   1167 // =======================================================================================
   1168 
   1169 class AsyncIoProviderImpl final: public AsyncIoProvider {
   1170 public:
   1171   AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
   1172       : lowLevel(lowLevel), network(lowLevel) {}
   1173 
   1174   OneWayPipe newOneWayPipe() override {
   1175     SOCKET fds[2];
   1176     KJ_WINSOCK(_::win32Socketpair(fds));
   1177     auto in = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);
   1178     auto out = lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS);
   1179     in->shutdownWrite();
   1180     return { kj::mv(in), kj::mv(out) };
   1181   }
   1182 
   1183   TwoWayPipe newTwoWayPipe() override {
   1184     SOCKET fds[2];
   1185     KJ_WINSOCK(_::win32Socketpair(fds));
   1186     return TwoWayPipe { {
   1187       lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
   1188       lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
   1189     } };
   1190   }
   1191 
   1192   Network& getNetwork() override {
   1193     return network;
   1194   }
   1195 
   1196   PipeThread newPipeThread(
   1197       Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
   1198     SOCKET fds[2];
   1199     KJ_WINSOCK(_::win32Socketpair(fds));
   1200 
   1201     int threadFd = fds[1];
   1202     KJ_ON_SCOPE_FAILURE(closesocket(threadFd));
   1203 
   1204     auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);
   1205 
   1206     auto thread = heap<Thread>(kj::mvCapture(startFunc,
   1207         [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
   1208       LowLevelAsyncIoProviderImpl lowLevel;
   1209       auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
   1210       AsyncIoProviderImpl ioProvider(lowLevel);
   1211       startFunc(ioProvider, *stream, lowLevel.getWaitScope());
   1212     }));
   1213 
   1214     return { kj::mv(thread), kj::mv(pipe) };
   1215   }
   1216 
   1217   Timer& getTimer() override { return lowLevel.getTimer(); }
   1218 
   1219 private:
   1220   LowLevelAsyncIoProvider& lowLevel;
   1221   SocketNetwork network;
   1222 };
   1223 
   1224 }  // namespace
   1225 
   1226 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
   1227   return kj::heap<AsyncIoProviderImpl>(lowLevel);
   1228 }
   1229 
   1230 AsyncIoContext setupAsyncIo() {
   1231   _::initWinsockOnce();
   1232 
   1233   auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
   1234   auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
   1235   auto& waitScope = lowLevel->getWaitScope();
   1236   auto& eventPort = lowLevel->getEventPort();
   1237   return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
   1238 }
   1239 
   1240 }  // namespace kj
   1241 
   1242 #endif  // _WIN32