async.h (57525B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include "async-prelude.h" 25 #include "exception.h" 26 #include "refcount.h" 27 28 KJ_BEGIN_HEADER 29 30 #ifndef KJ_USE_FIBERS 31 #if __BIONIC__ || __FreeBSD__ || __OpenBSD__ || KJ_NO_EXCEPTIONS 32 // These platforms don't support fibers. 33 #define KJ_USE_FIBERS 0 34 #else 35 #define KJ_USE_FIBERS 1 36 #endif 37 #else 38 #if KJ_NO_EXCEPTIONS && KJ_USE_FIBERS 39 #error "Fibers cannot be enabled when exceptions are disabled." 
40 #endif 41 #endif 42 43 namespace kj { 44 45 class EventLoop; 46 class WaitScope; 47 48 template <typename T> 49 class Promise; 50 template <typename T> 51 class ForkedPromise; 52 template <typename T> 53 class PromiseFulfiller; 54 template <typename T> 55 struct PromiseFulfillerPair; 56 57 template <typename Func> 58 class FunctionParam; 59 60 template <typename Func, typename T> 61 using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>; 62 // Evaluates to the type of Promise for the result of calling functor type Func with parameter type 63 // T. If T is void, then the promise is for the result of calling Func with no arguments. If 64 // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>. 65 66 // ======================================================================================= 67 // Promises 68 69 template <typename T> 70 class Promise: protected _::PromiseBase { 71 // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed 72 // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A. 73 // 74 // A Promise represents a promise to produce a value of type T some time in the future. Once 75 // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be 76 // "broken", with an Exception describing what went wrong. You may implicitly convert a value of 77 // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant 78 // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a 79 // `kj::Exception` to an already-broken promise of any type. 80 // 81 // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed 82 // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations 83 // meant to fulfill the promise will be canceled if possible. 
All methods of `Promise` (unless 84 // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they 85 // should be rvalue-qualified, but at the time this interface was created compilers didn't widely 86 // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If 87 // you want to use one Promise in two different places, you must fork it with `fork()`. 88 // 89 // To use the result of a Promise, you must call `then()` and supply a callback function to 90 // call with the result. `then()` returns another promise, for the result of the callback. 91 // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a 92 // simple Promise<T> that first waits for the outer promise, then the inner. Example: 93 // 94 // // Open a remote file, read the content, and then count the 95 // // number of lines of text. 96 // // Note that none of the calls here block. `file`, `content` 97 // // and `lineCount` are all initialized immediately before any 98 // // asynchronous operations occur. The lambda callbacks are 99 // // called later. 100 // Promise<Own<File>> file = openFtp("ftp://host/foo/bar"); 101 // Promise<String> content = file.then( 102 // [](Own<File> file) -> Promise<String> { 103 // return file->readAll(); 104 // }); 105 // Promise<int> lineCount = content.then( 106 // [](String text) -> int { 107 // uint count = 0; 108 // for (char c: text) count += (c == '\n'); 109 // return count; 110 // }); 111 // 112 // For `then()` to work, the current thread must have an active `EventLoop`. Each callback 113 // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current 114 // thread's event loop, you do not need to worry about two callbacks running at the same time. 115 // You will need to set up at least one `EventLoop` at the top level of your program before you 116 // can use promises.
117 // 118 // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`. 119 // 120 // Systems using promises should consider supporting the concept of "pipelining". Pipelining 121 // means allowing a caller to start issuing method calls against a promised object before the 122 // promise has actually been fulfilled. This is particularly useful if the promise is for a 123 // remote object living across a network, as this can avoid round trips when chaining a series 124 // of calls. It is suggested that any class T which supports pipelining implement a subclass of 125 // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please 126 // invoke the corresponding method on the promised value once it is available". These methods 127 // should in turn return promises for the eventual results of said invocations. Cap'n Proto, 128 // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see 129 // `capnp/capability.h`. 130 // 131 // KJ Promises are based on E promises: 132 // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises 133 // 134 // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript 135 // promises, which are themselves influenced by E promises: 136 // http://promisesaplus.com/ 137 // https://github.com/domenic/promises-unwrapping 138 139 public: 140 Promise(_::FixVoid<T> value); 141 // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the 142 // parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can 143 // say `return 123;` to return a promise that is already fulfilled to 123. 144 // 145 // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`. 146 147 Promise(kj::Exception&& e); 148 // Construct an already-broken Promise. 
149 150 inline Promise(decltype(nullptr)) {} 151 152 template <typename Func, typename ErrorFunc = _::PropagateException> 153 PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException()) 154 KJ_WARN_UNUSED_RESULT; 155 // Register a continuation function to be executed when the promise completes. The continuation 156 // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation 157 // may return a new value; `then()` itself returns a promise for the continuation's eventual 158 // result. If the continuation itself returns a `Promise<U>`, then `then()` shall also return 159 // a `Promise<U>` which first waits for the original promise, then executes the continuation, 160 // then waits for the inner promise (i.e. it automatically "unwraps" the promise). 161 // 162 // In all cases, `then()` returns immediately. The continuation is executed later. The 163 // continuation is always executed on the same EventLoop (and, therefore, the same thread) which 164 // called `then()`, therefore no synchronization is necessary on state shared by the continuation 165 // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws 166 // an exception. 167 // 168 // You may also specify an error handler continuation as the second parameter. `errorHandler` 169 // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same 170 // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler` 171 // may return either `Promise<U>` or just `U`). The default error handler simply propagates the 172 // exception to the returned promise. 173 // 174 // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise 175 // is broken. 
When compiled with -fno-exceptions, the framework will still detect when a 176 // recoverable exception was thrown inside of a continuation and will consider the promise 177 // broken even though a (presumably garbage) result was returned. 178 // 179 // If the returned promise is destroyed before the callback runs, the callback will be canceled 180 // (it will never run). 181 // 182 // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is 183 // called, in the sense of move semantics. After returning, the original promise is no longer 184 // valid, but `then()` returns a new promise. 185 // 186 // *Advanced implementation tips:* Most users will never need to worry about the below, but 187 // it is good to be aware of. 188 // 189 // As an optimization, if the callback function `func` does _not_ return another promise, then 190 // execution of `func` itself may be delayed until its result is known to be needed. The 191 // expectation here is that `func` is just doing some transformation on the results, not 192 // scheduling any other actions, therefore the system doesn't need to be proactive about 193 // evaluating it. This way, a chain of trivial then() transformations can be executed all at 194 // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()` 195 // method to suppress this behavior. 196 // 197 // On the other hand, if `func` _does_ return another promise, then the system evaluates `func` 198 // as soon as possible, because the promise it returns might be for a newly-scheduled 199 // long-running asynchronous task. 200 // 201 // As another optimization, when a callback function registered with `then()` is actually 202 // scheduled, it is scheduled to occur immediately, preempting other work in the event queue. 203 // This allows a long chain of `then`s to execute all at once, improving cache locality by 204 // clustering operations on the same data. 
However, this implies that starvation can occur 205 // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for 206 // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events 207 // in the queue will get a chance to run before your callback is executed. 208 209 Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); } 210 // Convenience method to convert the promise to a void promise by ignoring the return value. 211 // 212 // You must still wait on the returned promise if you want the task to execute. 213 214 template <typename ErrorFunc> 215 Promise<T> catch_(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT; 216 // Equivalent to `.then(identityFunc, errorHandler)`, where `identityFunc` is a function that 217 // just returns its input. 218 219 T wait(WaitScope& waitScope); 220 // Run the event loop until the promise is fulfilled, then return its result. If the promise 221 // is rejected, throw an exception. 222 // 223 // wait() is primarily useful at the top level of a program -- typically, within the function 224 // that allocated the EventLoop. For example, a program that performs one or two RPCs and then 225 // exits would likely use wait() in its main() function to wait on each RPC. On the other hand, 226 // server-side code generally cannot use wait(), because it has to be able to accept multiple 227 // requests at once. 228 // 229 // If the promise is rejected, `wait()` throws an exception. If the program was compiled without 230 // exceptions (-fno-exceptions), this will usually abort. In this case you really should first 231 // use `then()` to set an appropriate handler for the exception case, so that the promise you 232 // actually wait on never throws. 233 // 234 // `waitScope` is an object proving that the caller is in a scope where wait() is allowed.
By 235 // convention, any function which might call wait(), or which might call another function which 236 // might call wait(), must take `WaitScope&` as one of its parameters. This is needed for two 237 // reasons: 238 // * `wait()` is not allowed during an event callback, because event callbacks are themselves 239 // called during some other `wait()`, and such recursive `wait()`s would only be able to 240 // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer 241 // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during 242 // an event callback. 243 // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()` 244 // returns. This means that anyone calling `wait()` must be reentrant -- state may change 245 // around them in arbitrary ways. Therefore, callers really need to know if a function they 246 // are calling might wait(), and the `WaitScope&` parameter makes this clear. 247 // 248 // Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the 249 // top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what 250 // actually causes the event loop to run at all. This top-level `WaitScope` cannot be used 251 // recursively, so cannot be used within an event callback. 252 // 253 // However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use 254 // kj::startFiber() to start some code executing on an alternate call stack. That code will get 255 // its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()` 256 // switches back to the main stack in order to run the event loop, returning to the fiber's stack 257 // once the awaited promise resolves. 258 259 bool poll(WaitScope& waitScope); 260 // Returns true if a call to wait() would complete without blocking, false if it would block. 
261 // 262 // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an 263 // attempt to resolve it. Only when there is nothing left to do will it return false. 264 // 265 // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does 266 // not resolve until some specific event occurs. To do so, poll() the promise before the event to 267 // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves. 268 // The first poll() verifies that the promise doesn't resolve early, which would otherwise be 269 // hard to do deterministically. The second poll() allows you to check that the promise has 270 // resolved and avoid a wait() that might deadlock in the case that it hasn't. 271 // 272 // poll() is not supported in fibers; it will throw an exception. 273 274 ForkedPromise<T> fork() KJ_WARN_UNUSED_RESULT; 275 // Forks the promise, so that multiple different clients can independently wait on the result. 276 // `T` must be copy-constructible for this to work. Or, in the special case where `T` is 277 // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same 278 // (or an equivalent) object (probably implemented via reference counting). 279 280 _::SplitTuplePromise<T> split(); 281 // Split a promise for a tuple into a tuple of promises. 282 // 283 // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns 284 // `kj::Tuple<Promise<T>, Promise<U>>`. 285 286 Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT; 287 // Return a new promise that resolves when either the original promise resolves or `other` 288 // resolves (whichever comes first). The promise that didn't resolve first is canceled. 289 290 // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions 291 // and produces a tuple? 292 293 template <typename... Attachments> 294 Promise<T> attach(Attachments&&...
attachments) KJ_WARN_UNUSED_RESULT; 295 // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will 296 // be destroyed when the promise resolves. This is useful when a promise's callback contains 297 // pointers into some object and you want to make sure the object still exists when the callback 298 // runs -- after calling then(), use attach() to add necessary objects to the result. 299 300 template <typename ErrorFunc> 301 Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT; 302 Promise<T> eagerlyEvaluate(decltype(nullptr)) KJ_WARN_UNUSED_RESULT; 303 // Force eager evaluation of this promise. Use this if you are going to hold on to the promise 304 // for a while without consuming the result, but you want to make sure that the system actually 305 // processes it. 306 // 307 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to 308 // `then()`, or the parameter to `catch_()`. We make you specify this because otherwise it's 309 // easy to forget to handle errors in a promise that you never use. You may specify nullptr for 310 // the error handler if you are sure that ignoring errors is fine, or if you know that you'll 311 // eventually wait on the promise somewhere. 312 313 template <typename ErrorFunc> 314 void detach(ErrorFunc&& errorHandler); 315 // Allows the promise to continue running in the background until it completes or the 316 // `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this 317 // promise, you need to make sure that the promise owns all the objects it touches or make sure 318 // those objects outlive the EventLoop. 319 // 320 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to 321 // `then()`, except that it must return void.
322 // 323 // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be 324 // canceled unless the callee explicitly permits it. 325 326 kj::String trace(); 327 // Returns a dump of debug info about this promise. Not for production use. Requires RTTI. 328 // This method does NOT consume the promise as other methods do. 329 330 private: 331 Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {} 332 // The unnamed `bool` parameter is a dummy that prevents ambiguity with the immediate-value constructor. 333 334 friend class _::PromiseNode; 335 }; 336 337 template <typename T> 338 class ForkedPromise { 339 // The result of `Promise::fork()` and `EventLoop::fork()`. Allows branches to be created. 340 // Like `Promise<T>`, this is a pass-by-move type. 341 342 public: 343 inline ForkedPromise(decltype(nullptr)) {} 344 345 Promise<T> addBranch(); 346 // Add a new branch to the fork. The branch is equivalent to the original promise. 347 348 bool hasBranches(); 349 // Returns true if there are any branches that haven't been canceled. 350 351 private: 352 Own<_::ForkHub<_::FixVoid<T>>> hub; 353 354 inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {} 355 356 friend class Promise<T>; 357 friend class EventLoop; 358 }; 359 360 constexpr _::Void READY_NOW = _::Void(); 361 // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly 362 // cast to `Promise<void>`. 363 364 constexpr _::NeverDone NEVER_DONE = _::NeverDone(); 365 // The opposite of `READY_NOW`: return this when the promise should never resolve. This can be 366 // implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait 367 // forever (useful for servers). 368 369 template <typename Func> 370 PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT; 371 // Schedule for the given zero-parameter function to be executed in the event loop at some 372 // point in the near future.
Returns a Promise for its result -- or, if `func()` itself returns 373 // a promise, `evalLater()` returns a Promise for the result of resolving that promise. 374 // 375 // Example usage: 376 // Promise<int> x = evalLater([]() { return 123; }); 377 // 378 // The above is exactly equivalent to: 379 // Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; }); 380 // 381 // If the returned promise is destroyed before the callback runs, the callback will be canceled 382 // (never called). 383 // 384 // If you schedule several evaluations with `evalLater` during the same callback, they are 385 // guaranteed to be executed in order. 386 387 template <typename Func> 388 PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT; 389 // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns. 390 // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the 391 // main reason why `evalNow()` is useful. 392 393 template <typename Func> 394 PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT; 395 // Like `evalLater()`, except that the function doesn't run until the event queue is otherwise 396 // completely empty and the thread is about to suspend waiting for I/O. 397 // 398 // This is useful when you need to perform some disruptive action and you want to make sure that 399 // you don't interrupt some other task between two .then() continuations. For example, say you want 400 // to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw 401 // them. It could be that a read() has completed and bytes have been transferred to the target 402 // buffer, but the .then() callback that handles the read result hasn't executed yet. If you 403 // cancel the promise at this inopportune moment, the bytes in the buffer are lost. 
If you do 404 // evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out 405 // and if you didn't receive the read result yet, then you know nothing has been read, and you can 406 // simply drop the promise. 407 // 408 // If evalLast() is called multiple times, functions are executed in LIFO order. If the first 409 // callback enqueues new events, then later callbacks will not execute until those events are 410 // drained. 411 412 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); 413 kj::String getAsyncTrace(); 414 // If the event loop is currently running in this thread, get a trace back through the promise 415 // chain leading to the currently-executing event. The format is the same as kj::getStackTrace() 416 // from exception.c++. 417 418 template <typename Func> 419 PromiseForResult<Func, void> retryOnDisconnect(Func&& func) KJ_WARN_UNUSED_RESULT; 420 // Promises to run `func()` asynchronously, retrying once if it fails with a DISCONNECTED exception. 421 // If the retry also fails, the exception is passed through. 422 // 423 // `func()` should return a `Promise`. `retryOnDisconnect(func)` returns the same promise, except 424 // with the retry logic added. 425 426 template <typename Func> 427 PromiseForResult<Func, WaitScope&> startFiber(size_t stackSize, Func&& func) KJ_WARN_UNUSED_RESULT; 428 // Executes `func()` in a fiber, returning a promise for the eventual result. `func()` will be 429 // passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()` 430 // can be written in a synchronous, blocking style, instead of using `.then()`. This is often much 431 // easier to write and read, and may even be significantly faster if it allows the use of stack 432 // allocation rather than heap allocation. 433 // 434 // However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack.
435 // The entire stack must be allocated at once, making it necessary to choose a stack size upfront 436 // that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That 437 // said, over-estimating is not too terrible since pages of the stack will actually be allocated 438 // lazily when first accessed; actual memory usage will correspond to the "high watermark" of the 439 // actual stack usage. That said, this lazy allocation forces page faults, which can be quite slow. 440 // Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will 441 // have to be interrupted to flush their CPU cores' TLB caches. 442 // 443 // In short, when performance matters, you should try to avoid creating fibers very frequently. 444 445 class FiberPool final { 446 // A freelist pool of fibers with a set stack size. This improves CPU usage with fibers at 447 // the expense of memory usage. Fibers in this pool will always use the max amount of memory 448 // used until the pool is destroyed. 449 450 public: 451 explicit FiberPool(size_t stackSize); 452 ~FiberPool() noexcept(false); 453 KJ_DISALLOW_COPY(FiberPool); 454 455 void setMaxFreelist(size_t count); 456 // Set the maximum number of stacks to add to the freelist. If the freelist is full, stacks will 457 // be deleted rather than returned to the freelist. 458 459 void useCoreLocalFreelists(); 460 // EXPERIMENTAL: Call to tell FiberPool to try to use core-local stack freelists, which 461 // in theory should increase L1/L2 cache efficacy for freelisted stacks. In practice, as of 462 // this writing, no performance advantage has yet been demonstrated. Note that currently this 463 // feature is only supported on Linux (the flag has no effect on other operating systems). 
464 465 template <typename Func> 466 PromiseForResult<Func, WaitScope&> startFiber(Func&& func) const KJ_WARN_UNUSED_RESULT; 467 // Executes `func()` in a fiber from this pool, returning a promise for the eventual result. 468 // `func()` will be passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on 469 // promises. Thus, `func()` can be written in a synchronous, blocking style, instead of 470 // using `.then()`. This is often much easier to write and read, and may even be significantly 471 // faster if it allows the use of stack allocation rather than heap allocation. 472 473 void runSynchronously(kj::FunctionParam<void()> func) const; 474 // Use one of the stacks in the pool to synchronously execute func(), returning only after 475 // func() returns. This is not the usual use case for fibers, but can be a nice optimization 476 // in programs that have many threads that mostly only need small stacks, but occasionally need 477 // a much bigger stack to run some deeply recursive algorithm. If the algorithm is run on each 478 // thread's normal call stack, then every thread's stack will tend to grow to be very big 479 // (usually, stacks automatically grow as needed, but do not shrink until the thread exits 480 // completely). If the threads can share a small set of big stacks that they use only when calling 481 // the deeply recursive algorithm, and use small stacks for everything else, overall memory usage 482 // is reduced. 483 // 484 // TODO(someday): If func() returns a value, return it from runSynchronously? Current use case 485 // doesn't need it. 486 487 size_t getFreelistSize() const; 488 // Get the number of stacks currently in the freelist. Does not count stacks that are active.
489 490 private: 491 class Impl; 492 Own<Impl> impl; 493 494 friend class _::FiberStack; 495 friend class _::FiberBase; 496 }; 497 498 template <typename T> 499 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises); 500 // Join an array of promises into a promise for an array. 501 502 // ======================================================================================= 503 // Hack for creating a lambda that holds an owned pointer. 504 505 template <typename Func, typename MovedParam> 506 class CaptureByMove { // Functor type returned by mvCapture(): stores `func` together with the moved-in `param`. 507 public: 508 inline CaptureByMove(Func&& func, MovedParam&& param) 509 : func(kj::mv(func)), param(kj::mv(param)) {} 510 511 template <typename... Params> 512 inline auto operator()(Params&&... params) 513 -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Params>(params)...)) { 514 return func(kj::mv(param), kj::fwd<Params>(params)...); // `param` is passed to `func` via kj::mv() on every call, so this is presumably meant to be invoked only once. 515 } 516 517 private: 518 Func func; 519 MovedParam param; 520 }; 521 522 template <typename Func, typename MovedParam> 523 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) { 524 // Hack to create a "lambda" which captures a variable by moving it rather than copying or 525 // referencing. C++14 generalized captures should make this obsolete, but for now in C++11 this 526 // is commonly needed for Promise continuations that own their state. Note that `param` is passed through kj::mv() even when the caller supplies an lvalue (as `ptr` is below). Example usage: 527 // 528 // Own<Foo> ptr = makeFoo(); 529 // Promise<int> promise = callRpc(); 530 // promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) { 531 // return ptr->finish(result); 532 // })); 533 534 return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param)); 535 } 536 537 // ======================================================================================= 538 // Advanced promise construction 539 540 class PromiseRejector { 541 // Superclass of PromiseFulfiller containing the non-typed methods.
Useful when you only really 542 // need to be able to reject a promise, and you need to operate on fulfillers of different types. 543 public: 544 virtual void reject(Exception&& exception) = 0; // Reject the promise with an error; see PromiseFulfiller<T>::reject(). 545 virtual bool isWaiting() = 0; // See PromiseFulfiller<T>::isWaiting(). 546 }; 547 548 template <typename T> 549 class PromiseFulfiller: public PromiseRejector { 550 // A callback which can be used to fulfill a promise. Only the first call to fulfill() or 551 // reject() matters; subsequent calls are ignored. 552 553 public: 554 virtual void fulfill(T&& value) = 0; 555 // Fulfill the promise with the given value. 556 557 virtual void reject(Exception&& exception) = 0; 558 // Reject the promise with an error. 559 560 virtual bool isWaiting() = 0; 561 // Returns true if the promise is still unfulfilled and someone is potentially waiting for it. 562 // Returns false if fulfill()/reject() has already been called *or* if the promise to be 563 // fulfilled has been discarded and therefore the result will never be used anyway. 564 565 template <typename Func> 566 bool rejectIfThrows(Func&& func); 567 // Call the function (with no arguments) and return true. If an exception is thrown, call 568 // `reject()` on this fulfiller and then return false. When compiled with exceptions disabled, 569 // non-fatal exceptions are still detected and handled correctly. 570 }; 571 572 template <> 573 class PromiseFulfiller<void>: public PromiseRejector { 574 // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>. 575 576 public: 577 virtual void fulfill(_::Void&& value = _::Void()) = 0; 578 // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't 579 // have to specialize for <void>. 580 581 virtual void reject(Exception&& exception) = 0; 582 virtual bool isWaiting() = 0; 583 584 template <typename Func> 585 bool rejectIfThrows(Func&& func); 586 }; 587 588 template <typename T, typename Adapter, typename... Params> 589 _::ReducePromises<T> newAdaptedPromise(Params&&...
adapterConstructorParams); 590 // Creates a new promise which owns an instance of `Adapter` which encapsulates the operation 591 // that will eventually fulfill the promise. This is primarily useful for adapting non-KJ 592 // asynchronous APIs to use promises. 593 // 594 // An instance of `Adapter` will be allocated and owned by the returned `Promise`. A 595 // `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor, 596 // and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter 597 // is expected to perform some asynchronous operation and call the `PromiseFulfiller<T>` once 598 // it is finished. 599 // 600 // The adapter is destroyed when its owning Promise is destroyed. This may occur before the 601 // Promise has been fulfilled. In this case, the adapter's destructor should cancel the 602 // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be 603 // called. 604 // 605 // An adapter implementation should be carefully written to ensure that it cannot accidentally 606 // be left unfulfilled permanently because of an exception. Consider making liberal use of 607 // `PromiseFulfiller<T>::rejectIfThrows()`. 608 609 template <typename T> 610 struct PromiseFulfillerPair { 611 _::ReducePromises<T> promise; 612 Own<PromiseFulfiller<T>> fulfiller; 613 }; 614 615 template <typename T> 616 PromiseFulfillerPair<T> newPromiseAndFulfiller(); 617 // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise. 618 // If the PromiseFulfiller is destroyed before either of its methods are called, the Promise is 619 // implicitly rejected. 620 // 621 // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback 622 // that there is no way to handle cancellation (i.e. detect when the Promise is discarded). 623 // 624 // You can arrange to fulfill a promise with another promise by using a promise type for T. 
E.g. 625 // `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the 626 // fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the 627 // `fulfill()` callback, and the promises are chained. 628 629 template <typename T> 630 class CrossThreadPromiseFulfiller: public kj::PromiseFulfiller<T> { 631 // Like PromiseFulfiller<T> but the methods are `const`, indicating they can safely be called 632 // from another thread. 633 634 public: 635 virtual void fulfill(T&& value) const = 0; 636 virtual void reject(Exception&& exception) const = 0; 637 virtual bool isWaiting() const = 0; 638 639 void fulfill(T&& value) override { return constThis()->fulfill(kj::fwd<T>(value)); } 640 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } 641 bool isWaiting() override { return constThis()->isWaiting(); } 642 643 private: 644 const CrossThreadPromiseFulfiller* constThis() { return this; } 645 }; 646 647 template <> 648 class CrossThreadPromiseFulfiller<void>: public kj::PromiseFulfiller<void> { 649 // Specialization of CrossThreadPromiseFulfiller for void promises. See 650 // CrossThreadPromiseFulfiller<T>. 
651 652 public: 653 virtual void fulfill(_::Void&& value = _::Void()) const = 0; 654 virtual void reject(Exception&& exception) const = 0; 655 virtual bool isWaiting() const = 0; 656 657 void fulfill(_::Void&& value) override { return constThis()->fulfill(kj::mv(value)); } 658 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } 659 bool isWaiting() override { return constThis()->isWaiting(); } 660 661 private: 662 const CrossThreadPromiseFulfiller* constThis() { return this; } 663 }; 664 665 template <typename T> 666 struct PromiseCrossThreadFulfillerPair { 667 _::ReducePromises<T> promise; 668 Own<CrossThreadPromiseFulfiller<T>> fulfiller; 669 }; 670 671 template <typename T> 672 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller(); 673 // Like `newPromiseAndFulfiller()`, but the fulfiller is allowed to be invoked from any thread, 674 // not just the one that called this method. Note that the Promise is still tied to the calling 675 // thread's event loop and *cannot* be used from another thread -- only the PromiseFulfiller is 676 // cross-thread. 677 678 // ======================================================================================= 679 // Canceler 680 681 class Canceler { 682 // A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or 683 // implicitly when the Canceler is destroyed. 684 // 685 // The cancellation is done in such a way that once cancel() (or the Canceler's destructor) 686 // returns, it's guaranteed that the promise has already been canceled and destroyed. This 687 // guarantee is important for enforcing ownership constraints. For example, imagine that Alice 688 // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's 689 // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed 690 // at random without Alice having canceled the promise. 
In this case, it is necessary for Bob to 691 // ensure that the promise will be forcefully canceled. Bob can do this by constructing a 692 // Canceler and using it to wrap promises before returning them to callers. When Bob is 693 // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors. 694 // 695 // Note that another common strategy for cancelation is to use exclusiveJoin() to join a promise 696 // with some "cancellation promise" which only resolves if the operation should be canceled. The 697 // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus 698 // calling the PromiseFulfiller cancels the operation. There is a major problem with this 699 // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the 700 // exclusive-joined promise actually resolves and cancels its other fork. During that time, the 701 // task might continue to execute. If it holds pointers to objects that have been destroyed, this 702 // might cause segfaults. Thus, it is safer to use a Canceler. 703 704 public: 705 inline Canceler() {} 706 ~Canceler() noexcept(false); 707 KJ_DISALLOW_COPY(Canceler); 708 709 template <typename T> 710 Promise<T> wrap(Promise<T> promise) { 711 return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise)); 712 } 713 714 void cancel(StringPtr cancelReason); 715 void cancel(const Exception& exception); 716 // Cancel all previously-wrapped promises that have not already completed, causing them to throw 717 // the given exception. If you provide just a description message instead of an exception, then 718 // an exception object will be constructed from it -- but only if there are requests to cancel. 719 720 void release(); 721 // Releases previously-wrapped promises, so that they will not be canceled regardless of what 722 // happens to this Canceler. 
723 724 bool isEmpty() const { return list == nullptr; } 725 // Indicates if any previously-wrapped promises are still executing. (If this returns true, then 726 // cancel() would be a no-op.) 727 728 private: 729 class AdapterBase { 730 public: 731 AdapterBase(Canceler& canceler); 732 ~AdapterBase() noexcept(false); 733 734 virtual void cancel(Exception&& e) = 0; 735 736 void unlink(); 737 738 private: 739 Maybe<Maybe<AdapterBase&>&> prev; 740 Maybe<AdapterBase&> next; 741 friend class Canceler; 742 }; 743 744 template <typename T> 745 class AdapterImpl: public AdapterBase { 746 public: 747 AdapterImpl(PromiseFulfiller<T>& fulfiller, 748 Canceler& canceler, Promise<T> inner) 749 : AdapterBase(canceler), 750 fulfiller(fulfiller), 751 inner(inner.then( 752 [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); }, 753 [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); }) 754 .eagerlyEvaluate(nullptr)) {} 755 756 void cancel(Exception&& e) override { 757 fulfiller.reject(kj::mv(e)); 758 inner = nullptr; 759 } 760 761 private: 762 PromiseFulfiller<T>& fulfiller; 763 Promise<void> inner; 764 }; 765 766 Maybe<AdapterBase&> list; 767 }; 768 769 template <> 770 class Canceler::AdapterImpl<void>: public AdapterBase { 771 public: 772 AdapterImpl(kj::PromiseFulfiller<void>& fulfiller, 773 Canceler& canceler, kj::Promise<void> inner); 774 void cancel(kj::Exception&& e) override; 775 // These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to 776 // link with symbols defined in async.c++ merely because they included async.h. 777 778 private: 779 kj::PromiseFulfiller<void>& fulfiller; 780 kj::Promise<void> inner; 781 }; 782 783 // ======================================================================================= 784 // TaskSet 785 786 class TaskSet { 787 // Holds a collection of Promise<void>s and ensures that each executes to completion. 
Memory 788 // associated with each promise is automatically freed when the promise completes. Destroying 789 // the TaskSet itself automatically cancels all unfinished promises. 790 // 791 // This is useful for "daemon" objects that perform background tasks which aren't intended to 792 // fulfill any particular external promise, but which may need to be canceled (and thus can't 793 // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is 794 // working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed as well, 795 // and everything the daemon is doing is canceled. 796 797 public: 798 class ErrorHandler { 799 public: 800 virtual void taskFailed(kj::Exception&& exception) = 0; 801 }; 802 803 TaskSet(ErrorHandler& errorHandler); 804 // `errorHandler` will be executed any time a task throws an exception, and will execute within 805 // the given EventLoop. 806 807 ~TaskSet() noexcept(false); 808 809 void add(Promise<void>&& promise); 810 811 kj::String trace(); 812 // Return debug info about all promises currently in the TaskSet. 813 814 bool isEmpty() { return tasks == nullptr; } 815 // Check if any tasks are running. 816 817 Promise<void> onEmpty(); 818 // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can 819 // exist at a time. 820 821 private: 822 class Task; 823 824 TaskSet::ErrorHandler& errorHandler; 825 Maybe<Own<Task>> tasks; 826 Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller; 827 }; 828 829 // ======================================================================================= 830 // Cross-thread execution. 831 832 class Executor { 833 // Executes code on another thread's event loop. 834 // 835 // Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current 836 // thread's event loop. You may then pass the reference to other threads to enable them to call 837 // back to this one. 
838 839 public: 840 Executor(EventLoop& loop, Badge<EventLoop>); 841 ~Executor() noexcept(false); 842 843 virtual kj::Own<const Executor> addRef() const = 0; 844 // Add a reference to this Executor. The Executor will not be destroyed until all references are 845 // dropped. This uses atomic refcounting for thread-safety. 846 // 847 // Use this when you can't guarantee that the target thread's event loop won't concurrently exit 848 // (including due to an uncaught exception!) while another thread is still using the Executor. 849 // Otherwise, the Executor object is destroyed when the owning event loop exits. 850 // 851 // If the target event loop has exited, then `execute{Async,Sync}` will throw DISCONNECTED 852 // exceptions. 853 854 bool isLive() const; 855 // Returns true if the remote event loop still exists, false if it has been destroyed. In the 856 // latter case, `execute{Async,Sync}()` will definitely throw. Of course, if this returns true, 857 // it could still change to false at any moment, and `execute{Async,Sync}()` could still throw as 858 // a result. 859 // 860 // TODO(cleanup): Should we have tryExecute{Async,Sync}() that return Maybes that are null if 861 // the remote event loop exited? Currently there are multiple known use cases that check 862 // isLive() after catching a DISCONNECTED exception to decide whether it is due to the executor 863 // exiting, and then handling that case. This is borderline in violation of KJ exception 864 // philosophy, but right now I'm not excited about the extra template metaprogramming needed 865 // for "try" versions... 866 867 template <typename Func> 868 PromiseForResult<Func, void> executeAsync(Func&& func) const; 869 // Call from any thread to request that the given function be executed on the executor's thread, 870 // returning a promise for the result. 871 // 872 // The Promise returned by executeAsync() belongs to the requesting thread, not the executor 873 // thread. 
Hence, for example, continuations added to this promise with .then() will execute in 874 // the requesting thread. 875 // 876 // If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting 877 // thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread 878 // awaits the promise. Once it resolves to a final result, that result is transferred to the 879 // requesting thread, resolving the promise that executeAsync() returned earlier. 880 // 881 // `func` will be destroyed in the requesting thread, after the final result has been returned 882 // from the executor thread. This means that it is safe for `func` to capture objects that cannot 883 // safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference, 884 // so long as the functor remains live until the promise completes or is canceled, and the 885 // function is thread-safe. 886 // 887 // Of course, the body of `func` must be careful that any access it makes on these objects is 888 // safe cross-thread. For example, it must not attempt to access Promise-related objects 889 // cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it 890 // from another. Unfortunately, the usual convention of using const-correctness to enforce 891 // thread-safety does not work here, because applications can often ensure that `func` has 892 // exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe 893 // ways; the const qualifier is not sufficient to express this. 894 // 895 // The final return value of `func` is transferred between threads, and hence is constructed and 896 // destroyed in separate threads. It is the app's responsibility to make sure this is OK. 897 // Alternatively, the app can perhaps arrange to send the return value back to the original 898 // thread for destruction, if needed. 
899 // 900 // If the requesting thread destroys the returned Promise, the destructor will block waiting for 901 // the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed 902 // before the Promise's destructor returns. 903 // 904 // Multiple calls to executeAsync() from the same requesting thread to the same target thread 905 // will be delivered in the same order in which they were requested. (However, if func() returns 906 // a promise, delivery of subsequent calls is not blocked on that promise. In other words, this 907 // call provides E-Order in the same way as Cap'n Proto.) 908 909 template <typename Func> 910 _::UnwrapPromise<PromiseForResult<Func, void>> executeSync(Func&& func) const; 911 // Schedules `func()` to execute on the executor thread, and then blocks the requesting thread 912 // until `func()` completes. If `func()` returns a Promise, then the wait will continue until 913 // that promise resolves, and the final result will be returned to the requesting thread. 914 // 915 // The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that 916 // loop will *not* execute while the thread is blocked. This method is particularly useful to 917 // allow non-event-loop threads to perform I/O via a separate event-loop thread. 918 // 919 // As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the 920 // executor thread has signaled completion. The return value is transferred between threads. 921 922 private: 923 struct Impl; 924 Own<Impl> impl; 925 // To avoid including mutex.h... 926 927 friend class EventLoop; 928 friend class _::XThreadEvent; 929 friend class _::XThreadPaf; 930 931 void send(_::XThreadEvent& event, bool sync) const; 932 void wait(); 933 bool poll(); 934 935 EventLoop& getLoop() const; 936 }; 937 938 const Executor& getCurrentThreadExecutor(); 939 // Get the executor for the current thread's event loop. 
This reference can then be passed to other 940 // threads. 941 942 // ======================================================================================= 943 // The EventLoop class 944 945 class EventPort { 946 // Interfaces between an `EventLoop` and events originating from outside of the loop's thread. 947 // All such events come in through the `EventPort` implementation. 948 // 949 // An `EventPort` implementation may interface with low-level operating system APIs and/or other 950 // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop 951 // framework, allowing the two to coexist in a single thread. 952 953 public: 954 virtual bool wait() = 0; 955 // Wait for an external event to arrive, sleeping if necessary. Once at least one event has 956 // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return. 957 // 958 // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to 959 // wait for new events to populate the queue. 960 // 961 // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in 962 // a loop will eventually sleep. (That is to say, false positives are fine.) 963 // 964 // Returns true if wake() has been called from another thread. (Precisely, returns true if 965 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last 966 // called.) 967 968 virtual bool poll() = 0; 969 // Check if any external events have arrived, but do not sleep. If any events have arrived, 970 // add them to the event queue (e.g. by fulfilling promises) before returning. 971 // 972 // This may be called during `Promise::wait()` when the EventLoop has been executing for a while 973 // without a break but is still non-empty. 974 // 975 // Returns true if wake() has been called from another thread. 
(Precisely, returns true if 976 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last 977 // called.) 978 979 virtual void setRunnable(bool runnable); 980 // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it 981 // transitions from empty -> runnable or runnable -> empty. This is typically useful when 982 // integrating with an external event loop; if the loop is currently runnable then you should 983 // arrange to call run() on it soon. The default implementation does nothing. 984 985 virtual void wake() const; 986 // Wake up the EventPort's thread from another thread. 987 // 988 // Unlike all other methods on this interface, `wake()` may be called from another thread, hence 989 // it is `const`. 990 // 991 // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep 992 // again until `wait()` or `poll()` has returned true at least once. 993 // 994 // The default implementation throws an UNIMPLEMENTED exception. 995 }; 996 997 class EventLoop { 998 // Represents a queue of events being executed in a loop. Most code won't interact with 999 // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the 1000 // documentation for `Promise`. 1001 // 1002 // Each thread can have at most one current EventLoop. To make an `EventLoop` current for 1003 // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop, 1004 // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly 1005 // be passed a reference to the `WaitScope` to make the caller aware that they might block. 1006 // 1007 // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g. 1008 // in the main() function, or in the start function of a thread. You can then use it to 1009 // construct some promises and wait on the result. 
Example: 1010 // 1011 // int main() { 1012 // // `loop` becomes the official EventLoop for the thread. 1013 // MyEventPort eventPort; 1014 // EventLoop loop(eventPort); 1015 // 1016 // // Now we can call an async function. 1017 // Promise<String> textPromise = getHttp("http://example.com"); 1018 // 1019 // // And we can wait for the promise to complete. Note that you can only use `wait()` 1020 // // from the top level, not from inside a promise callback. 1021 // String text = textPromise.wait(); 1022 // print(text); 1023 // return 0; 1024 // } 1025 // 1026 // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather 1027 // than allocate an `EventLoop` directly. 1028 1029 public: 1030 EventLoop(); 1031 // Construct an `EventLoop` which does not receive external events at all. 1032 1033 explicit EventLoop(EventPort& port); 1034 // Construct an `EventLoop` which receives external events through the given `EventPort`. 1035 1036 ~EventLoop() noexcept(false); 1037 1038 void run(uint maxTurnCount = maxValue); 1039 // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done, 1040 // whichever comes first. This never calls the `EventPort`'s `sleep()` or `poll()`. It will 1041 // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty. 1042 1043 bool isRunnable(); 1044 // Returns true if run() would currently do anything, or false if the queue is empty. 1045 1046 const Executor& getExecutor(); 1047 // Returns an Executor that can be used to schedule events on this EventLoop from another thread. 1048 // 1049 // Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's 1050 // Executor. 1051 // 1052 // Note that this is only needed for cross-thread scheduling. To schedule code to run later in 1053 // the current thread, use `kj::evalLater()`, which will be more efficient. 
1054 1055 private: 1056 kj::Maybe<EventPort&> port; 1057 // If null, this thread doesn't receive I/O events from the OS. It can potentially receive 1058 // events from other threads via the Executor. 1059 1060 bool running = false; 1061 // True while looping -- wait() is then not allowed. 1062 1063 bool lastRunnableState = false; 1064 // What did we last pass to port.setRunnable()? 1065 1066 _::Event* head = nullptr; 1067 _::Event** tail = &head; 1068 _::Event** depthFirstInsertPoint = &head; 1069 _::Event** breadthFirstInsertPoint = &head; 1070 1071 kj::Maybe<Own<Executor>> executor; 1072 // Allocated the first time getExecutor() is requested, making cross-thread request possible. 1073 1074 Own<TaskSet> daemons; 1075 1076 _::Event* currentlyFiring = nullptr; 1077 1078 bool turn(); 1079 void setRunnable(bool runnable); 1080 void enterScope(); 1081 void leaveScope(); 1082 1083 void wait(); 1084 void poll(); 1085 1086 friend void _::detach(kj::Promise<void>&& promise); 1087 friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, 1088 WaitScope& waitScope); 1089 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope); 1090 friend class _::Event; 1091 friend class WaitScope; 1092 friend class Executor; 1093 friend class _::XThreadEvent; 1094 friend class _::XThreadPaf; 1095 friend class _::FiberBase; 1096 friend class _::FiberStack; 1097 friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); 1098 }; 1099 1100 class WaitScope { 1101 // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually 1102 // be allocated on the stack and serves two purposes: 1103 // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the 1104 // thread. Most operations dealing with `Promise` (including all of its methods) do not work 1105 // unless the thread has a current `EventLoop`. 
1106 // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular 1107 // promise to complete. See `Promise::wait()` for an extended discussion. 1108 1109 public: 1110 inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); } 1111 inline ~WaitScope() { if (fiber == nullptr) loop.leaveScope(); } 1112 KJ_DISALLOW_COPY(WaitScope); 1113 1114 void poll(); 1115 // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking). 1116 // 1117 // Not supported in fibers. 1118 1119 void setBusyPollInterval(uint count) { busyPollInterval = count; } 1120 // Set the maximum number of events to run in a row before calling poll() on the EventPort to 1121 // check for new I/O. 1122 // 1123 // This has no effect when used in a fiber. 1124 1125 void runEventCallbacksOnStackPool(kj::Maybe<const FiberPool&> pool) { runningStacksPool = pool; } 1126 // Arranges to switch stacks while event callbacks are executing. This is an optimization that 1127 // is useful for programs that use extremely high thread counts, where each thread has its own 1128 // event loop, but each thread has relatively low event throughput, i.e. each thread spends 1129 // most of its time waiting for I/O. Normally, the biggest problem with having lots of threads 1130 // is that each thread must allocate a stack, and stacks can take a lot of memory if the 1131 // application commonly makes deep calls. But, most of that stack space is only needed while 1132 // the thread is executing, not while it's sleeping. So, if threads only switch to a big stack 1133 // during execution, switching back when it's time to sleep, and if those stacks are freelisted 1134 // so that they can be shared among threads, then a lot of memory is saved. 1135 // 1136 // We use the `FiberPool` type here because it implements a freelist of stacks, which is exactly 1137 // what we happen to want! 
In our case, though, we don't use those stacks to implement fibers; 1138 // we use them as the main thread stack. 1139 // 1140 // This has no effect if this WaitScope itself is for a fiber. 1141 // 1142 // Pass `nullptr` as the parameter to go back to running events on the main stack. 1143 1144 void cancelAllDetached(); 1145 // HACK: Immediately cancel all detached promises. 1146 // 1147 // New code should not use detached promises, and therefore should not need this. 1148 // 1149 // This method exists to help existing code deal with the problems of detached promises, 1150 // especially at teardown time. 1151 // 1152 // This method may be removed in the future. 1153 1154 private: 1155 EventLoop& loop; 1156 uint busyPollInterval = kj::maxValue; 1157 1158 kj::Maybe<_::FiberBase&> fiber; 1159 kj::Maybe<const FiberPool&> runningStacksPool; 1160 1161 explicit WaitScope(EventLoop& loop, _::FiberBase& fiber) 1162 : loop(loop), fiber(fiber) {} 1163 1164 template <typename Func> 1165 inline void runOnStackPool(Func&& func) { 1166 KJ_IF_MAYBE(pool, runningStacksPool) { 1167 pool->runSynchronously(kj::fwd<Func>(func)); 1168 } else { 1169 func(); 1170 } 1171 } 1172 1173 friend class EventLoop; 1174 friend class _::FiberBase; 1175 friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, 1176 WaitScope& waitScope); 1177 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope); 1178 }; 1179 1180 } // namespace kj 1181 1182 #define KJ_ASYNC_H_INCLUDED 1183 #include "async-inl.h" 1184 1185 KJ_END_HEADER