async-unix.c++
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if !_WIN32

#include "async-unix.h"
#include "debug.h"
#include "threadlocal.h"
#include <setjmp.h>
#include <errno.h>
#include <inttypes.h>
#include <limits>
#include <pthread.h>
#include <map>
#include <sys/wait.h>
#include <unistd.h>

#if KJ_USE_EPOLL
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/eventfd.h>
#else
#include <poll.h>
#include <fcntl.h>
#endif

namespace kj {

// =======================================================================================
// Signal code common to multiple implementations

namespace {

int reservedSignal = SIGUSR1;
bool tooLateToSetReserved = false;
bool capturedChildExit = false;
bool threadClaimedChildExits = false;

struct SignalCapture {
  sigjmp_buf jumpTo;
  siginfo_t siginfo;

#if __APPLE__
  sigset_t originalMask;
  // The signal mask to be restored when jumping out of the signal handler.
  //
  // "But wait!" you say, "Isn't the whole point of siglongjmp() that it does this for you?" Well,
  // yes, that is supposed to be the point. However, Apple implemented it wrong. On macOS,
  // siglongjmp() uses sigprocmask() -- not pthread_sigmask() -- to restore the signal mask.
  // Unfortunately, sigprocmask() on macOS affects threads other than the current thread. Arguably
  // this is conformant: sigprocmask() is documented as having unspecified behavior in the presence
  // of threads, and pthread_sigmask() must be used instead. However, this means siglongjmp()
  // cannot be used in the presence of threads.
  //
  // We'll just have to restore the signal mask ourselves, rather than rely on siglongjmp()...
  //
  // ... but we ONLY do that on Apple systems, because it turns out, ironically, on Android, this
  // hack breaks signal delivery. pthread_sigmask() vs. sigprocmask() is not the issue; we
  // apparently MUST let siglongjmp() itself deal with the signal mask, otherwise various tests in
  // async-unix-test.c++ end up hanging (I haven't gotten to the bottom of why). Note that on stock
  // Linux, _either_ strategy works fine; this appears to be a problem with Android's Bionic libc.
  // Since letting siglongjmp() do the work _seems_ more "correct", we'll make it the default and
  // only do something different on Apple platforms.
#define KJ_BROKEN_SIGLONGJMP 1
#endif
};

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
KJ_THREADLOCAL_PTR(SignalCapture) threadCapture = nullptr;

void signalHandler(int, siginfo_t* siginfo, void*) {
  SignalCapture* capture = threadCapture;
  if (capture != nullptr) {
    capture->siginfo = *siginfo;

#if KJ_BROKEN_SIGLONGJMP
    // See comments on SignalCapture::originalMask, above: We can't rely on siglongjmp() to restore
    // the signal mask; we must do it ourselves using pthread_sigmask(). (The corresponding
    // sigsetjmp() calls pass false as their second parameter so that they never save the signal
    // mask in the first place, which makes this siglongjmp() equivalent to `longjmp()` on Linux
    // or `_longjmp()` on BSD/macOS.)
    pthread_sigmask(SIG_SETMASK, &capture->originalMask, nullptr);
    siglongjmp(capture->jumpTo, false);
#else
    siglongjmp(capture->jumpTo, true);
#endif
  }
}
#endif

void registerSignalHandler(int signum) {
  tooLateToSetReserved = true;

  sigset_t mask;
  KJ_SYSCALL(sigemptyset(&mask));
  KJ_SYSCALL(sigaddset(&mask, signum));
  KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &mask, nullptr));

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_sigaction = &signalHandler;
  KJ_SYSCALL(sigfillset(&action.sa_mask));
  action.sa_flags = SA_SIGINFO;
  KJ_SYSCALL(sigaction(signum, &action, nullptr));
#endif
}

#if !KJ_USE_EPOLL && !KJ_USE_PIPE_FOR_WAKEUP
void registerReservedSignal() {
  registerSignalHandler(reservedSignal);
}
#endif

void ignoreSigpipe() {
  // We disable SIGPIPE because users of UnixEventPort almost certainly don't want it.
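  // signal() should practically never fail here, but we retry on EINTR just in case and treat
  // any other failure as fatal.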
  while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
    int error = errno;
    if (error != EINTR) {
      KJ_FAIL_SYSCALL("signal(SIGPIPE, SIG_IGN)", error);
    }
  }
}

}  // namespace

struct UnixEventPort::ChildSet {
  std::map<pid_t, ChildExitPromiseAdapter*> waiters;

  void checkExits();
};

class UnixEventPort::ChildExitPromiseAdapter {
public:
  inline ChildExitPromiseAdapter(PromiseFulfiller<int>& fulfiller,
                                 ChildSet& childSet, Maybe<pid_t>& pidRef)
      : childSet(childSet),
        pid(KJ_REQUIRE_NONNULL(pidRef,
            "`pid` must be non-null at the time `onChildExit()` is called")),
        pidRef(pidRef), fulfiller(fulfiller) {
    KJ_REQUIRE(childSet.waiters.insert(std::make_pair(pid, this)).second,
               "already called onChildExit() for this pid");
  }

  ~ChildExitPromiseAdapter() noexcept(false) {
    childSet.waiters.erase(pid);
  }

  ChildSet& childSet;
  pid_t pid;
  Maybe<pid_t>& pidRef;
  PromiseFulfiller<int>& fulfiller;
};

void UnixEventPort::ChildSet::checkExits() {
  for (;;) {
    int status;
    pid_t pid;
    KJ_SYSCALL_HANDLE_ERRORS(pid = waitpid(-1, &status, WNOHANG)) {
      case ECHILD:
        return;
      default:
        KJ_FAIL_SYSCALL("waitpid()", error);
    }
    if (pid == 0) break;

    auto iter = waiters.find(pid);
    if (iter != waiters.end()) {
      iter->second->pidRef = nullptr;
      iter->second->fulfiller.fulfill(kj::cp(status));
    }
  }
}

Promise<int> UnixEventPort::onChildExit(Maybe<pid_t>& pid) {
  KJ_REQUIRE(capturedChildExit,
      "must call UnixEventPort::captureChildExit() to use onChildExit().");

  ChildSet* cs;
  KJ_IF_MAYBE(c, childSet) {
    cs = *c;
  } else {
    // In theory we should do an atomic compare-and-swap on threadClaimedChildExits, but this is
    // for debug purposes only so it's not a big deal.
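    // (The restriction itself exists because SIGCHLD is process-wide and checkExits() calls
    // waitpid(-1, ...), which reaps any child; two event ports would race to steal each other's
    // exit notifications.)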
    KJ_REQUIRE(!threadClaimedChildExits,
        "only one UnixEventPort per process may listen for child exits");
    threadClaimedChildExits = true;

    auto newChildSet = kj::heap<ChildSet>();
    cs = newChildSet;
    childSet = kj::mv(newChildSet);
  }

  return kj::newAdaptedPromise<int, ChildExitPromiseAdapter>(*cs, pid);
}

void UnixEventPort::captureChildExit() {
  captureSignal(SIGCHLD);
  capturedChildExit = true;
}

class UnixEventPort::SignalPromiseAdapter {
public:
  inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
                              UnixEventPort& loop, int signum)
      : loop(loop), signum(signum), fulfiller(fulfiller) {
    prev = loop.signalTail;
    *loop.signalTail = this;
    loop.signalTail = &next;
  }

  ~SignalPromiseAdapter() noexcept(false) {
    if (prev != nullptr) {
      if (next == nullptr) {
        loop.signalTail = prev;
      } else {
        next->prev = prev;
      }
      *prev = next;
    }
  }

  SignalPromiseAdapter* removeFromList() {
    auto result = next;
    if (next == nullptr) {
      loop.signalTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
    return result;
  }

  UnixEventPort& loop;
  int signum;
  PromiseFulfiller<siginfo_t>& fulfiller;
  SignalPromiseAdapter* next = nullptr;
  SignalPromiseAdapter** prev = nullptr;
};

Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
  KJ_REQUIRE(signum != SIGCHLD || !capturedChildExit,
      "can't call onSignal(SIGCHLD) when kj::UnixEventPort::captureChildExit() has been called");
  return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
}

void UnixEventPort::captureSignal(int signum) {
  if (reservedSignal == SIGUSR1) {
    KJ_REQUIRE(signum != SIGUSR1,
        "Sorry, SIGUSR1 is reserved by the UnixEventPort implementation. You may call "
        "UnixEventPort::setReservedSignal() to reserve a different signal.");
  } else {
    KJ_REQUIRE(signum != reservedSignal,
        "Can't capture signal reserved using setReservedSignal().", signum);
  }
  registerSignalHandler(signum);
}

void UnixEventPort::setReservedSignal(int signum) {
  KJ_REQUIRE(!tooLateToSetReserved,
      "setReservedSignal() must be called before any calls to `captureSignal()` and "
      "before any `UnixEventPort` is constructed.");
  if (reservedSignal != SIGUSR1 && reservedSignal != signum) {
    KJ_FAIL_REQUIRE("Detected multiple conflicting calls to setReservedSignal(). Please only "
        "call this once, or always call it with the same signal number.");
  }
  reservedSignal = signum;
}

void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
  // If onChildExit() has been called and this is SIGCHLD, check for child exits.
  KJ_IF_MAYBE(cs, childSet) {
    if (siginfo.si_signo == SIGCHLD) {
      cs->get()->checkExits();
      return;
    }
  }

  // Fire any events waiting on this signal.
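  // Note the iteration pattern: removeFromList() unlinks the fulfilled adapter and returns the
  // next node, so the traversal stays valid while entries drop out mid-loop.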
  auto ptr = signalHead;
  while (ptr != nullptr) {
    if (ptr->signum == siginfo.si_signo) {
      ptr->fulfiller.fulfill(kj::cp(siginfo));
      ptr = ptr->removeFromList();
    } else {
      ptr = ptr->next;
    }
  }
}

#if KJ_USE_EPOLL
// =======================================================================================
// epoll FdObserver implementation

UnixEventPort::UnixEventPort()
    : clock(systemPreciseMonotonicClock()),
      timerImpl(clock.now()),
      epollFd(-1),
      signalFd(-1),
      eventFd(-1) {
  ignoreSigpipe();

  int fd;
  KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
  epollFd = AutoCloseFd(fd);

  memset(&signalFdSigset, 0, sizeof(signalFdSigset));

  KJ_SYSCALL(sigemptyset(&signalFdSigset));
  KJ_SYSCALL(fd = signalfd(-1, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  signalFd = AutoCloseFd(fd);

  KJ_SYSCALL(fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
  eventFd = AutoCloseFd(fd);

  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  event.events = EPOLLIN;
  event.data.u64 = 0;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, signalFd, &event));
  event.data.u64 = 1;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event));
}

UnixEventPort::~UnixEventPort() noexcept(false) {
  if (childSet != nullptr) {
    // We had claimed the exclusive right to call onChildExit(). Release that right.
    threadClaimedChildExits = false;
  }
}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags) {
  struct epoll_event event;
  memset(&event, 0, sizeof(event));

  if (flags & OBSERVE_READ) {
    event.events |= EPOLLIN | EPOLLRDHUP;
  }
  if (flags & OBSERVE_WRITE) {
    event.events |= EPOLLOUT;
  }
  if (flags & OBSERVE_URGENT) {
    event.events |= EPOLLPRI;
  }
  event.events |= EPOLLET;  // Set edge-triggered mode.

  event.data.ptr = this;

  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_ADD, fd, &event));
}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_DEL, fd, nullptr)) { break; }
}

void UnixEventPort::FdObserver::fire(short events) {
  if (events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
    if (events & (EPOLLHUP | EPOLLRDHUP)) {
      atEnd = true;
    } else {
      // Since we didn't receive EPOLLRDHUP, we know that we're not at the end.
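      // (We always request EPOLLRDHUP in the constructor when observing reads, which is what
      // makes this inference valid here, unlike in the poll()-based path below where POLLRDHUP
      // may not exist at all.)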
      atEnd = false;
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & EPOLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

bool UnixEventPort::wait() {
  return doEpollWait(
      timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));
}

bool UnixEventPort::poll() {
  return doEpollWait(0);
}

void UnixEventPort::wake() const {
  uint64_t one = 1;
  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = write(eventFd, &one, sizeof(one)));
  KJ_ASSERT(n < 0 || n == sizeof(one));
}

static siginfo_t toRegularSiginfo(const struct signalfd_siginfo& siginfo) {
  // Unfortunately, siginfo_t is mostly a big union and the correct set of fields to fill in
  // depends on the type of signal. OTOH, signalfd_siginfo is a flat struct that expands all
  // siginfo_t's union fields out to be non-overlapping. We can't just copy all the fields over
  // because of the unions; we have to carefully figure out which fields are appropriate to fill
  // in for this signal. Ick.

  siginfo_t result;
  memset(&result, 0, sizeof(result));

  result.si_signo = siginfo.ssi_signo;
  result.si_errno = siginfo.ssi_errno;
  result.si_code = siginfo.ssi_code;

  if (siginfo.ssi_code > 0) {
    // Signal originated from the kernel. The structure of the siginfo depends primarily on the
    // signal number.
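    // (Positive si_code values are kernel-generated codes such as CLD_EXITED or SEGV_MAPERR;
    // user-initiated codes like SI_USER and SI_QUEUE are zero or negative and are handled in
    // the else branch below.)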
    switch (siginfo.ssi_signo) {
      case SIGCHLD:
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;
        result.si_status = siginfo.ssi_status;
        result.si_utime = siginfo.ssi_utime;
        result.si_stime = siginfo.ssi_stime;
        break;

      case SIGILL:
      case SIGFPE:
      case SIGSEGV:
      case SIGBUS:
      case SIGTRAP:
        result.si_addr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_addr));
#ifdef si_trapno
        result.si_trapno = siginfo.ssi_trapno;
#endif
#ifdef si_addr_lsb
        // ssi_addr_lsb is defined as coming immediately after ssi_addr in the kernel headers but
        // apparently the userspace headers were never updated. So we do a pointer hack. :(
        result.si_addr_lsb = *reinterpret_cast<const uint16_t*>(&siginfo.ssi_addr + 1);
#endif
        break;

      case SIGIO:
        static_assert(SIGIO == SIGPOLL, "SIGIO != SIGPOLL?");

        // Note: Technically, code can arrange for SIGIO signals to be delivered with a signal
        // number other than SIGIO. AFAICT there is no way for us to detect this in the siginfo.
        // Luckily SIGIO is totally obsoleted by epoll so it shouldn't come up.

        result.si_band = siginfo.ssi_band;
        result.si_fd = siginfo.ssi_fd;
        break;

      case SIGSYS:
        // Apparently SIGSYS's fields are not available in signalfd_siginfo?
        break;
    }

  } else {
    // Signal originated from userspace. The sender could specify whatever signal number they
    // wanted. The structure of the signal is determined by the API they used, which is identified
    // by SI_CODE.

    switch (siginfo.ssi_code) {
      case SI_USER:
      case SI_TKILL:
        // kill(), tkill(), or tgkill().
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;
        break;

      case SI_QUEUE:
      case SI_MESGQ:
      case SI_ASYNCIO:
      default:
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;

        // This is awkward. In siginfo_t, si_ptr and si_int are in a union together. In
        // signalfd_siginfo, they are not. We don't really know whether the app intended to send
        // an int or a pointer. Presumably since the pointer is always larger than the int, if
        // we write the pointer, we'll end up with the right value for the int? Presumably the
        // two fields of signalfd_siginfo are actually extracted from one of these unions
        // originally, so actually contain redundant data? Better write some tests...
        //
        // Making matters even stranger, siginfo.ssi_ptr is 64-bit even on 32-bit systems, and
        // it appears that instead of doing the obvious thing by casting the pointer value to
        // 64 bits, the kernel actually memcpy()s the 32-bit value into the 64-bit space. As
        // a result, on big-endian 32-bit systems, the original pointer value ends up in the
        // *upper* 32 bits of siginfo.ssi_ptr, which is totally weird. We play along and use
        // a memcpy() on our end too, to get the right result on all platforms.
        memcpy(&result.si_ptr, &siginfo.ssi_ptr, sizeof(result.si_ptr));
        break;

      case SI_TIMER:
        result.si_timerid = siginfo.ssi_tid;
        result.si_overrun = siginfo.ssi_overrun;

        // Again with this weirdness...
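        // (Here a plain truncating cast is used; per the memcpy() note above this presumably
        // recovers the original sigev_value on 64-bit and little-endian 32-bit systems, with
        // big-endian 32-bit remaining the odd case.)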
        result.si_ptr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_ptr));
        break;
    }
  }

  return result;
}

bool UnixEventPort::doEpollWait(int timeout) {
  sigset_t newMask;
  memset(&newMask, 0, sizeof(newMask));
  sigemptyset(&newMask);

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  if (memcmp(&newMask, &signalFdSigset, sizeof(newMask)) != 0) {
    // Apparently we're not waiting on the same signals as last time. Need to update the signal
    // FD's mask.
    signalFdSigset = newMask;
    KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  }

  struct epoll_event events[16];
  int n = epoll_wait(epollFd, events, kj::size(events), timeout);
  if (n < 0) {
    int error = errno;
    if (error == EINTR) {
      // We can't simply restart the epoll call because we need to recompute the timeout.
      // Instead, we pretend epoll_wait() returned zero events. This will cause the event loop to
      // spin once, decide it has nothing to do, recompute timeouts, then return to waiting.
      n = 0;
    } else {
      KJ_FAIL_SYSCALL("epoll_wait()", error);
    }
  }

  bool woken = false;

  for (int i = 0; i < n; i++) {
    if (events[i].data.u64 == 0) {
      for (;;) {
        struct signalfd_siginfo siginfo;
        ssize_t n;
        KJ_NONBLOCKING_SYSCALL(n = read(signalFd, &siginfo, sizeof(siginfo)));
        if (n < 0) break;  // no more signals

        KJ_ASSERT(n == sizeof(siginfo));

        gotSignal(toRegularSiginfo(siginfo));

#ifdef SIGRTMIN
        if (siginfo.ssi_signo >= SIGRTMIN) {
          // This is an RT signal. There could be multiple copies queued. We need to remove it
          // from the signalfd's signal mask before we continue, to avoid accidentally reading
          // and discarding the extra copies.
          // TODO(perf): If high throughput of RT signals is desired then perhaps we should read
          //   them all into userspace and queue them here. Maybe we even need a better interface
          //   than onSignal() for receiving high-volume RT signals.
          KJ_SYSCALL(sigdelset(&signalFdSigset, siginfo.ssi_signo));
          KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
        }
#endif
      }
    } else if (events[i].data.u64 == 1) {
      // Someone called wake() from another thread. Consume the event.
      uint64_t value;
      ssize_t n;
      KJ_NONBLOCKING_SYSCALL(n = read(eventFd, &value, sizeof(value)));
      KJ_ASSERT(n < 0 || n == sizeof(value));

      // We were woken. Need to return true.
      woken = true;
    } else {
      FdObserver* observer = reinterpret_cast<FdObserver*>(events[i].data.ptr);
      observer->fire(events[i].events);
    }
  }

  timerImpl.advanceTo(clock.now());

  return woken;
}

#else  // KJ_USE_EPOLL
// =======================================================================================
// Traditional poll() FdObserver implementation.

#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif

UnixEventPort::UnixEventPort()
    : clock(systemPreciseMonotonicClock()),
      timerImpl(clock.now()) {
#if KJ_USE_PIPE_FOR_WAKEUP
  // Allocate a pipe to which we'll write a byte in order to wake this thread.
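  // (This is the classic self-pipe trick: the read end is added to every poll() in PollContext,
  // and wake() writes a byte to the write end to force that poll() to return.)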
  int fds[2];
  KJ_SYSCALL(pipe(fds));
  wakePipeIn = kj::AutoCloseFd(fds[0]);
  wakePipeOut = kj::AutoCloseFd(fds[1]);
  KJ_SYSCALL(fcntl(wakePipeIn, F_SETFD, FD_CLOEXEC));
  KJ_SYSCALL(fcntl(wakePipeOut, F_SETFD, FD_CLOEXEC));
#else
  static_assert(sizeof(threadId) >= sizeof(pthread_t),
      "pthread_t is larger than a long long on your platform. Please port.");
  *reinterpret_cast<pthread_t*>(&threadId) = pthread_self();

  // Note: We used to use a pthread_once to call registerReservedSignal() only once per process.
  // This didn't work correctly because registerReservedSignal() not only registers the
  // (process-wide) signal handler, but also sets the (per-thread) signal mask to block the
  // signal. Thus, if threads were spawned before the first UnixEventPort was created, and then
  // multiple threads created UnixEventPorts, only one of them would have the signal properly
  // blocked. We could have changed things so that only the handler registration was protected
  // by the pthread_once and the mask update happened in every thread, but registering a signal
  // handler is not an expensive operation, so whatever... we'll do it in every thread.
  registerReservedSignal();
#endif

  ignoreSigpipe();
}

UnixEventPort::~UnixEventPort() noexcept(false) {}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags), next(nullptr), prev(nullptr) {}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  if (prev != nullptr) {
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
  }
}

void UnixEventPort::FdObserver::fire(short events) {
  if (events & (POLLIN | POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
    if (events & (POLLHUP | POLLRDHUP)) {
      atEnd = true;
#if POLLRDHUP
    } else {
      // Since POLLRDHUP exists on this platform, and we didn't receive it, we know that we're
      // not at the end.
      atEnd = false;
#endif
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & POLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }

  if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr &&
      hupFulfiller == nullptr) {
    // Remove from list.
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
  }
}

short UnixEventPort::FdObserver::getEventMask() {
  return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
         (writeFulfiller == nullptr ? 0 : POLLOUT) |
         (urgentFulfiller == nullptr ? 0 : POLLPRI) |
         // The POSIX standard says POLLHUP and POLLERR will be reported even if not requested.
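         // (That is, they are output-only bits, so listing them in `events` should be harmless.)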
         // But on MacOS, if `events` is 0, then POLLHUP apparently will not be reported:
         //   https://openradar.appspot.com/37537852
         // It seems that by setting any non-zero value -- even one documented as ignored -- we
         // cause POLLHUP to be reported. Both POLLHUP and POLLERR are documented as being
         // ignored. So, we'll go ahead and set them. This has no effect on non-broken OSs,
         // causes MacOS to do the right thing, and sort of looks as if we're explicitly
         // requesting notification of these two conditions, which we do after all want to know
         // about.
         POLLHUP | POLLERR;
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

class UnixEventPort::PollContext {
public:
  PollContext(UnixEventPort& port) {
    for (FdObserver* ptr = port.observersHead; ptr != nullptr; ptr = ptr->next) {
      struct pollfd pollfd;
      memset(&pollfd, 0, sizeof(pollfd));
      pollfd.fd = ptr->fd;
      pollfd.events = ptr->getEventMask();
      pollfds.add(pollfd);
      pollEvents.add(ptr);
    }

#if KJ_USE_PIPE_FOR_WAKEUP
    {
      struct pollfd pollfd;
      memset(&pollfd, 0, sizeof(pollfd));
      pollfd.fd = port.wakePipeIn;
      pollfd.events = POLLIN;
      pollfds.add(pollfd);
    }
#endif
  }

  void run(int timeout) {
    pollResult = ::poll(pollfds.begin(), pollfds.size(), timeout);
    pollError = pollResult < 0 ? errno : 0;

    if (pollError == EINTR) {
      // We can't simply restart the poll call because we need to recompute the timeout. Instead,
      // we pretend poll() returned zero events. This will cause the event loop to spin once,
      // decide it has nothing to do, recompute timeouts, then return to waiting.
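      // (This is safe because each pollfd was zero-initialized above, so processResults() will
      // simply find no revents set.)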
      pollResult = 0;
      pollError = 0;
    }
  }

  bool processResults() {
    if (pollResult < 0) {
      KJ_FAIL_SYSCALL("poll()", pollError);
    }

    bool woken = false;
    for (auto i: indices(pollfds)) {
      if (pollfds[i].revents != 0) {
#if KJ_USE_PIPE_FOR_WAKEUP
        if (i == pollEvents.size()) {
          // The last pollfd is our cross-thread wake pipe.
          woken = true;
          // Discard junk in the wake pipe.
          char junk[256];
          ssize_t n;
          do {
            KJ_NONBLOCKING_SYSCALL(n = read(pollfds[i].fd, junk, sizeof(junk)));
          } while (n >= 256);
        } else {
#endif
          pollEvents[i]->fire(pollfds[i].revents);
#if KJ_USE_PIPE_FOR_WAKEUP
        }
#endif
        if (--pollResult <= 0) {
          break;
        }
      }
    }
    return woken;
  }

private:
  kj::Vector<struct pollfd> pollfds;
  kj::Vector<FdObserver*> pollEvents;
  int pollResult = 0;
  int pollError = 0;
};

bool UnixEventPort::wait() {
  sigset_t newMask;
  sigemptyset(&newMask);

#if !KJ_USE_PIPE_FOR_WAKEUP
  sigaddset(&newMask, reservedSignal);
#endif

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  PollContext pollContext(*this);

  // Capture signals.
  SignalCapture capture;

#if KJ_BROKEN_SIGLONGJMP
  if (sigsetjmp(capture.jumpTo, false)) {
#else
  if (sigsetjmp(capture.jumpTo, true)) {
#endif
    // We received a signal and longjmp'd back out of the signal handler.
    threadCapture = nullptr;

#if !KJ_USE_PIPE_FOR_WAKEUP
    if (capture.siginfo.si_signo == reservedSignal) {
      return true;
    } else {
#endif
      gotSignal(capture.siginfo);
      return false;
#if !KJ_USE_PIPE_FOR_WAKEUP
    }
#endif
  }

  // Enable signals, run the poll, then mask them again.
#if KJ_BROKEN_SIGLONGJMP
  auto& originalMask = capture.originalMask;
#else
  sigset_t originalMask;
#endif
  threadCapture = &capture;
  pthread_sigmask(SIG_UNBLOCK, &newMask, &originalMask);

  pollContext.run(
      timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));

  pthread_sigmask(SIG_SETMASK, &originalMask, nullptr);
  threadCapture = nullptr;

  // Queue events.
  bool result = pollContext.processResults();
  timerImpl.advanceTo(clock.now());

  return result;
}

bool UnixEventPort::poll() {
  // volatile so that longjmp() doesn't clobber it.
  volatile bool woken = false;

  sigset_t pending;
  sigset_t waitMask;
  sigemptyset(&pending);
  sigfillset(&waitMask);

  // Count how many signals that we care about are pending.
  KJ_SYSCALL(sigpending(&pending));
  uint signalCount = 0;

#if !KJ_USE_PIPE_FOR_WAKEUP
  if (sigismember(&pending, reservedSignal)) {
    ++signalCount;
    sigdelset(&pending, reservedSignal);
    sigdelset(&waitMask, reservedSignal);
  }
#endif

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      if (sigismember(&pending, ptr->signum)) {
        ++signalCount;
        sigdelset(&pending, ptr->signum);
        sigdelset(&waitMask, ptr->signum);
      }
      ptr = ptr->next;
    }
  }

  // Wait for each pending signal. It would be nice to use sigtimedwait() here but it is not
  // available on OSX. :(
  // Instead, we call sigsuspend() once per expected signal.
  {
    SignalCapture capture;
#if KJ_BROKEN_SIGLONGJMP
    pthread_sigmask(SIG_SETMASK, nullptr, &capture.originalMask);
#endif
    threadCapture = &capture;
    KJ_DEFER(threadCapture = nullptr);
    while (signalCount-- > 0) {
#if KJ_BROKEN_SIGLONGJMP
      if (sigsetjmp(capture.jumpTo, false)) {
#else
      if (sigsetjmp(capture.jumpTo, true)) {
#endif
        // We received a signal and longjmp'd back out of the signal handler.
        sigdelset(&waitMask, capture.siginfo.si_signo);
#if !KJ_USE_PIPE_FOR_WAKEUP
        if (capture.siginfo.si_signo == reservedSignal) {
          woken = true;
        } else {
#endif
          gotSignal(capture.siginfo);
#if !KJ_USE_PIPE_FOR_WAKEUP
        }
#endif
      } else {
#if __CYGWIN__
        // Cygwin's sigpending() incorrectly reports signals pending for any thread, not just our
        // own thread. As a work-around, instead of using sigsuspend() (which would block forever
        // if the signal is not pending on *this* thread), we un-mask the signals and immediately
        // mask them again. If any signals are pending, they *should* be delivered before the
        // first sigprocmask() returns, and the handler will then longjmp() to the block above.
        // If it turns out no signal is pending, we'll block the signals again and break out of
        // the loop.
        //
        // Bug reported here: https://cygwin.com/ml/cygwin/2019-07/msg00051.html
        sigset_t origMask;
        sigprocmask(SIG_SETMASK, &waitMask, &origMask);
        sigprocmask(SIG_SETMASK, &origMask, nullptr);
        break;
#else
        sigsuspend(&waitMask);
        KJ_FAIL_ASSERT("sigsuspend() shouldn't return because the signal handler should "
                       "have siglongjmp()ed.");
#endif
      }
    }
  }

  {
    PollContext pollContext(*this);
    pollContext.run(0);
    if (pollContext.processResults()) {
      woken = true;
    }
  }
  timerImpl.advanceTo(clock.now());

  return woken;
}

void UnixEventPort::wake() const {
#if KJ_USE_PIPE_FOR_WAKEUP
  // We're going to write() a single byte to our wake pipe in order to cause poll() to complete
  // in the target thread.
  //
  // If this write() fails with EWOULDBLOCK, we don't care, because the target thread is already
  // scheduled to wake up.
  char c = 0;
  KJ_NONBLOCKING_SYSCALL(write(wakePipeOut, &c, 1));
#else
  int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal);
  if (error != 0) {
    KJ_FAIL_SYSCALL("pthread_kill", error);
  }
#endif
}

#endif  // KJ_USE_EPOLL, else

}  // namespace kj

#endif  // !_WIN32
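
// =======================================================================================
// Usage sketch (illustrative only, not part of the original file). This assumes the public
// declarations in async-unix.h; error handling is omitted and `fd` stands in for some open
// file descriptor:
//
//   kj::UnixEventPort::captureSignal(SIGURG);  // typically done early, before spawning threads
//
//   kj::UnixEventPort port;
//   kj::EventLoop loop(port);
//   kj::WaitScope waitScope(loop);
//
//   // Wait for a signal to arrive:
//   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
//
//   // Wait for a file descriptor (e.g. a pipe or socket) to become readable:
//   kj::UnixEventPort::FdObserver observer(port, fd,
//       kj::UnixEventPort::FdObserver::OBSERVE_READ);
//   observer.whenBecomesReadable().wait(waitScope);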