async-unix-test.c++ (28291B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #if !_WIN32 23 24 #include "async-unix.h" 25 #include "thread.h" 26 #include "debug.h" 27 #include "io.h" 28 #include <unistd.h> 29 #include <fcntl.h> 30 #include <sys/types.h> 31 #include <sys/socket.h> 32 #include <sys/stat.h> 33 #include <netinet/in.h> 34 #include <kj/compat/gtest.h> 35 #include <pthread.h> 36 #include <algorithm> 37 #include <sys/wait.h> 38 #include <sys/time.h> 39 #include <errno.h> 40 #include "mutex.h" 41 42 #if __BIONIC__ 43 // Android's Bionic defines SIGRTMIN but using it in sigaddset() throws EINVAL, which means we 44 // definitely can't actually use RT signals. 45 #undef SIGRTMIN 46 #endif 47 48 namespace kj { 49 namespace { 50 51 inline void delay() { usleep(10000); } 52 53 // On OSX, si_code seems to be zero when SI_USER is expected. 54 #if __linux__ || __CYGWIN__ 55 #define EXPECT_SI_CODE EXPECT_EQ 56 #else 57 #define EXPECT_SI_CODE(a,b) 58 #endif 59 60 void captureSignals() { 61 static bool captured = false; 62 if (!captured) { 63 // We use SIGIO and SIGURG as our test signals because they're two signals that we can be 64 // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test. We can't 65 // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX. 66 UnixEventPort::captureSignal(SIGURG); 67 UnixEventPort::captureSignal(SIGIO); 68 69 #ifdef SIGRTMIN 70 UnixEventPort::captureSignal(SIGRTMIN); 71 #endif 72 73 UnixEventPort::captureChildExit(); 74 75 captured = true; 76 } 77 } 78 79 TEST(AsyncUnixTest, Signals) { 80 captureSignals(); 81 UnixEventPort port; 82 EventLoop loop(port); 83 WaitScope waitScope(loop); 84 85 kill(getpid(), SIGURG); 86 87 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 88 EXPECT_EQ(SIGURG, info.si_signo); 89 EXPECT_SI_CODE(SI_USER, info.si_code); 90 } 91 92 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__) 93 TEST(AsyncUnixTest, SignalWithValue) { 94 // This tests that if we use sigqueue() to attach a value to the signal, that value is received 95 // correctly. Note that this only works on platforms that support real-time signals -- even 96 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT 97 // signals. Hence this test won't run on e.g. Mac OSX. 98 // 99 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does. 100 // 101 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips 102 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test. 103 // Sad. https://github.com/sandstorm-io/capnproto/issues/204 104 105 captureSignals(); 106 UnixEventPort port; 107 EventLoop loop(port); 108 WaitScope waitScope(loop); 109 110 union sigval value; 111 memset(&value, 0, sizeof(value)); 112 value.sival_int = 123; 113 KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) { 114 case ENOSYS: 115 // sigqueue() not supported. Maybe running on WSL. 116 KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test"); 117 return; 118 default: 119 KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error); 120 } 121 122 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 123 EXPECT_EQ(SIGURG, info.si_signo); 124 EXPECT_SI_CODE(SI_QUEUE, info.si_code); 125 EXPECT_EQ(123, info.si_value.sival_int); 126 } 127 128 TEST(AsyncUnixTest, SignalWithPointerValue) { 129 // This tests that if we use sigqueue() to attach a value to the signal, that value is received 130 // correctly. Note that this only works on platforms that support real-time signals -- even 131 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT 132 // signals. Hence this test won't run on e.g. Mac OSX. 133 // 134 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does. 135 // 136 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips 137 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test. 138 // Sad. https://github.com/sandstorm-io/capnproto/issues/204 139 140 captureSignals(); 141 UnixEventPort port; 142 EventLoop loop(port); 143 WaitScope waitScope(loop); 144 145 union sigval value; 146 memset(&value, 0, sizeof(value)); 147 value.sival_ptr = &port; 148 KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) { 149 case ENOSYS: 150 // sigqueue() not supported. Maybe running on WSL. 151 KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test"); 152 return; 153 default: 154 KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error); 155 } 156 157 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 158 EXPECT_EQ(SIGURG, info.si_signo); 159 EXPECT_SI_CODE(SI_QUEUE, info.si_code); 160 EXPECT_EQ(&port, info.si_value.sival_ptr); 161 } 162 #endif 163 164 TEST(AsyncUnixTest, SignalsMultiListen) { 165 captureSignals(); 166 UnixEventPort port; 167 EventLoop loop(port); 168 WaitScope waitScope(loop); 169 170 port.onSignal(SIGIO).then([](siginfo_t&&) { 171 KJ_FAIL_EXPECT("Received wrong signal."); 172 }).detach([](kj::Exception&& exception) { 173 KJ_FAIL_EXPECT(exception); 174 }); 175 176 kill(getpid(), SIGURG); 177 178 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 179 EXPECT_EQ(SIGURG, info.si_signo); 180 EXPECT_SI_CODE(SI_USER, info.si_code); 181 } 182 183 #if !__CYGWIN32__ 184 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does 185 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other 186 // platform I'm assuming it's a Cygwin bug. 187 188 TEST(AsyncUnixTest, SignalsMultiReceive) { 189 captureSignals(); 190 UnixEventPort port; 191 EventLoop loop(port); 192 WaitScope waitScope(loop); 193 194 kill(getpid(), SIGURG); 195 kill(getpid(), SIGIO); 196 197 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 198 EXPECT_EQ(SIGURG, info.si_signo); 199 EXPECT_SI_CODE(SI_USER, info.si_code); 200 201 info = port.onSignal(SIGIO).wait(waitScope); 202 EXPECT_EQ(SIGIO, info.si_signo); 203 EXPECT_SI_CODE(SI_USER, info.si_code); 204 } 205 206 #endif // !__CYGWIN32__ 207 208 TEST(AsyncUnixTest, SignalsAsync) { 209 captureSignals(); 210 UnixEventPort port; 211 EventLoop loop(port); 212 WaitScope waitScope(loop); 213 214 // Arrange for a signal to be sent from another thread. 215 pthread_t mainThread = pthread_self(); 216 Thread thread([&]() { 217 delay(); 218 pthread_kill(mainThread, SIGURG); 219 }); 220 221 siginfo_t info = port.onSignal(SIGURG).wait(waitScope); 222 EXPECT_EQ(SIGURG, info.si_signo); 223 #if __linux__ 224 EXPECT_SI_CODE(SI_TKILL, info.si_code); 225 #endif 226 } 227 228 #if !__CYGWIN32__ 229 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does 230 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other 231 // platform I'm assuming it's a Cygwin bug. 232 233 TEST(AsyncUnixTest, SignalsNoWait) { 234 // Verify that UnixEventPort::poll() correctly receives pending signals. 235 236 captureSignals(); 237 UnixEventPort port; 238 EventLoop loop(port); 239 WaitScope waitScope(loop); 240 241 bool receivedSigurg = false; 242 bool receivedSigio = false; 243 port.onSignal(SIGURG).then([&](siginfo_t&& info) { 244 receivedSigurg = true; 245 EXPECT_EQ(SIGURG, info.si_signo); 246 EXPECT_SI_CODE(SI_USER, info.si_code); 247 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); }); 248 port.onSignal(SIGIO).then([&](siginfo_t&& info) { 249 receivedSigio = true; 250 EXPECT_EQ(SIGIO, info.si_signo); 251 EXPECT_SI_CODE(SI_USER, info.si_code); 252 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); }); 253 254 kill(getpid(), SIGURG); 255 kill(getpid(), SIGIO); 256 257 EXPECT_FALSE(receivedSigurg); 258 EXPECT_FALSE(receivedSigio); 259 260 loop.run(); 261 262 EXPECT_FALSE(receivedSigurg); 263 EXPECT_FALSE(receivedSigio); 264 265 port.poll(); 266 267 EXPECT_FALSE(receivedSigurg); 268 EXPECT_FALSE(receivedSigio); 269 270 loop.run(); 271 272 EXPECT_TRUE(receivedSigurg); 273 EXPECT_TRUE(receivedSigio); 274 } 275 276 #endif // !__CYGWIN32__ 277 278 TEST(AsyncUnixTest, ReadObserver) { 279 captureSignals(); 280 UnixEventPort port; 281 EventLoop loop(port); 282 WaitScope waitScope(loop); 283 284 int pipefds[2]; 285 KJ_SYSCALL(pipe(pipefds)); 286 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]); 287 288 UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ); 289 290 KJ_SYSCALL(write(outfd, "foo", 3)); 291 292 observer.whenBecomesReadable().wait(waitScope); 293 294 #if __linux__ // platform known to support POLLRDHUP 295 EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint())); 296 297 char buffer[4096]; 298 ssize_t n; 299 KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer))); 300 EXPECT_EQ(3, n); 301 302 KJ_SYSCALL(write(outfd, "bar", 3)); 303 outfd = nullptr; 304 305 observer.whenBecomesReadable().wait(waitScope); 306 307 EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint())); 308 #endif 309 } 310 311 TEST(AsyncUnixTest, ReadObserverMultiListen) { 312 captureSignals(); 313 UnixEventPort port; 314 EventLoop loop(port); 315 WaitScope waitScope(loop); 316 317 int bogusPipefds[2]; 318 KJ_SYSCALL(pipe(bogusPipefds)); 319 KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); }); 320 321 UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0], 322 UnixEventPort::FdObserver::OBSERVE_READ); 323 324 bogusObserver.whenBecomesReadable().then([]() { 325 ADD_FAILURE() << "Received wrong poll."; 326 }).detach([](kj::Exception&& exception) { 327 ADD_FAILURE() << kj::str(exception).cStr(); 328 }); 329 330 int pipefds[2]; 331 KJ_SYSCALL(pipe(pipefds)); 332 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); 333 334 UnixEventPort::FdObserver observer(port, pipefds[0], 335 UnixEventPort::FdObserver::OBSERVE_READ); 336 KJ_SYSCALL(write(pipefds[1], "foo", 3)); 337 338 observer.whenBecomesReadable().wait(waitScope); 339 } 340 341 TEST(AsyncUnixTest, ReadObserverMultiReceive) { 342 captureSignals(); 343 UnixEventPort port; 344 EventLoop loop(port); 345 WaitScope waitScope(loop); 346 347 int pipefds[2]; 348 KJ_SYSCALL(pipe(pipefds)); 349 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); 350 351 UnixEventPort::FdObserver observer(port, pipefds[0], 352 UnixEventPort::FdObserver::OBSERVE_READ); 353 KJ_SYSCALL(write(pipefds[1], "foo", 3)); 354 355 int pipefds2[2]; 356 KJ_SYSCALL(pipe(pipefds2)); 357 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); }); 358 359 UnixEventPort::FdObserver observer2(port, pipefds2[0], 360 UnixEventPort::FdObserver::OBSERVE_READ); 361 KJ_SYSCALL(write(pipefds2[1], "bar", 3)); 362 363 auto promise1 = observer.whenBecomesReadable(); 364 auto promise2 = observer2.whenBecomesReadable(); 365 promise1.wait(waitScope); 366 promise2.wait(waitScope); 367 } 368 369 TEST(AsyncUnixTest, ReadObserverAsync) { 370 captureSignals(); 371 UnixEventPort port; 372 EventLoop loop(port); 373 WaitScope waitScope(loop); 374 375 // Make a pipe and wait on its read end while another thread writes to it. 376 int pipefds[2]; 377 KJ_SYSCALL(pipe(pipefds)); 378 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); 379 UnixEventPort::FdObserver observer(port, pipefds[0], 380 UnixEventPort::FdObserver::OBSERVE_READ); 381 382 Thread thread([&]() { 383 delay(); 384 KJ_SYSCALL(write(pipefds[1], "foo", 3)); 385 }); 386 387 // Wait for the event in this thread. 388 observer.whenBecomesReadable().wait(waitScope); 389 } 390 391 TEST(AsyncUnixTest, ReadObserverNoWait) { 392 // Verify that UnixEventPort::poll() correctly receives pending FD events. 393 394 captureSignals(); 395 UnixEventPort port; 396 EventLoop loop(port); 397 WaitScope waitScope(loop); 398 399 int pipefds[2]; 400 KJ_SYSCALL(pipe(pipefds)); 401 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); 402 UnixEventPort::FdObserver observer(port, pipefds[0], 403 UnixEventPort::FdObserver::OBSERVE_READ); 404 405 int pipefds2[2]; 406 KJ_SYSCALL(pipe(pipefds2)); 407 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); }); 408 UnixEventPort::FdObserver observer2(port, pipefds2[0], 409 UnixEventPort::FdObserver::OBSERVE_READ); 410 411 int receivedCount = 0; 412 observer.whenBecomesReadable().then([&]() { 413 receivedCount++; 414 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); 415 observer2.whenBecomesReadable().then([&]() { 416 receivedCount++; 417 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); 418 419 KJ_SYSCALL(write(pipefds[1], "foo", 3)); 420 KJ_SYSCALL(write(pipefds2[1], "bar", 3)); 421 422 EXPECT_EQ(0, receivedCount); 423 424 loop.run(); 425 426 EXPECT_EQ(0, receivedCount); 427 428 port.poll(); 429 430 EXPECT_EQ(0, receivedCount); 431 432 loop.run(); 433 434 EXPECT_EQ(2, receivedCount); 435 } 436 437 static void setNonblocking(int fd) { 438 int flags; 439 KJ_SYSCALL(flags = fcntl(fd, F_GETFL)); 440 if ((flags & O_NONBLOCK) == 0) { 441 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK)); 442 } 443 } 444 445 TEST(AsyncUnixTest, WriteObserver) { 446 captureSignals(); 447 UnixEventPort port; 448 EventLoop loop(port); 449 WaitScope waitScope(loop); 450 451 int pipefds[2]; 452 KJ_SYSCALL(pipe(pipefds)); 453 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]); 454 setNonblocking(outfd); 455 setNonblocking(infd); 456 457 UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE); 458 459 // Fill buffer. 460 ssize_t n; 461 do { 462 KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3)); 463 } while (n >= 0); 464 465 bool writable = false; 466 auto promise = observer.whenBecomesWritable() 467 .then([&]() { writable = true; }).eagerlyEvaluate(nullptr); 468 469 loop.run(); 470 port.poll(); 471 loop.run(); 472 473 EXPECT_FALSE(writable); 474 475 // Empty the read end so that the write end becomes writable. Note that Linux implements a 476 // high watermark / low watermark heuristic which means that only reading one byte is not 477 // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be 478 // 1 page. To be safe, we read everything. 479 char buffer[4096]; 480 do { 481 KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer))); 482 } while (n > 0); 483 484 loop.run(); 485 port.poll(); 486 loop.run(); 487 488 EXPECT_TRUE(writable); 489 } 490 491 #if !__APPLE__ 492 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374. 493 TEST(AsyncUnixTest, UrgentObserver) { 494 // Verify that FdObserver correctly detects availability of out-of-band data. 495 // Availability of out-of-band data is implementation-specific. 496 // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used 497 // for this test. 498 499 UnixEventPort port; 500 EventLoop loop(port); 501 WaitScope waitScope(loop); 502 int tmpFd; 503 char c; 504 505 // Spawn a TCP server 506 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0)); 507 kj::AutoCloseFd serverFd(tmpFd); 508 sockaddr_in saddr; 509 memset(&saddr, 0, sizeof(saddr)); 510 saddr.sin_family = AF_INET; 511 saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 512 KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr))); 513 socklen_t saddrLen = sizeof(saddr); 514 KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen)); 515 KJ_SYSCALL(listen(serverFd, 1)); 516 517 // Create a pipe that we'll use to signal if MSG_OOB return EINVAL. 518 int failpipe[2]; 519 KJ_SYSCALL(pipe(failpipe)); 520 KJ_DEFER({ 521 close(failpipe[0]); 522 close(failpipe[1]); 523 }); 524 525 // Accept one connection, send in-band and OOB byte, wait for a quit message 526 Thread thread([&]() { 527 int tmpFd; 528 char c; 529 530 sockaddr_in caddr; 531 socklen_t caddrLen = sizeof(caddr); 532 KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen)); 533 kj::AutoCloseFd clientFd(tmpFd); 534 delay(); 535 536 // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data. 537 c = 'i'; 538 KJ_SYSCALL(send(clientFd, &c, 1, 0)); 539 c = 'o'; 540 KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) { 541 case EINVAL: 542 // Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.) 543 KJ_SYSCALL(write(failpipe[1], &c, 1)); 544 break; 545 default: 546 KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error); 547 } 548 549 KJ_SYSCALL(recv(clientFd, &c, 1, 0)); 550 EXPECT_EQ('q', c); 551 }); 552 KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; }); 553 554 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0)); 555 kj::AutoCloseFd clientFd(tmpFd); 556 KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen)); 557 558 UnixEventPort::FdObserver observer(port, clientFd, 559 UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT); 560 UnixEventPort::FdObserver failObserver(port, failpipe[0], 561 UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT); 562 563 auto promise = observer.whenUrgentDataAvailable().then([]() { return true; }); 564 auto failPromise = failObserver.whenBecomesReadable().then([]() { return false; }); 565 566 bool oobSupported = promise.exclusiveJoin(kj::mv(failPromise)).wait(waitScope); 567 if (oobSupported) { 568 #if __CYGWIN__ 569 // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until 570 // such a time as the connection closes -- and then the byte is successfully returned. This 571 // seems to be a cygwin bug. 572 KJ_SYSCALL(recv(clientFd, &c, 1, 0)); 573 EXPECT_EQ('i', c); 574 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB)); 575 EXPECT_EQ('o', c); 576 #else 577 // Attempt to read the urgent byte prior to reading the in-band byte. 578 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB)); 579 EXPECT_EQ('o', c); 580 KJ_SYSCALL(recv(clientFd, &c, 1, 0)); 581 EXPECT_EQ('i', c); 582 #endif 583 } else { 584 KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform."); 585 } 586 587 // Allow server thread to let its clientFd go out of scope. 588 c = 'q'; 589 KJ_SYSCALL(send(clientFd, &c, 1, 0)); 590 KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR)); 591 } 592 #endif 593 594 TEST(AsyncUnixTest, SteadyTimers) { 595 captureSignals(); 596 UnixEventPort port; 597 EventLoop loop(port); 598 WaitScope waitScope(loop); 599 600 auto& timer = port.getTimer(); 601 602 auto start = timer.now(); 603 kj::Vector<TimePoint> expected; 604 kj::Vector<TimePoint> actual; 605 606 auto addTimer = [&](Duration delay) { 607 expected.add(max(start + delay, start)); 608 timer.atTime(start + delay).then([&]() { 609 actual.add(timer.now()); 610 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); 611 }; 612 613 addTimer(30 * MILLISECONDS); 614 addTimer(40 * MILLISECONDS); 615 addTimer(20350 * MICROSECONDS); 616 addTimer(30 * MILLISECONDS); 617 addTimer(-10 * MILLISECONDS); 618 619 std::sort(expected.begin(), expected.end()); 620 timer.atTime(expected.back() + MILLISECONDS).wait(waitScope); 621 622 ASSERT_EQ(expected.size(), actual.size()); 623 for (int i = 0; i < expected.size(); ++i) { 624 KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.", 625 i, ((expected[i] - actual[i]) / NANOSECONDS)); 626 } 627 } 628 629 bool dummySignalHandlerCalled = false; 630 void dummySignalHandler(int) { 631 dummySignalHandlerCalled = true; 632 } 633 634 TEST(AsyncUnixTest, InterruptedTimer) { 635 captureSignals(); 636 UnixEventPort port; 637 EventLoop loop(port); 638 WaitScope waitScope(loop); 639 640 #if __linux__ 641 // Linux timeslices are 1ms. 642 constexpr auto OS_SLOWNESS_FACTOR = 1; 643 #else 644 // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness. 645 // To be safe we'll assume other OS's are similar. 646 constexpr auto OS_SLOWNESS_FACTOR = 10; 647 #endif 648 649 // Schedule a timer event in 100ms. 650 auto& timer = port.getTimer(); 651 auto start = timer.now(); 652 constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR; 653 654 // Arrange SIGALRM to be delivered in 50ms, handled in an empty signal handler. This will cause 655 // our wait to be interrupted with EINTR. We should nevertheless continue waiting for the right 656 // amount of time. 657 dummySignalHandlerCalled = false; 658 if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) { 659 KJ_FAIL_SYSCALL("signal(SIGALRM)", errno); 660 } 661 struct itimerval itv; 662 memset(&itv, 0, sizeof(itv)); 663 itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR; // signal after 50ms 664 setitimer(ITIMER_REAL, &itv, nullptr); 665 666 timer.afterDelay(timeout).wait(waitScope); 667 668 KJ_EXPECT(dummySignalHandlerCalled); 669 KJ_EXPECT(timer.now() - start >= timeout); 670 KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5)); // allow 20ms error 671 } 672 673 TEST(AsyncUnixTest, Wake) { 674 captureSignals(); 675 UnixEventPort port; 676 EventLoop loop(port); 677 WaitScope waitScope(loop); 678 679 EXPECT_FALSE(port.poll()); 680 port.wake(); 681 EXPECT_TRUE(port.poll()); 682 EXPECT_FALSE(port.poll()); 683 684 port.wake(); 685 EXPECT_TRUE(port.wait()); 686 687 { 688 auto promise = port.getTimer().atTime(port.getTimer().now()); 689 EXPECT_FALSE(port.wait()); 690 } 691 692 // Test wake() when already wait()ing. 693 { 694 Thread thread([&]() { 695 delay(); 696 port.wake(); 697 }); 698 699 EXPECT_TRUE(port.wait()); 700 } 701 702 // Test wait() after wake() already happened. 703 { 704 Thread thread([&]() { 705 port.wake(); 706 }); 707 708 delay(); 709 EXPECT_TRUE(port.wait()); 710 } 711 712 // Test wake() during poll() busy loop. 713 { 714 Thread thread([&]() { 715 delay(); 716 port.wake(); 717 }); 718 719 EXPECT_FALSE(port.poll()); 720 while (!port.poll()) {} 721 } 722 723 // Test poll() when wake() already delivered. 724 { 725 EXPECT_FALSE(port.poll()); 726 727 Thread thread([&]() { 728 port.wake(); 729 }); 730 731 do { 732 delay(); 733 } while (!port.poll()); 734 } 735 } 736 737 int exitCodeForSignal = 0; 738 [[noreturn]] void exitSignalHandler(int) { 739 _exit(exitCodeForSignal); 740 } 741 742 struct TestChild { 743 kj::Maybe<pid_t> pid; 744 kj::Promise<int> promise = nullptr; 745 746 TestChild(UnixEventPort& port, int exitCode) { 747 pid_t p; 748 KJ_SYSCALL(p = fork()); 749 if (p == 0) { 750 // Arrange for SIGTERM to cause the process to exit normally. 751 exitCodeForSignal = exitCode; 752 signal(SIGTERM, &exitSignalHandler); 753 sigset_t sigs; 754 sigemptyset(&sigs); 755 sigaddset(&sigs, SIGTERM); 756 pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr); 757 758 for (;;) pause(); 759 } 760 pid = p; 761 promise = port.onChildExit(pid); 762 } 763 764 ~TestChild() noexcept(false) { 765 KJ_IF_MAYBE(p, pid) { 766 KJ_SYSCALL(::kill(*p, SIGKILL)) { return; } 767 int status; 768 KJ_SYSCALL(waitpid(*p, &status, 0)) { return; } 769 } 770 } 771 772 void kill(int signo) { 773 KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo)); 774 } 775 776 KJ_DISALLOW_COPY(TestChild); 777 }; 778 779 TEST(AsyncUnixTest, ChildProcess) { 780 captureSignals(); 781 UnixEventPort port; 782 EventLoop loop(port); 783 WaitScope waitScope(loop); 784 785 // Block SIGTERM so that we can carefully un-block it in children. 786 sigset_t sigs, oldsigs; 787 KJ_SYSCALL(sigemptyset(&sigs)); 788 KJ_SYSCALL(sigaddset(&sigs, SIGTERM)); 789 KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs)); 790 KJ_DEFER(KJ_SYSCALL(pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr)) { break; }); 791 792 TestChild child1(port, 123); 793 KJ_EXPECT(!child1.promise.poll(waitScope)); 794 795 child1.kill(SIGTERM); 796 797 { 798 int status = child1.promise.wait(waitScope); 799 KJ_EXPECT(WIFEXITED(status)); 800 KJ_EXPECT(WEXITSTATUS(status) == 123); 801 } 802 803 TestChild child2(port, 234); 804 TestChild child3(port, 345); 805 806 KJ_EXPECT(!child2.promise.poll(waitScope)); 807 KJ_EXPECT(!child3.promise.poll(waitScope)); 808 809 child2.kill(SIGKILL); 810 811 { 812 int status = child2.promise.wait(waitScope); 813 KJ_EXPECT(!WIFEXITED(status)); 814 KJ_EXPECT(WIFSIGNALED(status)); 815 KJ_EXPECT(WTERMSIG(status) == SIGKILL); 816 } 817 818 KJ_EXPECT(!child3.promise.poll(waitScope)); 819 820 // child3 will be killed and synchronously waited on the way out. 821 } 822 823 #if !__CYGWIN__ 824 // TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin. 825 826 KJ_TEST("UnixEventPort whenWriteDisconnected()") { 827 captureSignals(); 828 UnixEventPort port; 829 EventLoop loop(port); 830 WaitScope waitScope(loop); 831 832 int fds_[2]; 833 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_)); 834 kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) }; 835 836 UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ); 837 838 // At one point, the poll()-based version of UnixEventPort had a bug where if some other event 839 // had completed previously, whenWriteDisconnected() would stop being watched for. So we watch 840 // for readability as well and check that that goes away first. 841 auto readablePromise = observer.whenBecomesReadable(); 842 auto hupPromise = observer.whenWriteDisconnected(); 843 844 KJ_EXPECT(!readablePromise.poll(waitScope)); 845 KJ_EXPECT(!hupPromise.poll(waitScope)); 846 847 KJ_SYSCALL(write(fds[1], "foo", 3)); 848 849 KJ_ASSERT(readablePromise.poll(waitScope)); 850 readablePromise.wait(waitScope); 851 852 { 853 char junk[16]; 854 ssize_t n; 855 KJ_SYSCALL(n = read(fds[0], junk, 16)); 856 KJ_EXPECT(n == 3); 857 } 858 859 KJ_EXPECT(!hupPromise.poll(waitScope)); 860 861 fds[1] = nullptr; 862 KJ_ASSERT(hupPromise.poll(waitScope)); 863 hupPromise.wait(waitScope); 864 } 865 866 KJ_TEST("UnixEventPort FdObserver(..., flags=0)::whenWriteDisconnected()") { 867 // Verifies that given `0' as a `flags' argument, 868 // FdObserver still observes whenWriteDisconnected(). 869 // 870 // This can be useful to watch disconnection on a blocking file descriptor. 871 // See discussion: https://github.com/capnproto/capnproto/issues/924 872 873 captureSignals(); 874 UnixEventPort port; 875 EventLoop loop(port); 876 WaitScope waitScope(loop); 877 878 int pipefds[2]; 879 KJ_SYSCALL(pipe(pipefds)); 880 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]); 881 882 UnixEventPort::FdObserver observer(port, outfd, 0); 883 884 auto hupPromise = observer.whenWriteDisconnected(); 885 886 KJ_EXPECT(!hupPromise.poll(waitScope)); 887 888 infd = nullptr; 889 KJ_ASSERT(hupPromise.poll(waitScope)); 890 hupPromise.wait(waitScope); 891 } 892 893 #endif 894 895 KJ_TEST("UnixEventPort poll for signals") { 896 captureSignals(); 897 UnixEventPort port; 898 EventLoop loop(port); 899 WaitScope waitScope(loop); 900 901 auto promise1 = port.onSignal(SIGURG); 902 auto promise2 = port.onSignal(SIGIO); 903 904 KJ_EXPECT(!promise1.poll(waitScope)); 905 KJ_EXPECT(!promise2.poll(waitScope)); 906 907 KJ_SYSCALL(raise(SIGURG)); 908 KJ_SYSCALL(raise(SIGIO)); 909 port.wake(); 910 911 KJ_EXPECT(port.poll()); 912 KJ_EXPECT(promise1.poll(waitScope)); 913 KJ_EXPECT(promise2.poll(waitScope)); 914 915 promise1.wait(waitScope); 916 promise2.wait(waitScope); 917 } 918 919 #if defined(SIGRTMIN) && !__CYGWIN__ && !__aarch64__ 920 // TODO(someday): Figure out why RT signals don't seem to work correctly on Cygwin. It looks like 921 // only the first signal is delivered, like how non-RT signals work. Is it possible Cygwin 922 // advertites RT signal support but doesn't actually implement them correctly? I can't find any 923 // information on the internet about this and TBH I don't care about Cygwin enough to dig in. 924 // TODO(someday): Figure out why RT signals don't work under qemu-user emulating aarch64 on 925 // Debian Buster. 926 927 void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) { 928 union sigval value; 929 memset(&value, 0, sizeof(value)); 930 931 // Queue three copies of the signal upfront. 932 for (uint i = 0; i < 3; i++) { 933 value.sival_int = 123 + i; 934 KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value)); 935 } 936 937 // Now wait for them. 938 for (uint i = 0; i < 3; i++) { 939 auto promise = port.onSignal(SIGRTMIN); 940 if (doPoll) { 941 KJ_ASSERT(promise.poll(waitScope)); 942 } 943 auto info = promise.wait(waitScope); 944 KJ_EXPECT(info.si_value.sival_int == 123 + i); 945 } 946 947 KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope)); 948 } 949 950 KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") { 951 captureSignals(); 952 UnixEventPort port; 953 EventLoop loop(port); 954 WaitScope waitScope(loop); 955 956 testRtSignals(port, waitScope, true); 957 958 // Test again, but don't poll() the promises. This may test a different code path, if poll() and 959 // wait() are very different in how they read signals. (For the poll(2)-based implementation of 960 // UnixEventPort, they are indeed pretty different.) 961 testRtSignals(port, waitScope, false); 962 } 963 #endif 964 965 } // namespace 966 } // namespace kj 967 968 #endif // !_WIN32