async.c++ (93249B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #undef _FORTIFY_SOURCE 23 // If _FORTIFY_SOURCE is defined, longjmp will complain when it detects the stack 24 // pointer moving in the "wrong direction", thinking you're jumping to a non-existent 25 // stack frame. But we use longjmp to jump between different stacks to implement fibers, 26 // so this check isn't appropriate for us. 27 28 #if _WIN32 || __CYGWIN__ 29 #include "win32-api-version.h" 30 #elif __APPLE__ 31 // getcontext() and friends are marked deprecated on MacOS but seemingly no replacement is 32 // provided. It appears as if they deprecated it solely because the standards bodies deprecated it, 33 // which they seemingly did mainly because the proper semantics are too difficult for them to 34 // define. I doubt MacOS would actually remove these functions as they are widely used. But if they 35 // do, then I guess we'll need to fall back to using setjmp()/longjmp(), and some sort of hack 36 // involving sigaltstack() (and generating a fake signal I guess) in order to initialize the fiber 37 // in the first place. Or we could use assembly, I suppose. Either way, ick. 38 #pragma GCC diagnostic ignored "-Wdeprecated-declarations" 39 #define _XOPEN_SOURCE // Must be defined to see getcontext() on MacOS. 40 #endif 41 42 #include "async.h" 43 #include "debug.h" 44 #include "vector.h" 45 #include "threadlocal.h" 46 #include "mutex.h" 47 #include "one-of.h" 48 #include "function.h" 49 #include "list.h" 50 #include <deque> 51 52 #if _WIN32 || __CYGWIN__ 53 #include <windows.h> // for Sleep(0) and fibers 54 #include "windows-sanity.h" 55 #else 56 57 #if KJ_USE_FIBERS 58 #include <ucontext.h> 59 #include <setjmp.h> // for fibers 60 #endif 61 62 #include <sys/mman.h> // mmap(), for allocating new stacks 63 #include <unistd.h> // sysconf() 64 #include <errno.h> 65 #endif 66 67 #if !_WIN32 68 #include <sched.h> // just for sched_yield() 69 #endif 70 71 #if !KJ_NO_RTTI 72 #include <typeinfo> 73 #if __GNUC__ 74 #include <cxxabi.h> 75 #endif 76 #endif 77 78 #include <stdlib.h> 79 80 #if _MSC_VER && !__clang__ 81 // MSVC's atomic intrinsics are weird and different, whereas the C++ standard atomics match the GCC 82 // builtins -- except for requiring the obnoxious std::atomic<T> wrapper.
So, on MSVC let's just 83 // #define the builtins based on the C++ library, reinterpret-casting native types to 84 // std::atomic... this is cheating but ugh, whatever. 85 #include <atomic> 86 template <typename T> 87 static std::atomic<T>* reinterpretAtomic(T* ptr) { return reinterpret_cast<std::atomic<T>*>(ptr); } 88 #define __atomic_store_n(ptr, val, order) \ 89 std::atomic_store_explicit(reinterpretAtomic(ptr), val, order) 90 #define __atomic_load_n(ptr, order) \ 91 std::atomic_load_explicit(reinterpretAtomic(ptr), order) 92 #define __atomic_compare_exchange_n(ptr, expected, desired, weak, succ, fail) \ 93 std::atomic_compare_exchange_strong_explicit( \ 94 reinterpretAtomic(ptr), expected, desired, succ, fail) 95 #define __atomic_exchange_n(ptr, val, order) \ 96 std::atomic_exchange_explicit(reinterpretAtomic(ptr), val, order) 97 #define __ATOMIC_RELAXED std::memory_order_relaxed 98 #define __ATOMIC_ACQUIRE std::memory_order_acquire 99 #define __ATOMIC_RELEASE std::memory_order_release 100 #endif 101 102 namespace kj { 103 104 namespace { 105 106 KJ_THREADLOCAL_PTR(EventLoop) threadLocalEventLoop = nullptr; 107 108 #define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1) 109 110 EventLoop& currentEventLoop() { 111 EventLoop* loop = threadLocalEventLoop; 112 KJ_REQUIRE(loop != nullptr, "No event loop is running on this thread."); 113 return *loop; 114 } 115 116 class RootEvent: public _::Event { 117 public: 118 RootEvent(_::PromiseNode* node, void* traceAddr): node(node), traceAddr(traceAddr) {} 119 120 bool fired = false; 121 122 Maybe<Own<_::Event>> fire() override { 123 fired = true; 124 return nullptr; 125 } 126 127 void traceEvent(_::TraceBuilder& builder) override { 128 node->tracePromise(builder, true); 129 builder.add(traceAddr); 130 } 131 132 private: 133 _::PromiseNode* node; 134 void* traceAddr; 135 }; 136 137 struct DummyFunctor { 138 void operator()() {}; 139 }; 140 141 class YieldPromiseNode final: public _::PromiseNode { 142 public: 143 void onReady(_::Event* event) noexcept override { 144 if (event) event->armBreadthFirst(); 145 } 146 void get(_::ExceptionOrValue& output) noexcept override { 147 output.as<_::Void>() = _::Void(); 148 } 149 void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override { 150 builder.add(reinterpret_cast<void*>(&kj::evalLater<DummyFunctor>)); 151 } 152 }; 153 154 class YieldHarderPromiseNode final: public _::PromiseNode { 155 public: 156 void onReady(_::Event* event) noexcept override { 157 if (event) event->armLast(); 158 } 159 void get(_::ExceptionOrValue& output) noexcept override { 160 output.as<_::Void>() = _::Void(); 161 } 162 void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override { 163 builder.add(reinterpret_cast<void*>(&kj::evalLast<DummyFunctor>)); 164 } 165 }; 166 167 class NeverDonePromiseNode final: public _::PromiseNode { 168 public: 169 void onReady(_::Event* event) noexcept override { 170 // ignore 171 } 172 void get(_::ExceptionOrValue& output) noexcept override { 173 KJ_FAIL_REQUIRE("Not ready."); 174 } 175 void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override { 176 builder.add(_::getMethodStartAddress(kj::NEVER_DONE, &_::NeverDone::wait)); 177 } 178 }; 179 180 } // namespace 181 182 // ======================================================================================= 183 184 void END_CANCELER_STACK_START_CANCELEE_STACK() {} 185 // Dummy symbol used when reporting how a Canceler was canceled. 
We end up combining two stack 186 // traces into one and we use this as a separator. 187 188 Canceler::~Canceler() noexcept(false) { 189 if (isEmpty()) return; 190 cancel(getDestructionReason( 191 reinterpret_cast<void*>(&END_CANCELER_STACK_START_CANCELEE_STACK), 192 Exception::Type::DISCONNECTED, __FILE__, __LINE__, "operation canceled"_kj)); 193 } 194 195 void Canceler::cancel(StringPtr cancelReason) { 196 if (isEmpty()) return; 197 // We can't use getDestructionReason() here because if an exception is in-flight, it would use 198 // that exception, totally discarding the reason given by the caller. This would probably be 199 // unexpected. The caller can always use getDestructionReason() themselves if desired. 200 cancel(Exception(Exception::Type::DISCONNECTED, __FILE__, __LINE__, kj::str(cancelReason))); 201 } 202 203 void Canceler::cancel(const Exception& exception) { 204 for (;;) { 205 KJ_IF_MAYBE(a, list) { 206 a->unlink(); 207 a->cancel(kj::cp(exception)); 208 } else { 209 break; 210 } 211 } 212 } 213 214 void Canceler::release() { 215 for (;;) { 216 KJ_IF_MAYBE(a, list) { 217 a->unlink(); 218 } else { 219 break; 220 } 221 } 222 } 223 224 Canceler::AdapterBase::AdapterBase(Canceler& canceler) 225 : prev(canceler.list), 226 next(canceler.list) { 227 canceler.list = *this; 228 KJ_IF_MAYBE(n, next) { 229 n->prev = next; 230 } 231 } 232 233 Canceler::AdapterBase::~AdapterBase() noexcept(false) { 234 unlink(); 235 } 236 237 void Canceler::AdapterBase::unlink() { 238 KJ_IF_MAYBE(p, prev) { 239 *p = next; 240 } 241 KJ_IF_MAYBE(n, next) { 242 n->prev = prev; 243 } 244 next = nullptr; 245 prev = nullptr; 246 } 247 248 Canceler::AdapterImpl<void>::AdapterImpl(kj::PromiseFulfiller<void>& fulfiller, 249 Canceler& canceler, kj::Promise<void> inner) 250 : AdapterBase(canceler), 251 fulfiller(fulfiller), 252 inner(inner.then( 253 [&fulfiller]() { fulfiller.fulfill(); }, 254 [&fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); }) 255 .eagerlyEvaluate(nullptr)) {} 256 257 void Canceler::AdapterImpl<void>::cancel(kj::Exception&& e) { 258 fulfiller.reject(kj::mv(e)); 259 inner = nullptr; 260 } 261 262 // ======================================================================================= 263 264 TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler) 265 : errorHandler(errorHandler) {} 266 class TaskSet::Task final: public _::Event { 267 public: 268 Task(TaskSet& taskSet, Own<_::PromiseNode>&& nodeParam) 269 : taskSet(taskSet), node(kj::mv(nodeParam)) { 270 node->setSelfPointer(&node); 271 node->onReady(this); 272 } 273 274 Own<Task> pop() { 275 KJ_IF_MAYBE(n, next) { n->get()->prev = prev; } 276 Own<Task> self = kj::mv(KJ_ASSERT_NONNULL(*prev)); 277 KJ_ASSERT(self.get() == this); 278 *prev = kj::mv(next); 279 next = nullptr; 280 prev = nullptr; 281 return self; 282 } 283 284 Maybe<Own<Task>> next; 285 Maybe<Own<Task>>* prev = nullptr; 286 287 kj::String trace() { 288 void* space[32]; 289 _::TraceBuilder builder(space); 290 node->tracePromise(builder, false); 291 return kj::str("task: ", builder); 292 } 293 294 protected: 295 Maybe<Own<Event>> fire() override { 296 // Get the result. 297 _::ExceptionOr<_::Void> result; 298 node->get(result); 299 300 // Delete the node, catching any exceptions. 301 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { 302 node = nullptr; 303 })) { 304 result.addException(kj::mv(*exception)); 305 } 306 307 // Call the error handler if there was an exception. 
308 KJ_IF_MAYBE(e, result.exception) { 309 taskSet.errorHandler.taskFailed(kj::mv(*e)); 310 } 311 312 // Remove from the task list. 313 auto self = pop(); 314 315 KJ_IF_MAYBE(f, taskSet.emptyFulfiller) { 316 if (taskSet.tasks == nullptr) { 317 f->get()->fulfill(); 318 taskSet.emptyFulfiller = nullptr; 319 } 320 } 321 322 return mv(self); 323 } 324 325 void traceEvent(_::TraceBuilder& builder) override { 326 // Pointing out the ErrorHandler's taskFailed() implementation will usually identify the 327 // particular TaskSet that contains this event. 328 builder.add(_::getMethodStartAddress(taskSet.errorHandler, &ErrorHandler::taskFailed)); 329 } 330 331 private: 332 TaskSet& taskSet; 333 Own<_::PromiseNode> node; 334 }; 335 336 TaskSet::~TaskSet() noexcept(false) { 337 // You could argue it is dubious, but some applications would like for the destructor of a 338 // task to be able to schedule new tasks. So when we cancel our tasks... we might find new 339 // tasks added! We'll have to repeatedly cancel. Additionally, we need to make sure that we destroy 340 // the items in a loop to prevent any issues with stack overflow. 341 while (tasks != nullptr) { 342 auto removed = KJ_REQUIRE_NONNULL(tasks)->pop(); 343 } 344 } 345 346 void TaskSet::add(Promise<void>&& promise) { 347 auto task = heap<Task>(*this, _::PromiseNode::from(kj::mv(promise))); 348 KJ_IF_MAYBE(head, tasks) { 349 head->get()->prev = &task->next; 350 task->next = kj::mv(tasks); 351 } 352 task->prev = &tasks; 353 tasks = kj::mv(task); 354 } 355 356 kj::String TaskSet::trace() { 357 kj::Vector<kj::String> traces; 358 359 Maybe<Own<Task>>* ptr = &tasks; 360 for (;;) { 361 KJ_IF_MAYBE(task, *ptr) { 362 traces.add(task->get()->trace()); 363 ptr = &task->get()->next; 364 } else { 365 break; 366 } 367 } 368 369 return kj::strArray(traces, "\n"); 370 } 371 372 Promise<void> TaskSet::onEmpty() { 373 KJ_IF_MAYBE(fulfiller, emptyFulfiller) { 374 if (fulfiller->get()->isWaiting()) { 375 KJ_FAIL_REQUIRE("onEmpty() can only be called once at a time"); 376 } 377 } 378 379 if (tasks == nullptr) { 380 return READY_NOW; 381 } else { 382 auto paf = newPromiseAndFulfiller<void>(); 383 emptyFulfiller = kj::mv(paf.fulfiller); 384 return kj::mv(paf.promise); 385 } 386 } 387 388 // ======================================================================================= 389 390 namespace { 391 392 #if _WIN32 || __CYGWIN__ 393 thread_local void* threadMainFiber = nullptr; 394 395 void* getMainWin32Fiber() { 396 return threadMainFiber; 397 } 398 #endif 399 400 inline void ensureThreadCanRunFibers() { 401 #if _WIN32 || __CYGWIN__ 402 // Make sure the current thread has been converted to a fiber. 403 void* fiber = threadMainFiber; 404 if (fiber == nullptr) { 405 // Thread not initialized. Convert it to a fiber now. 406 // Note: Unfortunately, if the application has already converted the thread to a fiber, I 407 // guess this will fail. But trying to call GetCurrentFiber() when the thread isn't a fiber 408 // doesn't work (it returns null on WINE but not on real windows, ugh). So I guess we're 409 // just incompatible with the application doing anything with fibers, which is sad. 410 threadMainFiber = fiber = ConvertThreadToFiber(nullptr); 411 } 412 #endif 413 } 414 415 } // namespace 416 417 namespace _ { 418 419 class FiberStack final { 420 // A class containing a fiber stack impl. This is separate from fiber 421 // promises since it lets us move the stack itself around and reuse it. 
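// A FiberStack is used in two ways, as reflected by the `main` OneOf below: it is either bound
// to a FiberBase (the promise-based fiber case) or to a SynchronousFunc (the case used by
// FiberPool::runSynchronously()). In both cases run() loops so that, once one job finishes, the
// same stack can be re-initialized and reused for another.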
422 423 public: 424 FiberStack(size_t stackSize); 425 ~FiberStack() noexcept(false); 426 427 struct SynchronousFunc { 428 kj::FunctionParam<void()>& func; 429 kj::Maybe<kj::Exception> exception; 430 }; 431 432 void initialize(FiberBase& fiber); 433 void initialize(SynchronousFunc& syncFunc); 434 435 void reset() { 436 main = {}; 437 } 438 439 void switchToFiber(); 440 void switchToMain(); 441 442 void trace(TraceBuilder& builder) { 443 // TODO(someday): Trace through fiber stack? Can it be done??? 444 builder.add(getMethodStartAddress(*this, &FiberStack::trace)); 445 } 446 447 private: 448 size_t stackSize; 449 OneOf<FiberBase*, SynchronousFunc*> main; 450 451 friend class FiberBase; 452 friend class FiberPool::Impl; 453 454 struct StartRoutine; 455 456 #if KJ_USE_FIBERS 457 #if _WIN32 || __CYGWIN__ 458 void* osFiber; 459 #else 460 struct Impl; 461 Impl* impl; 462 #endif 463 #endif 464 465 [[noreturn]] void run(); 466 467 bool isReset() { return main == nullptr; } 468 }; 469 470 } // namespace _ 471 472 #if __linux__ 473 // TODO(someday): Support core-local freelists on OSs other than Linux. The only tricky part is 474 // finding what to use instead of sched_getcpu() to get the current CPU ID. 475 #define USE_CORE_LOCAL_FREELISTS 1 476 #endif 477 478 #if USE_CORE_LOCAL_FREELISTS 479 static const size_t CACHE_LINE_SIZE = 64; 480 // Most modern architectures have 64-byte cache lines. 481 #endif 482 483 class FiberPool::Impl final: private Disposer { 484 public: 485 Impl(size_t stackSize): stackSize(stackSize) {} 486 ~Impl() noexcept(false) { 487 #if USE_CORE_LOCAL_FREELISTS 488 if (coreLocalFreelists != nullptr) { 489 KJ_DEFER(free(coreLocalFreelists)); 490 491 for (uint i: kj::zeroTo(nproc)) { 492 for (auto stack: coreLocalFreelists[i].stacks) { 493 if (stack != nullptr) { 494 delete stack; 495 } 496 } 497 } 498 } 499 #endif 500 501 // Make sure we're not leaking anything from the global freelist either. 502 auto lock = freelist.lockExclusive(); 503 auto dangling = kj::mv(*lock); 504 for (auto& stack: dangling) { 505 delete stack; 506 } 507 } 508 509 void setMaxFreelist(size_t count) { 510 maxFreelist = count; 511 } 512 513 size_t getFreelistSize() const { 514 return freelist.lockShared()->size(); 515 } 516 517 void useCoreLocalFreelists() { 518 #if USE_CORE_LOCAL_FREELISTS 519 if (coreLocalFreelists != nullptr) { 520 // Ignore repeat call. 521 return; 522 } 523 524 int nproc_; 525 KJ_SYSCALL(nproc_ = sysconf(_SC_NPROCESSORS_CONF)); 526 nproc = nproc_; 527 528 void* allocPtr; 529 size_t totalSize = nproc * sizeof(CoreLocalFreelist); 530 int error = posix_memalign(&allocPtr, CACHE_LINE_SIZE, totalSize); 531 if (error != 0) { 532 KJ_FAIL_SYSCALL("posix_memalign", error); 533 } 534 memset(allocPtr, 0, totalSize); 535 coreLocalFreelists = reinterpret_cast<CoreLocalFreelist*>(allocPtr); 536 #endif 537 } 538 539 Own<_::FiberStack> takeStack() const { 540 // Get a stack from the pool. The disposer on the returned Own pointer will return the stack 541 // to the pool, provided that reset() has been called to indicate that the stack is not in 542 // a weird state. 543 544 #if USE_CORE_LOCAL_FREELISTS 545 KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) { 546 for (auto& stackPtr: core->stacks) { 547 _::FiberStack* result = __atomic_exchange_n(&stackPtr, nullptr, __ATOMIC_ACQUIRE); 548 if (result != nullptr) { 549 // Found a stack in this slot! 550 return { result, *this }; 551 } 552 } 553 // No stacks found, fall back to global freelist. 
554 } 555 #endif 556 557 { 558 auto lock = freelist.lockExclusive(); 559 if (!lock->empty()) { 560 _::FiberStack* result = lock->back(); 561 lock->pop_back(); 562 return { result, *this }; 563 } 564 } 565 566 _::FiberStack* result = new _::FiberStack(stackSize); 567 return { result, *this }; 568 } 569 570 private: 571 size_t stackSize; 572 size_t maxFreelist = kj::maxValue; 573 MutexGuarded<std::deque<_::FiberStack*>> freelist; 574 575 #if USE_CORE_LOCAL_FREELISTS 576 struct CoreLocalFreelist { 577 union { 578 _::FiberStack* stacks[2]; 579 // For now, we don't try to freelist more than 2 stacks per core. If you have three or more 580 // threads interleaved on a core, chances are you have bigger problems... 581 582 byte padToCacheLine[CACHE_LINE_SIZE]; 583 // We don't want two core-local freelists to live in the same cache line, otherwise the 584 // cores will fight over ownership of that line. 585 }; 586 }; 587 588 uint nproc; 589 CoreLocalFreelist* coreLocalFreelists = nullptr; 590 591 kj::Maybe<CoreLocalFreelist&> lookupCoreLocalFreelist() const { 592 if (coreLocalFreelists == nullptr) { 593 return nullptr; 594 } else { 595 int cpu = sched_getcpu(); 596 if (cpu >= 0) { 597 // TODO(perf): Perhaps two hyperthreads on the same physical core should share a freelist? 598 // But I don't know how to find out if the system uses hyperthreading. 599 return coreLocalFreelists[cpu]; 600 } else { 601 static bool logged = false; 602 if (!logged) { 603 KJ_LOG(ERROR, "invalid cpu number from sched_getcpu()?", cpu, nproc); 604 logged = true; 605 } 606 return nullptr; 607 } 608 } 609 } 610 #endif 611 612 void disposeImpl(void* pointer) const { 613 _::FiberStack* stack = reinterpret_cast<_::FiberStack*>(pointer); 614 KJ_DEFER(delete stack); 615 616 // Verify that the stack was reset before returning, otherwise it might be in a weird state 617 // where we don't want to reuse it. 618 if (stack->isReset()) { 619 #if USE_CORE_LOCAL_FREELISTS 620 KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) { 621 for (auto& stackPtr: core->stacks) { 622 stack = __atomic_exchange_n(&stackPtr, stack, __ATOMIC_RELEASE); 623 if (stack == nullptr) { 624 // Cool, we inserted the stack into an unused slot. We're done. 625 return; 626 } 627 } 628 // All slots were occupied, so we inserted the new stack in the front, pushed the rest back, 629 // and now `stack` refers to the stack that fell off the end of the core-local list. That 630 // needs to go into the global freelist. 
631 } 632 #endif 633 634 auto lock = freelist.lockExclusive(); 635 lock->push_back(stack); 636 if (lock->size() > maxFreelist) { 637 stack = lock->front(); 638 lock->pop_front(); 639 } else { 640 stack = nullptr; 641 } 642 } 643 } 644 }; 645 646 FiberPool::FiberPool(size_t stackSize) : impl(kj::heap<FiberPool::Impl>(stackSize)) {} 647 FiberPool::~FiberPool() noexcept(false) {} 648 649 void FiberPool::setMaxFreelist(size_t count) { 650 impl->setMaxFreelist(count); 651 } 652 653 size_t FiberPool::getFreelistSize() const { 654 return impl->getFreelistSize(); 655 } 656 657 void FiberPool::useCoreLocalFreelists() { 658 impl->useCoreLocalFreelists(); 659 } 660 661 void FiberPool::runSynchronously(kj::FunctionParam<void()> func) const { 662 ensureThreadCanRunFibers(); 663 664 _::FiberStack::SynchronousFunc syncFunc { func, nullptr }; 665 666 { 667 auto stack = impl->takeStack(); 668 stack->initialize(syncFunc); 669 stack->switchToFiber(); 670 stack->reset(); // safe to reuse 671 } 672 673 KJ_IF_MAYBE(e, syncFunc.exception) { 674 kj::throwRecoverableException(kj::mv(*e)); 675 } 676 } 677 678 namespace _ { // private 679 680 class LoggingErrorHandler: public TaskSet::ErrorHandler { 681 public: 682 static LoggingErrorHandler instance; 683 684 void taskFailed(kj::Exception&& exception) override { 685 KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception); 686 } 687 }; 688 689 LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler(); 690 691 } // namespace _ (private) 692 693 // ======================================================================================= 694 695 struct Executor::Impl { 696 Impl(EventLoop& loop): state(loop) {} 697 698 struct State { 699 // Queues of notifications from other threads that need this thread's attention. 700 701 State(EventLoop& loop): loop(loop) {} 702 703 kj::Maybe<EventLoop&> loop; 704 // Becomes null when the loop is destroyed. 705 706 List<_::XThreadEvent, &_::XThreadEvent::targetLink> start; 707 List<_::XThreadEvent, &_::XThreadEvent::targetLink> cancel; 708 List<_::XThreadEvent, &_::XThreadEvent::replyLink> replies; 709 // Lists of events that need actioning by this thread. 710 711 List<_::XThreadEvent, &_::XThreadEvent::targetLink> executing; 712 // Events that have already been dispatched and are happily executing. This list is maintained 713 // so that they can be canceled if the event loop exits. 714 715 List<_::XThreadPaf, &_::XThreadPaf::link> fulfilled; 716 // Set of XThreadPafs that have been fulfilled by another thread. 717 718 bool waitingForCancel = false; 719 // True if this thread is currently blocked waiting for some other thread to pump its 720 // cancellation queue. If that other thread tries to block on *this* thread, then it could 721 // deadlock -- it must take precautions against this. 
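// isDispatchNeeded() below reports whether any cross-thread work -- newly queued events,
// cancellation requests, replies, or fulfilled cross-thread promises -- is waiting for this
// thread to dispatch it.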
722 723 bool isDispatchNeeded() const { 724 return !start.empty() || !cancel.empty() || !replies.empty() || !fulfilled.empty(); 725 } 726 727 void dispatchAll(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) { 728 for (auto& event: start) { 729 start.remove(event); 730 executing.add(event); 731 event.state = _::XThreadEvent::EXECUTING; 732 event.armBreadthFirst(); 733 } 734 735 dispatchCancels(eventsToCancelOutsideLock); 736 737 for (auto& event: replies) { 738 replies.remove(event); 739 event.onReadyEvent.armBreadthFirst(); 740 } 741 742 for (auto& event: fulfilled) { 743 fulfilled.remove(event); 744 event.state = _::XThreadPaf::DISPATCHED; 745 event.onReadyEvent.armBreadthFirst(); 746 } 747 } 748 749 void dispatchCancels(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) { 750 for (auto& event: cancel) { 751 cancel.remove(event); 752 753 if (event.promiseNode == nullptr) { 754 event.setDoneState(); 755 } else { 756 // We can't destroy the promiseNode while the mutex is locked, because we don't know 757 // what the destructor might do. But, we *must* destroy it before acknowledging 758 // cancellation. So we have to add it to a list to destroy later. 759 eventsToCancelOutsideLock.add(&event); 760 } 761 } 762 } 763 }; 764 765 kj::MutexGuarded<State> state; 766 // After modifying state from another thread, the loop's port.wake() must be called. 767 768 void processAsyncCancellations(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) { 769 // After calling dispatchAll() or dispatchCancels() with the lock held, it may be that some 770 // cancellations require dropping the lock before destroying the promiseNode. In that case 771 // those cancellations will be added to the eventsToCancelOutsideLock Vector passed to the 772 // method. That vector must then be passed to processAsyncCancellations() as soon as the lock 773 // is released. 774 775 for (auto& event: eventsToCancelOutsideLock) { 776 event->promiseNode = nullptr; 777 event->disarm(); 778 } 779 780 // Now we need to mark all the events "done" under lock. 781 auto lock = state.lockExclusive(); 782 for (auto& event: eventsToCancelOutsideLock) { 783 event->setDoneState(); 784 } 785 } 786 787 void disconnect() { 788 state.lockExclusive()->loop = nullptr; 789 790 // Now that `loop` is set null in `state`, other threads will no longer try to manipulate our 791 // lists, so we can access them without a lock. That's convenient because a bunch of the things 792 // we want to do with them would require dropping the lock to avoid deadlocks. We'd end up 793 // copying all the lists over into separate vectors first, dropping the lock, operating on 794 // them, and then locking again. 795 auto& s = state.getWithoutLock(); 796 797 // We do, however, take and release the lock on the way out, to make sure anyone performing 798 // a conditional wait for state changes gets a chance to have their wait condition re-checked. 
799 KJ_DEFER(state.lockExclusive()); 800 801 for (auto& event: s.start) { 802 KJ_ASSERT(event.state == _::XThreadEvent::QUEUED, event.state) { break; } 803 s.start.remove(event); 804 event.setDisconnected(); 805 event.sendReply(); 806 event.setDoneState(); 807 } 808 809 for (auto& event: s.executing) { 810 KJ_ASSERT(event.state == _::XThreadEvent::EXECUTING, event.state) { break; } 811 s.executing.remove(event); 812 event.promiseNode = nullptr; 813 event.setDisconnected(); 814 event.sendReply(); 815 event.setDoneState(); 816 } 817 818 for (auto& event: s.cancel) { 819 KJ_ASSERT(event.state == _::XThreadEvent::CANCELING, event.state) { break; } 820 s.cancel.remove(event); 821 event.promiseNode = nullptr; 822 event.setDoneState(); 823 } 824 825 // The replies list "should" be empty, because any locally-initiated tasks should have been 826 // canceled before destroying the EventLoop. 827 if (!s.replies.empty()) { 828 KJ_LOG(ERROR, "EventLoop destroyed with cross-thread event replies outstanding"); 829 for (auto& event: s.replies) { 830 s.replies.remove(event); 831 } 832 } 833 834 // Similarly for cross-thread fulfillers. The waiting tasks should have been canceled. 835 if (!s.fulfilled.empty()) { 836 KJ_LOG(ERROR, "EventLoop destroyed with cross-thread fulfiller replies outstanding"); 837 for (auto& event: s.fulfilled) { 838 s.fulfilled.remove(event); 839 event.state = _::XThreadPaf::DISPATCHED; 840 } 841 } 842 }}; 843 844 namespace _ { // (private) 845 846 XThreadEvent::XThreadEvent( 847 ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr) 848 : Event(targetExecutor.getLoop()), result(result), funcTracePtr(funcTracePtr), 849 targetExecutor(targetExecutor.addRef()) {} 850 851 void XThreadEvent::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 852 // We can't safely trace into another thread, so we'll stop here. 853 builder.add(funcTracePtr); 854 } 855 856 void XThreadEvent::ensureDoneOrCanceled() { 857 if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) { 858 auto lock = targetExecutor->impl->state.lockExclusive(); 859 860 const EventLoop* loop; 861 KJ_IF_MAYBE(l, lock->loop) { 862 loop = l; 863 } else { 864 // Target event loop is already dead, so we know it's already working on transitioning all 865 // events to the DONE state. We can just wait. 866 lock.wait([&](auto&) { return state == DONE; }); 867 return; 868 } 869 870 switch (state) { 871 case UNUSED: 872 // Nothing to do. 873 break; 874 case QUEUED: 875 lock->start.remove(*this); 876 // No wake needed since we removed work rather than adding it. 877 state = DONE; 878 break; 879 case EXECUTING: { 880 lock->executing.remove(*this); 881 lock->cancel.add(*this); 882 state = CANCELING; 883 KJ_IF_MAYBE(p, loop->port) { 884 p->wake(); 885 } 886 887 Maybe<Executor&> maybeSelfExecutor = nullptr; 888 if (threadLocalEventLoop != nullptr) { 889 KJ_IF_MAYBE(e, threadLocalEventLoop->executor) { 890 maybeSelfExecutor = **e; 891 } 892 } 893 894 KJ_IF_MAYBE(selfExecutor, maybeSelfExecutor) { 895 // If, while waiting for other threads to process our cancellation request, we have 896 // cancellation requests queued back to this thread, we must process them. Otherwise, 897 // we could deadlock with two threads waiting on each other to process cancellations. 898 // 899 // We don't have a terribly good way to detect this, except to check if the remote 900 // thread is itself waiting for cancellations and, if so, wake ourselves up to check for 901 // cancellations to process. 
This will busy-loop but at least it should eventually 902 // resolve assuming fair scheduling. 903 // 904 // To make things extra-annoying, in order to update our waitingForCancel flag, we have 905 // to lock our own executor state, but we can't take both locks at once, so we have to 906 // release the other lock in the meantime. 907 908 // Make sure we unset waitingForCancel on the way out. 909 KJ_DEFER({ 910 lock = {}; 911 912 Vector<_::XThreadEvent*> eventsToCancelOutsideLock; 913 KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock)); 914 915 auto selfLock = selfExecutor->impl->state.lockExclusive(); 916 selfLock->waitingForCancel = false; 917 selfLock->dispatchCancels(eventsToCancelOutsideLock); 918 919 // We don't need to re-take the lock on the other executor here; it's not used again 920 // after this scope. 921 }); 922 923 while (state != DONE) { 924 bool otherThreadIsWaiting = lock->waitingForCancel; 925 926 // Make sure our waitingForCancel is on and dispatch any pending cancellations on this 927 // thread. 928 lock = {}; 929 { 930 Vector<_::XThreadEvent*> eventsToCancelOutsideLock; 931 KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock)); 932 933 auto selfLock = selfExecutor->impl->state.lockExclusive(); 934 selfLock->waitingForCancel = true; 935 936 // Note that we don't have to proactively delete the PromiseNodes extracted from 937 // the canceled events because those nodes belong to this thread and can't possibly 938 // continue executing while we're blocked here. 939 selfLock->dispatchCancels(eventsToCancelOutsideLock); 940 } 941 942 if (otherThreadIsWaiting) { 943 // We know the other thread was waiting for cancellations to complete a moment ago. 944 // We may have just processed the necessary cancellations in this thread, in which 945 // case the other thread needs a chance to receive control and notice this. Or, it 946 // may be that the other thread is waiting for some third thread to take action. 947 // Either way, we should yield control here to give things a chance to settle. 948 // Otherwise we could end up in a tight busy loop. 949 #if _WIN32 950 Sleep(0); 951 #else 952 sched_yield(); 953 #endif 954 } 955 956 // OK now we can take the original lock again. 957 lock = targetExecutor->impl->state.lockExclusive(); 958 959 // OK, now we can wait for the other thread to either process our cancellation or 960 // indicate that it is waiting for remote cancellation. 961 lock.wait([&](const Executor::Impl::State& executorState) { 962 return state == DONE || executorState.waitingForCancel; 963 }); 964 } 965 } else { 966 // We have no executor of our own so we don't have to worry about cancellation cycles 967 // causing deadlock. 968 // 969 // NOTE: I don't think we can actually get here, because it implies that this is a 970 // synchronous execution, which means there's no way to cancel it. 971 lock.wait([&](auto&) { return state == DONE; }); 972 } 973 KJ_DASSERT(!targetLink.isLinked()); 974 break; 975 } 976 case CANCELING: 977 KJ_FAIL_ASSERT("impossible state: CANCELING should only be set within the above case"); 978 case DONE: 979 // Became done while we waited for lock. Nothing to do. 980 break; 981 } 982 } 983 984 KJ_IF_MAYBE(e, replyExecutor) { 985 // Since we know we reached the DONE state (or never left UNUSED), we know that the remote 986 // thread is all done playing with our `replyPrev` pointer. Only the current thread could 987 // possibly modify it after this point. 
So we can skip the lock if it's already null. 988 if (replyLink.isLinked()) { 989 auto lock = e->impl->state.lockExclusive(); 990 lock->replies.remove(*this); 991 } 992 } 993 } 994 995 void XThreadEvent::sendReply() { 996 KJ_IF_MAYBE(e, replyExecutor) { 997 // Queue the reply. 998 const EventLoop* replyLoop; 999 { 1000 auto lock = e->impl->state.lockExclusive(); 1001 KJ_IF_MAYBE(l, lock->loop) { 1002 lock->replies.add(*this); 1003 replyLoop = l; 1004 } else { 1005 // Calling thread exited without cancelling the promise. This is UB. In fact, 1006 // `replyExecutor` is probably already destroyed and we are in use-after-free territory 1007 // already. Better abort. 1008 KJ_LOG(FATAL, 1009 "the thread which called kj::Executor::executeAsync() apparently exited its own " 1010 "event loop without canceling the cross-thread promise first; this is undefined " 1011 "behavior so I will crash now"); 1012 abort(); 1013 } 1014 } 1015 1016 // Note that it's safe to assume `replyLoop` still exists even though we dropped the lock 1017 // because that thread would have had to cancel any promises before destroying its own 1018 // EventLoop, and when it tries to destroy this promise, it will wait for `state` to become 1019 // `DONE`, which we don't set until later on. That's nice because wake() probably makes a 1020 // syscall and we'd rather not hold the lock through syscalls. 1021 KJ_IF_MAYBE(p, replyLoop->port) { 1022 p->wake(); 1023 } 1024 } 1025 } 1026 1027 void XThreadEvent::done() { 1028 KJ_ASSERT(targetExecutor.get() == &currentEventLoop().getExecutor(), 1029 "calling done() from wrong thread?"); 1030 1031 sendReply(); 1032 1033 { 1034 auto lock = targetExecutor->impl->state.lockExclusive(); 1035 1036 switch (state) { 1037 case EXECUTING: 1038 lock->executing.remove(*this); 1039 break; 1040 case CANCELING: 1041 // Sending thread requested cancelation, but we're done anyway, so it doesn't matter at this 1042 // point. 1043 lock->cancel.remove(*this); 1044 break; 1045 default: 1046 KJ_FAIL_ASSERT("can't call done() from this state", (uint)state); 1047 } 1048 1049 setDoneState(); 1050 } 1051 } 1052 1053 inline void XThreadEvent::setDoneState() { 1054 __atomic_store_n(&state, DONE, __ATOMIC_RELEASE); 1055 } 1056 1057 void XThreadEvent::setDisconnected() { 1058 result.addException(KJ_EXCEPTION(DISCONNECTED, 1059 "Executor's event loop exited before cross-thread event could complete")); 1060 } 1061 1062 class XThreadEvent::DelayedDoneHack: public Disposer { 1063 // Crazy hack: In fire(), we want to call done() if the event is finished. But done() signals 1064 // the requesting thread to wake up and possibly delete the XThreadEvent. But the caller (the 1065 // EventLoop) still has to set `event->firing = false` after `fire()` returns, so this would be 1066 // a race condition use-after-free. 1067 // 1068 // It just so happens, though, that fire() is allowed to return an optional `Own<Event>` to drop, 1069 // and the caller drops that pointer immediately after setting event->firing = false. So we 1070 // return a pointer whose disposer calls done(). 1071 // 1072 // It's not quite as much of a hack as it seems: The whole reason fire() returns an Own<Event> is 1073 // so that the event can delete itself, but do so after the caller sets event->firing = false. 1074 // It just happens to be that in this case, the event isn't deleting itself, but rather releasing 1075 // itself back to the other thread.
1076 1077 protected: 1078 void disposeImpl(void* pointer) const override { 1079 reinterpret_cast<XThreadEvent*>(pointer)->done(); 1080 } 1081 }; 1082 1083 Maybe<Own<Event>> XThreadEvent::fire() { 1084 static constexpr DelayedDoneHack DISPOSER {}; 1085 1086 KJ_IF_MAYBE(n, promiseNode) { 1087 n->get()->get(result); 1088 promiseNode = nullptr; // make sure to destroy in the thread that created it 1089 return Own<Event>(this, DISPOSER); 1090 } else { 1091 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 1092 promiseNode = execute(); 1093 })) { 1094 result.addException(kj::mv(*exception)); 1095 }; 1096 KJ_IF_MAYBE(n, promiseNode) { 1097 n->get()->onReady(this); 1098 } else { 1099 return Own<Event>(this, DISPOSER); 1100 } 1101 } 1102 1103 return nullptr; 1104 } 1105 1106 void XThreadEvent::traceEvent(TraceBuilder& builder) { 1107 KJ_IF_MAYBE(n, promiseNode) { 1108 n->get()->tracePromise(builder, true); 1109 } 1110 1111 // We can't safely trace into another thread, so we'll stop here. 1112 builder.add(funcTracePtr); 1113 } 1114 1115 void XThreadEvent::onReady(Event* event) noexcept { 1116 onReadyEvent.init(event); 1117 } 1118 1119 XThreadPaf::XThreadPaf() 1120 : state(WAITING), executor(getCurrentThreadExecutor()) {} 1121 XThreadPaf::~XThreadPaf() noexcept(false) {} 1122 1123 void XThreadPaf::Disposer::disposeImpl(void* pointer) const { 1124 XThreadPaf* obj = reinterpret_cast<XThreadPaf*>(pointer); 1125 auto oldState = WAITING; 1126 1127 if (__atomic_load_n(&obj->state, __ATOMIC_ACQUIRE) == DISPATCHED) { 1128 // Common case: Promise was fully fulfilled and dispatched, no need for locking. 1129 delete obj; 1130 } else if (__atomic_compare_exchange_n(&obj->state, &oldState, CANCELED, false, 1131 __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) { 1132 // State transitioned from WAITING to CANCELED, so now it's the fulfiller's job to destroy the 1133 // object. 1134 } else { 1135 // Whoops, another thread is already in the process of fulfilling this promise. We'll have to 1136 // wait for it to finish and transition the state to FULFILLED. 1137 obj->executor.impl->state.when([&](auto&) { 1138 return obj->state == FULFILLED || obj->state == DISPATCHED; 1139 }, [&](Executor::Impl::State& exState) { 1140 if (obj->state == FULFILLED) { 1141 // The object is on the queue but was not yet dispatched. Remove it. 1142 exState.fulfilled.remove(*obj); 1143 } 1144 }); 1145 1146 // It's ours now, delete it. 1147 delete obj; 1148 } 1149 } 1150 1151 const XThreadPaf::Disposer XThreadPaf::DISPOSER; 1152 1153 void XThreadPaf::onReady(Event* event) noexcept { 1154 onReadyEvent.init(event); 1155 } 1156 1157 void XThreadPaf::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 1158 // We can't safely trace into another thread, so we'll stop here. 1159 // Maybe returning the address of get() will give us a function name with meaningful type 1160 // information. 1161 builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get)); 1162 } 1163 1164 XThreadPaf::FulfillScope::FulfillScope(XThreadPaf** pointer) { 1165 obj = __atomic_exchange_n(pointer, static_cast<XThreadPaf*>(nullptr), __ATOMIC_ACQUIRE); 1166 auto oldState = WAITING; 1167 if (obj == nullptr) { 1168 // Already fulfilled (possibly by another thread). 1169 } else if (__atomic_compare_exchange_n(&obj->state, &oldState, FULFILLING, false, 1170 __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) { 1171 // Transitioned to FULFILLING, good. 1172 } else { 1173 // The waiting thread must have canceled. 
1174 KJ_ASSERT(oldState == CANCELED); 1175 1176 // It's our responsibility to clean up, then. 1177 delete obj; 1178 1179 // Set `obj` null so that we don't try to fill it in or delete it later. 1180 obj = nullptr; 1181 } 1182 } 1183 XThreadPaf::FulfillScope::~FulfillScope() noexcept(false) { 1184 if (obj != nullptr) { 1185 auto lock = obj->executor.impl->state.lockExclusive(); 1186 KJ_IF_MAYBE(l, lock->loop) { 1187 lock->fulfilled.add(*obj); 1188 __atomic_store_n(&obj->state, FULFILLED, __ATOMIC_RELEASE); 1189 KJ_IF_MAYBE(p, l->port) { 1190 // TODO(perf): It's annoying we have to call wake() with the lock held, but we have to 1191 // prevent the destination EventLoop from being destroyed first. 1192 p->wake(); 1193 } 1194 } else { 1195 KJ_LOG(FATAL, 1196 "the thread which called kj::newPromiseAndCrossThreadFulfiller<T>() apparently exited " 1197 "its own event loop without canceling the cross-thread promise first; this is " 1198 "undefined behavior so I will crash now"); 1199 abort(); 1200 } 1201 } 1202 } 1203 1204 kj::Exception XThreadPaf::unfulfilledException() { 1205 // TODO(cleanup): Share code with regular PromiseAndFulfiller for stack tracing here. 1206 return kj::Exception(kj::Exception::Type::FAILED, __FILE__, __LINE__, kj::heapString( 1207 "cross-thread PromiseFulfiller was destroyed without fulfilling the promise.")); 1208 } 1209 1210 class ExecutorImpl: public Executor, public AtomicRefcounted { 1211 public: 1212 using Executor::Executor; 1213 1214 kj::Own<const Executor> addRef() const override { 1215 return kj::atomicAddRef(*this); 1216 } 1217 }; 1218 1219 } // namespace _ 1220 1221 Executor::Executor(EventLoop& loop, Badge<EventLoop>): impl(kj::heap<Impl>(loop)) {} 1222 Executor::~Executor() noexcept(false) {} 1223 1224 bool Executor::isLive() const { 1225 return impl->state.lockShared()->loop != nullptr; 1226 } 1227 1228 void Executor::send(_::XThreadEvent& event, bool sync) const { 1229 KJ_ASSERT(event.state == _::XThreadEvent::UNUSED); 1230 1231 if (sync) { 1232 EventLoop* thisThread = threadLocalEventLoop; 1233 if (thisThread != nullptr && 1234 thisThread->executor.map([this](auto& e) { return e == this; }).orDefault(false)) { 1235 // Invoking a sync request on our own thread. Just execute it directly; if we try to queue 1236 // it to the loop, we'll deadlock. 1237 auto promiseNode = event.execute(); 1238 1239 // If the function returns a promise, we have no way to pump the event loop to wait for it, 1240 // because the event loop may already be pumping somewhere up the stack. 1241 KJ_ASSERT(promiseNode == nullptr, 1242 "can't call executeSync() on own thread's executor with a promise-returning function"); 1243 1244 return; 1245 } 1246 } else { 1247 event.replyExecutor = getCurrentThreadExecutor(); 1248 1249 // Note that async requests will "just work" even if the target executor is our own thread's 1250 // executor. In theory we could detect this case to avoid some locking and signals but that 1251 // would be extra code complexity for probably little benefit. 1252 } 1253 1254 auto lock = impl->state.lockExclusive(); 1255 const EventLoop* loop; 1256 KJ_IF_MAYBE(l, lock->loop) { 1257 loop = l; 1258 } else { 1259 event.setDisconnected(); 1260 return; 1261 } 1262 1263 event.state = _::XThreadEvent::QUEUED; 1264 lock->start.add(event); 1265 1266 KJ_IF_MAYBE(p, loop->port) { 1267 p->wake(); 1268 } else { 1269 // Event loop will be waiting on executor.wait(), which will be woken when we unlock the mutex. 
1270 } 1271 1272 if (sync) { 1273 lock.wait([&](auto&) { return event.state == _::XThreadEvent::DONE; }); 1274 } 1275 } 1276 1277 void Executor::wait() { 1278 Vector<_::XThreadEvent*> eventsToCancelOutsideLock; 1279 KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock)); 1280 1281 auto lock = impl->state.lockExclusive(); 1282 1283 lock.wait([](const Impl::State& state) { 1284 return state.isDispatchNeeded(); 1285 }); 1286 1287 lock->dispatchAll(eventsToCancelOutsideLock); 1288 } 1289 1290 bool Executor::poll() { 1291 Vector<_::XThreadEvent*> eventsToCancelOutsideLock; 1292 KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock)); 1293 1294 auto lock = impl->state.lockExclusive(); 1295 if (lock->isDispatchNeeded()) { 1296 lock->dispatchAll(eventsToCancelOutsideLock); 1297 return true; 1298 } else { 1299 return false; 1300 } 1301 } 1302 1303 EventLoop& Executor::getLoop() const { 1304 KJ_IF_MAYBE(l, impl->state.lockShared()->loop) { 1305 return *l; 1306 } else { 1307 kj::throwFatalException(KJ_EXCEPTION(DISCONNECTED, "Executor's event loop has exited")); 1308 } 1309 } 1310 1311 const Executor& getCurrentThreadExecutor() { 1312 return currentEventLoop().getExecutor(); 1313 } 1314 1315 // ======================================================================================= 1316 // Fiber implementation. 1317 1318 namespace _ { // private 1319 1320 #if KJ_USE_FIBERS 1321 #if !(_WIN32 || __CYGWIN__) 1322 struct FiberStack::Impl { 1323 // This struct serves two purposes: 1324 // - It contains OS-specific state that we don't want to declare in the header. 1325 // - It is allocated at the top of the fiber's stack area, so the Impl pointer also serves to 1326 // track where the stack was allocated. 1327 1328 jmp_buf fiberJmpBuf; 1329 jmp_buf originalJmpBuf; 1330 1331 static Impl* alloc(size_t stackSize, ucontext_t* context) { 1332 #ifndef MAP_ANONYMOUS 1333 #define MAP_ANONYMOUS MAP_ANON 1334 #endif 1335 #ifndef MAP_STACK 1336 #define MAP_STACK 0 1337 #endif 1338 1339 size_t pageSize = getPageSize(); 1340 size_t allocSize = stackSize + pageSize; // size plus guard page 1341 1342 // Allocate virtual address space for the stack but make it inaccessible initially. 1343 // TODO(someday): Does it make sense to use MAP_GROWSDOWN on Linux? It's a kind of bizarre flag 1344 // that causes the mapping to automatically allocate extra pages (beyond the range specified) 1345 // until it hits something... 1346 void* stack = mmap(nullptr, allocSize, PROT_NONE, 1347 MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0); 1348 if (stack == MAP_FAILED) { 1349 KJ_FAIL_SYSCALL("mmap(new stack)", errno); 1350 } 1351 KJ_ON_SCOPE_FAILURE({ 1352 KJ_SYSCALL(munmap(stack, allocSize)) { break; } 1353 }); 1354 1355 // Now mark everything except the guard page as read-write. We assume the stack grows down, so 1356 // the guard page is at the beginning. No modern architecture uses stacks that grow up. 1357 KJ_SYSCALL(mprotect(reinterpret_cast<byte*>(stack) + pageSize, 1358 stackSize, PROT_READ | PROT_WRITE)); 1359 1360 // Stick `Impl` at the top of the stack. 1361 Impl* impl = (reinterpret_cast<Impl*>(reinterpret_cast<byte*>(stack) + allocSize) - 1); 1362 1363 // Note: mmap() allocates zero'd pages so we don't have to memset() anything here. 
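// getcontext() below captures a valid machine context into `context`; we then point its
// uc_stack at the freshly mapped region so that the caller (the FiberStack constructor) can
// pass it to makecontext() and have the new fiber start on this stack.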
1364 1365 KJ_SYSCALL(getcontext(context)); 1366 context->uc_stack.ss_size = allocSize - sizeof(Impl); 1367 context->uc_stack.ss_sp = reinterpret_cast<char*>(stack); 1368 context->uc_stack.ss_flags = 0; 1369 // We don't use uc_link since our fiber start routine runs forever in a loop to allow for 1370 // reuse. When we're done with the fiber, we just destroy it, without switching to its 1371 // stack. This is safe since the start routine doesn't allocate any memory or RAII objects 1372 // before looping. 1373 context->uc_link = 0; 1374 1375 return impl; 1376 } 1377 1378 static void free(Impl* impl, size_t stackSize) { 1379 size_t allocSize = stackSize + getPageSize(); 1380 void* stack = reinterpret_cast<byte*>(impl + 1) - allocSize; 1381 KJ_SYSCALL(munmap(stack, allocSize)) { break; } 1382 } 1383 1384 static size_t getPageSize() { 1385 #ifndef _SC_PAGESIZE 1386 #define _SC_PAGESIZE _SC_PAGE_SIZE 1387 #endif 1388 static size_t result = sysconf(_SC_PAGE_SIZE); 1389 return result; 1390 } 1391 }; 1392 #endif 1393 #endif 1394 1395 struct FiberStack::StartRoutine { 1396 #if _WIN32 || __CYGWIN__ 1397 static void WINAPI run(LPVOID ptr) { 1398 // This is the static C-style function we pass to CreateFiber(). 1399 reinterpret_cast<FiberStack*>(ptr)->run(); 1400 } 1401 #else 1402 [[noreturn]] static void run(int arg1, int arg2) { 1403 // This is the static C-style function we pass to makecontext(). 1404 1405 // POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to 1406 // work correctly on 64-bit machines. Gross. 1407 uintptr_t ptr = static_cast<uint>(arg1); 1408 ptr |= static_cast<uintptr_t>(static_cast<uint>(arg2)) << (sizeof(ptr) * 4); 1409 1410 auto& stack = *reinterpret_cast<FiberStack*>(ptr); 1411 1412 // We first switch to the fiber inside of the FiberStack constructor. This is just for 1413 // initialization purposes, and we're expected to switch back immediately. 1414 stack.switchToMain(); 1415 1416 // OK, now we have a real job. 1417 stack.run(); 1418 } 1419 #endif 1420 }; 1421 1422 void FiberStack::run() { 1423 // Loop forever so that the fiber can be reused. 1424 for (;;) { 1425 KJ_SWITCH_ONEOF(main) { 1426 KJ_CASE_ONEOF(event, FiberBase*) { 1427 event->run(); 1428 } 1429 KJ_CASE_ONEOF(func, SynchronousFunc*) { 1430 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(func->func)) { 1431 func->exception.emplace(kj::mv(*exception)); 1432 } 1433 } 1434 } 1435 1436 // Wait for the fiber to be used again. Note the fiber might simply be destroyed without this 1437 // ever returning. That's OK because we don't have any nontrivial destructors on the stack 1438 // at this point. 1439 switchToMain(); 1440 } 1441 } 1442 1443 FiberStack::FiberStack(size_t stackSizeParam) 1444 // Force stackSize to a reasonable minimum. 1445 : stackSize(kj::max(stackSizeParam, 65536)) 1446 { 1447 1448 #if KJ_USE_FIBERS 1449 #if _WIN32 || __CYGWIN__ 1450 // We can create fibers before we convert the main thread into a fiber in FiberBase 1451 KJ_WIN32(osFiber = CreateFiber(stackSize, &StartRoutine::run, this)); 1452 1453 #else 1454 // Note: Nothing below here can throw. If that changes then we need to call Impl::free(impl) 1455 // on exceptions... 1456 ucontext_t context; 1457 impl = Impl::alloc(stackSize, &context); 1458 1459 // POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to 1460 // work correctly on 64-bit machines. Gross.
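// Each int carries half of the pointer's bits: arg1 takes the low half and arg2 the high half
// (sizeof(ptr) * 4 bits each). StartRoutine::run() reverses this to recover the FiberStack*.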
1461 uintptr_t ptr = reinterpret_cast<uintptr_t>(this); 1462 int arg1 = ptr & ((uintptr_t(1) << (sizeof(ptr) * 4)) - 1); 1463 int arg2 = ptr >> (sizeof(ptr) * 4); 1464 1465 makecontext(&context, reinterpret_cast<void(*)()>(&StartRoutine::run), 2, arg1, arg2); 1466 1467 if (_setjmp(impl->originalJmpBuf) == 0) { 1468 setcontext(&context); 1469 } 1470 #endif 1471 #else 1472 #if KJ_NO_EXCEPTIONS 1473 KJ_UNIMPLEMENTED("Fibers are not implemented because exceptions are disabled"); 1474 #else 1475 KJ_UNIMPLEMENTED( 1476 "Fibers are not implemented on this platform because its C library lacks setcontext() " 1477 "and friends. If you'd like to see fiber support added, file a bug to let us know. " 1478 "We can likely make it happen using assembly, but didn't want to try unless it was " 1479 "actually needed."); 1480 #endif 1481 #endif 1482 } 1483 1484 FiberStack::~FiberStack() noexcept(false) { 1485 #if KJ_USE_FIBERS 1486 #if _WIN32 || __CYGWIN__ 1487 DeleteFiber(osFiber); 1488 #else 1489 Impl::free(impl, stackSize); 1490 #endif 1491 #endif 1492 } 1493 1494 void FiberStack::initialize(FiberBase& fiber) { 1495 KJ_REQUIRE(this->main == nullptr); 1496 this->main = &fiber; 1497 } 1498 1499 void FiberStack::initialize(SynchronousFunc& func) { 1500 KJ_REQUIRE(this->main == nullptr); 1501 this->main = &func; 1502 } 1503 1504 FiberBase::FiberBase(size_t stackSize, _::ExceptionOrValue& result) 1505 : state(WAITING), stack(kj::heap<FiberStack>(stackSize)), result(result) { 1506 stack->initialize(*this); 1507 ensureThreadCanRunFibers(); 1508 } 1509 1510 FiberBase::FiberBase(const FiberPool& pool, _::ExceptionOrValue& result) 1511 : state(WAITING), result(result) { 1512 stack = pool.impl->takeStack(); 1513 stack->initialize(*this); 1514 ensureThreadCanRunFibers(); 1515 } 1516 1517 FiberBase::~FiberBase() noexcept(false) {} 1518 1519 void FiberBase::destroy() { 1520 // Called by `~Fiber()` to begin teardown. We can't do this work in `~FiberBase()` because the 1521 // `Fiber` subclass contains members that may still be in-use until the fiber stops. 1522 1523 switch (state) { 1524 case WAITING: 1525 // We can't just free the stack while the fiber is running. We need to force it to execute 1526 // until finished, so we cause it to throw an exception. 1527 state = CANCELED; 1528 stack->switchToFiber(); 1529 1530 // The fiber should only switch back to the main stack on completion, because any further 1531 // calls to wait() would throw before trying to switch. 1532 KJ_ASSERT(state == FINISHED); 1533 1534 // The fiber shut down properly so the stack is safe to reuse. 1535 stack->reset(); 1536 break; 1537 1538 case RUNNING: 1539 case CANCELED: 1540 // Bad news. 1541 KJ_LOG(FATAL, "fiber tried to destroy itself"); 1542 ::abort(); 1543 break; 1544 1545 case FINISHED: 1546 // Normal completion, yay. 1547 stack->reset(); 1548 break; 1549 } 1550 } 1551 1552 Maybe<Own<Event>> FiberBase::fire() { 1553 KJ_ASSERT(state == WAITING); 1554 state = RUNNING; 1555 stack->switchToFiber(); 1556 return nullptr; 1557 } 1558 1559 void FiberStack::switchToFiber() { 1560 // Switch from the main stack to the fiber. Returns once the fiber either calls switchToMain() 1561 // or returns from its main function. 1562 #if KJ_USE_FIBERS 1563 #if _WIN32 || __CYGWIN__ 1564 SwitchToFiber(osFiber); 1565 #else 1566 if (_setjmp(impl->originalJmpBuf) == 0) { 1567 _longjmp(impl->fiberJmpBuf, 1); 1568 } 1569 #endif 1570 #endif 1571 } 1572 void FiberStack::switchToMain() { 1573 // Switch from the fiber to the main stack. 
Returns the next time the main stack calls 1574 // switchToFiber(). 1575 #if KJ_USE_FIBERS 1576 #if _WIN32 || __CYGWIN__ 1577 SwitchToFiber(getMainWin32Fiber()); 1578 #else 1579 if (_setjmp(impl->fiberJmpBuf) == 0) { 1580 _longjmp(impl->originalJmpBuf, 1); 1581 } 1582 #endif 1583 #endif 1584 } 1585 1586 void FiberBase::run() { 1587 #if !KJ_NO_EXCEPTIONS 1588 bool caughtCanceled = false; 1589 state = RUNNING; 1590 KJ_DEFER(state = FINISHED); 1591 1592 WaitScope waitScope(currentEventLoop(), *this); 1593 1594 try { 1595 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 1596 runImpl(waitScope); 1597 })) { 1598 result.addException(kj::mv(*exception)); 1599 } 1600 } catch (CanceledException) { 1601 if (state != CANCELED) { 1602 // no idea who would throw this but it's not really our problem 1603 result.addException(KJ_EXCEPTION(FAILED, "Caught CanceledException, but fiber wasn't canceled")); 1604 } 1605 caughtCanceled = true; 1606 } 1607 1608 if (state == CANCELED && !caughtCanceled) { 1609 KJ_LOG(ERROR, "Canceled fiber apparently caught CanceledException and didn't rethrow it. " 1610 "Generally, applications should not catch CanceledException, but if they do, they must always rethrow."); 1611 } 1612 1613 onReadyEvent.arm(); 1614 #endif 1615 } 1616 1617 void FiberBase::onReady(_::Event* event) noexcept { 1618 onReadyEvent.init(event); 1619 } 1620 1621 void FiberBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 1622 if (stopAtNextEvent) return; 1623 currentInner->tracePromise(builder, false); 1624 stack->trace(builder); 1625 } 1626 1627 void FiberBase::traceEvent(TraceBuilder& builder) { 1628 currentInner->tracePromise(builder, true); 1629 stack->trace(builder); 1630 onReadyEvent.traceEvent(builder); 1631 } 1632 1633 } // namespace _ (private) 1634 1635 // ======================================================================================= 1636 1637 void EventPort::setRunnable(bool runnable) {} 1638 1639 void EventPort::wake() const { 1640 kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED, 1641 "cross-thread wake() not implemented by this EventPort implementation")); 1642 } 1643 1644 EventLoop::EventLoop() 1645 : daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {} 1646 1647 EventLoop::EventLoop(EventPort& port) 1648 : port(port), 1649 daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {} 1650 1651 EventLoop::~EventLoop() noexcept(false) { 1652 // Destroy all "daemon" tasks, noting that their destructors might register more daemon tasks. 1653 while (!daemons->isEmpty()) { 1654 auto oldDaemons = kj::mv(daemons); 1655 daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance); 1656 } 1657 daemons = nullptr; 1658 1659 KJ_IF_MAYBE(e, executor) { 1660 // Cancel all outstanding cross-thread events. 1661 e->get()->impl->disconnect(); 1662 } 1663 1664 // The application _should_ destroy everything using the EventLoop before destroying the 1665 // EventLoop itself, so if there are events on the loop, this indicates a memory leak. 1666 KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue. Memory leak?", 1667 head->traceEvent()) { 1668 // Unlink all the events and hope that no one ever fires them... 
1669 _::Event* event = head; 1670 while (event != nullptr) { 1671 _::Event* next = event->next; 1672 event->next = nullptr; 1673 event->prev = nullptr; 1674 event = next; 1675 } 1676 break; 1677 } 1678 1679 KJ_REQUIRE(threadLocalEventLoop != this, 1680 "EventLoop destroyed while still current for the thread.") { 1681 threadLocalEventLoop = nullptr; 1682 break; 1683 } 1684 } 1685 1686 void EventLoop::run(uint maxTurnCount) { 1687 running = true; 1688 KJ_DEFER(running = false); 1689 1690 for (uint i = 0; i < maxTurnCount; i++) { 1691 if (!turn()) { 1692 break; 1693 } 1694 } 1695 1696 setRunnable(isRunnable()); 1697 } 1698 1699 bool EventLoop::turn() { 1700 _::Event* event = head; 1701 1702 if (event == nullptr) { 1703 // No events in the queue. 1704 return false; 1705 } else { 1706 head = event->next; 1707 if (head != nullptr) { 1708 head->prev = &head; 1709 } 1710 1711 depthFirstInsertPoint = &head; 1712 if (breadthFirstInsertPoint == &event->next) { 1713 breadthFirstInsertPoint = &head; 1714 } 1715 if (tail == &event->next) { 1716 tail = &head; 1717 } 1718 1719 event->next = nullptr; 1720 event->prev = nullptr; 1721 1722 Maybe<Own<_::Event>> eventToDestroy; 1723 { 1724 event->firing = true; 1725 KJ_DEFER(event->firing = false); 1726 currentlyFiring = event; 1727 KJ_DEFER(currentlyFiring = nullptr); 1728 eventToDestroy = event->fire(); 1729 } 1730 1731 depthFirstInsertPoint = &head; 1732 return true; 1733 } 1734 } 1735 1736 bool EventLoop::isRunnable() { 1737 return head != nullptr; 1738 } 1739 1740 const Executor& EventLoop::getExecutor() { 1741 KJ_IF_MAYBE(e, executor) { 1742 return **e; 1743 } else { 1744 return *executor.emplace(kj::atomicRefcounted<_::ExecutorImpl>(*this, Badge<EventLoop>())); 1745 } 1746 } 1747 1748 void EventLoop::setRunnable(bool runnable) { 1749 if (runnable != lastRunnableState) { 1750 KJ_IF_MAYBE(p, port) { 1751 p->setRunnable(runnable); 1752 } 1753 lastRunnableState = runnable; 1754 } 1755 } 1756 1757 void EventLoop::enterScope() { 1758 KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop."); 1759 threadLocalEventLoop = this; 1760 } 1761 1762 void EventLoop::leaveScope() { 1763 KJ_REQUIRE(threadLocalEventLoop == this, 1764 "WaitScope destroyed in a different thread than it was created in.") { 1765 break; 1766 } 1767 threadLocalEventLoop = nullptr; 1768 } 1769 1770 void EventLoop::wait() { 1771 KJ_IF_MAYBE(p, port) { 1772 if (p->wait()) { 1773 // Another thread called wake(). Check for cross-thread events. 1774 KJ_IF_MAYBE(e, executor) { 1775 e->get()->poll(); 1776 } 1777 } 1778 } else KJ_IF_MAYBE(e, executor) { 1779 e->get()->wait(); 1780 } else { 1781 KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever."); 1782 } 1783 } 1784 1785 void EventLoop::poll() { 1786 KJ_IF_MAYBE(p, port) { 1787 if (p->poll()) { 1788 // Another thread called wake(). Check for cross-thread events. 1789 KJ_IF_MAYBE(e, executor) { 1790 e->get()->poll(); 1791 } 1792 } 1793 } else KJ_IF_MAYBE(e, executor) { 1794 e->get()->poll(); 1795 } 1796 } 1797 1798 void WaitScope::poll() { 1799 KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread."); 1800 KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks."); 1801 1802 loop.running = true; 1803 KJ_DEFER(loop.running = false); 1804 1805 runOnStackPool([&]() { 1806 for (;;) { 1807 if (!loop.turn()) { 1808 // No events in the queue. Poll for I/O. 1809 loop.poll(); 1810 1811 if (!loop.isRunnable()) { 1812 // Still no events in the queue. We're done. 
1813 return; 1814 } 1815 } 1816 } 1817 }); 1818 } 1819 1820 void WaitScope::cancelAllDetached() { 1821 KJ_REQUIRE(fiber == nullptr, 1822 "can't call cancelAllDetached() on a fiber WaitScope, only top-level"); 1823 1824 while (!loop.daemons->isEmpty()) { 1825 auto oldDaemons = kj::mv(loop.daemons); 1826 loop.daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance); 1827 // Destroying `oldDaemons` could theoretically add new ones. 1828 } 1829 } 1830 1831 namespace _ { // private 1832 1833 #if !KJ_NO_EXCEPTIONS 1834 static kj::CanceledException fiberCanceledException() { 1835 // Construct the exception to throw from wait() when the fiber has been canceled (because the 1836 // promise returned by startFiber() was dropped before completion). 1837 return kj::CanceledException { }; 1838 }; 1839 #endif 1840 1841 void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) { 1842 EventLoop& loop = waitScope.loop; 1843 KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread."); 1844 1845 #if !KJ_NO_EXCEPTIONS 1846 // we don't support fibers when running without exceptions, so just remove the whole block 1847 KJ_IF_MAYBE(fiber, waitScope.fiber) { 1848 if (fiber->state == FiberBase::CANCELED) { 1849 throw fiberCanceledException(); 1850 } 1851 KJ_REQUIRE(fiber->state == FiberBase::RUNNING, 1852 "This WaitScope can only be used within the fiber that created it."); 1853 1854 node->setSelfPointer(&node); 1855 node->onReady(fiber); 1856 1857 fiber->currentInner = node; 1858 KJ_DEFER(fiber->currentInner = nullptr); 1859 1860 // Switch to the main stack to run the event loop. 1861 fiber->state = FiberBase::WAITING; 1862 fiber->stack->switchToMain(); 1863 1864 // The main stack switched back to us, meaning either the event we registered with 1865 // node->onReady() fired, or we are being canceled by FiberBase's destructor. 1866 1867 if (fiber->state == FiberBase::CANCELED) { 1868 throw fiberCanceledException(); 1869 } 1870 1871 KJ_ASSERT(fiber->state == FiberBase::RUNNING); 1872 } else { 1873 #endif 1874 KJ_REQUIRE(!loop.running, "wait() is not allowed from within event callbacks."); 1875 1876 RootEvent doneEvent(node, reinterpret_cast<void*>(&waitImpl)); 1877 node->setSelfPointer(&node); 1878 node->onReady(&doneEvent); 1879 1880 loop.running = true; 1881 KJ_DEFER(loop.running = false); 1882 1883 for (;;) { 1884 waitScope.runOnStackPool([&]() { 1885 uint counter = 0; 1886 while (!doneEvent.fired) { 1887 if (!loop.turn()) { 1888 // No events in the queue. Wait for callback. 1889 return; 1890 } else if (++counter > waitScope.busyPollInterval) { 1891 // Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll. 
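          // (`counter` is a uint, so it can never compare greater than kj::maxValue; this branch
          // is simply unreachable in that configuration.)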
1892 counter = 0; 1893 loop.poll(); 1894 } 1895 } 1896 }); 1897 1898 if (doneEvent.fired) { 1899 break; 1900 } else { 1901 loop.wait(); 1902 } 1903 } 1904 1905 loop.setRunnable(loop.isRunnable()); 1906 #if !KJ_NO_EXCEPTIONS 1907 } 1908 #endif 1909 1910 waitScope.runOnStackPool([&]() { 1911 node->get(result); 1912 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 1913 node = nullptr; 1914 })) { 1915 result.addException(kj::mv(*exception)); 1916 } 1917 }); 1918 } 1919 1920 bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) { 1921 EventLoop& loop = waitScope.loop; 1922 KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread."); 1923 KJ_REQUIRE(waitScope.fiber == nullptr, "poll() is not supported in fibers."); 1924 KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks."); 1925 1926 RootEvent doneEvent(&node, reinterpret_cast<void*>(&pollImpl)); 1927 node.onReady(&doneEvent); 1928 1929 loop.running = true; 1930 KJ_DEFER(loop.running = false); 1931 1932 waitScope.runOnStackPool([&]() { 1933 while (!doneEvent.fired) { 1934 if (!loop.turn()) { 1935 // No events in the queue. Poll for I/O. 1936 loop.poll(); 1937 1938 if (!doneEvent.fired && !loop.isRunnable()) { 1939 // No progress. Give up. 1940 node.onReady(nullptr); 1941 loop.setRunnable(false); 1942 break; 1943 } 1944 } 1945 } 1946 }); 1947 1948 if (!doneEvent.fired) { 1949 return false; 1950 } 1951 1952 loop.setRunnable(loop.isRunnable()); 1953 return true; 1954 } 1955 1956 Promise<void> yield() { 1957 return _::PromiseNode::to<Promise<void>>(kj::heap<YieldPromiseNode>()); 1958 } 1959 1960 Promise<void> yieldHarder() { 1961 return _::PromiseNode::to<Promise<void>>(kj::heap<YieldHarderPromiseNode>()); 1962 } 1963 1964 Own<PromiseNode> neverDone() { 1965 return kj::heap<NeverDonePromiseNode>(); 1966 } 1967 1968 void NeverDone::wait(WaitScope& waitScope) const { 1969 ExceptionOr<Void> dummy; 1970 waitImpl(neverDone(), dummy, waitScope); 1971 KJ_UNREACHABLE; 1972 } 1973 1974 void detach(kj::Promise<void>&& promise) { 1975 EventLoop& loop = currentEventLoop(); 1976 KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; } 1977 loop.daemons->add(kj::mv(promise)); 1978 } 1979 1980 Event::Event() 1981 : loop(currentEventLoop()), next(nullptr), prev(nullptr) {} 1982 1983 Event::Event(kj::EventLoop& loop) 1984 : loop(loop), next(nullptr), prev(nullptr) {} 1985 1986 Event::~Event() noexcept(false) { 1987 disarm(); 1988 1989 KJ_REQUIRE(!firing, "Promise callback destroyed itself."); 1990 } 1991 1992 void Event::armDepthFirst() { 1993 KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, 1994 "Event armed from different thread than it was created in. You must use " 1995 "Executor to queue events cross-thread."); 1996 1997 if (prev == nullptr) { 1998 next = *loop.depthFirstInsertPoint; 1999 prev = loop.depthFirstInsertPoint; 2000 *prev = this; 2001 if (next != nullptr) { 2002 next->prev = &next; 2003 } 2004 2005 loop.depthFirstInsertPoint = &next; 2006 2007 if (loop.breadthFirstInsertPoint == prev) { 2008 loop.breadthFirstInsertPoint = &next; 2009 } 2010 if (loop.tail == prev) { 2011 loop.tail = &next; 2012 } 2013 2014 loop.setRunnable(true); 2015 } 2016 } 2017 2018 void Event::armBreadthFirst() { 2019 KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, 2020 "Event armed from different thread than it was created in. 
You must use " 2021 "Executor to queue events cross-thread."); 2022 2023 if (prev == nullptr) { 2024 next = *loop.breadthFirstInsertPoint; 2025 prev = loop.breadthFirstInsertPoint; 2026 *prev = this; 2027 if (next != nullptr) { 2028 next->prev = &next; 2029 } 2030 2031 loop.breadthFirstInsertPoint = &next; 2032 2033 if (loop.tail == prev) { 2034 loop.tail = &next; 2035 } 2036 2037 loop.setRunnable(true); 2038 } 2039 } 2040 2041 void Event::armLast() { 2042 KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, 2043 "Event armed from different thread than it was created in. You must use " 2044 "Executor to queue events cross-thread."); 2045 2046 if (prev == nullptr) { 2047 next = *loop.breadthFirstInsertPoint; 2048 prev = loop.breadthFirstInsertPoint; 2049 *prev = this; 2050 if (next != nullptr) { 2051 next->prev = &next; 2052 } 2053 2054 // We don't update loop.breadthFirstInsertPoint because we want further inserts to go *before* 2055 // this event. 2056 2057 if (loop.tail == prev) { 2058 loop.tail = &next; 2059 } 2060 2061 loop.setRunnable(true); 2062 } 2063 } 2064 2065 bool Event::isNext() { 2066 return loop.running && loop.head == this; 2067 } 2068 2069 void Event::disarm() { 2070 if (prev != nullptr) { 2071 if (threadLocalEventLoop != &loop && threadLocalEventLoop != nullptr) { 2072 KJ_LOG(FATAL, "Promise destroyed from a different thread than it was created in."); 2073 // There's no way out of this place without UB, so abort now. 2074 abort(); 2075 } 2076 2077 if (loop.tail == &next) { 2078 loop.tail = prev; 2079 } 2080 if (loop.depthFirstInsertPoint == &next) { 2081 loop.depthFirstInsertPoint = prev; 2082 } 2083 if (loop.breadthFirstInsertPoint == &next) { 2084 loop.breadthFirstInsertPoint = prev; 2085 } 2086 2087 *prev = next; 2088 if (next != nullptr) { 2089 next->prev = prev; 2090 } 2091 2092 prev = nullptr; 2093 next = nullptr; 2094 } 2095 } 2096 2097 String Event::traceEvent() { 2098 void* space[32]; 2099 TraceBuilder builder(space); 2100 traceEvent(builder); 2101 return kj::str(builder); 2102 } 2103 2104 String TraceBuilder::toString() { 2105 auto result = finish(); 2106 return kj::str(stringifyStackTraceAddresses(result), 2107 stringifyStackTrace(result)); 2108 } 2109 2110 } // namespace _ (private) 2111 2112 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space) { 2113 EventLoop* loop = threadLocalEventLoop; 2114 if (loop == nullptr) return nullptr; 2115 if (loop->currentlyFiring == nullptr) return nullptr; 2116 2117 _::TraceBuilder builder(space); 2118 loop->currentlyFiring->traceEvent(builder); 2119 return builder.finish(); 2120 } 2121 2122 kj::String getAsyncTrace() { 2123 void* space[32]; 2124 auto trace = getAsyncTrace(space); 2125 return kj::str(stringifyStackTraceAddresses(trace), stringifyStackTrace(trace)); 2126 } 2127 2128 // ======================================================================================= 2129 2130 namespace _ { // private 2131 2132 kj::String PromiseBase::trace() { 2133 void* space[32]; 2134 TraceBuilder builder(space); 2135 node->tracePromise(builder, false); 2136 return kj::str(builder); 2137 } 2138 2139 void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {} 2140 2141 void PromiseNode::OnReadyEvent::init(Event* newEvent) { 2142 if (event == _kJ_ALREADY_READY) { 2143 // A new continuation was added to a promise that was already ready. 
In this case, we schedule 2144 // breadth-first, to make it difficult for applications to accidentally starve the event loop 2145 // by repeatedly waiting on immediate promises. 2146 if (newEvent) newEvent->armBreadthFirst(); 2147 } else { 2148 event = newEvent; 2149 } 2150 } 2151 2152 void PromiseNode::OnReadyEvent::arm() { 2153 KJ_ASSERT(event != _kJ_ALREADY_READY, "arm() should only be called once"); 2154 2155 if (event != nullptr) { 2156 // A promise resolved and an event is already waiting on it. In this case, arm in depth-first 2157 // order so that the event runs immediately after the current one. This way, chained promises 2158 // execute together for better cache locality and lower latency. 2159 event->armDepthFirst(); 2160 } 2161 2162 event = _kJ_ALREADY_READY; 2163 } 2164 2165 void PromiseNode::OnReadyEvent::armBreadthFirst() { 2166 KJ_ASSERT(event != _kJ_ALREADY_READY, "armBreadthFirst() should only be called once"); 2167 2168 if (event != nullptr) { 2169 // A promise resolved and an event is already waiting on it. 2170 event->armBreadthFirst(); 2171 } 2172 2173 event = _kJ_ALREADY_READY; 2174 } 2175 2176 // ------------------------------------------------------------------- 2177 2178 ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {} 2179 ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {} 2180 2181 void ImmediatePromiseNodeBase::onReady(Event* event) noexcept { 2182 if (event) event->armBreadthFirst(); 2183 } 2184 2185 void ImmediatePromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2186 // Maybe returning the address of get() will give us a function name with meaningful type 2187 // information. 2188 builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get)); 2189 } 2190 2191 ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception) 2192 : exception(kj::mv(exception)) {} 2193 2194 void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept { 2195 output.exception = kj::mv(exception); 2196 } 2197 2198 // ------------------------------------------------------------------- 2199 2200 AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependencyParam) 2201 : dependency(kj::mv(dependencyParam)) { 2202 dependency->setSelfPointer(&dependency); 2203 } 2204 2205 void AttachmentPromiseNodeBase::onReady(Event* event) noexcept { 2206 dependency->onReady(event); 2207 } 2208 2209 void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept { 2210 dependency->get(output); 2211 } 2212 2213 void AttachmentPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2214 dependency->tracePromise(builder, stopAtNextEvent); 2215 2216 // TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and 2217 // addBranch()? 
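  // (For orientation, a hedged usage sketch -- this node is what backs Promise::attach();
  // `buffer` is just an illustrative name:
  //     auto buffer = kj::heapArray<kj::byte>(1024);
  //     promise = promise.attach(kj::mv(buffer));
  // The attached value is kept alive by this node until the promise completes or is dropped.)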
2218 } 2219 2220 void AttachmentPromiseNodeBase::dropDependency() { 2221 dependency = nullptr; 2222 } 2223 2224 // ------------------------------------------------------------------- 2225 2226 TransformPromiseNodeBase::TransformPromiseNodeBase( 2227 Own<PromiseNode>&& dependencyParam, void* continuationTracePtr) 2228 : dependency(kj::mv(dependencyParam)), continuationTracePtr(continuationTracePtr) { 2229 dependency->setSelfPointer(&dependency); 2230 } 2231 2232 void TransformPromiseNodeBase::onReady(Event* event) noexcept { 2233 dependency->onReady(event); 2234 } 2235 2236 void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept { 2237 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 2238 getImpl(output); 2239 dropDependency(); 2240 })) { 2241 output.addException(kj::mv(*exception)); 2242 } 2243 } 2244 2245 void TransformPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2246 // Note that we null out the dependency just before calling our own continuation, which 2247 // conveniently means that if we're currently executing the continuation when the trace is 2248 // requested, it won't trace into the obsolete dependency. Nice. 2249 if (dependency.get() != nullptr) { 2250 dependency->tracePromise(builder, stopAtNextEvent); 2251 } 2252 2253 builder.add(continuationTracePtr); 2254 } 2255 2256 void TransformPromiseNodeBase::dropDependency() { 2257 dependency = nullptr; 2258 } 2259 2260 void TransformPromiseNodeBase::getDepResult(ExceptionOrValue& output) { 2261 dependency->get(output); 2262 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 2263 dependency = nullptr; 2264 })) { 2265 output.addException(kj::mv(*exception)); 2266 } 2267 2268 KJ_IF_MAYBE(e, output.exception) { 2269 e->addTrace(continuationTracePtr); 2270 } 2271 } 2272 2273 // ------------------------------------------------------------------- 2274 2275 ForkBranchBase::ForkBranchBase(Own<ForkHubBase>&& hubParam): hub(kj::mv(hubParam)) { 2276 if (hub->tailBranch == nullptr) { 2277 onReadyEvent.arm(); 2278 } else { 2279 // Insert into hub's linked list of branches. 2280 prevPtr = hub->tailBranch; 2281 *prevPtr = this; 2282 next = nullptr; 2283 hub->tailBranch = &next; 2284 } 2285 } 2286 2287 ForkBranchBase::~ForkBranchBase() noexcept(false) { 2288 if (prevPtr != nullptr) { 2289 // Remove from hub's linked list of branches. 2290 *prevPtr = next; 2291 (next == nullptr ? hub->tailBranch : next->prevPtr) = prevPtr; 2292 } 2293 } 2294 2295 void ForkBranchBase::hubReady() noexcept { 2296 onReadyEvent.arm(); 2297 } 2298 2299 void ForkBranchBase::releaseHub(ExceptionOrValue& output) { 2300 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { 2301 hub = nullptr; 2302 })) { 2303 output.addException(kj::mv(*exception)); 2304 } 2305 } 2306 2307 void ForkBranchBase::onReady(Event* event) noexcept { 2308 onReadyEvent.init(event); 2309 } 2310 2311 void ForkBranchBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2312 if (stopAtNextEvent) return; 2313 2314 if (hub.get() != nullptr) { 2315 hub->inner->tracePromise(builder, false); 2316 } 2317 2318 // TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and 2319 // addBranch()? 
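  // (Hedged usage sketch of the API that creates these nodes:
  //     kj::ForkedPromise<int> forked = promise.fork();
  //     kj::Promise<int> a = forked.addBranch();
  //     kj::Promise<int> b = forked.addBranch();
  // Each addBranch() call constructs one ForkBranchBase, and all branches share the ForkHubBase
  // that owns the original promise.)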
2320 } 2321 2322 // ------------------------------------------------------------------- 2323 2324 ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef) 2325 : inner(kj::mv(innerParam)), resultRef(resultRef) { 2326 inner->setSelfPointer(&inner); 2327 inner->onReady(this); 2328 } 2329 2330 Maybe<Own<Event>> ForkHubBase::fire() { 2331 // Dependency is ready. Fetch its result and then delete the node. 2332 inner->get(resultRef); 2333 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { 2334 inner = nullptr; 2335 })) { 2336 resultRef.addException(kj::mv(*exception)); 2337 } 2338 2339 for (auto branch = headBranch; branch != nullptr; branch = branch->next) { 2340 branch->hubReady(); 2341 *branch->prevPtr = nullptr; 2342 branch->prevPtr = nullptr; 2343 } 2344 *tailBranch = nullptr; 2345 2346 // Indicate that the list is no longer active. 2347 tailBranch = nullptr; 2348 2349 return nullptr; 2350 } 2351 2352 void ForkHubBase::traceEvent(TraceBuilder& builder) { 2353 if (inner.get() != nullptr) { 2354 inner->tracePromise(builder, true); 2355 } 2356 2357 if (headBranch != nullptr) { 2358 // We'll trace down the first branch, I guess. 2359 headBranch->onReadyEvent.traceEvent(builder); 2360 } 2361 } 2362 2363 // ------------------------------------------------------------------- 2364 2365 ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam) 2366 : state(STEP1), inner(kj::mv(innerParam)) { 2367 inner->setSelfPointer(&inner); 2368 inner->onReady(this); 2369 } 2370 2371 ChainPromiseNode::~ChainPromiseNode() noexcept(false) {} 2372 2373 void ChainPromiseNode::onReady(Event* event) noexcept { 2374 switch (state) { 2375 case STEP1: 2376 onReadyEvent = event; 2377 return; 2378 case STEP2: 2379 inner->onReady(event); 2380 return; 2381 } 2382 KJ_UNREACHABLE; 2383 } 2384 2385 void ChainPromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept { 2386 if (state == STEP2) { 2387 *selfPtr = kj::mv(inner); // deletes this! 2388 selfPtr->get()->setSelfPointer(selfPtr); 2389 } else { 2390 this->selfPtr = selfPtr; 2391 } 2392 } 2393 2394 void ChainPromiseNode::get(ExceptionOrValue& output) noexcept { 2395 KJ_REQUIRE(state == STEP2); 2396 return inner->get(output); 2397 } 2398 2399 void ChainPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2400 if (stopAtNextEvent && state == STEP1) { 2401 // In STEP1, we are an Event -- when the inner node resolves, it will arm *this* object. 2402 // In STEP2, we are not an Event -- when the inner node resolves, it directly arms our parent 2403 // event. 2404 return; 2405 } 2406 2407 inner->tracePromise(builder, stopAtNextEvent); 2408 } 2409 2410 Maybe<Own<Event>> ChainPromiseNode::fire() { 2411 KJ_REQUIRE(state != STEP2); 2412 2413 static_assert(sizeof(Promise<int>) == sizeof(PromiseBase), 2414 "This code assumes Promise<T> does not add any new members to PromiseBase."); 2415 2416 ExceptionOr<PromiseBase> intermediate; 2417 inner->get(intermediate); 2418 2419 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { 2420 inner = nullptr; 2421 })) { 2422 intermediate.addException(kj::mv(*exception)); 2423 } 2424 2425 KJ_IF_MAYBE(exception, intermediate.exception) { 2426 // There is an exception. If there is also a value, delete it. 2427 kj::runCatchingExceptions([&]() { intermediate.value = nullptr; }); 2428 // Now set step2 to a rejected promise. 2429 inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception)); 2430 } else KJ_IF_MAYBE(value, intermediate.value) { 2431 // There is a value and no exception. 
The value is itself a promise. Adopt it as our 2432 // step2. 2433 inner = _::PromiseNode::from(kj::mv(*value)); 2434 } else { 2435 // We can only get here if inner->get() returned neither an exception nor a 2436 // value, which never actually happens. 2437 KJ_FAIL_ASSERT("Inner node returned empty value."); 2438 } 2439 state = STEP2; 2440 2441 if (selfPtr != nullptr) { 2442 // Hey, we can shorten the chain here. 2443 auto chain = selfPtr->downcast<ChainPromiseNode>(); 2444 *selfPtr = kj::mv(inner); 2445 selfPtr->get()->setSelfPointer(selfPtr); 2446 if (onReadyEvent != nullptr) { 2447 selfPtr->get()->onReady(onReadyEvent); 2448 } 2449 2450 // Return our self-pointer so that the caller takes care of deleting it. 2451 return Own<Event>(kj::mv(chain)); 2452 } else { 2453 inner->setSelfPointer(&inner); 2454 if (onReadyEvent != nullptr) { 2455 inner->onReady(onReadyEvent); 2456 } 2457 2458 return nullptr; 2459 } 2460 } 2461 2462 void ChainPromiseNode::traceEvent(TraceBuilder& builder) { 2463 switch (state) { 2464 case STEP1: 2465 if (inner.get() != nullptr) { 2466 inner->tracePromise(builder, true); 2467 } 2468 if (!builder.full() && onReadyEvent != nullptr) { 2469 onReadyEvent->traceEvent(builder); 2470 } 2471 break; 2472 case STEP2: 2473 // This probably never happens -- a trace being generated after the meat of fire() already 2474 // executed. If it does, though, we probably can't do anything here. We don't know if 2475 // `onReadyEvent` is still valid because we passed it on to the phase-2 promise, and tracing 2476 // just `inner` would probably be confusing. Let's just do nothing. 2477 break; 2478 } 2479 } 2480 2481 // ------------------------------------------------------------------- 2482 2483 ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right) 2484 : left(*this, kj::mv(left)), right(*this, kj::mv(right)) {} 2485 2486 ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {} 2487 2488 void ExclusiveJoinPromiseNode::onReady(Event* event) noexcept { 2489 onReadyEvent.init(event); 2490 } 2491 2492 void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept { 2493 KJ_REQUIRE(left.get(output) || right.get(output), "get() called before ready."); 2494 } 2495 2496 void ExclusiveJoinPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2497 // TODO(debug): Maybe use __builtin_return_address to get the locations that called 2498 // exclusiveJoin()? 2499 2500 if (stopAtNextEvent) return; 2501 2502 // Trace the left branch I guess. 2503 if (left.dependency.get() != nullptr) { 2504 left.dependency->tracePromise(builder, false); 2505 } else if (right.dependency.get() != nullptr) { 2506 right.dependency->tracePromise(builder, false); 2507 } 2508 } 2509 2510 ExclusiveJoinPromiseNode::Branch::Branch( 2511 ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam) 2512 : joinNode(joinNode), dependency(kj::mv(dependencyParam)) { 2513 dependency->setSelfPointer(&dependency); 2514 dependency->onReady(this); 2515 } 2516 2517 ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {} 2518 2519 bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) { 2520 if (dependency) { 2521 dependency->get(output); 2522 return true; 2523 } else { 2524 return false; 2525 } 2526 } 2527 2528 Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() { 2529 if (dependency) { 2530 // Cancel the branch that didn't return first. Ignore exceptions caused by cancellation. 
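      // (Dropping the losing branch's `dependency` Own pointer is what cancels it; any exception
      // thrown while tearing it down is swallowed because that branch's result no longer matters.
      // This is the node behind Promise::exclusiveJoin(), e.g. `left.exclusiveJoin(kj::mv(right))`.)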
2531 if (this == &joinNode.left) { 2532 kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; }); 2533 } else { 2534 kj::runCatchingExceptions([&]() { joinNode.left.dependency = nullptr; }); 2535 } 2536 2537 joinNode.onReadyEvent.arm(); 2538 } else { 2539 // The other branch already fired, and this branch was canceled. It's possible for both 2540 // branches to fire if both were armed simultaneously. 2541 } 2542 return nullptr; 2543 } 2544 2545 void ExclusiveJoinPromiseNode::Branch::traceEvent(TraceBuilder& builder) { 2546 if (dependency.get() != nullptr) { 2547 dependency->tracePromise(builder, true); 2548 } 2549 joinNode.onReadyEvent.traceEvent(builder); 2550 } 2551 2552 // ------------------------------------------------------------------- 2553 2554 ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase( 2555 Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize) 2556 : countLeft(promises.size()) { 2557 // Make the branches. 2558 auto builder = heapArrayBuilder<Branch>(promises.size()); 2559 for (uint i: indices(promises)) { 2560 ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>( 2561 reinterpret_cast<byte*>(resultParts) + i * partSize); 2562 builder.add(*this, kj::mv(promises[i]), output); 2563 } 2564 branches = builder.finish(); 2565 2566 if (branches.size() == 0) { 2567 onReadyEvent.arm(); 2568 } 2569 } 2570 ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {} 2571 2572 void ArrayJoinPromiseNodeBase::onReady(Event* event) noexcept { 2573 onReadyEvent.init(event); 2574 } 2575 2576 void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept { 2577 // If any of the elements threw exceptions, propagate them. 2578 for (auto& branch: branches) { 2579 KJ_IF_MAYBE(exception, branch.getPart()) { 2580 output.addException(kj::mv(*exception)); 2581 } 2582 } 2583 2584 if (output.exception == nullptr) { 2585 // No errors. The template subclass will need to fill in the result. 2586 getNoError(output); 2587 } 2588 } 2589 2590 void ArrayJoinPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2591 // TODO(debug): Maybe use __builtin_return_address to get the locations that called 2592 // joinPromises()? 2593 2594 if (stopAtNextEvent) return; 2595 2596 // Trace the first branch I guess. 
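  // (These nodes back kj::joinPromises(); a hedged usage sketch, where taskOne/taskTwo are
  // hypothetical helpers returning kj::Promise<void>:
  //     kj::Vector<kj::Promise<void>> promises;
  //     promises.add(taskOne());
  //     promises.add(taskTwo());
  //     kj::Promise<void> all = kj::joinPromises(promises.releaseAsArray());
  // Each array element becomes one Branch; the join fires when countLeft drops to zero.)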
2597 if (branches != nullptr) { 2598 branches[0].dependency->tracePromise(builder, false); 2599 } 2600 } 2601 2602 ArrayJoinPromiseNodeBase::Branch::Branch( 2603 ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output) 2604 : joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) { 2605 dependency->setSelfPointer(&dependency); 2606 dependency->onReady(this); 2607 } 2608 2609 ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {} 2610 2611 Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() { 2612 if (--joinNode.countLeft == 0) { 2613 joinNode.onReadyEvent.arm(); 2614 } 2615 return nullptr; 2616 } 2617 2618 void ArrayJoinPromiseNodeBase::Branch::traceEvent(TraceBuilder& builder) { 2619 dependency->tracePromise(builder, true); 2620 joinNode.onReadyEvent.traceEvent(builder); 2621 } 2622 2623 Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() { 2624 dependency->get(output); 2625 return kj::mv(output.exception); 2626 } 2627 2628 ArrayJoinPromiseNode<void>::ArrayJoinPromiseNode( 2629 Array<Own<PromiseNode>> promises, Array<ExceptionOr<_::Void>> resultParts) 2630 : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<_::Void>)), 2631 resultParts(kj::mv(resultParts)) {} 2632 2633 ArrayJoinPromiseNode<void>::~ArrayJoinPromiseNode() {} 2634 2635 void ArrayJoinPromiseNode<void>::getNoError(ExceptionOrValue& output) noexcept { 2636 output.as<_::Void>() = _::Void(); 2637 } 2638 2639 } // namespace _ (private) 2640 2641 Promise<void> joinPromises(Array<Promise<void>>&& promises) { 2642 return _::PromiseNode::to<Promise<void>>(kj::heap<_::ArrayJoinPromiseNode<void>>( 2643 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, 2644 heapArray<_::ExceptionOr<_::Void>>(promises.size()))); 2645 } 2646 2647 namespace _ { // (private) 2648 2649 // ------------------------------------------------------------------- 2650 2651 EagerPromiseNodeBase::EagerPromiseNodeBase( 2652 Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef) 2653 : dependency(kj::mv(dependencyParam)), resultRef(resultRef) { 2654 dependency->setSelfPointer(&dependency); 2655 dependency->onReady(this); 2656 } 2657 2658 void EagerPromiseNodeBase::onReady(Event* event) noexcept { 2659 onReadyEvent.init(event); 2660 } 2661 2662 void EagerPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2663 // TODO(debug): Maybe use __builtin_return_address to get the locations that called 2664 // eagerlyEvaluate()? But note that if a non-null exception handler was passed to it, that 2665 // creates a TransformPromiseNode which will report the location anyhow. 
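  // (Sketch of the API this node backs: `promise = promise.eagerlyEvaluate(nullptr);` forces the
  // promise to start running without anyone wait()ing on it, while the error-handler form, e.g.
  // `promise.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); })`, layers the
  // TransformPromiseNode mentioned above underneath this node, so traces still show the call site.)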
2666 2667 if (stopAtNextEvent) return; 2668 if (dependency.get() != nullptr) { 2669 dependency->tracePromise(builder, stopAtNextEvent); 2670 } 2671 } 2672 2673 void EagerPromiseNodeBase::traceEvent(TraceBuilder& builder) { 2674 if (dependency.get() != nullptr) { 2675 dependency->tracePromise(builder, true); 2676 } 2677 onReadyEvent.traceEvent(builder); 2678 } 2679 2680 Maybe<Own<Event>> EagerPromiseNodeBase::fire() { 2681 dependency->get(resultRef); 2682 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { 2683 dependency = nullptr; 2684 })) { 2685 resultRef.addException(kj::mv(*exception)); 2686 } 2687 2688 onReadyEvent.arm(); 2689 return nullptr; 2690 } 2691 2692 // ------------------------------------------------------------------- 2693 2694 void AdapterPromiseNodeBase::onReady(Event* event) noexcept { 2695 onReadyEvent.init(event); 2696 } 2697 2698 void AdapterPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2699 // Maybe returning the address of get() will give us a function name with meaningful type 2700 // information. 2701 builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get)); 2702 } 2703 2704 void END_FULFILLER_STACK_START_LISTENER_STACK() {} 2705 // Dummy symbol used when reporting how a PromiseFulfiller was destroyed without fulfilling the 2706 // promise. We end up combining two stack traces into one and we use this as a separator. 2707 2708 void WeakFulfillerBase::disposeImpl(void* pointer) const { 2709 if (inner == nullptr) { 2710 // Already detached. 2711 delete this; 2712 } else { 2713 if (inner->isWaiting()) { 2714 // Let's find out if there's an exception being thrown. If so, we'll use it to reject the 2715 // promise. 2716 inner->reject(getDestructionReason( 2717 reinterpret_cast<void*>(&END_FULFILLER_STACK_START_LISTENER_STACK), 2718 kj::Exception::Type::FAILED, __FILE__, __LINE__, 2719 "PromiseFulfiller was destroyed without fulfilling the promise."_kj)); 2720 } 2721 inner = nullptr; 2722 } 2723 } 2724 2725 } // namespace _ (private) 2726 2727 // ------------------------------------------------------------------- 2728 2729 namespace _ { // (private) 2730 2731 Promise<void> IdentityFunc<Promise<void>>::operator()() const { return READY_NOW; } 2732 2733 } // namespace _ (private) 2734 2735 // ------------------------------------------------------------------- 2736 2737 #if KJ_HAS_COROUTINE 2738 2739 namespace _ { // (private) 2740 2741 CoroutineBase::CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef) 2742 : coroutine(coroutine), 2743 resultRef(resultRef) {} 2744 CoroutineBase::~CoroutineBase() noexcept(false) { 2745 readMaybe(maybeDisposalResults)->destructorRan = true; 2746 } 2747 2748 void CoroutineBase::unhandled_exception() { 2749 // Pretty self-explanatory, we propagate the exception to the promise which owns us, unless 2750 // we're being destroyed, in which case we propagate it back to our disposer. Note that all 2751 // unhandled exceptions end up here, not just ones after the first co_await. 2752 2753 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([] { throw; })) { 2754 KJ_IF_MAYBE(disposalResults, maybeDisposalResults) { 2755 // Exception during coroutine destruction. Only record the first one. 2756 if (disposalResults->exception == nullptr) { 2757 disposalResults->exception = kj::mv(*exception); 2758 } 2759 } else if (isWaiting()) { 2760 // Exception during coroutine execution. 
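        // Record it as the coroutine's result; scheduleResumption() then arranges for that
        // result to be delivered to whoever owns the promise.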
2761 resultRef.addException(kj::mv(*exception)); 2762 scheduleResumption(); 2763 } else { 2764 // Okay, what could this mean? We've already been fulfilled or rejected, but we aren't being 2765 // destroyed yet. The only possibility is that we are unwinding the coroutine frame due to a 2766 // successful completion, and something in the frame threw. We can't already be rejected, 2767 // because rejecting a coroutine involves throwing, which would have unwound the frame prior 2768 // to setting `waiting = false`. 2769 // 2770 // Since we know we're unwinding due to a successful completion, we also know that whatever 2771 // Event we may have armed has not yet fired, because we haven't had a chance to return to 2772 // the event loop. 2773 2774 // final_suspend() has not been called. 2775 KJ_IASSERT(!coroutine.done()); 2776 2777 // Since final_suspend() hasn't been called, whatever Event is waiting on us has not fired, 2778 // and will see this exception. 2779 resultRef.addException(kj::mv(*exception)); 2780 } 2781 } else { 2782 KJ_UNREACHABLE; 2783 } 2784 } 2785 2786 void CoroutineBase::onReady(Event* event) noexcept { 2787 onReadyEvent.init(event); 2788 } 2789 2790 void CoroutineBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) { 2791 if (stopAtNextEvent) return; 2792 2793 KJ_IF_MAYBE(promise, promiseNodeForTrace) { 2794 promise->tracePromise(builder, stopAtNextEvent); 2795 } 2796 2797 // Maybe returning the address of coroutine() will give us a function name with meaningful type 2798 // information. (Narrator: It doesn't.) 2799 builder.add(GetFunctorStartAddress<>::apply(coroutine)); 2800 }; 2801 2802 Maybe<Own<Event>> CoroutineBase::fire() { 2803 // Call Awaiter::await_resume() and proceed with the coroutine. Note that this will not destroy 2804 // the coroutine if control flows off the end of it, because we return suspend_always() from 2805 // final_suspend(). 2806 // 2807 // It's tempting to arrange to check for exceptions right now and reject the promise that owns 2808 // us without resuming the coroutine, which would save us from throwing an exception when we 2809 // already know where it's going. But, we don't really know: unlike in the KJ_NO_EXCEPTIONS 2810 // case, the `co_await` might be in a try-catch block, so we have no choice but to resume and 2811 // throw later. 2812 // 2813 // TODO(someday): If we ever support coroutines with -fno-exceptions, we'll need to reject the 2814 // enclosing coroutine promise here, if the Awaiter's result is exceptional. 2815 2816 promiseNodeForTrace = nullptr; 2817 2818 coroutine.resume(); 2819 2820 return nullptr; 2821 } 2822 2823 void CoroutineBase::traceEvent(TraceBuilder& builder) { 2824 KJ_IF_MAYBE(promise, promiseNodeForTrace) { 2825 promise->tracePromise(builder, true); 2826 } 2827 2828 // Maybe returning the address of coroutine() will give us a function name with meaningful type 2829 // information. (Narrator: It doesn't.) 2830 builder.add(GetFunctorStartAddress<>::apply(coroutine)); 2831 2832 onReadyEvent.traceEvent(builder); 2833 } 2834 2835 void CoroutineBase::disposeImpl(void* pointer) const { 2836 KJ_IASSERT(pointer == this); 2837 2838 // const_cast okay -- every Own<PromiseNode> that we build in get_return_object() uses itself 2839 // as the disposer, thus every disposer is unique and there are no thread-safety concerns. 2840 const_cast<CoroutineBase&>(*this).destroy(); 2841 } 2842 2843 void CoroutineBase::destroy() { 2844 // Mutable helper function for disposeImpl(). 
Basically a wrapper around coroutine.destroy() 2845 // with some stuff to propagate exceptions appropriately. 2846 2847 // Objects in the coroutine frame might throw from their destructors, so unhandled_exception() 2848 // will need some way to communicate those exceptions back to us. Separately, we also want 2849 // confirmation that our own ~Coroutine() destructor ran. To solve this, we put a 2850 // DisposalResults object on the stack and set a pointer to it in the Coroutine object. This 2851 // indicates to unhandled_exception() and ~Coroutine() where to store the results of the 2852 // destruction operation. 2853 DisposalResults disposalResults; 2854 maybeDisposalResults = &disposalResults; 2855 2856 // Need to save this while `unwindDetector` is still valid. 2857 bool shouldRethrow = !unwindDetector.isUnwinding(); 2858 2859 do { 2860 // Clang's implementation of the Coroutines TS does not destroy the Coroutine object or 2861 // deallocate the coroutine frame if a destructor of an object on the frame threw an 2862 // exception. This is despite the fact that it delivered the exception to _us_ via 2863 // unhandled_exception(). Anyway, it appears we can work around this by running 2864 // coroutine.destroy() a second time. 2865 // 2866 // On Clang, `disposalResults.exception != nullptr` implies `!disposalResults.destructorRan`. 2867 // We could optimize out the separate `destructorRan` flag if we verify that other compilers 2868 // behave the same way. 2869 coroutine.destroy(); 2870 } while (!disposalResults.destructorRan); 2871 2872 // WARNING: `this` is now a dangling pointer. 2873 2874 KJ_IF_MAYBE(exception, disposalResults.exception) { 2875 if (shouldRethrow) { 2876 kj::throwFatalException(kj::mv(*exception)); 2877 } else { 2878 // An exception is already unwinding the stack, so throwing this secondary exception would 2879 // call std::terminate(). 2880 } 2881 } 2882 } 2883 2884 CoroutineBase::AwaiterBase::AwaiterBase(Own<PromiseNode> node): node(kj::mv(node)) {} 2885 CoroutineBase::AwaiterBase::AwaiterBase(AwaiterBase&&) = default; 2886 CoroutineBase::AwaiterBase::~AwaiterBase() noexcept(false) { 2887 // Make sure it's safe to generate an async stack trace between now and when the Coroutine is 2888 // destroyed. 2889 KJ_IF_MAYBE(coroutineEvent, maybeCoroutineEvent) { 2890 coroutineEvent->promiseNodeForTrace = nullptr; 2891 } 2892 2893 unwindDetector.catchExceptionsIfUnwinding([this]() { 2894 // No need to check for a moved-from state, node will just ignore the nullification. 2895 node = nullptr; 2896 }); 2897 } 2898 2899 void CoroutineBase::AwaiterBase::getImpl(ExceptionOrValue& result) { 2900 node->get(result); 2901 2902 KJ_IF_MAYBE(exception, result.exception) { 2903 kj::throwFatalException(kj::mv(*exception)); 2904 } 2905 } 2906 2907 bool CoroutineBase::AwaiterBase::awaitSuspendImpl(CoroutineBase& coroutineEvent) { 2908 node->setSelfPointer(&node); 2909 node->onReady(&coroutineEvent); 2910 2911 if (coroutineEvent.isNext()) { 2912 // The result is immediately ready! Let's cancel our event. 2913 coroutineEvent.disarm(); 2914 2915 // We can resume ourselves by returning false. This accomplishes the same thing as if we had 2916 // returned true from await_ready(). 2917 return false; 2918 } else { 2919 // Otherwise, we must suspend. Store a reference to the promise we're waiting on for tracing 2920 // purposes; coroutineEvent.fire() and/or ~Adapter() will null this out. 
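      // (For orientation, a hedged example of a coroutine that takes this path -- `slowPromise`
      // is just an illustrative name:
      //     kj::Promise<int> example() {
      //       int i = co_await slowPromise();  // not ready yet => awaitSuspendImpl() returns true
      //       co_return i + 1;
      //     }
      // The coroutine remains suspended until the awaited node arms `coroutineEvent`.)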
2921 coroutineEvent.promiseNodeForTrace = *node; 2922 maybeCoroutineEvent = coroutineEvent; 2923 2924 return true; 2925 } 2926 } 2927 2928 } // namespace _ (private) 2929 2930 #endif // KJ_HAS_COROUTINE 2931 2932 } // namespace kj
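
// (Overall usage sketch of the machinery above, for orientation; in real programs the EventLoop
// and WaitScope usually come from kj::setupAsyncIo() rather than being constructed directly:
//
//     kj::EventLoop loop;
//     kj::WaitScope waitScope(loop);
//     kj::Promise<int> promise = kj::evalLater([]() { return 123; });
//     int result = promise.wait(waitScope);  // drives EventLoop::turn() via waitImpl() above
//
// Dropping an unfinished promise cancels it, which for a fiber-backed promise is what triggers
// the CANCELED path handled in FiberBase::run() earlier in this file.)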