capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git

async-io-unix.c++ (67624B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if !_WIN32
     23 // For Win32 implementation, see async-io-win32.c++.
     24 
     25 #ifndef _GNU_SOURCE
     26 #define _GNU_SOURCE
     27 #endif
     28 
     29 #include "async-io.h"
     30 #include "async-io-internal.h"
     31 #include "async-unix.h"
     32 #include "debug.h"
     33 #include "thread.h"
     34 #include "io.h"
     35 #include "miniposix.h"
     36 #include <unistd.h>
     37 #include <sys/uio.h>
     38 #include <errno.h>
     39 #include <fcntl.h>
     40 #include <sys/types.h>
     41 #include <sys/socket.h>
     42 #include <sys/un.h>
     43 #include <netinet/in.h>
     44 #include <netinet/tcp.h>
     45 #include <stddef.h>
     46 #include <stdlib.h>
     47 #include <arpa/inet.h>
     48 #include <netdb.h>
     49 #include <set>
     50 #include <poll.h>
     51 #include <limits.h>
     52 #include <sys/ioctl.h>
     53 
     54 #if !defined(SO_PEERCRED) && defined(LOCAL_PEERCRED)
     55 #include <sys/ucred.h>
     56 #endif
     57 
     58 #if !defined(SOL_LOCAL) && (__FreeBSD__ || __DragonFly__)
     59 // On DragonFly or FreeBSD < 12.2 you're supposed to use 0 for SOL_LOCAL.
     60 #define SOL_LOCAL 0
     61 #endif
     62 
     63 namespace kj {
     64 
     65 namespace {
     66 
     67 void setNonblocking(int fd) {
     68 #ifdef FIONBIO
     69   int opt = 1;
     70   KJ_SYSCALL(ioctl(fd, FIONBIO, &opt));
     71 #else
     72   int flags;
     73   KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
     74   if ((flags & O_NONBLOCK) == 0) {
     75     KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
     76   }
     77 #endif
     78 }
     79 
     80 void setCloseOnExec(int fd) {
     81 #ifdef FIOCLEX
     82   KJ_SYSCALL(ioctl(fd, FIOCLEX));
     83 #else
     84   int flags;
     85   KJ_SYSCALL(flags = fcntl(fd, F_GETFD));
     86   if ((flags & FD_CLOEXEC) == 0) {
     87     KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC));
     88   }
     89 #endif
     90 }
     91 
     92 static constexpr uint NEW_FD_FLAGS =
     93 #if __linux__ && !__BIONIC__
     94     LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK |
     95 #endif
     96     LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
     97 // We always try to open FDs with CLOEXEC and NONBLOCK already set on Linux, but on other platforms
     98 // this is not possible.
     99 
    100 class OwnedFileDescriptor {
    101 public:
    102   OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) {
    103     if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) {
    104       KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't.");
    105     } else {
    106       setNonblocking(fd);
    107     }
    108 
    109     if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
    110       if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) {
    111         KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC,
    112                     "You claimed you set CLOEXEC, but you didn't.");
    113       } else {
    114         setCloseOnExec(fd);
    115       }
    116     }
    117   }
    118 
    119   ~OwnedFileDescriptor() noexcept(false) {
    120     // Don't use SYSCALL() here because close() should not be repeated on EINTR.
    121     if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) {
    122       KJ_FAIL_SYSCALL("close", errno, fd) {
    123         // Recoverable exceptions are safe in destructors.
    124         break;
    125       }
    126     }
    127   }
    128 
    129 protected:
    130   const int fd;
    131 
    132 private:
    133   uint flags;
    134 };
    135 
    136 // =======================================================================================
    137 
    138 class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream {
    139 public:
    140   AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
    141       : OwnedFileDescriptor(fd, flags),
    142         eventPort(eventPort),
    143         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
    144   virtual ~AsyncStreamFd() noexcept(false) {}
    145 
    146   Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    147     return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0})
    148         .then([](ReadResult r) { return r.byteCount; });
    149   }
    150 
    151   Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
    152                                      AutoCloseFd* fdBuffer, size_t maxFds) override {
    153     return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0});
    154   }
    155 
    156   Promise<ReadResult> tryReadWithStreams(
    157       void* buffer, size_t minBytes, size_t maxBytes,
    158       Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override {
    159     auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams);
    160     auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0});
    161 
    162     return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer]
    163                         (ReadResult result) mutable {
    164       for (auto i: kj::zeroTo(result.capCount)) {
    165         streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(),
    166             LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC);
    167       }
    168       return result;
    169     });
    170   }
    171 
    172   Promise<void> write(const void* buffer, size_t size) override {
    173     ssize_t n;
    174     KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
    175       // Error.
    176 
    177       // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    178       // a bug that exists in both Clang and GCC:
    179       //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    180       //   http://llvm.org/bugs/show_bug.cgi?id=12286
    181       goto error;
    182     }
    183     if (false) {
    184     error:
    185       return kj::READY_NOW;
    186     }
    187 
    188     if (n < 0) {
    189       // EAGAIN -- need to wait for writability and try again.
    190       return observer.whenBecomesWritable().then([=]() {
    191         return write(buffer, size);
    192       });
    193     } else if (n == size) {
    194       // All done.
    195       return READY_NOW;
    196     } else {
    197       // Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
    198       // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
    199       // for non-blocking operations. So, we'll need to write() again now, even though it will
    200       // almost certainly fail with EAGAIN. See comments in the read path for more info.
    201       buffer = reinterpret_cast<const byte*>(buffer) + n;
    202       size -= n;
    203       return write(buffer, size);
    204     }
    205   }
    206 
    207   Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    208     if (pieces.size() == 0) {
    209       return writeInternal(nullptr, nullptr, nullptr);
    210     } else {
    211       return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr);
    212     }
    213   }
    214 
    215   Promise<void> writeWithFds(ArrayPtr<const byte> data,
    216                              ArrayPtr<const ArrayPtr<const byte>> moreData,
    217                              ArrayPtr<const int> fds) override {
    218     return writeInternal(data, moreData, fds);
    219   }
    220 
    221   Promise<void> writeWithStreams(ArrayPtr<const byte> data,
    222                                  ArrayPtr<const ArrayPtr<const byte>> moreData,
    223                                  Array<Own<AsyncCapabilityStream>> streams) override {
    224     auto fds = KJ_MAP(stream, streams) {
    225       return downcast<AsyncStreamFd>(*stream).fd;
    226     };
    227     auto promise = writeInternal(data, moreData, fds);
    228     return promise.attach(kj::mv(fds), kj::mv(streams));
    229   }
    230 
    231   Promise<void> whenWriteDisconnected() override {
    232     KJ_IF_MAYBE(p, writeDisconnectedPromise) {
    233       return p->addBranch();
    234     } else {
    235       auto fork = observer.whenWriteDisconnected().fork();
    236       auto result = fork.addBranch();
    237       writeDisconnectedPromise = kj::mv(fork);
    238       return kj::mv(result);
    239     }
    240   }
    241 
    242   void shutdownWrite() override {
    243     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    244     // UnixAsyncIoProvider interface.
    245     KJ_SYSCALL(shutdown(fd, SHUT_WR));
    246   }
    247 
    248   void abortRead() override {
    249     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    250     // UnixAsyncIoProvider interface.
    251     KJ_SYSCALL(shutdown(fd, SHUT_RD));
    252   }
    253 
    254   void getsockopt(int level, int option, void* value, uint* length) override {
    255     socklen_t socklen = *length;
    256     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    257     *length = socklen;
    258   }
    259 
    260   void setsockopt(int level, int option, const void* value, uint length) override {
    261     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
    262   }
    263 
    264   void getsockname(struct sockaddr* addr, uint* length) override {
    265     socklen_t socklen = *length;
    266     KJ_SYSCALL(::getsockname(fd, addr, &socklen));
    267     *length = socklen;
    268   }
    269 
    270   void getpeername(struct sockaddr* addr, uint* length) override {
    271     socklen_t socklen = *length;
    272     KJ_SYSCALL(::getpeername(fd, addr, &socklen));
    273     *length = socklen;
    274   }
    275 
    276   kj::Maybe<int> getFd() const override {
    277     return fd;
    278   }
    279 
    280   void registerAncillaryMessageHandler(
    281       kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> fn) override {
    282     ancillaryMsgCallback = kj::mv(fn);
    283   }
    284 
    285   Promise<void> waitConnected() {
    286     // Wait until initial connection has completed. This actually just waits until it is writable.
    287 
    288     // Can't just go directly to observer.whenBecomesWritable() because of edge triggering. We
    289     // need to explicitly check if the socket is already connected.
    290 
    291     struct pollfd pollfd;
    292     memset(&pollfd, 0, sizeof(pollfd));
    293     pollfd.fd = fd;
    294     pollfd.events = POLLOUT;
    295 
    296     int pollResult;
    297     KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));
    298 
    299     if (pollResult == 0) {
    300       // Not ready yet. We can safely use the edge-triggered observer.
    301       return observer.whenBecomesWritable();
    302     } else {
    303       // Ready now.
    304       return kj::READY_NOW;
    305     }
    306   }
    307 
    308 private:
    309   UnixEventPort& eventPort;
    310   UnixEventPort::FdObserver observer;
    311   Maybe<ForkedPromise<void>> writeDisconnectedPromise;
    312   Maybe<Function<void(ArrayPtr<AncillaryMessage>)>> ancillaryMsgCallback;
    313 
    314   Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
    315                                       AutoCloseFd* fdBuffer, size_t maxFds,
    316                                       ReadResult alreadyRead) {
    317     // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes,
    318     // maxBytes, and buffer have already been adjusted to account for them, but this count must
    319     // be included in the final return value.
    320 
    321     ssize_t n;
    322     if (maxFds == 0 && ancillaryMsgCallback == nullptr) {
    323       KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
    324         // Error.
    325 
    326         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    327         // a bug that exists in both Clang and GCC:
    328         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    329         //   http://llvm.org/bugs/show_bug.cgi?id=12286
    330         goto error;
    331       }
    332     } else {
    333       struct msghdr msg;
    334       memset(&msg, 0, sizeof(msg));
    335 
    336       struct iovec iov;
    337       memset(&iov, 0, sizeof(iov));
    338       iov.iov_base = buffer;
    339       iov.iov_len = maxBytes;
    340       msg.msg_iov = &iov;
    341       msg.msg_iovlen = 1;
    342 
    343       // Allocate space to receive a cmsg.
    344       size_t msgBytes;
    345       if (ancillaryMsgCallback == nullptr) {
    346 #if __APPLE__ || __FreeBSD__
    347         // Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which when
    348         // an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't
    349         // delivered -- they would simply leak: https://bugs.freebsd.org/131876
    350         //
    351         // My testing indicates that MacOS has this same bug as of today (April 2019). I don't know
    352         // if they plan to fix it or are even aware of it.
    353         //
    354         // To handle both cases, we will always provide space to receive 512 FDs. Hopefully, this is
    355         // greater than the maximum number of FDs that these kernels will transmit in one message
    356         // PLUS enough space for any other ancillary messages that could be sent before the
    357         // SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation
    358         // on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in
    359         // a comment in someone else's application that suggested FreeBSD is the same. Hopefully,
    360         // then, this is sufficient to prevent attacks. But if not, there's nothing more we can do;
    361         // it's really up to the kernel to fix this.
    362         msgBytes = CMSG_SPACE(sizeof(int) * 512);
    363 #else
    364         msgBytes = CMSG_SPACE(sizeof(int) * maxFds);
    365 #endif
    366       } else {
    367         // If we want room for ancillary messages instead of or in addition to FDs, just use the
    368         // same amount of cushion as in the MacOS/FreeBSD case above.
    369         // Someday we may want to allow customization here, but there's no immediate use for it.
    370         msgBytes = CMSG_SPACE(sizeof(int) * 512);
    371       }
    372 
    373       // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
    374       // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
    375       // surprisingly end up with space for two file descriptors when you only wanted one. However,
    376       // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
    377       // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
    378       // other platforms), so we want to allocate an array of words (we use void*). So... we use
    379       // CMSG_SPACE() and then additionally round up to deal with Mac.
    380       size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
    381       KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
    382       auto cmsgBytes = cmsgSpace.asBytes();
    383       memset(cmsgBytes.begin(), 0, cmsgBytes.size());
    384       msg.msg_control = cmsgBytes.begin();
    385       msg.msg_controllen = msgBytes;
    386 
    387 #ifdef MSG_CMSG_CLOEXEC
    388       static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC;
    389 #else
    390       static constexpr int RECVMSG_FLAGS = 0;
    391 #endif
    392 
    393       KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) {
    394         // Error.
    395 
    396         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    397         // a bug that exists in both Clang and GCC:
    398         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    399         //   http://llvm.org/bugs/show_bug.cgi?id=12286
    400         goto error;
    401       }
    402 
    403       if (n >= 0) {
    404         // Process all messages.
    405         //
    406         // WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because
    407         // if we do, then that FD will never be closed, and a malicious peer could exploit this to
    408         // fill up our FD table, creating a DoS attack. Some things to keep in mind:
    409         // - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could
    410         //   mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need
    411         //   to close the extras.
    412         // - We can receive multiple ancillary messages at once. In particular, there is also
    413         //   SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS
    414         //   first followed by SCM_RIGHTS. We need to make sure we see both.
    415         size_t nfds = 0;
    416         size_t spaceLeft = msg.msg_controllen;
    417         Vector<AncillaryMessage> ancillaryMessages;
    418         for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
    419             cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
    420           if (spaceLeft >= CMSG_LEN(0) &&
    421               cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
    422             // Some operating systems (like MacOS) do not adjust cmsg_len when the message is
    423             // truncated. We must do so ourselves or risk overrunning the buffer.
    424             auto len = kj::min(cmsg->cmsg_len, spaceLeft);
    425             auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)),
    426                                  (len - CMSG_LEN(0)) / sizeof(int));
    427             kj::Vector<kj::AutoCloseFd> trashFds;
    428             for (auto fd: data) {
    429               kj::AutoCloseFd ownFd(fd);
    430               if (nfds < maxFds) {
    431                 fdBuffer[nfds++] = kj::mv(ownFd);
    432               } else {
    433                 trashFds.add(kj::mv(ownFd));
    434               }
    435             }
    436           } else if (spaceLeft >= CMSG_LEN(0) && ancillaryMsgCallback != nullptr) {
    437             auto len = kj::min(cmsg->cmsg_len, spaceLeft);
    438             auto data = ArrayPtr<const byte>(CMSG_DATA(cmsg), len - CMSG_LEN(0));
    439             ancillaryMessages.add(cmsg->cmsg_level, cmsg->cmsg_type, data);
    440           }
    441 
    442           if (spaceLeft >= CMSG_LEN(0) && spaceLeft >= cmsg->cmsg_len) {
    443             spaceLeft -= cmsg->cmsg_len;
    444           } else {
    445             spaceLeft = 0;
    446           }
    447         }
    448 
    449 #ifndef MSG_CMSG_CLOEXEC
    450         for (size_t i = 0; i < nfds; i++) {
    451           setCloseOnExec(fdBuffer[i]);
    452         }
    453 #endif
    454 
    455         if (ancillaryMessages.size() > 0) {
    456           KJ_IF_MAYBE(fn, ancillaryMsgCallback) {
    457             (*fn)(ancillaryMessages.asPtr());
    458           }
    459         }
    460 
    461         alreadyRead.capCount += nfds;
    462         fdBuffer += nfds;
    463         maxFds -= nfds;
    464       }
    465     }
    466 
    467     if (false) {
    468     error:
    469       return alreadyRead;
    470     }
    471 
    472     if (n < 0) {
    473       // Read would block.
    474       return observer.whenBecomesReadable().then([=]() {
    475         return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
    476       });
    477     } else if (n == 0) {
    478       // EOF -OR- maxBytes == 0.
    479       return alreadyRead;
    480     } else if (implicitCast<size_t>(n) >= minBytes) {
    481       // We read enough to stop here.
    482       alreadyRead.byteCount += n;
    483       return alreadyRead;
    484     } else {
    485       // The kernel returned fewer bytes than we asked for (and fewer than we need).
    486 
    487       buffer = reinterpret_cast<byte*>(buffer) + n;
    488       minBytes -= n;
    489       maxBytes -= n;
    490       alreadyRead.byteCount += n;
    491 
    492       // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
    493       // we've consumed the whole read buffer here. If a signal is delivered in the middle of a
    494       // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
    495       // result, with data still in the buffer.
    496       //     https://bugzilla.kernel.org/show_bug.cgi?id=199131
    497       //     https://twitter.com/CaptainSegfault/status/1112622245531144194
    498       //
    499       // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
    500       // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
    501       // non-null) to avoid a redundant call to read(). Alas...
    502       return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
    503     }
    504   }
    505 
    506   Promise<void> writeInternal(ArrayPtr<const byte> firstPiece,
    507                               ArrayPtr<const ArrayPtr<const byte>> morePieces,
    508                               ArrayPtr<const int> fds) {
    509     const size_t iovmax = kj::miniposix::iovMax();
    510     // If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and
    511     // then we'll loop later.
    512     KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128);
    513     size_t iovTotal = 0;
    514 
    515     // writev() interface is not const-correct.  :(
    516     iov[0].iov_base = const_cast<byte*>(firstPiece.begin());
    517     iov[0].iov_len = firstPiece.size();
    518     iovTotal += iov[0].iov_len;
    519     for (uint i = 1; i < iov.size(); i++) {
    520       iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin());
    521       iov[i].iov_len = morePieces[i - 1].size();
    522       iovTotal += iov[i].iov_len;
    523     }
    524 
    525     if (iovTotal == 0) {
    526       KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes");
    527       return kj::READY_NOW;
    528     }
    529 
    530     ssize_t n;
    531     if (fds.size() == 0) {
    532       KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size()), iovTotal, iov.size()) {
    533         // Error.
    534 
    535         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    536         // a bug that exists in both Clang and GCC:
    537         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    538         //   http://llvm.org/bugs/show_bug.cgi?id=12286
    539         goto error;
    540       }
    541     } else {
    542       struct msghdr msg;
    543       memset(&msg, 0, sizeof(msg));
    544       msg.msg_iov = iov.begin();
    545       msg.msg_iovlen = iov.size();
    546 
    547       // Allocate space to send a cmsg.
    548       size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size());
    549       // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
    550       // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
    551       // surprisingly end up with space for two file descriptors when you only wanted one. However,
    552       // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
    553       // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
    554       // other platforms), so we want to allocate an array of words (we use void*). So... we use
    555       // CMSG_SPACE() and then additionally round up to deal with Mac.
    556       size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
    557       KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
    558       auto cmsgBytes = cmsgSpace.asBytes();
    559       memset(cmsgBytes.begin(), 0, cmsgBytes.size());
    560       msg.msg_control = cmsgBytes.begin();
    561       msg.msg_controllen = msgBytes;
    562 
    563       struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
    564       cmsg->cmsg_level = SOL_SOCKET;
    565       cmsg->cmsg_type = SCM_RIGHTS;
    566       cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size());
    567       memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size());
    568 
    569       KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) {
    570         // Error.
    571 
    572         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    573         // a bug that exists in both Clang and GCC:
    574         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    575         //   http://llvm.org/bugs/show_bug.cgi?id=12286
    576         goto error;
    577       }
    578     }
    579 
    580     if (false) {
    581     error:
    582       return kj::READY_NOW;
    583     }
    584 
    585     if (n < 0) {
    586       // Got EAGAIN. Nothing was written.
    587       return observer.whenBecomesWritable().then([=]() {
    588         return writeInternal(firstPiece, morePieces, fds);
    589       });
    590     } else if (n == 0) {
    591       // Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream
    592       // socket? If there's no room in the send buffer, it should fail with EAGAIN. If the
    593       // connection is closed, it should fail with EPIPE. Various documents and forum posts around
    594       // the internet claim this can happen but no one seems to know when. My guess is it can only
    595       // happen if we try to send an empty message -- which we didn't. So I think this is
    596       // impossible. If it is possible, we need to figure out how to correctly handle it, which
    597       // depends on what caused it.
    598       //
    599       // Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message,
    600       // we need to know whether the message was sent or not, in order to decide whether to retry
    601       // sending it!
    602       KJ_FAIL_ASSERT("non-empty sendmsg() returned 0");
    603     }
    604 
    605     // Non-zero bytes were written. This also implies that *all* FDs were written.
    606 
    607     // Discard all data that was written, then issue a new write for what's left (if any).
    608     for (;;) {
    609       if (n < firstPiece.size()) {
    610         // Only part of the first piece was consumed.  We'll need to write again.
    611         firstPiece = firstPiece.slice(n, firstPiece.size());
    612         iovTotal -= n;
    613 
    614         if (iovTotal == 0) {
    615           // Oops, what actually happened is that we hit the IOV_MAX limit. Don't wait.
    616           return writeInternal(firstPiece, morePieces, nullptr);
    617         }
    618 
    619         // As with read(), we cannot assume that a short write() really means the write buffer is
    620         // full (see comments in the read path above). We have to write again.
    621         return writeInternal(firstPiece, morePieces, nullptr);
    622       } else if (morePieces.size() == 0) {
    623         // First piece was fully-consumed and there are no more pieces, so we're done.
    624         KJ_DASSERT(n == firstPiece.size(), n);
    625         return READY_NOW;
    626       } else {
    627         // First piece was fully consumed, so move on to the next piece.
    628         n -= firstPiece.size();
    629         iovTotal -= firstPiece.size();
    630         firstPiece = morePieces[0];
    631         morePieces = morePieces.slice(1, morePieces.size());
    632       }
    633     }
    634   }
    635 };
    636 
    637 // =======================================================================================
    638 
    639 class SocketAddress {
    640 public:
    641   SocketAddress(const void* sockaddr, uint len): addrlen(len) {
    642     KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
    643     memcpy(&addr.generic, sockaddr, len);
    644   }
    645 
    646   bool operator<(const SocketAddress& other) const {
    647     // So we can use std::set<SocketAddress>...  see DNS lookup code.
    648 
    649     if (wildcard < other.wildcard) return true;
    650     if (wildcard > other.wildcard) return false;
    651 
    652     if (addrlen < other.addrlen) return true;
    653     if (addrlen > other.addrlen) return false;
    654 
    655     return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
    656   }
    657 
    658   const struct sockaddr* getRaw() const { return &addr.generic; }
    659   socklen_t getRawSize() const { return addrlen; }
    660 
    661   int socket(int type) const {
    662     bool isStream = type == SOCK_STREAM;
    663 
    664     int result;
    665 #if __linux__ && !__BIONIC__
    666     type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
    667 #endif
    668     KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));
    669 
    670     if (isStream && (addr.generic.sa_family == AF_INET ||
    671                      addr.generic.sa_family == AF_INET6)) {
    672       // TODO(perf):  As a hack for the 0.4 release we are always setting
    673       //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
    674       //   RPC protocol.  Later, we should extend the interface to provide more
    675       //   control over this.  Perhaps write() should have a flag which
    676       //   specifies whether to pass MSG_MORE.
    677       int one = 1;
    678       KJ_SYSCALL(setsockopt(
    679           result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
    680     }
    681 
    682     return result;
    683   }
    684 
    685   void bind(int sockfd) const {
    686 #if !defined(__OpenBSD__)
    687     if (wildcard) {
    688       // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket.  (The
    689       // default value of this option varies across platforms.)
    690       int value = 0;
    691       KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)));
    692     }
    693 #endif
    694 
    695     KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
    696   }
    697 
    698   uint getPort() const {
    699     switch (addr.generic.sa_family) {
    700       case AF_INET: return ntohs(addr.inet4.sin_port);
    701       case AF_INET6: return ntohs(addr.inet6.sin6_port);
    702       default: return 0;
    703     }
    704   }
    705 
    706   String toString() const {
    707     if (wildcard) {
    708       return str("*:", getPort());
    709     }
    710 
    711     switch (addr.generic.sa_family) {
    712       case AF_INET: {
    713         char buffer[INET6_ADDRSTRLEN];
    714         if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr,
    715                       buffer, sizeof(buffer)) == nullptr) {
    716           KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
    717           return heapString("(inet_ntop error)");
    718         }
    719         return str(buffer, ':', ntohs(addr.inet4.sin_port));
    720       }
    721       case AF_INET6: {
    722         char buffer[INET6_ADDRSTRLEN];
    723         if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr,
    724                       buffer, sizeof(buffer)) == nullptr) {
    725           KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
    726           return heapString("(inet_ntop error)");
    727         }
    728         return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
    729       }
    730       case AF_UNIX: {
    731         auto path = _::safeUnixPath(&addr.unixDomain, addrlen);
    732         if (path.size() > 0 && path[0] == '\0') {
    733           return str("unix-abstract:", path.slice(1, path.size()));
    734         } else {
    735           return str("unix:", path);
    736         }
    737       }
    738       default:
    739         return str("(unknown address family ", addr.generic.sa_family, ")");
    740     }
    741   }
    742 
    743   static Promise<Array<SocketAddress>> lookupHost(
    744       LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
    745       _::NetworkFilter& filter);
    746   // Perform a DNS lookup.
    747 
    748   static Promise<Array<SocketAddress>> parse(
    749       LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
    750     // TODO(someday):  Allow commas in `str`.
    751 
    752     SocketAddress result;
    753 
    754     if (str.startsWith("unix:")) {
    755       StringPtr path = str.slice(strlen("unix:"));
    756       KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path),
    757                  "Unix domain socket address is too long.", str);
    758       KJ_REQUIRE(path.size() == strlen(path.cStr()),
    759                  "Unix domain socket address contains NULL. Use"
    760                  " 'unix-abstract:' for the abstract namespace.");
    761       result.addr.unixDomain.sun_family = AF_UNIX;
    762       strcpy(result.addr.unixDomain.sun_path, path.cStr());
    763       result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
    764 
    765       if (!result.parseAllowedBy(filter)) {
    766         KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()");
    767         return Array<SocketAddress>();
    768       }
    769 
    770       auto array = kj::heapArrayBuilder<SocketAddress>(1);
    771       array.add(result);
    772       return array.finish();
    773     }
    774 
    775     if (str.startsWith("unix-abstract:")) {
    776       StringPtr path = str.slice(strlen("unix-abstract:"));
    777       KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path),
    778                  "Unix domain socket address is too long.", str);
    779       result.addr.unixDomain.sun_family = AF_UNIX;
    780       result.addr.unixDomain.sun_path[0] = '\0';
    781       // although not strictly required by Linux, also copy the trailing
    782       // NULL terminator so that we can safely read it back in toString
    783       memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1);
    784       result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
    785 
    786       if (!result.parseAllowedBy(filter)) {
    787         KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()");
    788         return Array<SocketAddress>();
    789       }
    790 
    791       auto array = kj::heapArrayBuilder<SocketAddress>(1);
    792       array.add(result);
    793       return array.finish();
    794     }
    795 
    796     // Try to separate the address and port.
    797     ArrayPtr<const char> addrPart;
    798     Maybe<StringPtr> portPart;
    799 
    800     int af;
    801 
    802     if (str.startsWith("[")) {
    803       // Address starts with a bracket, which is a common way to write an ip6 address with a port,
    804       // since without brackets around the address part, the port looks like another segment of
    805       // the address.
    806       af = AF_INET6;
    807       size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
    808           "Unclosed '[' in address string.", str);
    809 
    810       addrPart = str.slice(1, closeBracket);
    811       if (str.size() > closeBracket + 1) {
    812         KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
    813                    "Expected port suffix after ']'.", str);
    814         portPart = str.slice(closeBracket + 2);
    815       }
    816     } else {
    817       KJ_IF_MAYBE(colon, str.findFirst(':')) {
    818         if (str.slice(*colon + 1).findFirst(':') == nullptr) {
    819           // There is exactly one colon and no brackets, so it must be an ip4 address with port.
    820           af = AF_INET;
    821           addrPart = str.slice(0, *colon);
    822           portPart = str.slice(*colon + 1);
    823         } else {
    824           // There are two or more colons and no brackets, so the whole thing must be an ip6
    825           // address with no port.
    826           af = AF_INET6;
    827           addrPart = str;
    828         }
    829       } else {
    830         // No colons, so it must be an ip4 address without port.
    831         af = AF_INET;
    832         addrPart = str;
    833       }
    834     }
    835 
    836     // Parse the port.
    837     unsigned long port;
    838     KJ_IF_MAYBE(portText, portPart) {
    839       char* endptr;
    840       port = strtoul(portText->cStr(), &endptr, 0);
    841       if (portText->size() == 0 || *endptr != '\0') {
    842         // Not a number.  Maybe it's a service name.  Fall back to DNS.
    843         return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
    844                           filter);
    845       }
    846       KJ_REQUIRE(port < 65536, "Port number too large.");
    847     } else {
    848       port = portHint;
    849     }
    850 
    851     // Check for wildcard.
    852     if (addrPart.size() == 1 && addrPart[0] == '*') {
    853       result.wildcard = true;
    854 #if defined(__OpenBSD__)
    855       // On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a
    856       // temporary workaround for wildcards.
    857       result.addrlen = sizeof(addr.inet4);
    858       result.addr.inet4.sin_family = AF_INET;
    859       result.addr.inet4.sin_port = htons(port);
    860 #else
    861       // Create an ip6 socket and set IPV6_V6ONLY to 0 later.
    862       result.addrlen = sizeof(addr.inet6);
    863       result.addr.inet6.sin6_family = AF_INET6;
    864       result.addr.inet6.sin6_port = htons(port);
    865 #endif
    866 
    867       auto array = kj::heapArrayBuilder<SocketAddress>(1);
    868       array.add(result);
    869       return array.finish();
    870     }
    871 
    872     void* addrTarget;
    873     if (af == AF_INET6) {
    874       result.addrlen = sizeof(addr.inet6);
    875       result.addr.inet6.sin6_family = AF_INET6;
    876       result.addr.inet6.sin6_port = htons(port);
    877       addrTarget = &result.addr.inet6.sin6_addr;
    878     } else {
    879       result.addrlen = sizeof(addr.inet4);
    880       result.addr.inet4.sin_family = AF_INET;
    881       result.addr.inet4.sin_port = htons(port);
    882       addrTarget = &result.addr.inet4.sin_addr;
    883     }
    884 
    885     if (addrPart.size() < INET6_ADDRSTRLEN - 1) {
    886       // addrPart is not necessarily NUL-terminated so we have to make a copy.  :(
    887       char buffer[INET6_ADDRSTRLEN];
    888       memcpy(buffer, addrPart.begin(), addrPart.size());
    889       buffer[addrPart.size()] = '\0';
    890 
    891       // OK, parse it!
    892       switch (inet_pton(af, buffer, addrTarget)) {
    893         case 1: {
    894           // success.
    895           if (!result.parseAllowedBy(filter)) {
    896             KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
    897             return Array<SocketAddress>();
    898           }
    899 
    900           auto array = kj::heapArrayBuilder<SocketAddress>(1);
    901           array.add(result);
    902           return array.finish();
    903         }
    904         case 0:
    905           // It's apparently not a simple address...  fall back to DNS.
    906           break;
    907         default:
    908           KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart);
    909       }
    910     }
    911 
    912     return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
    913   }
    914 
    915   static SocketAddress getLocalAddress(int sockfd) {
    916     SocketAddress result;
    917     result.addrlen = sizeof(addr);
    918     KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen));
    919     return result;
    920   }
    921 
    922   bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
    923     return filter.shouldAllow(&addr.generic, addrlen);
    924   }
    925 
    926   bool parseAllowedBy(_::NetworkFilter& filter) {
    927     return filter.shouldAllowParse(&addr.generic, addrlen);
    928   }
    929 
    930   kj::Own<PeerIdentity> getIdentity(LowLevelAsyncIoProvider& llaiop,
    931                                     LowLevelAsyncIoProvider::NetworkFilter& filter,
    932                                     AsyncIoStream& stream) const;
    933 
    934 private:
    935   SocketAddress() {
    936     // We need to memset the whole object 0 otherwise Valgrind gets unhappy when we write it to a
    937     // pipe, due to the padding bytes being uninitialized.
    938     memset(this, 0, sizeof(*this));
    939   }
    940 
    941   socklen_t addrlen;
    942   bool wildcard = false;
    943   union {
    944     struct sockaddr generic;
    945     struct sockaddr_in inet4;
    946     struct sockaddr_in6 inet6;
    947     struct sockaddr_un unixDomain;
    948     struct sockaddr_storage storage;
    949   } addr;
    950 
    951   struct LookupParams;
    952   class LookupReader;
    953 };
    954 
    955 class SocketAddress::LookupReader {
    956   // Reads SocketAddresses off of a pipe coming from another thread that is performing
    957   // getaddrinfo.
    958 
    959 public:
    960   LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
    961                _::NetworkFilter& filter)
    962       : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}
    963 
    964   ~LookupReader() {
    965     if (thread) thread->detach();
    966   }
    967 
    968   Promise<Array<SocketAddress>> read() {
    969     return input->tryRead(&current, sizeof(current), sizeof(current)).then(
    970         [this](size_t n) -> Promise<Array<SocketAddress>> {
    971       if (n < sizeof(current)) {
    972         thread = nullptr;
    973         // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
    974         // anyway.
    975         KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
    976         return addresses.releaseAsArray();
    977       } else {
    978         // getaddrinfo() can return multiple copies of the same address for several reasons.
    979         // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
    980         // it may return two copies of the same address, one for each type, unless it explicitly
    981         // knows that the service name given is specific to one type.  But we can't tell it a type,
    982         // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
    983         // while the user specified a UDP service name then they'll get a resolution error which
    984         // is lame.  (At least, I think that's how it works.)
    985         //
    986         // So we instead resort to de-duping results.
    987         if (alreadySeen.insert(current).second) {
    988           if (current.parseAllowedBy(filter)) {
    989             addresses.add(current);
    990           }
    991         }
    992         return read();
    993       }
    994     });
    995   }
    996 
    997 private:
    998   kj::Own<Thread> thread;
    999   kj::Own<AsyncInputStream> input;
   1000   _::NetworkFilter& filter;
   1001   SocketAddress current;
   1002   kj::Vector<SocketAddress> addresses;
   1003   std::set<SocketAddress> alreadySeen;
   1004 };
   1005 
   1006 struct SocketAddress::LookupParams {
   1007   kj::String host;
   1008   kj::String service;
   1009 };
   1010 
   1011 Promise<Array<SocketAddress>> SocketAddress::lookupHost(
   1012     LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
   1013     _::NetworkFilter& filter) {
   1014   // This shitty function spawns a thread to run getaddrinfo().  Unfortunately, getaddrinfo() is
   1015   // the only cross-platform DNS API and it is blocking.
   1016   //
   1017   // TODO(perf):  Use a thread pool?  Maybe kj::Thread should use a thread pool automatically?
   1018   //   Maybe use the various platform-specific asynchronous DNS libraries?  Please do not implement
   1019   //   a custom DNS resolver...
   1020 
   1021   int fds[2];
   1022 #if __linux__ && !__BIONIC__
   1023   KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
   1024 #else
   1025   KJ_SYSCALL(pipe(fds));
   1026 #endif
   1027 
   1028   auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
   1029 
   1030   int outFd = fds[1];
   1031 
   1032   LookupParams params = { kj::mv(host), kj::mv(service) };
   1033 
   1034   auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
   1035     FdOutputStream output((AutoCloseFd(outFd)));
   1036 
   1037     struct addrinfo* list;
   1038     int status = getaddrinfo(
   1039         params.host == "*" ? nullptr : params.host.cStr(),
   1040         params.service == nullptr ? nullptr : params.service.cStr(),
   1041         nullptr, &list);
   1042     if (status == 0) {
   1043       KJ_DEFER(freeaddrinfo(list));
   1044 
   1045       struct addrinfo* cur = list;
   1046       while (cur != nullptr) {
   1047         if (params.service == nullptr) {
   1048           switch (cur->ai_addr->sa_family) {
   1049             case AF_INET:
   1050               ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
   1051               break;
   1052             case AF_INET6:
   1053               ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
   1054               break;
   1055             default:
   1056               break;
   1057           }
   1058         }
   1059 
   1060         SocketAddress addr;
   1061         if (params.host == "*") {
   1062           // Set up a wildcard SocketAddress.  Only use the port number returned by getaddrinfo().
   1063           addr.wildcard = true;
   1064           addr.addrlen = sizeof(addr.addr.inet6);
   1065           addr.addr.inet6.sin6_family = AF_INET6;
   1066           switch (cur->ai_addr->sa_family) {
   1067             case AF_INET:
   1068               addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
   1069               break;
   1070             case AF_INET6:
   1071               addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
   1072               break;
   1073             default:
   1074               addr.addr.inet6.sin6_port = portHint;
   1075               break;
   1076           }
   1077         } else {
   1078           addr.addrlen = cur->ai_addrlen;
   1079           memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
   1080         }
   1081         KJ_ASSERT_CAN_MEMCPY(SocketAddress);
   1082         output.write(&addr, sizeof(addr));
   1083         cur = cur->ai_next;
   1084       }
   1085     } else if (status == EAI_SYSTEM) {
   1086       KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
   1087         return;
   1088       }
   1089     } else {
   1090       KJ_FAIL_REQUIRE("DNS lookup failed.",
   1091                       params.host, params.service, gai_strerror(status)) {
   1092         return;
   1093       }
   1094     }
   1095   }));
   1096 
   1097   auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
   1098   return reader->read().attach(kj::mv(reader));
   1099 }
   1100 
   1101 // =======================================================================================
   1102 
   1103 class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
   1104 public:
   1105   FdConnectionReceiver(LowLevelAsyncIoProvider& lowLevel,
   1106                        UnixEventPort& eventPort, int fd,
   1107                        LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
   1108       : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
   1109         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}
   1110 
   1111   Promise<Own<AsyncIoStream>> accept() override {
   1112     return acceptImpl(false).then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
   1113   }
   1114 
   1115   Promise<AuthenticatedStream> acceptAuthenticated() override {
   1116     return acceptImpl(true);
   1117   }
   1118 
   1119   Promise<AuthenticatedStream> acceptImpl(bool authenticated) {
   1120     int newFd;
   1121 
   1122     struct sockaddr_storage addr;
   1123     socklen_t addrlen = sizeof(addr);
   1124 
   1125   retry:
   1126 #if __linux__ && !__BIONIC__
   1127     newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
   1128                       SOCK_NONBLOCK | SOCK_CLOEXEC);
   1129 #else
   1130     newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
   1131 #endif
   1132 
   1133     if (newFd >= 0) {
   1134       kj::AutoCloseFd ownFd(newFd);
   1135       if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
   1136         // Ignore disallowed address.
   1137         return acceptImpl(authenticated);
   1138       } else {
   1139         // TODO(perf):  As a hack for the 0.4 release we are always setting
   1140         //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
   1141         //   RPC protocol.  Later, we should extend the interface to provide more
   1142         //   control over this.  Perhaps write() should have a flag which
   1143         //   specifies whether to pass MSG_MORE.
   1144         int one = 1;
   1145         KJ_SYSCALL_HANDLE_ERRORS(::setsockopt(
   1146               ownFd.get(), IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one))) {
   1147           case EOPNOTSUPP:
   1148           case ENOPROTOOPT: // (returned for AF_UNIX in cygwin)
   1149 #if __FreeBSD__
   1150           case EINVAL: // (returned for AF_UNIX in FreeBSD)
   1151 #endif
   1152             break;
   1153           default:
   1154             KJ_FAIL_SYSCALL("setsocketopt(IPPROTO_TCP, TCP_NODELAY)", error);
   1155         }
   1156 
   1157         AuthenticatedStream result;
   1158         result.stream = heap<AsyncStreamFd>(eventPort, ownFd.release(), NEW_FD_FLAGS);
   1159         if (authenticated) {
   1160           result.peerIdentity = SocketAddress(reinterpret_cast<struct sockaddr*>(&addr), addrlen)
   1161               .getIdentity(lowLevel, filter, *result.stream);
   1162         }
   1163         return kj::mv(result);
   1164       }
   1165     } else {
   1166       int error = errno;
   1167 
   1168       switch (error) {
   1169         case EAGAIN:
   1170 #if EAGAIN != EWOULDBLOCK
   1171         case EWOULDBLOCK:
   1172 #endif
   1173           // Not ready yet.
   1174           return observer.whenBecomesReadable().then([this,authenticated]() {
   1175             return acceptImpl(authenticated);
   1176           });
   1177 
   1178         case EINTR:
   1179         case ENETDOWN:
   1180 #ifdef EPROTO
   1181         // EPROTO is not defined on OpenBSD.
   1182         case EPROTO:
   1183 #endif
   1184         case EHOSTDOWN:
   1185         case EHOSTUNREACH:
   1186         case ENETUNREACH:
   1187         case ECONNABORTED:
   1188         case ETIMEDOUT:
   1189           // According to the Linux man page, accept() may report an error if the accepted
   1190           // connection is already broken.  In this case, we really ought to just ignore it and
   1191           // keep waiting.  But it's hard to say exactly what errors are such network errors and
   1192           // which ones are permanent errors.  We've made a guess here.
   1193           goto retry;
   1194 
   1195         default:
   1196           KJ_FAIL_SYSCALL("accept", error);
   1197       }
   1198 
   1199     }
   1200   }
   1201 
   1202   uint getPort() override {
   1203     return SocketAddress::getLocalAddress(fd).getPort();
   1204   }
   1205 
   1206   void getsockopt(int level, int option, void* value, uint* length) override {
   1207     socklen_t socklen = *length;
   1208     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
   1209     *length = socklen;
   1210   }
   1211   void setsockopt(int level, int option, const void* value, uint length) override {
   1212     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
   1213   }
   1214   void getsockname(struct sockaddr* addr, uint* length) override {
   1215     socklen_t socklen = *length;
   1216     KJ_SYSCALL(::getsockname(fd, addr, &socklen));
   1217     *length = socklen;
   1218   }
   1219 
   1220 public:
   1221   LowLevelAsyncIoProvider& lowLevel;
   1222   UnixEventPort& eventPort;
   1223   LowLevelAsyncIoProvider::NetworkFilter& filter;
   1224   UnixEventPort::FdObserver observer;
   1225 };
   1226 
   1227 class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
   1228 public:
   1229   DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
   1230                    LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
   1231       : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
   1232         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
   1233                                 UnixEventPort::FdObserver::OBSERVE_WRITE) {}
   1234 
   1235   Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
   1236   Promise<size_t> send(
   1237       ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;
   1238 
   1239   class ReceiverImpl;
   1240 
   1241   Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;
   1242 
   1243   uint getPort() override {
   1244     return SocketAddress::getLocalAddress(fd).getPort();
   1245   }
   1246 
   1247   void getsockopt(int level, int option, void* value, uint* length) override {
   1248     socklen_t socklen = *length;
   1249     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
   1250     *length = socklen;
   1251   }
   1252   void setsockopt(int level, int option, const void* value, uint length) override {
   1253     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
   1254   }
   1255 
   1256 public:
   1257   LowLevelAsyncIoProvider& lowLevel;
   1258   UnixEventPort& eventPort;
   1259   LowLevelAsyncIoProvider::NetworkFilter& filter;
   1260   UnixEventPort::FdObserver observer;
   1261 };
   1262 
   1263 class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
   1264 public:
   1265   LowLevelAsyncIoProviderImpl()
   1266       : eventLoop(eventPort), waitScope(eventLoop) {}
   1267 
   1268   inline WaitScope& getWaitScope() { return waitScope; }
   1269 
   1270   Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
   1271     return heap<AsyncStreamFd>(eventPort, fd, flags);
   1272   }
   1273   Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
   1274     return heap<AsyncStreamFd>(eventPort, fd, flags);
   1275   }
   1276   Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
   1277     return heap<AsyncStreamFd>(eventPort, fd, flags);
   1278   }
   1279   Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
   1280     return heap<AsyncStreamFd>(eventPort, fd, flags);
   1281   }
   1282   Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
   1283       int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
   1284     // It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
   1285     // especially setting nonblocking mode and taking ownership.
   1286     auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
   1287 
   1288     // Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
   1289     // non-blocking using EINPROGRESS.
   1290     for (;;) {
   1291       if (::connect(fd, addr, addrlen) < 0) {
   1292         int error = errno;
   1293         if (error == EINPROGRESS) {
   1294           // Fine.
   1295           break;
   1296         } else if (error != EINTR) {
   1297           auto address = SocketAddress(addr, addrlen).toString();
   1298           KJ_FAIL_SYSCALL("connect()", error, address) { break; }
   1299           return Own<AsyncIoStream>();
   1300         }
   1301       } else {
   1302         // no error
   1303         break;
   1304       }
   1305     }
   1306 
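            // Even when connect() returns EINPROGRESS, the fd becoming writable only means the
            // attempt has finished; it may have finished with an error.  So after waitConnected()
            // resolves we must fetch the outcome via getsockopt(SO_ERROR) and fail if it is nonzero.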
   1307     auto connected = result->waitConnected();
   1308     return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
   1309       int err;
   1310       socklen_t errlen = sizeof(err);
   1311       KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
   1312       if (err != 0) {
   1313         KJ_FAIL_SYSCALL("connect()", err) { break; }
   1314       }
   1315       return kj::mv(stream);
   1316     }));
   1317   }
   1318   Own<ConnectionReceiver> wrapListenSocketFd(
   1319       int fd, NetworkFilter& filter, uint flags = 0) override {
   1320     return heap<FdConnectionReceiver>(*this, eventPort, fd, filter, flags);
   1321   }
   1322   Own<DatagramPort> wrapDatagramSocketFd(
   1323       int fd, NetworkFilter& filter, uint flags = 0) override {
   1324     return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
   1325   }
   1326 
   1327   Timer& getTimer() override { return eventPort.getTimer(); }
   1328 
   1329   UnixEventPort& getEventPort() { return eventPort; }
   1330 
   1331 private:
   1332   UnixEventPort eventPort;
   1333   EventLoop eventLoop;
   1334   WaitScope waitScope;
   1335 };
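
        // Illustrative sketch (not part of the original file): LowLevelAsyncIoProviderImpl backs
        // the `lowLevelProvider` member of the AsyncIoContext returned by setupAsyncIo() below.
        // It is mainly useful for adopting file descriptors created by other means.  Assuming
        // `sockFd` is a connected socket owned by the caller:
        //
        //     auto io = kj::setupAsyncIo();
        //     kj::Own<kj::AsyncIoStream> stream = io.lowLevelProvider->wrapSocketFd(
        //         sockFd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
        //
        // The flags (TAKE_OWNERSHIP, ALREADY_CLOEXEC, ALREADY_NONBLOCK; see async-io.h) tell the
        // wrapper whether it owns the fd and whether it still needs to set FD_CLOEXEC / O_NONBLOCK.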
   1336 
   1337 // =======================================================================================
   1338 
   1339 class NetworkAddressImpl final: public NetworkAddress {
   1340 public:
   1341   NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
   1342                      LowLevelAsyncIoProvider::NetworkFilter& filter,
   1343                      Array<SocketAddress> addrs)
   1344       : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}
   1345 
   1346   Promise<Own<AsyncIoStream>> connect() override {
   1347     auto addrsCopy = heapArray(addrs.asPtr());
   1348     auto promise = connectImpl(lowLevel, filter, addrsCopy, false);
   1349     return promise.attach(kj::mv(addrsCopy))
   1350         .then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
   1351   }
   1352 
   1353   Promise<AuthenticatedStream> connectAuthenticated() override {
   1354     auto addrsCopy = heapArray(addrs.asPtr());
   1355     auto promise = connectImpl(lowLevel, filter, addrsCopy, true);
   1356     return promise.attach(kj::mv(addrsCopy));
   1357   }
   1358 
   1359   Own<ConnectionReceiver> listen() override {
   1360     if (addrs.size() > 1) {
   1361       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
   1362           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
   1363           "in the future.", addrs[0].toString());
   1364     }
   1365 
   1366     int fd = addrs[0].socket(SOCK_STREAM);
   1367 
   1368     {
   1369       KJ_ON_SCOPE_FAILURE(close(fd));
   1370 
   1371       // We always enable SO_REUSEADDR because having to take your server down for five minutes
   1372       // before it can restart really sucks.
   1373       int optval = 1;
   1374       KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
   1375 
   1376       addrs[0].bind(fd);
   1377 
   1378       // TODO(someday):  Let queue size be specified explicitly in string addresses.
   1379       KJ_SYSCALL(::listen(fd, SOMAXCONN));
   1380     }
   1381 
   1382     return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
   1383   }
   1384 
   1385   Own<DatagramPort> bindDatagramPort() override {
   1386     if (addrs.size() > 1) {
   1387       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
   1388           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
   1389           "in the future.", addrs[0].toString());
   1390     }
   1391 
   1392     int fd = addrs[0].socket(SOCK_DGRAM);
   1393 
   1394     {
   1395       KJ_ON_SCOPE_FAILURE(close(fd));
   1396 
   1397       // We always enable SO_REUSEADDR because having to take your server down for five minutes
   1398       // before it can restart really sucks.
   1399       int optval = 1;
   1400       KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
   1401 
   1402       addrs[0].bind(fd);
   1403     }
   1404 
   1405     return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
   1406   }
   1407 
   1408   Own<NetworkAddress> clone() override {
   1409     return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
   1410   }
   1411 
   1412   String toString() override {
   1413     return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
   1414   }
   1415 
   1416   const SocketAddress& chooseOneAddress() {
   1417     KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
   1418     return addrs[counter++ % addrs.size()];
   1419   }
   1420 
   1421 private:
   1422   LowLevelAsyncIoProvider& lowLevel;
   1423   LowLevelAsyncIoProvider::NetworkFilter& filter;
   1424   Array<SocketAddress> addrs;
   1425   uint counter = 0;  // Round-robin index used by chooseOneAddress().
   1426 
   1427   static Promise<AuthenticatedStream> connectImpl(
   1428       LowLevelAsyncIoProvider& lowLevel,
   1429       LowLevelAsyncIoProvider::NetworkFilter& filter,
   1430       ArrayPtr<SocketAddress> addrs,
   1431       bool authenticated) {
   1432     KJ_ASSERT(addrs.size() > 0);
   1433 
   1434     return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
   1435       if (!addrs[0].allowedBy(filter)) {
   1436         return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
   1437       } else {
   1438         int fd = addrs[0].socket(SOCK_STREAM);
   1439         return lowLevel.wrapConnectingSocketFd(
   1440             fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
   1441       }
   1442     }).then([&lowLevel,&filter,addrs,authenticated](Own<AsyncIoStream>&& stream)
   1443         -> Promise<AuthenticatedStream> {
   1444       // Success, pass along.
   1445       AuthenticatedStream result;
   1446       result.stream = kj::mv(stream);
   1447       if (authenticated) {
   1448         result.peerIdentity = addrs[0].getIdentity(lowLevel, filter, *result.stream);
   1449       }
   1450       return kj::mv(result);
   1451     }, [&lowLevel,&filter,addrs,authenticated](Exception&& exception) mutable
   1452         -> Promise<AuthenticatedStream> {
   1453       // Connect failed.
   1454       if (addrs.size() > 1) {
   1455         // Try the next address instead.
   1456         return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()), authenticated);
   1457       } else {
   1458         // No more addresses to try, so propagate the exception.
   1459         return kj::mv(exception);
   1460       }
   1461     });
   1462   }
   1463 };
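
        // Illustrative sketch (not part of the original file): NetworkAddressImpl is the concrete
        // type behind the kj::NetworkAddress handles produced by Network::parseAddress().  Assuming
        // the high-level API from setupAsyncIo(), binding a listener looks roughly like:
        //
        //     auto io = kj::setupAsyncIo();
        //     auto addr = io.provider->getNetwork().parseAddress("*:8080").wait(io.waitScope);
        //     auto listener = addr->listen();
        //     uint port = listener->getPort();  // handy when the address specified port 0
        //
        // Note that listen() and bindDatagramPort() always set SO_REUSEADDR, and if the address
        // resolved to multiple results only the first one is bound.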
   1464 
   1465 kj::Own<PeerIdentity> SocketAddress::getIdentity(kj::LowLevelAsyncIoProvider& llaiop,
   1466                                                  LowLevelAsyncIoProvider::NetworkFilter& filter,
   1467                                                  AsyncIoStream& stream) const {
   1468   switch (addr.generic.sa_family) {
   1469     case AF_INET:
   1470     case AF_INET6: {
   1471       auto builder = kj::heapArrayBuilder<SocketAddress>(1);
   1472       builder.add(*this);
   1473       return NetworkPeerIdentity::newInstance(
   1474           kj::heap<NetworkAddressImpl>(llaiop, filter, builder.finish()));
   1475     }
   1476     case AF_UNIX: {
   1477       LocalPeerIdentity::Credentials result;
   1478 
   1479       // There is little documentation on what happens when the uid/pid can't be obtained, but I've
   1480       // seen vague references on the internet saying that a PID of 0 and a UID of uid_t(-1) are used
   1481       // as invalid values.
   1482 
   1483 // OpenBSD defines SO_PEERCRED but uses a different interface for it
   1484 // hence we're falling back to LOCAL_PEERCRED
   1485 #if defined(SO_PEERCRED) && !__OpenBSD__
   1486       struct ucred creds;
   1487       uint length = sizeof(creds);
   1488       stream.getsockopt(SOL_SOCKET, SO_PEERCRED, &creds, &length);
   1489       if (creds.pid > 0) {
   1490         result.pid = creds.pid;
   1491       }
   1492       if (creds.uid != static_cast<uid_t>(-1)) {
   1493         result.uid = creds.uid;
   1494       }
   1495 
   1496 #elif defined(LOCAL_PEERCRED)
   1497       // MacOS / FreeBSD / OpenBSD
   1498       struct xucred creds;
   1499       uint length = sizeof(creds);
   1500       stream.getsockopt(SOL_LOCAL, LOCAL_PEERCRED, &creds, &length);
   1501       KJ_ASSERT(length == sizeof(creds));
   1502       if (creds.cr_uid != static_cast<uid_t>(-1)) {
   1503         result.uid = creds.cr_uid;
   1504       }
   1505 
   1506 #if defined(LOCAL_PEERPID)
   1507       // LOCAL_PEERPID appears to be available on macOS only.
   1508       pid_t pid;
   1509       length = sizeof(pid);
   1510       stream.getsockopt(SOL_LOCAL, LOCAL_PEERPID, &pid, &length);
   1511       KJ_ASSERT(length == sizeof(pid));
   1512       if (pid > 0) {
   1513         result.pid = pid;
   1514       }
   1515 #endif
   1516 #endif
   1517 
   1518       return LocalPeerIdentity::newInstance(result);
   1519     }
   1520     default:
   1521       return UnknownPeerIdentity::newInstance();
   1522   }
   1523 }
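
        // Illustrative sketch (not part of the original file): the identity objects built above are
        // surfaced through connectAuthenticated() and acceptAuthenticated().  For a Unix socket,
        // assuming the LocalPeerIdentity interface declared in async-io.h and given an `addr` from
        // parseAddress() plus the `io` context from setupAsyncIo(), a caller can recover the peer's
        // credentials roughly like this:
        //
        //     auto conn = addr->connectAuthenticated().wait(io.waitScope);
        //     KJ_IF_MAYBE(local, kj::dynamicDowncastIfAvailable<kj::LocalPeerIdentity>(
        //         *conn.peerIdentity)) {
        //       auto creds = local->getCredentials();
        //       KJ_IF_MAYBE(pid, creds.pid) { /* peer process ID is known */ }
        //       KJ_IF_MAYBE(uid, creds.uid) { /* peer effective UID is known */ }
        //     }
        //
        // The pid/uid fields are left unset when the platform reports the invalid values noted above.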
   1524 
   1525 class SocketNetwork final: public Network {
   1526 public:
   1527   explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
   1528   explicit SocketNetwork(SocketNetwork& parent,
   1529                          kj::ArrayPtr<const kj::StringPtr> allow,
   1530                          kj::ArrayPtr<const kj::StringPtr> deny)
   1531       : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}
   1532 
   1533   Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
   1534     return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
   1535       return SocketAddress::parse(lowLevel, addr, portHint, filter);
   1536     })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
   1537       return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
   1538     });
   1539   }
   1540 
   1541   Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
   1542     auto array = kj::heapArrayBuilder<SocketAddress>(1);
   1543     array.add(SocketAddress(sockaddr, len));
   1544     KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
   1545     return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
   1546   }
   1547 
   1548   Own<Network> restrictPeers(
   1549       kj::ArrayPtr<const kj::StringPtr> allow,
   1550       kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
   1551     return heap<SocketNetwork>(*this, allow, deny);
   1552   }
   1553 
   1554 private:
   1555   LowLevelAsyncIoProvider& lowLevel;
   1556   _::NetworkFilter filter;
   1557 };
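
        // Illustrative sketch (not part of the original file): restrictPeers() layers an additional
        // _::NetworkFilter on top of the parent's, so the parent's rules and the new rules both
        // apply.  Assuming the filter specifiers documented in async-io.h (e.g. "public", "private",
        // "local", "unix", or explicit CIDR ranges), and given the `io` context from setupAsyncIo():
        //
        //     kj::Network& network = io.provider->getNetwork();
        //     kj::StringPtr allow[] = {"public"};
        //     kj::Own<kj::Network> restricted = network.restrictPeers(allow);
        //
        // Addresses rejected by the filter cause connect() to fail, are skipped by accept(), and
        // their datagrams are dropped by the receiver (see the filter checks in this file).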
   1558 
   1559 // =======================================================================================
   1560 
   1561 Promise<size_t> DatagramPortImpl::send(
   1562     const void* buffer, size_t size, NetworkAddress& destination) {
   1563   auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
   1564 
   1565   ssize_t n;
   1566   KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
   1567   if (n < 0) {
   1568     // Write buffer full.
   1569     return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
   1570       return send(buffer, size, destination);
   1571     });
   1572   } else {
   1573     // If less than the whole message was sent, then it got truncated, and there's nothing we can
   1574     // do about it.
   1575     return n;
   1576   }
   1577 }
   1578 
   1579 Promise<size_t> DatagramPortImpl::send(
   1580     ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
   1581   struct msghdr msg;
   1582   memset(&msg, 0, sizeof(msg));
   1583 
   1584   auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
   1585   msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
   1586   msg.msg_namelen = addr.getRawSize();
   1587 
   1588   const size_t iovmax = kj::miniposix::iovMax();
   1589   KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);
   1590 
   1591   for (size_t i: kj::indices(pieces)) {
   1592     iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
   1593     iov[i].iov_len = pieces[i].size();
   1594   }
   1595 
   1596   Array<byte> extra;
   1597   if (pieces.size() > iovmax) {
   1598     // Too many pieces, but we can't use multiple syscalls because they'd send separate
   1599     // datagrams. We'll have to copy the trailing pieces into a temporary array.
   1600     //
   1601     // TODO(perf): On Linux we could use multiple syscalls via MSG_MORE or sendmsg/sendmmsg.
   1602     size_t extraSize = 0;
   1603     for (size_t i = iovmax - 1; i < pieces.size(); i++) {
   1604       extraSize += pieces[i].size();
   1605     }
   1606     extra = kj::heapArray<byte>(extraSize);
   1607     extraSize = 0;
   1608     for (size_t i = iovmax - 1; i < pieces.size(); i++) {
   1609       memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
   1610       extraSize += pieces[i].size();
   1611     }
   1612     iov.back().iov_base = extra.begin();
   1613     iov.back().iov_len = extra.size();
   1614   }
   1615 
   1616   msg.msg_iov = iov.begin();
   1617   msg.msg_iovlen = iov.size();
   1618 
   1619   ssize_t n;
   1620   KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
   1621   if (n < 0) {
   1622     // Write buffer full.
   1623     return observer.whenBecomesWritable().then([this, pieces, &destination]() {
   1624       return send(pieces, destination);
   1625     });
   1626   } else {
   1627     // If less than the whole message was sent, then it was truncated, and there's nothing we can
   1628     // do about that now.
   1629     return n;
   1630   }
   1631 }
   1632 
   1633 class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
   1634 public:
   1635   explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
   1636       : port(port),
   1637         contentBuffer(heapArray<byte>(capacity.content)),
   1638         ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
   1639                                                : Array<byte>(nullptr)) {}
   1640 
   1641   Promise<void> receive() override {
   1642     struct msghdr msg;
   1643     memset(&msg, 0, sizeof(msg));
   1644 
   1645     struct sockaddr_storage addr;
   1646     memset(&addr, 0, sizeof(addr));
   1647     msg.msg_name = &addr;
   1648     msg.msg_namelen = sizeof(addr);
   1649 
   1650     struct iovec iov;
   1651     iov.iov_base = contentBuffer.begin();
   1652     iov.iov_len = contentBuffer.size();
   1653     msg.msg_iov = &iov;
   1654     msg.msg_iovlen = 1;
   1655     msg.msg_control = ancillaryBuffer.begin();
   1656     msg.msg_controllen = ancillaryBuffer.size();
   1657 
   1658     ssize_t n;
   1659     KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));
   1660 
   1661     if (n < 0) {
   1662       // No data available. Wait.
   1663       return port.observer.whenBecomesReadable().then([this]() {
   1664         return receive();
   1665       });
   1666     } else {
   1667       if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
   1668                                    msg.msg_namelen)) {
   1669         // Ignore message from disallowed source.
   1670         return receive();
   1671       }
   1672 
   1673       receivedSize = n;
   1674       contentTruncated = msg.msg_flags & MSG_TRUNC;
   1675 
   1676       source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);
   1677 
   1678       ancillaryList.resize(0);
   1679       ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;
   1680 
   1681       for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
   1682            cmsg = CMSG_NXTHDR(&msg, cmsg)) {
   1683         // On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
   1684         // when truncated. On other platforms (Linux) the length in cmsghdr will itself be
   1685         // truncated to fit within the buffer.
   1686 
   1687 #if __APPLE__
   1688 // On MacOS, `CMSG_SPACE(0)` triggers a bogus warning.
   1689 #pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
   1690 #endif
   1691         const byte* pos = reinterpret_cast<const byte*>(cmsg);
   1692         size_t available = ancillaryBuffer.end() - pos;
   1693         if (available < CMSG_SPACE(0)) {
   1694           // The buffer ends in the middle of the header. We can't use this message.
   1695           // (On Linux, this never happens, because the message is not included if there isn't
   1696           // space for a header. I'm not sure how other systems behave, though, so let's be safe.)
   1697           break;
   1698         }
   1699 
   1700         // OK, we know the cmsghdr is valid, at least.
   1701 
   1702         // Find the start of the message payload.
   1703         const byte* begin = (const byte *)CMSG_DATA(cmsg);
   1704 
   1705         // Cap the message length to the available space.
   1706         const byte* end = pos + kj::min(available, cmsg->cmsg_len);
   1707 
   1708         ancillaryList.add(AncillaryMessage(
   1709             cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
   1710       }
   1711 
   1712       return READY_NOW;
   1713     }
   1714   }
   1715 
   1716   MaybeTruncated<ArrayPtr<const byte>> getContent() override {
   1717     return { contentBuffer.slice(0, receivedSize), contentTruncated };
   1718   }
   1719 
   1720   MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
   1721     return { ancillaryList.asPtr(), ancillaryTruncated };
   1722   }
   1723 
   1724   NetworkAddress& getSource() override {
   1725     return KJ_REQUIRE_NONNULL(source, "Haven't received a message yet.").abstract;
   1726   }
   1727 
   1728 private:
   1729   DatagramPortImpl& port;
   1730   Array<byte> contentBuffer;
   1731   Array<byte> ancillaryBuffer;
   1732   Vector<AncillaryMessage> ancillaryList;
   1733   size_t receivedSize = 0;
   1734   bool contentTruncated = false;
   1735   bool ancillaryTruncated = false;
   1736 
   1737   struct StoredAddress {
   1738     StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
   1739                   const void* sockaddr, uint length)
   1740         : raw(sockaddr, length),
   1741           abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}
   1742 
   1743     SocketAddress raw;
   1744     NetworkAddressImpl abstract;
   1745   };
   1746 
   1747   kj::Maybe<StoredAddress> source;
   1748 };
   1749 
   1750 Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
   1751   return kj::heap<ReceiverImpl>(*this, capacity);
   1752 }
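
        // Illustrative sketch (not part of the original file): putting DatagramPortImpl and its
        // ReceiverImpl together, echoing a single datagram back to its sender looks roughly like
        // this (assuming the DatagramPort/DatagramReceiver interfaces in async-io.h):
        //
        //     auto io = kj::setupAsyncIo();
        //     auto addr = io.provider->getNetwork().parseAddress("127.0.0.1:0").wait(io.waitScope);
        //     auto port = addr->bindDatagramPort();
        //     auto receiver = port->makeReceiver();
        //     receiver->receive().wait(io.waitScope);
        //     auto content = receiver->getContent();  // { value, isTruncated }
        //     port->send(content.value.begin(), content.value.size(), receiver->getSource())
        //         .wait(io.waitScope);
        //
        // The buffers returned by getContent()/getAncillary() alias contentBuffer/ancillaryBuffer
        // above, so they are only valid until the next call to receive().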
   1753 
   1754 // =======================================================================================
   1755 
   1756 class AsyncIoProviderImpl final: public AsyncIoProvider {
   1757 public:
   1758   AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
   1759       : lowLevel(lowLevel), network(lowLevel) {}
   1760 
   1761   OneWayPipe newOneWayPipe() override {
   1762     int fds[2];
   1763 #if __linux__ && !__BIONIC__
   1764     KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
   1765 #else
   1766     KJ_SYSCALL(pipe(fds));
   1767 #endif
   1768     return OneWayPipe {
   1769       lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
   1770       lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
   1771     };
   1772   }
   1773 
   1774   TwoWayPipe newTwoWayPipe() override {
   1775     int fds[2];
   1776     int type = SOCK_STREAM;
   1777 #if __linux__ && !__BIONIC__
   1778     type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
   1779 #endif
   1780     KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
   1781     return TwoWayPipe { {
   1782       lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
   1783       lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
   1784     } };
   1785   }
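
          // Illustrative sketch (not part of the original file): these in-process pipes are handy
          // in tests or when connecting two components without a real socket.  Given an
          // AsyncIoProvider& ioProvider:
          //
          //     auto pipe = ioProvider.newTwoWayPipe();
          //     auto writePromise = pipe.ends[0]->write("hello", 5);
          //     char buf[5];
          //     auto readPromise = pipe.ends[1]->read(buf, 5);
          //
          // newOneWayPipe() uses a real pipe(2) and so carries only bytes; newTwoWayPipe() and
          // newCapabilityPipe() use socketpair(2), and the capability pipe can additionally pass
          // file descriptors between the two ends.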
   1786 
   1787   CapabilityPipe newCapabilityPipe() override {
   1788     int fds[2];
   1789     int type = SOCK_STREAM;
   1790 #if __linux__ && !__BIONIC__
   1791     type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
   1792 #endif
   1793     KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
   1794     return CapabilityPipe { {
   1795       lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
   1796       lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
   1797     } };
   1798   }
   1799 
   1800   Network& getNetwork() override {
   1801     return network;
   1802   }
   1803 
   1804   PipeThread newPipeThread(
   1805       Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
   1806     int fds[2];
   1807     int type = SOCK_STREAM;
   1808 #if __linux__ && !__BIONIC__
   1809     type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
   1810 #endif
   1811     KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
   1812 
   1813     int threadFd = fds[1];
   1814     KJ_ON_SCOPE_FAILURE(close(threadFd));
   1815 
   1816     auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);
   1817 
   1818     auto thread = heap<Thread>(kj::mvCapture(startFunc,
   1819         [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
   1820       LowLevelAsyncIoProviderImpl lowLevel;
   1821       auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
   1822       AsyncIoProviderImpl ioProvider(lowLevel);
   1823       startFunc(ioProvider, *stream, lowLevel.getWaitScope());
   1824     }));
   1825 
   1826     return { kj::mv(thread), kj::mv(pipe) };
   1827   }
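
          // Illustrative sketch (not part of the original file): newPipeThread() starts a thread
          // that runs its own event loop and receives one end of the socketpair created above.
          // Given an AsyncIoProvider& ioProvider and the caller's WaitScope& waitScope:
          //
          //     auto pipeThread = ioProvider.newPipeThread(
          //         [](kj::AsyncIoProvider& provider, kj::AsyncIoStream& stream, kj::WaitScope& ws) {
          //       stream.write("ready", 5).wait(ws);  // runs in the new thread
          //     });
          //     char buf[5];
          //     pipeThread.pipe->read(buf, 5).wait(waitScope);  // back on the calling thread
          //
          // The returned PipeThread owns both the kj::Thread and the caller's end of the pipe;
          // destroying it joins the thread.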
   1828 
   1829   Timer& getTimer() override { return lowLevel.getTimer(); }
   1830 
   1831 private:
   1832   LowLevelAsyncIoProvider& lowLevel;
   1833   SocketNetwork network;
   1834 };
   1835 
   1836 }  // namespace
   1837 
   1838 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
   1839   return kj::heap<AsyncIoProviderImpl>(lowLevel);
   1840 }
   1841 
   1842 AsyncIoContext setupAsyncIo() {
   1843   auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
   1844   auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
   1845   auto& waitScope = lowLevel->getWaitScope();
   1846   auto& eventPort = lowLevel->getEventPort();
   1847   return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
   1848 }
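
        // Illustrative sketch (not part of the original file): setupAsyncIo() is the usual entry
        // point for programs that don't already have an event loop, e.g.:
        //
        //     int main() {
        //       auto io = kj::setupAsyncIo();
        //       auto addr = io.provider->getNetwork()
        //           .parseAddress("127.0.0.1", 5923).wait(io.waitScope);
        //       auto conn = addr->connect().wait(io.waitScope);
        //       conn->write("hello", 5).wait(io.waitScope);
        //     }
        //
        // The returned AsyncIoContext owns the UnixEventPort, EventLoop, and WaitScope (via
        // LowLevelAsyncIoProviderImpl above), so it must outlive all use of the providers, and
        // wait() may only be called from the thread that called setupAsyncIo().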
   1849 
   1850 }  // namespace kj
   1851 
   1852 #endif  // !_WIN32