async-io-unix.c++ (67624B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #if !_WIN32 23 // For Win32 implementation, see async-io-win32.c++. 
24 25 #ifndef _GNU_SOURCE 26 #define _GNU_SOURCE 27 #endif 28 29 #include "async-io.h" 30 #include "async-io-internal.h" 31 #include "async-unix.h" 32 #include "debug.h" 33 #include "thread.h" 34 #include "io.h" 35 #include "miniposix.h" 36 #include <unistd.h> 37 #include <sys/uio.h> 38 #include <errno.h> 39 #include <fcntl.h> 40 #include <sys/types.h> 41 #include <sys/socket.h> 42 #include <sys/un.h> 43 #include <netinet/in.h> 44 #include <netinet/tcp.h> 45 #include <stddef.h> 46 #include <stdlib.h> 47 #include <arpa/inet.h> 48 #include <netdb.h> 49 #include <set> 50 #include <poll.h> 51 #include <limits.h> 52 #include <sys/ioctl.h> 53 54 #if !defined(SO_PEERCRED) && defined(LOCAL_PEERCRED) 55 #include <sys/ucred.h> 56 #endif 57 58 #if !defined(SOL_LOCAL) && (__FreeBSD__ || __DragonflyBSD__) 59 // On DragonFly or FreeBSD < 12.2 you're supposed to use 0 for SOL_LOCAL. 60 #define SOL_LOCAL 0 61 #endif 62 63 namespace kj { 64 65 namespace { 66 67 void setNonblocking(int fd) { 68 #ifdef FIONBIO 69 int opt = 1; 70 KJ_SYSCALL(ioctl(fd, FIONBIO, &opt)); 71 #else 72 int flags; 73 KJ_SYSCALL(flags = fcntl(fd, F_GETFL)); 74 if ((flags & O_NONBLOCK) == 0) { 75 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK)); 76 } 77 #endif 78 } 79 80 void setCloseOnExec(int fd) { 81 #ifdef FIOCLEX 82 KJ_SYSCALL(ioctl(fd, FIOCLEX)); 83 #else 84 int flags; 85 KJ_SYSCALL(flags = fcntl(fd, F_GETFD)); 86 if ((flags & FD_CLOEXEC) == 0) { 87 KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC)); 88 } 89 #endif 90 } 91 92 static constexpr uint NEW_FD_FLAGS = 93 #if __linux__ && !__BIONIC__ 94 LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK | 95 #endif 96 LowLevelAsyncIoProvider::TAKE_OWNERSHIP; 97 // We always try to open FDs with CLOEXEC and NONBLOCK already set on Linux, but on other platforms 98 // this is not possible. 
99 100 class OwnedFileDescriptor { 101 public: 102 OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) { 103 if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) { 104 KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't."); 105 } else { 106 setNonblocking(fd); 107 } 108 109 if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) { 110 if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) { 111 KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC, 112 "You claimed you set CLOEXEC, but you didn't."); 113 } else { 114 setCloseOnExec(fd); 115 } 116 } 117 } 118 119 ~OwnedFileDescriptor() noexcept(false) { 120 // Don't use SYSCALL() here because close() should not be repeated on EINTR. 121 if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) { 122 KJ_FAIL_SYSCALL("close", errno, fd) { 123 // Recoverable exceptions are safe in destructors. 124 break; 125 } 126 } 127 } 128 129 protected: 130 const int fd; 131 132 private: 133 uint flags; 134 }; 135 136 // ======================================================================================= 137 138 class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream { 139 public: 140 AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags) 141 : OwnedFileDescriptor(fd, flags), 142 eventPort(eventPort), 143 observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {} 144 virtual ~AsyncStreamFd() noexcept(false) {} 145 146 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 147 return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0}) 148 .then([](ReadResult r) { return r.byteCount; }); 149 } 150 151 Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes, 152 AutoCloseFd* fdBuffer, size_t maxFds) override { 153 return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0}); 154 } 155 156 Promise<ReadResult> tryReadWithStreams( 157 void* buffer, size_t minBytes, 
size_t maxBytes, 158 Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override { 159 auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams); 160 auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0}); 161 162 return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer] 163 (ReadResult result) mutable { 164 for (auto i: kj::zeroTo(result.capCount)) { 165 streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(), 166 LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC); 167 } 168 return result; 169 }); 170 } 171 172 Promise<void> write(const void* buffer, size_t size) override { 173 ssize_t n; 174 KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) { 175 // Error. 176 177 // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to 178 // a bug that exists in both Clang and GCC: 179 // http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 180 // http://llvm.org/bugs/show_bug.cgi?id=12286 181 goto error; 182 } 183 if (false) { 184 error: 185 return kj::READY_NOW; 186 } 187 188 if (n < 0) { 189 // EAGAIN -- need to wait for writability and try again. 190 return observer.whenBecomesWritable().then([=]() { 191 return write(buffer, size); 192 }); 193 } else if (n == size) { 194 // All done. 195 return READY_NOW; 196 } else { 197 // Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as 198 // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even 199 // for non-blocking operations. So, we'll need to write() again now, even though it will 200 // almost certainly fail with EAGAIN. See comments in the read path for more info. 
201 buffer = reinterpret_cast<const byte*>(buffer) + n; 202 size -= n; 203 return write(buffer, size); 204 } 205 } 206 207 Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { 208 if (pieces.size() == 0) { 209 return writeInternal(nullptr, nullptr, nullptr); 210 } else { 211 return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr); 212 } 213 } 214 215 Promise<void> writeWithFds(ArrayPtr<const byte> data, 216 ArrayPtr<const ArrayPtr<const byte>> moreData, 217 ArrayPtr<const int> fds) override { 218 return writeInternal(data, moreData, fds); 219 } 220 221 Promise<void> writeWithStreams(ArrayPtr<const byte> data, 222 ArrayPtr<const ArrayPtr<const byte>> moreData, 223 Array<Own<AsyncCapabilityStream>> streams) override { 224 auto fds = KJ_MAP(stream, streams) { 225 return downcast<AsyncStreamFd>(*stream).fd; 226 }; 227 auto promise = writeInternal(data, moreData, fds); 228 return promise.attach(kj::mv(fds), kj::mv(streams)); 229 } 230 231 Promise<void> whenWriteDisconnected() override { 232 KJ_IF_MAYBE(p, writeDisconnectedPromise) { 233 return p->addBranch(); 234 } else { 235 auto fork = observer.whenWriteDisconnected().fork(); 236 auto result = fork.addBranch(); 237 writeDisconnectedPromise = kj::mv(fork); 238 return kj::mv(result); 239 } 240 } 241 242 void shutdownWrite() override { 243 // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the 244 // UnixAsyncIoProvider interface. 245 KJ_SYSCALL(shutdown(fd, SHUT_WR)); 246 } 247 248 void abortRead() override { 249 // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the 250 // UnixAsyncIoProvider interface. 
251 KJ_SYSCALL(shutdown(fd, SHUT_RD)); 252 } 253 254 void getsockopt(int level, int option, void* value, uint* length) override { 255 socklen_t socklen = *length; 256 KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen)); 257 *length = socklen; 258 } 259 260 void setsockopt(int level, int option, const void* value, uint length) override { 261 KJ_SYSCALL(::setsockopt(fd, level, option, value, length)); 262 } 263 264 void getsockname(struct sockaddr* addr, uint* length) override { 265 socklen_t socklen = *length; 266 KJ_SYSCALL(::getsockname(fd, addr, &socklen)); 267 *length = socklen; 268 } 269 270 void getpeername(struct sockaddr* addr, uint* length) override { 271 socklen_t socklen = *length; 272 KJ_SYSCALL(::getpeername(fd, addr, &socklen)); 273 *length = socklen; 274 } 275 276 kj::Maybe<int> getFd() const override { 277 return fd; 278 } 279 280 void registerAncillaryMessageHandler( 281 kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> fn) override { 282 ancillaryMsgCallback = kj::mv(fn); 283 } 284 285 Promise<void> waitConnected() { 286 // Wait until initial connection has completed. This actually just waits until it is writable. 287 288 // Can't just go directly to writeObserver.whenBecomesWritable() because of edge triggering. We 289 // need to explicitly check if the socket is already connected. 290 291 struct pollfd pollfd; 292 memset(&pollfd, 0, sizeof(pollfd)); 293 pollfd.fd = fd; 294 pollfd.events = POLLOUT; 295 296 int pollResult; 297 KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0)); 298 299 if (pollResult == 0) { 300 // Not ready yet. We can safely use the edge-triggered observer. 301 return observer.whenBecomesWritable(); 302 } else { 303 // Ready now. 
304 return kj::READY_NOW; 305 } 306 } 307 308 private: 309 UnixEventPort& eventPort; 310 UnixEventPort::FdObserver observer; 311 Maybe<ForkedPromise<void>> writeDisconnectedPromise; 312 Maybe<Function<void(ArrayPtr<AncillaryMessage>)>> ancillaryMsgCallback; 313 314 Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes, 315 AutoCloseFd* fdBuffer, size_t maxFds, 316 ReadResult alreadyRead) { 317 // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes, 318 // maxBytes, and buffer have already been adjusted to account for them, but this count must 319 // be included in the final return value. 320 321 ssize_t n; 322 if (maxFds == 0 && ancillaryMsgCallback == nullptr) { 323 KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) { 324 // Error. 325 326 // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to 327 // a bug that exists in both Clang and GCC: 328 // http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 329 // http://llvm.org/bugs/show_bug.cgi?id=12286 330 goto error; 331 } 332 } else { 333 struct msghdr msg; 334 memset(&msg, 0, sizeof(msg)); 335 336 struct iovec iov; 337 memset(&iov, 0, sizeof(iov)); 338 iov.iov_base = buffer; 339 iov.iov_len = maxBytes; 340 msg.msg_iov = &iov; 341 msg.msg_iovlen = 1; 342 343 // Allocate space to receive a cmsg. 344 size_t msgBytes; 345 if (ancillaryMsgCallback == nullptr) { 346 #if __APPLE__ || __FreeBSD__ 347 // Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which when 348 // an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't 349 // delivered -- they would simply leak: https://bugs.freebsd.org/131876 350 // 351 // My testing indicates that MacOS has this same bug as of today (April 2019). I don't know 352 // if they plan to fix it or are even aware of it. 353 // 354 // To handle both cases, we will always provide space to receive 512 FDs. 
Hopefully, this is 355 // greater than the maximum number of FDs that these kernels will transmit in one message 356 // PLUS enough space for any other ancillary messages that could be sent before the 357 // SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation 358 // on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in 359 // a comment in someone else's application that suggested FreeBSD is the same. Hopefully, 360 // then, this is sufficient to prevent attacks. But if not, there's nothing more we can do; 361 // it's really up to the kernel to fix this. 362 msgBytes = CMSG_SPACE(sizeof(int) * 512); 363 #else 364 msgBytes = CMSG_SPACE(sizeof(int) * maxFds); 365 #endif 366 } else { 367 // If we want room for ancillary messages instead of or in addition to FDs, just use the 368 // same amount of cushion as in the MacOS/FreeBSD case above. 369 // Someday we may want to allow customization here, but there's no immediate use for it. 370 msgBytes = CMSG_SPACE(sizeof(int) * 512); 371 } 372 373 // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a 374 // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you 375 // surprisingly end up with space for two file descriptors when you only wanted one. However, 376 // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate 377 // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on 378 // other platforms), so we want to allocate an array of words (we use void*). So... we use 379 // CMSG_SPACE() and then additionally round up to deal with Mac. 
380 size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*); 381 KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256); 382 auto cmsgBytes = cmsgSpace.asBytes(); 383 memset(cmsgBytes.begin(), 0, cmsgBytes.size()); 384 msg.msg_control = cmsgBytes.begin(); 385 msg.msg_controllen = msgBytes; 386 387 #ifdef MSG_CMSG_CLOEXEC 388 static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC; 389 #else 390 static constexpr int RECVMSG_FLAGS = 0; 391 #endif 392 393 KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) { 394 // Error. 395 396 // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to 397 // a bug that exists in both Clang and GCC: 398 // http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 399 // http://llvm.org/bugs/show_bug.cgi?id=12286 400 goto error; 401 } 402 403 if (n >= 0) { 404 // Process all messages. 405 // 406 // WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because 407 // if we do, then that FD will never be closed, and a malicious peer could exploit this to 408 // fill up our FD table, creating a DoS attack. Some things to keep in mind: 409 // - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could 410 // mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need 411 // to close the extras. 412 // - We can receive multiple ancillary messages at once. In particular, there is also 413 // SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS 414 // first followed by SCM_RIGHTS. We need to make sure we see both. 
415 size_t nfds = 0; 416 size_t spaceLeft = msg.msg_controllen; 417 Vector<AncillaryMessage> ancillaryMessages; 418 for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); 419 cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { 420 if (spaceLeft >= CMSG_LEN(0) && 421 cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { 422 // Some operating systems (like MacOS) do not adjust csmg_len when the message is 423 // truncated. We must do so ourselves or risk overrunning the buffer. 424 auto len = kj::min(cmsg->cmsg_len, spaceLeft); 425 auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)), 426 (len - CMSG_LEN(0)) / sizeof(int)); 427 kj::Vector<kj::AutoCloseFd> trashFds; 428 for (auto fd: data) { 429 kj::AutoCloseFd ownFd(fd); 430 if (nfds < maxFds) { 431 fdBuffer[nfds++] = kj::mv(ownFd); 432 } else { 433 trashFds.add(kj::mv(ownFd)); 434 } 435 } 436 } else if (spaceLeft >= CMSG_LEN(0) && ancillaryMsgCallback != nullptr) { 437 auto len = kj::min(cmsg->cmsg_len, spaceLeft); 438 auto data = ArrayPtr<const byte>(CMSG_DATA(cmsg), len - CMSG_LEN(0)); 439 ancillaryMessages.add(cmsg->cmsg_level, cmsg->cmsg_type, data); 440 } 441 442 if (spaceLeft >= CMSG_LEN(0) && spaceLeft >= cmsg->cmsg_len) { 443 spaceLeft -= cmsg->cmsg_len; 444 } else { 445 spaceLeft = 0; 446 } 447 } 448 449 #ifndef MSG_CMSG_CLOEXEC 450 for (size_t i = 0; i < nfds; i++) { 451 setCloseOnExec(fdBuffer[i]); 452 } 453 #endif 454 455 if (ancillaryMessages.size() > 0) { 456 KJ_IF_MAYBE(fn, ancillaryMsgCallback) { 457 (*fn)(ancillaryMessages.asPtr()); 458 } 459 } 460 461 alreadyRead.capCount += nfds; 462 fdBuffer += nfds; 463 maxFds -= nfds; 464 } 465 } 466 467 if (false) { 468 error: 469 return alreadyRead; 470 } 471 472 if (n < 0) { 473 // Read would block. 474 return observer.whenBecomesReadable().then([=]() { 475 return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead); 476 }); 477 } else if (n == 0) { 478 // EOF -OR- maxBytes == 0. 
479 return alreadyRead; 480 } else if (implicitCast<size_t>(n) >= minBytes) { 481 // We read enough to stop here. 482 alreadyRead.byteCount += n; 483 return alreadyRead; 484 } else { 485 // The kernel returned fewer bytes than we asked for (and fewer than we need). 486 487 buffer = reinterpret_cast<byte*>(buffer) + n; 488 minBytes -= n; 489 maxBytes -= n; 490 alreadyRead.byteCount += n; 491 492 // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that 493 // we've consumed the whole read buffer here. If a signal is delivered in the middle of a 494 // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial 495 // result, with data still in the buffer. 496 // https://bugzilla.kernel.org/show_bug.cgi?id=199131 497 // https://twitter.com/CaptainSegfault/status/1112622245531144194 498 // 499 // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF 500 // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is 501 // non-null) to avoid a redundant call to read(). Alas... 502 return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead); 503 } 504 } 505 506 Promise<void> writeInternal(ArrayPtr<const byte> firstPiece, 507 ArrayPtr<const ArrayPtr<const byte>> morePieces, 508 ArrayPtr<const int> fds) { 509 const size_t iovmax = kj::miniposix::iovMax(); 510 // If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and 511 // then we'll loop later. 512 KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128); 513 size_t iovTotal = 0; 514 515 // writev() interface is not const-correct. 
:( 516 iov[0].iov_base = const_cast<byte*>(firstPiece.begin()); 517 iov[0].iov_len = firstPiece.size(); 518 iovTotal += iov[0].iov_len; 519 for (uint i = 1; i < iov.size(); i++) { 520 iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin()); 521 iov[i].iov_len = morePieces[i - 1].size(); 522 iovTotal += iov[i].iov_len; 523 } 524 525 if (iovTotal == 0) { 526 KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes"); 527 return kj::READY_NOW; 528 } 529 530 ssize_t n; 531 if (fds.size() == 0) { 532 KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size()), iovTotal, iov.size()) { 533 // Error. 534 535 // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to 536 // a bug that exists in both Clang and GCC: 537 // http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 538 // http://llvm.org/bugs/show_bug.cgi?id=12286 539 goto error; 540 } 541 } else { 542 struct msghdr msg; 543 memset(&msg, 0, sizeof(msg)); 544 msg.msg_iov = iov.begin(); 545 msg.msg_iovlen = iov.size(); 546 547 // Allocate space to send a cmsg. 548 size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size()); 549 // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a 550 // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you 551 // surprisingly end up with space for two file descriptors when you only wanted one. However, 552 // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate 553 // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on 554 // other platforms), so we want to allocate an array of words (we use void*). So... we use 555 // CMSG_SPACE() and then additionally round up to deal with Mac. 
556 size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*); 557 KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256); 558 auto cmsgBytes = cmsgSpace.asBytes(); 559 memset(cmsgBytes.begin(), 0, cmsgBytes.size()); 560 msg.msg_control = cmsgBytes.begin(); 561 msg.msg_controllen = msgBytes; 562 563 struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); 564 cmsg->cmsg_level = SOL_SOCKET; 565 cmsg->cmsg_type = SCM_RIGHTS; 566 cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size()); 567 memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size()); 568 569 KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) { 570 // Error. 571 572 // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to 573 // a bug that exists in both Clang and GCC: 574 // http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 575 // http://llvm.org/bugs/show_bug.cgi?id=12286 576 goto error; 577 } 578 } 579 580 if (false) { 581 error: 582 return kj::READY_NOW; 583 } 584 585 if (n < 0) { 586 // Got EAGAIN. Nothing was written. 587 return observer.whenBecomesWritable().then([=]() { 588 return writeInternal(firstPiece, morePieces, fds); 589 }); 590 } else if (n == 0) { 591 // Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream 592 // socket? If there's no room in the send buffer, it should fail with EAGAIN. If the 593 // connection is closed, it should fail with EPIPE. Various documents and forum posts around 594 // the internet claim this can happen but no one seems to know when. My guess is it can only 595 // happen if we try to send an empty message -- which we didn't. So I think this is 596 // impossible. If it is possible, we need to figure out how to correctly handle it, which 597 // depends on what caused it. 598 // 599 // Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message, 600 // we need to know whether the message was sent or not, in order to decide whether to retry 601 // sending it! 
602 KJ_FAIL_ASSERT("non-empty sendmsg() returned 0"); 603 } 604 605 // Non-zero bytes were written. This also implies that *all* FDs were written. 606 607 // Discard all data that was written, then issue a new write for what's left (if any). 608 for (;;) { 609 if (n < firstPiece.size()) { 610 // Only part of the first piece was consumed. Wait for buffer space and then write again. 611 firstPiece = firstPiece.slice(n, firstPiece.size()); 612 iovTotal -= n; 613 614 if (iovTotal == 0) { 615 // Oops, what actually happened is that we hit the IOV_MAX limit. Don't wait. 616 return writeInternal(firstPiece, morePieces, nullptr); 617 } 618 619 // As with read(), we cannot assume that a short write() really means the write buffer is 620 // full (see comments in the read path above). We have to write again. 621 return writeInternal(firstPiece, morePieces, nullptr); 622 } else if (morePieces.size() == 0) { 623 // First piece was fully-consumed and there are no more pieces, so we're done. 624 KJ_DASSERT(n == firstPiece.size(), n); 625 return READY_NOW; 626 } else { 627 // First piece was fully consumed, so move on to the next piece. 628 n -= firstPiece.size(); 629 iovTotal -= firstPiece.size(); 630 firstPiece = morePieces[0]; 631 morePieces = morePieces.slice(1, morePieces.size()); 632 } 633 } 634 } 635 }; 636 637 // ======================================================================================= 638 639 class SocketAddress { 640 public: 641 SocketAddress(const void* sockaddr, uint len): addrlen(len) { 642 KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me."); 643 memcpy(&addr.generic, sockaddr, len); 644 } 645 646 bool operator<(const SocketAddress& other) const { 647 // So we can use std::set<SocketAddress>... see DNS lookup code. 
648 649 if (wildcard < other.wildcard) return true; 650 if (wildcard > other.wildcard) return false; 651 652 if (addrlen < other.addrlen) return true; 653 if (addrlen > other.addrlen) return false; 654 655 return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0; 656 } 657 658 const struct sockaddr* getRaw() const { return &addr.generic; } 659 socklen_t getRawSize() const { return addrlen; } 660 661 int socket(int type) const { 662 bool isStream = type == SOCK_STREAM; 663 664 int result; 665 #if __linux__ && !__BIONIC__ 666 type |= SOCK_NONBLOCK | SOCK_CLOEXEC; 667 #endif 668 KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0)); 669 670 if (isStream && (addr.generic.sa_family == AF_INET || 671 addr.generic.sa_family == AF_INET6)) { 672 // TODO(perf): As a hack for the 0.4 release we are always setting 673 // TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's 674 // RPC protocol. Later, we should extend the interface to provide more 675 // control over this. Perhaps write() should have a flag which 676 // specifies whether to pass MSG_MORE. 677 int one = 1; 678 KJ_SYSCALL(setsockopt( 679 result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one))); 680 } 681 682 return result; 683 } 684 685 void bind(int sockfd) const { 686 #if !defined(__OpenBSD__) 687 if (wildcard) { 688 // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket. (The 689 // default value of this option varies across platforms.) 
690 int value = 0; 691 KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value))); 692 } 693 #endif 694 695 KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString()); 696 } 697 698 uint getPort() const { 699 switch (addr.generic.sa_family) { 700 case AF_INET: return ntohs(addr.inet4.sin_port); 701 case AF_INET6: return ntohs(addr.inet6.sin6_port); 702 default: return 0; 703 } 704 } 705 706 String toString() const { 707 if (wildcard) { 708 return str("*:", getPort()); 709 } 710 711 switch (addr.generic.sa_family) { 712 case AF_INET: { 713 char buffer[INET6_ADDRSTRLEN]; 714 if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr, 715 buffer, sizeof(buffer)) == nullptr) { 716 KJ_FAIL_SYSCALL("inet_ntop", errno) { break; } 717 return heapString("(inet_ntop error)"); 718 } 719 return str(buffer, ':', ntohs(addr.inet4.sin_port)); 720 } 721 case AF_INET6: { 722 char buffer[INET6_ADDRSTRLEN]; 723 if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr, 724 buffer, sizeof(buffer)) == nullptr) { 725 KJ_FAIL_SYSCALL("inet_ntop", errno) { break; } 726 return heapString("(inet_ntop error)"); 727 } 728 return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port)); 729 } 730 case AF_UNIX: { 731 auto path = _::safeUnixPath(&addr.unixDomain, addrlen); 732 if (path.size() > 0 && path[0] == '\0') { 733 return str("unix-abstract:", path.slice(1, path.size())); 734 } else { 735 return str("unix:", path); 736 } 737 } 738 default: 739 return str("(unknown address family ", addr.generic.sa_family, ")"); 740 } 741 } 742 743 static Promise<Array<SocketAddress>> lookupHost( 744 LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint, 745 _::NetworkFilter& filter); 746 // Perform a DNS lookup. 747 748 static Promise<Array<SocketAddress>> parse( 749 LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) { 750 // TODO(someday): Allow commas in `str`. 
751 752 SocketAddress result; 753 754 if (str.startsWith("unix:")) { 755 StringPtr path = str.slice(strlen("unix:")); 756 KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path), 757 "Unix domain socket address is too long.", str); 758 KJ_REQUIRE(path.size() == strlen(path.cStr()), 759 "Unix domain socket address contains NULL. Use" 760 " 'unix-abstract:' for the abstract namespace."); 761 result.addr.unixDomain.sun_family = AF_UNIX; 762 strcpy(result.addr.unixDomain.sun_path, path.cStr()); 763 result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1; 764 765 if (!result.parseAllowedBy(filter)) { 766 KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()"); 767 return Array<SocketAddress>(); 768 } 769 770 auto array = kj::heapArrayBuilder<SocketAddress>(1); 771 array.add(result); 772 return array.finish(); 773 } 774 775 if (str.startsWith("unix-abstract:")) { 776 StringPtr path = str.slice(strlen("unix-abstract:")); 777 KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path), 778 "Unix domain socket address is too long.", str); 779 result.addr.unixDomain.sun_family = AF_UNIX; 780 result.addr.unixDomain.sun_path[0] = '\0'; 781 // although not strictly required by Linux, also copy the trailing 782 // NULL terminator so that we can safely read it back in toString 783 memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1); 784 result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1; 785 786 if (!result.parseAllowedBy(filter)) { 787 KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()"); 788 return Array<SocketAddress>(); 789 } 790 791 auto array = kj::heapArrayBuilder<SocketAddress>(1); 792 array.add(result); 793 return array.finish(); 794 } 795 796 // Try to separate the address and port. 
797 ArrayPtr<const char> addrPart; 798 Maybe<StringPtr> portPart; 799 800 int af; 801 802 if (str.startsWith("[")) { 803 // Address starts with a bracket, which is a common way to write an ip6 address with a port, 804 // since without brackets around the address part, the port looks like another segment of 805 // the address. 806 af = AF_INET6; 807 size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'), 808 "Unclosed '[' in address string.", str); 809 810 addrPart = str.slice(1, closeBracket); 811 if (str.size() > closeBracket + 1) { 812 KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"), 813 "Expected port suffix after ']'.", str); 814 portPart = str.slice(closeBracket + 2); 815 } 816 } else { 817 KJ_IF_MAYBE(colon, str.findFirst(':')) { 818 if (str.slice(*colon + 1).findFirst(':') == nullptr) { 819 // There is exactly one colon and no brackets, so it must be an ip4 address with port. 820 af = AF_INET; 821 addrPart = str.slice(0, *colon); 822 portPart = str.slice(*colon + 1); 823 } else { 824 // There are two or more colons and no brackets, so the whole thing must be an ip6 825 // address with no port. 826 af = AF_INET6; 827 addrPart = str; 828 } 829 } else { 830 // No colons, so it must be an ip4 address without port. 831 af = AF_INET; 832 addrPart = str; 833 } 834 } 835 836 // Parse the port. 837 unsigned long port; 838 KJ_IF_MAYBE(portText, portPart) { 839 char* endptr; 840 port = strtoul(portText->cStr(), &endptr, 0); 841 if (portText->size() == 0 || *endptr != '\0') { 842 // Not a number. Maybe it's a service name. Fall back to DNS. 843 return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint, 844 filter); 845 } 846 KJ_REQUIRE(port < 65536, "Port number too large."); 847 } else { 848 port = portHint; 849 } 850 851 // Check for wildcard. 
852 if (addrPart.size() == 1 && addrPart[0] == '*') { 853 result.wildcard = true; 854 #if defined(__OpenBSD__) 855 // On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a 856 // temporary workaround for wildcards. 857 result.addrlen = sizeof(addr.inet4); 858 result.addr.inet4.sin_family = AF_INET; 859 result.addr.inet4.sin_port = htons(port); 860 #else 861 // Create an ip6 socket and set IPV6_V6ONLY to 0 later. 862 result.addrlen = sizeof(addr.inet6); 863 result.addr.inet6.sin6_family = AF_INET6; 864 result.addr.inet6.sin6_port = htons(port); 865 #endif 866 867 auto array = kj::heapArrayBuilder<SocketAddress>(1); 868 array.add(result); 869 return array.finish(); 870 } 871 872 void* addrTarget; 873 if (af == AF_INET6) { 874 result.addrlen = sizeof(addr.inet6); 875 result.addr.inet6.sin6_family = AF_INET6; 876 result.addr.inet6.sin6_port = htons(port); 877 addrTarget = &result.addr.inet6.sin6_addr; 878 } else { 879 result.addrlen = sizeof(addr.inet4); 880 result.addr.inet4.sin_family = AF_INET; 881 result.addr.inet4.sin_port = htons(port); 882 addrTarget = &result.addr.inet4.sin_addr; 883 } 884 885 if (addrPart.size() < INET6_ADDRSTRLEN - 1) { 886 // addrPart is not necessarily NUL-terminated so we have to make a copy. :( 887 char buffer[INET6_ADDRSTRLEN]; 888 memcpy(buffer, addrPart.begin(), addrPart.size()); 889 buffer[addrPart.size()] = '\0'; 890 891 // OK, parse it! 892 switch (inet_pton(af, buffer, addrTarget)) { 893 case 1: { 894 // success. 895 if (!result.parseAllowedBy(filter)) { 896 KJ_FAIL_REQUIRE("address family blocked by restrictPeers()"); 897 return Array<SocketAddress>(); 898 } 899 900 auto array = kj::heapArrayBuilder<SocketAddress>(1); 901 array.add(result); 902 return array.finish(); 903 } 904 case 0: 905 // It's apparently not a simple address... fall back to DNS. 
906 break; 907 default: 908 KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart); 909 } 910 } 911 912 return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter); 913 } 914 915 static SocketAddress getLocalAddress(int sockfd) { 916 SocketAddress result; 917 result.addrlen = sizeof(addr); 918 KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen)); 919 return result; 920 } 921 922 bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) { 923 return filter.shouldAllow(&addr.generic, addrlen); 924 } 925 926 bool parseAllowedBy(_::NetworkFilter& filter) { 927 return filter.shouldAllowParse(&addr.generic, addrlen); 928 } 929 930 kj::Own<PeerIdentity> getIdentity(LowLevelAsyncIoProvider& llaiop, 931 LowLevelAsyncIoProvider::NetworkFilter& filter, 932 AsyncIoStream& stream) const; 933 934 private: 935 SocketAddress() { 936 // We need to memset the whole object 0 otherwise Valgrind gets unhappy when we write it to a 937 // pipe, due to the padding bytes being uninitialized. 938 memset(this, 0, sizeof(*this)); 939 } 940 941 socklen_t addrlen; 942 bool wildcard = false; 943 union { 944 struct sockaddr generic; 945 struct sockaddr_in inet4; 946 struct sockaddr_in6 inet6; 947 struct sockaddr_un unixDomain; 948 struct sockaddr_storage storage; 949 } addr; 950 951 struct LookupParams; 952 class LookupReader; 953 }; 954 955 class SocketAddress::LookupReader { 956 // Reads SocketAddresses off of a pipe coming from another thread that is performing 957 // getaddrinfo. 
public:
  LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
               _::NetworkFilter& filter)
      : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}

  ~LookupReader() {
    // Detach instead of joining, so destroying the reader does not wait on a lookup thread that
    // may still be blocked inside getaddrinfo().
    if (thread) thread->detach();
  }

  Promise<Array<SocketAddress>> read() {
    // Reads one fixed-size SocketAddress record per iteration from the pipe. A short read means
    // the writer closed its end (the lookup thread finished), so the collected, de-duplicated,
    // filter-permitted addresses are returned.
    return input->tryRead(&current, sizeof(current), sizeof(current)).then(
        [this](size_t n) -> Promise<Array<SocketAddress>> {
      if (n < sizeof(current)) {
        thread = nullptr;
        // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
        // anyway.
        KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
        return addresses.releaseAsArray();
      } else {
        // getaddrinfo() can return multiple copies of the same address for several reasons.
        // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
        // it may return two copies of the same address, one for each type, unless it explicitly
        // knows that the service name given is specific to one type. But we can't tell it a type,
        // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
        // while the user specified a UDP service name then they'll get a resolution error which
        // is lame. (At least, I think that's how it works.)
        //
        // So we instead resort to de-duping results.
        if (alreadySeen.insert(current).second) {
          if (current.parseAllowedBy(filter)) {
            addresses.add(current);
          }
        }
        return read();
      }
    });
  }

private:
  kj::Own<Thread> thread;
  kj::Own<AsyncInputStream> input;
  _::NetworkFilter& filter;
  SocketAddress current;                 // Record currently being read off the pipe.
  kj::Vector<SocketAddress> addresses;   // Unique, permitted results in arrival order.
  std::set<SocketAddress> alreadySeen;   // Used to drop duplicate getaddrinfo() results.
};

struct SocketAddress::LookupParams {
  kj::String host;
  kj::String service;
};

Promise<Array<SocketAddress>> SocketAddress::lookupHost(
    LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
    _::NetworkFilter& filter) {
  // This shitty function spawns a thread to run getaddrinfo(). Unfortunately, getaddrinfo() is
  // the only cross-platform DNS API and it is blocking.
  //
  // TODO(perf): Use a thread pool? Maybe kj::Thread should use a thread pool automatically?
  // Maybe use the various platform-specific asynchronous DNS libraries? Please do not implement
  // a custom DNS resolver...
  //
  // The thread writes each result as a raw SocketAddress into a pipe; LookupReader reads them
  // back on the event loop side.

  int fds[2];
#if __linux__ && !__BIONIC__
  KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
  KJ_SYSCALL(pipe(fds));
#endif

  auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);

  int outFd = fds[1];

  LookupParams params = { kj::mv(host), kj::mv(service) };

  auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
    FdOutputStream output((AutoCloseFd(outFd)));

    struct addrinfo* list;
    int status = getaddrinfo(
        params.host == "*" ? nullptr : params.host.cStr(),
        params.service == nullptr ? nullptr : params.service.cStr(),
        nullptr, &list);
    if (status == 0) {
      KJ_DEFER(freeaddrinfo(list));

      struct addrinfo* cur = list;
      while (cur != nullptr) {
        if (params.service == nullptr) {
          // No service given, so apply the caller's port hint (network byte order).
          switch (cur->ai_addr->sa_family) {
            case AF_INET:
              ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
              break;
            case AF_INET6:
              ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
              break;
            default:
              break;
          }
        }

        SocketAddress addr;
        if (params.host == "*") {
          // Set up a wildcard SocketAddress. Only use the port number returned by getaddrinfo().
          addr.wildcard = true;
          addr.addrlen = sizeof(addr.addr.inet6);
          addr.addr.inet6.sin6_family = AF_INET6;
          switch (cur->ai_addr->sa_family) {
            case AF_INET:
              addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
              break;
            case AF_INET6:
              addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
              break;
            default:
              // sin6_port is in network byte order, like the values copied above.
              addr.addr.inet6.sin6_port = htons(portHint);
              break;
          }
        } else {
          addr.addrlen = cur->ai_addrlen;
          memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
        }
        KJ_ASSERT_CAN_MEMCPY(SocketAddress);
        output.write(&addr, sizeof(addr));
        cur = cur->ai_next;
      }
    } else if (status == EAI_SYSTEM) {
      KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
        return;
      }
    } else {
      KJ_FAIL_REQUIRE("DNS lookup failed.",
                      params.host, params.service, gai_strerror(status)) {
        return;
      }
    }
  }));

  auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
  return reader->read().attach(kj::mv(reader));
}

// =======================================================================================

class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
  // ConnectionReceiver over a listening socket fd. Each accepted connection is checked against
  // `filter`; permitted ones get TCP_NODELAY (where supported) and are wrapped in AsyncStreamFd.

public:
  FdConnectionReceiver(LowLevelAsyncIoProvider& lowLevel,
                       UnixEventPort& eventPort, int fd,
                       LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}

  Promise<Own<AsyncIoStream>> accept() override {
    return acceptImpl(false).then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
  }

  Promise<AuthenticatedStream> acceptAuthenticated() override {
    return acceptImpl(true);
  }

  Promise<AuthenticatedStream> acceptImpl(bool authenticated) {
    // Accepts one permitted connection. Connections rejected by the filter are closed and the
    // next one is awaited; transient network errors are retried immediately.
    int newFd;

    struct sockaddr_storage addr;
    socklen_t addrlen;

  retry:
    // accept() treats addrlen as a value-result argument, so it must be reset on every attempt.
    addrlen = sizeof(addr);
#if __linux__ && !__BIONIC__
    newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
                      SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
    newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
#endif

    if (newFd >= 0) {
      kj::AutoCloseFd ownFd(newFd);
      if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
        // Ignore disallowed address.
        return acceptImpl(authenticated);
      } else {
        // TODO(perf): As a hack for the 0.4 release we are always setting
        //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
        //   RPC protocol. Later, we should extend the interface to provide more
        //   control over this. Perhaps write() should have a flag which
        //   specifies whether to pass MSG_MORE.
        int one = 1;
        KJ_SYSCALL_HANDLE_ERRORS(::setsockopt(
            ownFd.get(), IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one))) {
          case EOPNOTSUPP:
          case ENOPROTOOPT: // (returned for AF_UNIX in cygwin)
#if __FreeBSD__
          case EINVAL: // (returned for AF_UNIX in FreeBSD)
#endif
            break;
          default:
            KJ_FAIL_SYSCALL("setsocketopt(IPPROTO_TCP, TCP_NODELAY)", error);
        }

        AuthenticatedStream result;
        result.stream = heap<AsyncStreamFd>(eventPort, ownFd.release(), NEW_FD_FLAGS);
        if (authenticated) {
          result.peerIdentity = SocketAddress(reinterpret_cast<struct sockaddr*>(&addr), addrlen)
              .getIdentity(lowLevel, filter, *result.stream);
        }
        return kj::mv(result);
      }
    } else {
      int error = errno;

      switch (error) {
        case EAGAIN:
#if EAGAIN != EWOULDBLOCK
        case EWOULDBLOCK:
#endif
          // Not ready yet.
          return observer.whenBecomesReadable().then([this,authenticated]() {
            return acceptImpl(authenticated);
          });

        case EINTR:
        case ENETDOWN:
#ifdef EPROTO
        // EPROTO is not defined on OpenBSD.
        case EPROTO:
#endif
        case EHOSTDOWN:
        case EHOSTUNREACH:
        case ENETUNREACH:
        case ECONNABORTED:
        case ETIMEDOUT:
          // According to the Linux man page, accept() may report an error if the accepted
          // connection is already broken. In this case, we really ought to just ignore it and
          // keep waiting. But it's hard to say exactly what errors are such network errors and
          // which ones are permanent errors. We've made a guess here.
          goto retry;

        default:
          KJ_FAIL_SYSCALL("accept", error);
      }

    }
  }

  uint getPort() override {
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }
  void getsockname(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockname(fd, addr, &socklen));
    *length = socklen;
  }

public:
  LowLevelAsyncIoProvider& lowLevel;
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};

class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
  // DatagramPort over a datagram socket fd. The observer watches for both readability (used by
  // ReceiverImpl) and writability (used by send() when the socket buffer is full).

public:
  DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
                   LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
                                UnixEventPort::FdObserver::OBSERVE_WRITE) {}

  Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
  Promise<size_t> send(
      ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;

  class ReceiverImpl;

  Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;

  uint getPort() override {
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }

public:
  LowLevelAsyncIoProvider& lowLevel;
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};

class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
  // Owns a UnixEventPort, an EventLoop driven by that port, and a WaitScope on the loop. Raw fds
  // handed to the wrap*() methods are wrapped in AsyncStreamFd bound to this event port.

public:
  LowLevelAsyncIoProviderImpl()
      : eventLoop(eventPort), waitScope(eventLoop) {}

  inline WaitScope& getWaitScope() { return waitScope; }

  Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
    // It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
    // especially setting nonblocking mode and taking ownership.
    auto result = heap<AsyncStreamFd>(eventPort, fd, flags);

    // Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
    // non-blocking using EINPROGRESS.
    for (;;) {
      if (::connect(fd, addr, addrlen) < 0) {
        int error = errno;
        if (error == EINPROGRESS) {
          // Fine.
          break;
        } else if (error != EINTR) {
          auto address = SocketAddress(addr, addrlen).toString();
          KJ_FAIL_SYSCALL("connect()", error, address) { break; }
          return Own<AsyncIoStream>();
        }
      } else {
        // no error
        break;
      }
    }

    auto connected = result->waitConnected();
    return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
      // The asynchronous connect() outcome is reported through SO_ERROR.
      int err;
      socklen_t errlen = sizeof(err);
      KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
      if (err != 0) {
        KJ_FAIL_SYSCALL("connect()", err) { break; }
      }
      return kj::mv(stream);
    }));
  }
  Own<ConnectionReceiver> wrapListenSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<FdConnectionReceiver>(*this, eventPort, fd, filter, flags);
  }
  Own<DatagramPort> wrapDatagramSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
  }

  Timer& getTimer() override { return eventPort.getTimer(); }

  UnixEventPort& getEventPort() { return eventPort; }

private:
  UnixEventPort eventPort;
  EventLoop eventLoop;
  WaitScope waitScope;
};

// =======================================================================================

class NetworkAddressImpl final: public NetworkAddress {
  // NetworkAddress holding one or more resolved SocketAddresses. connect() tries them in order
  // until one succeeds; listen() and bindDatagramPort() use only the first one.

public:
  NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
                     LowLevelAsyncIoProvider::NetworkFilter& filter,
                     Array<SocketAddress> addrs)
      : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}

  Promise<Own<AsyncIoStream>> connect() override {
    auto addrsCopy = heapArray(addrs.asPtr());
    auto promise = connectImpl(lowLevel, filter, addrsCopy, false);
    return promise.attach(kj::mv(addrsCopy))
        .then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
  }

  Promise<AuthenticatedStream> connectAuthenticated() override {
    auto addrsCopy = heapArray(addrs.asPtr());
    auto promise = connectImpl(lowLevel, filter, addrsCopy, true);
    return promise.attach(kj::mv(addrsCopy));
  }

  Own<ConnectionReceiver> listen() override {
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses. Only the first address will "
          "be used. If this is incorrect, specify the address numerically. This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_STREAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);

      // TODO(someday): Let queue size be specified explicitly in string addresses.
      KJ_SYSCALL(::listen(fd, SOMAXCONN));
    }

    return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<DatagramPort> bindDatagramPort() override {
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses. Only the first address will "
          "be used. If this is incorrect, specify the address numerically. This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_DGRAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);
    }

    return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<NetworkAddress> clone() override {
    return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
  }

  String toString() override {
    return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
  }

  const SocketAddress& chooseOneAddress() {
    // Round-robins through the resolved addresses on successive calls.
    KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
    return addrs[counter++ % addrs.size()];
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  Array<SocketAddress> addrs;
  uint counter = 0;

  static Promise<AuthenticatedStream> connectImpl(
      LowLevelAsyncIoProvider& lowLevel,
      LowLevelAsyncIoProvider::NetworkFilter& filter,
      ArrayPtr<SocketAddress> addrs,
      bool authenticated) {
    // Tries addrs[0]; on failure, recurses on the remaining addresses. The exception from the
    // last address is propagated. `addrs` must outlive the returned promise.
    KJ_ASSERT(addrs.size() > 0);

    return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
      if (!addrs[0].allowedBy(filter)) {
        return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
      } else {
        int fd = addrs[0].socket(SOCK_STREAM);
        return lowLevel.wrapConnectingSocketFd(
            fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
      }
    }).then([&lowLevel,&filter,addrs,authenticated](Own<AsyncIoStream>&& stream)
        -> Promise<AuthenticatedStream> {
      // Success, pass along.
      AuthenticatedStream result;
      result.stream = kj::mv(stream);
      if (authenticated) {
        result.peerIdentity = addrs[0].getIdentity(lowLevel, filter, *result.stream);
      }
      return kj::mv(result);
    }, [&lowLevel,&filter,addrs,authenticated](Exception&& exception) mutable
        -> Promise<AuthenticatedStream> {
      // Connect failed.
      if (addrs.size() > 1) {
        // Try the next address instead.
        return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()), authenticated);
      } else {
        // No more addresses to try, so propagate the exception.
        return kj::mv(exception);
      }
    });
  }
};

kj::Own<PeerIdentity> SocketAddress::getIdentity(kj::LowLevelAsyncIoProvider& llaiop,
                                                 LowLevelAsyncIoProvider::NetworkFilter& filter,
                                                 AsyncIoStream& stream) const {
  // Builds a PeerIdentity for the peer at this address: a network identity for IPv4/IPv6, local
  // process credentials (queried from `stream`) for AF_UNIX, and "unknown" otherwise.
  switch (addr.generic.sa_family) {
    case AF_INET:
    case AF_INET6: {
      auto builder = kj::heapArrayBuilder<SocketAddress>(1);
      builder.add(*this);
      return NetworkPeerIdentity::newInstance(
          kj::heap<NetworkAddressImpl>(llaiop, filter, builder.finish()));
    }
    case AF_UNIX: {
      LocalPeerIdentity::Credentials result;

      // There is little documentation on what happens when the uid/pid can't be obtained, but I've
      // seen vague references on the internet saying that a PID of 0 and a UID of uid_t(-1) are used
      // as invalid values.

      // OpenBSD defines SO_PEERCRED but uses a different interface for it
      // hence we're falling back to LOCAL_PEERCRED
#if defined(SO_PEERCRED) && !__OpenBSD__
      struct ucred creds;
      uint length = sizeof(creds);
      stream.getsockopt(SOL_SOCKET, SO_PEERCRED, &creds, &length);
      if (creds.pid > 0) {
        result.pid = creds.pid;
      }
      if (creds.uid != static_cast<uid_t>(-1)) {
        result.uid = creds.uid;
      }

#elif defined(LOCAL_PEERCRED)
      // MacOS / FreeBSD / OpenBSD
      struct xucred creds;
      uint length = sizeof(creds);
      stream.getsockopt(SOL_LOCAL, LOCAL_PEERCRED, &creds, &length);
      KJ_ASSERT(length == sizeof(creds));
      if (creds.cr_uid != static_cast<uid_t>(-1)) {
        result.uid = creds.cr_uid;
      }

#if defined(LOCAL_PEERPID)
      // MacOS only?
      pid_t pid;
      length = sizeof(pid);
      stream.getsockopt(SOL_LOCAL, LOCAL_PEERPID, &pid, &length);
      KJ_ASSERT(length == sizeof(pid));
      if (pid > 0) {
        result.pid = pid;
      }
#endif
#endif

      return LocalPeerIdentity::newInstance(result);
    }
    default:
      return UnknownPeerIdentity::newInstance();
  }
}

class SocketNetwork final: public Network {
  // Network implementation over BSD sockets. restrictPeers() returns a child network whose
  // filter is constructed from the allow/deny lists plus this network's filter.

public:
  explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
  explicit SocketNetwork(SocketNetwork& parent,
                         kj::ArrayPtr<const kj::StringPtr> allow,
                         kj::ArrayPtr<const kj::StringPtr> deny)
      : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}

  Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
    return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
      return SocketAddress::parse(lowLevel, addr, portHint, filter);
    })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
      return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
    });
  }

  Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
    auto array = kj::heapArrayBuilder<SocketAddress>(1);
    array.add(SocketAddress(sockaddr, len));
    KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
    return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
  }

  Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
    return heap<SocketNetwork>(*this, allow, deny);
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  _::NetworkFilter filter;
};

// =======================================================================================

Promise<size_t> DatagramPortImpl::send(
    const void* buffer, size_t size, NetworkAddress& destination) {
  // Sends one datagram to one of `destination`'s addresses, waiting for writability if the
  // socket buffer is full. `buffer` must stay valid until the promise resolves.
  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
      return send(buffer, size, destination);
    });
  } else {
    // If less than the whole message was sent, then it got truncated, and there's nothing we can
    // do about it.
    return n;
  }
}

Promise<size_t> DatagramPortImpl::send(
    ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
  // Gather-sends `pieces` as a single datagram with sendmsg(). If there are more pieces than the
  // platform's iovec limit, the trailing ones are copied into one contiguous buffer. `pieces`
  // must stay valid until the promise resolves.
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));

  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
  msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
  msg.msg_namelen = addr.getRawSize();

  const size_t iovmax = kj::miniposix::iovMax();
  KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);

  // `iov` has only min(pieces.size(), iovmax) entries, so fill exactly that many. Iterating over
  // all of `pieces` here would write past the end of `iov` when pieces.size() > iovmax. Any
  // excess pieces are coalesced into the last entry below.
  for (size_t i: kj::indices(iov)) {
    iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
    iov[i].iov_len = pieces[i].size();
  }

  Array<byte> extra;
  if (pieces.size() > iovmax) {
    // Too many pieces, but we can't use multiple syscalls because they'd send separate
    // datagrams. We'll have to copy the trailing pieces into a temporary array.
    //
    // TODO(perf): On Linux we could use multiple syscalls via MSG_MORE or sendmsg/sendmmsg.
    size_t extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      extraSize += pieces[i].size();
    }
    extra = kj::heapArray<byte>(extraSize);
    extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
      extraSize += pieces[i].size();
    }
    iov.back().iov_base = extra.begin();
    iov.back().iov_len = extra.size();
  }

  msg.msg_iov = iov.begin();
  msg.msg_iovlen = iov.size();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, pieces, &destination]() {
      return send(pieces, destination);
    });
  } else {
    // If less than the whole message was sent, then it was truncated, and there's nothing we can
    // do about that now.
    return n;
  }
}

class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
  // Receives one datagram per receive() call into a fixed-capacity content buffer (and an
  // optional ancillary buffer). Datagrams from sources rejected by the port's filter are skipped.

public:
  explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
      : port(port),
        contentBuffer(heapArray<byte>(capacity.content)),
        ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
                                               : Array<byte>(nullptr)) {}

  Promise<void> receive() override {
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));

    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    msg.msg_name = &addr;
    msg.msg_namelen = sizeof(addr);

    struct iovec iov;
    iov.iov_base = contentBuffer.begin();
    iov.iov_len = contentBuffer.size();
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = ancillaryBuffer.begin();
    msg.msg_controllen = ancillaryBuffer.size();

    ssize_t n;
    KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));

    if (n < 0) {
      // No data available. Wait.
      return port.observer.whenBecomesReadable().then([this]() {
        return receive();
      });
    } else {
      if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
                                   msg.msg_namelen)) {
        // Ignore message from disallowed source.
        return receive();
      }

      receivedSize = n;
      contentTruncated = msg.msg_flags & MSG_TRUNC;

      source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);

      ancillaryList.resize(0);
      ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;

      for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
           cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        // On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
        // when truncated. On other platforms (Linux) the length in cmsghdr will itself be
        // truncated to fit within the buffer.

#if __APPLE__
// On MacOS, `CMSG_SPACE(0)` triggers a bogus warning. Suppress it only around the header check
// so the suppression does not leak into the rest of the file.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
#endif
        const byte* pos = reinterpret_cast<const byte*>(cmsg);
        size_t available = ancillaryBuffer.end() - pos;
        if (available < CMSG_SPACE(0)) {
          // The buffer ends in the middle of the header. We can't use this message.
          // (On Linux, this never happens, because the message is not included if there isn't
          // space for a header. I'm not sure how other systems behave, though, so let's be safe.)
          break;
        }
#if __APPLE__
#pragma GCC diagnostic pop
#endif

        // OK, we know the cmsghdr is valid, at least.

        // Find the start of the message payload.
        const byte* begin = (const byte *)CMSG_DATA(cmsg);

        // Cap the message length to the available space.
        const byte* end = pos + kj::min(available, cmsg->cmsg_len);

        ancillaryList.add(AncillaryMessage(
            cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
      }

      return READY_NOW;
    }
  }

  MaybeTruncated<ArrayPtr<const byte>> getContent() override {
    return { contentBuffer.slice(0, receivedSize), contentTruncated };
  }

  MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
    return { ancillaryList.asPtr(), ancillaryTruncated };
  }

  NetworkAddress& getSource() override {
    return KJ_REQUIRE_NONNULL(source, "Haven't sent a message yet.").abstract;
  }

private:
  DatagramPortImpl& port;
  Array<byte> contentBuffer;
  Array<byte> ancillaryBuffer;
  Vector<AncillaryMessage> ancillaryList;
  size_t receivedSize = 0;
  bool contentTruncated = false;
  bool ancillaryTruncated = false;

  struct StoredAddress {
    // Source of the last received datagram. `abstract` views `raw` through a non-owning array
    // (NullArrayDisposer), so the two live and die together.
    StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
                  const void* sockaddr, uint length)
        : raw(sockaddr, length),
          abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}

    SocketAddress raw;
    NetworkAddressImpl abstract;
  };

  kj::Maybe<StoredAddress> source;
};

Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
  return kj::heap<ReceiverImpl>(*this, capacity);
}

// =======================================================================================

class AsyncIoProviderImpl final: public AsyncIoProvider {
  // AsyncIoProvider layered on a LowLevelAsyncIoProvider. Pipes come from pipe()/socketpair();
  // on Linux (non-Bionic) they are created already non-blocking and close-on-exec.

public:
  AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
      : lowLevel(lowLevel), network(lowLevel) {}

  OneWayPipe newOneWayPipe() override {
    int fds[2];
#if __linux__ && !__BIONIC__
    KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
    KJ_SYSCALL(pipe(fds));
#endif
    return OneWayPipe {
      lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
    };
  }

  TwoWayPipe newTwoWayPipe() override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return TwoWayPipe { {
      lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  CapabilityPipe newCapabilityPipe() override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return CapabilityPipe { {
      lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  Network& getNetwork() override {
    return network;
  }

  PipeThread newPipeThread(
      Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
    // Starts a thread with its own event loop and runs `startFunc` there, connected to the
    // caller by one end of a socketpair.
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));

    int threadFd = fds[1];
    KJ_ON_SCOPE_FAILURE(close(threadFd));

    auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);

    auto thread = heap<Thread>(kj::mvCapture(startFunc,
        [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
      LowLevelAsyncIoProviderImpl lowLevel;
      auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
      AsyncIoProviderImpl ioProvider(lowLevel);
      startFunc(ioProvider, *stream, lowLevel.getWaitScope());
    }));

    return { kj::mv(thread), kj::mv(pipe) };
  }

  Timer& getTimer() override { return lowLevel.getTimer(); }

private:
  LowLevelAsyncIoProvider& lowLevel;
  SocketNetwork network;
};

}  // namespace

Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
  return kj::heap<AsyncIoProviderImpl>(lowLevel);
}

AsyncIoContext setupAsyncIo() {
  // Creates a LowLevelAsyncIoProviderImpl (event port + loop + wait scope) and an
  // AsyncIoProviderImpl on top of it, handing ownership of both to the returned context.
  auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
  auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
  auto& waitScope = lowLevel->getWaitScope();
  auto& eventPort = lowLevel->getEventPort();
  return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
}

}  // namespace kj

#endif  // !_WIN32