capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git

async-unix.c++ (34525B)


// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if !_WIN32

#include "async-unix.h"
#include "debug.h"
#include "threadlocal.h"
#include <setjmp.h>
#include <errno.h>
#include <inttypes.h>
#include <limits>
#include <pthread.h>
#include <map>
#include <sys/wait.h>
#include <unistd.h>

#if KJ_USE_EPOLL
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/eventfd.h>
#else
#include <poll.h>
#include <fcntl.h>
#endif

namespace kj {

// =======================================================================================
// Signal code common to multiple implementations

namespace {

int reservedSignal = SIGUSR1;
bool tooLateToSetReserved = false;
bool capturedChildExit = false;
bool threadClaimedChildExits = false;

struct SignalCapture {
  sigjmp_buf jumpTo;
  siginfo_t siginfo;

#if __APPLE__
  sigset_t originalMask;
  // The signal mask to be restored when jumping out of the signal handler.
  //
  // "But wait!" you say, "Isn't the whole point of siglongjmp() that it does this for you?" Well,
  // yes, that is supposed to be the point. However, Apple implemented it wrong. On macOS,
  // siglongjmp() uses sigprocmask() -- not pthread_sigmask() -- to restore the signal mask.
  // Unfortunately, sigprocmask() on macOS affects threads other than the current thread. Arguably
  // this is conformant: sigprocmask() is documented as having unspecified behavior in the presence
  // of threads, and pthread_sigmask() must be used instead. However, this means siglongjmp()
  // cannot be used in the presence of threads.
  //
  // We'll just have to restore the signal mask ourselves, rather than rely on siglongjmp()...
  //
  // ... but we ONLY do that on Apple systems, because it turns out, ironically, on Android, this
  // hack breaks signal delivery. pthread_sigmask() vs. sigprocmask() is not the issue; we
  // apparently MUST let siglongjmp() itself deal with the signal mask, otherwise various tests in
  // async-unix-test.c++ end up hanging (I haven't gotten to the bottom of why). Note that on stock
  // Linux, _either_ strategy works fine; this appears to be a problem with Android's Bionic libc.
  // Since letting siglongjmp() do the work _seems_ more "correct", we'll make it the default and
  // only do something different on Apple platforms.
#define KJ_BROKEN_SIGLONGJMP 1
#endif
};

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
KJ_THREADLOCAL_PTR(SignalCapture) threadCapture = nullptr;

void signalHandler(int, siginfo_t* siginfo, void*) {
  SignalCapture* capture = threadCapture;
  if (capture != nullptr) {
    capture->siginfo = *siginfo;

#if KJ_BROKEN_SIGLONGJMP
    // See comments on SignalCapture::originalMask, above: We can't rely on siglongjmp() to restore
    // the signal mask; we must do it ourselves using pthread_sigmask(). Because sigsetjmp() was
    // called with a false savemask argument, siglongjmp() skips changing the signal mask, making
    // the pair equivalent to `setjmp()`/`longjmp()` on Linux or `_setjmp()`/`_longjmp()` on
    // BSD/macOS. (siglongjmp()'s second argument is merely the value for sigsetjmp() to return.)
    pthread_sigmask(SIG_SETMASK, &capture->originalMask, nullptr);
    siglongjmp(capture->jumpTo, false);
#else
    siglongjmp(capture->jumpTo, true);
#endif
  }
}
#endif

void registerSignalHandler(int signum) {
  tooLateToSetReserved = true;

  sigset_t mask;
  KJ_SYSCALL(sigemptyset(&mask));
  KJ_SYSCALL(sigaddset(&mask, signum));
  KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &mask, nullptr));

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_sigaction = &signalHandler;
  KJ_SYSCALL(sigfillset(&action.sa_mask));
  action.sa_flags = SA_SIGINFO;
  KJ_SYSCALL(sigaction(signum, &action, nullptr));
#endif
}

#if !KJ_USE_EPOLL && !KJ_USE_PIPE_FOR_WAKEUP
void registerReservedSignal() {
  registerSignalHandler(reservedSignal);
}
#endif

void ignoreSigpipe() {
  // We disable SIGPIPE because users of UnixEventPort almost certainly don't want it.
  while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
    int error = errno;
    if (error != EINTR) {
      KJ_FAIL_SYSCALL("signal(SIGPIPE, SIG_IGN)", error);
    }
  }
}

}  // namespace

struct UnixEventPort::ChildSet {
  std::map<pid_t, ChildExitPromiseAdapter*> waiters;

  void checkExits();
};

class UnixEventPort::ChildExitPromiseAdapter {
public:
  inline ChildExitPromiseAdapter(PromiseFulfiller<int>& fulfiller,
                                 ChildSet& childSet, Maybe<pid_t>& pidRef)
      : childSet(childSet),
        pid(KJ_REQUIRE_NONNULL(pidRef,
            "`pid` must be non-null at the time `onChildExit()` is called")),
        pidRef(pidRef), fulfiller(fulfiller) {
    KJ_REQUIRE(childSet.waiters.insert(std::make_pair(pid, this)).second,
        "already called onChildExit() for this pid");
  }

  ~ChildExitPromiseAdapter() noexcept(false) {
    childSet.waiters.erase(pid);
  }

  ChildSet& childSet;
  pid_t pid;
  Maybe<pid_t>& pidRef;
  PromiseFulfiller<int>& fulfiller;
};

void UnixEventPort::ChildSet::checkExits() {
  for (;;) {
    int status;
    pid_t pid;
    KJ_SYSCALL_HANDLE_ERRORS(pid = waitpid(-1, &status, WNOHANG)) {
      case ECHILD:
        return;
      default:
        KJ_FAIL_SYSCALL("waitpid()", error);
    }
    if (pid == 0) break;

    auto iter = waiters.find(pid);
    if (iter != waiters.end()) {
      iter->second->pidRef = nullptr;
      iter->second->fulfiller.fulfill(kj::cp(status));
    }
  }
}

Promise<int> UnixEventPort::onChildExit(Maybe<pid_t>& pid) {
  KJ_REQUIRE(capturedChildExit,
      "must call UnixEventPort::captureChildExit() to use onChildExit().");

  ChildSet* cs;
  KJ_IF_MAYBE(c, childSet) {
    cs = *c;
  } else {
    // In theory we should do an atomic compare-and-swap on threadClaimedChildExits, but this is
    // for debug purposes only so it's not a big deal.
    KJ_REQUIRE(!threadClaimedChildExits,
        "only one UnixEventPort per process may listen for child exits");
    threadClaimedChildExits = true;

    auto newChildSet = kj::heap<ChildSet>();
    cs = newChildSet;
    childSet = kj::mv(newChildSet);
  }

  return kj::newAdaptedPromise<int, ChildExitPromiseAdapter>(*cs, pid);
}

void UnixEventPort::captureChildExit() {
  captureSignal(SIGCHLD);
  capturedChildExit = true;
}
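
// A minimal usage sketch (illustrative only, not part of this file), assuming the usual kj
// event-loop setup:
//
//   kj::UnixEventPort::captureChildExit();  // typically once at startup, before spawning threads
//   kj::UnixEventPort port;
//   kj::EventLoop loop(port);
//   kj::WaitScope waitScope(loop);
//
//   kj::Maybe<pid_t> pid = fork();          // keep this Maybe alive; it is nulled when reaped
//   if (KJ_ASSERT_NONNULL(pid) == 0) _exit(0);            // child exits immediately
//   int status = port.onChildExit(pid).wait(waitScope);   // parent: resolves on SIGCHLD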

class UnixEventPort::SignalPromiseAdapter {
public:
  inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
                              UnixEventPort& loop, int signum)
      : loop(loop), signum(signum), fulfiller(fulfiller) {
    prev = loop.signalTail;
    *loop.signalTail = this;
    loop.signalTail = &next;
  }

  ~SignalPromiseAdapter() noexcept(false) {
    if (prev != nullptr) {
      if (next == nullptr) {
        loop.signalTail = prev;
      } else {
        next->prev = prev;
      }
      *prev = next;
    }
  }

  SignalPromiseAdapter* removeFromList() {
    auto result = next;
    if (next == nullptr) {
      loop.signalTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
    return result;
  }

  UnixEventPort& loop;
  int signum;
  PromiseFulfiller<siginfo_t>& fulfiller;
  SignalPromiseAdapter* next = nullptr;
  SignalPromiseAdapter** prev = nullptr;
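  // `prev` points at the previous adapter's `next` pointer (or at the port's `signalHead`),
  // making this an intrusive doubly-linked list with O(1) unlink and tail insertion.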
};

Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
  KJ_REQUIRE(signum != SIGCHLD || !capturedChildExit,
      "can't call onSignal(SIGCHLD) when kj::UnixEventPort::captureChildExit() has been called");
  return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
}

void UnixEventPort::captureSignal(int signum) {
  if (reservedSignal == SIGUSR1) {
    KJ_REQUIRE(signum != SIGUSR1,
               "Sorry, SIGUSR1 is reserved by the UnixEventPort implementation.  You may call "
               "UnixEventPort::setReservedSignal() to reserve a different signal.");
  } else {
    KJ_REQUIRE(signum != reservedSignal,
               "Can't capture signal reserved using setReservedSignal().", signum);
  }
  registerSignalHandler(signum);
}
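
// A minimal usage sketch (illustrative only, not part of this file):
//
//   kj::UnixEventPort::captureSignal(SIGURG);  // typically once at startup, before spawning
//                                              // threads, so the mask is inherited everywhere
//   kj::UnixEventPort port;
//   kj::EventLoop loop(port);
//   kj::WaitScope waitScope(loop);
//
//   kj::Promise<siginfo_t> promise = port.onSignal(SIGURG);
//   raise(SIGURG);                             // or a signal sent from elsewhere
//   siginfo_t info = promise.wait(waitScope);  // resolves once the signal is observed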

void UnixEventPort::setReservedSignal(int signum) {
  KJ_REQUIRE(!tooLateToSetReserved,
             "setReservedSignal() must be called before any calls to `captureSignal()` and "
             "before any `UnixEventPort` is constructed.");
  if (reservedSignal != SIGUSR1 && reservedSignal != signum) {
    KJ_FAIL_REQUIRE("Detected multiple conflicting calls to setReservedSignal().  Please only "
                    "call this once, or always call it with the same signal number.");
  }
  reservedSignal = signum;
}

void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
  // If onChildExit() has been called and this is SIGCHLD, check for child exits.
  KJ_IF_MAYBE(cs, childSet) {
    if (siginfo.si_signo == SIGCHLD) {
      cs->get()->checkExits();
      return;
    }
  }

  // Fire any events waiting on this signal.
  auto ptr = signalHead;
  while (ptr != nullptr) {
    if (ptr->signum == siginfo.si_signo) {
      ptr->fulfiller.fulfill(kj::cp(siginfo));
      ptr = ptr->removeFromList();
    } else {
      ptr = ptr->next;
    }
  }
}

#if KJ_USE_EPOLL
// =======================================================================================
// epoll FdObserver implementation

UnixEventPort::UnixEventPort()
    : clock(systemPreciseMonotonicClock()),
      timerImpl(clock.now()),
      epollFd(-1),
      signalFd(-1),
      eventFd(-1) {
  ignoreSigpipe();

  int fd;
  KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
  epollFd = AutoCloseFd(fd);

  memset(&signalFdSigset, 0, sizeof(signalFdSigset));

  KJ_SYSCALL(sigemptyset(&signalFdSigset));
  KJ_SYSCALL(fd = signalfd(-1, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  signalFd = AutoCloseFd(fd);

  KJ_SYSCALL(fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
  eventFd = AutoCloseFd(fd);

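  // Register the signalfd and eventfd with epoll. The sentinel values 0 and 1 in event.data.u64
  // identify them in doEpollWait(); FdObservers store a pointer in event.data.ptr instead.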
  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  event.events = EPOLLIN;
  event.data.u64 = 0;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, signalFd, &event));
  event.data.u64 = 1;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event));
}

UnixEventPort::~UnixEventPort() noexcept(false) {
  if (childSet != nullptr) {
    // We had claimed the exclusive right to call onChildExit(). Release that right.
    threadClaimedChildExits = false;
  }
}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags) {
  struct epoll_event event;
  memset(&event, 0, sizeof(event));

  if (flags & OBSERVE_READ) {
    event.events |= EPOLLIN | EPOLLRDHUP;
  }
  if (flags & OBSERVE_WRITE) {
    event.events |= EPOLLOUT;
  }
  if (flags & OBSERVE_URGENT) {
    event.events |= EPOLLPRI;
  }
  event.events |= EPOLLET;  // Set edge-triggered mode.

  event.data.ptr = this;

  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_ADD, fd, &event));
}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_DEL, fd, nullptr)) { break; }
}

void UnixEventPort::FdObserver::fire(short events) {
  if (events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
    if (events & (EPOLLHUP | EPOLLRDHUP)) {
      atEnd = true;
    } else {
      // Since we didn't receive EPOLLRDHUP, we know that we're not at the end.
      atEnd = false;
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & EPOLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}
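
// A minimal FdObserver usage sketch (illustrative only, not part of this file), assuming `port`
// and `waitScope` as in the earlier sketches and a nonblocking socket `fd`:
//
//   kj::UnixEventPort::FdObserver observer(port, fd,
//       kj::UnixEventPort::FdObserver::OBSERVE_READ);
//   observer.whenBecomesReadable().wait(waitScope);  // resolves on the next edge-triggered
//                                                    // readability event; read() until EAGAIN
//                                                    // before waiting again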

bool UnixEventPort::wait() {
  return doEpollWait(
      timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));
}

bool UnixEventPort::poll() {
  return doEpollWait(0);
}

void UnixEventPort::wake() const {
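  // Nudge the port's thread: write an 8-byte value to the eventfd so that epoll_wait() returns.
  // EAGAIN (ignored by KJ_NONBLOCKING_SYSCALL) just means a wakeup is already pending.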
  uint64_t one = 1;
  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = write(eventFd, &one, sizeof(one)));
  KJ_ASSERT(n < 0 || n == sizeof(one));
}

static siginfo_t toRegularSiginfo(const struct signalfd_siginfo& siginfo) {
  // Unfortunately, siginfo_t is mostly a big union and the correct set of fields to fill in
  // depends on the type of signal. OTOH, signalfd_siginfo is a flat struct that expands all
  // siginfo_t's union fields out to be non-overlapping. We can't just copy all the fields over
  // because of the unions; we have to carefully figure out which fields are appropriate to fill
  // in for this signal. Ick.

  siginfo_t result;
  memset(&result, 0, sizeof(result));

  result.si_signo = siginfo.ssi_signo;
  result.si_errno = siginfo.ssi_errno;
  result.si_code = siginfo.ssi_code;

  if (siginfo.ssi_code > 0) {
    // Signal originated from the kernel. The structure of the siginfo depends primarily on the
    // signal number.

    switch (siginfo.ssi_signo) {
    case SIGCHLD:
      result.si_pid = siginfo.ssi_pid;
      result.si_uid = siginfo.ssi_uid;
      result.si_status = siginfo.ssi_status;
      result.si_utime = siginfo.ssi_utime;
      result.si_stime = siginfo.ssi_stime;
      break;

    case SIGILL:
    case SIGFPE:
    case SIGSEGV:
    case SIGBUS:
    case SIGTRAP:
      result.si_addr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_addr));
#ifdef si_trapno
      result.si_trapno = siginfo.ssi_trapno;
#endif
#ifdef si_addr_lsb
      // ssi_addr_lsb is defined as coming immediately after ssi_addr in the kernel headers but
      // apparently the userspace headers were never updated. So we do a pointer hack. :(
      result.si_addr_lsb = *reinterpret_cast<const uint16_t*>(&siginfo.ssi_addr + 1);
#endif
      break;

    case SIGIO:
      static_assert(SIGIO == SIGPOLL, "SIGIO != SIGPOLL?");

      // Note: Technically, code can arrange for SIGIO signals to be delivered with a signal number
      //   other than SIGIO. AFAICT there is no way for us to detect this in the siginfo. Luckily
      //   SIGIO is totally obsoleted by epoll so it shouldn't come up.

      result.si_band = siginfo.ssi_band;
      result.si_fd = siginfo.ssi_fd;
      break;

    case SIGSYS:
      // Apparently SIGSYS's fields are not available in signalfd_siginfo?
      break;
    }

  } else {
    // Signal originated from userspace. The sender could specify whatever signal number they
    // wanted. The structure of the signal is determined by the API they used, which is identified
    // by SI_CODE.

    switch (siginfo.ssi_code) {
      case SI_USER:
      case SI_TKILL:
        // kill(), tkill(), or tgkill().
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;
        break;

      case SI_QUEUE:
      case SI_MESGQ:
      case SI_ASYNCIO:
      default:
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;

        // This is awkward. In siginfo_t, si_ptr and si_int are in a union together. In
        // signalfd_siginfo, they are not. We don't really know whether the app intended to send
        // an int or a pointer. Presumably since the pointer is always larger than the int, if
        // we write the pointer, we'll end up with the right value for the int? Presumably the
        // two fields of signalfd_siginfo are actually extracted from one of these unions
        // originally, so actually contain redundant data? Better write some tests...
        //
        // Making matters even stranger, siginfo.ssi_ptr is 64-bit even on 32-bit systems, and
        // it appears that instead of doing the obvious thing by casting the pointer value to
        // 64 bits, the kernel actually memcpy()s the 32-bit value into the 64-bit space. As
        // a result, on big-endian 32-bit systems, the original pointer value ends up in the
        // *upper* 32 bits of siginfo.ssi_ptr, which is totally weird. We play along and use
        // a memcpy() on our end too, to get the right result on all platforms.
        memcpy(&result.si_ptr, &siginfo.ssi_ptr, sizeof(result.si_ptr));
        break;

      case SI_TIMER:
        result.si_timerid = siginfo.ssi_tid;
        result.si_overrun = siginfo.ssi_overrun;

        // Again with this weirdness...
        result.si_ptr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_ptr));
        break;
    }
  }

  return result;
}

bool UnixEventPort::doEpollWait(int timeout) {
  sigset_t newMask;
  memset(&newMask, 0, sizeof(newMask));
  sigemptyset(&newMask);

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  if (memcmp(&newMask, &signalFdSigset, sizeof(newMask)) != 0) {
    // Apparently we're not waiting on the same signals as last time. Need to update the signal
    // FD's mask.
    signalFdSigset = newMask;
    KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  }

  struct epoll_event events[16];
  int n = epoll_wait(epollFd, events, kj::size(events), timeout);
  if (n < 0) {
    int error = errno;
    if (error == EINTR) {
      // We can't simply restart the epoll call because we need to recompute the timeout. Instead,
      // we pretend epoll_wait() returned zero events. This will cause the event loop to spin once,
      // decide it has nothing to do, recompute timeouts, then return to waiting.
      n = 0;
    } else {
      KJ_FAIL_SYSCALL("epoll_wait()", error);
    }
  }

  bool woken = false;

  for (int i = 0; i < n; i++) {
    if (events[i].data.u64 == 0) {
      for (;;) {
        struct signalfd_siginfo siginfo;
        ssize_t n;
        KJ_NONBLOCKING_SYSCALL(n = read(signalFd, &siginfo, sizeof(siginfo)));
        if (n < 0) break;  // no more signals

        KJ_ASSERT(n == sizeof(siginfo));

        gotSignal(toRegularSiginfo(siginfo));

#ifdef SIGRTMIN
        if (siginfo.ssi_signo >= SIGRTMIN) {
          // This is an RT signal. There could be multiple copies queued. We need to remove it from
          // the signalfd's signal mask before we continue, to avoid accidentally reading and
          // discarding the extra copies.
          // TODO(perf): If high throughput of RT signals is desired then perhaps we should read
          //   them all into userspace and queue them here. Maybe we even need a better interface
          //   than onSignal() for receiving high-volume RT signals.
          KJ_SYSCALL(sigdelset(&signalFdSigset, siginfo.ssi_signo));
          KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
        }
#endif
      }
    } else if (events[i].data.u64 == 1) {
      // Someone called wake() from another thread. Consume the event.
      uint64_t value;
      ssize_t n;
      KJ_NONBLOCKING_SYSCALL(n = read(eventFd, &value, sizeof(value)));
      KJ_ASSERT(n < 0 || n == sizeof(value));

      // We were woken. Need to return true.
      woken = true;
    } else {
      FdObserver* observer = reinterpret_cast<FdObserver*>(events[i].data.ptr);
      observer->fire(events[i].events);
    }
  }

  timerImpl.advanceTo(clock.now());

  return woken;
}

#else  // KJ_USE_EPOLL
// =======================================================================================
// Traditional poll() FdObserver implementation.

#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif

UnixEventPort::UnixEventPort()
    : clock(systemPreciseMonotonicClock()),
      timerImpl(clock.now()) {
#if KJ_USE_PIPE_FOR_WAKEUP
  // Allocate a pipe to which we'll write a byte in order to wake this thread.
  int fds[2];
  KJ_SYSCALL(pipe(fds));
  wakePipeIn = kj::AutoCloseFd(fds[0]);
  wakePipeOut = kj::AutoCloseFd(fds[1]);
  KJ_SYSCALL(fcntl(wakePipeIn, F_SETFD, FD_CLOEXEC));
  KJ_SYSCALL(fcntl(wakePipeOut, F_SETFD, FD_CLOEXEC));
#else
  static_assert(sizeof(threadId) >= sizeof(pthread_t),
                "pthread_t is larger than a long long on your platform.  Please port.");
  *reinterpret_cast<pthread_t*>(&threadId) = pthread_self();

  // Note: We used to use a pthread_once to call registerReservedSignal() only once per process.
  //   This didn't work correctly because registerReservedSignal() not only registers the
  //   (process-wide) signal handler, but also sets the (per-thread) signal mask to block the
  //   signal. Thus, if threads were spawned before the first UnixEventPort was created, and then
  //   multiple threads created UnixEventPorts, only one of them would have the signal properly
  //   blocked. We could have changed things so that only the handler registration was protected
  //   by the pthread_once and the mask update happened in every thread, but registering a signal
  //   handler is not an expensive operation, so whatever... we'll do it in every thread.
  registerReservedSignal();
#endif

  ignoreSigpipe();
}

UnixEventPort::~UnixEventPort() noexcept(false) {}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags), next(nullptr), prev(nullptr) {}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  if (prev != nullptr) {
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
  }
}

void UnixEventPort::FdObserver::fire(short events) {
  if (events & (POLLIN | POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
    if (events & (POLLHUP | POLLRDHUP)) {
      atEnd = true;
#if POLLRDHUP
    } else {
      // Since POLLRDHUP exists on this platform, and we didn't receive it, we know that we're not
      // at the end.
      atEnd = false;
#endif
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & POLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }

  if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr &&
      hupFulfiller == nullptr) {
    // Remove from list.
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
  }
}

short UnixEventPort::FdObserver::getEventMask() {
  return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
         (writeFulfiller == nullptr ? 0 : POLLOUT) |
         (urgentFulfiller == nullptr ? 0 : POLLPRI) |
         // The POSIX standard says POLLHUP and POLLERR will be reported even if not requested.
         // But on MacOS, if `events` is 0, then POLLHUP apparently will not be reported:
         //   https://openradar.appspot.com/37537852
         // It seems that by setting any non-zero value -- even one documented as ignored -- we
         // cause POLLHUP to be reported. Both POLLHUP and POLLERR are documented as being ignored.
         // So, we'll go ahead and set them. This has no effect on non-broken OSs, causes MacOS to
         // do the right thing, and sort of looks as if we're explicitly requesting notification of
         // these two conditions, which we do after all want to know about.
         POLLHUP | POLLERR;
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

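  // A null `prev` means this observer is not in the port's list; link it onto the tail so the
  // next poll() round includes this fd. (The same lazy insertion appears in the methods below.)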
  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

class UnixEventPort::PollContext {
public:
  PollContext(UnixEventPort& port) {
    for (FdObserver* ptr = port.observersHead; ptr != nullptr; ptr = ptr->next) {
      struct pollfd pollfd;
      memset(&pollfd, 0, sizeof(pollfd));
      pollfd.fd = ptr->fd;
      pollfd.events = ptr->getEventMask();
      pollfds.add(pollfd);
      pollEvents.add(ptr);
    }

#if KJ_USE_PIPE_FOR_WAKEUP
    {
      struct pollfd pollfd;
      memset(&pollfd, 0, sizeof(pollfd));
      pollfd.fd = port.wakePipeIn;
      pollfd.events = POLLIN;
      pollfds.add(pollfd);
    }
#endif
  }

  void run(int timeout) {
    pollResult = ::poll(pollfds.begin(), pollfds.size(), timeout);
    pollError = pollResult < 0 ? errno : 0;

    if (pollError == EINTR) {
      // We can't simply restart the poll call because we need to recompute the timeout. Instead,
      // we pretend poll() returned zero events. This will cause the event loop to spin once,
      // decide it has nothing to do, recompute timeouts, then return to waiting.
      pollResult = 0;
      pollError = 0;
    }
  }

  bool processResults() {
    if (pollResult < 0) {
      KJ_FAIL_SYSCALL("poll()", pollError);
    }

    bool woken = false;
    for (auto i: indices(pollfds)) {
      if (pollfds[i].revents != 0) {
#if KJ_USE_PIPE_FOR_WAKEUP
        if (i == pollEvents.size()) {
          // The last pollfd is our cross-thread wake pipe.
          woken = true;
          // Discard junk in the wake pipe.
          char junk[256];
          ssize_t n;
          do {
            KJ_NONBLOCKING_SYSCALL(n = read(pollfds[i].fd, junk, sizeof(junk)));
          } while (n >= 256);
        } else {
#endif
          pollEvents[i]->fire(pollfds[i].revents);
#if KJ_USE_PIPE_FOR_WAKEUP
        }
#endif
        if (--pollResult <= 0) {
          break;
        }
      }
    }
    return woken;
  }

private:
  kj::Vector<struct pollfd> pollfds;
  kj::Vector<FdObserver*> pollEvents;
  int pollResult = 0;
  int pollError = 0;
};

bool UnixEventPort::wait() {
  sigset_t newMask;
  sigemptyset(&newMask);

#if !KJ_USE_PIPE_FOR_WAKEUP
  sigaddset(&newMask, reservedSignal);
#endif

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  PollContext pollContext(*this);

  // Capture signals.
  SignalCapture capture;

#if KJ_BROKEN_SIGLONGJMP
  if (sigsetjmp(capture.jumpTo, false)) {
#else
  if (sigsetjmp(capture.jumpTo, true)) {
#endif
    // We received a signal and longjmp'd back out of the signal handler.
    threadCapture = nullptr;

#if !KJ_USE_PIPE_FOR_WAKEUP
    if (capture.siginfo.si_signo == reservedSignal) {
      return true;
    } else {
#endif
      gotSignal(capture.siginfo);
      return false;
#if !KJ_USE_PIPE_FOR_WAKEUP
    }
#endif
  }

  // Enable signals, run the poll, then mask them again.
#if KJ_BROKEN_SIGLONGJMP
  auto& originalMask = capture.originalMask;
#else
  sigset_t originalMask;
#endif
  threadCapture = &capture;
  pthread_sigmask(SIG_UNBLOCK, &newMask, &originalMask);

  pollContext.run(
      timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));

  pthread_sigmask(SIG_SETMASK, &originalMask, nullptr);
  threadCapture = nullptr;

  // Queue events.
  bool result = pollContext.processResults();
  timerImpl.advanceTo(clock.now());

  return result;
}

bool UnixEventPort::poll() {
  // volatile so that longjmp() doesn't clobber it.
  volatile bool woken = false;

  sigset_t pending;
  sigset_t waitMask;
  sigemptyset(&pending);
  sigfillset(&waitMask);

  // Count how many signals that we care about are pending.
  KJ_SYSCALL(sigpending(&pending));
  uint signalCount = 0;

#if !KJ_USE_PIPE_FOR_WAKEUP
  if (sigismember(&pending, reservedSignal)) {
    ++signalCount;
    sigdelset(&pending, reservedSignal);
    sigdelset(&waitMask, reservedSignal);
  }
#endif

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      if (sigismember(&pending, ptr->signum)) {
        ++signalCount;
        sigdelset(&pending, ptr->signum);
        sigdelset(&waitMask, ptr->signum);
      }
      ptr = ptr->next;
    }
  }

  // Wait for each pending signal.  It would be nice to use sigtimedwait() here but it is not
  // available on OSX.  :(  Instead, we call sigsuspend() once per expected signal.
  {
    SignalCapture capture;
#if KJ_BROKEN_SIGLONGJMP
    pthread_sigmask(SIG_SETMASK, nullptr, &capture.originalMask);
#endif
    threadCapture = &capture;
    KJ_DEFER(threadCapture = nullptr);
    while (signalCount-- > 0) {
#if KJ_BROKEN_SIGLONGJMP
      if (sigsetjmp(capture.jumpTo, false)) {
#else
      if (sigsetjmp(capture.jumpTo, true)) {
#endif
        // We received a signal and longjmp'd back out of the signal handler.
        sigdelset(&waitMask, capture.siginfo.si_signo);
#if !KJ_USE_PIPE_FOR_WAKEUP
        if (capture.siginfo.si_signo == reservedSignal) {
          woken = true;
        } else {
#endif
          gotSignal(capture.siginfo);
#if !KJ_USE_PIPE_FOR_WAKEUP
        }
#endif
      } else {
#if __CYGWIN__
        // Cygwin's sigpending() incorrectly reports signals pending for any thread, not just our
        // own thread. As a work-around, instead of using sigsuspend() (which would block forever
        // if the signal is not pending on *this* thread), we un-mask the signals and immediately
        // mask them again. If any signals are pending, they *should* be delivered before the first
        // sigprocmask() returns, and the handler will then longjmp() to the block above. If it
        // turns out no signal is pending, we'll block the signals again and break out of the
        // loop.
        //
        // Bug reported here: https://cygwin.com/ml/cygwin/2019-07/msg00051.html
        sigset_t origMask;
        sigprocmask(SIG_SETMASK, &waitMask, &origMask);
        sigprocmask(SIG_SETMASK, &origMask, nullptr);
        break;
#else
        sigsuspend(&waitMask);
        KJ_FAIL_ASSERT("sigsuspend() shouldn't return because the signal handler should "
                       "have siglongjmp()ed.");
#endif
      }
    }
  }

  {
    PollContext pollContext(*this);
    pollContext.run(0);
    if (pollContext.processResults()) {
      woken = true;
    }
  }
  timerImpl.advanceTo(clock.now());

  return woken;
}

void UnixEventPort::wake() const {
#if KJ_USE_PIPE_FOR_WAKEUP
  // We're going to write() a single byte to our wake pipe in order to cause poll() to complete in
  // the target thread.
  //
  // If this write() fails with EWOULDBLOCK, we don't care, because the target thread is already
  // scheduled to wake up.
  char c = 0;
  KJ_NONBLOCKING_SYSCALL(write(wakePipeOut, &c, 1));
#else
  int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal);
  if (error != 0) {
    KJ_FAIL_SYSCALL("pthread_kill", error);
  }
#endif
}

#endif  // KJ_USE_EPOLL, else

}  // namespace kj

#endif  // !_WIN32