async-xthread-test.c++ (32599B)
1 // Copyright (c) 2019 Cloudflare, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #if _WIN32 23 #include "win32-api-version.h" 24 #endif 25 26 #include "async.h" 27 #include "debug.h" 28 #include "thread.h" 29 #include "mutex.h" 30 #include <kj/test.h> 31 32 #if _WIN32 33 #include <windows.h> 34 #include "windows-sanity.h" 35 inline void delay() { Sleep(10); } 36 #else 37 #include <unistd.h> 38 inline void delay() { usleep(10000); } 39 #endif 40 41 // This file is #included from async-unix-xthread-test.c++ and async-win32-xthread-test.c++ after 42 // defining KJ_XTHREAD_TEST_SETUP_LOOP to set up a loop with the corresponding EventPort. 43 #ifndef KJ_XTHREAD_TEST_SETUP_LOOP 44 #define KJ_XTHREAD_TEST_SETUP_LOOP \ 45 EventLoop loop; \ 46 WaitScope waitScope(loop) 47 #endif 48 49 namespace kj { 50 namespace { 51 52 KJ_TEST("synchonous simple cross-thread events") { 53 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 54 Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread 55 thread_local bool isChild = false; // to assert which thread we're in 56 57 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 58 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 59 // the other thread. 60 Thread thread([&]() noexcept { 61 isChild = true; 62 63 KJ_XTHREAD_TEST_SETUP_LOOP; 64 65 auto paf = newPromiseAndFulfiller<uint>(); 66 fulfiller = kj::mv(paf.fulfiller); 67 68 *executor.lockExclusive() = getCurrentThreadExecutor(); 69 70 KJ_ASSERT(paf.promise.wait(waitScope) == 123); 71 72 // Wait until parent thread sets executor to null, as a way to tell us to quit. 73 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 74 }); 75 76 ([&]() noexcept { 77 const Executor* exec; 78 { 79 auto lock = executor.lockExclusive(); 80 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 81 exec = &KJ_ASSERT_NONNULL(*lock); 82 } 83 84 KJ_ASSERT(!isChild); 85 86 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeSync([&]() { 87 KJ_ASSERT(isChild); 88 KJ_FAIL_ASSERT("test exception") { break; } 89 })); 90 91 uint i = exec->executeSync([&]() { 92 KJ_ASSERT(isChild); 93 fulfiller->fulfill(123); 94 return 456; 95 }); 96 KJ_EXPECT(i == 456); 97 98 *executor.lockExclusive() = nullptr; 99 })(); 100 } 101 102 KJ_TEST("asynchonous simple cross-thread events") { 103 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 104 Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread 105 thread_local bool isChild = false; // to assert which thread we're in 106 107 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 108 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 109 // the other thread. 110 Thread thread([&]() noexcept { 111 isChild = true; 112 113 KJ_XTHREAD_TEST_SETUP_LOOP; 114 115 auto paf = newPromiseAndFulfiller<uint>(); 116 fulfiller = kj::mv(paf.fulfiller); 117 118 *executor.lockExclusive() = getCurrentThreadExecutor(); 119 120 KJ_ASSERT(paf.promise.wait(waitScope) == 123); 121 122 // Wait until parent thread sets executor to null, as a way to tell us to quit. 123 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 124 }); 125 126 ([&]() noexcept { 127 KJ_XTHREAD_TEST_SETUP_LOOP; 128 129 const Executor* exec; 130 { 131 auto lock = executor.lockExclusive(); 132 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 133 exec = &KJ_ASSERT_NONNULL(*lock); 134 } 135 136 KJ_ASSERT(!isChild); 137 138 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeAsync([&]() { 139 KJ_ASSERT(isChild); 140 KJ_FAIL_ASSERT("test exception") { break; } 141 }).wait(waitScope)); 142 143 Promise<uint> promise = exec->executeAsync([&]() { 144 KJ_ASSERT(isChild); 145 fulfiller->fulfill(123); 146 return 456u; 147 }); 148 KJ_EXPECT(promise.wait(waitScope) == 456); 149 150 *executor.lockExclusive() = nullptr; 151 })(); 152 } 153 154 KJ_TEST("synchonous promise cross-thread events") { 155 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 156 Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread 157 Promise<uint> promise = nullptr; // accessed only from the subthread 158 thread_local bool isChild = false; // to assert which thread we're in 159 160 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 161 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 162 // the other thread. 163 Thread thread([&]() noexcept { 164 isChild = true; 165 166 KJ_XTHREAD_TEST_SETUP_LOOP; 167 168 auto paf = newPromiseAndFulfiller<uint>(); 169 fulfiller = kj::mv(paf.fulfiller); 170 171 auto paf2 = newPromiseAndFulfiller<uint>(); 172 promise = kj::mv(paf2.promise); 173 174 *executor.lockExclusive() = getCurrentThreadExecutor(); 175 176 KJ_ASSERT(paf.promise.wait(waitScope) == 123); 177 178 paf2.fulfiller->fulfill(321); 179 180 // Make sure reply gets sent. 181 loop.run(); 182 183 // Wait until parent thread sets executor to null, as a way to tell us to quit. 184 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 185 }); 186 187 ([&]() noexcept { 188 const Executor* exec; 189 { 190 auto lock = executor.lockExclusive(); 191 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 192 exec = &KJ_ASSERT_NONNULL(*lock); 193 } 194 195 KJ_ASSERT(!isChild); 196 197 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeSync([&]() { 198 KJ_ASSERT(isChild); 199 return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception")); 200 })); 201 202 uint i = exec->executeSync([&]() { 203 KJ_ASSERT(isChild); 204 fulfiller->fulfill(123); 205 return kj::mv(promise); 206 }); 207 KJ_EXPECT(i == 321); 208 209 *executor.lockExclusive() = nullptr; 210 })(); 211 } 212 213 KJ_TEST("asynchonous promise cross-thread events") { 214 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 215 Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread 216 Promise<uint> promise = nullptr; // accessed only from the subthread 217 thread_local bool isChild = false; // to assert which thread we're in 218 219 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 220 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 221 // the other thread. 222 Thread thread([&]() noexcept { 223 isChild = true; 224 225 KJ_XTHREAD_TEST_SETUP_LOOP; 226 227 auto paf = newPromiseAndFulfiller<uint>(); 228 fulfiller = kj::mv(paf.fulfiller); 229 230 auto paf2 = newPromiseAndFulfiller<uint>(); 231 promise = kj::mv(paf2.promise); 232 233 *executor.lockExclusive() = getCurrentThreadExecutor(); 234 235 KJ_ASSERT(paf.promise.wait(waitScope) == 123); 236 237 paf2.fulfiller->fulfill(321); 238 239 // Make sure reply gets sent. 240 loop.run(); 241 242 // Wait until parent thread sets executor to null, as a way to tell us to quit. 243 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 244 }); 245 246 ([&]() noexcept { 247 KJ_XTHREAD_TEST_SETUP_LOOP; 248 249 const Executor* exec; 250 { 251 auto lock = executor.lockExclusive(); 252 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 253 exec = &KJ_ASSERT_NONNULL(*lock); 254 } 255 256 KJ_ASSERT(!isChild); 257 258 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeAsync([&]() { 259 KJ_ASSERT(isChild); 260 return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception")); 261 }).wait(waitScope)); 262 263 Promise<uint> promise2 = exec->executeAsync([&]() { 264 KJ_ASSERT(isChild); 265 fulfiller->fulfill(123); 266 return kj::mv(promise); 267 }); 268 KJ_EXPECT(promise2.wait(waitScope) == 321); 269 270 *executor.lockExclusive() = nullptr; 271 })(); 272 } 273 274 KJ_TEST("cancel cross-thread event before it runs") { 275 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 276 277 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 278 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 279 // the other thread. 280 Thread thread([&]() noexcept { 281 KJ_XTHREAD_TEST_SETUP_LOOP; 282 283 *executor.lockExclusive() = getCurrentThreadExecutor(); 284 285 // We never run the loop here, so that when the event is canceled, it's still queued. 286 287 // Wait until parent thread sets executor to null, as a way to tell us to quit. 288 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 289 }); 290 291 ([&]() noexcept { 292 KJ_XTHREAD_TEST_SETUP_LOOP; 293 294 const Executor* exec; 295 { 296 auto lock = executor.lockExclusive(); 297 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 298 exec = &KJ_ASSERT_NONNULL(*lock); 299 } 300 301 volatile bool called = false; 302 { 303 Promise<uint> promise = exec->executeAsync([&]() { called = true; return 123u; }); 304 delay(); 305 KJ_EXPECT(!promise.poll(waitScope)); 306 } 307 KJ_EXPECT(!called); 308 309 *executor.lockExclusive() = nullptr; 310 })(); 311 } 312 313 KJ_TEST("cancel cross-thread event while it runs") { 314 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 315 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 316 317 // We use `noexcept` so that any uncaught exceptions immediately terminate the process without 318 // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with 319 // the other thread. 320 Thread thread([&]() noexcept { 321 KJ_XTHREAD_TEST_SETUP_LOOP; 322 323 auto paf = newPromiseAndFulfiller<void>(); 324 fulfiller = kj::mv(paf.fulfiller); 325 326 *executor.lockExclusive() = getCurrentThreadExecutor(); 327 328 paf.promise.wait(waitScope); 329 330 // Wait until parent thread sets executor to null, as a way to tell us to quit. 331 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 332 }); 333 334 ([&]() noexcept { 335 KJ_XTHREAD_TEST_SETUP_LOOP; 336 337 const Executor* exec; 338 { 339 auto lock = executor.lockExclusive(); 340 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 341 exec = &KJ_ASSERT_NONNULL(*lock); 342 } 343 344 { 345 volatile bool called = false; 346 Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> { 347 called = true; 348 return kj::NEVER_DONE; 349 }); 350 while (!called) { 351 delay(); 352 } 353 KJ_EXPECT(!promise.poll(waitScope)); 354 } 355 356 exec->executeSync([&]() { fulfiller->fulfill(); }); 357 358 *executor.lockExclusive() = nullptr; 359 })(); 360 } 361 362 KJ_TEST("cross-thread cancellation in both directions at once") { 363 MutexGuarded<kj::Maybe<const Executor&>> childExecutor; 364 MutexGuarded<kj::Maybe<const Executor&>> parentExecutor; 365 366 MutexGuarded<uint> readyCount(0); 367 368 thread_local uint threadNumber = 0; 369 thread_local bool receivedFinalCall = false; 370 371 // Code to execute simultaneously in two threads... 372 // We mark this noexcept so that any exceptions thrown will immediately invoke the termination 373 // handler, skipping any destructors that would deadlock. 374 auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor, 375 MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor, 376 uint threadCount) noexcept { 377 KJ_XTHREAD_TEST_SETUP_LOOP; 378 379 *selfExecutor.lockExclusive() = getCurrentThreadExecutor(); 380 381 const Executor* exec; 382 { 383 auto lock = otherExecutor.lockExclusive(); 384 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 385 exec = &KJ_ASSERT_NONNULL(*lock); 386 } 387 388 // Create a ton of cross-thread promises to cancel. 389 Vector<Promise<void>> promises; 390 for (uint i = 0; i < 1000; i++) { 391 promises.add(exec->executeAsync([&]() -> kj::Promise<void> { 392 return kj::Promise<void>(kj::NEVER_DONE) 393 .attach(kj::defer([wasThreadNumber = threadNumber]() { 394 // Make sure destruction happens in the correct thread. 395 KJ_ASSERT(threadNumber == wasThreadNumber); 396 })); 397 })); 398 } 399 400 // Signal other thread that we're done queueing, and wait for it to signal same. 401 { 402 auto lock = readyCount.lockExclusive(); 403 ++*lock; 404 lock.wait([&](uint i) { return i >= threadCount; }); 405 } 406 407 // Run event loop to start all executions queued by the other thread. 408 waitScope.poll(); 409 loop.run(); 410 411 // Signal other thread that we've run the loop, and wait for it to signal same. 412 { 413 auto lock = readyCount.lockExclusive(); 414 ++*lock; 415 lock.wait([&](uint i) { return i >= threadCount * 2; }); 416 } 417 418 // Cancel all the promises. 419 promises.clear(); 420 421 // All our cancellations completed, but the other thread may still be waiting for some 422 // cancellations from us. We need to pump our event loop to make sure we continue handling 423 // those cancellation requests. In particular we'll queue a function to the other thread and 424 // wait for it to complete. The other thread will queue its own function to this thread just 425 // before completing the function we queued to it. 426 receivedFinalCall = false; 427 exec->executeAsync([&]() { receivedFinalCall = true; }).wait(waitScope); 428 429 // To be safe, make sure we've actually executed the function that the other thread queued to 430 // us by repeatedly polling until `receivedFinalCall` becomes true in this thread. 431 while (!receivedFinalCall) { 432 waitScope.poll(); 433 loop.run(); 434 } 435 436 // OK, signal other that we're all done. 437 *otherExecutor.lockExclusive() = nullptr; 438 439 // Wait until other thread sets executor to null, as a way to tell us to quit. 440 selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 441 }; 442 443 { 444 Thread thread([&]() { 445 threadNumber = 1; 446 simultaneous(childExecutor, parentExecutor, 2); 447 }); 448 449 threadNumber = 0; 450 simultaneous(parentExecutor, childExecutor, 2); 451 } 452 453 // Let's even have a three-thread version, with cyclic cancellation requests. 454 MutexGuarded<kj::Maybe<const Executor&>> child2Executor; 455 *readyCount.lockExclusive() = 0; 456 457 { 458 Thread thread1([&]() { 459 threadNumber = 1; 460 simultaneous(childExecutor, child2Executor, 3); 461 }); 462 463 Thread thread2([&]() { 464 threadNumber = 2; 465 simultaneous(child2Executor, parentExecutor, 3); 466 }); 467 468 threadNumber = 0; 469 simultaneous(parentExecutor, childExecutor, 3); 470 } 471 } 472 473 KJ_TEST("cross-thread cancellation cycle") { 474 // Another multi-way cancellation test where we set up an actual cycle between three threads 475 // waiting on each other to complete a single event. 476 477 MutexGuarded<kj::Maybe<const Executor&>> child1Executor, child2Executor; 478 479 Own<PromiseFulfiller<void>> fulfiller1, fulfiller2; 480 481 auto threadMain = [](MutexGuarded<kj::Maybe<const Executor&>>& executor, 482 Own<PromiseFulfiller<void>>& fulfiller) noexcept { 483 KJ_XTHREAD_TEST_SETUP_LOOP; 484 485 auto paf = newPromiseAndFulfiller<void>(); 486 fulfiller = kj::mv(paf.fulfiller); 487 488 *executor.lockExclusive() = getCurrentThreadExecutor(); 489 490 paf.promise.wait(waitScope); 491 492 // Wait until parent thread sets executor to null, as a way to tell us to quit. 493 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 494 }; 495 496 Thread thread1([&]() noexcept { threadMain(child1Executor, fulfiller1); }); 497 Thread thread2([&]() noexcept { threadMain(child2Executor, fulfiller2); }); 498 499 ([&]() noexcept { 500 KJ_XTHREAD_TEST_SETUP_LOOP; 501 auto& parentExecutor = getCurrentThreadExecutor(); 502 503 const Executor* exec1; 504 { 505 auto lock = child1Executor.lockExclusive(); 506 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 507 exec1 = &KJ_ASSERT_NONNULL(*lock); 508 } 509 const Executor* exec2; 510 { 511 auto lock = child2Executor.lockExclusive(); 512 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 513 exec2 = &KJ_ASSERT_NONNULL(*lock); 514 } 515 516 // Create an event that cycles through both threads and back to this one, and then cancel it. 517 bool cycleAllDestroyed = false; 518 { 519 auto paf = kj::newPromiseAndFulfiller<void>(); 520 Promise<uint> promise = exec1->executeAsync([&]() -> kj::Promise<uint> { 521 return exec2->executeAsync([&]() -> kj::Promise<uint> { 522 return parentExecutor.executeAsync([&]() -> kj::Promise<uint> { 523 paf.fulfiller->fulfill(); 524 return kj::Promise<uint>(kj::NEVER_DONE).attach(kj::defer([&]() { 525 cycleAllDestroyed = true; 526 })); 527 }); 528 }); 529 }); 530 531 // Wait until the cycle has come all the way around. 532 paf.promise.wait(waitScope); 533 534 KJ_EXPECT(!promise.poll(waitScope)); 535 } 536 537 KJ_EXPECT(cycleAllDestroyed); 538 539 exec1->executeSync([&]() { fulfiller1->fulfill(); }); 540 exec2->executeSync([&]() { fulfiller2->fulfill(); }); 541 542 *child1Executor.lockExclusive() = nullptr; 543 *child2Executor.lockExclusive() = nullptr; 544 })(); 545 } 546 547 KJ_TEST("call own thread's executor") { 548 KJ_XTHREAD_TEST_SETUP_LOOP; 549 550 auto& executor = getCurrentThreadExecutor(); 551 552 { 553 uint i = executor.executeSync([]() { 554 return 123u; 555 }); 556 KJ_EXPECT(i == 123); 557 } 558 559 KJ_EXPECT_THROW_MESSAGE( 560 "can't call executeSync() on own thread's executor with a promise-returning function", 561 executor.executeSync([]() { return kj::evalLater([]() {}); })); 562 563 { 564 uint i = executor.executeAsync([]() { 565 return 123u; 566 }).wait(waitScope); 567 KJ_EXPECT(i == 123); 568 } 569 } 570 571 KJ_TEST("synchronous cross-thread event disconnected") { 572 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 573 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 574 thread_local bool isChild = false; // to assert which thread we're in 575 576 Thread thread([&]() noexcept { 577 isChild = true; 578 579 { 580 KJ_XTHREAD_TEST_SETUP_LOOP; 581 582 auto paf = newPromiseAndFulfiller<void>(); 583 fulfiller = kj::mv(paf.fulfiller); 584 585 *executor.lockExclusive() = getCurrentThreadExecutor(); 586 587 paf.promise.wait(waitScope); 588 589 // Exit the event loop! 590 } 591 592 // Wait until parent thread sets executor to null, as a way to tell us to quit. 593 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 594 }); 595 596 ([&]() noexcept { 597 Own<const Executor> exec; 598 { 599 auto lock = executor.lockExclusive(); 600 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 601 exec = KJ_ASSERT_NONNULL(*lock).addRef(); 602 } 603 604 KJ_EXPECT(!isChild); 605 606 KJ_EXPECT(exec->isLive()); 607 608 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 609 "Executor's event loop exited before cross-thread event could complete", 610 exec->executeSync([&]() -> Promise<void> { 611 fulfiller->fulfill(); 612 return kj::NEVER_DONE; 613 })); 614 615 KJ_EXPECT(!exec->isLive()); 616 617 KJ_EXPECT_THROW_MESSAGE( 618 "Executor's event loop has exited", 619 exec->executeSync([&]() {})); 620 621 *executor.lockExclusive() = nullptr; 622 })(); 623 } 624 625 KJ_TEST("asynchronous cross-thread event disconnected") { 626 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 627 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 628 thread_local bool isChild = false; // to assert which thread we're in 629 630 Thread thread([&]() noexcept { 631 isChild = true; 632 633 { 634 KJ_XTHREAD_TEST_SETUP_LOOP; 635 636 auto paf = newPromiseAndFulfiller<void>(); 637 fulfiller = kj::mv(paf.fulfiller); 638 639 *executor.lockExclusive() = getCurrentThreadExecutor(); 640 641 paf.promise.wait(waitScope); 642 643 // Exit the event loop! 644 } 645 646 // Wait until parent thread sets executor to null, as a way to tell us to quit. 647 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 648 }); 649 650 ([&]() noexcept { 651 KJ_XTHREAD_TEST_SETUP_LOOP; 652 653 Own<const Executor> exec; 654 { 655 auto lock = executor.lockExclusive(); 656 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 657 exec = KJ_ASSERT_NONNULL(*lock).addRef(); 658 } 659 660 KJ_EXPECT(!isChild); 661 662 KJ_EXPECT(exec->isLive()); 663 664 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 665 "Executor's event loop exited before cross-thread event could complete", 666 exec->executeAsync([&]() -> Promise<void> { 667 fulfiller->fulfill(); 668 return kj::NEVER_DONE; 669 }).wait(waitScope)); 670 671 KJ_EXPECT(!exec->isLive()); 672 673 KJ_EXPECT_THROW_MESSAGE( 674 "Executor's event loop has exited", 675 exec->executeAsync([&]() {}).wait(waitScope)); 676 677 *executor.lockExclusive() = nullptr; 678 })(); 679 } 680 681 KJ_TEST("cross-thread event disconnected before it runs") { 682 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 683 thread_local bool isChild = false; // to assert which thread we're in 684 685 Thread thread([&]() noexcept { 686 isChild = true; 687 688 KJ_XTHREAD_TEST_SETUP_LOOP; 689 690 *executor.lockExclusive() = getCurrentThreadExecutor(); 691 692 // Don't actually run the event loop. Destroy it when the other thread signals us to. 693 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 694 }); 695 696 ([&]() noexcept { 697 KJ_XTHREAD_TEST_SETUP_LOOP; 698 699 Own<const Executor> exec; 700 { 701 auto lock = executor.lockExclusive(); 702 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 703 exec = KJ_ASSERT_NONNULL(*lock).addRef(); 704 } 705 706 KJ_EXPECT(!isChild); 707 708 KJ_EXPECT(exec->isLive()); 709 710 auto promise = exec->executeAsync([&]() { 711 KJ_LOG(ERROR, "shouldn't have executed"); 712 }); 713 KJ_EXPECT(!promise.poll(waitScope)); 714 715 *executor.lockExclusive() = nullptr; 716 717 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 718 "Executor's event loop exited before cross-thread event could complete", 719 promise.wait(waitScope)); 720 721 KJ_EXPECT(!exec->isLive()); 722 })(); 723 } 724 725 KJ_TEST("cross-thread event disconnected without holding Executor ref") { 726 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 727 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 728 thread_local bool isChild = false; // to assert which thread we're in 729 730 Thread thread([&]() noexcept { 731 isChild = true; 732 733 { 734 KJ_XTHREAD_TEST_SETUP_LOOP; 735 736 auto paf = newPromiseAndFulfiller<void>(); 737 fulfiller = kj::mv(paf.fulfiller); 738 739 *executor.lockExclusive() = getCurrentThreadExecutor(); 740 741 paf.promise.wait(waitScope); 742 743 // Exit the event loop! 744 } 745 746 // Wait until parent thread sets executor to null, as a way to tell us to quit. 747 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 748 }); 749 750 ([&]() noexcept { 751 const Executor* exec; 752 { 753 auto lock = executor.lockExclusive(); 754 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 755 exec = &KJ_ASSERT_NONNULL(*lock); 756 } 757 758 KJ_EXPECT(!isChild); 759 760 KJ_EXPECT(exec->isLive()); 761 762 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 763 "Executor's event loop exited before cross-thread event could complete", 764 exec->executeSync([&]() -> Promise<void> { 765 fulfiller->fulfill(); 766 return kj::NEVER_DONE; 767 })); 768 769 // Can't check `exec->isLive()` because it's been destroyed by now. 770 771 *executor.lockExclusive() = nullptr; 772 })(); 773 } 774 775 KJ_TEST("detached cross-thread event doesn't cause crash") { 776 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 777 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 778 779 Thread thread([&]() noexcept { 780 KJ_XTHREAD_TEST_SETUP_LOOP; 781 782 auto paf = newPromiseAndFulfiller<void>(); 783 fulfiller = kj::mv(paf.fulfiller); 784 785 *executor.lockExclusive() = getCurrentThreadExecutor(); 786 787 paf.promise.wait(waitScope); 788 789 // Without this poll(), we don't attempt to reply to the other thread? But this isn't required 790 // in other tests, for some reason? Oh well. 791 waitScope.poll(); 792 793 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 794 }); 795 796 ([&]() noexcept { 797 { 798 KJ_XTHREAD_TEST_SETUP_LOOP; 799 800 const Executor* exec; 801 { 802 auto lock = executor.lockExclusive(); 803 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 804 exec = &KJ_ASSERT_NONNULL(*lock); 805 } 806 807 exec->executeAsync([&]() -> kj::Promise<void> { 808 // Make sure other thread gets time to exit its EventLoop. 809 delay(); 810 delay(); 811 delay(); 812 fulfiller->fulfill(); 813 return kj::READY_NOW; 814 }).detach([&](kj::Exception&& e) { 815 KJ_LOG(ERROR, e); 816 }); 817 818 // Give the other thread a chance to wake up and start working on the event. 819 delay(); 820 821 // Now we'll destroy our EventLoop. That *should* cause detached promises to be destroyed, 822 // thereby cancelling it, before disabling our own executor. However, at one point in the 823 // past, our executor was shut down first, followed by destroying detached promises, which 824 // led to an abort because the other thread had no way to reply back to this thread. 825 } 826 827 *executor.lockExclusive() = nullptr; 828 })(); 829 } 830 831 KJ_TEST("cross-thread event cancel requested while destination thread being destroyed") { 832 // This exercises the code in Executor::Impl::disconnect() which tears down the list of 833 // cross-thread events which have already been canceled. At one point this code had a bug which 834 // would cause it to throw if any events were present in the cancel list. 835 836 MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread 837 Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread 838 839 Thread thread([&]() noexcept { 840 KJ_XTHREAD_TEST_SETUP_LOOP; 841 842 auto paf = newPromiseAndFulfiller<void>(); 843 fulfiller = kj::mv(paf.fulfiller); 844 845 *executor.lockExclusive() = getCurrentThreadExecutor(); 846 847 // Wait for other thread to start a cross-thread task. 848 paf.promise.wait(waitScope); 849 850 // Let the other thread know, out-of-band, that the task is running, so that it can now request 851 // cancellation. We do this by setting `executor` to null (but we could also use some separate 852 // MutexGuarded conditional variable instead). 853 *executor.lockExclusive() = nullptr; 854 855 // Give other thread a chance to request cancellation of the promise. 856 delay(); 857 858 // now we exit the event loop 859 }); 860 861 ([&]() noexcept { 862 KJ_XTHREAD_TEST_SETUP_LOOP; 863 864 const Executor* exec; 865 { 866 auto lock = executor.lockExclusive(); 867 lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; }); 868 exec = &KJ_ASSERT_NONNULL(*lock); 869 } 870 871 KJ_EXPECT(exec->isLive()); 872 873 auto promise = exec->executeAsync([&]() -> Promise<void> { 874 fulfiller->fulfill(); 875 return kj::NEVER_DONE; 876 }); 877 878 // Wait for the other thread to signal to us that it has indeed started executing our task. 879 executor.lockExclusive().wait([](auto& val) { return val == nullptr; }); 880 881 // Cancel the promise. 882 promise = nullptr; 883 })(); 884 } 885 886 KJ_TEST("cross-thread fulfiller") { 887 MutexGuarded<Maybe<Own<PromiseFulfiller<int>>>> fulfillerMutex; 888 889 Thread thread([&]() noexcept { 890 KJ_XTHREAD_TEST_SETUP_LOOP; 891 892 auto paf = kj::newPromiseAndCrossThreadFulfiller<int>(); 893 *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller); 894 895 int result = paf.promise.wait(waitScope); 896 KJ_EXPECT(result == 123); 897 }); 898 899 ([&]() noexcept { 900 KJ_XTHREAD_TEST_SETUP_LOOP; 901 902 Own<PromiseFulfiller<int>> fulfiller; 903 { 904 auto lock = fulfillerMutex.lockExclusive(); 905 lock.wait([&](auto& value) { return value != nullptr; }); 906 fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock)); 907 } 908 909 fulfiller->fulfill(123); 910 })(); 911 } 912 913 KJ_TEST("cross-thread fulfiller rejects") { 914 MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex; 915 916 Thread thread([&]() noexcept { 917 KJ_XTHREAD_TEST_SETUP_LOOP; 918 919 auto paf = kj::newPromiseAndCrossThreadFulfiller<void>(); 920 *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller); 921 922 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("foo exception", paf.promise.wait(waitScope)); 923 }); 924 925 ([&]() noexcept { 926 KJ_XTHREAD_TEST_SETUP_LOOP; 927 928 Own<PromiseFulfiller<void>> fulfiller; 929 { 930 auto lock = fulfillerMutex.lockExclusive(); 931 lock.wait([&](auto& value) { return value != nullptr; }); 932 fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock)); 933 } 934 935 fulfiller->reject(KJ_EXCEPTION(FAILED, "foo exception")); 936 })(); 937 } 938 939 KJ_TEST("cross-thread fulfiller destroyed") { 940 MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex; 941 942 Thread thread([&]() noexcept { 943 KJ_XTHREAD_TEST_SETUP_LOOP; 944 945 auto paf = kj::newPromiseAndCrossThreadFulfiller<void>(); 946 *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller); 947 948 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE( 949 "cross-thread PromiseFulfiller was destroyed without fulfilling the promise", 950 paf.promise.wait(waitScope)); 951 }); 952 953 ([&]() noexcept { 954 KJ_XTHREAD_TEST_SETUP_LOOP; 955 956 Own<PromiseFulfiller<void>> fulfiller; 957 { 958 auto lock = fulfillerMutex.lockExclusive(); 959 lock.wait([&](auto& value) { return value != nullptr; }); 960 fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock)); 961 } 962 963 fulfiller = nullptr; 964 })(); 965 } 966 967 KJ_TEST("cross-thread fulfiller canceled") { 968 MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex; 969 MutexGuarded<bool> done; 970 971 Thread thread([&]() noexcept { 972 KJ_XTHREAD_TEST_SETUP_LOOP; 973 974 auto paf = kj::newPromiseAndCrossThreadFulfiller<void>(); 975 { 976 auto lock = fulfillerMutex.lockExclusive(); 977 *lock = kj::mv(paf.fulfiller); 978 lock.wait([](auto& value) { return value == nullptr; }); 979 } 980 981 // cancel 982 paf.promise = nullptr; 983 984 { 985 auto lock = done.lockExclusive(); 986 lock.wait([](bool value) { return value; }); 987 } 988 }); 989 990 ([&]() noexcept { 991 KJ_XTHREAD_TEST_SETUP_LOOP; 992 993 Own<PromiseFulfiller<void>> fulfiller; 994 { 995 auto lock = fulfillerMutex.lockExclusive(); 996 lock.wait([&](auto& value) { return value != nullptr; }); 997 fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock)); 998 KJ_ASSERT(fulfiller->isWaiting()); 999 *lock = nullptr; 1000 } 1001 1002 // Should eventually show not waiting. 1003 while (fulfiller->isWaiting()) { 1004 delay(); 1005 } 1006 1007 *done.lockExclusive() = true; 1008 })(); 1009 } 1010 1011 KJ_TEST("cross-thread fulfiller multiple fulfills") { 1012 MutexGuarded<Maybe<Own<PromiseFulfiller<int>>>> fulfillerMutex; 1013 1014 Thread thread([&]() noexcept { 1015 KJ_XTHREAD_TEST_SETUP_LOOP; 1016 1017 auto paf = kj::newPromiseAndCrossThreadFulfiller<int>(); 1018 *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller); 1019 1020 int result = paf.promise.wait(waitScope); 1021 KJ_EXPECT(result == 123); 1022 }); 1023 1024 auto func = [&]() noexcept { 1025 KJ_XTHREAD_TEST_SETUP_LOOP; 1026 1027 PromiseFulfiller<int>* fulfiller; 1028 { 1029 auto lock = fulfillerMutex.lockExclusive(); 1030 lock.wait([&](auto& value) { return value != nullptr; }); 1031 fulfiller = KJ_ASSERT_NONNULL(*lock).get(); 1032 } 1033 1034 fulfiller->fulfill(123); 1035 }; 1036 1037 kj::Thread thread1(func); 1038 kj::Thread thread2(func); 1039 kj::Thread thread3(func); 1040 kj::Thread thread4(func); 1041 } 1042 1043 } // namespace 1044 } // namespace kj