capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-test.c++ (35461B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "async.h"
     23 #include "debug.h"
     24 #include <kj/compat/gtest.h>
     25 #include "mutex.h"
     26 #include "thread.h"
     27 
     28 #if !KJ_USE_FIBERS
     29 #include <pthread.h>
     30 #endif
     31 
     32 namespace kj {
     33 namespace {
     34 
     35 #if !_MSC_VER || defined(__clang__)
     36 // TODO(msvc): GetFunctorStartAddress is unsupported on MSVC for now, so the test is skipped there.
     37 TEST(Async, GetFunctorStartAddress) {
     38   EXPECT_TRUE(_::GetFunctorStartAddress<>::apply([]() { return 0; }) != nullptr);
     39 }
     40 #endif
     41 
     42 TEST(Async, EvalVoid) {
     43   // A void callback handed to evalLater() must not run before the promise is waited on.
     44   EventLoop loop;
     45   WaitScope scope(loop);
     46   bool ran = false;
     47 
     48   Promise<void> pending = evalLater([&ran]() { ran = true; });
     49   EXPECT_FALSE(ran);
     50   pending.wait(scope);
     51   EXPECT_TRUE(ran);
     52 }
     53 
     54 TEST(Async, EvalInt) {
     55   // The deferred callback runs only once waited on; wait() hands back its return value.
     56   EventLoop loop;
     57   WaitScope scope(loop);
     58   bool ran = false;
     59 
     60   Promise<int> pending = evalLater([&ran]() { ran = true; return 123; });
     61   EXPECT_FALSE(ran);
     62   EXPECT_EQ(123, pending.wait(scope));
     63   EXPECT_TRUE(ran);
     64 }
     65 
     66 TEST(Async, There) {
     67   // A continuation on an already-resolved promise still runs lazily, when waited on.
     68   EventLoop loop;
     69   WaitScope scope(loop);
     70   bool ran = false;
     71   Promise<int> ready = 123;
     72 
     73   Promise<int> sum = ready.then([&ran](int value) { ran = true; return value + 321; });
     74   EXPECT_FALSE(ran);
     75   EXPECT_EQ(444, sum.wait(scope));
     76   EXPECT_TRUE(ran);
     77 }
     78 
     79 TEST(Async, ThereVoid) {
     80   // A continuation returning void gives a Promise<void>; its side effect shows up after wait().
     81   EventLoop loop;
     82   WaitScope scope(loop);
     83   int seen = 0;
     84   Promise<int> ready = 123;
     85 
     86   Promise<void> finished = ready.then([&seen](int value) { seen = value; });
     87   EXPECT_EQ(0, seen);
     88   finished.wait(scope);
     89   EXPECT_EQ(123, seen);
     90 }
     91 
     92 TEST(Async, Exception) {
     93   EventLoop loop;
     94   WaitScope scope(loop);
     95   Promise<int> failing = evalLater(
     96       []() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
     97   auto caught = kj::runCatchingExceptions([&]() {
     98     // With exceptions enabled wait() throws; it only returns when built with -fno-exceptions.
     99     EXPECT_EQ(123, failing.wait(scope));
    100   });
    101   EXPECT_TRUE(caught != nullptr);
    102 }
    103 
    104 TEST(Async, HandleException) {
    105   EventLoop loop;
    106   WaitScope scope(loop);
    107 
    108   Promise<int> failing = evalLater(
    109       []() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
    110   const int failLine = __LINE__ - 1;  // Line of the KJ_FAIL_ASSERT directly above.
    111 
    112   // The error handler passed to then() turns the failure into an ordinary result.
    113   Promise<int> recovered = failing.then(
    114       [](int value) { return value + 1; },
    115       [&](Exception&& error) { EXPECT_EQ(failLine, error.getLine()); return 345; });
    116   EXPECT_EQ(345, recovered.wait(scope));
    117 }
    118 
    119 TEST(Async, PropagateException) {
    120   EventLoop loop;
    121   WaitScope scope(loop);
    122 
    123   Promise<int> failing = evalLater(
    124       []() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
    125   const int failLine = __LINE__ - 1;  // Line of the KJ_FAIL_ASSERT directly above.
    126 
    127   // This stage has no error handler, so the exception has to pass through it.
    128   Promise<int> passThrough = failing.then([](int value) { return value + 1; });
    129 
    130   Promise<int> recovered = passThrough.then(
    131       [](int value) { return value + 2; },
    132       [&](Exception&& error) { EXPECT_EQ(failLine, error.getLine()); return 345; });
    133   EXPECT_EQ(345, recovered.wait(scope));
    134 }
    135 
    136 TEST(Async, PropagateExceptionTypeChange) {
    137   EventLoop loop;
    138   WaitScope scope(loop);
    139 
    140   Promise<int> failing = evalLater(
    141       []() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
    142   const int failLine = __LINE__ - 1;  // Line of the KJ_FAIL_ASSERT directly above.
    143 
    144   // The exception has to survive a stage that changes the result type from int to StringPtr.
    145   Promise<StringPtr> asText = failing.then([](int) -> StringPtr { return "foo"; });
    146 
    147   Promise<StringPtr> recovered = asText.then(
    148       [](StringPtr) -> StringPtr { return "bar"; },
    149       [&](Exception&& e) -> StringPtr { EXPECT_EQ(failLine, e.getLine()); return "baz"; });
    150   EXPECT_EQ("baz", recovered.wait(scope));
    151 }
    152 
    153 TEST(Async, Then) {
    154   EventLoop loop;
    155   WaitScope scope(loop);
    156   bool ran = false;
    157 
    158   // Chain directly off a temporary, already-fulfilled promise.
    159   auto continuation = [&ran](int value) {
    160     ran = true;
    161     return value + 321;
    162   };
    163   Promise<int> sum = Promise<int>(123).then(kj::mv(continuation));
    164 
    165   // The continuation must not have run before the result is requested.
    166   EXPECT_FALSE(ran);
    167   EXPECT_EQ(444, sum.wait(scope));
    168   EXPECT_TRUE(ran);
    169 }
    170 
    171 TEST(Async, Chain) {
    172   EventLoop loop;
    173   WaitScope scope(loop);
    174 
    175   Promise<int> first = evalLater([]() -> int { return 123; });
    176   Promise<int> second = evalLater([]() -> int { return 321; });
    177 
    178   // The outer continuation itself returns a promise; waiting yields the inner int result.
    179   auto total = first.then([&second](int lhs) {
    180     return second.then([lhs](int rhs) {
    181       return lhs + rhs;
    182     });
    183   });
    184   EXPECT_EQ(444, total.wait(scope));
    185 }
    186 
    187 TEST(Async, DeepChain) {
    188   EventLoop loop;
    189   WaitScope scope(loop);
    190 
    191   Promise<void> tail = NEVER_DONE;
    192 
    193   // Wrap the promise 1000 times; each layer simply hands back the promise it captured.
    194   for (uint depth = 0; depth < 1000; ++depth) {
    195     tail = evalLater(mvCapture(tail, [](Promise<void> inner) {
    196       return kj::mv(inner);
    197     }));
    198   }
    199 
    200   loop.run();
    201 
    202   // Count newlines in the trace as a proxy for how many nodes remain in the chain.
    203   uint newlines = 0;
    204   auto traceText = tail.trace();
    205   for (char ch: traceText) {
    206     if (ch == '\n') ++newlines;
    207   }
    208   // The 1000 chain nodes should have collapsed into 2-ish nodes; allow a little slack for
    209   // implementation freedom.
    210   EXPECT_LT(newlines, 5);
    211 }
    212 
    213 TEST(Async, DeepChain2) {
    214   EventLoop loop;
    215   WaitScope scope(loop);
    216 
    217   // The callback reads `chain` by reference, i.e. whatever it holds once the callback runs.
    218   Promise<void> chain = nullptr;
    219   chain = evalLater([&chain]() {
    220     uint newlines = 0;
    221     auto traceText = chain.trace();
    222     for (char ch: traceText) {
    223       if (ch == '\n') ++newlines;
    224     }
    225     // The 1000 chain nodes should have collapsed into 2-ish nodes; allow a little slack for
    226     // implementation freedom.
    227     EXPECT_LT(newlines, 5);
    228   });
    229 
    230   // Wrap the promise 1000 times; each layer simply hands back the promise it captured.
    231   for (uint depth = 0; depth < 1000; ++depth) {
    232     chain = evalLater(mvCapture(chain, [](Promise<void> inner) {
    233       return kj::mv(inner);
    234     }));
    235   }
    236 
    237   chain.wait(scope);
    238 }
    239 
    240 Promise<void> makeChain(uint i) {
    241   // Nests `i` evalLater() layers, built one per callback, around a promise that never completes.
    242   if (i == 0) {
    243     return NEVER_DONE;
    244   }
    245   return evalLater([remaining = i - 1]() -> Promise<void> {
    246     return makeChain(remaining);
    247   });
    248 }
    249 
    250 TEST(Async, DeepChain3) {
    251   EventLoop loop;
    252   WaitScope scope(loop);
    253 
    254   Promise<void> chain = makeChain(1000);
    255   loop.run();
    256 
    257   // Count newlines in the trace as a proxy for how many nodes remain in the chain.
    258   uint newlines = 0;
    259   auto traceText = chain.trace();
    260   for (char ch: traceText) {
    261     if (ch == '\n') ++newlines;
    262   }
    263 
    264   // The 1000 chain nodes should have collapsed into 2-ish nodes; allow a little slack for
    265   // implementation freedom.
    266   EXPECT_LT(newlines, 5);
    267 }
    268 
    269 Promise<void> makeChain2(uint i, Promise<void> promise) {
    270   // Wraps `promise` in `i` nested evalLater() layers, handing it down one level per callback.
    271   if (i == 0) {
    272     return kj::mv(promise);
    273   }
    274   return evalLater(mvCapture(promise, [i](Promise<void>&& inner) -> Promise<void> {
    275     return makeChain2(i - 1, kj::mv(inner));
    276   }));
    277 }
    278 
    279 TEST(Async, DeepChain4) {
    280   EventLoop loop;
    281   WaitScope scope(loop);
    282 
    283   // The callback reads `chain` by reference, i.e. whatever it holds once the callback runs.
    284   Promise<void> chain = nullptr;
    285   chain = evalLater([&chain]() {
    286     uint newlines = 0;
    287     auto traceText = chain.trace();
    288     for (char ch: traceText) {
    289       if (ch == '\n') ++newlines;
    290     }
    291     // The 1000 chain nodes should have collapsed into 2-ish nodes; allow a little slack for
    292     // implementation freedom.
    293     EXPECT_LT(newlines, 5);
    294   });
    295 
    296   chain = makeChain2(1000, kj::mv(chain));
    297 
    298   chain.wait(scope);
    299 }
    300 
    301 TEST(Async, IgnoreResult) {
    302   EventLoop loop;
    303   WaitScope scope(loop);
    304 
    305   bool ran = false;
    306 
    307   // ignoreResult() discards the int result, yet the continuation must still execute.
    308   Promise<int> sum = Promise<int>(123).then([&ran](int value) {
    309     ran = true;
    310     return value + 321;
    311   });
    312   Promise<void> finished = sum.ignoreResult();
    313 
    314   EXPECT_FALSE(ran);
    315   finished.wait(scope);
    316   EXPECT_TRUE(ran);
    317 }
    318 
    319 TEST(Async, SeparateFulfiller) {
    320   EventLoop loop;
    321   WaitScope scope(loop);
    322 
    323   // isWaiting() should flip to false as soon as the fulfiller has delivered a value.
    324   auto paf = newPromiseAndFulfiller<int>();
    325   EXPECT_TRUE(paf.fulfiller->isWaiting());
    326   paf.fulfiller->fulfill(123);
    327   EXPECT_FALSE(paf.fulfiller->isWaiting());
    328 
    329   EXPECT_EQ(123, paf.promise.wait(scope));
    330 }
    331 
    332 TEST(Async, SeparateFulfillerVoid) {
    333   EventLoop loop;
    334   WaitScope scope(loop);
    335 
    336   // Void variant: fulfill() takes no argument and wait() produces no value.
    337   auto paf = newPromiseAndFulfiller<void>();
    338   EXPECT_TRUE(paf.fulfiller->isWaiting());
    339   paf.fulfiller->fulfill();
    340   EXPECT_FALSE(paf.fulfiller->isWaiting());
    341 
    342   paf.promise.wait(scope);
    343 }
    344 
    345 TEST(Async, SeparateFulfillerCanceled) {
    346   // Once the promise side is dropped, the fulfiller must report it is no longer waiting.
    347   auto paf = newPromiseAndFulfiller<void>();
    348   EXPECT_TRUE(paf.fulfiller->isWaiting());
    349   paf.promise = nullptr;
    350   EXPECT_FALSE(paf.fulfiller->isWaiting());
    351 }
    352 
    353 TEST(Async, SeparateFulfillerChained) {
    354   EventLoop loop;
    355   WaitScope scope(loop);
    356 
    357   auto innerPaf = newPromiseAndFulfiller<int>();
    358   auto outerPaf = newPromiseAndFulfiller<Promise<int>>();
    359 
    360   // Fulfilling the outer pair with a promise stops the outer fulfiller from waiting at once...
    361   EXPECT_TRUE(outerPaf.fulfiller->isWaiting());
    362   outerPaf.fulfiller->fulfill(kj::mv(innerPaf.promise));
    363   EXPECT_FALSE(outerPaf.fulfiller->isWaiting());
    364   // ...while the value itself comes from the inner fulfiller.
    365   innerPaf.fulfiller->fulfill(123);
    366   EXPECT_EQ(123, outerPaf.promise.wait(scope));
    367 }
    368 
    369 TEST(Async, SeparateFulfillerDiscarded) {
    370   EventLoop loop;
    371   WaitScope scope(loop);
    372 
    373   // Destroy the fulfiller without ever calling fulfill(); the promise must then fail.
    374   auto paf = newPromiseAndFulfiller<void>();
    375   paf.fulfiller = nullptr;
    376   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE(
    377       "PromiseFulfiller was destroyed without fulfilling the promise",
    378       paf.promise.wait(scope));
    379 }
    380 
    381 #if !KJ_NO_EXCEPTIONS
    382 TEST(Async, SeparateFulfillerDiscardedDuringUnwind) {
    383   EventLoop loop;
    384   WaitScope scope(loop);
    385 
    386   auto paf = newPromiseAndFulfiller<int>();
    387   kj::runCatchingExceptions([&paf]() {
    388     auto droppedDuringUnwind = kj::mv(paf.fulfiller);  // Destroyed while the throw unwinds.
    389     kj::throwFatalException(KJ_EXCEPTION(FAILED, "test exception"));
    390   });
    391 
    392   // The promise is expected to fail with the message of the exception that was in flight.
    393   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", paf.promise.wait(scope));
    394 }
    395 #endif
    396 
    397 TEST(Async, SeparateFulfillerMemoryLeak) {
    398   auto pair = kj::newPromiseAndFulfiller<void>();  // Fulfilled below but never waited on.
    399   pair.fulfiller->fulfill();
    400 }
    401 
    402 TEST(Async, Ordering) {
    403   EventLoop loop;
    404   WaitScope waitScope(loop);
    405   // Pins the relative execution order of callbacks: each one asserts, then bumps, `counter`.
    406   class ErrorHandlerImpl: public TaskSet::ErrorHandler {
    407   public:
    408     void taskFailed(kj::Exception&& exception) override {
    409       KJ_FAIL_EXPECT(exception);  // No task in this test is supposed to fail.
    410     }
    411   };
    412 
    413   int counter = 0;
    414   ErrorHandlerImpl errorHandler;
    415   kj::TaskSet tasks(errorHandler);
    416 
    417   tasks.add(evalLater([&]() {
    418     EXPECT_EQ(0, counter++);
    419 
    420     {
    421       // Use a promise and fulfiller so that we can fulfill the promise after waiting on it in
    422       // order to induce depth-first scheduling.
    423       auto paf = kj::newPromiseAndFulfiller<void>();
    424       tasks.add(paf.promise.then([&]() {
    425         EXPECT_EQ(1, counter++);
    426       }));
    427       paf.fulfiller->fulfill();
    428     }
    429 
    430     // .then() is scheduled breadth-first if the promise has already resolved, but depth-first
    431     // if the promise resolves later.
    432     tasks.add(Promise<void>(READY_NOW).then([&]() {
    433       EXPECT_EQ(4, counter++);
    434     }).then([&]() {
    435       EXPECT_EQ(5, counter++);
    436       tasks.add(kj::evalLast([&]() {
    437         EXPECT_EQ(7, counter++);
    438         tasks.add(kj::evalLater([&]() {
    439           EXPECT_EQ(8, counter++);
    440         }));
    441       }));
    442     }));
    443 
    444     {
    445       auto paf = kj::newPromiseAndFulfiller<void>();
    446       tasks.add(paf.promise.then([&]() {
    447         EXPECT_EQ(2, counter++);
    448         tasks.add(kj::evalLast([&]() {
    449           EXPECT_EQ(9, counter++);
    450           tasks.add(kj::evalLater([&]() {
    451             EXPECT_EQ(10, counter++);
    452           }));
    453         }));
    454       }));
    455       paf.fulfiller->fulfill();
    456     }
    457 
    458     // evalLater() is like READY_NOW.then().
    459     tasks.add(evalLater([&]() {
    460       EXPECT_EQ(6, counter++);
    461     }));
    462   }));
    463   // Second top-level task: expected to run after steps 0 through 2 of the first one.
    464   tasks.add(evalLater([&]() {
    465     EXPECT_EQ(3, counter++);
    466 
    467     // Making this a chain should NOT cause it to preempt the first promise.  (This was a problem
    468     // at one point.)
    469     return Promise<void>(READY_NOW);
    470   }));
    471 
    472   tasks.onEmpty().wait(waitScope);
    473   // All eleven callbacks (steps 0 through 10) must have run by the time the set is empty.
    474   EXPECT_EQ(11, counter);
    475 }
    476 
    477 TEST(Async, Fork) {
    478   EventLoop loop;
    479   WaitScope waitScope(loop);
    480   // Checks hasBranches() bookkeeping and that two branches each receive the forked value.
    481   Promise<int> promise = evalLater([&]() { return 123; });
    482 
    483   auto fork = promise.fork();
    484 
    485 #if __GNUC__ && !__clang__ && __GNUC__ >= 7
    486 // GCC 7 decides the open-brace below is "misleadingly indented" as if it were guarded by the `for`
    487 // that appears in the implementation of KJ_REQUIRE(). Shut up shut up shut up.
    488 #pragma GCC diagnostic ignored "-Wmisleading-indentation"
    489 #endif
    490   KJ_ASSERT(!fork.hasBranches());
    491   {
    492     auto cancelBranch = fork.addBranch();  // Dropped at the end of this scope.
    493     KJ_ASSERT(fork.hasBranches());
    494   }
    495   KJ_ASSERT(!fork.hasBranches());
    496   // Two live branches, each mapping the shared 123 to its own distinct result.
    497   auto branch1 = fork.addBranch().then([](int i) {
    498     EXPECT_EQ(123, i);
    499     return 456;
    500   });
    501   KJ_ASSERT(fork.hasBranches());
    502   auto branch2 = fork.addBranch().then([](int i) {
    503     EXPECT_EQ(123, i);
    504     return 789;
    505   });
    506   KJ_ASSERT(fork.hasBranches());
    507   // Destroy the fork object itself before waiting on the branches.
    508   {
    509     auto releaseFork = kj::mv(fork);
    510   }
    511 
    512   EXPECT_EQ(456, branch1.wait(waitScope));
    513   EXPECT_EQ(789, branch2.wait(waitScope));
    514 }
    515 
    516 struct RefcountedInt: public Refcounted {
    517   int i;  // Payload carried by the refcounted object.
    518   RefcountedInt(int i): i(i) {}
    519   Own<RefcountedInt> addRef() { return kj::addRef(*this); }  // Delegates to kj::addRef().
    520 };
    521 
    522 TEST(Async, ForkRef) {
    523   EventLoop loop;
    524   WaitScope scope(loop);
    525 
    526   Promise<Own<RefcountedInt>> source = evalLater([]() {
    527     return refcounted<RefcountedInt>(123);
    528   });
    529 
    530   // Each branch should receive a reference to a payload holding 123.
    531   auto forked = source.fork();
    532   auto first = forked.addBranch().then([](Own<RefcountedInt>&& held) {
    533     EXPECT_EQ(123, held->i);
    534     return 456;
    535   });
    536   auto second = forked.addBranch().then([](Own<RefcountedInt>&& held) {
    537     EXPECT_EQ(123, held->i);
    538     return 789;
    539   });
    540 
    541   {
    542     // Destroy the fork object itself before waiting on the branches.
    543     auto dropped = kj::mv(forked);
    544   }
    545   EXPECT_EQ(456, first.wait(scope));
    546   EXPECT_EQ(789, second.wait(scope));
    547 }
    548 
    549 TEST(Async, ForkMaybeRef) {
    550   EventLoop loop;
    551   WaitScope scope(loop);
    552 
    553   Promise<Maybe<Own<RefcountedInt>>> source = evalLater([]() {
    554     return Maybe<Own<RefcountedInt>>(refcounted<RefcountedInt>(123));
    555   });
    556 
    557   // Each branch should receive a non-null Maybe whose payload holds 123.
    558   auto forked = source.fork();
    559   auto first = forked.addBranch().then([](Maybe<Own<RefcountedInt>>&& held) {
    560     EXPECT_EQ(123, KJ_REQUIRE_NONNULL(held)->i);
    561     return 456;
    562   });
    563   auto second = forked.addBranch().then([](Maybe<Own<RefcountedInt>>&& held) {
    564     EXPECT_EQ(123, KJ_REQUIRE_NONNULL(held)->i);
    565     return 789;
    566   });
    567 
    568   {
    569     // Destroy the fork object itself before waiting on the branches.
    570     auto dropped = kj::mv(forked);
    571   }
    572   EXPECT_EQ(456, first.wait(scope));
    573   EXPECT_EQ(789, second.wait(scope));
    574 }
    575 
    576 
    577 TEST(Async, Split) {
    578   EventLoop loop;
    579   WaitScope scope(loop);
    580 
    581   Promise<Tuple<int, String, Promise<int>>> combined = evalLater([]() {
    582     return kj::tuple(123, str("foo"), Promise<int>(321));
    583   });
    584 
    585   // split() yields one promise per tuple element; the nested Promise<int> stays a Promise<int>.
    586   Tuple<Promise<int>, Promise<String>, Promise<int>> parts = combined.split();
    587   EXPECT_EQ(123, get<0>(parts).wait(scope));
    588   EXPECT_EQ("foo", get<1>(parts).wait(scope));
    589   EXPECT_EQ(321, get<2>(parts).wait(scope));
    590 }
    591 
    592 TEST(Async, ExclusiveJoin) {
    593   {
    594     // Left side resolves, right side never does: the left value is returned.
    595     EventLoop loop;
    596     WaitScope scope(loop);
    597     auto lhs = evalLater([]() { return 123; });
    598     auto rhsPaf = newPromiseAndFulfiller<int>();  // never fulfilled
    599 
    600     EXPECT_EQ(123, lhs.exclusiveJoin(kj::mv(rhsPaf.promise)).wait(scope));
    601   }
    602 
    603   {
    604     // Right side resolves, left side never does: the right value is returned.
    605     EventLoop loop;
    606     WaitScope scope(loop);
    607     auto lhsPaf = newPromiseAndFulfiller<int>();  // never fulfilled
    608     auto rhs = evalLater([]() { return 123; });
    609 
    610     EXPECT_EQ(123, lhsPaf.promise.exclusiveJoin(kj::mv(rhs)).wait(scope));
    611   }
    612 
    613   {
    614     // Both sides are plain evalLater() promises: the left value is the one returned.
    615     EventLoop loop;
    616     WaitScope scope(loop);
    617     auto lhs = evalLater([]() { return 123; });
    618     auto rhs = evalLater([]() { return 456; });
    619 
    620     EXPECT_EQ(123, lhs.exclusiveJoin(kj::mv(rhs)).wait(scope));
    621   }
    622 
    623   {
    624     // Only the right side is eagerly evaluated: the right value is the one returned.
    625     EventLoop loop;
    626     WaitScope scope(loop);
    627     auto lhs = evalLater([]() { return 123; });
    628     auto rhs = evalLater([]() { return 456; }).eagerlyEvaluate(nullptr);
    629 
    630     EXPECT_EQ(456, lhs.exclusiveJoin(kj::mv(rhs)).wait(scope));
    631   }
    632 }
    633 
    634 TEST(Async, ArrayJoin) {
    635   EventLoop loop;
    636   WaitScope scope(loop);
    637 
    638   // Three already-resolved promises, joined into a single promise for an array.
    639   auto inputs = heapArrayBuilder<Promise<int>>(3);
    640   inputs.add(123);
    641   inputs.add(456);
    642   inputs.add(789);
    643   Promise<Array<int>> joined = joinPromises(inputs.finish());
    644 
    645   auto values = joined.wait(scope);
    646   // Results are expected in the same order the promises were added.
    647   ASSERT_EQ(3u, values.size());
    648   EXPECT_EQ(123, values[0]);
    649   EXPECT_EQ(456, values[1]);
    650   EXPECT_EQ(789, values[2]);
    651 }
    652 
    653 TEST(Async, ArrayJoinVoid) {
    654   EventLoop loop;
    655   WaitScope scope(loop);
    656 
    657   // Joining an array of void promises produces a single Promise<void>.
    658   auto inputs = heapArrayBuilder<Promise<void>>(3);
    659   for (uint n = 0; n < 3; ++n) {
    660     inputs.add(READY_NOW);
    661   }
    662 
    663   Promise<void> joined = joinPromises(inputs.finish());
    664   joined.wait(scope);
    665 }
    666 
    667 TEST(Async, Canceler) {
    668   EventLoop loop;
    669   WaitScope scope(loop);
    670   Canceler canceler;
    671 
    672   auto pendingVoid = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE));
    673   auto readyVoid = canceler.wrap(kj::Promise<void>(kj::READY_NOW));
    674   auto pendingUint = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE).then([]() { return 123u; }));
    675   auto readyUint = canceler.wrap(kj::Promise<uint>(123u));
    676 
    677   KJ_EXPECT(!pendingVoid.poll(scope));
    678   KJ_EXPECT(readyVoid.poll(scope));
    679   KJ_EXPECT(!pendingUint.poll(scope));
    680   KJ_EXPECT(readyUint.poll(scope));
    681 
    682   // Cancellation fails the still-pending promises but leaves already-resolved ones intact.
    683   canceler.cancel("foobar");
    684   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("foobar", pendingVoid.wait(scope));
    685   readyVoid.wait(scope);
    686   KJ_EXPECT_THROW_MESSAGE("foobar", pendingUint.wait(scope));
    687   KJ_EXPECT(readyUint.wait(scope) == 123u);
    688 }
    689 
    690 TEST(Async, CancelerDoubleWrap) {
    691   EventLoop loop;
    692   WaitScope scope(loop);
    693   // Regression test: wrapping an already-wrapped promise and then canceling used to crash.
    694   Canceler canceler;
    695   auto wrappedOnce = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE));
    696   auto wrappedTwice = canceler.wrap(kj::mv(wrappedOnce));
    697   canceler.cancel("whoops");
    698 }
    699 
    700 class ErrorHandlerImpl: public TaskSet::ErrorHandler {
    701 public:
    702   uint exceptionCount = 0;  // Number of failures reported through taskFailed().
    703   void taskFailed(kj::Exception&& failure) override {  // Expects only the deliberate failure.
    704     EXPECT_TRUE(failure.getDescription().endsWith("example TaskSet failure"));
    705     exceptionCount += 1;
    706   }
    707 };
    708 
    709 TEST(Async, TaskSet) {
    710   EventLoop loop;
    711   WaitScope scope(loop);
    712   ErrorHandlerImpl handler;
    713   TaskSet tasks(handler);
    714   int step = 0;
    715 
    716   // Tasks owned by the set are expected to run in the order added, with no explicit waiter.
    717   tasks.add(evalLater([&step]() {
    718     EXPECT_EQ(0, step++);
    719   }));
    720   tasks.add(evalLater([&step]() {
    721     EXPECT_EQ(1, step++);
    722     KJ_FAIL_ASSERT("example TaskSet failure") { break; }
    723   }));
    724   tasks.add(evalLater([&step]() {
    725     EXPECT_EQ(2, step++);
    726   }));
    727 
    728   auto unwaited KJ_UNUSED = evalLater([]() {
    729     KJ_FAIL_EXPECT("Promise without waiter shouldn't execute.");
    730   });
    731 
    732   // The promise waited on here is expected to run after the three tasks above.
    733   evalLater([&step]() {
    734     EXPECT_EQ(3, step++);
    735   }).wait(scope);
    736   EXPECT_EQ(4, step);
    737   EXPECT_EQ(1u, handler.exceptionCount);
    738 }
    739 
    740 TEST(Async, LargeTaskSetDestruction) {
    741   static constexpr size_t stackSize = 200 * 1024;
    742 
    743   static auto testBody = [] {
    744     // One never-finishing task per pointer-sized slot of a `stackSize`-byte stack.
    745     ErrorHandlerImpl errorHandler;
    746     TaskSet tasks(errorHandler);
    747     // The bound is a size_t expression, so the index is size_t too (no signed/unsigned mix).
    748     for (size_t i = 0; i < stackSize / sizeof(void*); i++) {
    749       tasks.add(kj::NEVER_DONE);
    750     }
    751   };
    752 
    753 #if KJ_USE_FIBERS
    754   EventLoop loop;
    755   WaitScope waitScope(loop);
    756   // Run the body, including the TaskSet's destruction, on a fiber given `stackSize` of stack.
    757   startFiber(stackSize,
    758       [](WaitScope&) mutable {
    759     testBody();
    760   }).wait(waitScope);
    761 
    762 #else
    763   pthread_attr_t attr;
    764   KJ_REQUIRE(0 == pthread_attr_init(&attr));
    765   KJ_DEFER(KJ_REQUIRE(0 == pthread_attr_destroy(&attr)));
    766   // Without fibers, run the body on a thread whose stack size is set to `stackSize`.
    767   KJ_REQUIRE(0 == pthread_attr_setstacksize(&attr, stackSize));
    768   pthread_t thread;
    769   KJ_REQUIRE(0 == pthread_create(&thread, &attr, [](void*) -> void* {
    770     EventLoop loop;
    771     WaitScope waitScope(loop);
    772     testBody();
    773     return nullptr;
    774   }, nullptr));
    775   KJ_REQUIRE(0 == pthread_join(thread, nullptr));
    776 #endif
    777 }
    778 
    779 TEST(Async, TaskSetAddDuringCancel) {
    780   EventLoop loop;
    781   WaitScope waitScope(loop);
    782   // Named distinctly from TEST(Async, TaskSet) above so the two tests can be told apart.
    783   bool destroyed = false;
    784 
    785   {
    786     ErrorHandlerImpl errorHandler;
    787     TaskSet tasks(errorHandler);
    788 
    789     tasks.add(kj::Promise<void>(kj::NEVER_DONE)
    790         .attach(kj::defer([&]() {
    791       // During cancellation, append another task!
    792       // It had better be canceled too!
    793       tasks.add(kj::Promise<void>(kj::READY_NOW)
    794           .then([]() { KJ_FAIL_EXPECT("shouldn't get here"); },
    795                 [](auto) { KJ_FAIL_EXPECT("shouldn't get here"); })
    796           .attach(kj::defer([&]() {
    797         destroyed = true;
    798       })));
    799     })));
    800   }
    801   // Leaving the scope destroyed the TaskSet; the task added mid-teardown must be gone as well.
    802   KJ_EXPECT(destroyed);
    803 
    804   // Give a chance for the "shouldn't get here" asserts to execute, if the event is still running,
    805   // which it shouldn't be.
    806   waitScope.poll();
    807 }
    808 
    809 TEST(Async, TaskSetOnEmpty) {
    810   EventLoop loop;
    811   WaitScope scope(loop);
    812   ErrorHandlerImpl handler;
    813   TaskSet tasks(handler);
    814 
    815   KJ_EXPECT(tasks.isEmpty());
    816 
    817   // One task gated on a fulfiller, plus one plain evalLater() task.
    818   auto gate = newPromiseAndFulfiller<void>();
    819   tasks.add(kj::mv(gate.promise));
    820   tasks.add(evalLater([]() {}));
    821   KJ_EXPECT(!tasks.isEmpty());
    822 
    823   // onEmpty() must stay pending while the gated task is still outstanding.
    824   auto drained = tasks.onEmpty();
    825   KJ_EXPECT(!drained.poll(scope));
    826   KJ_EXPECT(!tasks.isEmpty());
    827   gate.fulfiller->fulfill();
    828   KJ_ASSERT(drained.poll(scope));
    829   KJ_EXPECT(tasks.isEmpty());
    830   drained.wait(scope);
    831 }
    832 
    833 class DestructorDetector {
    834 public:
    835   DestructorDetector(bool& setTrue): flag(setTrue) {}  // Remembers which bool to flip.
    836   ~DestructorDetector() { flag = true; }  // Records that destruction has happened.
    837 
    838 private:
    839   bool& flag;
    840 };
    841 
    842 TEST(Async, Attach) {
    843   bool released = false;
    844 
    845   EventLoop loop;
    846   WaitScope scope(loop);
    847 
    848   // The attachment must outlive the first callback but be gone before the continuation runs.
    849   Promise<int> withAttachment = evalLater([&released]() {
    850     EXPECT_FALSE(released);
    851     return 123;
    852   }).attach(kj::heap<DestructorDetector>(released));
    853   Promise<int> sum = withAttachment.then([&released](int value) {
    854     EXPECT_TRUE(released);
    855     return value + 321;
    856   });
    857 
    858   EXPECT_FALSE(released);
    859   EXPECT_EQ(444, sum.wait(scope));
    860   EXPECT_TRUE(released);
    861 }
    862 
    863 TEST(Async, EagerlyEvaluate) {
    864   bool ran = false;
    865 
    866   EventLoop loop;
    867   WaitScope scope(loop);
    868 
    869   Promise<void> lazy = Promise<void>(READY_NOW).then([&ran]() {
    870     ran = true;
    871   });
    872 
    873   // Without eagerlyEvaluate(), waiting on an unrelated promise does not run the continuation.
    874   evalLater([]() {}).wait(scope);
    875   EXPECT_FALSE(ran);
    876 
    877   // Once marked eager, the same unrelated wait is enough for the continuation to run.
    878   Promise<void> eager = lazy.eagerlyEvaluate(nullptr);
    879   evalLater([]() {}).wait(scope);
    880   EXPECT_TRUE(ran);
    881 }
    882 
    883 TEST(Async, Detach) {
    884   EventLoop loop;
    885   WaitScope scope(loop);
    886 
    887   bool canceledRan = false;
    888   bool detachedRan = false;
    889   bool sawError = false;
    890 
    891   {
    892     // Let the returned promise be destroyed (canceled) right away.
    893     auto dropped KJ_UNUSED = evalLater([&]() { canceledRan = true; });
    894   }
    895   evalLater([&]() { detachedRan = true; }).detach([](kj::Exception&&) { ADD_FAILURE(); });
    896   evalLater([]() { KJ_FAIL_ASSERT("foo"){break;} }).detach([&](kj::Exception&&) { sawError = true; });
    897 
    898   EXPECT_FALSE(canceledRan);
    899   EXPECT_FALSE(detachedRan);
    900   EXPECT_FALSE(sawError);
    901 
    902   // After an unrelated wait, the detached promises have run; the canceled one never does.
    903   evalLater([]() {}).wait(scope);
    904   EXPECT_FALSE(canceledRan);
    905   EXPECT_TRUE(detachedRan);
    906   EXPECT_TRUE(sawError);
    907 }
    908 
    909 class DummyEventPort: public EventPort {
    910 public:
    911   bool runnable = false;  // Last value passed to setRunnable().
    912   int callCount = 0;      // How many times setRunnable() has been invoked.
    913 
    914   bool wait() override { KJ_FAIL_ASSERT("Nothing to wait for."); }
    915   bool poll() override { return false; }
    916   void setRunnable(bool runnable) override {
    917     ++callCount;
    918     this->runnable = runnable;
    919   }
    920 };
    921 
    922 TEST(Async, SetRunnable) {
    923   DummyEventPort port;
    924   EventLoop loop(port);
    925   WaitScope waitScope(loop);
    926   // Tracks what the loop reports to its EventPort via setRunnable(); callCount is cumulative.
    927   EXPECT_FALSE(port.runnable);
    928   EXPECT_EQ(0, port.callCount);
    929 
    930   {
    931     auto promise = evalLater([]() {}).eagerlyEvaluate(nullptr);
    932     // Creating the eagerly-evaluated promise is expected to flip the port to runnable.
    933     EXPECT_TRUE(port.runnable);
    934     loop.run(1);
    935     EXPECT_FALSE(port.runnable);
    936     EXPECT_EQ(2, port.callCount);
    937 
    938     promise.wait(waitScope);
    939     EXPECT_FALSE(port.runnable);
    940     EXPECT_EQ(4, port.callCount);
    941   }
    942 
    943   {
    944     auto paf = newPromiseAndFulfiller<void>();
    945     auto promise = paf.promise.then([]() {}).eagerlyEvaluate(nullptr);
    946     EXPECT_FALSE(port.runnable);  // `promise` still depends on the unfulfilled paf.
    947 
    948     auto promise2 = evalLater([]() {}).eagerlyEvaluate(nullptr);
    949     paf.fulfiller->fulfill();
    950 
    951     EXPECT_TRUE(port.runnable);
    952     loop.run(1);
    953     EXPECT_TRUE(port.runnable);  // A single run(1) is not expected to drain the queue here.
    954     loop.run(10);
    955     EXPECT_FALSE(port.runnable);
    956 
    957     promise.wait(waitScope);
    958     EXPECT_FALSE(port.runnable);
    959 
    960     EXPECT_EQ(8, port.callCount);
    961   }
    962 }
    963 
    964 TEST(Async, Poll) {
    965   EventLoop loop;
    966   WaitScope scope(loop);
    967   // poll() is false before fulfill() and true after; wait() still succeeds afterwards.
    968   auto gate = newPromiseAndFulfiller<void>();
    969   KJ_ASSERT(!gate.promise.poll(scope));
    970   gate.fulfiller->fulfill();
    971   KJ_ASSERT(gate.promise.poll(scope));
    972   gate.promise.wait(scope);
    973 }
    974 
    975 KJ_TEST("exclusiveJoin both events complete simultaneously") {
    976   // Regression test. When both branches of an exclusiveJoin() finished at the same time, the
    977   // parent event used to be armed twice. The consequences depend on the parent PromiseNode type;
    978   // ArrayJoinPromiseNode, for instance, counts events and considers itself done once it has
    979   // received exactly the expected number of them.
    980 
    981   EventLoop loop;
    982   WaitScope scope(loop);
    983 
    984   auto parts = kj::heapArrayBuilder<kj::Promise<uint>>(2);
    985   parts.add(kj::Promise<uint>(123).exclusiveJoin(kj::Promise<uint>(456)));
    986   parts.add(kj::NEVER_DONE);
    987   auto joined = kj::joinPromises(parts.finish());
    988 
    989   KJ_EXPECT(!joined.poll(scope));
    990 }
    991 
    992 #if KJ_USE_FIBERS
    993 KJ_TEST("start a fiber") {
    994   EventLoop loop;
    995   WaitScope waitScope(loop);
    996 
    997   auto paf = newPromiseAndFulfiller<int>();
    998 
    999   Promise<StringPtr> fiber = startFiber(65536,
   1000       [promise = kj::mv(paf.promise)](WaitScope& fiberScope) mutable {
   1001     int i = promise.wait(fiberScope);
   1002     KJ_EXPECT(i == 123);
   1003     return "foo"_kj;
   1004   });
   1005 
   1006   KJ_EXPECT(!fiber.poll(waitScope));
   1007 
   1008   paf.fulfiller->fulfill(123);
   1009 
   1010   KJ_ASSERT(fiber.poll(waitScope));
   1011   KJ_EXPECT(fiber.wait(waitScope) == "foo");
   1012 }
   1013 
   1014 KJ_TEST("fiber promise chaining") {
   1015   EventLoop loop;
   1016   WaitScope waitScope(loop);
   1017 
   1018   auto paf = newPromiseAndFulfiller<int>();
   1019   bool ran = false;
   1020 
   1021   Promise<int> fiber = startFiber(65536,
   1022       [promise = kj::mv(paf.promise), &ran](WaitScope& fiberScope) mutable {
   1023     ran = true;
   1024     return kj::mv(promise);
   1025   });
   1026 
   1027   KJ_EXPECT(!ran);
   1028   KJ_EXPECT(!fiber.poll(waitScope));
   1029   KJ_EXPECT(ran);
   1030 
   1031   paf.fulfiller->fulfill(123);
   1032 
   1033   KJ_ASSERT(fiber.poll(waitScope));
   1034   KJ_EXPECT(fiber.wait(waitScope) == 123);
   1035 }
   1036 
   1037 KJ_TEST("throw from a fiber") {
   1038   EventLoop loop;
   1039   WaitScope waitScope(loop);
   1040 
   1041   auto paf = newPromiseAndFulfiller<void>();
   1042 
   1043   Promise<void> fiber = startFiber(65536,
   1044       [promise = kj::mv(paf.promise)](WaitScope& fiberScope) mutable {
   1045     promise.wait(fiberScope);
   1046     KJ_FAIL_EXPECT("wait() should have thrown");
   1047   });
   1048 
   1049   KJ_EXPECT(!fiber.poll(waitScope));
   1050 
   1051   paf.fulfiller->reject(KJ_EXCEPTION(FAILED, "test exception"));
   1052 
   1053   KJ_ASSERT(fiber.poll(waitScope));
   1054   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test exception", fiber.wait(waitScope));
   1055 }
   1056 
   1057 #if !__MINGW32__ || __MINGW64__
   1058 // This test fails on MinGW 32-bit builds due to a compiler bug with exceptions + fibers:
   1059 //     https://sourceforge.net/p/mingw-w64/bugs/835/
   1060 KJ_TEST("cancel a fiber") {
   1061   EventLoop loop;
   1062   WaitScope waitScope(loop);
   1063 
   1064   // When exceptions are disabled we can't wait() on a non-void promise that throws.
   1065   auto paf = newPromiseAndFulfiller<void>();
   1066 
   1067   bool exited = false;
   1068   bool canceled = false;
   1069 
   1070   {
   1071     Promise<StringPtr> fiber = startFiber(65536,
   1072         [promise = kj::mv(paf.promise), &exited, &canceled](WaitScope& fiberScope) mutable {
   1073       KJ_DEFER(exited = true);
   1074       try {
   1075         promise.wait(fiberScope);
   1076       } catch (kj::CanceledException) {
   1077         canceled = true;
   1078         throw;
   1079       }
   1080       return "foo"_kj;
   1081     });
   1082 
   1083     KJ_EXPECT(!fiber.poll(waitScope));
   1084     KJ_EXPECT(!exited);
   1085     KJ_EXPECT(!canceled);
   1086   }
   1087 
   1088   KJ_EXPECT(exited);
   1089   KJ_EXPECT(canceled);
   1090 }
   1091 #endif
   1092 
KJ_TEST("fiber pool") {
  // Starts two fibers from a FiberPool, checks the pool's freelist size as they complete, and then
  // repeats the whole scenario to check that each fiber's local variable lands at the same address
  // as in the first pass.
  EventLoop loop;
  WaitScope waitScope(loop);
  FiberPool pool(65536);  // NOTE(review): 65536 is presumably the per-fiber stack size -- confirm.

  // Address of the local `i` inside each fiber body, recorded during the first pass.
  int* i1_local = nullptr;
  int* i2_local = nullptr;

  auto run = [&]() mutable {
    auto paf1 = newPromiseAndFulfiller<int>();
    auto paf2 = newPromiseAndFulfiller<int>();

    {
      Promise<int> fiber1 = pool.startFiber([&, promise = kj::mv(paf1.promise)](WaitScope& scope) mutable {
        int i = promise.wait(scope);
        KJ_EXPECT(i == 123);
        // First pass: remember where `i` lives. Second pass: it must be at the same address.
        if (i1_local == nullptr) {
          i1_local = &i;
        } else {
          KJ_ASSERT(i1_local == &i);
        }
        return i;
      });
      {
        Promise<int> fiber2 = pool.startFiber([&, promise = kj::mv(paf2.promise)](WaitScope& scope) mutable {
          int i = promise.wait(scope);
          KJ_EXPECT(i == 456);
          // Same address check as in fiber1, tracked separately for the second fiber.
          if (i2_local == nullptr) {
            i2_local = &i;
          } else {
            KJ_ASSERT(i2_local == &i);
          }
          return i;
        });

        // Neither input has been fulfilled yet, so both fibers are still pending...
        KJ_EXPECT(!fiber1.poll(waitScope));
        KJ_EXPECT(!fiber2.poll(waitScope));

        // ...and the pool is expected to have nothing on its freelist.
        KJ_EXPECT(pool.getFreelistSize() == 0);

        paf2.fulfiller->fulfill(456);

        // Only fiber2 was unblocked.
        KJ_EXPECT(!fiber1.poll(waitScope));
        KJ_ASSERT(fiber2.poll(waitScope));
        KJ_EXPECT(fiber2.wait(waitScope) == 456);

        // One fiber has finished, so one entry is expected on the freelist.
        KJ_EXPECT(pool.getFreelistSize() == 1);
      }

      paf1.fulfiller->fulfill(123);

      KJ_ASSERT(fiber1.poll(waitScope));
      KJ_EXPECT(fiber1.wait(waitScope) == 123);

      // Both fibers have finished, so two entries are expected on the freelist.
      KJ_EXPECT(pool.getFreelistSize() == 2);
    }
  };
  run();
  // The first pass must have recorded both addresses.
  KJ_ASSERT_NONNULL(i1_local);
  KJ_ASSERT_NONNULL(i2_local);
  // Run the same scenario again; this time the fiber bodies take the KJ_ASSERT branch and verify
  // that their locals sit at the recorded addresses, i.e. that the fibers were reused.
  run();
}
   1156 
   1157 bool onOurStack(char* p) {
   1158   // If p points less than 64k away from a random stack variable, then it must be on the same
   1159   // stack, since we never allocate stacks smaller than 64k.
   1160   char c;
   1161   ptrdiff_t diff = p - &c;
   1162   return diff < 65536 && diff > -65536;
   1163 }
   1164 
KJ_TEST("fiber pool runSynchronously()") {
  // Checks that FiberPool::runSynchronously() runs its callback on a stack other than the caller's,
  // that two consecutive calls end up at the same stack location, and that an exception thrown by
  // the callback reaches the caller.
  FiberPool pool(65536);

  {
    char c;
    KJ_EXPECT(onOurStack(&c));  // sanity check...
  }

  // Addresses of a local variable inside the first and second callback, respectively.
  char* ptr1 = nullptr;
  char* ptr2 = nullptr;

  pool.runSynchronously([&]() {
    char c;
    ptr1 = &c;
  });
  // A non-null pointer proves the callback ran before runSynchronously() returned.
  KJ_ASSERT(ptr1 != nullptr);

  pool.runSynchronously([&]() {
    char c;
    ptr2 = &c;
  });
  KJ_ASSERT(ptr2 != nullptr);

  // Should have used the same stack both times, so local var would be in the same place.
  KJ_EXPECT(ptr1 == ptr2);

  // Should have been on a different stack from the main stack.
  KJ_EXPECT(!onOurStack(ptr1));

  // A failure inside the callback must propagate out of runSynchronously().
  KJ_EXPECT_THROW_MESSAGE("test exception",
      pool.runSynchronously([&]() { KJ_FAIL_ASSERT("test exception"); }));
}
   1197 
   1198 KJ_TEST("fiber pool limit") {
   1199   FiberPool pool(65536);
   1200 
   1201   pool.setMaxFreelist(1);
   1202 
   1203   kj::MutexGuarded<uint> state;
   1204 
   1205   char* ptr1;
   1206   char* ptr2;
   1207 
   1208   // Run some code that uses two stacks in separate threads at the same time.
   1209   {
   1210     kj::Thread thread([&]() noexcept {
   1211       auto lock = state.lockExclusive();
   1212       lock.wait([](uint val) { return val == 1; });
   1213 
   1214       pool.runSynchronously([&]() {
   1215         char c;
   1216         ptr2 = &c;
   1217 
   1218         *lock = 2;
   1219         lock.wait([](uint val) { return val == 3; });
   1220       });
   1221     });
   1222 
   1223     ([&]() noexcept {
   1224       auto lock = state.lockExclusive();
   1225 
   1226       pool.runSynchronously([&]() {
   1227         char c;
   1228         ptr1 = &c;
   1229 
   1230         *lock = 1;
   1231         lock.wait([](uint val) { return val == 2; });
   1232       });
   1233 
   1234       *lock = 3;
   1235     })();
   1236   }
   1237 
   1238   KJ_EXPECT(pool.getFreelistSize() == 1);
   1239 
   1240   // We expect that if we reuse a stack from the pool, it will be the last one that exited, which
   1241   // is the one from the thread.
   1242   pool.runSynchronously([&]() {
   1243     KJ_EXPECT(onOurStack(ptr2));
   1244     KJ_EXPECT(!onOurStack(ptr1));
   1245 
   1246     KJ_EXPECT(pool.getFreelistSize() == 0);
   1247   });
   1248 
   1249   KJ_EXPECT(pool.getFreelistSize() == 1);
   1250 
   1251   // Note that it would NOT work to try to allocate two stacks at the same time again and verify
   1252   // that the second stack doesn't match the previously-deleted stack, because there's a high
   1253   // likelihood that the new stack would be allocated in the same location.
   1254 }
   1255 
KJ_TEST("run event loop on freelisted stacks") {
  // After WaitScope::runEventCallbacksOnStackPool(pool), records on which stack promise callbacks
  // and the EventPort's wait()/poll() methods execute, and compares those addresses against the
  // main stack and against the stack used by pool.runSynchronously().
  FiberPool pool(65536);

  // Event port that records the address of a local variable each time wait() or poll() is called
  // and, on that call, fulfills (and then drops) whatever fulfiller the test has installed.
  class MockEventPort: public EventPort {
  public:
    bool wait() override {
      char c;
      waitStack = &c;
      KJ_IF_MAYBE(f, fulfiller) {
        f->get()->fulfill();
        fulfiller = nullptr;
      }
      return false;
    }
    bool poll() override {
      char c;
      pollStack = &c;
      KJ_IF_MAYBE(f, fulfiller) {
        f->get()->fulfill();
        fulfiller = nullptr;
      }
      return false;
    }

    // Address of a local inside the most recent wait() / poll() call; null if not called.
    char* waitStack = nullptr;
    char* pollStack = nullptr;

    // Fulfiller to fire on the next wait() or poll() call, if any.
    kj::Maybe<kj::Own<PromiseFulfiller<void>>> fulfiller;
  };

  MockEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);
  waitScope.runEventCallbacksOnStackPool(pool);

  {
    // Part 1: drive the loop with Promise::wait().
    auto paf = newPromiseAndFulfiller<void>();
    port.fulfiller = kj::mv(paf.fulfiller);

    // Addresses of a local inside the first and second promise callback.
    char* ptr1 = nullptr;
    char* ptr2 = nullptr;
    kj::evalLater([&]() {
      char c;
      ptr1 = &c;
      return kj::mv(paf.promise);
    }).then([&]() {
      char c;
      ptr2 = &c;
    }).wait(waitScope);

    // Both callbacks ran, and the port was asked to wait() but never to poll().
    KJ_EXPECT(ptr1 != nullptr);
    KJ_EXPECT(ptr2 != nullptr);
    KJ_EXPECT(port.waitStack != nullptr);
    KJ_EXPECT(port.pollStack == nullptr);

    // The event callbacks should have run on a different stack, but the wait should have been on
    // the main stack.
    KJ_EXPECT(!onOurStack(ptr1));
    KJ_EXPECT(!onOurStack(ptr2));
    KJ_EXPECT(onOurStack(port.waitStack));

    pool.runSynchronously([&]() {
      // This should run on the same stack where the event callbacks ran.
      KJ_EXPECT(onOurStack(ptr1));
      KJ_EXPECT(onOurStack(ptr2));
      KJ_EXPECT(!onOurStack(port.waitStack));
    });
  }

  // Reset the recorded addresses so that part 2 observes only its own calls.
  port.waitStack = nullptr;
  port.pollStack = nullptr;

  // Now try poll() instead of wait(). Note that since poll() doesn't block, we let it run on the
  // event stack.
  {
    auto paf = newPromiseAndFulfiller<void>();
    port.fulfiller = kj::mv(paf.fulfiller);

    char* ptr1 = nullptr;
    char* ptr2 = nullptr;
    auto promise = kj::evalLater([&]() {
      char c;
      ptr1 = &c;
      return kj::mv(paf.promise);
    }).then([&]() {
      char c;
      ptr2 = &c;
    });

    // The port's poll() fulfills the inner promise, so the chain is expected to report ready.
    KJ_EXPECT(promise.poll(waitScope));

    KJ_EXPECT(ptr1 != nullptr);
    KJ_EXPECT(ptr2 == nullptr);  // didn't run because of lazy continuation evaluation
    KJ_EXPECT(port.waitStack == nullptr);
    KJ_EXPECT(port.pollStack != nullptr);

    // The event callback should have run on a different stack, and poll() should have run on
    // a separate stack too.
    KJ_EXPECT(!onOurStack(ptr1));
    KJ_EXPECT(!onOurStack(port.pollStack));

    pool.runSynchronously([&]() {
      // This should run on the same stack where the event callbacks ran.
      KJ_EXPECT(onOurStack(ptr1));
      KJ_EXPECT(onOurStack(port.pollStack));
    });
  }
}
   1364 #endif
   1365 
   1366 KJ_TEST("retryOnDisconnect") {
   1367   EventLoop loop;
   1368   WaitScope waitScope(loop);
   1369 
   1370   {
   1371     uint i = 0;
   1372     auto promise = retryOnDisconnect([&]() -> Promise<int> {
   1373       i++;
   1374       return 123;
   1375     });
   1376     KJ_EXPECT(i == 0);
   1377     KJ_EXPECT(promise.wait(waitScope) == 123);
   1378     KJ_EXPECT(i == 1);
   1379   }
   1380 
   1381   {
   1382     uint i = 0;
   1383     auto promise = retryOnDisconnect([&]() -> Promise<int> {
   1384       if (i++ == 0) {
   1385         return KJ_EXCEPTION(DISCONNECTED, "test disconnect");
   1386       } else {
   1387         return 123;
   1388       }
   1389     });
   1390     KJ_EXPECT(i == 0);
   1391     KJ_EXPECT(promise.wait(waitScope) == 123);
   1392     KJ_EXPECT(i == 2);
   1393   }
   1394 
   1395 
   1396   {
   1397     uint i = 0;
   1398     auto promise = retryOnDisconnect([&]() -> Promise<int> {
   1399       if (i++ <= 1) {
   1400         return KJ_EXCEPTION(DISCONNECTED, "test disconnect", i);
   1401       } else {
   1402         return 123;
   1403       }
   1404     });
   1405     KJ_EXPECT(i == 0);
   1406     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test disconnect; i = 2",
   1407         promise.ignoreResult().wait(waitScope));
   1408     KJ_EXPECT(i == 2);
   1409   }
   1410 
   1411   {
   1412     // Test passing a reference to a function.
   1413     struct Func {
   1414       uint i = 0;
   1415       Promise<int> operator()() {
   1416         if (i++ == 0) {
   1417           return KJ_EXCEPTION(DISCONNECTED, "test disconnect");
   1418         } else {
   1419           return 123;
   1420         }
   1421       }
   1422     };
   1423     Func func;
   1424 
   1425     auto promise = retryOnDisconnect(func);
   1426     KJ_EXPECT(func.i == 0);
   1427     KJ_EXPECT(promise.wait(waitScope) == 123);
   1428     KJ_EXPECT(func.i == 2);
   1429   }
   1430 }
   1431 
   1432 }  // namespace
   1433 }  // namespace kj