capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-unix-test.c++ (28291B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if !_WIN32
     23 
     24 #include "async-unix.h"
     25 #include "thread.h"
     26 #include "debug.h"
     27 #include "io.h"
     28 #include <unistd.h>
     29 #include <fcntl.h>
     30 #include <sys/types.h>
     31 #include <sys/socket.h>
     32 #include <sys/stat.h>
     33 #include <netinet/in.h>
     34 #include <kj/compat/gtest.h>
     35 #include <pthread.h>
     36 #include <algorithm>
     37 #include <sys/wait.h>
     38 #include <sys/time.h>
     39 #include <errno.h>
     40 #include "mutex.h"
     41 
     42 #if __BIONIC__
     43 // Android's Bionic defines SIGRTMIN but using it in sigaddset() throws EINVAL, which means we
     44 // definitely can't actually use RT signals.
     45 #undef SIGRTMIN
     46 #endif
     47 
     48 namespace kj {
     49 namespace {
     50 
     // Sleeps the calling thread for 10 milliseconds. The tests call this from helper threads (and
     // occasionally the main thread) to order events across threads.
     inline void delay() {
       usleep(10 * 1000);  // argument is in microseconds
     }
     52 
     // EXPECT_SI_CODE(expected, actual) verifies siginfo_t::si_code, but only on Linux and Cygwin.
     // On OSX, si_code seems to be zero when SI_USER is expected, so on every other platform the
     // check expands to nothing.
     #if !(__linux__ || __CYGWIN__)
     #define EXPECT_SI_CODE(a,b)
     #else
     #define EXPECT_SI_CODE EXPECT_EQ
     #endif
     59 
     60 void captureSignals() {
     61   static bool captured = false;
     62   if (!captured) {
     63     // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
     64     // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
     65     // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
     66     UnixEventPort::captureSignal(SIGURG);
     67     UnixEventPort::captureSignal(SIGIO);
     68 
     69 #ifdef SIGRTMIN
     70     UnixEventPort::captureSignal(SIGRTMIN);
     71 #endif
     72 
     73     UnixEventPort::captureChildExit();
     74 
     75     captured = true;
     76   }
     77 }
     78 
     79 TEST(AsyncUnixTest, Signals) {
     80   captureSignals();
     81   UnixEventPort port;
     82   EventLoop loop(port);
     83   WaitScope waitScope(loop);
     84 
     85   kill(getpid(), SIGURG);
     86 
     87   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
     88   EXPECT_EQ(SIGURG, info.si_signo);
     89   EXPECT_SI_CODE(SI_USER, info.si_code);
     90 }
     91 
     92 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
     93 TEST(AsyncUnixTest, SignalWithValue) {
     94   // This tests that if we use sigqueue() to attach a value to the signal, that value is received
     95   // correctly.  Note that this only works on platforms that support real-time signals -- even
     96   // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
     97   // signals.  Hence this test won't run on e.g. Mac OSX.
     98   //
     99   // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
    100   //
    101   // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
    102   // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
    103   // Sad. https://github.com/sandstorm-io/capnproto/issues/204
    104 
    105   captureSignals();
    106   UnixEventPort port;
    107   EventLoop loop(port);
    108   WaitScope waitScope(loop);
    109 
    110   union sigval value;
    111   memset(&value, 0, sizeof(value));
    112   value.sival_int = 123;
    113   KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    114     case ENOSYS:
    115       // sigqueue() not supported. Maybe running on WSL.
    116       KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
    117       return;
    118     default:
    119       KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
    120   }
    121 
    122   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    123   EXPECT_EQ(SIGURG, info.si_signo);
    124   EXPECT_SI_CODE(SI_QUEUE, info.si_code);
    125   EXPECT_EQ(123, info.si_value.sival_int);
    126 }
    127 
    128 TEST(AsyncUnixTest, SignalWithPointerValue) {
    129   // This tests that if we use sigqueue() to attach a value to the signal, that value is received
    130   // correctly.  Note that this only works on platforms that support real-time signals -- even
    131   // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
    132   // signals.  Hence this test won't run on e.g. Mac OSX.
    133   //
    134   // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
    135   //
    136   // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
    137   // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
    138   // Sad. https://github.com/sandstorm-io/capnproto/issues/204
    139 
    140   captureSignals();
    141   UnixEventPort port;
    142   EventLoop loop(port);
    143   WaitScope waitScope(loop);
    144 
    145   union sigval value;
    146   memset(&value, 0, sizeof(value));
    147   value.sival_ptr = &port;
    148   KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    149     case ENOSYS:
    150       // sigqueue() not supported. Maybe running on WSL.
    151       KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
    152       return;
    153     default:
    154       KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
    155   }
    156 
    157   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    158   EXPECT_EQ(SIGURG, info.si_signo);
    159   EXPECT_SI_CODE(SI_QUEUE, info.si_code);
    160   EXPECT_EQ(&port, info.si_value.sival_ptr);
    161 }
    162 #endif
    163 
    164 TEST(AsyncUnixTest, SignalsMultiListen) {
    165   captureSignals();
    166   UnixEventPort port;
    167   EventLoop loop(port);
    168   WaitScope waitScope(loop);
    169 
    170   port.onSignal(SIGIO).then([](siginfo_t&&) {
    171     KJ_FAIL_EXPECT("Received wrong signal.");
    172   }).detach([](kj::Exception&& exception) {
    173     KJ_FAIL_EXPECT(exception);
    174   });
    175 
    176   kill(getpid(), SIGURG);
    177 
    178   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    179   EXPECT_EQ(SIGURG, info.si_signo);
    180   EXPECT_SI_CODE(SI_USER, info.si_code);
    181 }
    182 
    183 #if !__CYGWIN32__
    184 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
    185 // deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
    186 // platform I'm assuming it's a Cygwin bug.
    187 
    188 TEST(AsyncUnixTest, SignalsMultiReceive) {
    189   captureSignals();
    190   UnixEventPort port;
    191   EventLoop loop(port);
    192   WaitScope waitScope(loop);
    193 
    194   kill(getpid(), SIGURG);
    195   kill(getpid(), SIGIO);
    196 
    197   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    198   EXPECT_EQ(SIGURG, info.si_signo);
    199   EXPECT_SI_CODE(SI_USER, info.si_code);
    200 
    201   info = port.onSignal(SIGIO).wait(waitScope);
    202   EXPECT_EQ(SIGIO, info.si_signo);
    203   EXPECT_SI_CODE(SI_USER, info.si_code);
    204 }
    205 
    206 #endif  // !__CYGWIN32__
    207 
    208 TEST(AsyncUnixTest, SignalsAsync) {
    209   captureSignals();
    210   UnixEventPort port;
    211   EventLoop loop(port);
    212   WaitScope waitScope(loop);
    213 
    214   // Arrange for a signal to be sent from another thread.
    215   pthread_t mainThread = pthread_self();
    216   Thread thread([&]() {
    217     delay();
    218     pthread_kill(mainThread, SIGURG);
    219   });
    220 
    221   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    222   EXPECT_EQ(SIGURG, info.si_signo);
    223 #if __linux__
    224   EXPECT_SI_CODE(SI_TKILL, info.si_code);
    225 #endif
    226 }
    227 
    228 #if !__CYGWIN32__
    229 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
    230 // deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
    231 // platform I'm assuming it's a Cygwin bug.
    232 
    233 TEST(AsyncUnixTest, SignalsNoWait) {
    234   // Verify that UnixEventPort::poll() correctly receives pending signals.
    235 
    236   captureSignals();
    237   UnixEventPort port;
    238   EventLoop loop(port);
    239   WaitScope waitScope(loop);
    240 
    241   bool receivedSigurg = false;
    242   bool receivedSigio = false;
    243   port.onSignal(SIGURG).then([&](siginfo_t&& info) {
    244     receivedSigurg = true;
    245     EXPECT_EQ(SIGURG, info.si_signo);
    246     EXPECT_SI_CODE(SI_USER, info.si_code);
    247   }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
    248   port.onSignal(SIGIO).then([&](siginfo_t&& info) {
    249     receivedSigio = true;
    250     EXPECT_EQ(SIGIO, info.si_signo);
    251     EXPECT_SI_CODE(SI_USER, info.si_code);
    252   }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
    253 
    254   kill(getpid(), SIGURG);
    255   kill(getpid(), SIGIO);
    256 
    257   EXPECT_FALSE(receivedSigurg);
    258   EXPECT_FALSE(receivedSigio);
    259 
    260   loop.run();
    261 
    262   EXPECT_FALSE(receivedSigurg);
    263   EXPECT_FALSE(receivedSigio);
    264 
    265   port.poll();
    266 
    267   EXPECT_FALSE(receivedSigurg);
    268   EXPECT_FALSE(receivedSigio);
    269 
    270   loop.run();
    271 
    272   EXPECT_TRUE(receivedSigurg);
    273   EXPECT_TRUE(receivedSigio);
    274 }
    275 
    276 #endif  // !__CYGWIN32__
    277 
    278 TEST(AsyncUnixTest, ReadObserver) {
    279   captureSignals();
    280   UnixEventPort port;
    281   EventLoop loop(port);
    282   WaitScope waitScope(loop);
    283 
    284   int pipefds[2];
    285   KJ_SYSCALL(pipe(pipefds));
    286   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
    287 
    288   UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
    289 
    290   KJ_SYSCALL(write(outfd, "foo", 3));
    291 
    292   observer.whenBecomesReadable().wait(waitScope);
    293 
    294 #if __linux__  // platform known to support POLLRDHUP
    295   EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
    296 
    297   char buffer[4096];
    298   ssize_t n;
    299   KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
    300   EXPECT_EQ(3, n);
    301 
    302   KJ_SYSCALL(write(outfd, "bar", 3));
    303   outfd = nullptr;
    304 
    305   observer.whenBecomesReadable().wait(waitScope);
    306 
    307   EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
    308 #endif
    309 }
    310 
    311 TEST(AsyncUnixTest, ReadObserverMultiListen) {
    312   captureSignals();
    313   UnixEventPort port;
    314   EventLoop loop(port);
    315   WaitScope waitScope(loop);
    316 
    317   int bogusPipefds[2];
    318   KJ_SYSCALL(pipe(bogusPipefds));
    319   KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
    320 
    321   UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
    322       UnixEventPort::FdObserver::OBSERVE_READ);
    323 
    324   bogusObserver.whenBecomesReadable().then([]() {
    325     ADD_FAILURE() << "Received wrong poll.";
    326   }).detach([](kj::Exception&& exception) {
    327     ADD_FAILURE() << kj::str(exception).cStr();
    328   });
    329 
    330   int pipefds[2];
    331   KJ_SYSCALL(pipe(pipefds));
    332   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
    333 
    334   UnixEventPort::FdObserver observer(port, pipefds[0],
    335       UnixEventPort::FdObserver::OBSERVE_READ);
    336   KJ_SYSCALL(write(pipefds[1], "foo", 3));
    337 
    338   observer.whenBecomesReadable().wait(waitScope);
    339 }
    340 
    341 TEST(AsyncUnixTest, ReadObserverMultiReceive) {
    342   captureSignals();
    343   UnixEventPort port;
    344   EventLoop loop(port);
    345   WaitScope waitScope(loop);
    346 
    347   int pipefds[2];
    348   KJ_SYSCALL(pipe(pipefds));
    349   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
    350 
    351   UnixEventPort::FdObserver observer(port, pipefds[0],
    352       UnixEventPort::FdObserver::OBSERVE_READ);
    353   KJ_SYSCALL(write(pipefds[1], "foo", 3));
    354 
    355   int pipefds2[2];
    356   KJ_SYSCALL(pipe(pipefds2));
    357   KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
    358 
    359   UnixEventPort::FdObserver observer2(port, pipefds2[0],
    360       UnixEventPort::FdObserver::OBSERVE_READ);
    361   KJ_SYSCALL(write(pipefds2[1], "bar", 3));
    362 
    363   auto promise1 = observer.whenBecomesReadable();
    364   auto promise2 = observer2.whenBecomesReadable();
    365   promise1.wait(waitScope);
    366   promise2.wait(waitScope);
    367 }
    368 
    369 TEST(AsyncUnixTest, ReadObserverAsync) {
    370   captureSignals();
    371   UnixEventPort port;
    372   EventLoop loop(port);
    373   WaitScope waitScope(loop);
    374 
    375   // Make a pipe and wait on its read end while another thread writes to it.
    376   int pipefds[2];
    377   KJ_SYSCALL(pipe(pipefds));
    378   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
    379   UnixEventPort::FdObserver observer(port, pipefds[0],
    380       UnixEventPort::FdObserver::OBSERVE_READ);
    381 
    382   Thread thread([&]() {
    383     delay();
    384     KJ_SYSCALL(write(pipefds[1], "foo", 3));
    385   });
    386 
    387   // Wait for the event in this thread.
    388   observer.whenBecomesReadable().wait(waitScope);
    389 }
    390 
    391 TEST(AsyncUnixTest, ReadObserverNoWait) {
    392   // Verify that UnixEventPort::poll() correctly receives pending FD events.
    393 
    394   captureSignals();
    395   UnixEventPort port;
    396   EventLoop loop(port);
    397   WaitScope waitScope(loop);
    398 
    399   int pipefds[2];
    400   KJ_SYSCALL(pipe(pipefds));
    401   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
    402   UnixEventPort::FdObserver observer(port, pipefds[0],
    403       UnixEventPort::FdObserver::OBSERVE_READ);
    404 
    405   int pipefds2[2];
    406   KJ_SYSCALL(pipe(pipefds2));
    407   KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
    408   UnixEventPort::FdObserver observer2(port, pipefds2[0],
    409       UnixEventPort::FdObserver::OBSERVE_READ);
    410 
    411   int receivedCount = 0;
    412   observer.whenBecomesReadable().then([&]() {
    413     receivedCount++;
    414   }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
    415   observer2.whenBecomesReadable().then([&]() {
    416     receivedCount++;
    417   }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
    418 
    419   KJ_SYSCALL(write(pipefds[1], "foo", 3));
    420   KJ_SYSCALL(write(pipefds2[1], "bar", 3));
    421 
    422   EXPECT_EQ(0, receivedCount);
    423 
    424   loop.run();
    425 
    426   EXPECT_EQ(0, receivedCount);
    427 
    428   port.poll();
    429 
    430   EXPECT_EQ(0, receivedCount);
    431 
    432   loop.run();
    433 
    434   EXPECT_EQ(2, receivedCount);
    435 }
    436 
    437 static void setNonblocking(int fd) {
    438   int flags;
    439   KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
    440   if ((flags & O_NONBLOCK) == 0) {
    441     KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
    442   }
    443 }
    444 
    445 TEST(AsyncUnixTest, WriteObserver) {
    446   captureSignals();
    447   UnixEventPort port;
    448   EventLoop loop(port);
    449   WaitScope waitScope(loop);
    450 
    451   int pipefds[2];
    452   KJ_SYSCALL(pipe(pipefds));
    453   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
    454   setNonblocking(outfd);
    455   setNonblocking(infd);
    456 
    457   UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
    458 
    459   // Fill buffer.
    460   ssize_t n;
    461   do {
    462     KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
    463   } while (n >= 0);
    464 
    465   bool writable = false;
    466   auto promise = observer.whenBecomesWritable()
    467       .then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
    468 
    469   loop.run();
    470   port.poll();
    471   loop.run();
    472 
    473   EXPECT_FALSE(writable);
    474 
    475   // Empty the read end so that the write end becomes writable. Note that Linux implements a
    476   // high watermark / low watermark heuristic which means that only reading one byte is not
    477   // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
    478   // 1 page. To be safe, we read everything.
    479   char buffer[4096];
    480   do {
    481     KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
    482   } while (n > 0);
    483 
    484   loop.run();
    485   port.poll();
    486   loop.run();
    487 
    488   EXPECT_TRUE(writable);
    489 }
    490 
    491 #if !__APPLE__
    492 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
    493 TEST(AsyncUnixTest, UrgentObserver) {
    494   // Verify that FdObserver correctly detects availability of out-of-band data.
    495   // Availability of out-of-band data is implementation-specific.
    496   // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
    497   // for this test.
    498 
    499   UnixEventPort port;
    500   EventLoop loop(port);
    501   WaitScope waitScope(loop);
    502   int tmpFd;
    503   char c;
    504 
    505   // Spawn a TCP server
    506   KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
    507   kj::AutoCloseFd serverFd(tmpFd);
    508   sockaddr_in saddr;
    509   memset(&saddr, 0, sizeof(saddr));
    510   saddr.sin_family = AF_INET;
    511   saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    512   KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
    513   socklen_t saddrLen = sizeof(saddr);
    514   KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
    515   KJ_SYSCALL(listen(serverFd, 1));
    516 
    517   // Create a pipe that we'll use to signal if MSG_OOB return EINVAL.
    518   int failpipe[2];
    519   KJ_SYSCALL(pipe(failpipe));
    520   KJ_DEFER({
    521     close(failpipe[0]);
    522     close(failpipe[1]);
    523   });
    524 
    525   // Accept one connection, send in-band and OOB byte, wait for a quit message
    526   Thread thread([&]() {
    527     int tmpFd;
    528     char c;
    529 
    530     sockaddr_in caddr;
    531     socklen_t caddrLen = sizeof(caddr);
    532     KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
    533     kj::AutoCloseFd clientFd(tmpFd);
    534     delay();
    535 
    536     // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
    537     c = 'i';
    538     KJ_SYSCALL(send(clientFd, &c, 1, 0));
    539     c = 'o';
    540     KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) {
    541       case EINVAL:
    542         // Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.)
    543         KJ_SYSCALL(write(failpipe[1], &c, 1));
    544         break;
    545       default:
    546         KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error);
    547     }
    548 
    549     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    550     EXPECT_EQ('q', c);
    551   });
    552   KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
    553 
    554   KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
    555   kj::AutoCloseFd clientFd(tmpFd);
    556   KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
    557 
    558   UnixEventPort::FdObserver observer(port, clientFd,
    559       UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
    560   UnixEventPort::FdObserver failObserver(port, failpipe[0],
    561       UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
    562 
    563   auto promise = observer.whenUrgentDataAvailable().then([]() { return true; });
    564   auto failPromise = failObserver.whenBecomesReadable().then([]() { return false; });
    565 
    566   bool oobSupported = promise.exclusiveJoin(kj::mv(failPromise)).wait(waitScope);
    567   if (oobSupported) {
    568 #if __CYGWIN__
    569     // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
    570     // such a time as the connection closes -- and then the byte is successfully returned. This
    571     // seems to be a cygwin bug.
    572     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    573     EXPECT_EQ('i', c);
    574     KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    575     EXPECT_EQ('o', c);
    576 #else
    577     // Attempt to read the urgent byte prior to reading the in-band byte.
    578     KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    579     EXPECT_EQ('o', c);
    580     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    581     EXPECT_EQ('i', c);
    582 #endif
    583   } else {
    584     KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform.");
    585   }
    586 
    587   // Allow server thread to let its clientFd go out of scope.
    588   c = 'q';
    589   KJ_SYSCALL(send(clientFd, &c, 1, 0));
    590   KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
    591 }
    592 #endif
    593 
    594 TEST(AsyncUnixTest, SteadyTimers) {
    595   captureSignals();
    596   UnixEventPort port;
    597   EventLoop loop(port);
    598   WaitScope waitScope(loop);
    599 
    600   auto& timer = port.getTimer();
    601 
    602   auto start = timer.now();
    603   kj::Vector<TimePoint> expected;
    604   kj::Vector<TimePoint> actual;
    605 
    606   auto addTimer = [&](Duration delay) {
    607     expected.add(max(start + delay, start));
    608     timer.atTime(start + delay).then([&]() {
    609       actual.add(timer.now());
    610     }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
    611   };
    612 
    613   addTimer(30 * MILLISECONDS);
    614   addTimer(40 * MILLISECONDS);
    615   addTimer(20350 * MICROSECONDS);
    616   addTimer(30 * MILLISECONDS);
    617   addTimer(-10 * MILLISECONDS);
    618 
    619   std::sort(expected.begin(), expected.end());
    620   timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);
    621 
    622   ASSERT_EQ(expected.size(), actual.size());
    623   for (int i = 0; i < expected.size(); ++i) {
    624     KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
    625               i, ((expected[i] - actual[i]) / NANOSECONDS));
    626   }
    627 }
    628 
    // Set by dummySignalHandler() when it runs. Because it is written from inside a signal
    // handler, it is declared `volatile sig_atomic_t` rather than plain `bool`.
    volatile sig_atomic_t dummySignalHandlerCalled = 0;

    // Signal handler that does nothing except record that it was invoked.
    void dummySignalHandler(int) {
      dummySignalHandlerCalled = 1;
    }
    633 
    634 TEST(AsyncUnixTest, InterruptedTimer) {
    635   captureSignals();
    636   UnixEventPort port;
    637   EventLoop loop(port);
    638   WaitScope waitScope(loop);
    639 
    640 #if __linux__
    641   // Linux timeslices are 1ms.
    642   constexpr auto OS_SLOWNESS_FACTOR = 1;
    643 #else
    644   // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
    645   // To be safe we'll assume other OS's are similar.
    646   constexpr auto OS_SLOWNESS_FACTOR = 10;
    647 #endif
    648 
    649   // Schedule a timer event in 100ms.
    650   auto& timer = port.getTimer();
    651   auto start = timer.now();
    652   constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;
    653 
    654   // Arrange SIGALRM to be delivered in 50ms, handled in an empty signal handler. This will cause
    655   // our wait to be interrupted with EINTR. We should nevertheless continue waiting for the right
    656   // amount of time.
    657   dummySignalHandlerCalled = false;
    658   if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
    659     KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
    660   }
    661   struct itimerval itv;
    662   memset(&itv, 0, sizeof(itv));
    663   itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // signal after 50ms
    664   setitimer(ITIMER_REAL, &itv, nullptr);
    665 
    666   timer.afterDelay(timeout).wait(waitScope);
    667 
    668   KJ_EXPECT(dummySignalHandlerCalled);
    669   KJ_EXPECT(timer.now() - start >= timeout);
    670   KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));  // allow 20ms error
    671 }
    672 
    673 TEST(AsyncUnixTest, Wake) {
    674   captureSignals();
    675   UnixEventPort port;
    676   EventLoop loop(port);
    677   WaitScope waitScope(loop);
    678 
    679   EXPECT_FALSE(port.poll());
    680   port.wake();
    681   EXPECT_TRUE(port.poll());
    682   EXPECT_FALSE(port.poll());
    683 
    684   port.wake();
    685   EXPECT_TRUE(port.wait());
    686 
    687   {
    688     auto promise = port.getTimer().atTime(port.getTimer().now());
    689     EXPECT_FALSE(port.wait());
    690   }
    691 
    692   // Test wake() when already wait()ing.
    693   {
    694     Thread thread([&]() {
    695       delay();
    696       port.wake();
    697     });
    698 
    699     EXPECT_TRUE(port.wait());
    700   }
    701 
    702   // Test wait() after wake() already happened.
    703   {
    704     Thread thread([&]() {
    705       port.wake();
    706     });
    707 
    708     delay();
    709     EXPECT_TRUE(port.wait());
    710   }
    711 
    712   // Test wake() during poll() busy loop.
    713   {
    714     Thread thread([&]() {
    715       delay();
    716       port.wake();
    717     });
    718 
    719     EXPECT_FALSE(port.poll());
    720     while (!port.poll()) {}
    721   }
    722 
    723   // Test poll() when wake() already delivered.
    724   {
    725     EXPECT_FALSE(port.poll());
    726 
    727     Thread thread([&]() {
    728       port.wake();
    729     });
    730 
    731     do {
    732       delay();
    733     } while (!port.poll());
    734   }
    735 }
    736 
    // Exit status used by exitSignalHandler(). TestChild sets it in the forked child before
    // installing the handler.
    int exitCodeForSignal = 0;

    // Signal handler that terminates the process immediately via _exit(), using
    // exitCodeForSignal as the exit status.
    [[noreturn]] void exitSignalHandler(int /*signo*/) {
      _exit(exitCodeForSignal);
    }
    741 
    742 struct TestChild {
    743   kj::Maybe<pid_t> pid;
    744   kj::Promise<int> promise = nullptr;
    745 
    746   TestChild(UnixEventPort& port, int exitCode) {
    747     pid_t p;
    748     KJ_SYSCALL(p = fork());
    749     if (p == 0) {
    750       // Arrange for SIGTERM to cause the process to exit normally.
    751       exitCodeForSignal = exitCode;
    752       signal(SIGTERM, &exitSignalHandler);
    753       sigset_t sigs;
    754       sigemptyset(&sigs);
    755       sigaddset(&sigs, SIGTERM);
    756       pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr);
    757 
    758       for (;;) pause();
    759     }
    760     pid = p;
    761     promise = port.onChildExit(pid);
    762   }
    763 
    764   ~TestChild() noexcept(false) {
    765     KJ_IF_MAYBE(p, pid) {
    766       KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
    767       int status;
    768       KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
    769     }
    770   }
    771 
    772   void kill(int signo) {
    773     KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
    774   }
    775 
    776   KJ_DISALLOW_COPY(TestChild);
    777 };
    778 
    779 TEST(AsyncUnixTest, ChildProcess) {
    780   captureSignals();
    781   UnixEventPort port;
    782   EventLoop loop(port);
    783   WaitScope waitScope(loop);
    784 
    785   // Block SIGTERM so that we can carefully un-block it in children.
    786   sigset_t sigs, oldsigs;
    787   KJ_SYSCALL(sigemptyset(&sigs));
    788   KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
    789   KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs));
    790   KJ_DEFER(KJ_SYSCALL(pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });
    791 
    792   TestChild child1(port, 123);
    793   KJ_EXPECT(!child1.promise.poll(waitScope));
    794 
    795   child1.kill(SIGTERM);
    796 
    797   {
    798     int status = child1.promise.wait(waitScope);
    799     KJ_EXPECT(WIFEXITED(status));
    800     KJ_EXPECT(WEXITSTATUS(status) == 123);
    801   }
    802 
    803   TestChild child2(port, 234);
    804   TestChild child3(port, 345);
    805 
    806   KJ_EXPECT(!child2.promise.poll(waitScope));
    807   KJ_EXPECT(!child3.promise.poll(waitScope));
    808 
    809   child2.kill(SIGKILL);
    810 
    811   {
    812     int status = child2.promise.wait(waitScope);
    813     KJ_EXPECT(!WIFEXITED(status));
    814     KJ_EXPECT(WIFSIGNALED(status));
    815     KJ_EXPECT(WTERMSIG(status) == SIGKILL);
    816   }
    817 
    818   KJ_EXPECT(!child3.promise.poll(waitScope));
    819 
    820   // child3 will be killed and synchronously waited on the way out.
    821 }
    822 
    823 #if !__CYGWIN__
    824 // TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin.
    825 
    826 KJ_TEST("UnixEventPort whenWriteDisconnected()") {
          // Checks that whenWriteDisconnected() on one end of a socketpair stays pending while the
          // peer end is still held -- including after an unrelated readability event has fired and
          // been consumed -- and is reported ready once the peer end is released.
    827   captureSignals();
    828   UnixEventPort port;
    829   EventLoop loop(port);
    830   WaitScope waitScope(loop);
    831 
    832   int fds_[2];
    833   KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_));
          // Hand both raw descriptors to AutoCloseFd wrappers; fds[1] is reset to nullptr further down
          // to simulate the peer going away.
    834   kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) };
    835 
          // The observer watches fds[0]; only read observation is requested via the flags argument.
    836   UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ);
    837 
    838   // At one point, the poll()-based version of UnixEventPort had a bug where if some other event
    839   // had completed previously, whenWriteDisconnected() would stop being watched for. So we watch
    840   // for readability as well and check that that goes away first.
    841   auto readablePromise = observer.whenBecomesReadable();
    842   auto hupPromise = observer.whenWriteDisconnected();
    843 
          // Nothing has been written and both ends are still held, so neither promise should be ready.
    844   KJ_EXPECT(!readablePromise.poll(waitScope));
    845   KJ_EXPECT(!hupPromise.poll(waitScope));
    846 
          // Writing three bytes on the peer end is expected to make fds[0] readable.
    847   KJ_SYSCALL(write(fds[1], "foo", 3));
    848 
          // NOTE(review): KJ_ASSERT (not KJ_EXPECT) is used here, presumably so that a failure stops the
          // test before the wait() below could block on a promise that is not ready -- confirm.
    849   KJ_ASSERT(readablePromise.poll(waitScope));
    850   readablePromise.wait(waitScope);
    851 
    852   {
            // Drain the bytes written above; exactly three are expected back.
    853     char junk[16];
    854     ssize_t n;
    855     KJ_SYSCALL(n = read(fds[0], junk, 16));
    856     KJ_EXPECT(n == 3);
    857   }
    858 
          // The readability event has come and gone, but the peer end is still held, so the disconnect
          // promise must still be pending.
    859   KJ_EXPECT(!hupPromise.poll(waitScope));
    860 
          // Release the peer end (presumably this closes the descriptor -- AutoCloseFd; confirm). The
          // test expects the disconnect promise to become ready as a result.
    861   fds[1] = nullptr;
    862   KJ_ASSERT(hupPromise.poll(waitScope));
    863   hupPromise.wait(waitScope);
    864 }
    865 
    866 KJ_TEST("UnixEventPort FdObserver(..., flags=0)::whenWriteDisconnected()") {
    867   // Verifies that given `0' as a `flags' argument,
    868   // FdObserver still observes whenWriteDisconnected().
    869   //
    870   // This can be useful to watch disconnection on a blocking file descriptor.
    871   // See discussion: https://github.com/capnproto/capnproto/issues/924
    872 
    873   captureSignals();
    874   UnixEventPort port;
    875   EventLoop loop(port);
    876   WaitScope waitScope(loop);
    877 
    878   int pipefds[2];
    879   KJ_SYSCALL(pipe(pipefds));
          // Per pipe(2), pipefds[0] is the read end and pipefds[1] is the write end.
    880   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
    881 
          // Observe the write end of the pipe, passing 0 for the flags argument.
    882   UnixEventPort::FdObserver observer(port, outfd, 0);
    883 
    884   auto hupPromise = observer.whenWriteDisconnected();
    885 
          // The read end is still held, so no disconnect should be reported yet.
    886   KJ_EXPECT(!hupPromise.poll(waitScope));
    887 
          // Release the read end (presumably this closes it -- AutoCloseFd; confirm). The test expects
          // the observer on the write end to report the disconnect.
    888   infd = nullptr;
          // NOTE(review): KJ_ASSERT rather than KJ_EXPECT, presumably so that wait() below is never
          // reached with an unready promise -- confirm.
    889   KJ_ASSERT(hupPromise.poll(waitScope));
    890   hupPromise.wait(waitScope);
    891 }
    892 
    893 #endif
    894 
    895 KJ_TEST("UnixEventPort poll for signals") {
          // Registers for SIGURG and SIGIO, raises both, calls port.wake(), and then expects a
          // port.poll() call to return true with both onSignal() promises ready.
    896   captureSignals();
    897   UnixEventPort port;
    898   EventLoop loop(port);
    899   WaitScope waitScope(loop);
    900 
    901   auto promise1 = port.onSignal(SIGURG);
    902   auto promise2 = port.onSignal(SIGIO);
    903 
          // No signal has been raised yet, so neither promise should be ready.
    904   KJ_EXPECT(!promise1.poll(waitScope));
    905   KJ_EXPECT(!promise2.poll(waitScope));
    906 
          // Raise both signals before polling, so both are outstanding at the same time.
    907   KJ_SYSCALL(raise(SIGURG));
    908   KJ_SYSCALL(raise(SIGIO));
          // NOTE(review): wake() is called before poll(), and poll() is then expected to return true;
          // see the EventPort wake()/poll() contract for what that return value signifies -- confirm.
    909   port.wake();
    910 
    911   KJ_EXPECT(port.poll());
    912   KJ_EXPECT(promise1.poll(waitScope));
    913   KJ_EXPECT(promise2.poll(waitScope));
    914 
          // Consume both results.
    915   promise1.wait(waitScope);
    916   promise2.wait(waitScope);
    917 }
    918 
    919 #if defined(SIGRTMIN) && !__CYGWIN__ && !__aarch64__
    920 // TODO(someday): Figure out why RT signals don't seem to work correctly on Cygwin. It looks like
    921 //   only the first signal is delivered, like how non-RT signals work. Is it possible Cygwin
    922 //   advertises RT signal support but doesn't actually implement them correctly? I can't find any
    923 //   information on the internet about this and TBH I don't care about Cygwin enough to dig in.
    924 // TODO(someday): Figure out why RT signals don't work under qemu-user emulating aarch64 on
    925 //   Debian Buster.
    926 
    927 void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) {
          // Queues three instances of SIGRTMIN at this process, each carrying a distinct integer
          // payload, then receives them one at a time through `port` and expects the payloads to come
          // back in the order they were queued. Finally expects that no further instance is pending.
          //
          // port:      event port whose onSignal() is exercised.
          // waitScope: scope used to poll() / wait() on the signal promises.
          // doPoll:    when true, each promise is asserted ready via poll() before wait() is called;
          //            when false, wait() is called directly.
    928   union sigval value;
          // Zero the whole union first; only sival_int is assigned below.
    929   memset(&value, 0, sizeof(value));
    930 
    931   // Queue three copies of the signal upfront.
    932   for (uint i = 0; i < 3; i++) {
            // Payloads are 123, 124, 125 so that delivery order can be checked afterwards.
    933     value.sival_int = 123 + i;
    934     KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value));
    935   }
    936 
    937   // Now wait for them.
    938   for (uint i = 0; i < 3; i++) {
            // A fresh onSignal() promise is requested for each queued instance.
    939     auto promise = port.onSignal(SIGRTMIN);
    940     if (doPoll) {
              // The signal was queued above, so the promise is expected to be ready already.
    941       KJ_ASSERT(promise.poll(waitScope));
    942     }
    943     auto info = promise.wait(waitScope);
            // The i-th instance received must carry the i-th payload queued.
    944     KJ_EXPECT(info.si_value.sival_int == 123 + i);
    945   }
    946 
          // All three instances have been consumed; a new onSignal() promise must not be ready.
    947   KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope));
    948 }
    949 
    950 KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") {
          // Runs testRtSignals() twice against the same port and wait scope: first with poll() before
          // each wait(), then with wait() only.
    951   captureSignals();
    952   UnixEventPort port;
    953   EventLoop loop(port);
    954   WaitScope waitScope(loop);
    955 
          // First pass: poll() each promise before waiting on it.
    956   testRtSignals(port, waitScope, true);
    957 
    958   // Test again, but don't poll() the promises. This may test a different code path, if poll() and
    959   // wait() are very different in how they read signals. (For the poll(2)-based implementation of
    960   // UnixEventPort, they are indeed pretty different.)
    961   testRtSignals(port, waitScope, false);
    962 }
    963 #endif
    964 
    965 }  // namespace
    966 }  // namespace kj
    967 
    968 #endif  // !_WIN32