capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async.h (57525B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include "async-prelude.h"
     25 #include "exception.h"
     26 #include "refcount.h"
     27 
     28 KJ_BEGIN_HEADER
     29 
     30 #ifndef KJ_USE_FIBERS
     31   #if __BIONIC__ || __FreeBSD__ || __OpenBSD__ || KJ_NO_EXCEPTIONS
     32     // These platforms don't support fibers.
     33     #define KJ_USE_FIBERS 0
     34   #else
     35     #define KJ_USE_FIBERS 1
     36   #endif
     37 #else
     38   #if KJ_NO_EXCEPTIONS && KJ_USE_FIBERS
     39     #error "Fibers cannot be enabled when exceptions are disabled."
     40   #endif
     41 #endif
     42 
     43 namespace kj {
     44 
     45 class EventLoop;
     46 class WaitScope;
     47 
     48 template <typename T>
     49 class Promise;
     50 template <typename T>
     51 class ForkedPromise;
     52 template <typename T>
     53 class PromiseFulfiller;
     54 template <typename T>
     55 struct PromiseFulfillerPair;
     56 
     57 template <typename Func>
     58 class FunctionParam;
     59 
     60 template <typename Func, typename T>
     61 using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>;
     62 // Evaluates to the type of Promise for the result of calling functor type Func with parameter type
     63 // T.  If T is void, then the promise is for the result of calling Func with no arguments.  If
     64 // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
     65 
     66 // =======================================================================================
     67 // Promises
     68 
     69 template <typename T>
     70 class Promise: protected _::PromiseBase {
     71   // The basic primitive of asynchronous computation in KJ.  Similar to "futures", but designed
     72   // specifically for event loop concurrency.  Similar to E promises and JavaScript Promises/A.
     73   //
     74   // A Promise represents a promise to produce a value of type T some time in the future.  Once
     75   // that value has been produced, the promise is "fulfilled".  Alternatively, a promise can be
     76   // "broken", with an Exception describing what went wrong.  You may implicitly convert a value of
     77   // type T to an already-fulfilled Promise<T>.  You may implicitly convert the constant
     78   // `kj::READY_NOW` to an already-fulfilled Promise<void>.  You may also implicitly convert a
     79   // `kj::Exception` to an already-broken promise of any type.
     80   //
     81   // Promises are linear types -- they are moveable but not copyable.  If a Promise is destroyed
     82   // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
     83   // meant to fulfill the promise will be canceled if possible.  All methods of `Promise` (unless
     84   // otherwise noted) actually consume the promise in the sense of move semantics.  (Arguably they
     85   // should be rvalue-qualified, but at the time this interface was created compilers didn't widely
     86   // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().)  If
     87   // you want to use one Promise in two different places, you must fork it with `fork()`.
     88   //
     89   // To use the result of a Promise, you must call `then()` and supply a callback function to
     90   // call with the result.  `then()` returns another promise, for the result of the callback.
     91   // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a
     92   // simple Promise<T> that first waits for the outer promise, then the inner.  Example:
     93   //
     94   //     // Open a remote file, read the content, and then count the
     95   //     // number of lines of text.
     96   //     // Note that none of the calls here block.  `file`, `content`
     97   //     // and `lineCount` are all initialized immediately before any
     98   //     // asynchronous operations occur.  The lambda callbacks are
     99   //     // called later.
    100   //     Promise<Own<File>> file = openFtp("ftp://host/foo/bar");
    101   //     Promise<String> content = file.then(
    102   //         [](Own<File> file) -> Promise<String> {
    103   //           return file.readAll();
    104   //         });
    105   //     Promise<int> lineCount = content.then(
    106   //         [](String text) -> int {
    107   //           uint count = 0;
    108   //           for (char c: text) count += (c == '\n');
    109   //           return count;
    110   //         });
    111   //
    112   // For `then()` to work, the current thread must have an active `EventLoop`.  Each callback
    113   // is scheduled to execute in that loop.  Since `then()` schedules callbacks only on the current
    114   // thread's event loop, you do not need to worry about two callbacks running at the same time.
    115   // You will need to set up at least one `EventLoop` at the top level of your program before you
    116   // can use promises.
    117   //
    118   // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`.
    119   //
    120   // Systems using promises should consider supporting the concept of "pipelining".  Pipelining
    121   // means allowing a caller to start issuing method calls against a promised object before the
    122   // promise has actually been fulfilled.  This is particularly useful if the promise is for a
    123   // remote object living across a network, as this can avoid round trips when chaining a series
    124   // of calls.  It is suggested that any class T which supports pipelining implement a subclass of
    125   // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please
    126   // invoke the corresponding method on the promised value once it is available".  These methods
    127   // should in turn return promises for the eventual results of said invocations.  Cap'n Proto,
    128   // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see
    129   // `capnp/capability.h`.
    130   //
    131   // KJ Promises are based on E promises:
    132   //   http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises
    133   //
    134   // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript
    135   // promises, which are themselves influenced by E promises:
    136   //   http://promisesaplus.com/
    137   //   https://github.com/domenic/promises-unwrapping
    138 
    139 public:
    140   Promise(_::FixVoid<T> value);
    141   // Construct an already-fulfilled Promise from a value of type T.  For non-void promises, the
    142   // parameter type is simply T.  So, e.g., in a function that returns `Promise<int>`, you can
    143   // say `return 123;` to return a promise that is already fulfilled to 123.
    144   //
    145   // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`.
    146 
    147   Promise(kj::Exception&& e);
    148   // Construct an already-broken Promise.
    149 
    150   inline Promise(decltype(nullptr)) {}
    151 
    152   template <typename Func, typename ErrorFunc = _::PropagateException>
    153   PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException())
    154       KJ_WARN_UNUSED_RESULT;
    155   // Register a continuation function to be executed when the promise completes.  The continuation
    156   // (`func`) takes the promised value (an rvalue of type `T`) as its parameter.  The continuation
    157   // may return a new value; `then()` itself returns a promise for the continuation's eventual
    158   // result.  If the continuation itself returns a `Promise<U>`, then `then()` shall also return
    159   // a `Promise<U>` which first waits for the original promise, then executes the continuation,
    160   // then waits for the inner promise (i.e. it automatically "unwraps" the promise).
    161   //
    162   // In all cases, `then()` returns immediately.  The continuation is executed later.  The
    163   // continuation is always executed on the same EventLoop (and, therefore, the same thread) which
    164   // called `then()`, therefore no synchronization is necessary on state shared by the continuation
    165   // and the surrounding scope.  If no EventLoop is running on the current thread, `then()` throws
    166   // an exception.
    167   //
    168   // You may also specify an error handler continuation as the second parameter.  `errorHandler`
    169   // must be a functor taking a parameter of type `kj::Exception&&`.  It must return the same
    170   // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler`
    171   // may return either `Promise<U>` or just `U`).  The default error handler simply propagates the
    172   // exception to the returned promise.
    173   //
    174   // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise
    175   // is broken.  When compiled with -fno-exceptions, the framework will still detect when a
    176   // recoverable exception was thrown inside of a continuation and will consider the promise
    177   // broken even though a (presumably garbage) result was returned.
    178   //
    179   // If the returned promise is destroyed before the callback runs, the callback will be canceled
    180   // (it will never run).
    181   //
    182   // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is
    183   // called, in the sense of move semantics.  After returning, the original promise is no longer
    184   // valid, but `then()` returns a new promise.
    185   //
    186   // *Advanced implementation tips:*  Most users will never need to worry about the below, but
    187   // it is good to be aware of.
    188   //
    189   // As an optimization, if the callback function `func` does _not_ return another promise, then
    190   // execution of `func` itself may be delayed until its result is known to be needed.  The
    191   // expectation here is that `func` is just doing some transformation on the results, not
    192   // scheduling any other actions, therefore the system doesn't need to be proactive about
    193   // evaluating it.  This way, a chain of trivial then() transformations can be executed all at
    194   // once without repeatedly re-scheduling through the event loop.  Use the `eagerlyEvaluate()`
    195   // method to suppress this behavior.
    196   //
    197   // On the other hand, if `func` _does_ return another promise, then the system evaluates `func`
    198   // as soon as possible, because the promise it returns might be for a newly-scheduled
    199   // long-running asynchronous task.
    200   //
    201   // As another optimization, when a callback function registered with `then()` is actually
    202   // scheduled, it is scheduled to occur immediately, preempting other work in the event queue.
    203   // This allows a long chain of `then`s to execute all at once, improving cache locality by
    204   // clustering operations on the same data.  However, this implies that starvation can occur
    205   // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
    206   // actual I/O.  To solve this, use `kj::evalLater()` to yield control; this way, all other events
    207   // in the queue will get a chance to run before your callback is executed.
    208 
    209   Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); }
    210   // Convenience method to convert the promise to a void promise by ignoring the return value.
    211   //
    212   // You must still wait on the returned promise if you want the task to execute.
    213 
    214   template <typename ErrorFunc>
    215   Promise<T> catch_(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT;
    216   // Equivalent to `.then(identityFunc, errorHandler)`, where `identifyFunc` is a function that
    217   // just returns its input.
    218 
    219   T wait(WaitScope& waitScope);
    220   // Run the event loop until the promise is fulfilled, then return its result.  If the promise
    221   // is rejected, throw an exception.
    222   //
    223   // wait() is primarily useful at the top level of a program -- typically, within the function
    224   // that allocated the EventLoop.  For example, a program that performs one or two RPCs and then
    225   // exits would likely use wait() in its main() function to wait on each RPC.  On the other hand,
    226   // server-side code generally cannot use wait(), because it has to be able to accept multiple
    227   // requests at once.
    228   //
    229   // If the promise is rejected, `wait()` throws an exception.  If the program was compiled without
    230   // exceptions (-fno-exceptions), this will usually abort.  In this case you really should first
    231   // use `then()` to set an appropriate handler for the exception case, so that the promise you
    232   // actually wait on never throws.
    233   //
    234   // `waitScope` is an object proving that the caller is in a scope where wait() is allowed.  By
    235   // convention, any function which might call wait(), or which might call another function which
    236   // might call wait(), must take `WaitScope&` as one of its parameters.  This is needed for two
    237   // reasons:
    238   // * `wait()` is not allowed during an event callback, because event callbacks are themselves
    239   //   called during some other `wait()`, and such recursive `wait()`s would only be able to
    240   //   complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer
    241   //   than it is supposed to.  To prevent this, a `WaitScope` cannot be constructed or used during
    242   //   an event callback.
    243   // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()`
    244   //   returns.  This means that anyone calling `wait()` must be reentrant -- state may change
    245   //   around them in arbitrary ways.  Therefore, callers really need to know if a function they
    246   //   are calling might wait(), and the `WaitScope&` parameter makes this clear.
    247   //
    248   // Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the
    249   // top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what
    250   // actually causes the event loop to run at all. This top-level `WaitScope` cannot be used
    251   // recursively, so cannot be used within an event callback.
    252   //
    253   // However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use
    254   // kj::startFiber() to start some code executing on an alternate call stack. That code will get
    255   // its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()`
    256   // switches back to the main stack in order to run the event loop, returning to the fiber's stack
    257   // once the awaited promise resolves.
    258 
    259   bool poll(WaitScope& waitScope);
    260   // Returns true if a call to wait() would complete without blocking, false if it would block.
    261   //
    262   // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an
    263   // attempt to resolve it. Only when there is nothing left to do will it return false.
    264   //
    265   // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does
    266   // not resolve until some specific event occurs. To do so, poll() the promise before the event to
    267   // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves.
    268   // The first poll() verifies that the promise doesn't resolve early, which would otherwise be
    269   // hard to do deterministically. The second poll() allows you to check that the promise has
    270   // resolved and avoid a wait() that might deadlock in the case that it hasn't.
    271   //
    272   // poll() is not supported in fibers; it will throw an exception.
    273 
    274   ForkedPromise<T> fork() KJ_WARN_UNUSED_RESULT;
    275   // Forks the promise, so that multiple different clients can independently wait on the result.
    276   // `T` must be copy-constructable for this to work.  Or, in the special case where `T` is
    277   // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
    278   // (or an equivalent) object (probably implemented via reference counting).
    279 
    280   _::SplitTuplePromise<T> split();
    281   // Split a promise for a tuple into a tuple of promises.
    282   //
    283   // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns
    284   // `kj::Tuple<Promise<T>, Promise<U>>`.
    285 
    286   Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT;
    287   // Return a new promise that resolves when either the original promise resolves or `other`
    288   // resolves (whichever comes first).  The promise that didn't resolve first is canceled.
    289 
    290   // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions
    291   //   and produces a tuple?
    292 
    293   template <typename... Attachments>
    294   Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT;
    295   // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will
    296   // be destroyed when the promise resolves.  This is useful when a promise's callback contains
    297   // pointers into some object and you want to make sure the object still exists when the callback
    298   // runs -- after calling then(), use attach() to add necessary objects to the result.
    299 
    300   template <typename ErrorFunc>
    301   Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT;
    302   Promise<T> eagerlyEvaluate(decltype(nullptr)) KJ_WARN_UNUSED_RESULT;
    303   // Force eager evaluation of this promise.  Use this if you are going to hold on to the promise
    304   // for awhile without consuming the result, but you want to make sure that the system actually
    305   // processes it.
    306   //
    307   // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
    308   // `then()`, or the parameter to `catch_()`.  We make you specify this because otherwise it's
    309   // easy to forget to handle errors in a promise that you never use.  You may specify nullptr for
    310   // the error handler if you are sure that ignoring errors is fine, or if you know that you'll
    311   // eventually wait on the promise somewhere.
    312 
    313   template <typename ErrorFunc>
    314   void detach(ErrorFunc&& errorHandler);
    315   // Allows the promise to continue running in the background until it completes or the
    316   // `EventLoop` is destroyed.  Be careful when using this: since you can no longer cancel this
    317   // promise, you need to make sure that the promise owns all the objects it touches or make sure
    318   // those objects outlive the EventLoop.
    319   //
    320   // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
    321   // `then()`, except that it must return void.
    322   //
    323   // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
    324   // canceled unless the callee explicitly permits it.
    325 
    326   kj::String trace();
    327   // Returns a dump of debug info about this promise.  Not for production use.  Requires RTTI.
    328   // This method does NOT consume the promise as other methods do.
    329 
    330 private:
    331   Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {}
    332   // Second parameter prevent ambiguity with immediate-value constructor.
    333 
    334   friend class _::PromiseNode;
    335 };
    336 
    337 template <typename T>
    338 class ForkedPromise {
    339   // The result of `Promise::fork()` and `EventLoop::fork()`.  Allows branches to be created.
    340   // Like `Promise<T>`, this is a pass-by-move type.
    341 
    342 public:
    343   inline ForkedPromise(decltype(nullptr)) {}
    344 
    345   Promise<T> addBranch();
    346   // Add a new branch to the fork.  The branch is equivalent to the original promise.
    347 
    348   bool hasBranches();
    349   // Returns true if there are any branches that haven't been canceled.
    350 
    351 private:
    352   Own<_::ForkHub<_::FixVoid<T>>> hub;
    353 
    354   inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {}
    355 
    356   friend class Promise<T>;
    357   friend class EventLoop;
    358 };
    359 
    360 constexpr _::Void READY_NOW = _::Void();
    361 // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
    362 // cast to `Promise<void>`.
    363 
    364 constexpr _::NeverDone NEVER_DONE = _::NeverDone();
    365 // The opposite of `READY_NOW`, return this when the promise should never resolve.  This can be
    366 // implicitly converted to any promise type.  You may also call `NEVER_DONE.wait()` to wait
    367 // forever (useful for servers).
    368 
    369 template <typename Func>
    370 PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
    371 // Schedule for the given zero-parameter function to be executed in the event loop at some
    372 // point in the near future.  Returns a Promise for its result -- or, if `func()` itself returns
    373 // a promise, `evalLater()` returns a Promise for the result of resolving that promise.
    374 //
    375 // Example usage:
    376 //     Promise<int> x = evalLater([]() { return 123; });
    377 //
    378 // The above is exactly equivalent to:
    379 //     Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; });
    380 //
    381 // If the returned promise is destroyed before the callback runs, the callback will be canceled
    382 // (never called).
    383 //
    384 // If you schedule several evaluations with `evalLater` during the same callback, they are
    385 // guaranteed to be executed in order.
    386 
    387 template <typename Func>
    388 PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT;
    389 // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns.
    390 // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the
    391 // main reason why `evalNow()` is useful.
    392 
    393 template <typename Func>
    394 PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT;
    395 // Like `evalLater()`, except that the function doesn't run until the event queue is otherwise
    396 // completely empty and the thread is about to suspend waiting for I/O.
    397 //
    398 // This is useful when you need to perform some disruptive action and you want to make sure that
    399 // you don't interrupt some other task between two .then() continuations. For example, say you want
    400 // to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw
    401 // them. It could be that a read() has completed and bytes have been transferred to the target
    402 // buffer, but the .then() callback that handles the read result hasn't executed yet. If you
    403 // cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do
    404 // evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out
    405 // and if you didn't receive the read result yet, then you know nothing has been read, and you can
    406 // simply drop the promise.
    407 //
    408 // If evalLast() is called multiple times, functions are executed in LIFO order. If the first
    409 // callback enqueues new events, then latter callbacks will not execute until those events are
    410 // drained.
    411 
    412 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space);
    413 kj::String getAsyncTrace();
    414 // If the event loop is currently running in this thread, get a trace back through the promise
    415 // chain leading to the currently-executing event. The format is the same as kj::getStackTrace()
    416 // from exception.c++.
    417 
    418 template <typename Func>
    419 PromiseForResult<Func, void> retryOnDisconnect(Func&& func) KJ_WARN_UNUSED_RESULT;
    420 // Promises to run `func()` asynchronously, retrying once if it fails with a DISCONNECTED exception.
    421 // If the retry also fails, the exception is passed through.
    422 //
    423 // `func()` should return a `Promise`. `retryOnDisconnect(func)` returns the same promise, except
    424 // with the retry logic added.
    425 
    426 template <typename Func>
    427 PromiseForResult<Func, WaitScope&> startFiber(size_t stackSize, Func&& func) KJ_WARN_UNUSED_RESULT;
    428 // Executes `func()` in a fiber, returning a promise for the eventual reseult. `func()` will be
    429 // passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()`
    430 // can be written in a synchronous, blocking style, instead of using `.then()`. This is often much
    431 // easier to write and read, and may even be significantly faster if it allows the use of stack
    432 // allocation rather than heap allocation.
    433 //
    434 // However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack.
    435 // The entire stack must be allocated at once, making it necessary to choose a stack size upfront
    436 // that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That
    437 // said, over-estimating is not too terrible since pages of the stack will actually be allocated
    438 // lazily when first accessed; actual memory usage will correspond to the "high watermark" of the
    439 // actual stack usage. That said, this lazy allocation forces page faults, which can be quite slow.
    440 // Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will
    441 // have to be interrupted to flush their CPU cores' TLB caches.
    442 //
    443 // In short, when performance matters, you should try to avoid creating fibers very frequently.
    444 
    445 class FiberPool final {
    446   // A freelist pool of fibers with a set stack size. This improves CPU usage with fibers at
    447   // the expense of memory usage. Fibers in this pool will always use the max amount of memory
    448   // used until the pool is destroyed.
    449 
    450 public:
    451   explicit FiberPool(size_t stackSize);
    452   ~FiberPool() noexcept(false);
    453   KJ_DISALLOW_COPY(FiberPool);
    454 
    455   void setMaxFreelist(size_t count);
    456   // Set the maximum number of stacks to add to the freelist. If the freelist is full, stacks will
    457   // be deleted rather than returned to the freelist.
    458 
    459   void useCoreLocalFreelists();
    460   // EXPERIMENTAL: Call to tell FiberPool to try to use core-local stack freelists, which
    461   //   in theory should increase L1/L2 cache efficacy for freelisted stacks. In practice, as of
    462   //   this writing, no performance advantage has yet been demonstrated. Note that currently this
    463   //   feature is only supported on Linux (the flag has no effect on other operating systems).
    464 
    465   template <typename Func>
    466   PromiseForResult<Func, WaitScope&> startFiber(Func&& func) const KJ_WARN_UNUSED_RESULT;
    467   // Executes `func()` in a fiber from this pool, returning a promise for the eventual result.
    468   // `func()` will be passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on
    469   // promises. Thus, `func()` can be written in a synchronous, blocking style, instead of
    470   // using `.then()`. This is often much easier to write and read, and may even be significantly
    471   // faster if it allows the use of stack allocation rather than heap allocation.
    472 
    473   void runSynchronously(kj::FunctionParam<void()> func) const;
    474   // Use one of the stacks in the pool to synchronously execute func(), returning the result that
    475   // func() returns. This is not the usual use case for fibers, but can be a nice optimization
    476   // in programs that have many threads that mostly only need small stacks, but occasionally need
    477   // a much bigger stack to run some deeply recursive algorithm. If the algorithm is run on each
    478   // thread's normal call stack, then every thread's stack will tend to grow to be very big
    479   // (usually, stacks automatically grow as needed, but do not shrink until the thread exits
    480   // completely). If the thread can share a small set of big stacks that they use only when calling
    481   // the deeply recursive algorithm, and use small stacks for everything else, overall memory usage
    482   // is reduced.
    483   //
    484   // TODO(someday): If func() returns a value, return it from runSynchronously? Current use case
    485   //   doesn't need it.
    486 
    487   size_t getFreelistSize() const;
    488   // Get the number of stacks currently in the freelist. Does not count stacks that are active.
    489 
    490 private:
    491   class Impl;
    492   Own<Impl> impl;
    493 
    494   friend class _::FiberStack;
    495   friend class _::FiberBase;
    496 };
    497 
    498 template <typename T>
    499 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
    500 // Join an array of promises into a promise for an array.
    501 
    502 // =======================================================================================
    503 // Hack for creating a lambda that holds an owned pointer.
    504 
    505 template <typename Func, typename MovedParam>
    506 class CaptureByMove {
    507 public:
    508   inline CaptureByMove(Func&& func, MovedParam&& param)
    509       : func(kj::mv(func)), param(kj::mv(param)) {}
    510 
    511   template <typename... Params>
    512   inline auto operator()(Params&&... params)
    513       -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Params>(params)...)) {
    514     return func(kj::mv(param), kj::fwd<Params>(params)...);
    515   }
    516 
    517 private:
    518   Func func;
    519   MovedParam param;
    520 };
    521 
    522 template <typename Func, typename MovedParam>
    523 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) {
    524   // Hack to create a "lambda" which captures a variable by moving it rather than copying or
    525   // referencing.  C++14 generalized captures should make this obsolete, but for now in C++11 this
    526   // is commonly needed for Promise continuations that own their state.  Example usage:
    527   //
    528   //    Own<Foo> ptr = makeFoo();
    529   //    Promise<int> promise = callRpc();
    530   //    promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) {
    531   //      return ptr->finish(result);
    532   //    }));
    533 
    534   return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param));
    535 }
    536 
    537 // =======================================================================================
    538 // Advanced promise construction
    539 
    540 class PromiseRejector {
    541   // Superclass of PromiseFulfiller containing the non-typed methods. Useful when you only really
    542   // need to be able to reject a promise, and you need to operate on fulfillers of different types.
    543 public:
    544   virtual void reject(Exception&& exception) = 0;
    545   virtual bool isWaiting() = 0;
    546 };
    547 
    548 template <typename T>
    549 class PromiseFulfiller: public PromiseRejector {
    550   // A callback which can be used to fulfill a promise.  Only the first call to fulfill() or
    551   // reject() matters; subsequent calls are ignored.
    552 
    553 public:
    554   virtual void fulfill(T&& value) = 0;
    555   // Fulfill the promise with the given value.
    556 
    557   virtual void reject(Exception&& exception) = 0;
    558   // Reject the promise with an error.
    559 
    560   virtual bool isWaiting() = 0;
    561   // Returns true if the promise is still unfulfilled and someone is potentially waiting for it.
    562   // Returns false if fulfill()/reject() has already been called *or* if the promise to be
    563   // fulfilled has been discarded and therefore the result will never be used anyway.
    564 
    565   template <typename Func>
    566   bool rejectIfThrows(Func&& func);
    567   // Call the function (with no arguments) and return true.  If an exception is thrown, call
    568   // `fulfiller.reject()` and then return false.  When compiled with exceptions disabled,
    569   // non-fatal exceptions are still detected and handled correctly.
    570 };
    571 
    572 template <>
    573 class PromiseFulfiller<void>: public PromiseRejector {
    574   // Specialization of PromiseFulfiller for void promises.  See PromiseFulfiller<T>.
    575 
    576 public:
    577   virtual void fulfill(_::Void&& value = _::Void()) = 0;
    578   // Call with zero parameters.  The parameter is a dummy that only exists so that subclasses don't
    579   // have to specialize for <void>.
    580 
    581   virtual void reject(Exception&& exception) = 0;
    582   virtual bool isWaiting() = 0;
    583 
    584   template <typename Func>
    585   bool rejectIfThrows(Func&& func);
    586 };
    587 
    588 template <typename T, typename Adapter, typename... Params>
    589 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams);
    590 // Creates a new promise which owns an instance of `Adapter` which encapsulates the operation
    591 // that will eventually fulfill the promise.  This is primarily useful for adapting non-KJ
    592 // asynchronous APIs to use promises.
    593 //
    594 // An instance of `Adapter` will be allocated and owned by the returned `Promise`.  A
    595 // `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor,
    596 // and `adapterConstructorParams` will be forwarded as the subsequent parameters.  The adapter
    597 // is expected to perform some asynchronous operation and call the `PromiseFulfiller<T>` once
    598 // it is finished.
    599 //
    600 // The adapter is destroyed when its owning Promise is destroyed.  This may occur before the
    601 // Promise has been fulfilled.  In this case, the adapter's destructor should cancel the
    602 // asynchronous operation.  Once the adapter is destroyed, the fulfillment callback cannot be
    603 // called.
    604 //
    605 // An adapter implementation should be carefully written to ensure that it cannot accidentally
    606 // be left unfulfilled permanently because of an exception.  Consider making liberal use of
    607 // `PromiseFulfiller<T>::rejectIfThrows()`.
    608 
    609 template <typename T>
    610 struct PromiseFulfillerPair {
    611   _::ReducePromises<T> promise;
    612   Own<PromiseFulfiller<T>> fulfiller;
    613 };
    614 
    615 template <typename T>
    616 PromiseFulfillerPair<T> newPromiseAndFulfiller();
    617 // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise.
    618 // If the PromiseFulfiller is destroyed before either of its methods are called, the Promise is
    619 // implicitly rejected.
    620 //
    621 // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback
    622 // that there is no way to handle cancellation (i.e. detect when the Promise is discarded).
    623 //
    624 // You can arrange to fulfill a promise with another promise by using a promise type for T.  E.g.
    625 // `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the
    626 // fulfiller will be of type `PromiseFulfiller<Promise<U>>`.  Thus you pass a `Promise<U>` to the
    627 // `fulfill()` callback, and the promises are chained.
    628 
    629 template <typename T>
    630 class CrossThreadPromiseFulfiller: public kj::PromiseFulfiller<T> {
    631   // Like PromiseFulfiller<T> but the methods are `const`, indicating they can safely be called
    632   // from another thread.
    633 
    634 public:
    635   virtual void fulfill(T&& value) const = 0;
    636   virtual void reject(Exception&& exception) const = 0;
    637   virtual bool isWaiting() const = 0;
    638 
    639   void fulfill(T&& value) override { return constThis()->fulfill(kj::fwd<T>(value)); }
    640   void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); }
    641   bool isWaiting() override { return constThis()->isWaiting(); }
    642 
    643 private:
    644   const CrossThreadPromiseFulfiller* constThis() { return this; }
    645 };
    646 
    647 template <>
    648 class CrossThreadPromiseFulfiller<void>: public kj::PromiseFulfiller<void> {
    649   // Specialization of CrossThreadPromiseFulfiller for void promises.  See
    650   // CrossThreadPromiseFulfiller<T>.
    651 
    652 public:
    653   virtual void fulfill(_::Void&& value = _::Void()) const = 0;
    654   virtual void reject(Exception&& exception) const = 0;
    655   virtual bool isWaiting() const = 0;
    656 
    657   void fulfill(_::Void&& value) override { return constThis()->fulfill(kj::mv(value)); }
    658   void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); }
    659   bool isWaiting() override { return constThis()->isWaiting(); }
    660 
    661 private:
    662   const CrossThreadPromiseFulfiller* constThis() { return this; }
    663 };
    664 
    665 template <typename T>
    666 struct PromiseCrossThreadFulfillerPair {
    667   _::ReducePromises<T> promise;
    668   Own<CrossThreadPromiseFulfiller<T>> fulfiller;
    669 };
    670 
    671 template <typename T>
    672 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller();
    673 // Like `newPromiseAndFulfiller()`, but the fulfiller is allowed to be invoked from any thread,
    674 // not just the one that called this method. Note that the Promise is still tied to the calling
    675 // thread's event loop and *cannot* be used from another thread -- only the PromiseFulfiller is
    676 // cross-thread.
    677 
    678 // =======================================================================================
    679 // Canceler
    680 
    681 class Canceler {
    682   // A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or
    683   // implicitly when the Canceler is destroyed.
    684   //
    685   // The cancellation is done in such a way that once cancel() (or the Canceler's destructor)
    686   // returns, it's guaranteed that the promise has already been canceled and destroyed. This
    687   // guarantee is important for enforcing ownership constraints. For example, imagine that Alice
    688   // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's
    689   // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed
    690   // at random without Alice having canceled the promise. In this case, it is necessary for Bob to
    691   // ensure that the promise will be forcefully canceled. Bob can do this by constructing a
    692   // Canceler and using it to wrap promises before returning them to callers. When Bob is
    693   // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors.
    694   //
    695   // Note that another common strategy for cancelation is to use exclusiveJoin() to join a promise
    696   // with some "cancellation promise" which only resolves if the operation should be canceled. The
    697   // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus
    698   // calling the PromiseFulfiller cancels the operation. There is a major problem with this
    699   // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the
    700   // exclusive-joined promise actually resolves and cancels its other fork. During that time, the
    701   // task might continue to execute. If it holds pointers to objects that have been destroyed, this
    702   // might cause segfaults. Thus, it is safer to use a Canceler.
    703 
    704 public:
    705   inline Canceler() {}
    706   ~Canceler() noexcept(false);
    707   KJ_DISALLOW_COPY(Canceler);
    708 
    709   template <typename T>
    710   Promise<T> wrap(Promise<T> promise) {
    711     return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
    712   }
    713 
    714   void cancel(StringPtr cancelReason);
    715   void cancel(const Exception& exception);
    716   // Cancel all previously-wrapped promises that have not already completed, causing them to throw
    717   // the given exception. If you provide just a description message instead of an exception, then
    718   // an exception object will be constructed from it -- but only if there are requests to cancel.
    719 
    720   void release();
    721   // Releases previously-wrapped promises, so that they will not be canceled regardless of what
    722   // happens to this Canceler.
    723 
    724   bool isEmpty() const { return list == nullptr; }
    725   // Indicates if any previously-wrapped promises are still executing. (If this returns true, then
    726   // cancel() would be a no-op.)
    727 
    728 private:
    729   class AdapterBase {
    730   public:
    731     AdapterBase(Canceler& canceler);
    732     ~AdapterBase() noexcept(false);
    733 
    734     virtual void cancel(Exception&& e) = 0;
    735 
    736     void unlink();
    737 
    738   private:
    739     Maybe<Maybe<AdapterBase&>&> prev;
    740     Maybe<AdapterBase&> next;
    741     friend class Canceler;
    742   };
    743 
    744   template <typename T>
    745   class AdapterImpl: public AdapterBase {
    746   public:
    747     AdapterImpl(PromiseFulfiller<T>& fulfiller,
    748                 Canceler& canceler, Promise<T> inner)
    749         : AdapterBase(canceler),
    750           fulfiller(fulfiller),
    751           inner(inner.then(
    752               [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); },
    753               [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); })
    754               .eagerlyEvaluate(nullptr)) {}
    755 
    756     void cancel(Exception&& e) override {
    757       fulfiller.reject(kj::mv(e));
    758       inner = nullptr;
    759     }
    760 
    761   private:
    762     PromiseFulfiller<T>& fulfiller;
    763     Promise<void> inner;
    764   };
    765 
    766   Maybe<AdapterBase&> list;
    767 };
    768 
    769 template <>
    770 class Canceler::AdapterImpl<void>: public AdapterBase {
    771 public:
    772   AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
    773               Canceler& canceler, kj::Promise<void> inner);
    774   void cancel(kj::Exception&& e) override;
    775   // These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to
    776   // link with symbols defined in async.c++ merely because they included async.h.
    777 
    778 private:
    779   kj::PromiseFulfiller<void>& fulfiller;
    780   kj::Promise<void> inner;
    781 };
    782 
    783 // =======================================================================================
    784 // TaskSet
    785 
    786 class TaskSet {
    787   // Holds a collection of Promise<void>s and ensures that each executes to completion.  Memory
    788   // associated with each promise is automatically freed when the promise completes.  Destroying
    789   // the TaskSet itself automatically cancels all unfinished promises.
    790   //
    791   // This is useful for "daemon" objects that perform background tasks which aren't intended to
    792   // fulfill any particular external promise, but which may need to be canceled (and thus can't
    793   // use `Promise::detach()`).  The daemon object holds a TaskSet to collect these tasks it is
    794   // working on.  This way, if the daemon itself is destroyed, the TaskSet is detroyed as well,
    795   // and everything the daemon is doing is canceled.
    796 
    797 public:
    798   class ErrorHandler {
    799   public:
    800     virtual void taskFailed(kj::Exception&& exception) = 0;
    801   };
    802 
    803   TaskSet(ErrorHandler& errorHandler);
    804   // `errorHandler` will be executed any time a task throws an exception, and will execute within
    805   // the given EventLoop.
    806 
    807   ~TaskSet() noexcept(false);
    808 
    809   void add(Promise<void>&& promise);
    810 
    811   kj::String trace();
    812   // Return debug info about all promises currently in the TaskSet.
    813 
    814   bool isEmpty() { return tasks == nullptr; }
    815   // Check if any tasks are running.
    816 
    817   Promise<void> onEmpty();
    818   // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can
    819   // exist at a time.
    820 
    821 private:
    822   class Task;
    823 
    824   TaskSet::ErrorHandler& errorHandler;
    825   Maybe<Own<Task>> tasks;
    826   Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller;
    827 };
    828 
    829 // =======================================================================================
    830 // Cross-thread execution.
    831 
    832 class Executor {
    833   // Executes code on another thread's event loop.
    834   //
    835   // Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current
    836   // thread's event loop. You may then pass the reference to other threads to enable them to call
    837   // back to this one.
    838 
    839 public:
    840   Executor(EventLoop& loop, Badge<EventLoop>);
    841   ~Executor() noexcept(false);
    842 
    843   virtual kj::Own<const Executor> addRef() const = 0;
    844   // Add a reference to this Executor. The Executor will not be destroyed until all references are
    845   // dropped. This uses atomic refcounting for thread-safety.
    846   //
    847   // Use this when you can't guarantee that the target thread's event loop won't concurrently exit
    848   // (including due to an uncaught exception!) while another thread is still using the Executor.
    849   // Otherwise, the Executor object is destroyed when the owning event loop exits.
    850   //
    851   // If the target event loop has exited, then `execute{Async,Sync}` will throw DISCONNECTED
    852   // exceptions.
    853 
    854   bool isLive() const;
    855   // Returns true if the remote event loop still exists, false if it has been destroyed. In the
    856   // latter case, `execute{Async,Sync}()` will definitely throw. Of course, if this returns true,
    857   // it could still change to false at any moment, and `execute{Async,Sync}()` could still throw as
    858   // a result.
    859   //
    860   // TODO(cleanup): Should we have tryExecute{Async,Sync}() that return Maybes that are null if
    861   //   the remote event loop exited? Currently there are multiple known use cases that check
    862   //   isLive() after catching a DISCONNECTED exception to decide whether it is due to the executor
    863   //   exiting, and then handling that case. This is borderline in violation of KJ exception
    864   //   philosophy, but right now I'm not excited about the extra template metaprogramming needed
    865   //   for "try" versions...
    866 
    867   template <typename Func>
    868   PromiseForResult<Func, void> executeAsync(Func&& func) const;
    869   // Call from any thread to request that the given function be executed on the executor's thread,
    870   // returning a promise for the result.
    871   //
    872   // The Promise returned by executeAsync() belongs to the requesting thread, not the executor
    873   // thread. Hence, for example, continuations added to this promise with .then() will execute in
    874   // the requesting thread.
    875   //
    876   // If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting
    877   // thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread
    878   // awaits the promise. Once it resolves to a final result, that result is transferred to the
    879   // requesting thread, resolving the promise that executeAsync() returned earlier.
    880   //
    881   // `func` will be destroyed in the requesting thread, after the final result has been returned
    882   // from the executor thread. This means that it is safe for `func` to capture objects that cannot
    883   // safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference,
    884   // so long as the functor remains live until the promise completes or is canceled, and the
    885   // function is thread-safe.
    886   //
    887   // Of course, the body of `func` must be careful that any access it makes on these objects is
    888   // safe cross-thread. For example, it must not attempt to access Promise-related objects
    889   // cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it
    890   // from another. Unfortunately, the usual convention of using const-correctness to enforce
    891   // thread-safety does not work here, because applications can often ensure that `func` has
    892   // exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe
    893   // ways; the const qualifier is not sufficient to express this.
    894   //
    895   // The final return value of `func` is transferred between threads, and hence is constructed and
    896   // destroyed in separate threads. It is the app's responsibility to make sure this is OK.
    897   // Alternatively, the app can perhaps arrange to send the return value back to the original
    898   // thread for destruction, if needed.
    899   //
    900   // If the requesting thread destroys the returned Promise, the destructor will block waiting for
    901   // the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed
    902   // before the Promise's destructor returns.
    903   //
    904   // Multiple calls to executeAsync() from the same requesting thread to the same target thread
    905   // will be delivered in the same order in which they were requested. (However, if func() returns
    906   // a promise, delivery of subsequent calls is not blocked on that promise. In other words, this
    907   // call provides E-Order in the same way as Cap'n Proto.)
    908 
    909   template <typename Func>
    910   _::UnwrapPromise<PromiseForResult<Func, void>> executeSync(Func&& func) const;
    911   // Schedules `func()` to execute on the executor thread, and then blocks the requesting thread
    912   // until `func()` completes. If `func()` returns a Promise, then the wait will continue until
    913   // that promise resolves, and the final result will be returned to the requesting thread.
    914   //
    915   // The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that
    916   // loop will *not* execute while the thread is blocked. This method is particularly useful to
    917   // allow non-event-loop threads to perform I/O via a separate event-loop thread.
    918   //
    919   // As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the
    920   // executor thread has signaled completion. The return value is transferred between threads.
    921 
    922 private:
    923   struct Impl;
    924   Own<Impl> impl;
    925   // To avoid including mutex.h...
    926 
    927   friend class EventLoop;
    928   friend class _::XThreadEvent;
    929   friend class _::XThreadPaf;
    930 
    931   void send(_::XThreadEvent& event, bool sync) const;
    932   void wait();
    933   bool poll();
    934 
    935   EventLoop& getLoop() const;
    936 };
    937 
    938 const Executor& getCurrentThreadExecutor();
    939 // Get the executor for the current thread's event loop. This reference can then be passed to other
    940 // threads.
    941 
    942 // =======================================================================================
    943 // The EventLoop class
    944 
    945 class EventPort {
    946   // Interfaces between an `EventLoop` and events originating from outside of the loop's thread.
    947   // All such events come in through the `EventPort` implementation.
    948   //
    949   // An `EventPort` implementation may interface with low-level operating system APIs and/or other
    950   // threads.  You can also write an `EventPort` which wraps some other (non-KJ) event loop
    951   // framework, allowing the two to coexist in a single thread.
    952 
    953 public:
    954   virtual bool wait() = 0;
    955   // Wait for an external event to arrive, sleeping if necessary.  Once at least one event has
    956   // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return.
    957   //
    958   // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to
    959   // wait for new events to populate the queue.
    960   //
    961   // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in
    962   // a loop will eventually sleep.  (That is to say, false positives are fine.)
    963   //
    964   // Returns true if wake() has been called from another thread. (Precisely, returns true if
    965   // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
    966   // called.)
    967 
    968   virtual bool poll() = 0;
    969   // Check if any external events have arrived, but do not sleep.  If any events have arrived,
    970   // add them to the event queue (e.g. by fulfilling promises) before returning.
    971   //
    972   // This may be called during `Promise::wait()` when the EventLoop has been executing for a while
    973   // without a break but is still non-empty.
    974   //
    975   // Returns true if wake() has been called from another thread. (Precisely, returns true if
    976   // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
    977   // called.)
    978 
    979   virtual void setRunnable(bool runnable);
    980   // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it
    981   // transitions from empty -> runnable or runnable -> empty.  This is typically useful when
    982   // integrating with an external event loop; if the loop is currently runnable then you should
    983   // arrange to call run() on it soon.  The default implementation does nothing.
    984 
    985   virtual void wake() const;
    986   // Wake up the EventPort's thread from another thread.
    987   //
    988   // Unlike all other methods on this interface, `wake()` may be called from another thread, hence
    989   // it is `const`.
    990   //
    991   // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep
    992   // again until `wait()` or `poll()` has returned true at least once.
    993   //
    994   // The default implementation throws an UNIMPLEMENTED exception.
    995 };
    996 
    997 class EventLoop {
    998   // Represents a queue of events being executed in a loop.  Most code won't interact with
    999   // EventLoop directly, but instead use `Promise`s to interact with it indirectly.  See the
   1000   // documentation for `Promise`.
   1001   //
   1002   // Each thread can have at most one current EventLoop.  To make an `EventLoop` current for
   1003   // the thread, create a `WaitScope`.  Async APIs require that the thread has a current EventLoop,
   1004   // or they will throw exceptions.  APIs that use `Promise::wait()` additionally must explicitly
   1005   // be passed a reference to the `WaitScope` to make the caller aware that they might block.
   1006   //
   1007   // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
   1008   // in the main() function, or in the start function of a thread.  You can then use it to
   1009   // construct some promises and wait on the result.  Example:
   1010   //
   1011   //     int main() {
   1012   //       // `loop` becomes the official EventLoop for the thread.
   1013   //       MyEventPort eventPort;
   1014   //       EventLoop loop(eventPort);
   1015   //
   1016   //       // Now we can call an async function.
   1017   //       Promise<String> textPromise = getHttp("http://example.com");
   1018   //
   1019   //       // And we can wait for the promise to complete.  Note that you can only use `wait()`
   1020   //       // from the top level, not from inside a promise callback.
   1021   //       String text = textPromise.wait();
   1022   //       print(text);
   1023   //       return 0;
   1024   //     }
   1025   //
   1026   // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather
   1027   // than allocate an `EventLoop` directly.
   1028 
   1029 public:
   1030   EventLoop();
   1031   // Construct an `EventLoop` which does not receive external events at all.
   1032 
   1033   explicit EventLoop(EventPort& port);
   1034   // Construct an `EventLoop` which receives external events through the given `EventPort`.
   1035 
   1036   ~EventLoop() noexcept(false);
   1037 
   1038   void run(uint maxTurnCount = maxValue);
   1039   // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done,
   1040   // whichever comes first.  This never calls the `EventPort`'s `sleep()` or `poll()`.  It will
   1041   // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty.
   1042 
   1043   bool isRunnable();
   1044   // Returns true if run() would currently do anything, or false if the queue is empty.
   1045 
   1046   const Executor& getExecutor();
   1047   // Returns an Executor that can be used to schedule events on this EventLoop from another thread.
   1048   //
   1049   // Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's
   1050   // Executor.
   1051   //
   1052   // Note that this is only needed for cross-thread scheduling. To schedule code to run later in
   1053   // the current thread, use `kj::evalLater()`, which will be more efficient.
   1054 
   1055 private:
   1056   kj::Maybe<EventPort&> port;
   1057   // If null, this thread doesn't receive I/O events from the OS. It can potentially receive
   1058   // events from other threads via the Executor.
   1059 
   1060   bool running = false;
   1061   // True while looping -- wait() is then not allowed.
   1062 
   1063   bool lastRunnableState = false;
   1064   // What did we last pass to port.setRunnable()?
   1065 
   1066   _::Event* head = nullptr;
   1067   _::Event** tail = &head;
   1068   _::Event** depthFirstInsertPoint = &head;
   1069   _::Event** breadthFirstInsertPoint = &head;
   1070 
   1071   kj::Maybe<Own<Executor>> executor;
   1072   // Allocated the first time getExecutor() is requested, making cross-thread request possible.
   1073 
   1074   Own<TaskSet> daemons;
   1075 
   1076   _::Event* currentlyFiring = nullptr;
   1077 
   1078   bool turn();
   1079   void setRunnable(bool runnable);
   1080   void enterScope();
   1081   void leaveScope();
   1082 
   1083   void wait();
   1084   void poll();
   1085 
   1086   friend void _::detach(kj::Promise<void>&& promise);
   1087   friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
   1088                           WaitScope& waitScope);
   1089   friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
   1090   friend class _::Event;
   1091   friend class WaitScope;
   1092   friend class Executor;
   1093   friend class _::XThreadEvent;
   1094   friend class _::XThreadPaf;
   1095   friend class _::FiberBase;
   1096   friend class _::FiberStack;
   1097   friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space);
   1098 };
   1099 
   1100 class WaitScope {
   1101   // Represents a scope in which asynchronous programming can occur.  A `WaitScope` should usually
   1102   // be allocated on the stack and serves two purposes:
   1103   // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the
   1104   //   thread.  Most operations dealing with `Promise` (including all of its methods) do not work
   1105   //   unless the thread has a current `EventLoop`.
   1106   // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular
   1107   //   promise to complete.  See `Promise::wait()` for an extended discussion.
   1108 
   1109 public:
   1110   inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); }
   1111   inline ~WaitScope() { if (fiber == nullptr) loop.leaveScope(); }
   1112   KJ_DISALLOW_COPY(WaitScope);
   1113 
   1114   void poll();
   1115   // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking).
   1116   //
   1117   // Not supported in fibers.
   1118 
   1119   void setBusyPollInterval(uint count) { busyPollInterval = count; }
   1120   // Set the maximum number of events to run in a row before calling poll() on the EventPort to
   1121   // check for new I/O.
   1122   //
   1123   // This has no effect when used in a fiber.
   1124 
   1125   void runEventCallbacksOnStackPool(kj::Maybe<const FiberPool&> pool) { runningStacksPool = pool; }
   1126   // Arranges to switch stacks while event callbacks are executing. This is an optimization that
   1127   // is useful for programs that use extremely high thread counts, where each thread has its own
   1128   // event loop, but each thread has relatively low event throughput, i.e. each thread spends
   1129   // most of its time waiting for I/O. Normally, the biggest problem with having lots of threads
   1130   // is that each thread must allocate a stack, and stacks can take a lot of memory if the
   1131   // application commonly makes deep calls. But, most of that stack space is only needed while
   1132   // the thread is executing, not while it's sleeping. So, if threads only switch to a big stack
   1133   // during execution, switching back when it's time to sleep, and if those stacks are freelisted
   1134   // so that they can be shared among threads, then a lot of memory is saved.
   1135   //
   1136   // We use the `FiberPool` type here because it implements a freelist of stacks, which is exactly
   1137   // what we happen to want! In our case, though, we don't use those stacks to implement fibers;
   1138   // we use them as the main thread stack.
   1139   //
   1140   // This has no effect if this WaitScope itself is for a fiber.
   1141   //
   1142   // Pass `nullptr` as the parameter to go back to running events on the main stack.
   1143 
   1144   void cancelAllDetached();
   1145   // HACK: Immediately cancel all detached promises.
   1146   //
   1147   // New code should not use detached promises, and therefore should not need this.
   1148   //
   1149   // This method exists to help existing code deal with the problems of detached promises,
   1150   // especially at teardown time.
   1151   //
   1152   // This method may be removed in the future.
   1153 
   1154 private:
   1155   EventLoop& loop;
   1156   uint busyPollInterval = kj::maxValue;
   1157 
   1158   kj::Maybe<_::FiberBase&> fiber;
   1159   kj::Maybe<const FiberPool&> runningStacksPool;
   1160 
   1161   explicit WaitScope(EventLoop& loop, _::FiberBase& fiber)
   1162       : loop(loop), fiber(fiber) {}
   1163 
   1164   template <typename Func>
   1165   inline void runOnStackPool(Func&& func) {
   1166     KJ_IF_MAYBE(pool, runningStacksPool) {
   1167       pool->runSynchronously(kj::fwd<Func>(func));
   1168     } else {
   1169       func();
   1170     }
   1171   }
   1172 
   1173   friend class EventLoop;
   1174   friend class _::FiberBase;
   1175   friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
   1176                           WaitScope& waitScope);
   1177   friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
   1178 };
   1179 
   1180 }  // namespace kj
   1181 
   1182 #define KJ_ASYNC_H_INCLUDED
   1183 #include "async-inl.h"
   1184 
   1185 KJ_END_HEADER