capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git

async.c++ (93249B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #undef _FORTIFY_SOURCE
     23 // If _FORTIFY_SOURCE is defined, longjmp will complain when it detects the stack
     24 // pointer moving in the "wrong direction", thinking you're jumping to a non-existent
     25 // stack frame. But we use longjmp to jump between different stacks to implement fibers,
     26 // so this check isn't appropriate for us.
     27 
     28 #if _WIN32 || __CYGWIN__
     29 #include "win32-api-version.h"
     30 #elif __APPLE__
     31 // getcontext() and friends are marked deprecated on MacOS but seemingly no replacement is
     32 // provided. It appears as if they deprecated it solely because the standards bodies deprecated it,
     33 // which they seemingly did mainly because the proper semantics are too difficult for them to
     34 // define. I doubt MacOS would actually remove these functions as they are widely used. But if they
     35 // do, then I guess we'll need to fall back to using setjmp()/longjmp(), and some sort of hack
     36 // involving sigaltstack() (and generating a fake signal I guess) in order to initialize the fiber
     37 // in the first place. Or we could use assembly, I suppose. Either way, ick.
     38 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
     39 #define _XOPEN_SOURCE  // Must be defined to see getcontext() on MacOS.
     40 #endif
     41 
     42 #include "async.h"
     43 #include "debug.h"
     44 #include "vector.h"
     45 #include "threadlocal.h"
     46 #include "mutex.h"
     47 #include "one-of.h"
     48 #include "function.h"
     49 #include "list.h"
     50 #include <deque>
     51 
     52 #if _WIN32 || __CYGWIN__
     53 #include <windows.h>  // for Sleep(0) and fibers
     54 #include "windows-sanity.h"
     55 #else
     56 
     57 #if KJ_USE_FIBERS
     58 #include <ucontext.h>
     59 #include <setjmp.h>    // for fibers
     60 #endif
     61 
     62 #include <sys/mman.h>  // mmap(), for allocating new stacks
     63 #include <unistd.h>    // sysconf()
     64 #include <errno.h>
     65 #endif
     66 
     67 #if !_WIN32
     68 #include <sched.h>    // just for sched_yield()
     69 #endif
     70 
     71 #if !KJ_NO_RTTI
     72 #include <typeinfo>
     73 #if __GNUC__
     74 #include <cxxabi.h>
     75 #endif
     76 #endif
     77 
     78 #include <stdlib.h>
     79 
     80 #if _MSC_VER && !__clang__
     81 // MSVC's atomic intrinsics are weird and different, whereas the C++ standard atomics match the GCC
     82 // builtins -- except for requiring the obnoxious std::atomic<T> wrapper. So, on MSVC let's just
     83 // #define the builtins based on the C++ library, reinterpret-casting native types to
     84 // std::atomic... this is cheating but ugh, whatever.
     85 #include <atomic>
     86 template <typename T>
     87 static std::atomic<T>* reinterpretAtomic(T* ptr) { return reinterpret_cast<std::atomic<T>*>(ptr); }
     88 #define __atomic_store_n(ptr, val, order) \
     89     std::atomic_store_explicit(reinterpretAtomic(ptr), val, order)
     90 #define __atomic_load_n(ptr, order) \
     91     std::atomic_load_explicit(reinterpretAtomic(ptr), order)
     92 #define __atomic_compare_exchange_n(ptr, expected, desired, weak, succ, fail) \
     93     std::atomic_compare_exchange_strong_explicit( \
     94         reinterpretAtomic(ptr), expected, desired, succ, fail)
     95 #define __atomic_exchange_n(ptr, val, order) \
     96     std::atomic_exchange_explicit(reinterpretAtomic(ptr), val, order)
     97 #define __ATOMIC_RELAXED std::memory_order_relaxed
     98 #define __ATOMIC_ACQUIRE std::memory_order_acquire
     99 #define __ATOMIC_RELEASE std::memory_order_release
    100 #endif
    101 
    102 namespace kj {
    103 
    104 namespace {
    105 
    106 KJ_THREADLOCAL_PTR(EventLoop) threadLocalEventLoop = nullptr;
    107 
    108 #define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1)
    109 
    110 EventLoop& currentEventLoop() {
    111   EventLoop* loop = threadLocalEventLoop;
    112   KJ_REQUIRE(loop != nullptr, "No event loop is running on this thread.");
    113   return *loop;
    114 }
    115 
    116 class RootEvent: public _::Event {
    117 public:
    118   RootEvent(_::PromiseNode* node, void* traceAddr): node(node), traceAddr(traceAddr) {}
    119 
    120   bool fired = false;
    121 
    122   Maybe<Own<_::Event>> fire() override {
    123     fired = true;
    124     return nullptr;
    125   }
    126 
    127   void traceEvent(_::TraceBuilder& builder) override {
    128     node->tracePromise(builder, true);
    129     builder.add(traceAddr);
    130   }
    131 
    132 private:
    133   _::PromiseNode* node;
    134   void* traceAddr;
    135 };
    136 
    137 struct DummyFunctor {
    138   void operator()() {};
    139 };
    140 
    141 class YieldPromiseNode final: public _::PromiseNode {
    142 public:
    143   void onReady(_::Event* event) noexcept override {
    144     if (event) event->armBreadthFirst();
    145   }
    146   void get(_::ExceptionOrValue& output) noexcept override {
    147     output.as<_::Void>() = _::Void();
    148   }
    149   void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
    150     builder.add(reinterpret_cast<void*>(&kj::evalLater<DummyFunctor>));
    151   }
    152 };
    153 
    154 class YieldHarderPromiseNode final: public _::PromiseNode {
    155 public:
    156   void onReady(_::Event* event) noexcept override {
    157     if (event) event->armLast();
    158   }
    159   void get(_::ExceptionOrValue& output) noexcept override {
    160     output.as<_::Void>() = _::Void();
    161   }
    162   void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
    163     builder.add(reinterpret_cast<void*>(&kj::evalLast<DummyFunctor>));
    164   }
    165 };
    166 
    167 class NeverDonePromiseNode final: public _::PromiseNode {
    168 public:
    169   void onReady(_::Event* event) noexcept override {
    170     // ignore
    171   }
    172   void get(_::ExceptionOrValue& output) noexcept override {
    173     KJ_FAIL_REQUIRE("Not ready.");
    174   }
    175   void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
    176     builder.add(_::getMethodStartAddress(kj::NEVER_DONE, &_::NeverDone::wait));
    177   }
    178 };
    179 
    180 }  // namespace
    181 
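// Illustrative sketch (not part of the original file): the yield nodes above back the yield
// step used by kj::evalLater() and kj::evalLast(). Assuming an event loop and a WaitScope are
// already set up on this thread, usage looks roughly like:
//
//   kj::Promise<int> later = kj::evalLater([]() { return 123; });  // runs on a later turn,
//                                                                  // queued breadth-first
//   kj::Promise<void> last = kj::evalLast([]() { /* runs after other queued events */ });
//   int result = later.wait(waitScope);
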
    182 // =======================================================================================
    183 
    184 void END_CANCELER_STACK_START_CANCELEE_STACK() {}
    185 // Dummy symbol used when reporting how a Canceler was canceled. We end up combining two stack
    186 // traces into one and we use this as a separator.
    187 
    188 Canceler::~Canceler() noexcept(false) {
    189   if (isEmpty()) return;
    190   cancel(getDestructionReason(
    191       reinterpret_cast<void*>(&END_CANCELER_STACK_START_CANCELEE_STACK),
    192       Exception::Type::DISCONNECTED, __FILE__, __LINE__, "operation canceled"_kj));
    193 }
    194 
    195 void Canceler::cancel(StringPtr cancelReason) {
    196   if (isEmpty()) return;
    197   // We can't use getDestructionReason() here because if an exception is in-flight, it would use
    198   // that exception, totally discarding the reason given by the caller. This would probably be
    199   // unexpected. The caller can always use getDestructionReason() themselves if desired.
    200   cancel(Exception(Exception::Type::DISCONNECTED, __FILE__, __LINE__, kj::str(cancelReason)));
    201 }
    202 
    203 void Canceler::cancel(const Exception& exception) {
    204   for (;;) {
    205     KJ_IF_MAYBE(a, list) {
    206       a->unlink();
    207       a->cancel(kj::cp(exception));
    208     } else {
    209       break;
    210     }
    211   }
    212 }
    213 
    214 void Canceler::release() {
    215   for (;;) {
    216     KJ_IF_MAYBE(a, list) {
    217       a->unlink();
    218     } else {
    219       break;
    220     }
    221   }
    222 }
    223 
    224 Canceler::AdapterBase::AdapterBase(Canceler& canceler)
    225     : prev(canceler.list),
    226       next(canceler.list) {
    227   canceler.list = *this;
    228   KJ_IF_MAYBE(n, next) {
    229     n->prev = next;
    230   }
    231 }
    232 
    233 Canceler::AdapterBase::~AdapterBase() noexcept(false) {
    234   unlink();
    235 }
    236 
    237 void Canceler::AdapterBase::unlink() {
    238   KJ_IF_MAYBE(p, prev) {
    239     *p = next;
    240   }
    241   KJ_IF_MAYBE(n, next) {
    242     n->prev = prev;
    243   }
    244   next = nullptr;
    245   prev = nullptr;
    246 }
    247 
    248 Canceler::AdapterImpl<void>::AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
    249             Canceler& canceler, kj::Promise<void> inner)
    250     : AdapterBase(canceler),
    251       fulfiller(fulfiller),
    252       inner(inner.then(
    253           [&fulfiller]() { fulfiller.fulfill(); },
    254           [&fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); })
    255           .eagerlyEvaluate(nullptr)) {}
    256 
    257 void Canceler::AdapterImpl<void>::cancel(kj::Exception&& e) {
    258   fulfiller.reject(kj::mv(e));
    259   inner = nullptr;
    260 }
    261 
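// Illustrative usage sketch (not part of the original file): a Canceler wraps promises so that
// they can all be rejected later (with a DISCONNECTED exception) in one call, e.g. when the
// owning object shuts down. Assuming an event loop is running:
//
//   kj::Canceler canceler;
//   kj::Promise<int> p = canceler.wrap(startSomeOperation());  // startSomeOperation() is a
//                                                              // hypothetical helper
//   // ... later, e.g. in a destructor or shutdown path:
//   canceler.cancel("shutting down");  // p (and any other wrapped promise) now rejects
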
    262 // =======================================================================================
    263 
    264 TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler)
    265   : errorHandler(errorHandler) {}
    266 class TaskSet::Task final: public _::Event {
    267 public:
    268   Task(TaskSet& taskSet, Own<_::PromiseNode>&& nodeParam)
    269       : taskSet(taskSet), node(kj::mv(nodeParam)) {
    270     node->setSelfPointer(&node);
    271     node->onReady(this);
    272   }
    273 
    274   Own<Task> pop() {
    275     KJ_IF_MAYBE(n, next) { n->get()->prev = prev; }
    276     Own<Task> self = kj::mv(KJ_ASSERT_NONNULL(*prev));
    277     KJ_ASSERT(self.get() == this);
    278     *prev = kj::mv(next);
    279     next = nullptr;
    280     prev = nullptr;
    281     return self;
    282   }
    283 
    284   Maybe<Own<Task>> next;
    285   Maybe<Own<Task>>* prev = nullptr;
    286 
    287   kj::String trace() {
    288     void* space[32];
    289     _::TraceBuilder builder(space);
    290     node->tracePromise(builder, false);
    291     return kj::str("task: ", builder);
    292   }
    293 
    294 protected:
    295   Maybe<Own<Event>> fire() override {
    296     // Get the result.
    297     _::ExceptionOr<_::Void> result;
    298     node->get(result);
    299 
    300     // Delete the node, catching any exceptions.
    301     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    302       node = nullptr;
    303     })) {
    304       result.addException(kj::mv(*exception));
    305     }
    306 
    307     // Call the error handler if there was an exception.
    308     KJ_IF_MAYBE(e, result.exception) {
    309       taskSet.errorHandler.taskFailed(kj::mv(*e));
    310     }
    311 
    312     // Remove from the task list.
    313     auto self = pop();
    314 
    315     KJ_IF_MAYBE(f, taskSet.emptyFulfiller) {
    316       if (taskSet.tasks == nullptr) {
    317         f->get()->fulfill();
    318         taskSet.emptyFulfiller = nullptr;
    319       }
    320     }
    321 
    322     return mv(self);
    323   }
    324 
    325   void traceEvent(_::TraceBuilder& builder) override {
    326     // Pointing out the ErrorHandler's taskFailed() implementation will usually identify the
    327     // particular TaskSet that contains this event.
    328     builder.add(_::getMethodStartAddress(taskSet.errorHandler, &ErrorHandler::taskFailed));
    329   }
    330 
    331 private:
    332   TaskSet& taskSet;
    333   Own<_::PromiseNode> node;
    334 };
    335 
    336 TaskSet::~TaskSet() noexcept(false) {
    337   // You could argue it is dubious, but some applications would like for the destructor of a
    338   // task to be able to schedule new tasks. So when we cancel our tasks... we might find new
    339   // tasks added! We'll have to repeatedly cancel. Additionally, we must destroy the items in a
    340   // loop, rather than letting the list destruct recursively, to avoid stack overflow on long task lists.
    341   while (tasks != nullptr) {
    342     auto removed = KJ_REQUIRE_NONNULL(tasks)->pop();
    343   }
    344 }
    345 
    346 void TaskSet::add(Promise<void>&& promise) {
    347   auto task = heap<Task>(*this, _::PromiseNode::from(kj::mv(promise)));
    348   KJ_IF_MAYBE(head, tasks) {
    349     head->get()->prev = &task->next;
    350     task->next = kj::mv(tasks);
    351   }
    352   task->prev = &tasks;
    353   tasks = kj::mv(task);
    354 }
    355 
    356 kj::String TaskSet::trace() {
    357   kj::Vector<kj::String> traces;
    358 
    359   Maybe<Own<Task>>* ptr = &tasks;
    360   for (;;) {
    361     KJ_IF_MAYBE(task, *ptr) {
    362       traces.add(task->get()->trace());
    363       ptr = &task->get()->next;
    364     } else {
    365       break;
    366     }
    367   }
    368 
    369   return kj::strArray(traces, "\n");
    370 }
    371 
    372 Promise<void> TaskSet::onEmpty() {
    373   KJ_IF_MAYBE(fulfiller, emptyFulfiller) {
    374     if (fulfiller->get()->isWaiting()) {
    375       KJ_FAIL_REQUIRE("onEmpty() can only be called once at a time");
    376     }
    377   }
    378 
    379   if (tasks == nullptr) {
    380     return READY_NOW;
    381   } else {
    382     auto paf = newPromiseAndFulfiller<void>();
    383     emptyFulfiller = kj::mv(paf.fulfiller);
    384     return kj::mv(paf.promise);
    385   }
    386 }
    387 
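// Illustrative usage sketch (not part of the original file): a TaskSet owns "fire and forget"
// promises and routes any failure to its ErrorHandler. Roughly:
//
//   class LoggingHandler final: public kj::TaskSet::ErrorHandler {   // hypothetical handler
//   public:
//     void taskFailed(kj::Exception&& exception) override { KJ_LOG(ERROR, exception); }
//   };
//
//   LoggingHandler handler;
//   kj::TaskSet tasks(handler);
//   tasks.add(doSomethingAsync());               // doSomethingAsync() is hypothetical
//   kj::Promise<void> done = tasks.onEmpty();    // resolves once all added tasks finish
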
    388 // =======================================================================================
    389 
    390 namespace {
    391 
    392 #if _WIN32 || __CYGWIN__
    393 thread_local void* threadMainFiber = nullptr;
    394 
    395 void* getMainWin32Fiber() {
    396   return threadMainFiber;
    397 }
    398 #endif
    399 
    400 inline void ensureThreadCanRunFibers() {
    401 #if _WIN32 || __CYGWIN__
    402   // Make sure the current thread has been converted to a fiber.
    403   void* fiber = threadMainFiber;
    404   if (fiber == nullptr) {
    405     // Thread not initialized. Convert it to a fiber now.
    406     // Note: Unfortunately, if the application has already converted the thread to a fiber, I
    407     //   guess this will fail. But trying to call GetCurrentFiber() when the thread isn't a fiber
    408     //   doesn't work (it returns null on WINE but not on real windows, ugh). So I guess we're
    409     //   just incompatible with the application doing anything with fibers, which is sad.
    410     threadMainFiber = fiber = ConvertThreadToFiber(nullptr);
    411   }
    412 #endif
    413 }
    414 
    415 }  // namespace
    416 
    417 namespace _ {
    418 
    419 class FiberStack final {
    420   // A class containing a fiber stack impl. This is separate from fiber
    421   // promises since it lets us move the stack itself around and reuse it.
    422 
    423 public:
    424   FiberStack(size_t stackSize);
    425   ~FiberStack() noexcept(false);
    426 
    427   struct SynchronousFunc {
    428     kj::FunctionParam<void()>& func;
    429     kj::Maybe<kj::Exception> exception;
    430   };
    431 
    432   void initialize(FiberBase& fiber);
    433   void initialize(SynchronousFunc& syncFunc);
    434 
    435   void reset() {
    436     main = {};
    437   }
    438 
    439   void switchToFiber();
    440   void switchToMain();
    441 
    442   void trace(TraceBuilder& builder) {
    443     // TODO(someday): Trace through fiber stack? Can it be done???
    444     builder.add(getMethodStartAddress(*this, &FiberStack::trace));
    445   }
    446 
    447 private:
    448   size_t stackSize;
    449   OneOf<FiberBase*, SynchronousFunc*> main;
    450 
    451   friend class FiberBase;
    452   friend class FiberPool::Impl;
    453 
    454   struct StartRoutine;
    455 
    456 #if KJ_USE_FIBERS
    457 #if _WIN32 || __CYGWIN__
    458   void* osFiber;
    459 #else
    460   struct Impl;
    461   Impl* impl;
    462 #endif
    463 #endif
    464 
    465   [[noreturn]] void run();
    466 
    467   bool isReset() { return main == nullptr; }
    468 };
    469 
    470 }  // namespace _
    471 
    472 #if __linux__
    473 // TODO(someday): Support core-local freelists on OSs other than Linux. The only tricky part is
    474 //   finding what to use instead of sched_getcpu() to get the current CPU ID.
    475 #define USE_CORE_LOCAL_FREELISTS 1
    476 #endif
    477 
    478 #if USE_CORE_LOCAL_FREELISTS
    479 static const size_t CACHE_LINE_SIZE = 64;
    480 // Most modern architectures have 64-byte cache lines.
    481 #endif
    482 
    483 class FiberPool::Impl final: private Disposer {
    484 public:
    485   Impl(size_t stackSize): stackSize(stackSize) {}
    486   ~Impl() noexcept(false) {
    487 #if USE_CORE_LOCAL_FREELISTS
    488     if (coreLocalFreelists != nullptr) {
    489       KJ_DEFER(free(coreLocalFreelists));
    490 
    491       for (uint i: kj::zeroTo(nproc)) {
    492         for (auto stack: coreLocalFreelists[i].stacks) {
    493           if (stack != nullptr) {
    494             delete stack;
    495           }
    496         }
    497       }
    498     }
    499 #endif
    500 
    501     // Make sure we're not leaking anything from the global freelist either.
    502     auto lock = freelist.lockExclusive();
    503     auto dangling = kj::mv(*lock);
    504     for (auto& stack: dangling) {
    505       delete stack;
    506     }
    507   }
    508 
    509   void setMaxFreelist(size_t count) {
    510     maxFreelist = count;
    511   }
    512 
    513   size_t getFreelistSize() const {
    514     return freelist.lockShared()->size();
    515   }
    516 
    517   void useCoreLocalFreelists() {
    518 #if USE_CORE_LOCAL_FREELISTS
    519     if (coreLocalFreelists != nullptr) {
    520       // Ignore repeat call.
    521       return;
    522     }
    523 
    524     int nproc_;
    525     KJ_SYSCALL(nproc_ = sysconf(_SC_NPROCESSORS_CONF));
    526     nproc = nproc_;
    527 
    528     void* allocPtr;
    529     size_t totalSize = nproc * sizeof(CoreLocalFreelist);
    530     int error = posix_memalign(&allocPtr, CACHE_LINE_SIZE, totalSize);
    531     if (error != 0) {
    532       KJ_FAIL_SYSCALL("posix_memalign", error);
    533     }
    534     memset(allocPtr, 0, totalSize);
    535     coreLocalFreelists = reinterpret_cast<CoreLocalFreelist*>(allocPtr);
    536 #endif
    537   }
    538 
    539   Own<_::FiberStack> takeStack() const {
    540     // Get a stack from the pool. The disposer on the returned Own pointer will return the stack
    541     // to the pool, provided that reset() has been called to indicate that the stack is not in
    542     // a weird state.
    543 
    544 #if USE_CORE_LOCAL_FREELISTS
    545     KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) {
    546       for (auto& stackPtr: core->stacks) {
    547         _::FiberStack* result = __atomic_exchange_n(&stackPtr, nullptr, __ATOMIC_ACQUIRE);
    548         if (result != nullptr) {
    549           // Found a stack in this slot!
    550           return { result, *this };
    551         }
    552       }
    553       // No stacks found, fall back to global freelist.
    554     }
    555 #endif
    556 
    557     {
    558       auto lock = freelist.lockExclusive();
    559       if (!lock->empty()) {
    560         _::FiberStack* result = lock->back();
    561         lock->pop_back();
    562         return { result, *this };
    563       }
    564     }
    565 
    566     _::FiberStack* result = new _::FiberStack(stackSize);
    567     return { result, *this };
    568   }
    569 
    570 private:
    571   size_t stackSize;
    572   size_t maxFreelist = kj::maxValue;
    573   MutexGuarded<std::deque<_::FiberStack*>> freelist;
    574 
    575 #if USE_CORE_LOCAL_FREELISTS
    576   struct CoreLocalFreelist {
    577     union {
    578       _::FiberStack* stacks[2];
    579       // For now, we don't try to freelist more than 2 stacks per core. If you have three or more
    580       // threads interleaved on a core, chances are you have bigger problems...
    581 
    582       byte padToCacheLine[CACHE_LINE_SIZE];
    583       // We don't want two core-local freelists to live in the same cache line, otherwise the
    584       // cores will fight over ownership of that line.
    585     };
    586   };
    587 
    588   uint nproc;
    589   CoreLocalFreelist* coreLocalFreelists = nullptr;
    590 
    591   kj::Maybe<CoreLocalFreelist&> lookupCoreLocalFreelist() const {
    592     if (coreLocalFreelists == nullptr) {
    593       return nullptr;
    594     } else {
    595       int cpu = sched_getcpu();
    596       if (cpu >= 0) {
    597         // TODO(perf): Perhaps two hyperthreads on the same physical core should share a freelist?
    598         //   But I don't know how to find out if the system uses hyperthreading.
    599         return coreLocalFreelists[cpu];
    600       } else {
    601         static bool logged = false;
    602         if (!logged) {
    603           KJ_LOG(ERROR, "invalid cpu number from sched_getcpu()?", cpu, nproc);
    604           logged = true;
    605         }
    606         return nullptr;
    607       }
    608     }
    609   }
    610 #endif
    611 
    612   void disposeImpl(void* pointer) const {
    613     _::FiberStack* stack = reinterpret_cast<_::FiberStack*>(pointer);
    614     KJ_DEFER(delete stack);
    615 
    616     // Verify that the stack was reset before returning, otherwise it might be in a weird state
    617     // where we don't want to reuse it.
    618     if (stack->isReset()) {
    619 #if USE_CORE_LOCAL_FREELISTS
    620       KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) {
    621         for (auto& stackPtr: core->stacks) {
    622           stack = __atomic_exchange_n(&stackPtr, stack, __ATOMIC_RELEASE);
    623           if (stack == nullptr) {
    624             // Cool, we inserted the stack into an unused slot. We're done.
    625             return;
    626           }
    627         }
    628         // All slots were occupied, so we inserted the new stack in the front, pushed the rest back,
    629         // and now `stack` refers to the stack that fell off the end of the core-local list. That
    630         // needs to go into the global freelist.
    631       }
    632 #endif
    633 
    634       auto lock = freelist.lockExclusive();
    635       lock->push_back(stack);
    636       if (lock->size() > maxFreelist) {
    637         stack = lock->front();
    638         lock->pop_front();
    639       } else {
    640         stack = nullptr;
    641       }
    642     }
    643   }
    644 };
    645 
    646 FiberPool::FiberPool(size_t stackSize) : impl(kj::heap<FiberPool::Impl>(stackSize)) {}
    647 FiberPool::~FiberPool() noexcept(false) {}
    648 
    649 void FiberPool::setMaxFreelist(size_t count) {
    650   impl->setMaxFreelist(count);
    651 }
    652 
    653 size_t FiberPool::getFreelistSize() const {
    654   return impl->getFreelistSize();
    655 }
    656 
    657 void FiberPool::useCoreLocalFreelists() {
    658   impl->useCoreLocalFreelists();
    659 }
    660 
    661 void FiberPool::runSynchronously(kj::FunctionParam<void()> func) const {
    662   ensureThreadCanRunFibers();
    663 
    664   _::FiberStack::SynchronousFunc syncFunc { func, nullptr };
    665 
    666   {
    667     auto stack = impl->takeStack();
    668     stack->initialize(syncFunc);
    669     stack->switchToFiber();
    670     stack->reset();  // safe to reuse
    671   }
    672 
    673   KJ_IF_MAYBE(e, syncFunc.exception) {
    674     kj::throwRecoverableException(kj::mv(*e));
    675   }
    676 }
    677 
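// Illustrative usage sketch (not part of the original file): runSynchronously() borrows a stack
// from the pool and runs an ordinary (non-async) function on it, which is useful for running
// deeply recursive code without growing the caller's own stack. Roughly:
//
//   kj::FiberPool pool(1024 * 1024);     // stack size chosen arbitrarily for the example
//   pool.runSynchronously([&]() {
//     deeplyRecursiveWork();             // hypothetical function needing lots of stack
//   });
//   // The borrowed stack returns to the pool's freelist when the callback completes.
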
    678 namespace _ {  // private
    679 
    680 class LoggingErrorHandler: public TaskSet::ErrorHandler {
    681 public:
    682   static LoggingErrorHandler instance;
    683 
    684   void taskFailed(kj::Exception&& exception) override {
    685     KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    686   }
    687 };
    688 
    689 LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();
    690 
    691 }  // namespace _ (private)
    692 
    693 // =======================================================================================
    694 
    695 struct Executor::Impl {
    696   Impl(EventLoop& loop): state(loop) {}
    697 
    698   struct State {
    699     // Queues of notifications from other threads that need this thread's attention.
    700 
    701     State(EventLoop& loop): loop(loop) {}
    702 
    703     kj::Maybe<EventLoop&> loop;
    704     // Becomes null when the loop is destroyed.
    705 
    706     List<_::XThreadEvent, &_::XThreadEvent::targetLink> start;
    707     List<_::XThreadEvent, &_::XThreadEvent::targetLink> cancel;
    708     List<_::XThreadEvent, &_::XThreadEvent::replyLink> replies;
    709     // Lists of events that need actioning by this thread.
    710 
    711     List<_::XThreadEvent, &_::XThreadEvent::targetLink> executing;
    712     // Events that have already been dispatched and are happily executing. This list is maintained
    713     // so that they can be canceled if the event loop exits.
    714 
    715     List<_::XThreadPaf, &_::XThreadPaf::link> fulfilled;
    716     // Set of XThreadPafs that have been fulfilled by another thread.
    717 
    718     bool waitingForCancel = false;
    719     // True if this thread is currently blocked waiting for some other thread to pump its
    720     // cancellation queue. If that other thread tries to block on *this* thread, then it could
    721     // deadlock -- it must take precautions against this.
    722 
    723     bool isDispatchNeeded() const {
    724       return !start.empty() || !cancel.empty() || !replies.empty() || !fulfilled.empty();
    725     }
    726 
    727     void dispatchAll(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
    728       for (auto& event: start) {
    729         start.remove(event);
    730         executing.add(event);
    731         event.state = _::XThreadEvent::EXECUTING;
    732         event.armBreadthFirst();
    733       }
    734 
    735       dispatchCancels(eventsToCancelOutsideLock);
    736 
    737       for (auto& event: replies) {
    738         replies.remove(event);
    739         event.onReadyEvent.armBreadthFirst();
    740       }
    741 
    742       for (auto& event: fulfilled) {
    743         fulfilled.remove(event);
    744         event.state = _::XThreadPaf::DISPATCHED;
    745         event.onReadyEvent.armBreadthFirst();
    746       }
    747     }
    748 
    749     void dispatchCancels(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
    750       for (auto& event: cancel) {
    751         cancel.remove(event);
    752 
    753         if (event.promiseNode == nullptr) {
    754           event.setDoneState();
    755         } else {
    756           // We can't destroy the promiseNode while the mutex is locked, because we don't know
    757           // what the destructor might do. But, we *must* destroy it before acknowledging
    758           // cancellation. So we have to add it to a list to destroy later.
    759           eventsToCancelOutsideLock.add(&event);
    760         }
    761       }
    762     }
    763   };
    764 
    765   kj::MutexGuarded<State> state;
    766   // After modifying state from another thread, the loop's port.wake() must be called.
    767 
    768   void processAsyncCancellations(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
    769     // After calling dispatchAll() or dispatchCancels() with the lock held, it may be that some
    770     // cancellations require dropping the lock before destroying the promiseNode. In that case
    771     // those cancellations will be added to the eventsToCancelOutsideLock Vector passed to the
    772     // method. That vector must then be passed to processAsyncCancellations() as soon as the lock
    773     // is released.
    774 
    775     for (auto& event: eventsToCancelOutsideLock) {
    776       event->promiseNode = nullptr;
    777       event->disarm();
    778     }
    779 
    780     // Now we need to mark all the events "done" under lock.
    781     auto lock = state.lockExclusive();
    782     for (auto& event: eventsToCancelOutsideLock) {
    783       event->setDoneState();
    784     }
    785   }
    786 
    787   void disconnect() {
    788     state.lockExclusive()->loop = nullptr;
    789 
    790     // Now that `loop` is set null in `state`, other threads will no longer try to manipulate our
    791     // lists, so we can access them without a lock. That's convenient because a bunch of the things
    792     // we want to do with them would require dropping the lock to avoid deadlocks. We'd end up
    793     // copying all the lists over into separate vectors first, dropping the lock, operating on
    794     // them, and then locking again.
    795     auto& s = state.getWithoutLock();
    796 
    797     // We do, however, take and release the lock on the way out, to make sure anyone performing
    798     // a conditional wait for state changes gets a chance to have their wait condition re-checked.
    799     KJ_DEFER(state.lockExclusive());
    800 
    801     for (auto& event: s.start) {
    802       KJ_ASSERT(event.state == _::XThreadEvent::QUEUED, event.state) { break; }
    803       s.start.remove(event);
    804       event.setDisconnected();
    805       event.sendReply();
    806       event.setDoneState();
    807     }
    808 
    809     for (auto& event: s.executing) {
    810       KJ_ASSERT(event.state == _::XThreadEvent::EXECUTING, event.state) { break; }
    811       s.executing.remove(event);
    812       event.promiseNode = nullptr;
    813       event.setDisconnected();
    814       event.sendReply();
    815       event.setDoneState();
    816     }
    817 
    818     for (auto& event: s.cancel) {
    819       KJ_ASSERT(event.state == _::XThreadEvent::CANCELING, event.state) { break; }
    820       s.cancel.remove(event);
    821       event.promiseNode = nullptr;
    822       event.setDoneState();
    823     }
    824 
    825     // The replies list "should" be empty, because any locally-initiated tasks should have been
    826     // canceled before destroying the EventLoop.
    827     if (!s.replies.empty()) {
    828       KJ_LOG(ERROR, "EventLoop destroyed with cross-thread event replies outstanding");
    829       for (auto& event: s.replies) {
    830         s.replies.remove(event);
    831       }
    832     }
    833 
    834     // Similarly for cross-thread fulfillers. The waiting tasks should have been canceled.
    835     if (!s.fulfilled.empty()) {
    836       KJ_LOG(ERROR, "EventLoop destroyed with cross-thread fulfiller replies outstanding");
    837       for (auto& event: s.fulfilled) {
    838         s.fulfilled.remove(event);
    839         event.state = _::XThreadPaf::DISPATCHED;
    840       }
    841     }
    842   }};
    843 
    844 namespace _ {  // (private)
    845 
    846 XThreadEvent::XThreadEvent(
    847     ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr)
    848     : Event(targetExecutor.getLoop()), result(result), funcTracePtr(funcTracePtr),
    849       targetExecutor(targetExecutor.addRef()) {}
    850 
    851 void XThreadEvent::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
    852   // We can't safely trace into another thread, so we'll stop here.
    853   builder.add(funcTracePtr);
    854 }
    855 
    856 void XThreadEvent::ensureDoneOrCanceled() {
    857   if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
    858     auto lock = targetExecutor->impl->state.lockExclusive();
    859 
    860     const EventLoop* loop;
    861     KJ_IF_MAYBE(l, lock->loop) {
    862       loop = l;
    863     } else {
    864       // Target event loop is already dead, so we know it's already working on transitioning all
    865       // events to the DONE state. We can just wait.
    866       lock.wait([&](auto&) { return state == DONE; });
    867       return;
    868     }
    869 
    870     switch (state) {
    871       case UNUSED:
    872         // Nothing to do.
    873         break;
    874       case QUEUED:
    875         lock->start.remove(*this);
    876         // No wake needed since we removed work rather than adding it.
    877         state = DONE;
    878         break;
    879       case EXECUTING: {
    880         lock->executing.remove(*this);
    881         lock->cancel.add(*this);
    882         state = CANCELING;
    883         KJ_IF_MAYBE(p, loop->port) {
    884           p->wake();
    885         }
    886 
    887         Maybe<Executor&> maybeSelfExecutor = nullptr;
    888         if (threadLocalEventLoop != nullptr) {
    889           KJ_IF_MAYBE(e, threadLocalEventLoop->executor) {
    890             maybeSelfExecutor = **e;
    891           }
    892         }
    893 
    894         KJ_IF_MAYBE(selfExecutor, maybeSelfExecutor) {
    895           // If, while waiting for other threads to process our cancellation request, we have
    896           // cancellation requests queued back to this thread, we must process them. Otherwise,
    897           // we could deadlock with two threads waiting on each other to process cancellations.
    898           //
    899           // We don't have a terribly good way to detect this, except to check if the remote
    900           // thread is itself waiting for cancellations and, if so, wake ourselves up to check for
    901           // cancellations to process. This will busy-loop but at least it should eventually
    902           // resolve assuming fair scheduling.
    903           //
    904           // To make things extra-annoying, in order to update our waitingForCancel flag, we have
    905           // to lock our own executor state, but we can't take both locks at once, so we have to
    906           // release the other lock in the meantime.
    907 
    908           // Make sure we unset waitingForCancel on the way out.
    909           KJ_DEFER({
    910             lock = {};
    911 
    912             Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
    913             KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
    914 
    915             auto selfLock = selfExecutor->impl->state.lockExclusive();
    916             selfLock->waitingForCancel = false;
    917             selfLock->dispatchCancels(eventsToCancelOutsideLock);
    918 
    919             // We don't need to re-take the lock on the other executor here; it's not used again
    920             // after this scope.
    921           });
    922 
    923           while (state != DONE) {
    924             bool otherThreadIsWaiting = lock->waitingForCancel;
    925 
    926             // Make sure our waitingForCancel is on and dispatch any pending cancellations on this
    927             // thread.
    928             lock = {};
    929             {
    930               Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
    931               KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
    932 
    933               auto selfLock = selfExecutor->impl->state.lockExclusive();
    934               selfLock->waitingForCancel = true;
    935 
    936               // Note that we don't have to proactively delete the PromiseNodes extracted from
    937               // the canceled events because those nodes belong to this thread and can't possibly
    938               // continue executing while we're blocked here.
    939               selfLock->dispatchCancels(eventsToCancelOutsideLock);
    940             }
    941 
    942             if (otherThreadIsWaiting) {
    943               // We know the other thread was waiting for cancellations to complete a moment ago.
    944               // We may have just processed the necessary cancellations in this thread, in which
    945               // case the other thread needs a chance to receive control and notice this. Or, it
    946               // may be that the other thread is waiting for some third thread to take action.
    947               // Either way, we should yield control here to give things a chance to settle.
    948               // Otherwise we could end up in a tight busy loop.
    949 #if _WIN32
    950               Sleep(0);
    951 #else
    952               sched_yield();
    953 #endif
    954             }
    955 
    956             // OK now we can take the original lock again.
    957             lock = targetExecutor->impl->state.lockExclusive();
    958 
    959             // OK, now we can wait for the other thread to either process our cancellation or
    960             // indicate that it is waiting for remote cancellation.
    961             lock.wait([&](const Executor::Impl::State& executorState) {
    962               return state == DONE || executorState.waitingForCancel;
    963             });
    964           }
    965         } else {
    966           // We have no executor of our own so we don't have to worry about cancellation cycles
    967           // causing deadlock.
    968           //
    969           // NOTE: I don't think we can actually get here, because it implies that this is a
    970           //   synchronous execution, which means there's no way to cancel it.
    971           lock.wait([&](auto&) { return state == DONE; });
    972         }
    973         KJ_DASSERT(!targetLink.isLinked());
    974         break;
    975       }
    976       case CANCELING:
    977         KJ_FAIL_ASSERT("impossible state: CANCELING should only be set within the above case");
    978       case DONE:
    979         // Became done while we waited for lock. Nothing to do.
    980         break;
    981     }
    982   }
    983 
    984   KJ_IF_MAYBE(e, replyExecutor) {
    985     // Since we know we reached the DONE state (or never left UNUSED), we know that the remote
    986     // thread is all done playing with our `replyPrev` pointer. Only the current thread could
    987     // possibly modify it after this point. So we can skip the lock if it's already null.
    988     if (replyLink.isLinked()) {
    989       auto lock = e->impl->state.lockExclusive();
    990       lock->replies.remove(*this);
    991     }
    992   }
    993 }
    994 
    995 void XThreadEvent::sendReply() {
    996   KJ_IF_MAYBE(e, replyExecutor) {
    997     // Queue the reply.
    998     const EventLoop* replyLoop;
    999     {
   1000       auto lock = e->impl->state.lockExclusive();
   1001       KJ_IF_MAYBE(l, lock->loop) {
   1002         lock->replies.add(*this);
   1003         replyLoop = l;
   1004       } else {
   1005         // Calling thread exited without cancelling the promise. This is UB. In fact,
   1006         // `replyExecutor` is probably already destroyed and we are in use-after-free territory
   1007         // already. Better abort.
   1008         KJ_LOG(FATAL,
   1009             "the thread which called kj::Executor::executeAsync() apparently exited its own "
   1010             "event loop without canceling the cross-thread promise first; this is undefined "
   1011             "behavior so I will crash now");
   1012         abort();
   1013       }
   1014     }
   1015 
   1016     // Note that it's safe to assume `replyLoop` still exists even though we dropped the lock
   1017     // because that thread would have had to cancel any promises before destroying its own
   1018     // EventLoop, and when it tries to destroy this promise, it will wait for `state` to become
   1019     // `DONE`, which we don't set until later on. That's nice because wake() probably makes a
   1020     // syscall and we'd rather not hold the lock through syscalls.
   1021     KJ_IF_MAYBE(p, replyLoop->port) {
   1022       p->wake();
   1023     }
   1024   }
   1025 }
   1026 
   1027 void XThreadEvent::done() {
   1028   KJ_ASSERT(targetExecutor.get() == &currentEventLoop().getExecutor(),
   1029       "calling done() from wrong thread?");
   1030 
   1031   sendReply();
   1032 
   1033   {
   1034     auto lock = targetExecutor->impl->state.lockExclusive();
   1035 
   1036     switch (state) {
   1037       case EXECUTING:
   1038         lock->executing.remove(*this);
   1039         break;
   1040       case CANCELING:
   1041         // Sending thread requested cancellation, but we're done anyway, so it doesn't matter at this
   1042         // point.
   1043         lock->cancel.remove(*this);
   1044         break;
   1045       default:
   1046         KJ_FAIL_ASSERT("can't call done() from this state", (uint)state);
   1047     }
   1048 
   1049     setDoneState();
   1050   }
   1051 }
   1052 
   1053 inline void XThreadEvent::setDoneState() {
   1054   __atomic_store_n(&state, DONE, __ATOMIC_RELEASE);
   1055 }
   1056 
   1057 void XThreadEvent::setDisconnected() {
   1058   result.addException(KJ_EXCEPTION(DISCONNECTED,
   1059       "Executor's event loop exited before cross-thread event could complete"));
   1060 }
   1061 
   1062 class XThreadEvent::DelayedDoneHack: public Disposer {
   1063   // Crazy hack: In fire(), we want to call done() if the event is finished. But done() signals
   1064   // the requesting thread to wake up and possibly delete the XThreadEvent. But the caller (the
   1065   // EventLoop) still has to set `event->firing = false` after `fire()` returns, so this would be
   1066   // a use-after-free race condition.
   1067   //
   1068   // It just so happens, though, that fire() is allowed to return an optional `Own<Event>` to drop,
   1069   // and the caller drops that pointer immediately after setting event->firing = false. So we
   1070   // return a pointer whose disposer calls done().
   1071   //
   1072   // It's not quite as much of a hack as it seems: The whole reason fire() returns an Own<Event> is
   1073   // so that the event can delete itself, but do so after the caller sets event->firing = false.
   1074   // It just happens to be that in this case, the event isn't deleting itself, but rather releasing
   1075   // itself back to the other thread.
   1076 
   1077 protected:
   1078   void disposeImpl(void* pointer) const override {
   1079     reinterpret_cast<XThreadEvent*>(pointer)->done();
   1080   }
   1081 };
   1082 
   1083 Maybe<Own<Event>> XThreadEvent::fire() {
   1084   static constexpr DelayedDoneHack DISPOSER {};
   1085 
   1086   KJ_IF_MAYBE(n, promiseNode) {
   1087     n->get()->get(result);
   1088     promiseNode = nullptr;  // make sure to destroy in the thread that created it
   1089     return Own<Event>(this, DISPOSER);
   1090   } else {
   1091     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   1092       promiseNode = execute();
   1093     })) {
   1094       result.addException(kj::mv(*exception));
   1095     };
   1096     KJ_IF_MAYBE(n, promiseNode) {
   1097       n->get()->onReady(this);
   1098     } else {
   1099       return Own<Event>(this, DISPOSER);
   1100     }
   1101   }
   1102 
   1103   return nullptr;
   1104 }
   1105 
   1106 void XThreadEvent::traceEvent(TraceBuilder& builder) {
   1107   KJ_IF_MAYBE(n, promiseNode) {
   1108     n->get()->tracePromise(builder, true);
   1109   }
   1110 
   1111   // We can't safely trace into another thread, so we'll stop here.
   1112   builder.add(funcTracePtr);
   1113 }
   1114 
   1115 void XThreadEvent::onReady(Event* event) noexcept {
   1116   onReadyEvent.init(event);
   1117 }
   1118 
   1119 XThreadPaf::XThreadPaf()
   1120     : state(WAITING), executor(getCurrentThreadExecutor()) {}
   1121 XThreadPaf::~XThreadPaf() noexcept(false) {}
   1122 
   1123 void XThreadPaf::Disposer::disposeImpl(void* pointer) const {
   1124   XThreadPaf* obj = reinterpret_cast<XThreadPaf*>(pointer);
   1125   auto oldState = WAITING;
   1126 
   1127   if (__atomic_load_n(&obj->state, __ATOMIC_ACQUIRE) == DISPATCHED) {
   1128     // Common case: Promise was fully fulfilled and dispatched, no need for locking.
   1129     delete obj;
   1130   } else if (__atomic_compare_exchange_n(&obj->state, &oldState, CANCELED, false,
   1131                                          __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
   1132     // State transitioned from WAITING to CANCELED, so now it's the fulfiller's job to destroy the
   1133     // object.
   1134   } else {
   1135     // Whoops, another thread is already in the process of fulfilling this promise. We'll have to
   1136     // wait for it to finish and transition the state to FULFILLED.
   1137     obj->executor.impl->state.when([&](auto&) {
   1138       return obj->state == FULFILLED || obj->state == DISPATCHED;
   1139     }, [&](Executor::Impl::State& exState) {
   1140       if (obj->state == FULFILLED) {
   1141         // The object is on the queue but was not yet dispatched. Remove it.
   1142         exState.fulfilled.remove(*obj);
   1143       }
   1144     });
   1145 
   1146     // It's ours now, delete it.
   1147     delete obj;
   1148   }
   1149 }
   1150 
   1151 const XThreadPaf::Disposer XThreadPaf::DISPOSER;
   1152 
   1153 void XThreadPaf::onReady(Event* event) noexcept {
   1154   onReadyEvent.init(event);
   1155 }
   1156 
   1157 void XThreadPaf::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   1158   // We can't safely trace into another thread, so we'll stop here.
   1159   // Maybe returning the address of get() will give us a function name with meaningful type
   1160   // information.
   1161   builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
   1162 }
   1163 
   1164 XThreadPaf::FulfillScope::FulfillScope(XThreadPaf** pointer) {
   1165   obj = __atomic_exchange_n(pointer, static_cast<XThreadPaf*>(nullptr), __ATOMIC_ACQUIRE);
   1166   auto oldState = WAITING;
   1167   if (obj == nullptr) {
   1168     // Already fulfilled (possibly by another thread).
   1169   } else if (__atomic_compare_exchange_n(&obj->state, &oldState, FULFILLING, false,
   1170                                          __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
   1171     // Transitioned to FULFILLING, good.
   1172   } else {
   1173     // The waiting thread must have canceled.
   1174     KJ_ASSERT(oldState == CANCELED);
   1175 
   1176     // It's our responsibility to clean up, then.
   1177     delete obj;
   1178 
   1179     // Set `obj` null so that we don't try to fill it in or delete it later.
   1180     obj = nullptr;
   1181   }
   1182 }
   1183 XThreadPaf::FulfillScope::~FulfillScope() noexcept(false) {
   1184   if (obj != nullptr) {
   1185     auto lock = obj->executor.impl->state.lockExclusive();
   1186     KJ_IF_MAYBE(l, lock->loop) {
   1187       lock->fulfilled.add(*obj);
   1188       __atomic_store_n(&obj->state, FULFILLED, __ATOMIC_RELEASE);
   1189       KJ_IF_MAYBE(p, l->port) {
   1190         // TODO(perf): It's annoying we have to call wake() with the lock held, but we have to
   1191         //   prevent the destination EventLoop from being destroyed first.
   1192         p->wake();
   1193       }
   1194     } else {
   1195       KJ_LOG(FATAL,
   1196           "the thread which called kj::newPromiseAndCrossThreadFulfiller<T>() apparently exited "
   1197           "its own event loop without canceling the cross-thread promise first; this is "
   1198           "undefined behavior so I will crash now");
   1199       abort();
   1200     }
   1201   }
   1202 }
   1203 
   1204 kj::Exception XThreadPaf::unfulfilledException() {
   1205   // TODO(cleanup): Share code with regular PromiseAndFulfiller for stack tracing here.
   1206   return kj::Exception(kj::Exception::Type::FAILED, __FILE__, __LINE__, kj::heapString(
   1207       "cross-thread PromiseFulfiller was destroyed without fulfilling the promise."));
   1208 }
   1209 
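// Illustrative usage sketch (not part of the original file): XThreadPaf is the machinery behind
// kj::newPromiseAndCrossThreadFulfiller<T>(). The promise is consumed on the thread that created
// it, while the fulfiller may be invoked from another thread. Roughly:
//
//   auto paf = kj::newPromiseAndCrossThreadFulfiller<int>();
//   // Hand paf.fulfiller off to a worker thread, which eventually calls:
//   //   fulfiller->fulfill(42);
//   int result = paf.promise.wait(waitScope);   // waited on in the creating thread
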
   1210 class ExecutorImpl: public Executor, public AtomicRefcounted {
   1211 public:
   1212   using Executor::Executor;
   1213 
   1214   kj::Own<const Executor> addRef() const override {
   1215     return kj::atomicAddRef(*this);
   1216   }
   1217 };
   1218 
   1219 }  // namespace _
   1220 
   1221 Executor::Executor(EventLoop& loop, Badge<EventLoop>): impl(kj::heap<Impl>(loop)) {}
   1222 Executor::~Executor() noexcept(false) {}
   1223 
   1224 bool Executor::isLive() const {
   1225   return impl->state.lockShared()->loop != nullptr;
   1226 }
   1227 
   1228 void Executor::send(_::XThreadEvent& event, bool sync) const {
   1229   KJ_ASSERT(event.state == _::XThreadEvent::UNUSED);
   1230 
   1231   if (sync) {
   1232     EventLoop* thisThread = threadLocalEventLoop;
   1233     if (thisThread != nullptr &&
   1234         thisThread->executor.map([this](auto& e) { return e == this; }).orDefault(false)) {
   1235       // Invoking a sync request on our own thread. Just execute it directly; if we try to queue
   1236       // it to the loop, we'll deadlock.
   1237       auto promiseNode = event.execute();
   1238 
   1239       // If the function returns a promise, we have no way to pump the event loop to wait for it,
   1240       // because the event loop may already be pumping somewhere up the stack.
   1241       KJ_ASSERT(promiseNode == nullptr,
   1242           "can't call executeSync() on own thread's executor with a promise-returning function");
   1243 
   1244       return;
   1245     }
   1246   } else {
   1247     event.replyExecutor = getCurrentThreadExecutor();
   1248 
   1249     // Note that async requests will "just work" even if the target executor is our own thread's
   1250     // executor. In theory we could detect this case to avoid some locking and signals but that
   1251     // would be extra code complexity for probably little benefit.
   1252   }
   1253 
   1254   auto lock = impl->state.lockExclusive();
   1255   const EventLoop* loop;
   1256   KJ_IF_MAYBE(l, lock->loop) {
   1257     loop = l;
   1258   } else {
   1259     event.setDisconnected();
   1260     return;
   1261   }
   1262 
   1263   event.state = _::XThreadEvent::QUEUED;
   1264   lock->start.add(event);
   1265 
   1266   KJ_IF_MAYBE(p, loop->port) {
   1267     p->wake();
   1268   } else {
   1269     // Event loop will be waiting on executor.wait(), which will be woken when we unlock the mutex.
   1270   }
   1271 
   1272   if (sync) {
   1273     lock.wait([&](auto&) { return event.state == _::XThreadEvent::DONE; });
   1274   }
   1275 }
   1276 
   1277 void Executor::wait() {
   1278   Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
   1279   KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
   1280 
   1281   auto lock = impl->state.lockExclusive();
   1282 
   1283   lock.wait([](const Impl::State& state) {
   1284     return state.isDispatchNeeded();
   1285   });
   1286 
   1287   lock->dispatchAll(eventsToCancelOutsideLock);
   1288 }
   1289 
   1290 bool Executor::poll() {
   1291   Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
   1292   KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
   1293 
   1294   auto lock = impl->state.lockExclusive();
   1295   if (lock->isDispatchNeeded()) {
   1296     lock->dispatchAll(eventsToCancelOutsideLock);
   1297     return true;
   1298   } else {
   1299     return false;
   1300   }
   1301 }
   1302 
   1303 EventLoop& Executor::getLoop() const {
   1304   KJ_IF_MAYBE(l, impl->state.lockShared()->loop) {
   1305     return *l;
   1306   } else {
   1307     kj::throwFatalException(KJ_EXCEPTION(DISCONNECTED, "Executor's event loop has exited"));
   1308   }
   1309 }
   1310 
   1311 const Executor& getCurrentThreadExecutor() {
   1312   return currentEventLoop().getExecutor();
   1313 }
   1314 
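// Illustrative usage sketch (not part of the original file): an Executor lets other threads
// schedule work onto this thread's event loop. Roughly:
//
//   // On the thread that owns the event loop:
//   const kj::Executor& executor = kj::getCurrentThreadExecutor();
//
//   // On another thread that has its own event loop:
//   kj::Promise<int> p = executor.executeAsync([]() { return 123; });
//
//   // Or, from a thread with no event loop at all, block until the result is ready:
//   int result = executor.executeSync([]() { return 123; });
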
   1315 // =======================================================================================
   1316 // Fiber implementation.
   1317 
   1318 namespace _ {  // private
   1319 
   1320 #if KJ_USE_FIBERS
   1321 #if !(_WIN32 || __CYGWIN__)
   1322 struct FiberStack::Impl {
   1323   // This struct serves two purposes:
   1324   // - It contains OS-specific state that we don't want to declare in the header.
   1325   // - It is allocated at the top of the fiber's stack area, so the Impl pointer also serves to
   1326   //   track where the stack was allocated.
   1327 
   1328   jmp_buf fiberJmpBuf;
   1329   jmp_buf originalJmpBuf;
   1330 
   1331   static Impl* alloc(size_t stackSize, ucontext_t* context) {
   1332 #ifndef MAP_ANONYMOUS
   1333 #define MAP_ANONYMOUS MAP_ANON
   1334 #endif
   1335 #ifndef MAP_STACK
   1336 #define MAP_STACK 0
   1337 #endif
   1338 
   1339     size_t pageSize = getPageSize();
   1340     size_t allocSize = stackSize + pageSize;  // size plus guard page
   1341 
   1342     // Allocate virtual address space for the stack but make it inaccessible initially.
   1343     // TODO(someday): Does it make sense to use MAP_GROWSDOWN on Linux? It's a kind of bizarre flag
   1344     //   that causes the mapping to automatically allocate extra pages (beyond the range specified)
   1345     //   until it hits something...
   1346     void* stack = mmap(nullptr, allocSize, PROT_NONE,
   1347         MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0);
   1348     if (stack == MAP_FAILED) {
   1349       KJ_FAIL_SYSCALL("mmap(new stack)", errno);
   1350     }
   1351     KJ_ON_SCOPE_FAILURE({
   1352       KJ_SYSCALL(munmap(stack, allocSize)) { break; }
   1353     });
   1354 
   1355     // Now mark everything except the guard page as read-write. We assume the stack grows down, so
   1356     // the guard page is at the beginning. No modern architecture uses stacks that grow up.
   1357     KJ_SYSCALL(mprotect(reinterpret_cast<byte*>(stack) + pageSize,
   1358                         stackSize, PROT_READ | PROT_WRITE));
   1359 
   1360     // Stick `Impl` at the top of the stack.
   1361     Impl* impl = (reinterpret_cast<Impl*>(reinterpret_cast<byte*>(stack) + allocSize) - 1);
   1362 
   1363     // Note: mmap() allocates zero'd pages so we don't have to memset() anything here.
   1364 
   1365     KJ_SYSCALL(getcontext(context));
   1366     context->uc_stack.ss_size = allocSize - sizeof(Impl);
   1367     context->uc_stack.ss_sp = reinterpret_cast<char*>(stack);
   1368     context->uc_stack.ss_flags = 0;
   1369     // We don't use uc_link since our fiber start routine runs forever in a loop to allow for
   1370     // reuse. When we're done with the fiber, we just destroy it, without switching to its
   1371     // stack. This is safe since the start routine doesn't allocate any memory or RAII objects
   1372     // before looping.
   1373     context->uc_link = 0;
   1374 
   1375     return impl;
   1376   }
   1377 
   1378   static void free(Impl* impl, size_t stackSize) {
   1379     size_t allocSize = stackSize + getPageSize();
   1380     void* stack = reinterpret_cast<byte*>(impl + 1) - allocSize;
   1381     KJ_SYSCALL(munmap(stack, allocSize)) { break; }
   1382   }
   1383 
   1384   static size_t getPageSize() {
   1385 #ifndef _SC_PAGESIZE
   1386 #define _SC_PAGESIZE _SC_PAGE_SIZE
   1387 #endif
    1388     static size_t result = sysconf(_SC_PAGESIZE);
   1389     return result;
   1390   }
   1391 };
   1392 #endif
   1393 #endif
   1394 
   1395 struct FiberStack::StartRoutine {
   1396 #if _WIN32 || __CYGWIN__
   1397   static void WINAPI run(LPVOID ptr) {
   1398     // This is the static C-style function we pass to CreateFiber().
   1399     reinterpret_cast<FiberStack*>(ptr)->run();
   1400   }
   1401 #else
   1402   [[noreturn]] static void run(int arg1, int arg2) {
   1403     // This is the static C-style function we pass to makeContext().
   1404 
   1405     // POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
   1406     // work correctly on 64-bit machines. Gross.
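             // A concrete illustration (assuming a 64-bit platform, so the shift below is 32):
             // a FiberStack* with value 0x00007f12'3456'789a arrives as arg1 = 0x3456789a and
             // arg2 = 0x00007f12, and the next two lines rebuild the original pointer as
             // 0x3456789a | (0x00007f12 << 32).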
   1407     uintptr_t ptr = static_cast<uint>(arg1);
   1408     ptr |= static_cast<uintptr_t>(static_cast<uint>(arg2)) << (sizeof(ptr) * 4);
   1409 
   1410     auto& stack = *reinterpret_cast<FiberStack*>(ptr);
   1411 
   1412     // We first switch to the fiber inside of the FiberStack constructor. This is just for
   1413     // initialization purposes, and we're expected to switch back immediately.
   1414     stack.switchToMain();
   1415 
    1416     // OK, now we have a real job.
   1417     stack.run();
   1418   }
   1419 #endif
   1420 };
   1421 
   1422 void FiberStack::run() {
   1423   // Loop forever so that the fiber can be reused.
   1424   for (;;) {
   1425     KJ_SWITCH_ONEOF(main) {
   1426       KJ_CASE_ONEOF(event, FiberBase*) {
   1427         event->run();
   1428       }
   1429       KJ_CASE_ONEOF(func, SynchronousFunc*) {
   1430         KJ_IF_MAYBE(exception, kj::runCatchingExceptions(func->func)) {
   1431           func->exception.emplace(kj::mv(*exception));
   1432         }
   1433       }
   1434     }
   1435 
   1436     // Wait for the fiber to be used again. Note the fiber might simply be destroyed without this
   1437     // ever returning. That's OK because we don't have any nontrivial destructors on the stack
   1438     // at this point.
   1439     switchToMain();
   1440   }
   1441 }
   1442 
   1443 FiberStack::FiberStack(size_t stackSizeParam)
   1444     // Force stackSize to a reasonable minimum.
   1445     : stackSize(kj::max(stackSizeParam, 65536))
   1446 {
   1447 
   1448 #if KJ_USE_FIBERS
   1449 #if _WIN32 || __CYGWIN__
    1450   // We can create fibers before we convert the main thread into a fiber in FiberBase.
   1451   KJ_WIN32(osFiber = CreateFiber(stackSize, &StartRoutine::run, this));
   1452 
   1453 #else
   1454   // Note: Nothing below here can throw. If that changes then we need to call Impl::free(impl)
   1455   //   on exceptions...
   1456   ucontext_t context;
   1457   impl = Impl::alloc(stackSize, &context);
   1458 
   1459   // POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
   1460   // work correctly on 64-bit machines. Gross.
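           // (See StartRoutine::run() above for the matching reassembly; the mask and shift here
           // are just its inverse, e.g. on a 64-bit platform arg1 receives the low 32 bits of the
           // pointer and arg2 the high 32 bits.)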
   1461   uintptr_t ptr = reinterpret_cast<uintptr_t>(this);
   1462   int arg1 = ptr & ((uintptr_t(1) << (sizeof(ptr) * 4)) - 1);
   1463   int arg2 = ptr >> (sizeof(ptr) * 4);
   1464 
   1465   makecontext(&context, reinterpret_cast<void(*)()>(&StartRoutine::run), 2, arg1, arg2);
   1466 
   1467   if (_setjmp(impl->originalJmpBuf) == 0) {
   1468     setcontext(&context);
   1469   }
   1470 #endif
   1471 #else
   1472 #if KJ_NO_EXCEPTIONS
   1473   KJ_UNIMPLEMENTED("Fibers are not implemented because exceptions are disabled");
   1474 #else
   1475   KJ_UNIMPLEMENTED(
   1476       "Fibers are not implemented on this platform because its C library lacks setcontext() "
   1477       "and friends. If you'd like to see fiber support added, file a bug to let us know. "
   1478       "We can likely make it happen using assembly, but didn't want to try unless it was "
   1479       "actually needed.");
   1480 #endif
   1481 #endif
   1482 }
   1483 
   1484 FiberStack::~FiberStack() noexcept(false) {
   1485 #if KJ_USE_FIBERS
   1486 #if _WIN32 || __CYGWIN__
   1487   DeleteFiber(osFiber);
   1488 #else
   1489   Impl::free(impl, stackSize);
   1490 #endif
   1491 #endif
   1492 }
   1493 
   1494 void FiberStack::initialize(FiberBase& fiber) {
   1495   KJ_REQUIRE(this->main == nullptr);
   1496   this->main = &fiber;
   1497 }
   1498 
   1499 void FiberStack::initialize(SynchronousFunc& func) {
   1500   KJ_REQUIRE(this->main == nullptr);
   1501   this->main = &func;
   1502 }
   1503 
   1504 FiberBase::FiberBase(size_t stackSize, _::ExceptionOrValue& result)
   1505     : state(WAITING), stack(kj::heap<FiberStack>(stackSize)), result(result) {
   1506   stack->initialize(*this);
   1507   ensureThreadCanRunFibers();
   1508 }
   1509 
   1510 FiberBase::FiberBase(const FiberPool& pool, _::ExceptionOrValue& result)
   1511     : state(WAITING), result(result) {
   1512   stack = pool.impl->takeStack();
   1513   stack->initialize(*this);
   1514   ensureThreadCanRunFibers();
   1515 }
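
         // Usage sketch (hedged -- see the startFiber() and FiberPool declarations in async.h for
         // the authoritative signatures): application code normally never constructs FiberBase
         // directly. It typically writes something like
         //
         //   kj::Promise<int> promise = kj::startFiber(65536, [](kj::WaitScope& ws) -> int {
         //     // wait() is allowed here because this lambda runs on its own fiber stack.
         //     return someAsyncOperation().wait(ws);   // someAsyncOperation() is a stand-in
         //   });
         //
         // which ultimately constructs a FiberBase through one of the two constructors above,
         // either with a freshly allocated FiberStack or with one recycled from a FiberPool.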
   1516 
   1517 FiberBase::~FiberBase() noexcept(false) {}
   1518 
   1519 void FiberBase::destroy() {
   1520   // Called by `~Fiber()` to begin teardown. We can't do this work in `~FiberBase()` because the
   1521   // `Fiber` subclass contains members that may still be in-use until the fiber stops.
   1522 
   1523   switch (state) {
   1524     case WAITING:
   1525       // We can't just free the stack while the fiber is running. We need to force it to execute
   1526       // until finished, so we cause it to throw an exception.
   1527       state = CANCELED;
   1528       stack->switchToFiber();
   1529 
   1530       // The fiber should only switch back to the main stack on completion, because any further
   1531       // calls to wait() would throw before trying to switch.
   1532       KJ_ASSERT(state == FINISHED);
   1533 
   1534       // The fiber shut down properly so the stack is safe to reuse.
   1535       stack->reset();
   1536       break;
   1537 
   1538     case RUNNING:
   1539     case CANCELED:
   1540       // Bad news.
   1541       KJ_LOG(FATAL, "fiber tried to destroy itself");
   1542       ::abort();
   1543       break;
   1544 
   1545     case FINISHED:
   1546       // Normal completion, yay.
   1547       stack->reset();
   1548       break;
   1549   }
   1550 }
   1551 
   1552 Maybe<Own<Event>> FiberBase::fire() {
   1553   KJ_ASSERT(state == WAITING);
   1554   state = RUNNING;
   1555   stack->switchToFiber();
   1556   return nullptr;
   1557 }
   1558 
   1559 void FiberStack::switchToFiber() {
   1560   // Switch from the main stack to the fiber. Returns once the fiber either calls switchToMain()
   1561   // or returns from its main function.
   1562 #if KJ_USE_FIBERS
   1563 #if _WIN32 || __CYGWIN__
   1564   SwitchToFiber(osFiber);
   1565 #else
   1566   if (_setjmp(impl->originalJmpBuf) == 0) {
   1567     _longjmp(impl->fiberJmpBuf, 1);
   1568   }
   1569 #endif
   1570 #endif
   1571 }
   1572 void FiberStack::switchToMain() {
   1573   // Switch from the fiber to the main stack. Returns the next time the main stack calls
   1574   // switchToFiber().
   1575 #if KJ_USE_FIBERS
   1576 #if _WIN32 || __CYGWIN__
   1577   SwitchToFiber(getMainWin32Fiber());
   1578 #else
   1579   if (_setjmp(impl->fiberJmpBuf) == 0) {
   1580     _longjmp(impl->originalJmpBuf, 1);
   1581   }
   1582 #endif
   1583 #endif
   1584 }
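
         // Taken together, switchToFiber() and switchToMain() form a symmetric handshake: each
         // side saves its own position with _setjmp() and then _longjmp()s to wherever the other
         // side saved last. The "== 0" checks distinguish "we just saved our context" (return
         // value 0, so jump away) from "somebody jumped back into us" (return value 1, so fall out
         // of the function and keep running). On Windows, SwitchToFiber() performs both halves.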
   1585 
   1586 void FiberBase::run() {
   1587 #if !KJ_NO_EXCEPTIONS
   1588   bool caughtCanceled = false;
   1589   state = RUNNING;
   1590   KJ_DEFER(state = FINISHED);
   1591 
   1592   WaitScope waitScope(currentEventLoop(), *this);
   1593 
   1594   try {
   1595     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   1596       runImpl(waitScope);
   1597     })) {
   1598       result.addException(kj::mv(*exception));
   1599     }
   1600   } catch (CanceledException) {
   1601     if (state != CANCELED) {
   1602       // no idea who would throw this but it's not really our problem
   1603       result.addException(KJ_EXCEPTION(FAILED, "Caught CanceledException, but fiber wasn't canceled"));
   1604     }
   1605     caughtCanceled = true;
   1606   }
   1607 
   1608   if (state == CANCELED && !caughtCanceled) {
   1609     KJ_LOG(ERROR, "Canceled fiber apparently caught CanceledException and didn't rethrow it. "
   1610       "Generally, applications should not catch CanceledException, but if they do, they must always rethrow.");
   1611   }
   1612 
   1613   onReadyEvent.arm();
   1614 #endif
   1615 }
   1616 
   1617 void FiberBase::onReady(_::Event* event) noexcept {
   1618   onReadyEvent.init(event);
   1619 }
   1620 
   1621 void FiberBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   1622   if (stopAtNextEvent) return;
   1623   currentInner->tracePromise(builder, false);
   1624   stack->trace(builder);
   1625 }
   1626 
   1627 void FiberBase::traceEvent(TraceBuilder& builder) {
   1628   currentInner->tracePromise(builder, true);
   1629   stack->trace(builder);
   1630   onReadyEvent.traceEvent(builder);
   1631 }
   1632 
   1633 }  // namespace _ (private)
   1634 
   1635 // =======================================================================================
   1636 
   1637 void EventPort::setRunnable(bool runnable) {}
   1638 
   1639 void EventPort::wake() const {
   1640   kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
   1641       "cross-thread wake() not implemented by this EventPort implementation"));
   1642 }
   1643 
   1644 EventLoop::EventLoop()
   1645     : daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
   1646 
   1647 EventLoop::EventLoop(EventPort& port)
   1648     : port(port),
   1649       daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
   1650 
   1651 EventLoop::~EventLoop() noexcept(false) {
   1652   // Destroy all "daemon" tasks, noting that their destructors might register more daemon tasks.
   1653   while (!daemons->isEmpty()) {
   1654     auto oldDaemons = kj::mv(daemons);
   1655     daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance);
   1656   }
   1657   daemons = nullptr;
   1658 
   1659   KJ_IF_MAYBE(e, executor) {
   1660     // Cancel all outstanding cross-thread events.
   1661     e->get()->impl->disconnect();
   1662   }
   1663 
   1664   // The application _should_ destroy everything using the EventLoop before destroying the
   1665   // EventLoop itself, so if there are events on the loop, this indicates a memory leak.
   1666   KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue.  Memory leak?",
   1667              head->traceEvent()) {
   1668     // Unlink all the events and hope that no one ever fires them...
   1669     _::Event* event = head;
   1670     while (event != nullptr) {
   1671       _::Event* next = event->next;
   1672       event->next = nullptr;
   1673       event->prev = nullptr;
   1674       event = next;
   1675     }
   1676     break;
   1677   }
   1678 
   1679   KJ_REQUIRE(threadLocalEventLoop != this,
   1680              "EventLoop destroyed while still current for the thread.") {
   1681     threadLocalEventLoop = nullptr;
   1682     break;
   1683   }
   1684 }
   1685 
   1686 void EventLoop::run(uint maxTurnCount) {
   1687   running = true;
   1688   KJ_DEFER(running = false);
   1689 
   1690   for (uint i = 0; i < maxTurnCount; i++) {
   1691     if (!turn()) {
   1692       break;
   1693     }
   1694   }
   1695 
   1696   setRunnable(isRunnable());
   1697 }
   1698 
   1699 bool EventLoop::turn() {
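           // Pop the event at the head of the queue, fix up any insertion pointers that referenced
           // it, and fire it. Note that an Event's `prev` is not a pointer to the previous Event
           // but an Event** aimed at whichever pointer currently points to this event (either
           // `head` or the previous event's `next`), so unlinking is done purely by rewriting
           // pointers.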
   1700   _::Event* event = head;
   1701 
   1702   if (event == nullptr) {
   1703     // No events in the queue.
   1704     return false;
   1705   } else {
   1706     head = event->next;
   1707     if (head != nullptr) {
   1708       head->prev = &head;
   1709     }
   1710 
   1711     depthFirstInsertPoint = &head;
   1712     if (breadthFirstInsertPoint == &event->next) {
   1713       breadthFirstInsertPoint = &head;
   1714     }
   1715     if (tail == &event->next) {
   1716       tail = &head;
   1717     }
   1718 
   1719     event->next = nullptr;
   1720     event->prev = nullptr;
   1721 
   1722     Maybe<Own<_::Event>> eventToDestroy;
   1723     {
   1724       event->firing = true;
   1725       KJ_DEFER(event->firing = false);
   1726       currentlyFiring = event;
   1727       KJ_DEFER(currentlyFiring = nullptr);
   1728       eventToDestroy = event->fire();
   1729     }
   1730 
   1731     depthFirstInsertPoint = &head;
   1732     return true;
   1733   }
   1734 }
   1735 
   1736 bool EventLoop::isRunnable() {
   1737   return head != nullptr;
   1738 }
   1739 
   1740 const Executor& EventLoop::getExecutor() {
   1741   KJ_IF_MAYBE(e, executor) {
   1742     return **e;
   1743   } else {
   1744     return *executor.emplace(kj::atomicRefcounted<_::ExecutorImpl>(*this, Badge<EventLoop>()));
   1745   }
   1746 }
   1747 
   1748 void EventLoop::setRunnable(bool runnable) {
   1749   if (runnable != lastRunnableState) {
   1750     KJ_IF_MAYBE(p, port) {
   1751       p->setRunnable(runnable);
   1752     }
   1753     lastRunnableState = runnable;
   1754   }
   1755 }
   1756 
   1757 void EventLoop::enterScope() {
   1758   KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop.");
   1759   threadLocalEventLoop = this;
   1760 }
   1761 
   1762 void EventLoop::leaveScope() {
   1763   KJ_REQUIRE(threadLocalEventLoop == this,
   1764              "WaitScope destroyed in a different thread than it was created in.") {
   1765     break;
   1766   }
   1767   threadLocalEventLoop = nullptr;
   1768 }
   1769 
   1770 void EventLoop::wait() {
   1771   KJ_IF_MAYBE(p, port) {
   1772     if (p->wait()) {
   1773       // Another thread called wake(). Check for cross-thread events.
   1774       KJ_IF_MAYBE(e, executor) {
   1775         e->get()->poll();
   1776       }
   1777     }
   1778   } else KJ_IF_MAYBE(e, executor) {
   1779     e->get()->wait();
   1780   } else {
   1781     KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
   1782   }
   1783 }
   1784 
   1785 void EventLoop::poll() {
   1786   KJ_IF_MAYBE(p, port) {
   1787     if (p->poll()) {
   1788       // Another thread called wake(). Check for cross-thread events.
   1789       KJ_IF_MAYBE(e, executor) {
   1790         e->get()->poll();
   1791       }
   1792     }
   1793   } else KJ_IF_MAYBE(e, executor) {
   1794     e->get()->poll();
   1795   }
   1796 }
   1797 
   1798 void WaitScope::poll() {
   1799   KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
   1800   KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
   1801 
   1802   loop.running = true;
   1803   KJ_DEFER(loop.running = false);
   1804 
   1805   runOnStackPool([&]() {
   1806     for (;;) {
   1807       if (!loop.turn()) {
   1808         // No events in the queue.  Poll for I/O.
   1809         loop.poll();
   1810 
   1811         if (!loop.isRunnable()) {
   1812           // Still no events in the queue. We're done.
   1813           return;
   1814         }
   1815       }
   1816     }
   1817   });
   1818 }
   1819 
   1820 void WaitScope::cancelAllDetached() {
   1821   KJ_REQUIRE(fiber == nullptr,
   1822       "can't call cancelAllDetached() on a fiber WaitScope, only top-level");
   1823 
   1824   while (!loop.daemons->isEmpty()) {
   1825     auto oldDaemons = kj::mv(loop.daemons);
   1826     loop.daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance);
   1827     // Destroying `oldDaemons` could theoretically add new ones.
   1828   }
   1829 }
   1830 
   1831 namespace _ {  // private
   1832 
   1833 #if !KJ_NO_EXCEPTIONS
   1834 static kj::CanceledException fiberCanceledException() {
   1835   // Construct the exception to throw from wait() when the fiber has been canceled (because the
   1836   // promise returned by startFiber() was dropped before completion).
   1837   return kj::CanceledException { };
   1838 };
   1839 #endif
   1840 
   1841 void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) {
   1842   EventLoop& loop = waitScope.loop;
   1843   KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
   1844 
   1845 #if !KJ_NO_EXCEPTIONS
    1846   // We don't support fibers when running without exceptions, so this whole block is compiled out.
   1847   KJ_IF_MAYBE(fiber, waitScope.fiber) {
   1848     if (fiber->state == FiberBase::CANCELED) {
   1849       throw fiberCanceledException();
   1850     }
   1851     KJ_REQUIRE(fiber->state == FiberBase::RUNNING,
   1852         "This WaitScope can only be used within the fiber that created it.");
   1853 
   1854     node->setSelfPointer(&node);
   1855     node->onReady(fiber);
   1856 
   1857     fiber->currentInner = node;
   1858     KJ_DEFER(fiber->currentInner = nullptr);
   1859 
   1860     // Switch to the main stack to run the event loop.
   1861     fiber->state = FiberBase::WAITING;
   1862     fiber->stack->switchToMain();
   1863 
   1864     // The main stack switched back to us, meaning either the event we registered with
   1865     // node->onReady() fired, or we are being canceled by FiberBase's destructor.
   1866 
   1867     if (fiber->state == FiberBase::CANCELED) {
   1868       throw fiberCanceledException();
   1869     }
   1870 
   1871     KJ_ASSERT(fiber->state == FiberBase::RUNNING);
   1872   } else {
   1873 #endif
   1874     KJ_REQUIRE(!loop.running, "wait() is not allowed from within event callbacks.");
   1875 
   1876     RootEvent doneEvent(node, reinterpret_cast<void*>(&waitImpl));
   1877     node->setSelfPointer(&node);
   1878     node->onReady(&doneEvent);
   1879 
   1880     loop.running = true;
   1881     KJ_DEFER(loop.running = false);
   1882 
   1883     for (;;) {
   1884       waitScope.runOnStackPool([&]() {
   1885         uint counter = 0;
   1886         while (!doneEvent.fired) {
   1887           if (!loop.turn()) {
   1888             // No events in the queue.  Wait for callback.
   1889             return;
   1890           } else if (++counter > waitScope.busyPollInterval) {
   1891             // Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll.
   1892             counter = 0;
   1893             loop.poll();
   1894           }
   1895         }
   1896       });
   1897 
   1898       if (doneEvent.fired) {
   1899         break;
   1900       } else {
   1901         loop.wait();
   1902       }
   1903     }
   1904 
   1905     loop.setRunnable(loop.isRunnable());
   1906 #if !KJ_NO_EXCEPTIONS
   1907   }
   1908 #endif
   1909 
   1910   waitScope.runOnStackPool([&]() {
   1911     node->get(result);
   1912     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   1913       node = nullptr;
   1914     })) {
   1915       result.addException(kj::mv(*exception));
   1916     }
   1917   });
   1918 }
   1919 
   1920 bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
   1921   EventLoop& loop = waitScope.loop;
   1922   KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
   1923   KJ_REQUIRE(waitScope.fiber == nullptr, "poll() is not supported in fibers.");
   1924   KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
   1925 
   1926   RootEvent doneEvent(&node, reinterpret_cast<void*>(&pollImpl));
   1927   node.onReady(&doneEvent);
   1928 
   1929   loop.running = true;
   1930   KJ_DEFER(loop.running = false);
   1931 
   1932   waitScope.runOnStackPool([&]() {
   1933     while (!doneEvent.fired) {
   1934       if (!loop.turn()) {
   1935         // No events in the queue.  Poll for I/O.
   1936         loop.poll();
   1937 
   1938         if (!doneEvent.fired && !loop.isRunnable()) {
   1939           // No progress. Give up.
   1940           node.onReady(nullptr);
   1941           loop.setRunnable(false);
   1942           break;
   1943         }
   1944       }
   1945     }
   1946   });
   1947 
   1948   if (!doneEvent.fired) {
   1949     return false;
   1950   }
   1951 
   1952   loop.setRunnable(loop.isRunnable());
   1953   return true;
   1954 }
   1955 
   1956 Promise<void> yield() {
   1957   return _::PromiseNode::to<Promise<void>>(kj::heap<YieldPromiseNode>());
   1958 }
   1959 
   1960 Promise<void> yieldHarder() {
   1961   return _::PromiseNode::to<Promise<void>>(kj::heap<YieldHarderPromiseNode>());
   1962 }
   1963 
   1964 Own<PromiseNode> neverDone() {
   1965   return kj::heap<NeverDonePromiseNode>();
   1966 }
   1967 
   1968 void NeverDone::wait(WaitScope& waitScope) const {
   1969   ExceptionOr<Void> dummy;
   1970   waitImpl(neverDone(), dummy, waitScope);
   1971   KJ_UNREACHABLE;
   1972 }
   1973 
   1974 void detach(kj::Promise<void>&& promise) {
   1975   EventLoop& loop = currentEventLoop();
   1976   KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
   1977   loop.daemons->add(kj::mv(promise));
   1978 }
   1979 
   1980 Event::Event()
   1981     : loop(currentEventLoop()), next(nullptr), prev(nullptr) {}
   1982 
   1983 Event::Event(kj::EventLoop& loop)
   1984     : loop(loop), next(nullptr), prev(nullptr) {}
   1985 
   1986 Event::~Event() noexcept(false) {
   1987   disarm();
   1988 
   1989   KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
   1990 }
   1991 
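         // The three arm*() variants below differ only in where they splice the event into the
         // loop's queue. armDepthFirst() inserts at depthFirstInsertPoint, so continuations of the
         // currently-firing event run next; armBreadthFirst() inserts at breadthFirstInsertPoint,
         // i.e. after everything already queued in the current batch; armLast() inserts at the
         // same place but deliberately does not advance the insertion point, so events armed
         // afterwards will run before it.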
   1992 void Event::armDepthFirst() {
   1993   KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
   1994              "Event armed from different thread than it was created in.  You must use "
   1995              "Executor to queue events cross-thread.");
   1996 
   1997   if (prev == nullptr) {
   1998     next = *loop.depthFirstInsertPoint;
   1999     prev = loop.depthFirstInsertPoint;
   2000     *prev = this;
   2001     if (next != nullptr) {
   2002       next->prev = &next;
   2003     }
   2004 
   2005     loop.depthFirstInsertPoint = &next;
   2006 
   2007     if (loop.breadthFirstInsertPoint == prev) {
   2008       loop.breadthFirstInsertPoint = &next;
   2009     }
   2010     if (loop.tail == prev) {
   2011       loop.tail = &next;
   2012     }
   2013 
   2014     loop.setRunnable(true);
   2015   }
   2016 }
   2017 
   2018 void Event::armBreadthFirst() {
   2019   KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
   2020              "Event armed from different thread than it was created in.  You must use "
   2021              "Executor to queue events cross-thread.");
   2022 
   2023   if (prev == nullptr) {
   2024     next = *loop.breadthFirstInsertPoint;
   2025     prev = loop.breadthFirstInsertPoint;
   2026     *prev = this;
   2027     if (next != nullptr) {
   2028       next->prev = &next;
   2029     }
   2030 
   2031     loop.breadthFirstInsertPoint = &next;
   2032 
   2033     if (loop.tail == prev) {
   2034       loop.tail = &next;
   2035     }
   2036 
   2037     loop.setRunnable(true);
   2038   }
   2039 }
   2040 
   2041 void Event::armLast() {
   2042   KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
   2043              "Event armed from different thread than it was created in.  You must use "
   2044              "Executor to queue events cross-thread.");
   2045 
   2046   if (prev == nullptr) {
   2047     next = *loop.breadthFirstInsertPoint;
   2048     prev = loop.breadthFirstInsertPoint;
   2049     *prev = this;
   2050     if (next != nullptr) {
   2051       next->prev = &next;
   2052     }
   2053 
   2054     // We don't update loop.breadthFirstInsertPoint because we want further inserts to go *before*
   2055     // this event.
   2056 
   2057     if (loop.tail == prev) {
   2058       loop.tail = &next;
   2059     }
   2060 
   2061     loop.setRunnable(true);
   2062   }
   2063 }
   2064 
   2065 bool Event::isNext() {
   2066   return loop.running && loop.head == this;
   2067 }
   2068 
   2069 void Event::disarm() {
   2070   if (prev != nullptr) {
   2071     if (threadLocalEventLoop != &loop && threadLocalEventLoop != nullptr) {
   2072       KJ_LOG(FATAL, "Promise destroyed from a different thread than it was created in.");
   2073       // There's no way out of this place without UB, so abort now.
   2074       abort();
   2075     }
   2076 
   2077     if (loop.tail == &next) {
   2078       loop.tail = prev;
   2079     }
   2080     if (loop.depthFirstInsertPoint == &next) {
   2081       loop.depthFirstInsertPoint = prev;
   2082     }
   2083     if (loop.breadthFirstInsertPoint == &next) {
   2084       loop.breadthFirstInsertPoint = prev;
   2085     }
   2086 
   2087     *prev = next;
   2088     if (next != nullptr) {
   2089       next->prev = prev;
   2090     }
   2091 
   2092     prev = nullptr;
   2093     next = nullptr;
   2094   }
   2095 }
   2096 
   2097 String Event::traceEvent() {
   2098   void* space[32];
   2099   TraceBuilder builder(space);
   2100   traceEvent(builder);
   2101   return kj::str(builder);
   2102 }
   2103 
   2104 String TraceBuilder::toString() {
   2105   auto result = finish();
   2106   return kj::str(stringifyStackTraceAddresses(result),
   2107                  stringifyStackTrace(result));
   2108 }
   2109 
   2110 }  // namespace _ (private)
   2111 
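         // The getAsyncTrace() overloads below report the async "stack" of the event that this
         // thread's EventLoop is firing right now, or an empty trace if no event is currently
         // firing. A hedged usage sketch: from inside a promise continuation, something like
         //
         //   KJ_LOG(INFO, kj::getAsyncTrace());
         //
         // logs where the currently-executing chain of events came from.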
   2112 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space) {
   2113   EventLoop* loop = threadLocalEventLoop;
   2114   if (loop == nullptr) return nullptr;
   2115   if (loop->currentlyFiring == nullptr) return nullptr;
   2116 
   2117   _::TraceBuilder builder(space);
   2118   loop->currentlyFiring->traceEvent(builder);
   2119   return builder.finish();
   2120 }
   2121 
   2122 kj::String getAsyncTrace() {
   2123   void* space[32];
   2124   auto trace = getAsyncTrace(space);
   2125   return kj::str(stringifyStackTraceAddresses(trace), stringifyStackTrace(trace));
   2126 }
   2127 
   2128 // =======================================================================================
   2129 
   2130 namespace _ {  // private
   2131 
   2132 kj::String PromiseBase::trace() {
   2133   void* space[32];
   2134   TraceBuilder builder(space);
   2135   node->tracePromise(builder, false);
   2136   return kj::str(builder);
   2137 }
   2138 
   2139 void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}
   2140 
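         // OnReadyEvent::event holds one of three things: nullptr (nothing is waiting on this
         // promise yet), a pointer to the Event to arm when the promise becomes ready, or the
         // sentinel _kJ_ALREADY_READY once the promise has become ready (arm() sets this whether
         // or not an event had been registered).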
   2141 void PromiseNode::OnReadyEvent::init(Event* newEvent) {
   2142   if (event == _kJ_ALREADY_READY) {
   2143     // A new continuation was added to a promise that was already ready.  In this case, we schedule
   2144     // breadth-first, to make it difficult for applications to accidentally starve the event loop
   2145     // by repeatedly waiting on immediate promises.
   2146     if (newEvent) newEvent->armBreadthFirst();
   2147   } else {
   2148     event = newEvent;
   2149   }
   2150 }
   2151 
   2152 void PromiseNode::OnReadyEvent::arm() {
   2153   KJ_ASSERT(event != _kJ_ALREADY_READY, "arm() should only be called once");
   2154 
   2155   if (event != nullptr) {
   2156     // A promise resolved and an event is already waiting on it.  In this case, arm in depth-first
   2157     // order so that the event runs immediately after the current one.  This way, chained promises
   2158     // execute together for better cache locality and lower latency.
   2159     event->armDepthFirst();
   2160   }
   2161 
   2162   event = _kJ_ALREADY_READY;
   2163 }
   2164 
   2165 void PromiseNode::OnReadyEvent::armBreadthFirst() {
   2166   KJ_ASSERT(event != _kJ_ALREADY_READY, "armBreadthFirst() should only be called once");
   2167 
   2168   if (event != nullptr) {
   2169     // A promise resolved and an event is already waiting on it.
   2170     event->armBreadthFirst();
   2171   }
   2172 
   2173   event = _kJ_ALREADY_READY;
   2174 }
   2175 
   2176 // -------------------------------------------------------------------
   2177 
   2178 ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
   2179 ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
   2180 
   2181 void ImmediatePromiseNodeBase::onReady(Event* event) noexcept {
   2182   if (event) event->armBreadthFirst();
   2183 }
   2184 
   2185 void ImmediatePromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2186   // Maybe returning the address of get() will give us a function name with meaningful type
   2187   // information.
   2188   builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
   2189 }
   2190 
   2191 ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
   2192     : exception(kj::mv(exception)) {}
   2193 
   2194 void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
   2195   output.exception = kj::mv(exception);
   2196 }
   2197 
   2198 // -------------------------------------------------------------------
   2199 
   2200 AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependencyParam)
   2201     : dependency(kj::mv(dependencyParam)) {
   2202   dependency->setSelfPointer(&dependency);
   2203 }
   2204 
   2205 void AttachmentPromiseNodeBase::onReady(Event* event) noexcept {
   2206   dependency->onReady(event);
   2207 }
   2208 
   2209 void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
   2210   dependency->get(output);
   2211 }
   2212 
   2213 void AttachmentPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2214   dependency->tracePromise(builder, stopAtNextEvent);
   2215 
   2216   // TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and
   2217   //   addBranch()?
   2218 }
   2219 
   2220 void AttachmentPromiseNodeBase::dropDependency() {
   2221   dependency = nullptr;
   2222 }
   2223 
   2224 // -------------------------------------------------------------------
   2225 
   2226 TransformPromiseNodeBase::TransformPromiseNodeBase(
   2227     Own<PromiseNode>&& dependencyParam, void* continuationTracePtr)
   2228     : dependency(kj::mv(dependencyParam)), continuationTracePtr(continuationTracePtr) {
   2229   dependency->setSelfPointer(&dependency);
   2230 }
   2231 
   2232 void TransformPromiseNodeBase::onReady(Event* event) noexcept {
   2233   dependency->onReady(event);
   2234 }
   2235 
   2236 void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
   2237   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   2238     getImpl(output);
   2239     dropDependency();
   2240   })) {
   2241     output.addException(kj::mv(*exception));
   2242   }
   2243 }
   2244 
   2245 void TransformPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2246   // Note that we null out the dependency just before calling our own continuation, which
   2247   // conveniently means that if we're currently executing the continuation when the trace is
   2248   // requested, it won't trace into the obsolete dependency. Nice.
   2249   if (dependency.get() != nullptr) {
   2250     dependency->tracePromise(builder, stopAtNextEvent);
   2251   }
   2252 
   2253   builder.add(continuationTracePtr);
   2254 }
   2255 
   2256 void TransformPromiseNodeBase::dropDependency() {
   2257   dependency = nullptr;
   2258 }
   2259 
   2260 void TransformPromiseNodeBase::getDepResult(ExceptionOrValue& output) {
   2261   dependency->get(output);
   2262   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   2263     dependency = nullptr;
   2264   })) {
   2265     output.addException(kj::mv(*exception));
   2266   }
   2267 
   2268   KJ_IF_MAYBE(e, output.exception) {
   2269     e->addTrace(continuationTracePtr);
   2270   }
   2271 }
   2272 
   2273 // -------------------------------------------------------------------
   2274 
   2275 ForkBranchBase::ForkBranchBase(Own<ForkHubBase>&& hubParam): hub(kj::mv(hubParam)) {
   2276   if (hub->tailBranch == nullptr) {
   2277     onReadyEvent.arm();
   2278   } else {
   2279     // Insert into hub's linked list of branches.
   2280     prevPtr = hub->tailBranch;
   2281     *prevPtr = this;
   2282     next = nullptr;
   2283     hub->tailBranch = &next;
   2284   }
   2285 }
   2286 
   2287 ForkBranchBase::~ForkBranchBase() noexcept(false) {
   2288   if (prevPtr != nullptr) {
   2289     // Remove from hub's linked list of branches.
   2290     *prevPtr = next;
   2291     (next == nullptr ? hub->tailBranch : next->prevPtr) = prevPtr;
   2292   }
   2293 }
   2294 
   2295 void ForkBranchBase::hubReady() noexcept {
   2296   onReadyEvent.arm();
   2297 }
   2298 
   2299 void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
   2300   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
   2301     hub = nullptr;
   2302   })) {
   2303     output.addException(kj::mv(*exception));
   2304   }
   2305 }
   2306 
   2307 void ForkBranchBase::onReady(Event* event) noexcept {
   2308   onReadyEvent.init(event);
   2309 }
   2310 
   2311 void ForkBranchBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2312   if (stopAtNextEvent) return;
   2313 
   2314   if (hub.get() != nullptr) {
   2315     hub->inner->tracePromise(builder, false);
   2316   }
   2317 
   2318   // TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and
   2319   //   addBranch()?
   2320 }
   2321 
   2322 // -------------------------------------------------------------------
   2323 
   2324 ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
   2325     : inner(kj::mv(innerParam)), resultRef(resultRef) {
   2326   inner->setSelfPointer(&inner);
   2327   inner->onReady(this);
   2328 }
   2329 
   2330 Maybe<Own<Event>> ForkHubBase::fire() {
   2331   // Dependency is ready.  Fetch its result and then delete the node.
   2332   inner->get(resultRef);
   2333   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
   2334     inner = nullptr;
   2335   })) {
   2336     resultRef.addException(kj::mv(*exception));
   2337   }
   2338 
   2339   for (auto branch = headBranch; branch != nullptr; branch = branch->next) {
   2340     branch->hubReady();
   2341     *branch->prevPtr = nullptr;
   2342     branch->prevPtr = nullptr;
   2343   }
   2344   *tailBranch = nullptr;
   2345 
   2346   // Indicate that the list is no longer active.
   2347   tailBranch = nullptr;
   2348 
   2349   return nullptr;
   2350 }
   2351 
   2352 void ForkHubBase::traceEvent(TraceBuilder& builder) {
   2353   if (inner.get() != nullptr) {
   2354     inner->tracePromise(builder, true);
   2355   }
   2356 
   2357   if (headBranch != nullptr) {
   2358     // We'll trace down the first branch, I guess.
   2359     headBranch->onReadyEvent.traceEvent(builder);
   2360   }
   2361 }
   2362 
   2363 // -------------------------------------------------------------------
   2364 
   2365 ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
   2366     : state(STEP1), inner(kj::mv(innerParam)) {
   2367   inner->setSelfPointer(&inner);
   2368   inner->onReady(this);
   2369 }
   2370 
   2371 ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
   2372 
   2373 void ChainPromiseNode::onReady(Event* event) noexcept {
   2374   switch (state) {
   2375     case STEP1:
   2376       onReadyEvent = event;
   2377       return;
   2378     case STEP2:
   2379       inner->onReady(event);
   2380       return;
   2381   }
   2382   KJ_UNREACHABLE;
   2383 }
   2384 
   2385 void ChainPromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {
   2386   if (state == STEP2) {
   2387     *selfPtr = kj::mv(inner);  // deletes this!
   2388     selfPtr->get()->setSelfPointer(selfPtr);
   2389   } else {
   2390     this->selfPtr = selfPtr;
   2391   }
   2392 }
   2393 
   2394 void ChainPromiseNode::get(ExceptionOrValue& output) noexcept {
   2395   KJ_REQUIRE(state == STEP2);
   2396   return inner->get(output);
   2397 }
   2398 
   2399 void ChainPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2400   if (stopAtNextEvent && state == STEP1) {
   2401     // In STEP1, we are an Event -- when the inner node resolves, it will arm *this* object.
   2402     // In STEP2, we are not an Event -- when the inner node resolves, it directly arms our parent
   2403     // event.
   2404     return;
   2405   }
   2406 
   2407   inner->tracePromise(builder, stopAtNextEvent);
   2408 }
   2409 
   2410 Maybe<Own<Event>> ChainPromiseNode::fire() {
   2411   KJ_REQUIRE(state != STEP2);
   2412 
   2413   static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
   2414       "This code assumes Promise<T> does not add any new members to PromiseBase.");
   2415 
   2416   ExceptionOr<PromiseBase> intermediate;
   2417   inner->get(intermediate);
   2418 
   2419   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
   2420     inner = nullptr;
   2421   })) {
   2422     intermediate.addException(kj::mv(*exception));
   2423   }
   2424 
   2425   KJ_IF_MAYBE(exception, intermediate.exception) {
   2426     // There is an exception.  If there is also a value, delete it.
   2427     kj::runCatchingExceptions([&]() { intermediate.value = nullptr; });
   2428     // Now set step2 to a rejected promise.
   2429     inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception));
   2430   } else KJ_IF_MAYBE(value, intermediate.value) {
   2431     // There is a value and no exception.  The value is itself a promise.  Adopt it as our
   2432     // step2.
   2433     inner = _::PromiseNode::from(kj::mv(*value));
   2434   } else {
   2435     // We can only get here if inner->get() returned neither an exception nor a
   2436     // value, which never actually happens.
   2437     KJ_FAIL_ASSERT("Inner node returned empty value.");
   2438   }
   2439   state = STEP2;
   2440 
   2441   if (selfPtr != nullptr) {
   2442     // Hey, we can shorten the chain here.
   2443     auto chain = selfPtr->downcast<ChainPromiseNode>();
   2444     *selfPtr = kj::mv(inner);
   2445     selfPtr->get()->setSelfPointer(selfPtr);
   2446     if (onReadyEvent != nullptr) {
   2447       selfPtr->get()->onReady(onReadyEvent);
   2448     }
   2449 
   2450     // Return our self-pointer so that the caller takes care of deleting it.
   2451     return Own<Event>(kj::mv(chain));
   2452   } else {
   2453     inner->setSelfPointer(&inner);
   2454     if (onReadyEvent != nullptr) {
   2455       inner->onReady(onReadyEvent);
   2456     }
   2457 
   2458     return nullptr;
   2459   }
   2460 }
   2461 
   2462 void ChainPromiseNode::traceEvent(TraceBuilder& builder) {
   2463   switch (state) {
   2464     case STEP1:
   2465       if (inner.get() != nullptr) {
   2466         inner->tracePromise(builder, true);
   2467       }
   2468       if (!builder.full() && onReadyEvent != nullptr) {
   2469         onReadyEvent->traceEvent(builder);
   2470       }
   2471       break;
   2472     case STEP2:
   2473       // This probably never happens -- a trace being generated after the meat of fire() already
   2474       // executed. If it does, though, we probably can't do anything here. We don't know if
   2475       // `onReadyEvent` is still valid because we passed it on to the phase-2 promise, and tracing
   2476       // just `inner` would probably be confusing. Let's just do nothing.
   2477       break;
   2478   }
   2479 }
   2480 
   2481 // -------------------------------------------------------------------
   2482 
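         // ExclusiveJoinPromiseNode backs Promise::exclusiveJoin(): the two dependencies race,
         // the first branch to fire supplies the result, and the losing branch is canceled (its
         // dependency is dropped) from Branch::fire().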
   2483 ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right)
   2484     : left(*this, kj::mv(left)), right(*this, kj::mv(right)) {}
   2485 
   2486 ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
   2487 
   2488 void ExclusiveJoinPromiseNode::onReady(Event* event) noexcept {
   2489   onReadyEvent.init(event);
   2490 }
   2491 
   2492 void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept {
   2493   KJ_REQUIRE(left.get(output) || right.get(output), "get() called before ready.");
   2494 }
   2495 
   2496 void ExclusiveJoinPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2497   // TODO(debug): Maybe use __builtin_return_address to get the locations that called
   2498   //   exclusiveJoin()?
   2499 
   2500   if (stopAtNextEvent) return;
   2501 
   2502   // Trace the left branch I guess.
   2503   if (left.dependency.get() != nullptr) {
   2504     left.dependency->tracePromise(builder, false);
   2505   } else if (right.dependency.get() != nullptr) {
   2506     right.dependency->tracePromise(builder, false);
   2507   }
   2508 }
   2509 
   2510 ExclusiveJoinPromiseNode::Branch::Branch(
   2511     ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
   2512     : joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
   2513   dependency->setSelfPointer(&dependency);
   2514   dependency->onReady(this);
   2515 }
   2516 
   2517 ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
   2518 
   2519 bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) {
   2520   if (dependency) {
   2521     dependency->get(output);
   2522     return true;
   2523   } else {
   2524     return false;
   2525   }
   2526 }
   2527 
   2528 Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() {
   2529   if (dependency) {
   2530     // Cancel the branch that didn't return first.  Ignore exceptions caused by cancellation.
   2531     if (this == &joinNode.left) {
   2532       kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; });
   2533     } else {
   2534       kj::runCatchingExceptions([&]() { joinNode.left.dependency = nullptr; });
   2535     }
   2536 
   2537     joinNode.onReadyEvent.arm();
   2538   } else {
   2539     // The other branch already fired, and this branch was canceled. It's possible for both
   2540     // branches to fire if both were armed simultaneously.
   2541   }
   2542   return nullptr;
   2543 }
   2544 
   2545 void ExclusiveJoinPromiseNode::Branch::traceEvent(TraceBuilder& builder) {
   2546   if (dependency.get() != nullptr) {
   2547     dependency->tracePromise(builder, true);
   2548   }
   2549   joinNode.onReadyEvent.traceEvent(builder);
   2550 }
   2551 
   2552 // -------------------------------------------------------------------
   2553 
   2554 ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
   2555     Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize)
   2556     : countLeft(promises.size()) {
   2557   // Make the branches.
   2558   auto builder = heapArrayBuilder<Branch>(promises.size());
   2559   for (uint i: indices(promises)) {
   2560     ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>(
   2561         reinterpret_cast<byte*>(resultParts) + i * partSize);
   2562     builder.add(*this, kj::mv(promises[i]), output);
   2563   }
   2564   branches = builder.finish();
   2565 
   2566   if (branches.size() == 0) {
   2567     onReadyEvent.arm();
   2568   }
   2569 }
   2570 ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
   2571 
   2572 void ArrayJoinPromiseNodeBase::onReady(Event* event) noexcept {
   2573   onReadyEvent.init(event);
   2574 }
   2575 
   2576 void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
   2577   // If any of the elements threw exceptions, propagate them.
   2578   for (auto& branch: branches) {
   2579     KJ_IF_MAYBE(exception, branch.getPart()) {
   2580       output.addException(kj::mv(*exception));
   2581     }
   2582   }
   2583 
   2584   if (output.exception == nullptr) {
   2585     // No errors.  The template subclass will need to fill in the result.
   2586     getNoError(output);
   2587   }
   2588 }
   2589 
   2590 void ArrayJoinPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2591   // TODO(debug): Maybe use __builtin_return_address to get the locations that called
   2592   //   joinPromises()?
   2593 
   2594   if (stopAtNextEvent) return;
   2595 
   2596   // Trace the first branch I guess.
   2597   if (branches != nullptr) {
   2598     branches[0].dependency->tracePromise(builder, false);
   2599   }
   2600 }
   2601 
   2602 ArrayJoinPromiseNodeBase::Branch::Branch(
   2603     ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
   2604     : joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
   2605   dependency->setSelfPointer(&dependency);
   2606   dependency->onReady(this);
   2607 }
   2608 
   2609 ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
   2610 
   2611 Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
   2612   if (--joinNode.countLeft == 0) {
   2613     joinNode.onReadyEvent.arm();
   2614   }
   2615   return nullptr;
   2616 }
   2617 
   2618 void ArrayJoinPromiseNodeBase::Branch::traceEvent(TraceBuilder& builder) {
   2619   dependency->tracePromise(builder, true);
   2620   joinNode.onReadyEvent.traceEvent(builder);
   2621 }
   2622 
   2623 Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
   2624   dependency->get(output);
   2625   return kj::mv(output.exception);
   2626 }
   2627 
   2628 ArrayJoinPromiseNode<void>::ArrayJoinPromiseNode(
   2629     Array<Own<PromiseNode>> promises, Array<ExceptionOr<_::Void>> resultParts)
   2630     : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<_::Void>)),
   2631       resultParts(kj::mv(resultParts)) {}
   2632 
   2633 ArrayJoinPromiseNode<void>::~ArrayJoinPromiseNode() {}
   2634 
   2635 void ArrayJoinPromiseNode<void>::getNoError(ExceptionOrValue& output) noexcept {
   2636   output.as<_::Void>() = _::Void();
   2637 }
   2638 
   2639 }  // namespace _ (private)
   2640 
   2641 Promise<void> joinPromises(Array<Promise<void>>&& promises) {
   2642   return _::PromiseNode::to<Promise<void>>(kj::heap<_::ArrayJoinPromiseNode<void>>(
   2643       KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
   2644       heapArray<_::ExceptionOr<_::Void>>(promises.size())));
   2645 }
   2646 
   2647 namespace _ {  // (private)
   2648 
   2649 // -------------------------------------------------------------------
   2650 
   2651 EagerPromiseNodeBase::EagerPromiseNodeBase(
   2652     Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
   2653     : dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
   2654   dependency->setSelfPointer(&dependency);
   2655   dependency->onReady(this);
   2656 }
   2657 
   2658 void EagerPromiseNodeBase::onReady(Event* event) noexcept {
   2659   onReadyEvent.init(event);
   2660 }
   2661 
   2662 void EagerPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2663   // TODO(debug): Maybe use __builtin_return_address to get the locations that called
   2664   //   eagerlyEvaluate()? But note that if a non-null exception handler was passed to it, that
   2665   //   creates a TransformPromiseNode which will report the location anyhow.
   2666 
   2667   if (stopAtNextEvent) return;
   2668   if (dependency.get() != nullptr) {
   2669     dependency->tracePromise(builder, stopAtNextEvent);
   2670   }
   2671 }
   2672 
   2673 void EagerPromiseNodeBase::traceEvent(TraceBuilder& builder) {
   2674   if (dependency.get() != nullptr) {
   2675     dependency->tracePromise(builder, true);
   2676   }
   2677   onReadyEvent.traceEvent(builder);
   2678 }
   2679 
   2680 Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
   2681   dependency->get(resultRef);
   2682   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
   2683     dependency = nullptr;
   2684   })) {
   2685     resultRef.addException(kj::mv(*exception));
   2686   }
   2687 
   2688   onReadyEvent.arm();
   2689   return nullptr;
   2690 }
   2691 
   2692 // -------------------------------------------------------------------
   2693 
   2694 void AdapterPromiseNodeBase::onReady(Event* event) noexcept {
   2695   onReadyEvent.init(event);
   2696 }
   2697 
   2698 void AdapterPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2699   // Maybe returning the address of get() will give us a function name with meaningful type
   2700   // information.
   2701   builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
   2702 }
   2703 
   2704 void END_FULFILLER_STACK_START_LISTENER_STACK() {}
   2705 // Dummy symbol used when reporting how a PromiseFulfiller was destroyed without fulfilling the
   2706 // promise. We end up combining two stack traces into one and we use this as a separator.
   2707 
   2708 void WeakFulfillerBase::disposeImpl(void* pointer) const {
   2709   if (inner == nullptr) {
   2710     // Already detached.
   2711     delete this;
   2712   } else {
   2713     if (inner->isWaiting()) {
   2714       // Let's find out if there's an exception being thrown. If so, we'll use it to reject the
   2715       // promise.
   2716       inner->reject(getDestructionReason(
   2717           reinterpret_cast<void*>(&END_FULFILLER_STACK_START_LISTENER_STACK),
   2718           kj::Exception::Type::FAILED, __FILE__, __LINE__,
   2719           "PromiseFulfiller was destroyed without fulfilling the promise."_kj));
   2720     }
   2721     inner = nullptr;
   2722   }
   2723 }
   2724 
   2725 }  // namespace _ (private)
   2726 
   2727 // -------------------------------------------------------------------
   2728 
   2729 namespace _ {  // (private)
   2730 
   2731 Promise<void> IdentityFunc<Promise<void>>::operator()() const { return READY_NOW; }
   2732 
   2733 }  // namespace _ (private)
   2734 
   2735 // -------------------------------------------------------------------
   2736 
   2737 #if KJ_HAS_COROUTINE
   2738 
   2739 namespace _ {  // (private)
   2740 
   2741 CoroutineBase::CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef)
   2742     : coroutine(coroutine),
   2743       resultRef(resultRef) {}
   2744 CoroutineBase::~CoroutineBase() noexcept(false) {
   2745   readMaybe(maybeDisposalResults)->destructorRan = true;
   2746 }
   2747 
   2748 void CoroutineBase::unhandled_exception() {
   2749   // Pretty self-explanatory, we propagate the exception to the promise which owns us, unless
   2750   // we're being destroyed, in which case we propagate it back to our disposer. Note that all
   2751   // unhandled exceptions end up here, not just ones after the first co_await.
   2752 
   2753   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([] { throw; })) {
   2754     KJ_IF_MAYBE(disposalResults, maybeDisposalResults) {
   2755       // Exception during coroutine destruction. Only record the first one.
   2756       if (disposalResults->exception == nullptr) {
   2757         disposalResults->exception = kj::mv(*exception);
   2758       }
   2759     } else if (isWaiting()) {
   2760       // Exception during coroutine execution.
   2761       resultRef.addException(kj::mv(*exception));
   2762       scheduleResumption();
   2763     } else {
   2764       // Okay, what could this mean? We've already been fulfilled or rejected, but we aren't being
   2765       // destroyed yet. The only possibility is that we are unwinding the coroutine frame due to a
   2766       // successful completion, and something in the frame threw. We can't already be rejected,
   2767       // because rejecting a coroutine involves throwing, which would have unwound the frame prior
   2768       // to setting `waiting = false`.
   2769       //
   2770       // Since we know we're unwinding due to a successful completion, we also know that whatever
   2771       // Event we may have armed has not yet fired, because we haven't had a chance to return to
   2772       // the event loop.
   2773 
   2774       // final_suspend() has not been called.
   2775       KJ_IASSERT(!coroutine.done());
   2776 
   2777       // Since final_suspend() hasn't been called, whatever Event is waiting on us has not fired,
   2778       // and will see this exception.
   2779       resultRef.addException(kj::mv(*exception));
   2780     }
   2781   } else {
   2782     KJ_UNREACHABLE;
   2783   }
   2784 }
   2785 
   2786 void CoroutineBase::onReady(Event* event) noexcept {
   2787   onReadyEvent.init(event);
   2788 }
   2789 
   2790 void CoroutineBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
   2791   if (stopAtNextEvent) return;
   2792 
   2793   KJ_IF_MAYBE(promise, promiseNodeForTrace) {
   2794     promise->tracePromise(builder, stopAtNextEvent);
   2795   }
   2796 
   2797   // Maybe returning the address of coroutine() will give us a function name with meaningful type
   2798   // information. (Narrator: It doesn't.)
   2799   builder.add(GetFunctorStartAddress<>::apply(coroutine));
   2800 };
   2801 
   2802 Maybe<Own<Event>> CoroutineBase::fire() {
   2803   // Call Awaiter::await_resume() and proceed with the coroutine. Note that this will not destroy
   2804   // the coroutine if control flows off the end of it, because we return suspend_always() from
   2805   // final_suspend().
   2806   //
   2807   // It's tempting to arrange to check for exceptions right now and reject the promise that owns
   2808   // us without resuming the coroutine, which would save us from throwing an exception when we
   2809   // already know where it's going. But, we don't really know: unlike in the KJ_NO_EXCEPTIONS
   2810   // case, the `co_await` might be in a try-catch block, so we have no choice but to resume and
   2811   // throw later.
   2812   //
   2813   // TODO(someday): If we ever support coroutines with -fno-exceptions, we'll need to reject the
   2814   //   enclosing coroutine promise here, if the Awaiter's result is exceptional.
   2815 
   2816   promiseNodeForTrace = nullptr;
   2817 
   2818   coroutine.resume();
   2819 
   2820   return nullptr;
   2821 }
   2822 
   2823 void CoroutineBase::traceEvent(TraceBuilder& builder) {
   2824   KJ_IF_MAYBE(promise, promiseNodeForTrace) {
   2825     promise->tracePromise(builder, true);
   2826   }
   2827 
   2828   // Maybe returning the address of coroutine() will give us a function name with meaningful type
   2829   // information. (Narrator: It doesn't.)
   2830   builder.add(GetFunctorStartAddress<>::apply(coroutine));
   2831 
   2832   onReadyEvent.traceEvent(builder);
   2833 }
   2834 
   2835 void CoroutineBase::disposeImpl(void* pointer) const {
   2836   KJ_IASSERT(pointer == this);
   2837 
   2838   // const_cast okay -- every Own<PromiseNode> that we build in get_return_object() uses itself
   2839   // as the disposer, thus every disposer is unique and there are no thread-safety concerns.
   2840   const_cast<CoroutineBase&>(*this).destroy();
   2841 }
   2842 
   2843 void CoroutineBase::destroy() {
   2844   // Mutable helper function for disposeImpl(). Basically a wrapper around coroutine.destroy()
   2845   // with some stuff to propagate exceptions appropriately.
   2846 
   2847   // Objects in the coroutine frame might throw from their destructors, so unhandled_exception()
   2848   // will need some way to communicate those exceptions back to us. Separately, we also want
   2849   // confirmation that our own ~Coroutine() destructor ran. To solve this, we put a
   2850   // DisposalResults object on the stack and set a pointer to it in the Coroutine object. This
   2851   // indicates to unhandled_exception() and ~Coroutine() where to store the results of the
   2852   // destruction operation.
   2853   DisposalResults disposalResults;
   2854   maybeDisposalResults = &disposalResults;
   2855 
   2856   // Need to save this while `unwindDetector` is still valid.
   2857   bool shouldRethrow = !unwindDetector.isUnwinding();
   2858 
   2859   do {
   2860     // Clang's implementation of the Coroutines TS does not destroy the Coroutine object or
   2861     // deallocate the coroutine frame if a destructor of an object on the frame threw an
   2862     // exception. This is despite the fact that it delivered the exception to _us_ via
   2863     // unhandled_exception(). Anyway, it appears we can work around this by running
   2864     // coroutine.destroy() a second time.
   2865     //
   2866     // On Clang, `disposalResults.exception != nullptr` implies `!disposalResults.destructorRan`.
   2867     // We could optimize out the separate `destructorRan` flag if we verify that other compilers
   2868     // behave the same way.
   2869     coroutine.destroy();
   2870   } while (!disposalResults.destructorRan);
   2871 
   2872   // WARNING: `this` is now a dangling pointer.
   2873 
   2874   KJ_IF_MAYBE(exception, disposalResults.exception) {
   2875     if (shouldRethrow) {
   2876       kj::throwFatalException(kj::mv(*exception));
   2877     } else {
   2878       // An exception is already unwinding the stack, so throwing this secondary exception would
   2879       // call std::terminate().
   2880     }
   2881   }
   2882 }
   2883 
   2884 CoroutineBase::AwaiterBase::AwaiterBase(Own<PromiseNode> node): node(kj::mv(node)) {}
   2885 CoroutineBase::AwaiterBase::AwaiterBase(AwaiterBase&&) = default;
   2886 CoroutineBase::AwaiterBase::~AwaiterBase() noexcept(false) {
   2887   // Make sure it's safe to generate an async stack trace between now and when the Coroutine is
   2888   // destroyed.
   2889   KJ_IF_MAYBE(coroutineEvent, maybeCoroutineEvent) {
   2890     coroutineEvent->promiseNodeForTrace = nullptr;
   2891   }
   2892 
   2893   unwindDetector.catchExceptionsIfUnwinding([this]() {
   2894     // No need to check for a moved-from state, node will just ignore the nullification.
   2895     node = nullptr;
   2896   });
   2897 }
   2898 
   2899 void CoroutineBase::AwaiterBase::getImpl(ExceptionOrValue& result) {
   2900   node->get(result);
   2901 
   2902   KJ_IF_MAYBE(exception, result.exception) {
   2903     kj::throwFatalException(kj::mv(*exception));
   2904   }
   2905 }
   2906 
   2907 bool CoroutineBase::AwaiterBase::awaitSuspendImpl(CoroutineBase& coroutineEvent) {
   2908   node->setSelfPointer(&node);
   2909   node->onReady(&coroutineEvent);
   2910 
   2911   if (coroutineEvent.isNext()) {
   2912     // The result is immediately ready! Let's cancel our event.
   2913     coroutineEvent.disarm();
   2914 
   2915     // We can resume ourselves by returning false. This accomplishes the same thing as if we had
   2916     // returned true from await_ready().
   2917     return false;
   2918   } else {
   2919     // Otherwise, we must suspend. Store a reference to the promise we're waiting on for tracing
    2920     // purposes; coroutineEvent.fire() and/or ~AwaiterBase() will null this out.
   2921     coroutineEvent.promiseNodeForTrace = *node;
   2922     maybeCoroutineEvent = coroutineEvent;
   2923 
   2924     return true;
   2925   }
   2926 }
   2927 
   2928 }  // namespace _ (private)
   2929 
   2930 #endif  // KJ_HAS_COROUTINE
   2931 
   2932 }  // namespace kj