capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-inl.h (72965B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 // This file contains extended inline implementation details that are required along with async.h.
     23 // We move this all into a separate file to make async.h more readable.
     24 //
     25 // Non-inline declarations here are defined in async.c++.
     26 
     27 #pragma once
     28 
     29 #ifndef KJ_ASYNC_H_INCLUDED
     30 #error "Do not include this directly; include kj/async.h."
     31 #include "async.h"  // help IDE parse this file
     32 #endif
     33 
     34 KJ_BEGIN_HEADER
     35 
     36 #include "list.h"
     37 
     38 namespace kj {
     39 namespace _ {  // private
     40 
     41 template <typename T>
     42 class ExceptionOr;
     43 
     44 class ExceptionOrValue {
     45 public:
     46   ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
     47   KJ_DISALLOW_COPY(ExceptionOrValue);
     48 
     49   void addException(Exception&& exception) {
     50     if (this->exception == nullptr) {
     51       this->exception = kj::mv(exception);
     52     }
     53   }
     54 
     55   template <typename T>
     56   ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
     57   template <typename T>
     58   const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
     59 
     60   Maybe<Exception> exception;
     61 
     62 protected:
     63   // Allow subclasses to have move constructor / assignment.
     64   ExceptionOrValue() = default;
     65   ExceptionOrValue(ExceptionOrValue&& other) = default;
     66   ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
     67 };
     68 
     69 template <typename T>
     70 class ExceptionOr: public ExceptionOrValue {
     71 public:
     72   ExceptionOr() = default;
     73   ExceptionOr(T&& value): value(kj::mv(value)) {}
     74   ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
     75   ExceptionOr(ExceptionOr&&) = default;
     76   ExceptionOr& operator=(ExceptionOr&&) = default;
     77 
     78   Maybe<T> value;
     79 };
     80 
     81 template <typename T>
     82 inline T convertToReturn(ExceptionOr<T>&& result) {
     83   KJ_IF_MAYBE(value, result.value) {
     84     KJ_IF_MAYBE(exception, result.exception) {
     85       throwRecoverableException(kj::mv(*exception));
     86     }
     87     return _::returnMaybeVoid(kj::mv(*value));
     88   } else KJ_IF_MAYBE(exception, result.exception) {
     89     throwFatalException(kj::mv(*exception));
     90   } else {
     91     // Result contained neither a value nor an exception?
     92     KJ_UNREACHABLE;
     93   }
     94 }
     95 
     96 inline void convertToReturn(ExceptionOr<Void>&& result) {
     97   // Override <void> case to use throwRecoverableException().
     98 
     99   if (result.value != nullptr) {
    100     KJ_IF_MAYBE(exception, result.exception) {
    101       throwRecoverableException(kj::mv(*exception));
    102     }
    103   } else KJ_IF_MAYBE(exception, result.exception) {
    104     throwRecoverableException(kj::mv(*exception));
    105   } else {
    106     // Result contained neither a value nor an exception?
    107     KJ_UNREACHABLE;
    108   }
    109 }
    110 
    111 class TraceBuilder {
    112   // Helper for methods that build a call trace.
    113 public:
    114   TraceBuilder(ArrayPtr<void*> space)
    115       : start(space.begin()), current(space.begin()), limit(space.end()) {}
    116 
    117   inline void add(void* addr) {
    118     if (current < limit) {
    119       *current++ = addr;
    120     }
    121   }
    122 
    123   inline bool full() const { return current == limit; }
    124 
    125   ArrayPtr<void*> finish() {
    126     return arrayPtr(start, current);
    127   }
    128 
    129   String toString();
    130 
    131 private:
    132   void** start;
    133   void** current;
    134   void** limit;
    135 };
    136 
    137 class Event {
    138   // An event waiting to be executed.  Not for direct use by applications -- promises use this
    139   // internally.
    140 
    141 public:
    142   Event();
    143   Event(kj::EventLoop& loop);
    144   ~Event() noexcept(false);
    145   KJ_DISALLOW_COPY(Event);
    146 
    147   void armDepthFirst();
    148   // Enqueue this event so that `fire()` will be called from the event loop soon.
    149   //
    150   // Events scheduled in this way are executed in depth-first order:  if an event callback arms
    151   // more events, those events are placed at the front of the queue (in the order in which they
    152   // were armed), so that they run immediately after the first event's callback returns.
    153   //
    154   // Depth-first event scheduling is appropriate for events that represent simple continuations
    155   // of a previous event that should be globbed together for performance.  Depth-first scheduling
    156   // can lead to starvation, so any long-running task must occasionally yield with
    157   // `armBreadthFirst()`.  (Promise::then() uses depth-first whereas evalLater() uses
    158   // breadth-first.)
    159   //
    160   // To use breadth-first scheduling instead, use `armBreadthFirst()`.
    161 
    162   void armBreadthFirst();
    163   // Like `armDepthFirst()` except that the event is placed at the end of the queue.
    164 
    165   void armLast();
    166   // Enqueues this event to happen after all other events have run to completion and there is
    167   // really nothing left to do except wait for I/O.
    168 
    169   bool isNext();
    170   // True if the Event has been armed and is next in line to be fired. This can be used after
    171   // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately
    172   // ready, in which case continuations may be optimistically run without returning to the event
    173   // loop. Note that this optimization is only valid if we know that we would otherwise immediately
    174   // return to the event loop without running more application code. So this turns out to be useful
    175   // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it
    176   // doesn't need to.
    177   //
    178   // Returns false if the event loop is not currently running. This ensures that promise
    179   // continuations don't execute except under a call to .wait().
    180 
    181   void disarm();
    182   // If the event is armed but hasn't fired, cancel it. (Destroying the event does this
    183   // implicitly.)
    184 
    185   virtual void traceEvent(TraceBuilder& builder) = 0;
    186   // Build a trace of the callers leading up to this event. `builder` will be populated with
    187   // "return addresses" of the promise chain waiting on this event. The return addresses may
    188   // actually the addresses of lambdas passed to .then(), but in any case, feeding them into
    189   // addr2line should produce useful source code locations.
    190   //
    191   // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It
    192   // must not allocate nor take locks.
    193 
    194   String traceEvent();
    195   // Helper that builds a trace and stringifies it.
    196 
    197 protected:
    198   virtual Maybe<Own<Event>> fire() = 0;
    199   // Fire the event.  Possibly returns a pointer to itself, which will be discarded by the
    200   // caller.  This is the only way that an event can delete itself as a result of firing, as
    201   // doing so from within fire() will throw an exception.
    202 
    203 private:
    204   friend class kj::EventLoop;
    205   EventLoop& loop;
    206   Event* next;
    207   Event** prev;
    208   bool firing = false;
    209 };
    210 
    211 class PromiseNode {
    212   // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
    213   //
    214   // To reduce generated code bloat, PromiseNode is not a template.  Instead, it makes very hacky
    215   // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
    216   // so down-cast in the few places that really need to be templated.  Luckily this is all
    217   // internal implementation details.
    218 
    219 public:
    220   virtual void onReady(Event* event) noexcept = 0;
    221   // Arms the given event when ready.
    222   //
    223   // May be called multiple times. If called again before the event was armed, the old event will
    224   // never be armed, only the new one. If called again after the event was armed, the new event
    225   // will be armed immediately. Can be called with nullptr to un-register the existing event.
    226 
    227   virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept;
    228   // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
    229   // this node until it is destroyed or setSelfPointer() is called again.  ChainPromiseNode uses
    230   // this to shorten redundant chains.  The default implementation does nothing; only
    231   // ChainPromiseNode should implement this.
    232 
    233   virtual void get(ExceptionOrValue& output) noexcept = 0;
    234   // Get the result.  `output` points to an ExceptionOr<T> into which the result will be written.
    235   // Can only be called once, and only after the node is ready.  Must be called directly from the
    236   // event loop, with no application code on the stack.
    237 
    238   virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0;
    239   // Build a trace of this promise chain, showing what it is currently waiting on.
    240   //
    241   // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically
    242   // recurse to its child first, then after the child returns, add itself to the trace.
    243   //
    244   // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that
    245   // also implements Event, and should not trace that node or its children. This is used in
    246   // conjuction with Event::traceEvent(). The chain of Events is often more sparse than the chain
    247   // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an
    248   // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node
    249   // when it is ready, and then TransformPromiseNode applies the .then() transformation during the
    250   // call to .get().
    251   //
    252   // So, when we trace the chain of Events backwards, we end up hoping over segments of
    253   // TransformPromiseNodes (and other similar types). In order to get those added to the trace,
    254   // each Event must call back down the PromiseNode chain in the opposite direction, using this
    255   // method.
    256   //
    257   // `tracePromise()` may be called from an async signal handler while `get()` is executing. It
    258   // must not allocate nor take locks.
    259 
    260   template <typename T>
    261   static Own<PromiseNode> from(T&& promise) {
    262     // Given a Promise, extract the PromiseNode.
    263     return kj::mv(promise.node);
    264   }
    265   template <typename T>
    266   static PromiseNode& from(T& promise) {
    267     // Given a Promise, extract the PromiseNode.
    268     return *promise.node;
    269   }
    270   template <typename T>
    271   static T to(Own<PromiseNode>&& node) {
    272     // Construct a Promise from a PromiseNode. (T should be a Promise type.)
    273     return T(false, kj::mv(node));
    274   }
    275 
    276 protected:
    277   class OnReadyEvent {
    278     // Helper class for implementing onReady().
    279 
    280   public:
    281     void init(Event* newEvent);
    282 
    283     void arm();
    284     void armBreadthFirst();
    285     // Arms the event if init() has already been called and makes future calls to init()
    286     // automatically arm the event.
    287 
    288     inline void traceEvent(TraceBuilder& builder) {
    289       if (event != nullptr && !builder.full()) event->traceEvent(builder);
    290     }
    291 
    292   private:
    293     Event* event = nullptr;
    294   };
    295 };
    296 
    297 // -------------------------------------------------------------------
    298 
    299 template <typename T>
    300 inline NeverDone::operator Promise<T>() const {
    301   return PromiseNode::to<Promise<T>>(neverDone());
    302 }
    303 
    304 // -------------------------------------------------------------------
    305 
    306 class ImmediatePromiseNodeBase: public PromiseNode {
    307 public:
    308   ImmediatePromiseNodeBase();
    309   ~ImmediatePromiseNodeBase() noexcept(false);
    310 
    311   void onReady(Event* event) noexcept override;
    312   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    313 };
    314 
    315 template <typename T>
    316 class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
    317   // A promise that has already been resolved to an immediate value or exception.
    318 
    319 public:
    320   ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
    321 
    322   void get(ExceptionOrValue& output) noexcept override {
    323     output.as<T>() = kj::mv(result);
    324   }
    325 
    326 private:
    327   ExceptionOr<T> result;
    328 };
    329 
    330 class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
    331 public:
    332   ImmediateBrokenPromiseNode(Exception&& exception);
    333 
    334   void get(ExceptionOrValue& output) noexcept override;
    335 
    336 private:
    337   Exception exception;
    338 };
    339 
    340 // -------------------------------------------------------------------
    341 
    342 class AttachmentPromiseNodeBase: public PromiseNode {
    343 public:
    344   AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
    345 
    346   void onReady(Event* event) noexcept override;
    347   void get(ExceptionOrValue& output) noexcept override;
    348   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    349 
    350 private:
    351   Own<PromiseNode> dependency;
    352 
    353   void dropDependency();
    354 
    355   template <typename>
    356   friend class AttachmentPromiseNode;
    357 };
    358 
    359 template <typename Attachment>
    360 class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
    361   // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
    362   // object) until the promise resolves.
    363 
    364 public:
    365   AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment)
    366       : AttachmentPromiseNodeBase(kj::mv(dependency)),
    367         attachment(kj::mv<Attachment>(attachment)) {}
    368 
    369   ~AttachmentPromiseNode() noexcept(false) {
    370     // We need to make sure the dependency is deleted before we delete the attachment because the
    371     // dependency may be using the attachment.
    372     dropDependency();
    373   }
    374 
    375 private:
    376   Attachment attachment;
    377 };
    378 
    379 // -------------------------------------------------------------------
    380 
    381 #if __GNUC__ >= 8 && !__clang__
    382 // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no
    383 // "legal" way for us to extract the content of a PTMF so too bad.
    384 #pragma GCC diagnostic push
    385 #pragma GCC diagnostic ignored "-Wclass-memaccess"
    386 #if __GNUC__ >= 11
    387 // GCC 11's array-bounds is  similarly upset with us for digging into "private" implementation
    388 // details. But the format is well-defined by the ABI which cannot change so please just let us
    389 // do it kthx.
    390 #pragma GCC diagnostic ignored "-Warray-bounds"
    391 #endif
    392 #endif
    393 
    394 template <typename T, typename ReturnType, typename... ParamTypes>
    395 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
    396 template <typename T, typename ReturnType, typename... ParamTypes>
    397 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
    398 // Given an object and a pointer-to-method, return the start address of the method's code. The
    399 // intent is that this address can be used in a trace; addr2line should map it to the start of
    400 // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine
    401 // the address of the specific implementation (otherwise, `obj` wouldn't be needed).
    402 //
    403 // Note that if the method is overloaded or is a template, you will need to explicitly specify
    404 // the param and return types, otherwise the compiler won't know which overload / template
    405 // specialization you are requesting.
    406 
    407 class PtmfHelper {
    408   // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The
    409   // class represents the internal representation of a pointer-to-member-function.
    410 
    411   template <typename... ParamTypes>
    412   friend struct GetFunctorStartAddress;
    413   template <typename T, typename ReturnType, typename... ParamTypes>
    414   friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
    415   template <typename T, typename ReturnType, typename... ParamTypes>
    416   friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
    417 
    418 #if __GNUG__
    419 
    420   void* ptr;
    421   ptrdiff_t adj;
    422   // Layout of a pointer-to-member-function used by GCC and compatible compilers.
    423 
    424   void* apply(const void* obj) {
    425 #if defined(__arm__) || defined(__mips__) || defined(__aarch64__)
    426     if (adj & 1) {
    427       ptrdiff_t voff = (ptrdiff_t)ptr;
    428 #else
    429     ptrdiff_t voff = (ptrdiff_t)ptr;
    430     if (voff & 1) {
    431       voff &= ~1;
    432 #endif
    433       return *(void**)(*(char**)obj + voff);
    434     } else {
    435       return ptr;
    436     }
    437   }
    438 
    439 #define BODY \
    440     PtmfHelper result; \
    441     static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \
    442     memcpy(&result, &p, sizeof(result)); \
    443     return result
    444 
    445 #else  // __GNUG__
    446 
    447   void* apply(const void* obj) { return nullptr; }
    448   // TODO(port):  PTMF instruction address extraction
    449 
    450 #define BODY return PtmfHelper{}
    451 
    452 #endif  // __GNUG__, else
    453 
    454   template <typename R, typename C, typename... P, typename F>
    455   static PtmfHelper from(F p) { BODY; }
    456   // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not
    457   // overloaded nor a template. In this case the compiler is able to deduce the full function
    458   // signature directly given the name since there is only one function with that name.
    459 
    460   template <typename R, typename C, typename... P>
    461   static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; }
    462   template <typename R, typename C, typename... P>
    463   static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; }
    464   // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case
    465   // the function must match exactly the containing type C, return type R, and parameter types P...
    466   // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a
    467   // guess at P. Luckily, if the function parameters are template parameters then it's not
    468   // necessary to be precise about P.
    469 #undef BODY
    470 };
    471 
    472 #if __GNUC__ >= 8 && !__clang__
    473 #pragma GCC diagnostic pop
    474 #endif
    475 
    476 template <typename T, typename ReturnType, typename... ParamTypes>
    477 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) {
    478   return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
    479 }
    480 template <typename T, typename ReturnType, typename... ParamTypes>
    481 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) {
    482   return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
    483 }
    484 
    485 template <typename... ParamTypes>
    486 struct GetFunctorStartAddress {
    487   // Given a functor (any object defining operator()), return the start address of the function,
    488   // suitable for passing to addr2line to obtain a source file/line for debugging purposes.
    489   //
    490   // This turns out to be incredibly hard to implement in the presence of overloaded or templated
    491   // functors. Therefore, we impose these specific restrictions, specific to our use case:
    492   // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas
    493   //   anyway.)
    494   // - The template parameters to GetFunctorStartAddress specify a hint as to the expected
    495   //   parameter types. If the functor is templated, its parameters must match exactly these types.
    496   //   (If it's not templated, ParamTypes are ignored.)
    497 
    498   template <typename Func>
    499   static void* apply(Func&& func) {
    500     typedef decltype(func(instance<ParamTypes>()...)) ReturnType;
    501     return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>(
    502         &Decay<Func>::operator()).apply(&func);
    503   }
    504 };
    505 
    506 template <>
    507 struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {};
    508 // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function
    509 // actually has no parameters.
    510 
    511 class TransformPromiseNodeBase: public PromiseNode {
    512 public:
    513   TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr);
    514 
    515   void onReady(Event* event) noexcept override;
    516   void get(ExceptionOrValue& output) noexcept override;
    517   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    518 
    519 private:
    520   Own<PromiseNode> dependency;
    521   void* continuationTracePtr;
    522 
    523   void dropDependency();
    524   void getDepResult(ExceptionOrValue& output);
    525 
    526   virtual void getImpl(ExceptionOrValue& output) = 0;
    527 
    528   template <typename, typename, typename, typename>
    529   friend class TransformPromiseNode;
    530 };
    531 
    532 template <typename T, typename DepT, typename Func, typename ErrorFunc>
    533 class TransformPromiseNode final: public TransformPromiseNodeBase {
    534   // A PromiseNode that transforms the result of another PromiseNode through an application-provided
    535   // function (implements `then()`).
    536 
    537 public:
    538   TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler,
    539                        void* continuationTracePtr)
    540       : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr),
    541         func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
    542 
    543   ~TransformPromiseNode() noexcept(false) {
    544     // We need to make sure the dependency is deleted before we delete the continuations because it
    545     // is a common pattern for the continuations to hold ownership of objects that might be in-use
    546     // by the dependency.
    547     dropDependency();
    548   }
    549 
    550 private:
    551   Func func;
    552   ErrorFunc errorHandler;
    553 
    554   void getImpl(ExceptionOrValue& output) override {
    555     ExceptionOr<DepT> depResult;
    556     getDepResult(depResult);
    557     KJ_IF_MAYBE(depException, depResult.exception) {
    558       output.as<T>() = handle(
    559           MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
    560               errorHandler, kj::mv(*depException)));
    561     } else KJ_IF_MAYBE(depValue, depResult.value) {
    562       output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
    563     }
    564   }
    565 
    566   ExceptionOr<T> handle(T&& value) {
    567     return kj::mv(value);
    568   }
    569   ExceptionOr<T> handle(PropagateException::Bottom&& value) {
    570     return ExceptionOr<T>(false, value.asException());
    571   }
    572 };
    573 
    574 // -------------------------------------------------------------------
    575 
    576 class ForkHubBase;
    577 
    578 class ForkBranchBase: public PromiseNode {
    579 public:
    580   ForkBranchBase(Own<ForkHubBase>&& hub);
    581   ~ForkBranchBase() noexcept(false);
    582 
    583   void hubReady() noexcept;
    584   // Called by the hub to indicate that it is ready.
    585 
    586   // implements PromiseNode ------------------------------------------
    587   void onReady(Event* event) noexcept override;
    588   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    589 
    590 protected:
    591   inline ExceptionOrValue& getHubResultRef();
    592 
    593   void releaseHub(ExceptionOrValue& output);
    594   // Release the hub.  If an exception is thrown, add it to `output`.
    595 
    596 private:
    597   OnReadyEvent onReadyEvent;
    598 
    599   Own<ForkHubBase> hub;
    600   ForkBranchBase* next = nullptr;
    601   ForkBranchBase** prevPtr = nullptr;
    602 
    603   friend class ForkHubBase;
    604 };
    605 
    606 template <typename T> T copyOrAddRef(T& t) { return t; }
    607 template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
    608 template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) {
    609   return t.map([](Own<T>& ptr) {
    610     return ptr->addRef();
    611   });
    612 }
    613 
    614 template <typename T>
    615 class ForkBranch final: public ForkBranchBase {
    616   // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
    617   // a const reference.
    618 
    619 public:
    620   ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
    621 
    622   void get(ExceptionOrValue& output) noexcept override {
    623     ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
    624     KJ_IF_MAYBE(value, hubResult.value) {
    625       output.as<T>().value = copyOrAddRef(*value);
    626     } else {
    627       output.as<T>().value = nullptr;
    628     }
    629     output.exception = hubResult.exception;
    630     releaseHub(output);
    631   }
    632 };
    633 
    634 template <typename T, size_t index>
    635 class SplitBranch final: public ForkBranchBase {
    636   // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
    637   // a const reference.
    638 
    639 public:
    640   SplitBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
    641 
    642   typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element;
    643 
    644   void get(ExceptionOrValue& output) noexcept override {
    645     ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
    646     KJ_IF_MAYBE(value, hubResult.value) {
    647       output.as<Element>().value = kj::mv(kj::get<index>(*value));
    648     } else {
    649       output.as<Element>().value = nullptr;
    650     }
    651     output.exception = hubResult.exception;
    652     releaseHub(output);
    653   }
    654 };
    655 
    656 // -------------------------------------------------------------------
    657 
    658 class ForkHubBase: public Refcounted, protected Event {
    659 public:
    660   ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef);
    661 
    662   inline ExceptionOrValue& getResultRef() { return resultRef; }
    663 
    664 private:
    665   Own<PromiseNode> inner;
    666   ExceptionOrValue& resultRef;
    667 
    668   ForkBranchBase* headBranch = nullptr;
    669   ForkBranchBase** tailBranch = &headBranch;
    670   // Tail becomes null once the inner promise is ready and all branches have been notified.
    671 
    672   Maybe<Own<Event>> fire() override;
    673   void traceEvent(TraceBuilder& builder) override;
    674 
    675   friend class ForkBranchBase;
    676 };
    677 
    678 template <typename T>
    679 class ForkHub final: public ForkHubBase {
    680   // A PromiseNode that implements the hub of a fork.  The first call to Promise::fork() replaces
    681   // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
    682   // possible).
    683 
    684 public:
    685   ForkHub(Own<PromiseNode>&& inner): ForkHubBase(kj::mv(inner), result) {}
    686 
    687   Promise<_::UnfixVoid<T>> addBranch() {
    688     return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>(kj::heap<ForkBranch<T>>(addRef(*this)));
    689   }
    690 
    691   _::SplitTuplePromise<T> split() {
    692     return splitImpl(MakeIndexes<tupleSize<T>()>());
    693   }
    694 
    695 private:
    696   ExceptionOr<T> result;
    697 
    698   template <size_t... indexes>
    699   _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>) {
    700     return kj::tuple(addSplit<indexes>()...);
    701   }
    702 
    703   template <size_t index>
    704   ReducePromises<typename SplitBranch<T, index>::Element> addSplit() {
    705     return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>(
    706         maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)),
    707                    implicitCast<typename SplitBranch<T, index>::Element*>(nullptr)));
    708   }
    709 };
    710 
    711 inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
    712   return hub->getResultRef();
    713 }
    714 
    715 // -------------------------------------------------------------------
    716 
    717 class ChainPromiseNode final: public PromiseNode, public Event {
    718   // Promise node which reduces Promise<Promise<T>> to Promise<T>.
    719   //
    720   // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to
    721   // Own<Event>.  Ugh, templates and private...
    722 
    723 public:
    724   explicit ChainPromiseNode(Own<PromiseNode> inner);
    725   ~ChainPromiseNode() noexcept(false);
    726 
    727   void onReady(Event* event) noexcept override;
    728   void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override;
    729   void get(ExceptionOrValue& output) noexcept override;
    730   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    731 
    732 private:
    733   enum State {
    734     STEP1,
    735     STEP2
    736   };
    737 
    738   State state;
    739 
    740   Own<PromiseNode> inner;
    741   // In STEP1, a PromiseNode for a Promise<T>.
    742   // In STEP2, a PromiseNode for a T.
    743 
    744   Event* onReadyEvent = nullptr;
    745   Own<PromiseNode>* selfPtr = nullptr;
    746 
    747   Maybe<Own<Event>> fire() override;
    748   void traceEvent(TraceBuilder& builder) override;
    749 };
    750 
    751 template <typename T>
    752 Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) {
    753   return heap<ChainPromiseNode>(kj::mv(node));
    754 }
    755 
    756 template <typename T>
    757 Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) {
    758   return kj::mv(node);
    759 }
    760 
    761 template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))>
    762 inline Result maybeReduce(Promise<T>&& promise, bool) {
    763   return T::reducePromise(kj::mv(promise));
    764 }
    765 
    766 template <typename T>
    767 inline Promise<T> maybeReduce(Promise<T>&& promise, ...) {
    768   return kj::mv(promise);
    769 }
    770 
    771 // -------------------------------------------------------------------
    772 
    773 class ExclusiveJoinPromiseNode final: public PromiseNode {
    774 public:
    775   ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
    776   ~ExclusiveJoinPromiseNode() noexcept(false);
    777 
    778   void onReady(Event* event) noexcept override;
    779   void get(ExceptionOrValue& output) noexcept override;
    780   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    781 
    782 private:
    783   class Branch: public Event {
    784   public:
    785     Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency);
    786     ~Branch() noexcept(false);
    787 
    788     bool get(ExceptionOrValue& output);
    789     // Returns true if this is the side that finished.
    790 
    791     Maybe<Own<Event>> fire() override;
    792     void traceEvent(TraceBuilder& builder) override;
    793 
    794   private:
    795     ExclusiveJoinPromiseNode& joinNode;
    796     Own<PromiseNode> dependency;
    797 
    798     friend class ExclusiveJoinPromiseNode;
    799   };
    800 
    801   Branch left;
    802   Branch right;
    803   OnReadyEvent onReadyEvent;
    804 };
    805 
    806 // -------------------------------------------------------------------
    807 
    808 class ArrayJoinPromiseNodeBase: public PromiseNode {
    809 public:
    810   ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises,
    811                            ExceptionOrValue* resultParts, size_t partSize);
    812   ~ArrayJoinPromiseNodeBase() noexcept(false);
    813 
    814   void onReady(Event* event) noexcept override final;
    815   void get(ExceptionOrValue& output) noexcept override final;
    816   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final;
    817 
    818 protected:
    819   virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
    820   // Called to compile the result only in the case where there were no errors.
    821 
    822 private:
    823   uint countLeft;
    824   OnReadyEvent onReadyEvent;
    825 
    826   class Branch final: public Event {
    827   public:
    828     Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency,
    829            ExceptionOrValue& output);
    830     ~Branch() noexcept(false);
    831 
    832     Maybe<Own<Event>> fire() override;
    833     void traceEvent(TraceBuilder& builder) override;
    834 
    835     Maybe<Exception> getPart();
    836     // Calls dependency->get(output).  If there was an exception, return it.
    837 
    838   private:
    839     ArrayJoinPromiseNodeBase& joinNode;
    840     Own<PromiseNode> dependency;
    841     ExceptionOrValue& output;
    842 
    843     friend class ArrayJoinPromiseNodeBase;
    844   };
    845 
    846   Array<Branch> branches;
    847 };
    848 
    849 template <typename T>
    850 class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
    851 public:
    852   ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
    853                        Array<ExceptionOr<T>> resultParts)
    854       : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>)),
    855         resultParts(kj::mv(resultParts)) {}
    856 
    857 protected:
    858   void getNoError(ExceptionOrValue& output) noexcept override {
    859     auto builder = heapArrayBuilder<T>(resultParts.size());
    860     for (auto& part: resultParts) {
    861       KJ_IASSERT(part.value != nullptr,
    862                  "Bug in KJ promise framework:  Promise result had neither value no exception.");
    863       builder.add(kj::mv(*_::readMaybe(part.value)));
    864     }
    865     output.as<Array<T>>() = builder.finish();
    866   }
    867 
    868 private:
    869   Array<ExceptionOr<T>> resultParts;
    870 };
    871 
    872 template <>
    873 class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase {
    874 public:
    875   ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
    876                        Array<ExceptionOr<_::Void>> resultParts);
    877   ~ArrayJoinPromiseNode();
    878 
    879 protected:
    880   void getNoError(ExceptionOrValue& output) noexcept override;
    881 
    882 private:
    883   Array<ExceptionOr<_::Void>> resultParts;
    884 };
    885 
    886 // -------------------------------------------------------------------
    887 
    888 class EagerPromiseNodeBase: public PromiseNode, protected Event {
    889   // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
    890   // evaluate it.
    891 
    892 public:
    893   EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
    894 
    895   void onReady(Event* event) noexcept override;
    896   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    897 
    898 private:
    899   Own<PromiseNode> dependency;
    900   OnReadyEvent onReadyEvent;
    901 
    902   ExceptionOrValue& resultRef;
    903 
    904   Maybe<Own<Event>> fire() override;
    905   void traceEvent(TraceBuilder& builder) override;
    906 };
    907 
    908 template <typename T>
    909 class EagerPromiseNode final: public EagerPromiseNodeBase {
    910 public:
    911   EagerPromiseNode(Own<PromiseNode>&& dependency)
    912       : EagerPromiseNodeBase(kj::mv(dependency), result) {}
    913 
    914   void get(ExceptionOrValue& output) noexcept override {
    915     output.as<T>() = kj::mv(result);
    916   }
    917 
    918 private:
    919   ExceptionOr<T> result;
    920 };
    921 
    922 template <typename T>
    923 Own<PromiseNode> spark(Own<PromiseNode>&& node) {
    924   // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
    925   // on it.
    926   return heap<EagerPromiseNode<T>>(kj::mv(node));
    927 }
    928 
    929 // -------------------------------------------------------------------
    930 
    931 class AdapterPromiseNodeBase: public PromiseNode {
    932 public:
    933   void onReady(Event* event) noexcept override;
    934   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
    935 
    936 protected:
    937   inline void setReady() {
    938     onReadyEvent.arm();
    939   }
    940 
    941 private:
    942   OnReadyEvent onReadyEvent;
    943 };
    944 
    945 template <typename T, typename Adapter>
    946 class AdapterPromiseNode final: public AdapterPromiseNodeBase,
    947                                 private PromiseFulfiller<UnfixVoid<T>> {
    948   // A PromiseNode that wraps a PromiseAdapter.
    949 
    950 public:
    951   template <typename... Params>
    952   AdapterPromiseNode(Params&&... params)
    953       : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
    954 
    955   void get(ExceptionOrValue& output) noexcept override {
    956     KJ_IREQUIRE(!isWaiting());
    957     output.as<T>() = kj::mv(result);
    958   }
    959 
    960 private:
    961   ExceptionOr<T> result;
    962   bool waiting = true;
    963   Adapter adapter;
    964 
    965   void fulfill(T&& value) override {
    966     if (waiting) {
    967       waiting = false;
    968       result = ExceptionOr<T>(kj::mv(value));
    969       setReady();
    970     }
    971   }
    972 
    973   void reject(Exception&& exception) override {
    974     if (waiting) {
    975       waiting = false;
    976       result = ExceptionOr<T>(false, kj::mv(exception));
    977       setReady();
    978     }
    979   }
    980 
    981   bool isWaiting() override {
    982     return waiting;
    983   }
    984 };
    985 
    986 // -------------------------------------------------------------------
    987 
    988 class FiberBase: public PromiseNode, private Event {
    989   // Base class for the outer PromiseNode representing a fiber.
    990 
    991 public:
    992   explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result);
    993   explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result);
    994   ~FiberBase() noexcept(false);
    995 
    996   void start() { armDepthFirst(); }
    997   // Call immediately after construction to begin executing the fiber.
    998 
    999   class WaitDoneEvent;
   1000 
   1001   void onReady(_::Event* event) noexcept override;
   1002   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
   1003 
   1004 protected:
   1005   bool isFinished() { return state == FINISHED; }
   1006   void destroy();
   1007 
   1008 private:
   1009   enum { WAITING, RUNNING, CANCELED, FINISHED } state;
   1010 
   1011   _::PromiseNode* currentInner = nullptr;
   1012   OnReadyEvent onReadyEvent;
   1013   Own<FiberStack> stack;
   1014   _::ExceptionOrValue& result;
   1015 
   1016   void run();
   1017   virtual void runImpl(WaitScope& waitScope) = 0;
   1018 
   1019   Maybe<Own<Event>> fire() override;
   1020   void traceEvent(TraceBuilder& builder) override;
   1021   // Implements Event. Each time the event is fired, switchToFiber() is called.
   1022 
   1023   friend class FiberStack;
   1024   friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
   1025                           WaitScope& waitScope);
   1026   friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
   1027 };
   1028 
   1029 template <typename Func>
   1030 class Fiber final: public FiberBase {
   1031 public:
   1032   explicit Fiber(size_t stackSize, Func&& func): FiberBase(stackSize, result), func(kj::fwd<Func>(func)) {}
   1033   explicit Fiber(const FiberPool& pool, Func&& func): FiberBase(pool, result), func(kj::fwd<Func>(func)) {}
   1034   ~Fiber() noexcept(false) { destroy(); }
   1035 
   1036   typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType;
   1037 
   1038   void get(ExceptionOrValue& output) noexcept override {
   1039     KJ_IREQUIRE(isFinished());
   1040     output.as<ResultType>() = kj::mv(result);
   1041   }
   1042 
   1043 private:
   1044   Func func;
   1045   ExceptionOr<ResultType> result;
   1046 
   1047   void runImpl(WaitScope& waitScope) override {
   1048     result.template as<ResultType>() =
   1049         MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope);
   1050   }
   1051 };
   1052 
   1053 }  // namespace _ (private)
   1054 
   1055 // =======================================================================================
   1056 
   1057 template <typename T>
   1058 Promise<T>::Promise(_::FixVoid<T> value)
   1059     : PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
   1060 
   1061 template <typename T>
   1062 Promise<T>::Promise(kj::Exception&& exception)
   1063     : PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
   1064 
   1065 template <typename T>
   1066 template <typename Func, typename ErrorFunc>
   1067 PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) {
   1068   typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
   1069 
   1070   void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func);
   1071   Own<_::PromiseNode> intermediate =
   1072       heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
   1073           kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
   1074           continuationTracePtr);
   1075   auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
   1076       _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr)));
   1077   return _::maybeReduce(kj::mv(result), false);
   1078 }
   1079 
   1080 namespace _ {  // private
   1081 
   1082 template <typename T>
   1083 struct IdentityFunc {
   1084   inline T operator()(T&& value) const {
   1085     return kj::mv(value);
   1086   }
   1087 };
   1088 template <typename T>
   1089 struct IdentityFunc<Promise<T>> {
   1090   inline Promise<T> operator()(T&& value) const {
   1091     return kj::mv(value);
   1092   }
   1093 };
   1094 template <>
   1095 struct IdentityFunc<void> {
   1096   inline void operator()() const {}
   1097 };
   1098 template <>
   1099 struct IdentityFunc<Promise<void>> {
   1100   Promise<void> operator()() const;
   1101   // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly,
   1102   // Cap'n Proto relies on being able to include this header without creating such a link-time
   1103   // dependency.
   1104 };
   1105 
   1106 }  // namespace _ (private)
   1107 
   1108 template <typename T>
   1109 template <typename ErrorFunc>
   1110 Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) {
   1111   // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case,
   1112   // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise,
   1113   // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually
   1114   // return a promise. So we make our Func return match ErrorFunc.
   1115   typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func;
   1116   typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
   1117 
   1118   // The reason catch_() isn't simply implemented in terms of then() is because we want the trace
   1119   // pointer to be based on ErrorFunc rather than Func.
   1120   void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler);
   1121   Own<_::PromiseNode> intermediate =
   1122       heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
   1123           kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr);
   1124   auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
   1125       _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr)));
   1126   return _::maybeReduce(kj::mv(result), false);
   1127 }
   1128 
   1129 template <typename T>
   1130 T Promise<T>::wait(WaitScope& waitScope) {
   1131   _::ExceptionOr<_::FixVoid<T>> result;
   1132   _::waitImpl(kj::mv(node), result, waitScope);
   1133   return convertToReturn(kj::mv(result));
   1134 }
   1135 
   1136 template <typename T>
   1137 bool Promise<T>::poll(WaitScope& waitScope) {
   1138   return _::pollImpl(*node, waitScope);
   1139 }
   1140 
   1141 template <typename T>
   1142 ForkedPromise<T> Promise<T>::fork() {
   1143   return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
   1144 }
   1145 
   1146 template <typename T>
   1147 Promise<T> ForkedPromise<T>::addBranch() {
   1148   return hub->addBranch();
   1149 }
   1150 
   1151 template <typename T>
   1152 bool ForkedPromise<T>::hasBranches() {
   1153   return hub->isShared();
   1154 }
   1155 
   1156 template <typename T>
   1157 _::SplitTuplePromise<T> Promise<T>::split() {
   1158   return refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))->split();
   1159 }
   1160 
   1161 template <typename T>
   1162 Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) {
   1163   return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node)));
   1164 }
   1165 
   1166 template <typename T>
   1167 template <typename... Attachments>
   1168 Promise<T> Promise<T>::attach(Attachments&&... attachments) {
   1169   return Promise(false, kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
   1170       kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...)));
   1171 }
   1172 
   1173 template <typename T>
   1174 template <typename ErrorFunc>
   1175 Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler) {
   1176   // See catch_() for commentary.
   1177   return Promise(false, _::spark<_::FixVoid<T>>(then(
   1178       _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(),
   1179       kj::fwd<ErrorFunc>(errorHandler)).node));
   1180 }
   1181 
   1182 template <typename T>
   1183 Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr)) {
   1184   return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node)));
   1185 }
   1186 
   1187 template <typename T>
   1188 kj::String Promise<T>::trace() {
   1189   return PromiseBase::trace();
   1190 }
   1191 
   1192 template <typename Func>
   1193 inline PromiseForResult<Func, void> evalLater(Func&& func) {
   1194   return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
   1195 }
   1196 
   1197 template <typename Func>
   1198 inline PromiseForResult<Func, void> evalLast(Func&& func) {
   1199   return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
   1200 }
   1201 
   1202 template <typename Func>
   1203 inline PromiseForResult<Func, void> evalNow(Func&& func) {
   1204   PromiseForResult<Func, void> result = nullptr;
   1205   KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
   1206     result = func();
   1207   })) {
   1208     result = kj::mv(*e);
   1209   }
   1210   return result;
   1211 }
   1212 
   1213 template <typename Func>
   1214 struct RetryOnDisconnect_ {
   1215   static inline PromiseForResult<Func, void> apply(Func&& func) {
   1216     return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> {
   1217       auto promise = evalNow(func);
   1218       return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> {
   1219         if (e.getType() == kj::Exception::Type::DISCONNECTED) {
   1220           return func();
   1221         } else {
   1222           return kj::mv(e);
   1223         }
   1224       });
   1225     });
   1226   }
   1227 };
   1228 template <typename Func>
   1229 struct RetryOnDisconnect_<Func&> {
   1230   // Specialization for references. Needed because the syntax for capturing references in a
   1231   // lambda is different. :(
   1232   static inline PromiseForResult<Func, void> apply(Func& func) {
   1233     auto promise = evalLater(func);
   1234     return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> {
   1235       if (e.getType() == kj::Exception::Type::DISCONNECTED) {
   1236         return func();
   1237       } else {
   1238         return kj::mv(e);
   1239       }
   1240     });
   1241   }
   1242 };
   1243 
   1244 template <typename Func>
   1245 inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) {
   1246   return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func));
   1247 }
   1248 
   1249 template <typename Func>
   1250 inline PromiseForResult<Func, WaitScope&> startFiber(size_t stackSize, Func&& func) {
   1251   typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
   1252 
   1253   Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(stackSize, kj::fwd<Func>(func));
   1254   intermediate->start();
   1255   auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
   1256       _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr)));
   1257   return _::maybeReduce(kj::mv(result), false);
   1258 }
   1259 
   1260 template <typename Func>
   1261 inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber(Func&& func) const {
   1262   typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
   1263 
   1264   Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(*this, kj::fwd<Func>(func));
   1265   intermediate->start();
   1266   auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
   1267       _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr)));
   1268   return _::maybeReduce(kj::mv(result), false);
   1269 }
   1270 
   1271 template <typename T>
   1272 template <typename ErrorFunc>
   1273 void Promise<T>::detach(ErrorFunc&& errorHandler) {
   1274   return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
   1275 }
   1276 
   1277 template <>
   1278 template <typename ErrorFunc>
   1279 void Promise<void>::detach(ErrorFunc&& errorHandler) {
   1280   return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
   1281 }
   1282 
   1283 template <typename T>
   1284 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises) {
   1285   return _::PromiseNode::to<Promise<Array<T>>>(kj::heap<_::ArrayJoinPromiseNode<T>>(
   1286       KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
   1287       heapArray<_::ExceptionOr<T>>(promises.size())));
   1288 }
   1289 
   1290 // =======================================================================================
   1291 
   1292 namespace _ {  // private
   1293 
   1294 class WeakFulfillerBase: protected kj::Disposer {
   1295 protected:
   1296   WeakFulfillerBase(): inner(nullptr) {}
   1297   virtual ~WeakFulfillerBase() noexcept(false) {}
   1298 
   1299   template <typename T>
   1300   inline PromiseFulfiller<T>* getInner() {
   1301     return static_cast<PromiseFulfiller<T>*>(inner);
   1302   };
   1303   template <typename T>
   1304   inline void setInner(PromiseFulfiller<T>* ptr) {
   1305     inner = ptr;
   1306   };
   1307 
   1308 private:
   1309   mutable PromiseRejector* inner;
   1310 
   1311   void disposeImpl(void* pointer) const override;
   1312 };
   1313 
   1314 template <typename T>
   1315 class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase {
   1316   // A wrapper around PromiseFulfiller which can be detached.
   1317   //
   1318   // There are a couple non-trivialities here:
   1319   // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
   1320   //   rejected.
   1321   // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
   1322   //   detached from the underlying fulfiller, because otherwise the later detach() call will go
   1323   //   to a dangling pointer.  Essentially, WeakFulfiller is reference counted, although the
   1324   //   refcount never goes over 2 and we manually implement the refcounting because we need to do
   1325   //   other special things when each side detaches anyway.  To this end, WeakFulfiller is its own
   1326   //   Disposer -- dispose() is called when the application discards its owned pointer to the
   1327   //   fulfiller and detach() is called when the promise is destroyed.
   1328 
   1329 public:
   1330   KJ_DISALLOW_COPY(WeakFulfiller);
   1331 
   1332   static kj::Own<WeakFulfiller> make() {
   1333     WeakFulfiller* ptr = new WeakFulfiller;
   1334     return Own<WeakFulfiller>(ptr, *ptr);
   1335   }
   1336 
   1337   void fulfill(FixVoid<T>&& value) override {
   1338     if (getInner<T>() != nullptr) {
   1339       getInner<T>()->fulfill(kj::mv(value));
   1340     }
   1341   }
   1342 
   1343   void reject(Exception&& exception) override {
   1344     if (getInner<T>() != nullptr) {
   1345       getInner<T>()->reject(kj::mv(exception));
   1346     }
   1347   }
   1348 
   1349   bool isWaiting() override {
   1350     return getInner<T>() != nullptr && getInner<T>()->isWaiting();
   1351   }
   1352 
   1353   void attach(PromiseFulfiller<T>& newInner) {
   1354     setInner<T>(&newInner);
   1355   }
   1356 
   1357   void detach(PromiseFulfiller<T>& from) {
   1358     if (getInner<T>() == nullptr) {
   1359       // Already disposed.
   1360       delete this;
   1361     } else {
   1362       KJ_IREQUIRE(getInner<T>() == &from);
   1363       setInner<T>(nullptr);
   1364     }
   1365   }
   1366 
   1367 private:
   1368   WeakFulfiller() {}
   1369 };
   1370 
   1371 template <typename T>
   1372 class PromiseAndFulfillerAdapter {
   1373 public:
   1374   PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
   1375                              WeakFulfiller<T>& wrapper)
   1376       : fulfiller(fulfiller), wrapper(wrapper) {
   1377     wrapper.attach(fulfiller);
   1378   }
   1379 
   1380   ~PromiseAndFulfillerAdapter() noexcept(false) {
   1381     wrapper.detach(fulfiller);
   1382   }
   1383 
   1384 private:
   1385   PromiseFulfiller<T>& fulfiller;
   1386   WeakFulfiller<T>& wrapper;
   1387 };
   1388 
   1389 }  // namespace _ (private)
   1390 
   1391 template <typename T>
   1392 template <typename Func>
   1393 bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) {
   1394   KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
   1395     reject(kj::mv(*exception));
   1396     return false;
   1397   } else {
   1398     return true;
   1399   }
   1400 }
   1401 
   1402 template <typename Func>
   1403 bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) {
   1404   KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
   1405     reject(kj::mv(*exception));
   1406     return false;
   1407   } else {
   1408     return true;
   1409   }
   1410 }
   1411 
   1412 template <typename T, typename Adapter, typename... Params>
   1413 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
   1414   Own<_::PromiseNode> intermediate(
   1415       heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
   1416           kj::fwd<Params>(adapterConstructorParams)...));
   1417   return _::PromiseNode::to<_::ReducePromises<T>>(
   1418       _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr)));
   1419 }
   1420 
   1421 template <typename T>
   1422 PromiseFulfillerPair<T> newPromiseAndFulfiller() {
   1423   auto wrapper = _::WeakFulfiller<T>::make();
   1424 
   1425   Own<_::PromiseNode> intermediate(
   1426       heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
   1427   auto promise = _::PromiseNode::to<_::ReducePromises<T>>(
   1428       _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr)));
   1429 
   1430   return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
   1431 }
   1432 
   1433 // =======================================================================================
   1434 // cross-thread stuff
   1435 
   1436 namespace _ {  // (private)
   1437 
   1438 class XThreadEvent: private Event,         // it's an event in the target thread
   1439                     public PromiseNode {   // it's a PromiseNode in the requesting thread
   1440 public:
   1441   XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr);
   1442 
   1443   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
   1444 
   1445 protected:
   1446   void ensureDoneOrCanceled();
   1447   // MUST be called in destructor of subclasses to make sure the object is not destroyed while
   1448   // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
   1449   // that destructor doesn't run until the subclass has already been destroyed.)
   1450 
   1451   virtual kj::Maybe<Own<PromiseNode>> execute() = 0;
   1452   // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
   1453   // returns null.
   1454 
   1455   // implements PromiseNode ----------------------------------------------------
   1456   void onReady(Event* event) noexcept override;
   1457 
   1458 private:
   1459   ExceptionOrValue& result;
   1460   void* funcTracePtr;
   1461 
   1462   kj::Own<const Executor> targetExecutor;
   1463   Maybe<const Executor&> replyExecutor;  // If executeAsync() was used.
   1464 
   1465   kj::Maybe<Own<PromiseNode>> promiseNode;
   1466   // Accessed only in target thread.
   1467 
   1468   ListLink<XThreadEvent> targetLink;
   1469   // Membership in one of the linked lists in the target Executor's work list or cancel list. These
   1470   // fields are protected by the target Executor's mutex.
   1471 
   1472   enum {
   1473     UNUSED,
   1474     // Object was never queued on another thread.
   1475 
   1476     QUEUED,
   1477     // Target thread has not yet dequeued the event from the state.start list. The requesting
   1478     // thread can cancel execution by removing the event from the list.
   1479 
   1480     EXECUTING,
   1481     // Target thread has dequeued the event from state.start and moved it to state.executing. To
   1482     // cancel, the requesting thread must add the event to the state.cancel list and change the
   1483     // state to CANCELING.
   1484 
   1485     CANCELING,
   1486     // Requesting thread is trying to cancel this event. The target thread will change the state to
   1487     // `DONE` once canceled.
   1488 
   1489     DONE
   1490     // Target thread has completed handling this event and will not touch it again. The requesting
   1491     // thread can safely delete the object. The `state` is updated to `DONE` using an atomic
   1492     // release operation after ensuring that the event will not be touched again, so that the
   1493     // requesting can safely skip locking if it observes the state is already DONE.
   1494   } state = UNUSED;
   1495   // State, which is also protected by `targetExecutor`'s mutex.
   1496 
   1497   ListLink<XThreadEvent> replyLink;
   1498   // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
   1499   // executing thread places the event in the reply list near the end of the `EXECUTING` state.
   1500   // Because the thread cannot lock two mutexes at once, it's possible that the reply executor
   1501   // will receive the reply while the event is still listed in the EXECUTING state, but it can
   1502   // ignore the state and proceed with the result.
   1503 
   1504   OnReadyEvent onReadyEvent;
   1505   // Accessed only in requesting thread.
   1506 
   1507   friend class kj::Executor;
   1508 
   1509   void done();
   1510   // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT
   1511   // call under lock.
   1512 
   1513   void sendReply();
   1514   // Notifies the originating thread that this event is done, but doesn't set the state to DONE
   1515   // yet. Do NOT call under lock.
   1516 
   1517   void setDoneState();
   1518   // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must
   1519   // only be called in the destination thread, and must either be called under lock, or the thread
   1520   // must take the lock and release it again shortly after setting the state (because some threads
   1521   // may be waiting on the DONE state using a conditional wait on the mutex). After calling
   1522   // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs
   1523   // solely to the requesting thread.
   1524 
   1525   void setDisconnected();
   1526   // Sets the result to a DISCONNECTED exception indicating that the target event loop exited.
   1527 
   1528   class DelayedDoneHack;
   1529 
   1530   // implements Event ----------------------------------------------------------
   1531   Maybe<Own<Event>> fire() override;
   1532   // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
   1533   // then it just indicated readiness and we need to get its result.
   1534 
   1535   void traceEvent(TraceBuilder& builder) override;
   1536 };
   1537 
   1538 template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
   1539 class XThreadEventImpl final: public XThreadEvent {
   1540   // Implementation for a function that does not return a Promise.
   1541 public:
   1542   XThreadEventImpl(Func&& func, const Executor& target)
   1543       : XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func)),
   1544         func(kj::fwd<Func>(func)) {}
   1545   ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
   1546 
   1547   typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
   1548 
   1549   kj::Maybe<Own<_::PromiseNode>> execute() override {
   1550     result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
   1551     return nullptr;
   1552   }
   1553 
   1554   // implements PromiseNode ----------------------------------------------------
   1555   void get(ExceptionOrValue& output) noexcept override {
   1556     output.as<ResultT>() = kj::mv(result);
   1557   }
   1558 
   1559 private:
   1560   Func func;
   1561   ExceptionOr<ResultT> result;
   1562   friend Executor;
   1563 };
   1564 
   1565 template <typename Func, typename T>
   1566 class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
   1567   // Implementation for a function that DOES return a Promise.
   1568 public:
   1569   XThreadEventImpl(Func&& func, const Executor& target)
   1570       : XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func)),
   1571         func(kj::fwd<Func>(func)) {}
   1572   ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
   1573 
   1574   typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
   1575 
   1576   kj::Maybe<Own<_::PromiseNode>> execute() override {
   1577     auto result = _::PromiseNode::from(func());
   1578     KJ_IREQUIRE(result.get() != nullptr);
   1579     return kj::mv(result);
   1580   }
   1581 
   1582   // implements PromiseNode ----------------------------------------------------
   1583   void get(ExceptionOrValue& output) noexcept override {
   1584     output.as<ResultT>() = kj::mv(result);
   1585   }
   1586 
   1587 private:
   1588   Func func;
   1589   ExceptionOr<ResultT> result;
   1590   friend Executor;
   1591 };
   1592 
   1593 }  // namespace _ (private)
   1594 
   1595 template <typename Func>
   1596 _::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(Func&& func) const {
   1597   _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this);
   1598   send(event, true);
   1599   return convertToReturn(kj::mv(event.result));
   1600 }
   1601 
   1602 template <typename Func>
   1603 PromiseForResult<Func, void> Executor::executeAsync(Func&& func) const {
   1604   auto event = kj::heap<_::XThreadEventImpl<Func>>(kj::fwd<Func>(func), *this);
   1605   send(*event, false);
   1606   return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event));
   1607 }
   1608 
   1609 // -----------------------------------------------------------------------------
   1610 
   1611 namespace _ {  // (private)
   1612 
   1613 template <typename T>
   1614 class XThreadFulfiller;
   1615 
   1616 class XThreadPaf: public PromiseNode {
   1617 public:
   1618   XThreadPaf();
   1619   virtual ~XThreadPaf() noexcept(false);
   1620 
   1621   class Disposer: public kj::Disposer {
   1622   public:
   1623     void disposeImpl(void* pointer) const override;
   1624   };
   1625   static const Disposer DISPOSER;
   1626 
   1627   // implements PromiseNode ----------------------------------------------------
   1628   void onReady(Event* event) noexcept override;
   1629   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
   1630 
   1631 private:
   1632   enum {
   1633     WAITING,
   1634     // Not yet fulfilled, and the waiter is still waiting.
   1635     //
   1636     // Starting from this state, the state may transition to either FULFILLING or CANCELED
   1637     // using an atomic compare-and-swap.
   1638 
   1639     FULFILLING,
   1640     // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it
   1641     // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not
   1642     // disappear out from under it. It then fills in the result value, locks the executor mutex,
   1643     // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to
   1644     // FULFILLED, and finally unlocks the mutex.
   1645     //
   1646     // If the waiting thread tries to cancel but discovers the object in this state, then it
   1647     // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED.
   1648     // It can then delete the object.
   1649 
   1650     FULFILLED,
   1651     // The fulfilling thread has completed filling in the result value and inserting the object
   1652     // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer
   1653     // holds any pointers to this object. The waiting thread is responsible for deleting it.
   1654 
   1655     DISPATCHED,
   1656     // The object reached FULFILLED state, and then was dispatched from the waiting thread's
   1657     // executor's event queue. Therefore, the object is completely owned by the waiting thread with
   1658     // no need to lock anything.
   1659 
   1660     CANCELED
   1661     // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no
   1662     // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the
   1663     // object.
   1664   } state;
   1665 
   1666   const Executor& executor;
   1667   // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or
   1668   // `FULFILLING`. After any other state has been reached, this reference may be invalidated.
   1669 
   1670   ListLink<XThreadPaf> link;
   1671   // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting
   1672   // thread's executor. In those states, these pointers are guarded by said executor's mutex.
   1673 
   1674   OnReadyEvent onReadyEvent;
   1675 
   1676   class FulfillScope;
   1677 
   1678   static kj::Exception unfulfilledException();
   1679   // Construct appropriate exception to use to reject an unfulfilled XThreadPaf.
   1680 
   1681   template <typename T>
   1682   friend class XThreadFulfiller;
   1683   friend Executor;
   1684 };
   1685 
   1686 template <typename T>
   1687 class XThreadPafImpl final: public XThreadPaf {
   1688 public:
   1689   // implements PromiseNode ----------------------------------------------------
   1690   void get(ExceptionOrValue& output) noexcept override {
   1691     output.as<FixVoid<T>>() = kj::mv(result);
   1692   }
   1693 
   1694 private:
   1695   ExceptionOr<FixVoid<T>> result;
   1696 
   1697   friend class XThreadFulfiller<T>;
   1698 };
   1699 
   1700 class XThreadPaf::FulfillScope {
   1701   // Create on stack while setting `XThreadPafImpl<T>::result`.
   1702   //
   1703   // This ensures that:
   1704   // - Only one call is carried out, even if multiple threads try to fulfill concurrently.
   1705   // - The waiting thread is correctly signaled.
   1706 public:
   1707   FulfillScope(XThreadPaf** pointer);
   1708   // Atomically nulls out *pointer and takes ownership of the pointer.
   1709 
   1710   ~FulfillScope() noexcept(false);
   1711 
   1712   KJ_DISALLOW_COPY(FulfillScope);
   1713 
   1714   bool shouldFulfill() { return obj != nullptr; }
   1715 
   1716   template <typename T>
   1717   XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); }
   1718 
   1719 private:
   1720   XThreadPaf* obj;
   1721 };
   1722 
   1723 template <typename T>
   1724 class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> {
   1725 public:
   1726   XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {}
   1727 
   1728   ~XThreadFulfiller() noexcept(false) {
   1729     if (target != nullptr) {
   1730       reject(XThreadPaf::unfulfilledException());
   1731     }
   1732   }
   1733   void fulfill(FixVoid<T>&& value) const override {
   1734     XThreadPaf::FulfillScope scope(&target);
   1735     if (scope.shouldFulfill()) {
   1736       scope.getTarget<T>()->result = kj::mv(value);
   1737     }
   1738   }
   1739   void reject(Exception&& exception) const override {
   1740     XThreadPaf::FulfillScope scope(&target);
   1741     if (scope.shouldFulfill()) {
   1742       scope.getTarget<T>()->result.addException(kj::mv(exception));
   1743     }
   1744   }
   1745   bool isWaiting() const override {
   1746     KJ_IF_MAYBE(t, target) {
   1747 #if _MSC_VER && !__clang__
   1748       // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be?
   1749       return t->state == XThreadPaf::WAITING;
   1750 #else
   1751       return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING;
   1752 #endif
   1753     } else {
   1754       return false;
   1755     }
   1756   }
   1757 
   1758 private:
   1759   mutable XThreadPaf* target;  // accessed using atomic ops
   1760 };
   1761 
   1762 template <typename T>
   1763 class XThreadFulfiller<kj::Promise<T>> {
   1764 public:
   1765   static_assert(sizeof(T) < 0,
   1766       "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported");
   1767   // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`,
   1768   //   then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not
   1769   //   the waiting thread.
   1770 };
   1771 
   1772 }  // namespace _ (private)
   1773 
   1774 template <typename T>
   1775 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() {
   1776   kj::Own<_::XThreadPafImpl<T>> node(new _::XThreadPafImpl<T>, _::XThreadPaf::DISPOSER);
   1777   auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node);
   1778   return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) };
   1779 }
   1780 
   1781 }  // namespace kj
   1782 
   1783 #if KJ_HAS_COROUTINE
   1784 
   1785 // =======================================================================================
   1786 // Coroutines TS integration with kj::Promise<T>.
   1787 //
   1788 // Here's a simple coroutine:
   1789 //
   1790 //   Promise<Own<AsyncIoStream>> connectToService(Network& n) {
   1791 //     auto a = co_await n.parseAddress(IP, PORT);
   1792 //     auto c = co_await a->connect();
   1793 //     co_return kj::mv(c);
   1794 //   }
   1795 //
   1796 // The presence of the co_await and co_return keywords tell the compiler it is a coroutine.
   1797 // Although it looks similar to a function, it has a couple large differences. First, everything
   1798 // that would normally live in the stack frame lives instead in a heap-based coroutine frame.
   1799 // Second, the coroutine has the ability to return from its scope without deallocating this frame
   1800 // (to suspend, in other words), and the ability to resume from its last suspension point.
   1801 //
   1802 // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a
   1803 // coroutine implementation type via a traits class parameterized by the coroutine return and
   1804 // parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`,
   1805 
   1806 namespace kj::_ { template <typename T> class Coroutine; }
   1807 
   1808 // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`.
   1809 
   1810 namespace KJ_COROUTINE_STD_NAMESPACE {
   1811 
   1812 template <class T, class... Args>
   1813 struct coroutine_traits<kj::Promise<T>, Args...> {
   1814   // `Args...` are the coroutine's parameter types.
   1815 
   1816   using promise_type = kj::_::Coroutine<T>;
   1817   // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines
   1818   // returning `std::future<T>`, since the coroutine implementation would be a wrapper around
   1819   // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it
   1820   // the "coroutine implementation type" instead.
   1821 };
   1822 
   1823 }  // namespace KJ_COROUTINE_STD_NAMESPACE
   1824 
   1825 // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a
   1826 // `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or
   1827 // `kj::_::Coroutine<Own<AsyncIoStream>>`.
   1828 //
   1829 // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and
   1830 // deallocated when the frame does.
   1831 
   1832 namespace kj::_ {
   1833 
   1834 namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE;
   1835 
   1836 class CoroutineBase: public PromiseNode,
   1837                      public Event,
   1838                      public Disposer {
   1839 public:
   1840   CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef);
   1841   ~CoroutineBase() noexcept(false);
   1842   KJ_DISALLOW_COPY(CoroutineBase);
   1843 
   1844   auto initial_suspend() { return stdcoro::suspend_never(); }
   1845   auto final_suspend() noexcept { return stdcoro::suspend_always(); }
   1846   // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately
   1847   // after completion.
   1848   //
   1849   // The initial suspension point could allow us to defer the initial synchronous execution of a
   1850   // coroutine -- everything before its first co_await, that is.
   1851   //
   1852   // The final suspension point is useful to delay deallocation of the coroutine frame to match the
   1853   // lifetime of the enclosing promise.
   1854 
   1855   void unhandled_exception();
   1856 
   1857 protected:
   1858   class AwaiterBase;
   1859 
   1860   bool isWaiting() { return waiting; }
   1861   void scheduleResumption() {
   1862     onReadyEvent.arm();
   1863     waiting = false;
   1864   }
   1865 
   1866 private:
   1867   // -------------------------------------------------------
   1868   // PromiseNode implementation
   1869 
   1870   void onReady(Event* event) noexcept override;
   1871   void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
   1872 
   1873   // -------------------------------------------------------
   1874   // Event implementation
   1875 
   1876   Maybe<Own<Event>> fire() override;
   1877   void traceEvent(TraceBuilder& builder) override;
   1878 
   1879   // -------------------------------------------------------
   1880   // Disposer implementation
   1881 
   1882   void disposeImpl(void* pointer) const override;
   1883   void destroy();
   1884 
   1885   stdcoro::coroutine_handle<> coroutine;
   1886   ExceptionOrValue& resultRef;
   1887 
   1888   OnReadyEvent onReadyEvent;
   1889   bool waiting = true;
   1890 
   1891   Maybe<PromiseNode&> promiseNodeForTrace;
   1892   // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that
   1893   // promise so tracePromise()/traceEvent() can trace into it.
   1894 
   1895   UnwindDetector unwindDetector;
   1896 
   1897   struct DisposalResults {
   1898     bool destructorRan = false;
   1899     Maybe<Exception> exception;
   1900   };
   1901   Maybe<DisposalResults&> maybeDisposalResults;
   1902   // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this
   1903   // to point to a DisposalResults on the stack so unhandled_exception() will have some place to
   1904   // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once
   1905   // coroutine.destroy() has returned. Our disposer then rethrows as needed.
   1906 };
   1907 
   1908 template <typename Self, typename T>
   1909 class CoroutineMixin;
   1910 // CRTP mixin, covered later.
   1911 
   1912 template <typename T>
   1913 class Coroutine final: public CoroutineBase,
   1914                        public CoroutineMixin<Coroutine<T>, T> {
   1915   // The standard calls this the `promise_type` object. We can call this the "coroutine
   1916   // implementation object" since the word promise means different things in KJ and std styles. This
   1917   // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise
   1918   // is later fulfilled. We also fill in a few lifetime-related details.
   1919   //
   1920   // The implementation object is also where we can customize memory allocation of coroutine frames,
   1921   // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in
   1922   // coroutine_traits).
   1923   //
   1924   // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based
   1925   // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for
   1926   // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to
   1927   // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the
   1928   // await-expression's type are both `kj::Promise` instantiations -- see further comments under
   1929   // `await_transform()`.
   1930 
   1931 public:
   1932   using Handle = stdcoro::coroutine_handle<Coroutine<T>>;
   1933 
   1934   Coroutine(): CoroutineBase(Handle::from_promise(*this), result) {}
   1935 
   1936   Promise<T> get_return_object() {
   1937     // Called after coroutine frame construction and before initial_suspend() to create the
   1938     // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for
   1939     // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the
   1940     // coroutine in a suspended state.
   1941     return PromiseNode::to<Promise<T>>(Own<PromiseNode>(this, *this));
   1942   }
   1943 
   1944 public:
   1945   template <typename U>
   1946   class Awaiter;
   1947 
   1948   template <typename U>
   1949   Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); }
   1950   template <typename U>
   1951   Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); }
   1952   // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return
   1953   // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async
   1954   // event system.
   1955   //
   1956   // There is another hook we could implement: an `operator co_await()` free function. However, a
   1957   // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a
   1958   // member class template of Coroutine<T>, it is able to implement an
   1959   // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing
   1960   // coroutine's PromiseNode. An `operator co_await()` free function would have to implement
   1961   // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement
   1962   // suspension and resumption in terms of .then(). Yuck!
   1963 
   1964 private:
   1965   // -------------------------------------------------------
   1966   // PromiseNode implementation
   1967 
   1968   void get(ExceptionOrValue& output) noexcept override {
   1969     output.as<FixVoid<T>>() = kj::mv(result);
   1970   }
   1971 
   1972   void fulfill(FixVoid<T>&& value) {
   1973     // Called by the return_value()/return_void() functions in our mixin class.
   1974 
   1975     if (isWaiting()) {
   1976       result = kj::mv(value);
   1977       scheduleResumption();
   1978     }
   1979   }
   1980 
   1981   ExceptionOr<FixVoid<T>> result;
   1982 
   1983   friend class CoroutineMixin<Coroutine<T>, T>;
   1984 };
   1985 
   1986 template <typename Self, typename T>
   1987 class CoroutineMixin {
   1988 public:
   1989   void return_value(T value) {
   1990     static_cast<Self*>(this)->fulfill(kj::mv(value));
   1991   }
   1992 };
   1993 template <typename Self>
   1994 class CoroutineMixin<Self, void> {
   1995 public:
   1996   void return_void() {
   1997     static_cast<Self*>(this)->fulfill(_::Void());
   1998   }
   1999 };
   2000 // The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return
   2001 // statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has
   2002 // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so
   2003 // these return_* functions live in a CRTP mixin.
   2004 
   2005 class CoroutineBase::AwaiterBase {
   2006 public:
   2007   explicit AwaiterBase(Own<PromiseNode> node);
   2008   AwaiterBase(AwaiterBase&&);
   2009   ~AwaiterBase() noexcept(false);
   2010   KJ_DISALLOW_COPY(AwaiterBase);
   2011 
   2012   bool await_ready() const { return false; }
   2013   // This could return "`node->get()` is safe to call" instead, which would make suspension-less
   2014   // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that
   2015   // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we
   2016   // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable
   2017   // suspension-less co_awaits.
   2018 
   2019 protected:
   2020   void getImpl(ExceptionOrValue& result);
   2021   bool awaitSuspendImpl(CoroutineBase& coroutineEvent);
   2022 
   2023 private:
   2024   UnwindDetector unwindDetector;
   2025   Own<PromiseNode> node;
   2026 
   2027   Maybe<CoroutineBase&> maybeCoroutineEvent;
   2028   // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our
   2029   // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack
   2030   // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called,
   2031   // we need our own reference to the enclosing Coroutine. (I struggle to think up any such
   2032   // scenarios, but perhaps they could occur when destroying a suspended coroutine.)
   2033 };
   2034 
   2035 template <typename T>
   2036 template <typename U>
   2037 class Coroutine<T>::Awaiter: public AwaiterBase {
   2038   // Wrapper around a co_await'ed promise and some storage space for the result of that promise.
   2039   // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up
   2040   // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event
   2041   // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the
   2042   // awaited promise result.
   2043 
   2044 public:
   2045   explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {}
   2046 
   2047   U await_resume() {
   2048     getImpl(result);
   2049     auto value = kj::_::readMaybe(result.value);
   2050     KJ_IASSERT(value != nullptr, "Neither exception nor value present.");
   2051     return U(kj::mv(*value));
   2052   }
   2053 
   2054   bool await_suspend(Coroutine::Handle coroutine) {
   2055     return awaitSuspendImpl(coroutine.promise());
   2056   }
   2057 
   2058 private:
   2059   ExceptionOr<FixVoid<U>> result;
   2060 };
   2061 
   2062 #undef KJ_COROUTINE_STD_NAMESPACE
   2063 
   2064 }  // namespace kj::_ (private)
   2065 
   2066 #endif  // KJ_HAS_COROUTINE
   2067 
   2068 KJ_END_HEADER