async-inl.h (72965B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 // This file contains extended inline implementation details that are required along with async.h. 23 // We move this all into a separate file to make async.h more readable. 24 // 25 // Non-inline declarations here are defined in async.c++. 26 27 #pragma once 28 29 #ifndef KJ_ASYNC_H_INCLUDED 30 #error "Do not include this directly; include kj/async.h." 31 #include "async.h" // help IDE parse this file 32 #endif 33 34 KJ_BEGIN_HEADER 35 36 #include "list.h" 37 38 namespace kj { 39 namespace _ { // private 40 41 template <typename T> 42 class ExceptionOr; 43 44 class ExceptionOrValue { 45 public: 46 ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} 47 KJ_DISALLOW_COPY(ExceptionOrValue); 48 49 void addException(Exception&& exception) { 50 if (this->exception == nullptr) { 51 this->exception = kj::mv(exception); 52 } 53 } 54 55 template <typename T> 56 ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); } 57 template <typename T> 58 const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); } 59 60 Maybe<Exception> exception; 61 62 protected: 63 // Allow subclasses to have move constructor / assignment. 64 ExceptionOrValue() = default; 65 ExceptionOrValue(ExceptionOrValue&& other) = default; 66 ExceptionOrValue& operator=(ExceptionOrValue&& other) = default; 67 }; 68 69 template <typename T> 70 class ExceptionOr: public ExceptionOrValue { 71 public: 72 ExceptionOr() = default; 73 ExceptionOr(T&& value): value(kj::mv(value)) {} 74 ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {} 75 ExceptionOr(ExceptionOr&&) = default; 76 ExceptionOr& operator=(ExceptionOr&&) = default; 77 78 Maybe<T> value; 79 }; 80 81 template <typename T> 82 inline T convertToReturn(ExceptionOr<T>&& result) { 83 KJ_IF_MAYBE(value, result.value) { 84 KJ_IF_MAYBE(exception, result.exception) { 85 throwRecoverableException(kj::mv(*exception)); 86 } 87 return _::returnMaybeVoid(kj::mv(*value)); 88 } else KJ_IF_MAYBE(exception, result.exception) { 89 throwFatalException(kj::mv(*exception)); 90 } else { 91 // Result contained neither a value nor an exception? 92 KJ_UNREACHABLE; 93 } 94 } 95 96 inline void convertToReturn(ExceptionOr<Void>&& result) { 97 // Override <void> case to use throwRecoverableException(). 98 99 if (result.value != nullptr) { 100 KJ_IF_MAYBE(exception, result.exception) { 101 throwRecoverableException(kj::mv(*exception)); 102 } 103 } else KJ_IF_MAYBE(exception, result.exception) { 104 throwRecoverableException(kj::mv(*exception)); 105 } else { 106 // Result contained neither a value nor an exception? 107 KJ_UNREACHABLE; 108 } 109 } 110 111 class TraceBuilder { 112 // Helper for methods that build a call trace. 113 public: 114 TraceBuilder(ArrayPtr<void*> space) 115 : start(space.begin()), current(space.begin()), limit(space.end()) {} 116 117 inline void add(void* addr) { 118 if (current < limit) { 119 *current++ = addr; 120 } 121 } 122 123 inline bool full() const { return current == limit; } 124 125 ArrayPtr<void*> finish() { 126 return arrayPtr(start, current); 127 } 128 129 String toString(); 130 131 private: 132 void** start; 133 void** current; 134 void** limit; 135 }; 136 137 class Event { 138 // An event waiting to be executed. Not for direct use by applications -- promises use this 139 // internally. 140 141 public: 142 Event(); 143 Event(kj::EventLoop& loop); 144 ~Event() noexcept(false); 145 KJ_DISALLOW_COPY(Event); 146 147 void armDepthFirst(); 148 // Enqueue this event so that `fire()` will be called from the event loop soon. 149 // 150 // Events scheduled in this way are executed in depth-first order: if an event callback arms 151 // more events, those events are placed at the front of the queue (in the order in which they 152 // were armed), so that they run immediately after the first event's callback returns. 153 // 154 // Depth-first event scheduling is appropriate for events that represent simple continuations 155 // of a previous event that should be globbed together for performance. Depth-first scheduling 156 // can lead to starvation, so any long-running task must occasionally yield with 157 // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses 158 // breadth-first.) 159 // 160 // To use breadth-first scheduling instead, use `armBreadthFirst()`. 161 162 void armBreadthFirst(); 163 // Like `armDepthFirst()` except that the event is placed at the end of the queue. 164 165 void armLast(); 166 // Enqueues this event to happen after all other events have run to completion and there is 167 // really nothing left to do except wait for I/O. 168 169 bool isNext(); 170 // True if the Event has been armed and is next in line to be fired. This can be used after 171 // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately 172 // ready, in which case continuations may be optimistically run without returning to the event 173 // loop. Note that this optimization is only valid if we know that we would otherwise immediately 174 // return to the event loop without running more application code. So this turns out to be useful 175 // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it 176 // doesn't need to. 177 // 178 // Returns false if the event loop is not currently running. This ensures that promise 179 // continuations don't execute except under a call to .wait(). 180 181 void disarm(); 182 // If the event is armed but hasn't fired, cancel it. (Destroying the event does this 183 // implicitly.) 184 185 virtual void traceEvent(TraceBuilder& builder) = 0; 186 // Build a trace of the callers leading up to this event. `builder` will be populated with 187 // "return addresses" of the promise chain waiting on this event. The return addresses may 188 // actually the addresses of lambdas passed to .then(), but in any case, feeding them into 189 // addr2line should produce useful source code locations. 190 // 191 // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It 192 // must not allocate nor take locks. 193 194 String traceEvent(); 195 // Helper that builds a trace and stringifies it. 196 197 protected: 198 virtual Maybe<Own<Event>> fire() = 0; 199 // Fire the event. Possibly returns a pointer to itself, which will be discarded by the 200 // caller. This is the only way that an event can delete itself as a result of firing, as 201 // doing so from within fire() will throw an exception. 202 203 private: 204 friend class kj::EventLoop; 205 EventLoop& loop; 206 Event* next; 207 Event** prev; 208 bool firing = false; 209 }; 210 211 class PromiseNode { 212 // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations. 213 // 214 // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky 215 // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only 216 // so down-cast in the few places that really need to be templated. Luckily this is all 217 // internal implementation details. 218 219 public: 220 virtual void onReady(Event* event) noexcept = 0; 221 // Arms the given event when ready. 222 // 223 // May be called multiple times. If called again before the event was armed, the old event will 224 // never be armed, only the new one. If called again after the event was armed, the new event 225 // will be armed immediately. Can be called with nullptr to un-register the existing event. 226 227 virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept; 228 // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own 229 // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses 230 // this to shorten redundant chains. The default implementation does nothing; only 231 // ChainPromiseNode should implement this. 232 233 virtual void get(ExceptionOrValue& output) noexcept = 0; 234 // Get the result. `output` points to an ExceptionOr<T> into which the result will be written. 235 // Can only be called once, and only after the node is ready. Must be called directly from the 236 // event loop, with no application code on the stack. 237 238 virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0; 239 // Build a trace of this promise chain, showing what it is currently waiting on. 240 // 241 // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically 242 // recurse to its child first, then after the child returns, add itself to the trace. 243 // 244 // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that 245 // also implements Event, and should not trace that node or its children. This is used in 246 // conjuction with Event::traceEvent(). The chain of Events is often more sparse than the chain 247 // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an 248 // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node 249 // when it is ready, and then TransformPromiseNode applies the .then() transformation during the 250 // call to .get(). 251 // 252 // So, when we trace the chain of Events backwards, we end up hoping over segments of 253 // TransformPromiseNodes (and other similar types). In order to get those added to the trace, 254 // each Event must call back down the PromiseNode chain in the opposite direction, using this 255 // method. 256 // 257 // `tracePromise()` may be called from an async signal handler while `get()` is executing. It 258 // must not allocate nor take locks. 259 260 template <typename T> 261 static Own<PromiseNode> from(T&& promise) { 262 // Given a Promise, extract the PromiseNode. 263 return kj::mv(promise.node); 264 } 265 template <typename T> 266 static PromiseNode& from(T& promise) { 267 // Given a Promise, extract the PromiseNode. 268 return *promise.node; 269 } 270 template <typename T> 271 static T to(Own<PromiseNode>&& node) { 272 // Construct a Promise from a PromiseNode. (T should be a Promise type.) 273 return T(false, kj::mv(node)); 274 } 275 276 protected: 277 class OnReadyEvent { 278 // Helper class for implementing onReady(). 279 280 public: 281 void init(Event* newEvent); 282 283 void arm(); 284 void armBreadthFirst(); 285 // Arms the event if init() has already been called and makes future calls to init() 286 // automatically arm the event. 287 288 inline void traceEvent(TraceBuilder& builder) { 289 if (event != nullptr && !builder.full()) event->traceEvent(builder); 290 } 291 292 private: 293 Event* event = nullptr; 294 }; 295 }; 296 297 // ------------------------------------------------------------------- 298 299 template <typename T> 300 inline NeverDone::operator Promise<T>() const { 301 return PromiseNode::to<Promise<T>>(neverDone()); 302 } 303 304 // ------------------------------------------------------------------- 305 306 class ImmediatePromiseNodeBase: public PromiseNode { 307 public: 308 ImmediatePromiseNodeBase(); 309 ~ImmediatePromiseNodeBase() noexcept(false); 310 311 void onReady(Event* event) noexcept override; 312 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 313 }; 314 315 template <typename T> 316 class ImmediatePromiseNode final: public ImmediatePromiseNodeBase { 317 // A promise that has already been resolved to an immediate value or exception. 318 319 public: 320 ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {} 321 322 void get(ExceptionOrValue& output) noexcept override { 323 output.as<T>() = kj::mv(result); 324 } 325 326 private: 327 ExceptionOr<T> result; 328 }; 329 330 class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase { 331 public: 332 ImmediateBrokenPromiseNode(Exception&& exception); 333 334 void get(ExceptionOrValue& output) noexcept override; 335 336 private: 337 Exception exception; 338 }; 339 340 // ------------------------------------------------------------------- 341 342 class AttachmentPromiseNodeBase: public PromiseNode { 343 public: 344 AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency); 345 346 void onReady(Event* event) noexcept override; 347 void get(ExceptionOrValue& output) noexcept override; 348 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 349 350 private: 351 Own<PromiseNode> dependency; 352 353 void dropDependency(); 354 355 template <typename> 356 friend class AttachmentPromiseNode; 357 }; 358 359 template <typename Attachment> 360 class AttachmentPromiseNode final: public AttachmentPromiseNodeBase { 361 // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable 362 // object) until the promise resolves. 363 364 public: 365 AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment) 366 : AttachmentPromiseNodeBase(kj::mv(dependency)), 367 attachment(kj::mv<Attachment>(attachment)) {} 368 369 ~AttachmentPromiseNode() noexcept(false) { 370 // We need to make sure the dependency is deleted before we delete the attachment because the 371 // dependency may be using the attachment. 372 dropDependency(); 373 } 374 375 private: 376 Attachment attachment; 377 }; 378 379 // ------------------------------------------------------------------- 380 381 #if __GNUC__ >= 8 && !__clang__ 382 // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no 383 // "legal" way for us to extract the content of a PTMF so too bad. 384 #pragma GCC diagnostic push 385 #pragma GCC diagnostic ignored "-Wclass-memaccess" 386 #if __GNUC__ >= 11 387 // GCC 11's array-bounds is similarly upset with us for digging into "private" implementation 388 // details. But the format is well-defined by the ABI which cannot change so please just let us 389 // do it kthx. 390 #pragma GCC diagnostic ignored "-Warray-bounds" 391 #endif 392 #endif 393 394 template <typename T, typename ReturnType, typename... ParamTypes> 395 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); 396 template <typename T, typename ReturnType, typename... ParamTypes> 397 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); 398 // Given an object and a pointer-to-method, return the start address of the method's code. The 399 // intent is that this address can be used in a trace; addr2line should map it to the start of 400 // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine 401 // the address of the specific implementation (otherwise, `obj` wouldn't be needed). 402 // 403 // Note that if the method is overloaded or is a template, you will need to explicitly specify 404 // the param and return types, otherwise the compiler won't know which overload / template 405 // specialization you are requesting. 406 407 class PtmfHelper { 408 // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The 409 // class represents the internal representation of a pointer-to-member-function. 410 411 template <typename... ParamTypes> 412 friend struct GetFunctorStartAddress; 413 template <typename T, typename ReturnType, typename... ParamTypes> 414 friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); 415 template <typename T, typename ReturnType, typename... ParamTypes> 416 friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); 417 418 #if __GNUG__ 419 420 void* ptr; 421 ptrdiff_t adj; 422 // Layout of a pointer-to-member-function used by GCC and compatible compilers. 423 424 void* apply(const void* obj) { 425 #if defined(__arm__) || defined(__mips__) || defined(__aarch64__) 426 if (adj & 1) { 427 ptrdiff_t voff = (ptrdiff_t)ptr; 428 #else 429 ptrdiff_t voff = (ptrdiff_t)ptr; 430 if (voff & 1) { 431 voff &= ~1; 432 #endif 433 return *(void**)(*(char**)obj + voff); 434 } else { 435 return ptr; 436 } 437 } 438 439 #define BODY \ 440 PtmfHelper result; \ 441 static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \ 442 memcpy(&result, &p, sizeof(result)); \ 443 return result 444 445 #else // __GNUG__ 446 447 void* apply(const void* obj) { return nullptr; } 448 // TODO(port): PTMF instruction address extraction 449 450 #define BODY return PtmfHelper{} 451 452 #endif // __GNUG__, else 453 454 template <typename R, typename C, typename... P, typename F> 455 static PtmfHelper from(F p) { BODY; } 456 // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not 457 // overloaded nor a template. In this case the compiler is able to deduce the full function 458 // signature directly given the name since there is only one function with that name. 459 460 template <typename R, typename C, typename... P> 461 static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; } 462 template <typename R, typename C, typename... P> 463 static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; } 464 // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case 465 // the function must match exactly the containing type C, return type R, and parameter types P... 466 // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a 467 // guess at P. Luckily, if the function parameters are template parameters then it's not 468 // necessary to be precise about P. 469 #undef BODY 470 }; 471 472 #if __GNUC__ >= 8 && !__clang__ 473 #pragma GCC diagnostic pop 474 #endif 475 476 template <typename T, typename ReturnType, typename... ParamTypes> 477 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) { 478 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); 479 } 480 template <typename T, typename ReturnType, typename... ParamTypes> 481 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) { 482 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); 483 } 484 485 template <typename... ParamTypes> 486 struct GetFunctorStartAddress { 487 // Given a functor (any object defining operator()), return the start address of the function, 488 // suitable for passing to addr2line to obtain a source file/line for debugging purposes. 489 // 490 // This turns out to be incredibly hard to implement in the presence of overloaded or templated 491 // functors. Therefore, we impose these specific restrictions, specific to our use case: 492 // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas 493 // anyway.) 494 // - The template parameters to GetFunctorStartAddress specify a hint as to the expected 495 // parameter types. If the functor is templated, its parameters must match exactly these types. 496 // (If it's not templated, ParamTypes are ignored.) 497 498 template <typename Func> 499 static void* apply(Func&& func) { 500 typedef decltype(func(instance<ParamTypes>()...)) ReturnType; 501 return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>( 502 &Decay<Func>::operator()).apply(&func); 503 } 504 }; 505 506 template <> 507 struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {}; 508 // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function 509 // actually has no parameters. 510 511 class TransformPromiseNodeBase: public PromiseNode { 512 public: 513 TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr); 514 515 void onReady(Event* event) noexcept override; 516 void get(ExceptionOrValue& output) noexcept override; 517 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 518 519 private: 520 Own<PromiseNode> dependency; 521 void* continuationTracePtr; 522 523 void dropDependency(); 524 void getDepResult(ExceptionOrValue& output); 525 526 virtual void getImpl(ExceptionOrValue& output) = 0; 527 528 template <typename, typename, typename, typename> 529 friend class TransformPromiseNode; 530 }; 531 532 template <typename T, typename DepT, typename Func, typename ErrorFunc> 533 class TransformPromiseNode final: public TransformPromiseNodeBase { 534 // A PromiseNode that transforms the result of another PromiseNode through an application-provided 535 // function (implements `then()`). 536 537 public: 538 TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler, 539 void* continuationTracePtr) 540 : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr), 541 func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {} 542 543 ~TransformPromiseNode() noexcept(false) { 544 // We need to make sure the dependency is deleted before we delete the continuations because it 545 // is a common pattern for the continuations to hold ownership of objects that might be in-use 546 // by the dependency. 547 dropDependency(); 548 } 549 550 private: 551 Func func; 552 ErrorFunc errorHandler; 553 554 void getImpl(ExceptionOrValue& output) override { 555 ExceptionOr<DepT> depResult; 556 getDepResult(depResult); 557 KJ_IF_MAYBE(depException, depResult.exception) { 558 output.as<T>() = handle( 559 MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply( 560 errorHandler, kj::mv(*depException))); 561 } else KJ_IF_MAYBE(depValue, depResult.value) { 562 output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue))); 563 } 564 } 565 566 ExceptionOr<T> handle(T&& value) { 567 return kj::mv(value); 568 } 569 ExceptionOr<T> handle(PropagateException::Bottom&& value) { 570 return ExceptionOr<T>(false, value.asException()); 571 } 572 }; 573 574 // ------------------------------------------------------------------- 575 576 class ForkHubBase; 577 578 class ForkBranchBase: public PromiseNode { 579 public: 580 ForkBranchBase(Own<ForkHubBase>&& hub); 581 ~ForkBranchBase() noexcept(false); 582 583 void hubReady() noexcept; 584 // Called by the hub to indicate that it is ready. 585 586 // implements PromiseNode ------------------------------------------ 587 void onReady(Event* event) noexcept override; 588 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 589 590 protected: 591 inline ExceptionOrValue& getHubResultRef(); 592 593 void releaseHub(ExceptionOrValue& output); 594 // Release the hub. If an exception is thrown, add it to `output`. 595 596 private: 597 OnReadyEvent onReadyEvent; 598 599 Own<ForkHubBase> hub; 600 ForkBranchBase* next = nullptr; 601 ForkBranchBase** prevPtr = nullptr; 602 603 friend class ForkHubBase; 604 }; 605 606 template <typename T> T copyOrAddRef(T& t) { return t; } 607 template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); } 608 template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) { 609 return t.map([](Own<T>& ptr) { 610 return ptr->addRef(); 611 }); 612 } 613 614 template <typename T> 615 class ForkBranch final: public ForkBranchBase { 616 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives 617 // a const reference. 618 619 public: 620 ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {} 621 622 void get(ExceptionOrValue& output) noexcept override { 623 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); 624 KJ_IF_MAYBE(value, hubResult.value) { 625 output.as<T>().value = copyOrAddRef(*value); 626 } else { 627 output.as<T>().value = nullptr; 628 } 629 output.exception = hubResult.exception; 630 releaseHub(output); 631 } 632 }; 633 634 template <typename T, size_t index> 635 class SplitBranch final: public ForkBranchBase { 636 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives 637 // a const reference. 638 639 public: 640 SplitBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {} 641 642 typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element; 643 644 void get(ExceptionOrValue& output) noexcept override { 645 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); 646 KJ_IF_MAYBE(value, hubResult.value) { 647 output.as<Element>().value = kj::mv(kj::get<index>(*value)); 648 } else { 649 output.as<Element>().value = nullptr; 650 } 651 output.exception = hubResult.exception; 652 releaseHub(output); 653 } 654 }; 655 656 // ------------------------------------------------------------------- 657 658 class ForkHubBase: public Refcounted, protected Event { 659 public: 660 ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef); 661 662 inline ExceptionOrValue& getResultRef() { return resultRef; } 663 664 private: 665 Own<PromiseNode> inner; 666 ExceptionOrValue& resultRef; 667 668 ForkBranchBase* headBranch = nullptr; 669 ForkBranchBase** tailBranch = &headBranch; 670 // Tail becomes null once the inner promise is ready and all branches have been notified. 671 672 Maybe<Own<Event>> fire() override; 673 void traceEvent(TraceBuilder& builder) override; 674 675 friend class ForkBranchBase; 676 }; 677 678 template <typename T> 679 class ForkHub final: public ForkHubBase { 680 // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces 681 // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if 682 // possible). 683 684 public: 685 ForkHub(Own<PromiseNode>&& inner): ForkHubBase(kj::mv(inner), result) {} 686 687 Promise<_::UnfixVoid<T>> addBranch() { 688 return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>(kj::heap<ForkBranch<T>>(addRef(*this))); 689 } 690 691 _::SplitTuplePromise<T> split() { 692 return splitImpl(MakeIndexes<tupleSize<T>()>()); 693 } 694 695 private: 696 ExceptionOr<T> result; 697 698 template <size_t... indexes> 699 _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>) { 700 return kj::tuple(addSplit<indexes>()...); 701 } 702 703 template <size_t index> 704 ReducePromises<typename SplitBranch<T, index>::Element> addSplit() { 705 return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>( 706 maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)), 707 implicitCast<typename SplitBranch<T, index>::Element*>(nullptr))); 708 } 709 }; 710 711 inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { 712 return hub->getResultRef(); 713 } 714 715 // ------------------------------------------------------------------- 716 717 class ChainPromiseNode final: public PromiseNode, public Event { 718 // Promise node which reduces Promise<Promise<T>> to Promise<T>. 719 // 720 // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to 721 // Own<Event>. Ugh, templates and private... 722 723 public: 724 explicit ChainPromiseNode(Own<PromiseNode> inner); 725 ~ChainPromiseNode() noexcept(false); 726 727 void onReady(Event* event) noexcept override; 728 void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override; 729 void get(ExceptionOrValue& output) noexcept override; 730 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 731 732 private: 733 enum State { 734 STEP1, 735 STEP2 736 }; 737 738 State state; 739 740 Own<PromiseNode> inner; 741 // In STEP1, a PromiseNode for a Promise<T>. 742 // In STEP2, a PromiseNode for a T. 743 744 Event* onReadyEvent = nullptr; 745 Own<PromiseNode>* selfPtr = nullptr; 746 747 Maybe<Own<Event>> fire() override; 748 void traceEvent(TraceBuilder& builder) override; 749 }; 750 751 template <typename T> 752 Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) { 753 return heap<ChainPromiseNode>(kj::mv(node)); 754 } 755 756 template <typename T> 757 Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) { 758 return kj::mv(node); 759 } 760 761 template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))> 762 inline Result maybeReduce(Promise<T>&& promise, bool) { 763 return T::reducePromise(kj::mv(promise)); 764 } 765 766 template <typename T> 767 inline Promise<T> maybeReduce(Promise<T>&& promise, ...) { 768 return kj::mv(promise); 769 } 770 771 // ------------------------------------------------------------------- 772 773 class ExclusiveJoinPromiseNode final: public PromiseNode { 774 public: 775 ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right); 776 ~ExclusiveJoinPromiseNode() noexcept(false); 777 778 void onReady(Event* event) noexcept override; 779 void get(ExceptionOrValue& output) noexcept override; 780 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 781 782 private: 783 class Branch: public Event { 784 public: 785 Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency); 786 ~Branch() noexcept(false); 787 788 bool get(ExceptionOrValue& output); 789 // Returns true if this is the side that finished. 790 791 Maybe<Own<Event>> fire() override; 792 void traceEvent(TraceBuilder& builder) override; 793 794 private: 795 ExclusiveJoinPromiseNode& joinNode; 796 Own<PromiseNode> dependency; 797 798 friend class ExclusiveJoinPromiseNode; 799 }; 800 801 Branch left; 802 Branch right; 803 OnReadyEvent onReadyEvent; 804 }; 805 806 // ------------------------------------------------------------------- 807 808 class ArrayJoinPromiseNodeBase: public PromiseNode { 809 public: 810 ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises, 811 ExceptionOrValue* resultParts, size_t partSize); 812 ~ArrayJoinPromiseNodeBase() noexcept(false); 813 814 void onReady(Event* event) noexcept override final; 815 void get(ExceptionOrValue& output) noexcept override final; 816 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final; 817 818 protected: 819 virtual void getNoError(ExceptionOrValue& output) noexcept = 0; 820 // Called to compile the result only in the case where there were no errors. 821 822 private: 823 uint countLeft; 824 OnReadyEvent onReadyEvent; 825 826 class Branch final: public Event { 827 public: 828 Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency, 829 ExceptionOrValue& output); 830 ~Branch() noexcept(false); 831 832 Maybe<Own<Event>> fire() override; 833 void traceEvent(TraceBuilder& builder) override; 834 835 Maybe<Exception> getPart(); 836 // Calls dependency->get(output). If there was an exception, return it. 837 838 private: 839 ArrayJoinPromiseNodeBase& joinNode; 840 Own<PromiseNode> dependency; 841 ExceptionOrValue& output; 842 843 friend class ArrayJoinPromiseNodeBase; 844 }; 845 846 Array<Branch> branches; 847 }; 848 849 template <typename T> 850 class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { 851 public: 852 ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises, 853 Array<ExceptionOr<T>> resultParts) 854 : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>)), 855 resultParts(kj::mv(resultParts)) {} 856 857 protected: 858 void getNoError(ExceptionOrValue& output) noexcept override { 859 auto builder = heapArrayBuilder<T>(resultParts.size()); 860 for (auto& part: resultParts) { 861 KJ_IASSERT(part.value != nullptr, 862 "Bug in KJ promise framework: Promise result had neither value no exception."); 863 builder.add(kj::mv(*_::readMaybe(part.value))); 864 } 865 output.as<Array<T>>() = builder.finish(); 866 } 867 868 private: 869 Array<ExceptionOr<T>> resultParts; 870 }; 871 872 template <> 873 class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase { 874 public: 875 ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises, 876 Array<ExceptionOr<_::Void>> resultParts); 877 ~ArrayJoinPromiseNode(); 878 879 protected: 880 void getNoError(ExceptionOrValue& output) noexcept override; 881 882 private: 883 Array<ExceptionOr<_::Void>> resultParts; 884 }; 885 886 // ------------------------------------------------------------------- 887 888 class EagerPromiseNodeBase: public PromiseNode, protected Event { 889 // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly 890 // evaluate it. 891 892 public: 893 EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef); 894 895 void onReady(Event* event) noexcept override; 896 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 897 898 private: 899 Own<PromiseNode> dependency; 900 OnReadyEvent onReadyEvent; 901 902 ExceptionOrValue& resultRef; 903 904 Maybe<Own<Event>> fire() override; 905 void traceEvent(TraceBuilder& builder) override; 906 }; 907 908 template <typename T> 909 class EagerPromiseNode final: public EagerPromiseNodeBase { 910 public: 911 EagerPromiseNode(Own<PromiseNode>&& dependency) 912 : EagerPromiseNodeBase(kj::mv(dependency), result) {} 913 914 void get(ExceptionOrValue& output) noexcept override { 915 output.as<T>() = kj::mv(result); 916 } 917 918 private: 919 ExceptionOr<T> result; 920 }; 921 922 template <typename T> 923 Own<PromiseNode> spark(Own<PromiseNode>&& node) { 924 // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting 925 // on it. 926 return heap<EagerPromiseNode<T>>(kj::mv(node)); 927 } 928 929 // ------------------------------------------------------------------- 930 931 class AdapterPromiseNodeBase: public PromiseNode { 932 public: 933 void onReady(Event* event) noexcept override; 934 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 935 936 protected: 937 inline void setReady() { 938 onReadyEvent.arm(); 939 } 940 941 private: 942 OnReadyEvent onReadyEvent; 943 }; 944 945 template <typename T, typename Adapter> 946 class AdapterPromiseNode final: public AdapterPromiseNodeBase, 947 private PromiseFulfiller<UnfixVoid<T>> { 948 // A PromiseNode that wraps a PromiseAdapter. 949 950 public: 951 template <typename... Params> 952 AdapterPromiseNode(Params&&... params) 953 : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {} 954 955 void get(ExceptionOrValue& output) noexcept override { 956 KJ_IREQUIRE(!isWaiting()); 957 output.as<T>() = kj::mv(result); 958 } 959 960 private: 961 ExceptionOr<T> result; 962 bool waiting = true; 963 Adapter adapter; 964 965 void fulfill(T&& value) override { 966 if (waiting) { 967 waiting = false; 968 result = ExceptionOr<T>(kj::mv(value)); 969 setReady(); 970 } 971 } 972 973 void reject(Exception&& exception) override { 974 if (waiting) { 975 waiting = false; 976 result = ExceptionOr<T>(false, kj::mv(exception)); 977 setReady(); 978 } 979 } 980 981 bool isWaiting() override { 982 return waiting; 983 } 984 }; 985 986 // ------------------------------------------------------------------- 987 988 class FiberBase: public PromiseNode, private Event { 989 // Base class for the outer PromiseNode representing a fiber. 990 991 public: 992 explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result); 993 explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result); 994 ~FiberBase() noexcept(false); 995 996 void start() { armDepthFirst(); } 997 // Call immediately after construction to begin executing the fiber. 998 999 class WaitDoneEvent; 1000 1001 void onReady(_::Event* event) noexcept override; 1002 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 1003 1004 protected: 1005 bool isFinished() { return state == FINISHED; } 1006 void destroy(); 1007 1008 private: 1009 enum { WAITING, RUNNING, CANCELED, FINISHED } state; 1010 1011 _::PromiseNode* currentInner = nullptr; 1012 OnReadyEvent onReadyEvent; 1013 Own<FiberStack> stack; 1014 _::ExceptionOrValue& result; 1015 1016 void run(); 1017 virtual void runImpl(WaitScope& waitScope) = 0; 1018 1019 Maybe<Own<Event>> fire() override; 1020 void traceEvent(TraceBuilder& builder) override; 1021 // Implements Event. Each time the event is fired, switchToFiber() is called. 1022 1023 friend class FiberStack; 1024 friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, 1025 WaitScope& waitScope); 1026 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope); 1027 }; 1028 1029 template <typename Func> 1030 class Fiber final: public FiberBase { 1031 public: 1032 explicit Fiber(size_t stackSize, Func&& func): FiberBase(stackSize, result), func(kj::fwd<Func>(func)) {} 1033 explicit Fiber(const FiberPool& pool, Func&& func): FiberBase(pool, result), func(kj::fwd<Func>(func)) {} 1034 ~Fiber() noexcept(false) { destroy(); } 1035 1036 typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType; 1037 1038 void get(ExceptionOrValue& output) noexcept override { 1039 KJ_IREQUIRE(isFinished()); 1040 output.as<ResultType>() = kj::mv(result); 1041 } 1042 1043 private: 1044 Func func; 1045 ExceptionOr<ResultType> result; 1046 1047 void runImpl(WaitScope& waitScope) override { 1048 result.template as<ResultType>() = 1049 MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope); 1050 } 1051 }; 1052 1053 } // namespace _ (private) 1054 1055 // ======================================================================================= 1056 1057 template <typename T> 1058 Promise<T>::Promise(_::FixVoid<T> value) 1059 : PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {} 1060 1061 template <typename T> 1062 Promise<T>::Promise(kj::Exception&& exception) 1063 : PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {} 1064 1065 template <typename T> 1066 template <typename Func, typename ErrorFunc> 1067 PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) { 1068 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; 1069 1070 void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func); 1071 Own<_::PromiseNode> intermediate = 1072 heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( 1073 kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler), 1074 continuationTracePtr); 1075 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( 1076 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr))); 1077 return _::maybeReduce(kj::mv(result), false); 1078 } 1079 1080 namespace _ { // private 1081 1082 template <typename T> 1083 struct IdentityFunc { 1084 inline T operator()(T&& value) const { 1085 return kj::mv(value); 1086 } 1087 }; 1088 template <typename T> 1089 struct IdentityFunc<Promise<T>> { 1090 inline Promise<T> operator()(T&& value) const { 1091 return kj::mv(value); 1092 } 1093 }; 1094 template <> 1095 struct IdentityFunc<void> { 1096 inline void operator()() const {} 1097 }; 1098 template <> 1099 struct IdentityFunc<Promise<void>> { 1100 Promise<void> operator()() const; 1101 // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly, 1102 // Cap'n Proto relies on being able to include this header without creating such a link-time 1103 // dependency. 1104 }; 1105 1106 } // namespace _ (private) 1107 1108 template <typename T> 1109 template <typename ErrorFunc> 1110 Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) { 1111 // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case, 1112 // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise, 1113 // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually 1114 // return a promise. So we make our Func return match ErrorFunc. 1115 typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func; 1116 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; 1117 1118 // The reason catch_() isn't simply implemented in terms of then() is because we want the trace 1119 // pointer to be based on ErrorFunc rather than Func. 1120 void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler); 1121 Own<_::PromiseNode> intermediate = 1122 heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( 1123 kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr); 1124 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( 1125 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr))); 1126 return _::maybeReduce(kj::mv(result), false); 1127 } 1128 1129 template <typename T> 1130 T Promise<T>::wait(WaitScope& waitScope) { 1131 _::ExceptionOr<_::FixVoid<T>> result; 1132 _::waitImpl(kj::mv(node), result, waitScope); 1133 return convertToReturn(kj::mv(result)); 1134 } 1135 1136 template <typename T> 1137 bool Promise<T>::poll(WaitScope& waitScope) { 1138 return _::pollImpl(*node, waitScope); 1139 } 1140 1141 template <typename T> 1142 ForkedPromise<T> Promise<T>::fork() { 1143 return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))); 1144 } 1145 1146 template <typename T> 1147 Promise<T> ForkedPromise<T>::addBranch() { 1148 return hub->addBranch(); 1149 } 1150 1151 template <typename T> 1152 bool ForkedPromise<T>::hasBranches() { 1153 return hub->isShared(); 1154 } 1155 1156 template <typename T> 1157 _::SplitTuplePromise<T> Promise<T>::split() { 1158 return refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))->split(); 1159 } 1160 1161 template <typename T> 1162 Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) { 1163 return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node))); 1164 } 1165 1166 template <typename T> 1167 template <typename... Attachments> 1168 Promise<T> Promise<T>::attach(Attachments&&... attachments) { 1169 return Promise(false, kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>( 1170 kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...))); 1171 } 1172 1173 template <typename T> 1174 template <typename ErrorFunc> 1175 Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler) { 1176 // See catch_() for commentary. 1177 return Promise(false, _::spark<_::FixVoid<T>>(then( 1178 _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(), 1179 kj::fwd<ErrorFunc>(errorHandler)).node)); 1180 } 1181 1182 template <typename T> 1183 Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr)) { 1184 return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node))); 1185 } 1186 1187 template <typename T> 1188 kj::String Promise<T>::trace() { 1189 return PromiseBase::trace(); 1190 } 1191 1192 template <typename Func> 1193 inline PromiseForResult<Func, void> evalLater(Func&& func) { 1194 return _::yield().then(kj::fwd<Func>(func), _::PropagateException()); 1195 } 1196 1197 template <typename Func> 1198 inline PromiseForResult<Func, void> evalLast(Func&& func) { 1199 return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException()); 1200 } 1201 1202 template <typename Func> 1203 inline PromiseForResult<Func, void> evalNow(Func&& func) { 1204 PromiseForResult<Func, void> result = nullptr; 1205 KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() { 1206 result = func(); 1207 })) { 1208 result = kj::mv(*e); 1209 } 1210 return result; 1211 } 1212 1213 template <typename Func> 1214 struct RetryOnDisconnect_ { 1215 static inline PromiseForResult<Func, void> apply(Func&& func) { 1216 return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> { 1217 auto promise = evalNow(func); 1218 return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> { 1219 if (e.getType() == kj::Exception::Type::DISCONNECTED) { 1220 return func(); 1221 } else { 1222 return kj::mv(e); 1223 } 1224 }); 1225 }); 1226 } 1227 }; 1228 template <typename Func> 1229 struct RetryOnDisconnect_<Func&> { 1230 // Specialization for references. Needed because the syntax for capturing references in a 1231 // lambda is different. :( 1232 static inline PromiseForResult<Func, void> apply(Func& func) { 1233 auto promise = evalLater(func); 1234 return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> { 1235 if (e.getType() == kj::Exception::Type::DISCONNECTED) { 1236 return func(); 1237 } else { 1238 return kj::mv(e); 1239 } 1240 }); 1241 } 1242 }; 1243 1244 template <typename Func> 1245 inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) { 1246 return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func)); 1247 } 1248 1249 template <typename Func> 1250 inline PromiseForResult<Func, WaitScope&> startFiber(size_t stackSize, Func&& func) { 1251 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; 1252 1253 Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(stackSize, kj::fwd<Func>(func)); 1254 intermediate->start(); 1255 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( 1256 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr))); 1257 return _::maybeReduce(kj::mv(result), false); 1258 } 1259 1260 template <typename Func> 1261 inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber(Func&& func) const { 1262 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; 1263 1264 Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(*this, kj::fwd<Func>(func)); 1265 intermediate->start(); 1266 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( 1267 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr))); 1268 return _::maybeReduce(kj::mv(result), false); 1269 } 1270 1271 template <typename T> 1272 template <typename ErrorFunc> 1273 void Promise<T>::detach(ErrorFunc&& errorHandler) { 1274 return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler))); 1275 } 1276 1277 template <> 1278 template <typename ErrorFunc> 1279 void Promise<void>::detach(ErrorFunc&& errorHandler) { 1280 return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler))); 1281 } 1282 1283 template <typename T> 1284 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises) { 1285 return _::PromiseNode::to<Promise<Array<T>>>(kj::heap<_::ArrayJoinPromiseNode<T>>( 1286 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, 1287 heapArray<_::ExceptionOr<T>>(promises.size()))); 1288 } 1289 1290 // ======================================================================================= 1291 1292 namespace _ { // private 1293 1294 class WeakFulfillerBase: protected kj::Disposer { 1295 protected: 1296 WeakFulfillerBase(): inner(nullptr) {} 1297 virtual ~WeakFulfillerBase() noexcept(false) {} 1298 1299 template <typename T> 1300 inline PromiseFulfiller<T>* getInner() { 1301 return static_cast<PromiseFulfiller<T>*>(inner); 1302 }; 1303 template <typename T> 1304 inline void setInner(PromiseFulfiller<T>* ptr) { 1305 inner = ptr; 1306 }; 1307 1308 private: 1309 mutable PromiseRejector* inner; 1310 1311 void disposeImpl(void* pointer) const override; 1312 }; 1313 1314 template <typename T> 1315 class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase { 1316 // A wrapper around PromiseFulfiller which can be detached. 1317 // 1318 // There are a couple non-trivialities here: 1319 // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly 1320 // rejected. 1321 // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been 1322 // detached from the underlying fulfiller, because otherwise the later detach() call will go 1323 // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the 1324 // refcount never goes over 2 and we manually implement the refcounting because we need to do 1325 // other special things when each side detaches anyway. To this end, WeakFulfiller is its own 1326 // Disposer -- dispose() is called when the application discards its owned pointer to the 1327 // fulfiller and detach() is called when the promise is destroyed. 1328 1329 public: 1330 KJ_DISALLOW_COPY(WeakFulfiller); 1331 1332 static kj::Own<WeakFulfiller> make() { 1333 WeakFulfiller* ptr = new WeakFulfiller; 1334 return Own<WeakFulfiller>(ptr, *ptr); 1335 } 1336 1337 void fulfill(FixVoid<T>&& value) override { 1338 if (getInner<T>() != nullptr) { 1339 getInner<T>()->fulfill(kj::mv(value)); 1340 } 1341 } 1342 1343 void reject(Exception&& exception) override { 1344 if (getInner<T>() != nullptr) { 1345 getInner<T>()->reject(kj::mv(exception)); 1346 } 1347 } 1348 1349 bool isWaiting() override { 1350 return getInner<T>() != nullptr && getInner<T>()->isWaiting(); 1351 } 1352 1353 void attach(PromiseFulfiller<T>& newInner) { 1354 setInner<T>(&newInner); 1355 } 1356 1357 void detach(PromiseFulfiller<T>& from) { 1358 if (getInner<T>() == nullptr) { 1359 // Already disposed. 1360 delete this; 1361 } else { 1362 KJ_IREQUIRE(getInner<T>() == &from); 1363 setInner<T>(nullptr); 1364 } 1365 } 1366 1367 private: 1368 WeakFulfiller() {} 1369 }; 1370 1371 template <typename T> 1372 class PromiseAndFulfillerAdapter { 1373 public: 1374 PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller, 1375 WeakFulfiller<T>& wrapper) 1376 : fulfiller(fulfiller), wrapper(wrapper) { 1377 wrapper.attach(fulfiller); 1378 } 1379 1380 ~PromiseAndFulfillerAdapter() noexcept(false) { 1381 wrapper.detach(fulfiller); 1382 } 1383 1384 private: 1385 PromiseFulfiller<T>& fulfiller; 1386 WeakFulfiller<T>& wrapper; 1387 }; 1388 1389 } // namespace _ (private) 1390 1391 template <typename T> 1392 template <typename Func> 1393 bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) { 1394 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { 1395 reject(kj::mv(*exception)); 1396 return false; 1397 } else { 1398 return true; 1399 } 1400 } 1401 1402 template <typename Func> 1403 bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) { 1404 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { 1405 reject(kj::mv(*exception)); 1406 return false; 1407 } else { 1408 return true; 1409 } 1410 } 1411 1412 template <typename T, typename Adapter, typename... Params> 1413 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) { 1414 Own<_::PromiseNode> intermediate( 1415 heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>( 1416 kj::fwd<Params>(adapterConstructorParams)...)); 1417 return _::PromiseNode::to<_::ReducePromises<T>>( 1418 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr))); 1419 } 1420 1421 template <typename T> 1422 PromiseFulfillerPair<T> newPromiseAndFulfiller() { 1423 auto wrapper = _::WeakFulfiller<T>::make(); 1424 1425 Own<_::PromiseNode> intermediate( 1426 heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper)); 1427 auto promise = _::PromiseNode::to<_::ReducePromises<T>>( 1428 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr))); 1429 1430 return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) }; 1431 } 1432 1433 // ======================================================================================= 1434 // cross-thread stuff 1435 1436 namespace _ { // (private) 1437 1438 class XThreadEvent: private Event, // it's an event in the target thread 1439 public PromiseNode { // it's a PromiseNode in the requesting thread 1440 public: 1441 XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr); 1442 1443 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 1444 1445 protected: 1446 void ensureDoneOrCanceled(); 1447 // MUST be called in destructor of subclasses to make sure the object is not destroyed while 1448 // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because 1449 // that destructor doesn't run until the subclass has already been destroyed.) 1450 1451 virtual kj::Maybe<Own<PromiseNode>> execute() = 0; 1452 // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise 1453 // returns null. 1454 1455 // implements PromiseNode ---------------------------------------------------- 1456 void onReady(Event* event) noexcept override; 1457 1458 private: 1459 ExceptionOrValue& result; 1460 void* funcTracePtr; 1461 1462 kj::Own<const Executor> targetExecutor; 1463 Maybe<const Executor&> replyExecutor; // If executeAsync() was used. 1464 1465 kj::Maybe<Own<PromiseNode>> promiseNode; 1466 // Accessed only in target thread. 1467 1468 ListLink<XThreadEvent> targetLink; 1469 // Membership in one of the linked lists in the target Executor's work list or cancel list. These 1470 // fields are protected by the target Executor's mutex. 1471 1472 enum { 1473 UNUSED, 1474 // Object was never queued on another thread. 1475 1476 QUEUED, 1477 // Target thread has not yet dequeued the event from the state.start list. The requesting 1478 // thread can cancel execution by removing the event from the list. 1479 1480 EXECUTING, 1481 // Target thread has dequeued the event from state.start and moved it to state.executing. To 1482 // cancel, the requesting thread must add the event to the state.cancel list and change the 1483 // state to CANCELING. 1484 1485 CANCELING, 1486 // Requesting thread is trying to cancel this event. The target thread will change the state to 1487 // `DONE` once canceled. 1488 1489 DONE 1490 // Target thread has completed handling this event and will not touch it again. The requesting 1491 // thread can safely delete the object. The `state` is updated to `DONE` using an atomic 1492 // release operation after ensuring that the event will not be touched again, so that the 1493 // requesting can safely skip locking if it observes the state is already DONE. 1494 } state = UNUSED; 1495 // State, which is also protected by `targetExecutor`'s mutex. 1496 1497 ListLink<XThreadEvent> replyLink; 1498 // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The 1499 // executing thread places the event in the reply list near the end of the `EXECUTING` state. 1500 // Because the thread cannot lock two mutexes at once, it's possible that the reply executor 1501 // will receive the reply while the event is still listed in the EXECUTING state, but it can 1502 // ignore the state and proceed with the result. 1503 1504 OnReadyEvent onReadyEvent; 1505 // Accessed only in requesting thread. 1506 1507 friend class kj::Executor; 1508 1509 void done(); 1510 // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT 1511 // call under lock. 1512 1513 void sendReply(); 1514 // Notifies the originating thread that this event is done, but doesn't set the state to DONE 1515 // yet. Do NOT call under lock. 1516 1517 void setDoneState(); 1518 // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must 1519 // only be called in the destination thread, and must either be called under lock, or the thread 1520 // must take the lock and release it again shortly after setting the state (because some threads 1521 // may be waiting on the DONE state using a conditional wait on the mutex). After calling 1522 // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs 1523 // solely to the requesting thread. 1524 1525 void setDisconnected(); 1526 // Sets the result to a DISCONNECTED exception indicating that the target event loop exited. 1527 1528 class DelayedDoneHack; 1529 1530 // implements Event ---------------------------------------------------------- 1531 Maybe<Own<Event>> fire() override; 1532 // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr, 1533 // then it just indicated readiness and we need to get its result. 1534 1535 void traceEvent(TraceBuilder& builder) override; 1536 }; 1537 1538 template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>> 1539 class XThreadEventImpl final: public XThreadEvent { 1540 // Implementation for a function that does not return a Promise. 1541 public: 1542 XThreadEventImpl(Func&& func, const Executor& target) 1543 : XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func)), 1544 func(kj::fwd<Func>(func)) {} 1545 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } 1546 1547 typedef _::FixVoid<_::ReturnType<Func, void>> ResultT; 1548 1549 kj::Maybe<Own<_::PromiseNode>> execute() override { 1550 result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void()); 1551 return nullptr; 1552 } 1553 1554 // implements PromiseNode ---------------------------------------------------- 1555 void get(ExceptionOrValue& output) noexcept override { 1556 output.as<ResultT>() = kj::mv(result); 1557 } 1558 1559 private: 1560 Func func; 1561 ExceptionOr<ResultT> result; 1562 friend Executor; 1563 }; 1564 1565 template <typename Func, typename T> 1566 class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent { 1567 // Implementation for a function that DOES return a Promise. 1568 public: 1569 XThreadEventImpl(Func&& func, const Executor& target) 1570 : XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func)), 1571 func(kj::fwd<Func>(func)) {} 1572 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } 1573 1574 typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT; 1575 1576 kj::Maybe<Own<_::PromiseNode>> execute() override { 1577 auto result = _::PromiseNode::from(func()); 1578 KJ_IREQUIRE(result.get() != nullptr); 1579 return kj::mv(result); 1580 } 1581 1582 // implements PromiseNode ---------------------------------------------------- 1583 void get(ExceptionOrValue& output) noexcept override { 1584 output.as<ResultT>() = kj::mv(result); 1585 } 1586 1587 private: 1588 Func func; 1589 ExceptionOr<ResultT> result; 1590 friend Executor; 1591 }; 1592 1593 } // namespace _ (private) 1594 1595 template <typename Func> 1596 _::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(Func&& func) const { 1597 _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this); 1598 send(event, true); 1599 return convertToReturn(kj::mv(event.result)); 1600 } 1601 1602 template <typename Func> 1603 PromiseForResult<Func, void> Executor::executeAsync(Func&& func) const { 1604 auto event = kj::heap<_::XThreadEventImpl<Func>>(kj::fwd<Func>(func), *this); 1605 send(*event, false); 1606 return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event)); 1607 } 1608 1609 // ----------------------------------------------------------------------------- 1610 1611 namespace _ { // (private) 1612 1613 template <typename T> 1614 class XThreadFulfiller; 1615 1616 class XThreadPaf: public PromiseNode { 1617 public: 1618 XThreadPaf(); 1619 virtual ~XThreadPaf() noexcept(false); 1620 1621 class Disposer: public kj::Disposer { 1622 public: 1623 void disposeImpl(void* pointer) const override; 1624 }; 1625 static const Disposer DISPOSER; 1626 1627 // implements PromiseNode ---------------------------------------------------- 1628 void onReady(Event* event) noexcept override; 1629 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 1630 1631 private: 1632 enum { 1633 WAITING, 1634 // Not yet fulfilled, and the waiter is still waiting. 1635 // 1636 // Starting from this state, the state may transition to either FULFILLING or CANCELED 1637 // using an atomic compare-and-swap. 1638 1639 FULFILLING, 1640 // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it 1641 // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not 1642 // disappear out from under it. It then fills in the result value, locks the executor mutex, 1643 // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to 1644 // FULFILLED, and finally unlocks the mutex. 1645 // 1646 // If the waiting thread tries to cancel but discovers the object in this state, then it 1647 // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED. 1648 // It can then delete the object. 1649 1650 FULFILLED, 1651 // The fulfilling thread has completed filling in the result value and inserting the object 1652 // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer 1653 // holds any pointers to this object. The waiting thread is responsible for deleting it. 1654 1655 DISPATCHED, 1656 // The object reached FULFILLED state, and then was dispatched from the waiting thread's 1657 // executor's event queue. Therefore, the object is completely owned by the waiting thread with 1658 // no need to lock anything. 1659 1660 CANCELED 1661 // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no 1662 // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the 1663 // object. 1664 } state; 1665 1666 const Executor& executor; 1667 // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or 1668 // `FULFILLING`. After any other state has been reached, this reference may be invalidated. 1669 1670 ListLink<XThreadPaf> link; 1671 // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting 1672 // thread's executor. In those states, these pointers are guarded by said executor's mutex. 1673 1674 OnReadyEvent onReadyEvent; 1675 1676 class FulfillScope; 1677 1678 static kj::Exception unfulfilledException(); 1679 // Construct appropriate exception to use to reject an unfulfilled XThreadPaf. 1680 1681 template <typename T> 1682 friend class XThreadFulfiller; 1683 friend Executor; 1684 }; 1685 1686 template <typename T> 1687 class XThreadPafImpl final: public XThreadPaf { 1688 public: 1689 // implements PromiseNode ---------------------------------------------------- 1690 void get(ExceptionOrValue& output) noexcept override { 1691 output.as<FixVoid<T>>() = kj::mv(result); 1692 } 1693 1694 private: 1695 ExceptionOr<FixVoid<T>> result; 1696 1697 friend class XThreadFulfiller<T>; 1698 }; 1699 1700 class XThreadPaf::FulfillScope { 1701 // Create on stack while setting `XThreadPafImpl<T>::result`. 1702 // 1703 // This ensures that: 1704 // - Only one call is carried out, even if multiple threads try to fulfill concurrently. 1705 // - The waiting thread is correctly signaled. 1706 public: 1707 FulfillScope(XThreadPaf** pointer); 1708 // Atomically nulls out *pointer and takes ownership of the pointer. 1709 1710 ~FulfillScope() noexcept(false); 1711 1712 KJ_DISALLOW_COPY(FulfillScope); 1713 1714 bool shouldFulfill() { return obj != nullptr; } 1715 1716 template <typename T> 1717 XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); } 1718 1719 private: 1720 XThreadPaf* obj; 1721 }; 1722 1723 template <typename T> 1724 class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> { 1725 public: 1726 XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {} 1727 1728 ~XThreadFulfiller() noexcept(false) { 1729 if (target != nullptr) { 1730 reject(XThreadPaf::unfulfilledException()); 1731 } 1732 } 1733 void fulfill(FixVoid<T>&& value) const override { 1734 XThreadPaf::FulfillScope scope(&target); 1735 if (scope.shouldFulfill()) { 1736 scope.getTarget<T>()->result = kj::mv(value); 1737 } 1738 } 1739 void reject(Exception&& exception) const override { 1740 XThreadPaf::FulfillScope scope(&target); 1741 if (scope.shouldFulfill()) { 1742 scope.getTarget<T>()->result.addException(kj::mv(exception)); 1743 } 1744 } 1745 bool isWaiting() const override { 1746 KJ_IF_MAYBE(t, target) { 1747 #if _MSC_VER && !__clang__ 1748 // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be? 1749 return t->state == XThreadPaf::WAITING; 1750 #else 1751 return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING; 1752 #endif 1753 } else { 1754 return false; 1755 } 1756 } 1757 1758 private: 1759 mutable XThreadPaf* target; // accessed using atomic ops 1760 }; 1761 1762 template <typename T> 1763 class XThreadFulfiller<kj::Promise<T>> { 1764 public: 1765 static_assert(sizeof(T) < 0, 1766 "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported"); 1767 // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`, 1768 // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not 1769 // the waiting thread. 1770 }; 1771 1772 } // namespace _ (private) 1773 1774 template <typename T> 1775 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() { 1776 kj::Own<_::XThreadPafImpl<T>> node(new _::XThreadPafImpl<T>, _::XThreadPaf::DISPOSER); 1777 auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node); 1778 return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) }; 1779 } 1780 1781 } // namespace kj 1782 1783 #if KJ_HAS_COROUTINE 1784 1785 // ======================================================================================= 1786 // Coroutines TS integration with kj::Promise<T>. 1787 // 1788 // Here's a simple coroutine: 1789 // 1790 // Promise<Own<AsyncIoStream>> connectToService(Network& n) { 1791 // auto a = co_await n.parseAddress(IP, PORT); 1792 // auto c = co_await a->connect(); 1793 // co_return kj::mv(c); 1794 // } 1795 // 1796 // The presence of the co_await and co_return keywords tell the compiler it is a coroutine. 1797 // Although it looks similar to a function, it has a couple large differences. First, everything 1798 // that would normally live in the stack frame lives instead in a heap-based coroutine frame. 1799 // Second, the coroutine has the ability to return from its scope without deallocating this frame 1800 // (to suspend, in other words), and the ability to resume from its last suspension point. 1801 // 1802 // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a 1803 // coroutine implementation type via a traits class parameterized by the coroutine return and 1804 // parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`, 1805 1806 namespace kj::_ { template <typename T> class Coroutine; } 1807 1808 // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`. 1809 1810 namespace KJ_COROUTINE_STD_NAMESPACE { 1811 1812 template <class T, class... Args> 1813 struct coroutine_traits<kj::Promise<T>, Args...> { 1814 // `Args...` are the coroutine's parameter types. 1815 1816 using promise_type = kj::_::Coroutine<T>; 1817 // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines 1818 // returning `std::future<T>`, since the coroutine implementation would be a wrapper around 1819 // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it 1820 // the "coroutine implementation type" instead. 1821 }; 1822 1823 } // namespace KJ_COROUTINE_STD_NAMESPACE 1824 1825 // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a 1826 // `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or 1827 // `kj::_::Coroutine<Own<AsyncIoStream>>`. 1828 // 1829 // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and 1830 // deallocated when the frame does. 1831 1832 namespace kj::_ { 1833 1834 namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE; 1835 1836 class CoroutineBase: public PromiseNode, 1837 public Event, 1838 public Disposer { 1839 public: 1840 CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef); 1841 ~CoroutineBase() noexcept(false); 1842 KJ_DISALLOW_COPY(CoroutineBase); 1843 1844 auto initial_suspend() { return stdcoro::suspend_never(); } 1845 auto final_suspend() noexcept { return stdcoro::suspend_always(); } 1846 // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately 1847 // after completion. 1848 // 1849 // The initial suspension point could allow us to defer the initial synchronous execution of a 1850 // coroutine -- everything before its first co_await, that is. 1851 // 1852 // The final suspension point is useful to delay deallocation of the coroutine frame to match the 1853 // lifetime of the enclosing promise. 1854 1855 void unhandled_exception(); 1856 1857 protected: 1858 class AwaiterBase; 1859 1860 bool isWaiting() { return waiting; } 1861 void scheduleResumption() { 1862 onReadyEvent.arm(); 1863 waiting = false; 1864 } 1865 1866 private: 1867 // ------------------------------------------------------- 1868 // PromiseNode implementation 1869 1870 void onReady(Event* event) noexcept override; 1871 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; 1872 1873 // ------------------------------------------------------- 1874 // Event implementation 1875 1876 Maybe<Own<Event>> fire() override; 1877 void traceEvent(TraceBuilder& builder) override; 1878 1879 // ------------------------------------------------------- 1880 // Disposer implementation 1881 1882 void disposeImpl(void* pointer) const override; 1883 void destroy(); 1884 1885 stdcoro::coroutine_handle<> coroutine; 1886 ExceptionOrValue& resultRef; 1887 1888 OnReadyEvent onReadyEvent; 1889 bool waiting = true; 1890 1891 Maybe<PromiseNode&> promiseNodeForTrace; 1892 // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that 1893 // promise so tracePromise()/traceEvent() can trace into it. 1894 1895 UnwindDetector unwindDetector; 1896 1897 struct DisposalResults { 1898 bool destructorRan = false; 1899 Maybe<Exception> exception; 1900 }; 1901 Maybe<DisposalResults&> maybeDisposalResults; 1902 // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this 1903 // to point to a DisposalResults on the stack so unhandled_exception() will have some place to 1904 // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once 1905 // coroutine.destroy() has returned. Our disposer then rethrows as needed. 1906 }; 1907 1908 template <typename Self, typename T> 1909 class CoroutineMixin; 1910 // CRTP mixin, covered later. 1911 1912 template <typename T> 1913 class Coroutine final: public CoroutineBase, 1914 public CoroutineMixin<Coroutine<T>, T> { 1915 // The standard calls this the `promise_type` object. We can call this the "coroutine 1916 // implementation object" since the word promise means different things in KJ and std styles. This 1917 // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise 1918 // is later fulfilled. We also fill in a few lifetime-related details. 1919 // 1920 // The implementation object is also where we can customize memory allocation of coroutine frames, 1921 // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in 1922 // coroutine_traits). 1923 // 1924 // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based 1925 // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for 1926 // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to 1927 // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the 1928 // await-expression's type are both `kj::Promise` instantiations -- see further comments under 1929 // `await_transform()`. 1930 1931 public: 1932 using Handle = stdcoro::coroutine_handle<Coroutine<T>>; 1933 1934 Coroutine(): CoroutineBase(Handle::from_promise(*this), result) {} 1935 1936 Promise<T> get_return_object() { 1937 // Called after coroutine frame construction and before initial_suspend() to create the 1938 // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for 1939 // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the 1940 // coroutine in a suspended state. 1941 return PromiseNode::to<Promise<T>>(Own<PromiseNode>(this, *this)); 1942 } 1943 1944 public: 1945 template <typename U> 1946 class Awaiter; 1947 1948 template <typename U> 1949 Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); } 1950 template <typename U> 1951 Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); } 1952 // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return 1953 // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async 1954 // event system. 1955 // 1956 // There is another hook we could implement: an `operator co_await()` free function. However, a 1957 // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a 1958 // member class template of Coroutine<T>, it is able to implement an 1959 // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing 1960 // coroutine's PromiseNode. An `operator co_await()` free function would have to implement 1961 // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement 1962 // suspension and resumption in terms of .then(). Yuck! 1963 1964 private: 1965 // ------------------------------------------------------- 1966 // PromiseNode implementation 1967 1968 void get(ExceptionOrValue& output) noexcept override { 1969 output.as<FixVoid<T>>() = kj::mv(result); 1970 } 1971 1972 void fulfill(FixVoid<T>&& value) { 1973 // Called by the return_value()/return_void() functions in our mixin class. 1974 1975 if (isWaiting()) { 1976 result = kj::mv(value); 1977 scheduleResumption(); 1978 } 1979 } 1980 1981 ExceptionOr<FixVoid<T>> result; 1982 1983 friend class CoroutineMixin<Coroutine<T>, T>; 1984 }; 1985 1986 template <typename Self, typename T> 1987 class CoroutineMixin { 1988 public: 1989 void return_value(T value) { 1990 static_cast<Self*>(this)->fulfill(kj::mv(value)); 1991 } 1992 }; 1993 template <typename Self> 1994 class CoroutineMixin<Self, void> { 1995 public: 1996 void return_void() { 1997 static_cast<Self*>(this)->fulfill(_::Void()); 1998 } 1999 }; 2000 // The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return 2001 // statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has 2002 // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so 2003 // these return_* functions live in a CRTP mixin. 2004 2005 class CoroutineBase::AwaiterBase { 2006 public: 2007 explicit AwaiterBase(Own<PromiseNode> node); 2008 AwaiterBase(AwaiterBase&&); 2009 ~AwaiterBase() noexcept(false); 2010 KJ_DISALLOW_COPY(AwaiterBase); 2011 2012 bool await_ready() const { return false; } 2013 // This could return "`node->get()` is safe to call" instead, which would make suspension-less 2014 // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that 2015 // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we 2016 // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable 2017 // suspension-less co_awaits. 2018 2019 protected: 2020 void getImpl(ExceptionOrValue& result); 2021 bool awaitSuspendImpl(CoroutineBase& coroutineEvent); 2022 2023 private: 2024 UnwindDetector unwindDetector; 2025 Own<PromiseNode> node; 2026 2027 Maybe<CoroutineBase&> maybeCoroutineEvent; 2028 // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our 2029 // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack 2030 // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called, 2031 // we need our own reference to the enclosing Coroutine. (I struggle to think up any such 2032 // scenarios, but perhaps they could occur when destroying a suspended coroutine.) 2033 }; 2034 2035 template <typename T> 2036 template <typename U> 2037 class Coroutine<T>::Awaiter: public AwaiterBase { 2038 // Wrapper around a co_await'ed promise and some storage space for the result of that promise. 2039 // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up 2040 // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event 2041 // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the 2042 // awaited promise result. 2043 2044 public: 2045 explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {} 2046 2047 U await_resume() { 2048 getImpl(result); 2049 auto value = kj::_::readMaybe(result.value); 2050 KJ_IASSERT(value != nullptr, "Neither exception nor value present."); 2051 return U(kj::mv(*value)); 2052 } 2053 2054 bool await_suspend(Coroutine::Handle coroutine) { 2055 return awaitSuspendImpl(coroutine.promise()); 2056 } 2057 2058 private: 2059 ExceptionOr<FixVoid<U>> result; 2060 }; 2061 2062 #undef KJ_COROUTINE_STD_NAMESPACE 2063 2064 } // namespace kj::_ (private) 2065 2066 #endif // KJ_HAS_COROUTINE 2067 2068 KJ_END_HEADER