capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-xthread-test.c++ (32599B)


      1 // Copyright (c) 2019 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if _WIN32
     23 #include "win32-api-version.h"
     24 #endif
     25 
     26 #include "async.h"
     27 #include "debug.h"
     28 #include "thread.h"
     29 #include "mutex.h"
     30 #include <kj/test.h>
        #include <atomic>
     31 
     32 #if _WIN32
     33 #include <windows.h>
     34 #include "windows-sanity.h"
        // Short fixed pause used by the tests below to give another thread time to make progress.
        // Win32 flavor: Sleep() takes milliseconds, so this is 10 ms.
     35 inline void delay() { Sleep(10); }
     36 #else
     37 #include <unistd.h>
        // Short fixed pause used by the tests below to give another thread time to make progress.
        // POSIX flavor: usleep() takes microseconds, so this is the same 10 ms as the Win32 branch.
     38 inline void delay() { usleep(10000); }
     39 #endif
     40 
     41 // This file is #included from async-unix-xthread-test.c++ and async-win32-xthread-test.c++ after
     42 // defining KJ_XTHREAD_TEST_SETUP_LOOP to set up a loop with the corresponding EventPort.
        //
        // If the macro has not been defined by the time we get here, the fallback below is used: it
        // declares an `EventLoop loop` and a `WaitScope waitScope(loop)` in the scope that expands it.
        // Test bodies refer to both `loop` and `waitScope` by name after expanding the macro.
     43 #ifndef KJ_XTHREAD_TEST_SETUP_LOOP
     44 #define KJ_XTHREAD_TEST_SETUP_LOOP \
     45   EventLoop loop; \
     46   WaitScope waitScope(loop)
     47 #endif
     48 
     49 namespace kj {
     50 namespace {
     51 
     52 KJ_TEST("synchonous simple cross-thread events") {
          // Checks Executor::executeSync() against an executor published by a child thread: first that
          // a failure raised inside the callback reaches the caller, then that a plain return value
          // does. The parent lambda does not set up an event loop of its own in this test.
     53   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
     54   Own<PromiseFulfiller<uint>> fulfiller;  // accessed only from the subthread
     55   thread_local bool isChild = false;  // to assert which thread we're in
     56 
     57   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
     58   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
     59   // the other thread.
     60   Thread thread([&]() noexcept {
     61     isChild = true;
     62 
     63     KJ_XTHREAD_TEST_SETUP_LOOP;
     64 
     65     auto paf = newPromiseAndFulfiller<uint>();
     66     fulfiller = kj::mv(paf.fulfiller);
     67 
            // Publish this thread's executor; the parent waits for this value to become non-null.
     68     *executor.lockExclusive() = getCurrentThreadExecutor();
     69 
            // Returns once the parent's second callback has fulfilled the promise; it must carry 123.
     70     KJ_ASSERT(paf.promise.wait(waitScope) == 123);
     71 
     72     // Wait until parent thread sets executor to null, as a way to tell us to quit.
     73     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
     74   });
     75 
     76   ([&]() noexcept {
     77     const Executor* exec;
     78     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
     79       auto lock = executor.lockExclusive();
     80       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
     81       exec = &KJ_ASSERT_NONNULL(*lock);
     82     }
     83 
     84     KJ_ASSERT(!isChild);
     85 
            // A failure raised inside the callback is expected to surface here with the same message.
            // The callback itself asserts that it is running on the child thread.
     86     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeSync([&]() {
     87       KJ_ASSERT(isChild);
     88       KJ_FAIL_ASSERT("test exception") { break; }
     89     }));
     90 
            // This callback fulfills the promise the child is waiting on; its return value (456) is
            // expected to come back as the result of executeSync().
     91     uint i = exec->executeSync([&]() {
     92       KJ_ASSERT(isChild);
     93       fulfiller->fulfill(123);
     94       return 456;
     95     });
     96     KJ_EXPECT(i == 456);
     97 
            // Signal the child thread that it may exit.
     98     *executor.lockExclusive() = nullptr;
     99   })();
    100 }
    101 
    102 KJ_TEST("asynchonous simple cross-thread events") {
          // Checks Executor::executeAsync() against an executor published by a child thread. The parent
          // sets up its own event loop and wait()s on the returned promises: one is expected to fail
          // with the callback's error message, the other to resolve to the callback's return value.
    103   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    104   Own<PromiseFulfiller<uint>> fulfiller;  // accessed only from the subthread
    105   thread_local bool isChild = false;  // to assert which thread we're in
    106 
    107   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
    108   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
    109   // the other thread.
    110   Thread thread([&]() noexcept {
    111     isChild = true;
    112 
    113     KJ_XTHREAD_TEST_SETUP_LOOP;
    114 
    115     auto paf = newPromiseAndFulfiller<uint>();
    116     fulfiller = kj::mv(paf.fulfiller);
    117 
            // Publish this thread's executor; the parent waits for this value to become non-null.
    118     *executor.lockExclusive() = getCurrentThreadExecutor();
    119 
            // Returns once the parent's second callback has fulfilled the promise; it must carry 123.
    120     KJ_ASSERT(paf.promise.wait(waitScope) == 123);
    121 
    122     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    123     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    124   });
    125 
    126   ([&]() noexcept {
            // The parent needs its own loop here because it wait()s on promises below.
    127     KJ_XTHREAD_TEST_SETUP_LOOP;
    128 
    129     const Executor* exec;
    130     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
    131       auto lock = executor.lockExclusive();
    132       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    133       exec = &KJ_ASSERT_NONNULL(*lock);
    134     }
    135 
    136     KJ_ASSERT(!isChild);
    137 
            // A failure raised inside the callback is expected to surface when the promise is waited on.
    138     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeAsync([&]() {
    139       KJ_ASSERT(isChild);
    140       KJ_FAIL_ASSERT("test exception") { break; }
    141     }).wait(waitScope));
    142 
            // This callback fulfills the promise the child is waiting on; the returned promise is
            // expected to resolve to the callback's return value (456).
    143     Promise<uint> promise = exec->executeAsync([&]() {
    144       KJ_ASSERT(isChild);
    145       fulfiller->fulfill(123);
    146       return 456u;
    147     });
    148     KJ_EXPECT(promise.wait(waitScope) == 456);
    149 
            // Signal the child thread that it may exit.
    150     *executor.lockExclusive() = nullptr;
    151   })();
    152 }
    153 
    154 KJ_TEST("synchonous promise cross-thread events") {
          // Checks executeSync() with callbacks that return a Promise: a promise built from an exception
          // is expected to throw in the caller, and a promise that the child fulfills later is expected
          // to deliver its value (321) as the result of executeSync().
    155   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    156   Own<PromiseFulfiller<uint>> fulfiller;  // accessed only from the subthread
    157   Promise<uint> promise = nullptr;  // accessed only from the subthread
    158   thread_local bool isChild = false;  // to assert which thread we're in
    159 
    160   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
    161   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
    162   // the other thread.
    163   Thread thread([&]() noexcept {
    164     isChild = true;
    165 
    166     KJ_XTHREAD_TEST_SETUP_LOOP;
    167 
            // `paf` is what this thread waits on; `paf2` backs the promise that the parent's callback
            // hands back as its result.
    168     auto paf = newPromiseAndFulfiller<uint>();
    169     fulfiller = kj::mv(paf.fulfiller);
    170 
    171     auto paf2 = newPromiseAndFulfiller<uint>();
    172     promise = kj::mv(paf2.promise);
    173 
    174     *executor.lockExclusive() = getCurrentThreadExecutor();
    175 
    176     KJ_ASSERT(paf.promise.wait(waitScope) == 123);
    177 
            // By now the parent's callback has run (it is what fulfilled `paf`) and has returned
            // `promise`; resolving it supplies the value the parent expects.
    178     paf2.fulfiller->fulfill(321);
    179 
    180     // Make sure reply gets sent.
    181     loop.run();
    182 
    183     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    184     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    185   });
    186 
    187   ([&]() noexcept {
    188     const Executor* exec;
    189     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
    190       auto lock = executor.lockExclusive();
    191       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    192       exec = &KJ_ASSERT_NONNULL(*lock);
    193     }
    194 
    195     KJ_ASSERT(!isChild);
    196 
            // The callback returns an already-failed promise; the failure is expected to be thrown here.
    197     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeSync([&]() {
    198       KJ_ASSERT(isChild);
    199       return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
    200     }));
    201 
            // The callback wakes the child and returns the still-pending `promise`; executeSync() is
            // expected to yield the value the child later fulfills it with.
    202     uint i = exec->executeSync([&]() {
    203       KJ_ASSERT(isChild);
    204       fulfiller->fulfill(123);
    205       return kj::mv(promise);
    206     });
    207     KJ_EXPECT(i == 321);
    208 
            // Signal the child thread that it may exit.
    209     *executor.lockExclusive() = nullptr;
    210   })();
    211 }
    212 
    213 KJ_TEST("asynchonous promise cross-thread events") {
          // Checks executeAsync() with callbacks that return a Promise: a promise built from an
          // exception is expected to reject the parent's promise, and a promise that the child fulfills
          // later is expected to resolve the parent's promise with the same value (321).
    214   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    215   Own<PromiseFulfiller<uint>> fulfiller;  // accessed only from the subthread
    216   Promise<uint> promise = nullptr;  // accessed only from the subthread
    217   thread_local bool isChild = false;  // to assert which thread we're in
    218 
    219   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
    220   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
    221   // the other thread.
    222   Thread thread([&]() noexcept {
    223     isChild = true;
    224 
    225     KJ_XTHREAD_TEST_SETUP_LOOP;
    226 
            // `paf` is what this thread waits on; `paf2` backs the promise that the parent's callback
            // hands back as its result.
    227     auto paf = newPromiseAndFulfiller<uint>();
    228     fulfiller = kj::mv(paf.fulfiller);
    229 
    230     auto paf2 = newPromiseAndFulfiller<uint>();
    231     promise = kj::mv(paf2.promise);
    232 
    233     *executor.lockExclusive() = getCurrentThreadExecutor();
    234 
    235     KJ_ASSERT(paf.promise.wait(waitScope) == 123);
    236 
            // By now the parent's callback has run (it is what fulfilled `paf`) and has returned
            // `promise`; resolving it supplies the value the parent expects.
    237     paf2.fulfiller->fulfill(321);
    238 
    239     // Make sure reply gets sent.
    240     loop.run();
    241 
    242     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    243     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    244   });
    245 
    246   ([&]() noexcept {
            // The parent needs its own loop here because it wait()s on promises below.
    247     KJ_XTHREAD_TEST_SETUP_LOOP;
    248 
    249     const Executor* exec;
    250     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
    251       auto lock = executor.lockExclusive();
    252       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    253       exec = &KJ_ASSERT_NONNULL(*lock);
    254     }
    255 
    256     KJ_ASSERT(!isChild);
    257 
            // The callback returns an already-failed promise; waiting on the result is expected to throw.
    258     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", exec->executeAsync([&]() {
    259       KJ_ASSERT(isChild);
    260       return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
    261     }).wait(waitScope));
    262 
            // The callback wakes the child and returns the still-pending `promise`; `promise2` is
            // expected to resolve to the value the child later fulfills it with.
    263     Promise<uint> promise2 = exec->executeAsync([&]() {
    264       KJ_ASSERT(isChild);
    265       fulfiller->fulfill(123);
    266       return kj::mv(promise);
    267     });
    268     KJ_EXPECT(promise2.wait(waitScope) == 321);
    269 
            // Signal the child thread that it may exit.
    270     *executor.lockExclusive() = nullptr;
    271   })();
    272 }
    273 
    274 KJ_TEST("cancel cross-thread event before it runs") {
          // Checks that dropping the promise returned by executeAsync() while the event is still queued
          // on the target thread keeps the callback from ever being invoked.
    275   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    276 
    277   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
    278   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
    279   // the other thread.
    280   Thread thread([&]() noexcept {
    281     KJ_XTHREAD_TEST_SETUP_LOOP;
    282 
    283     *executor.lockExclusive() = getCurrentThreadExecutor();
    284 
    285     // We never run the loop here, so that when the event is canceled, it's still queued.
    286 
    287     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    288     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    289   });
    290 
    291   ([&]() noexcept {
    292     KJ_XTHREAD_TEST_SETUP_LOOP;
    293 
    294     const Executor* exec;
    295     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
    296       auto lock = executor.lockExclusive();
    297       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    298       exec = &KJ_ASSERT_NONNULL(*lock);
    299     }
    300 
            // Written by the queued callback, which would execute on the child thread if cancellation
            // did not work. An atomic keeps that cross-thread write/read pair well-defined; `volatile`
            // provides no inter-thread guarantees in C++.
    301     std::atomic<bool> called(false);
    302     {
    303       Promise<uint> promise = exec->executeAsync([&]() { called = true; return 123u; });
              // Leave some time in which the event could (incorrectly) be picked up.
    304       delay();
    305       KJ_EXPECT(!promise.poll(waitScope));
              // `promise` goes out of scope here, which is the cancellation under test.
    306     }
    307     KJ_EXPECT(!called);
    308 
            // Signal the child thread that it may exit.
    309     *executor.lockExclusive() = nullptr;
    310   })();
    311 }
    312 
    313 KJ_TEST("cancel cross-thread event while it runs") {
          // Checks that the promise returned by executeAsync() can be dropped after the callback has
          // already started on the child thread and returned a promise that never completes, and that
          // the child's executor still accepts a further executeSync() call afterwards.
    314   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    315   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    316 
    317   // We use `noexcept` so that any uncaught exceptions immediately terminate the process without
    318   // unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
    319   // the other thread.
    320   Thread thread([&]() noexcept {
    321     KJ_XTHREAD_TEST_SETUP_LOOP;
    322 
    323     auto paf = newPromiseAndFulfiller<void>();
    324     fulfiller = kj::mv(paf.fulfiller);
    325 
    326     *executor.lockExclusive() = getCurrentThreadExecutor();
    327 
            // Keeps this thread's loop running until the parent's final executeSync() fulfills `paf`.
    328     paf.promise.wait(waitScope);
    329 
    330     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    331     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    332   });
    333 
    334   ([&]() noexcept {
    335     KJ_XTHREAD_TEST_SETUP_LOOP;
    336 
    337     const Executor* exec;
    338     {
              // Wait for the child to publish its executor, then keep a plain pointer to it.
    339       auto lock = executor.lockExclusive();
    340       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    341       exec = &KJ_ASSERT_NONNULL(*lock);
    342     }
    343 
    344     {
              // Written by the callback on the child thread and polled by this thread in the loop below.
              // It must be an atomic: concurrent access to a plain or `volatile` bool from two threads
              // is a data race, and `volatile` does not provide inter-thread visibility guarantees.
    345       std::atomic<bool> called(false);
    346       Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> {
    347         called = true;
    348         return kj::NEVER_DONE;
    349       });
              // Spin until the callback has actually started on the child thread.
    350       while (!called) {
    351         delay();
    352       }
    353       KJ_EXPECT(!promise.poll(waitScope));
              // `promise` goes out of scope here, which is the cancellation under test. It is declared
              // after `called`, so it is destroyed first.
    354     }
    355 
            // Release the child from its wait() so it can proceed to the shutdown handshake.
    356     exec->executeSync([&]() { fulfiller->fulfill(); });
    357 
            // Signal the child thread that it may exit.
    358     *executor.lockExclusive() = nullptr;
    359   })();
    360 }
    361 
    362 KJ_TEST("cross-thread cancellation in both directions at once") {
          // Runs the same routine (`simultaneous`) on several threads at once. Each thread queues 1000
          // never-completing events to another thread, lets them start, and then cancels them all while
          // its peers are doing the same. Run first with two threads, then with three arranged in a ring.
    363   MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
    364   MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
    365 
          // Shared counter used as a two-phase barrier inside `simultaneous`.
    366   MutexGuarded<uint> readyCount(0);
    367 
          // Per-thread state: an id used to check which thread destructors run on, and a flag set by
          // the final call that a peer thread queues to this thread.
    368   thread_local uint threadNumber = 0;
    369   thread_local bool receivedFinalCall = false;
    370 
    371   // Code to execute simultaneously in two threads...
    372   // We mark this noexcept so that any exceptions thrown will immediately invoke the termination
    373   // handler, skipping any destructors that would deadlock.
          //
          // selfExecutor:  slot in which this thread publishes its own executor.
          // otherExecutor: slot from which this thread reads the executor it will queue events to.
          // threadCount:   number of threads taking part; sets the barrier thresholds below.
    374   auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
    375                           MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor,
    376                           uint threadCount) noexcept {
    377     KJ_XTHREAD_TEST_SETUP_LOOP;
    378 
    379     *selfExecutor.lockExclusive() = getCurrentThreadExecutor();
    380 
    381     const Executor* exec;
    382     {
    383       auto lock = otherExecutor.lockExclusive();
    384       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    385       exec = &KJ_ASSERT_NONNULL(*lock);
    386     }
    387 
    388     // Create a ton of cross-thread promises to cancel.
    389     Vector<Promise<void>> promises;
    390     for (uint i = 0; i < 1000; i++) {
    391       promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
                // `threadNumber` is thread_local, so the value captured here is the id of the thread
                // that runs this callback; the deferred check compares it with the destroying thread.
    392         return kj::Promise<void>(kj::NEVER_DONE)
    393             .attach(kj::defer([wasThreadNumber = threadNumber]() {
    394           // Make sure destruction happens in the correct thread.
    395           KJ_ASSERT(threadNumber == wasThreadNumber);
    396         }));
    397       }));
    398     }
    399 
    400     // Signal other thread that we're done queueing, and wait for it to signal same.
            // (Barrier phase 1: every participant has incremented once, so the count reaches threadCount.)
    401     {
    402       auto lock = readyCount.lockExclusive();
    403       ++*lock;
    404       lock.wait([&](uint i) { return i >= threadCount; });
    405     }
    406 
    407     // Run event loop to start all executions queued by the other thread.
    408     waitScope.poll();
    409     loop.run();
    410 
    411     // Signal other thread that we've run the loop, and wait for it to signal same.
            // (Barrier phase 2: the count is not reset in between, hence the threadCount * 2 threshold.)
    412     {
    413       auto lock = readyCount.lockExclusive();
    414       ++*lock;
    415       lock.wait([&](uint i) { return i >= threadCount * 2; });
    416     }
    417 
    418     // Cancel all the promises.
    419     promises.clear();
    420 
    421     // All our cancellations completed, but the other thread may still be waiting for some
    422     // cancellations from us. We need to pump our event loop to make sure we continue handling
    423     // those cancellation requests. In particular we'll queue a function to the other thread and
    424     // wait for it to complete. The other thread will queue its own function to this thread just
    425     // before completing the function we queued to it.
    426     receivedFinalCall = false;
    427     exec->executeAsync([&]() { receivedFinalCall = true; }).wait(waitScope);
    428 
    429     // To be safe, make sure we've actually executed the function that the other thread queued to
    430     // us by repeatedly polling until `receivedFinalCall` becomes true in this thread.
    431     while (!receivedFinalCall) {
    432       waitScope.poll();
    433       loop.run();
    434     }
    435 
    436     // OK, signal other that we're all done.
    437     *otherExecutor.lockExclusive() = nullptr;
    438 
    439     // Wait until other thread sets executor to null, as a way to tell us to quit.
    440     selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    441   };
    442 
          // Two-thread run: parent and child each target the other's executor.
    443   {
    444     Thread thread([&]() {
    445       threadNumber = 1;
    446       simultaneous(childExecutor, parentExecutor, 2);
    447     });
    448 
    449     threadNumber = 0;
    450     simultaneous(parentExecutor, childExecutor, 2);
    451   }
    452 
    453   // Let's even have a three-thread version, with cyclic cancellation requests.
    454   MutexGuarded<kj::Maybe<const Executor&>> child2Executor;
          // Reset the barrier counter left over from the two-thread run.
    455   *readyCount.lockExclusive() = 0;
    456 
          // Ring: parent -> child1 -> child2 -> parent.
    457   {
    458     Thread thread1([&]() {
    459       threadNumber = 1;
    460       simultaneous(childExecutor, child2Executor, 3);
    461     });
    462 
    463     Thread thread2([&]() {
    464       threadNumber = 2;
    465       simultaneous(child2Executor, parentExecutor, 3);
    466     });
    467 
    468     threadNumber = 0;
    469     simultaneous(parentExecutor, childExecutor, 3);
    470   }
    471 }
    472 
    473 KJ_TEST("cross-thread cancellation cycle") {
    474   // Another multi-way cancellation test where we set up an actual cycle between three threads
    475   // waiting on each other to complete a single event.
    476 
    477   MutexGuarded<kj::Maybe<const Executor&>> child1Executor, child2Executor;
    478 
    479   Own<PromiseFulfiller<void>> fulfiller1, fulfiller2;
    480 
          // Body shared by both child threads: publish an executor through `executor`, store the
          // fulfiller for the promise this thread waits on into `fulfiller`, run the loop until that
          // promise is fulfilled, then park until told to quit.
    481   auto threadMain = [](MutexGuarded<kj::Maybe<const Executor&>>& executor,
    482                        Own<PromiseFulfiller<void>>& fulfiller) noexcept {
    483     KJ_XTHREAD_TEST_SETUP_LOOP;
    484 
    485     auto paf = newPromiseAndFulfiller<void>();
    486     fulfiller = kj::mv(paf.fulfiller);
    487 
    488     *executor.lockExclusive() = getCurrentThreadExecutor();
    489 
    490     paf.promise.wait(waitScope);
    491 
    492     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    493     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    494   };
    495 
    496   Thread thread1([&]() noexcept { threadMain(child1Executor, fulfiller1); });
    497   Thread thread2([&]() noexcept { threadMain(child2Executor, fulfiller2); });
    498 
    499   ([&]() noexcept {
    500     KJ_XTHREAD_TEST_SETUP_LOOP;
            // This thread's own executor; the innermost callback below is queued back to it.
    501     auto& parentExecutor = getCurrentThreadExecutor();
    502 
            // Wait for both children to publish their executors.
    503     const Executor* exec1;
    504     {
    505       auto lock = child1Executor.lockExclusive();
    506       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    507       exec1 = &KJ_ASSERT_NONNULL(*lock);
    508     }
    509     const Executor* exec2;
    510     {
    511       auto lock = child2Executor.lockExclusive();
    512       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    513       exec2 = &KJ_ASSERT_NONNULL(*lock);
    514     }
    515 
    516     // Create an event that cycles through both threads and back to this one, and then cancel it.
    517     bool cycleAllDestroyed = false;
    518     {
              // `paf` is fulfilled by the innermost callback, i.e. once the chain has come all the way
              // back around to this thread.
    519       auto paf = kj::newPromiseAndFulfiller<void>();
    520       Promise<uint> promise = exec1->executeAsync([&]() -> kj::Promise<uint> {
    521         return exec2->executeAsync([&]() -> kj::Promise<uint> {
    522           return parentExecutor.executeAsync([&]() -> kj::Promise<uint> {
    523             paf.fulfiller->fulfill();
                    // The deferred action records that the innermost, never-completing promise was
                    // destroyed.
    524             return kj::Promise<uint>(kj::NEVER_DONE).attach(kj::defer([&]() {
    525               cycleAllDestroyed = true;
    526             }));
    527           });
    528         });
    529       });
    530 
    531       // Wait until the cycle has come all the way around.
    532       paf.promise.wait(waitScope);
    533 
    534       KJ_EXPECT(!promise.poll(waitScope));
              // `promise` goes out of scope here, which is the cancellation under test.
    535     }
    536 
            // The cancellation is expected to have torn down the whole chain by the time the scope above
            // has been left.
    537     KJ_EXPECT(cycleAllDestroyed);
    538 
            // Release both children from their wait() calls, then tell them to exit.
    539     exec1->executeSync([&]() { fulfiller1->fulfill(); });
    540     exec2->executeSync([&]() { fulfiller2->fulfill(); });
    541 
    542     *child1Executor.lockExclusive() = nullptr;
    543     *child2Executor.lockExclusive() = nullptr;
    544   })();
    545 }
    546 
    547 KJ_TEST("call own thread's executor") {
          // Single-threaded test: calls executeSync() and executeAsync() on the executor that belongs to
          // the calling thread itself.
    548   KJ_XTHREAD_TEST_SETUP_LOOP;
    549 
    550   auto& executor = getCurrentThreadExecutor();
    551 
          // A value-returning callback passed to executeSync() is expected to yield its value directly.
    552   {
    553     uint i = executor.executeSync([]() {
    554       return 123u;
    555     });
    556     KJ_EXPECT(i == 123);
    557   }
    558 
          // A promise-returning callback passed to executeSync() on our own executor is expected to be
          // rejected with the message below.
    559   KJ_EXPECT_THROW_MESSAGE(
    560       "can't call executeSync() on own thread's executor with a promise-returning function",
    561       executor.executeSync([]() { return kj::evalLater([]() {}); }));
    562 
          // executeAsync() on our own executor is expected to resolve once we wait on it.
    563   {
    564     uint i = executor.executeAsync([]() {
    565       return 123u;
    566     }).wait(waitScope);
    567     KJ_EXPECT(i == 123);
    568   }
    569 }
    570 
    571 KJ_TEST("synchronous cross-thread event disconnected") {
          // Checks the errors executeSync() reports when the target thread's event loop goes away while
          // the cross-thread call is still outstanding, and when a call is attempted after that.
    572   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    573   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    574   thread_local bool isChild = false;  // to assert which thread we're in
    575 
    576   Thread thread([&]() noexcept {
    577     isChild = true;
    578 
            // The loop lives in this inner scope so it can be destroyed while the thread keeps running.
    579     {
    580       KJ_XTHREAD_TEST_SETUP_LOOP;
    581 
    582       auto paf = newPromiseAndFulfiller<void>();
    583       fulfiller = kj::mv(paf.fulfiller);
    584 
    585       *executor.lockExclusive() = getCurrentThreadExecutor();
    586 
              // Returns once the parent's callback calls fulfiller->fulfill(); that callback's own
              // result (NEVER_DONE) is still pending at that point.
    587       paf.promise.wait(waitScope);
    588 
    589       // Exit the event loop!
    590     }
    591 
    592     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    593     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    594   });
    595 
    596   ([&]() noexcept {
    597     Own<const Executor> exec;
    598     {
              // Take an owning reference (addRef) so `exec` can still be queried after the child's loop
              // has been destroyed.
    599       auto lock = executor.lockExclusive();
    600       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    601       exec = KJ_ASSERT_NONNULL(*lock).addRef();
    602     }
    603 
    604     KJ_EXPECT(!isChild);
    605 
    606     KJ_EXPECT(exec->isLive());
    607 
            // The callback lets the child leave its loop scope but itself never completes, so the call
            // is expected to fail with the "exited before ... could complete" message.
    608     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    609         "Executor's event loop exited before cross-thread event could complete",
    610         exec->executeSync([&]() -> Promise<void> {
    611           fulfiller->fulfill();
    612           return kj::NEVER_DONE;
    613         }));
    614 
    615     KJ_EXPECT(!exec->isLive());
    616 
            // Any further call on the now-dead executor is expected to fail immediately.
    617     KJ_EXPECT_THROW_MESSAGE(
    618         "Executor's event loop has exited",
    619         exec->executeSync([&]() {}));
    620 
            // Signal the child thread that it may exit.
    621     *executor.lockExclusive() = nullptr;
    622   })();
    623 }
    624 
    625 KJ_TEST("asynchronous cross-thread event disconnected") {
          // Checks the errors executeAsync() reports when the target thread's event loop goes away while
          // the cross-thread call is still outstanding, and when a call is attempted after that.
    626   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    627   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    628   thread_local bool isChild = false;  // to assert which thread we're in
    629 
    630   Thread thread([&]() noexcept {
    631     isChild = true;
    632 
            // The loop lives in this inner scope so it can be destroyed while the thread keeps running.
    633     {
    634       KJ_XTHREAD_TEST_SETUP_LOOP;
    635 
    636       auto paf = newPromiseAndFulfiller<void>();
    637       fulfiller = kj::mv(paf.fulfiller);
    638 
    639       *executor.lockExclusive() = getCurrentThreadExecutor();
    640 
              // Returns once the parent's callback calls fulfiller->fulfill(); that callback's own
              // result (NEVER_DONE) is still pending at that point.
    641       paf.promise.wait(waitScope);
    642 
    643       // Exit the event loop!
    644     }
    645 
    646     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    647     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    648   });
    649 
    650   ([&]() noexcept {
            // The parent needs its own loop here because it wait()s on promises below.
    651     KJ_XTHREAD_TEST_SETUP_LOOP;
    652 
    653     Own<const Executor> exec;
    654     {
              // Take an owning reference (addRef) so `exec` can still be queried after the child's loop
              // has been destroyed.
    655       auto lock = executor.lockExclusive();
    656       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    657       exec = KJ_ASSERT_NONNULL(*lock).addRef();
    658     }
    659 
    660     KJ_EXPECT(!isChild);
    661 
    662     KJ_EXPECT(exec->isLive());
    663 
            // The callback lets the child leave its loop scope but itself never completes, so waiting on
            // the result is expected to fail with the "exited before ... could complete" message.
    664     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    665         "Executor's event loop exited before cross-thread event could complete",
    666         exec->executeAsync([&]() -> Promise<void> {
    667           fulfiller->fulfill();
    668           return kj::NEVER_DONE;
    669         }).wait(waitScope));
    670 
    671     KJ_EXPECT(!exec->isLive());
    672 
            // Any further call on the now-dead executor is expected to fail.
    673     KJ_EXPECT_THROW_MESSAGE(
    674         "Executor's event loop has exited",
    675         exec->executeAsync([&]() {}).wait(waitScope));
    676 
            // Signal the child thread that it may exit.
    677     *executor.lockExclusive() = nullptr;
    678   })();
    679 }
    680 
    681 KJ_TEST("cross-thread event disconnected before it runs") {
          // Checks that an event queued to a thread that never runs its event loop fails with the
          // "exited before ... could complete" error once that thread tears its loop down.
    682   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    683   thread_local bool isChild = false;  // to assert which thread we're in
    684 
    685   Thread thread([&]() noexcept {
    686     isChild = true;
    687 
    688     KJ_XTHREAD_TEST_SETUP_LOOP;
    689 
    690     *executor.lockExclusive() = getCurrentThreadExecutor();
    691 
    692     // Don't actually run the event loop. Destroy it when the other thread signals us to.
    693     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
            // Returning from this lambda ends the scope that holds the loop set up above.
    694   });
    695 
    696   ([&]() noexcept {
    697     KJ_XTHREAD_TEST_SETUP_LOOP;
    698 
    699     Own<const Executor> exec;
    700     {
              // Take an owning reference (addRef) so `exec` can still be queried after the child's loop
              // has been destroyed.
    701       auto lock = executor.lockExclusive();
    702       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    703       exec = KJ_ASSERT_NONNULL(*lock).addRef();
    704     }
    705 
    706     KJ_EXPECT(!isChild);
    707 
    708     KJ_EXPECT(exec->isLive());
    709 
            // Queue an event that must never run: the child does not run its loop.
    710     auto promise = exec->executeAsync([&]() {
    711       KJ_LOG(ERROR, "shouldn't have executed");
    712     });
    713     KJ_EXPECT(!promise.poll(waitScope));
    714 
            // Unlike most tests in this file, the quit signal is sent *before* the final checks: it is
            // what makes the child destroy its loop while the event is still queued.
    715     *executor.lockExclusive() = nullptr;
    716 
    717     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    718         "Executor's event loop exited before cross-thread event could complete",
    719         promise.wait(waitScope));
    720 
    721     KJ_EXPECT(!exec->isLive());
    722   })();
    723 }
    724 
    725 KJ_TEST("cross-thread event disconnected without holding Executor ref") {
          // Checks that executeSync() still reports the disconnect error when the caller holds only a
          // plain pointer to the Executor (no addRef) and the target thread's loop is destroyed while
          // the call is outstanding.
    726   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    727   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    728   thread_local bool isChild = false;  // to assert which thread we're in
    729 
    730   Thread thread([&]() noexcept {
    731     isChild = true;
    732 
            // The loop lives in this inner scope so it can be destroyed while the thread keeps running.
    733     {
    734       KJ_XTHREAD_TEST_SETUP_LOOP;
    735 
    736       auto paf = newPromiseAndFulfiller<void>();
    737       fulfiller = kj::mv(paf.fulfiller);
    738 
    739       *executor.lockExclusive() = getCurrentThreadExecutor();
    740 
              // Returns once the parent's callback calls fulfiller->fulfill(); that callback's own
              // result (NEVER_DONE) is still pending at that point.
    741       paf.promise.wait(waitScope);
    742 
    743       // Exit the event loop!
    744     }
    745 
    746     // Wait until parent thread sets executor to null, as a way to tell us to quit.
    747     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    748   });
    749 
    750   ([&]() noexcept {
            // Deliberately a plain pointer: no owning reference to the Executor is taken in this test.
    751     const Executor* exec;
    752     {
    753       auto lock = executor.lockExclusive();
    754       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    755       exec = &KJ_ASSERT_NONNULL(*lock);
    756     }
    757 
    758     KJ_EXPECT(!isChild);
    759 
    760     KJ_EXPECT(exec->isLive());
    761 
            // The callback lets the child leave its loop scope but itself never completes, so the call
            // is expected to fail with the "exited before ... could complete" message.
    762     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    763         "Executor's event loop exited before cross-thread event could complete",
    764         exec->executeSync([&]() -> Promise<void> {
    765           fulfiller->fulfill();
    766           return kj::NEVER_DONE;
    767         }));
    768 
    769     // Can't check `exec->isLive()` because it's been destroyed by now.
    770 
            // Signal the child thread that it may exit.
    771     *executor.lockExclusive() = nullptr;
    772   })();
    773 }
    774 
    775 KJ_TEST("detached cross-thread event doesn't cause crash") {
          // Regression test (see the comment near the end of the parent lambda): the parent destroys its
          // EventLoop while a detached cross-thread promise is still in flight on the child thread, and
          // this must not abort the process.
    776   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    777   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    778 
    779   Thread thread([&]() noexcept {
    780     KJ_XTHREAD_TEST_SETUP_LOOP;
    781 
    782     auto paf = newPromiseAndFulfiller<void>();
    783     fulfiller = kj::mv(paf.fulfiller);
    784 
    785     *executor.lockExclusive() = getCurrentThreadExecutor();
    786 
            // Returns once the parent's (deliberately slow) callback calls fulfiller->fulfill().
    787     paf.promise.wait(waitScope);
    788 
    789     // Without this poll(), we don't attempt to reply to the other thread? But this isn't required
    790     // in other tests, for some reason? Oh well.
    791     waitScope.poll();
    792 
            // Wait until parent thread sets executor to null, as a way to tell us to quit.
    793     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    794   });
    795 
    796   ([&]() noexcept {
            // The parent's loop lives in this inner scope so that it is destroyed before the quit signal
            // is sent at the bottom.
    797     {
    798       KJ_XTHREAD_TEST_SETUP_LOOP;
    799 
    800       const Executor* exec;
    801       {
    802         auto lock = executor.lockExclusive();
    803         lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    804         exec = &KJ_ASSERT_NONNULL(*lock);
    805       }
    806 
              // The resulting promise is detached rather than waited on; errors are only logged.
    807       exec->executeAsync([&]() -> kj::Promise<void> {
    808         // Make sure other thread gets time to exit its EventLoop.
    809         delay();
    810         delay();
    811         delay();
    812         fulfiller->fulfill();
    813         return kj::READY_NOW;
    814       }).detach([&](kj::Exception&& e) {
    815         KJ_LOG(ERROR, e);
    816       });
    817 
    818       // Give the other thread a chance to wake up and start working on the event.
    819       delay();
    820 
    821       // Now we'll destroy our EventLoop. That *should* cause detached promises to be destroyed,
    822       // thereby cancelling it, before disabling our own executor. However, at one point in the
    823       // past, our executor was shut down first, followed by destroying detached promises, which
    824       // led to an abort because the other thread had no way to reply back to this thread.
    825     }
    826 
            // Signal the child thread that it may exit.
    827     *executor.lockExclusive() = nullptr;
    828   })();
    829 }
    830 
    831 KJ_TEST("cross-thread event cancel requested while destination thread being destroyed") {
    832   // This exercises the code in Executor::Impl::disconnect() which tears down the list of
    833   // cross-thread events which have already been canceled. At one point this code had a bug which
    834   // would cause it to throw if any events were present in the cancel list.
    835 
    836   MutexGuarded<kj::Maybe<const Executor&>> executor;  // to get the Executor from the other thread
    837   Own<PromiseFulfiller<void>> fulfiller;  // accessed only from the subthread
    838 
    839   Thread thread([&]() noexcept {
    840     KJ_XTHREAD_TEST_SETUP_LOOP;
    841 
    842     auto paf = newPromiseAndFulfiller<void>();
    843     fulfiller = kj::mv(paf.fulfiller);
    844 
    845     *executor.lockExclusive() = getCurrentThreadExecutor();
    846 
    847     // Wait for other thread to start a cross-thread task.
    848     paf.promise.wait(waitScope);
    849 
    850     // Let the other thread know, out-of-band, that the task is running, so that it can now request
    851     // cancellation. We do this by setting `executor` to null (but we could also use some separate
    852     // MutexGuarded conditional variable instead).
    853     *executor.lockExclusive() = nullptr;
    854 
    855     // Give other thread a chance to request cancellation of the promise.
    856     delay();
    857 
    858     // now we exit the event loop
    859   });
    860 
    861   ([&]() noexcept {
    862     KJ_XTHREAD_TEST_SETUP_LOOP;
    863 
    864     const Executor* exec;
    865     {
    866       auto lock = executor.lockExclusive();
    867       lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
    868       exec = &KJ_ASSERT_NONNULL(*lock);
    869     }
    870 
    871     KJ_EXPECT(exec->isLive());
    872 
    873     auto promise = exec->executeAsync([&]() -> Promise<void> {
    874       fulfiller->fulfill();
    875       return kj::NEVER_DONE;
    876     });
    877 
    878     // Wait for the other thread to signal to us that it has indeed started executing our task.
    879     executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
    880 
    881     // Cancel the promise.
    882     promise = nullptr;
    883   })();
    884 }
    885 
    886 KJ_TEST("cross-thread fulfiller") {
    887   MutexGuarded<Maybe<Own<PromiseFulfiller<int>>>> fulfillerMutex;
    888 
    889   Thread thread([&]() noexcept {
    890     KJ_XTHREAD_TEST_SETUP_LOOP;
    891 
    892     auto paf = kj::newPromiseAndCrossThreadFulfiller<int>();
    893     *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller);
    894 
    895     int result = paf.promise.wait(waitScope);
    896     KJ_EXPECT(result == 123);
    897   });
    898 
    899   ([&]() noexcept {
    900     KJ_XTHREAD_TEST_SETUP_LOOP;
    901 
    902     Own<PromiseFulfiller<int>> fulfiller;
    903     {
    904       auto lock = fulfillerMutex.lockExclusive();
    905       lock.wait([&](auto& value) { return value != nullptr; });
    906       fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock));
    907     }
    908 
    909     fulfiller->fulfill(123);
    910   })();
    911 }
    912 
    913 KJ_TEST("cross-thread fulfiller rejects") {
    914   MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex;
    915 
    916   Thread thread([&]() noexcept {
    917     KJ_XTHREAD_TEST_SETUP_LOOP;
    918 
    919     auto paf = kj::newPromiseAndCrossThreadFulfiller<void>();
    920     *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller);
    921 
    922     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("foo exception", paf.promise.wait(waitScope));
    923   });
    924 
    925   ([&]() noexcept {
    926     KJ_XTHREAD_TEST_SETUP_LOOP;
    927 
    928     Own<PromiseFulfiller<void>> fulfiller;
    929     {
    930       auto lock = fulfillerMutex.lockExclusive();
    931       lock.wait([&](auto& value) { return value != nullptr; });
    932       fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock));
    933     }
    934 
    935     fulfiller->reject(KJ_EXCEPTION(FAILED, "foo exception"));
    936   })();
    937 }
    938 
    939 KJ_TEST("cross-thread fulfiller destroyed") {
    940   MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex;
    941 
    942   Thread thread([&]() noexcept {
    943     KJ_XTHREAD_TEST_SETUP_LOOP;
    944 
    945     auto paf = kj::newPromiseAndCrossThreadFulfiller<void>();
    946     *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller);
    947 
    948     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    949         "cross-thread PromiseFulfiller was destroyed without fulfilling the promise",
    950         paf.promise.wait(waitScope));
    951   });
    952 
    953   ([&]() noexcept {
    954     KJ_XTHREAD_TEST_SETUP_LOOP;
    955 
    956     Own<PromiseFulfiller<void>> fulfiller;
    957     {
    958       auto lock = fulfillerMutex.lockExclusive();
    959       lock.wait([&](auto& value) { return value != nullptr; });
    960       fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock));
    961     }
    962 
    963     fulfiller = nullptr;
    964   })();
    965 }
    966 
    967 KJ_TEST("cross-thread fulfiller canceled") {
    968   MutexGuarded<Maybe<Own<PromiseFulfiller<void>>>> fulfillerMutex;
    969   MutexGuarded<bool> done;
    970 
    971   Thread thread([&]() noexcept {
    972     KJ_XTHREAD_TEST_SETUP_LOOP;
    973 
    974     auto paf = kj::newPromiseAndCrossThreadFulfiller<void>();
    975     {
    976       auto lock = fulfillerMutex.lockExclusive();
    977       *lock = kj::mv(paf.fulfiller);
    978       lock.wait([](auto& value) { return value == nullptr; });
    979     }
    980 
    981     // cancel
    982     paf.promise = nullptr;
    983 
    984     {
    985       auto lock = done.lockExclusive();
    986       lock.wait([](bool value) { return value; });
    987     }
    988   });
    989 
    990   ([&]() noexcept {
    991     KJ_XTHREAD_TEST_SETUP_LOOP;
    992 
    993     Own<PromiseFulfiller<void>> fulfiller;
    994     {
    995       auto lock = fulfillerMutex.lockExclusive();
    996       lock.wait([&](auto& value) { return value != nullptr; });
    997       fulfiller = kj::mv(KJ_ASSERT_NONNULL(*lock));
    998       KJ_ASSERT(fulfiller->isWaiting());
    999       *lock = nullptr;
   1000     }
   1001 
   1002     // Should eventually show not waiting.
   1003     while (fulfiller->isWaiting()) {
   1004       delay();
   1005     }
   1006 
   1007     *done.lockExclusive() = true;
   1008   })();
   1009 }
   1010 
   1011 KJ_TEST("cross-thread fulfiller multiple fulfills") {
   1012   MutexGuarded<Maybe<Own<PromiseFulfiller<int>>>> fulfillerMutex;
   1013 
   1014   Thread thread([&]() noexcept {
   1015     KJ_XTHREAD_TEST_SETUP_LOOP;
   1016 
   1017     auto paf = kj::newPromiseAndCrossThreadFulfiller<int>();
   1018     *fulfillerMutex.lockExclusive() = kj::mv(paf.fulfiller);
   1019 
   1020     int result = paf.promise.wait(waitScope);
   1021     KJ_EXPECT(result == 123);
   1022   });
   1023 
   1024   auto func = [&]() noexcept {
   1025     KJ_XTHREAD_TEST_SETUP_LOOP;
   1026 
   1027     PromiseFulfiller<int>* fulfiller;
   1028     {
   1029       auto lock = fulfillerMutex.lockExclusive();
   1030       lock.wait([&](auto& value) { return value != nullptr; });
   1031       fulfiller = KJ_ASSERT_NONNULL(*lock).get();
   1032     }
   1033 
   1034     fulfiller->fulfill(123);
   1035   };
   1036 
   1037   kj::Thread thread1(func);
   1038   kj::Thread thread2(func);
   1039   kj::Thread thread3(func);
   1040   kj::Thread thread4(func);
   1041 }
   1042 
   1043 }  // namespace
   1044 }  // namespace kj