mutex.c++ (40017B)
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if _WIN32 || __CYGWIN__
#include "win32-api-version.h"
#endif

#include "mutex.h"
#include "debug.h"

#if !_WIN32 && !__CYGWIN__
#include <time.h>
#include <errno.h>
#endif

#if KJ_USE_FUTEX
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <limits.h>

#ifndef SYS_futex
// Missing on Android/Bionic.
#ifdef __NR_futex
#define SYS_futex __NR_futex
#elif defined(SYS_futex_time64)
#define SYS_futex SYS_futex_time64
#else
#error "Need working SYS_futex"
#endif
#endif

#ifndef FUTEX_WAIT_PRIVATE
// Missing on Android/Bionic.
#define FUTEX_WAIT_PRIVATE FUTEX_WAIT
#define FUTEX_WAKE_PRIVATE FUTEX_WAKE
#endif

#elif _WIN32 || __CYGWIN__
#include <windows.h>
#endif

namespace kj {
#if KJ_TRACK_LOCK_BLOCKING
static thread_local const BlockedOnReason* tlsBlockReason __attribute((tls_model("initial-exec")));
// The initial-exec model ensures that even if this code is part of a shared library built PIC,
// we still place this variable in the appropriate ELF section so that __tls_get_addr is avoided.
// It's unclear whether __tls_get_addr is still not async-signal-safe in glibc. The only downside
// of this approach is that a shared library built with kj & lock tracking will fail if dlopen'ed,
// which isn't an intended use-case for the initial implementation.
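
// Illustrative usage sketch (added commentary, not part of the original source). Because
// tlsBlockReason is thread-local and read via the async-signal-safe accessor blockedReason()
// below, a signal handler running on a thread suspected of being stuck (e.g. triggered by a
// watchdog via pthread_kill) could, under KJ_TRACK_LOCK_BLOCKING, sample what the thread is
// currently blocked on, roughly like this hypothetical diagnostic code:
//
//   KJ_IF_MAYBE(reason, kj::blockedReason()) {
//     // Inspect *reason, e.g. record which Mutex or Once the current thread is parked on.
//   } else {
//     // The current thread is not blocked on a KJ lock right now.
//   }
//
// The exact members of BlockedOnReason are declared in mutex.h; this sketch only assumes the
// Maybe-returning accessor defined immediately below.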

Maybe<const BlockedOnReason&> blockedReason() noexcept {
  if (tlsBlockReason == nullptr) {
    return nullptr;
  }
  return *tlsBlockReason;
}

static void setCurrentThreadIsWaitingFor(const BlockedOnReason* meta) {
  tlsBlockReason = meta;
}

static void setCurrentThreadIsNoLongerWaiting() {
  tlsBlockReason = nullptr;
}
#elif KJ_USE_FUTEX
struct BlockedOnMutexAcquisition {
  constexpr BlockedOnMutexAcquisition(const _::Mutex& mutex, LockSourceLocationArg) {}
};

struct BlockedOnCondVarWait {
  constexpr BlockedOnCondVarWait(const _::Mutex& mutex, const void *waiter,
                                 LockSourceLocationArg) {}
};

struct BlockedOnOnceInit {
  constexpr BlockedOnOnceInit(const _::Once& once, LockSourceLocationArg) {}
};

struct BlockedOnReason {
  constexpr BlockedOnReason(const BlockedOnMutexAcquisition&) {}
  constexpr BlockedOnReason(const BlockedOnCondVarWait&) {}
  constexpr BlockedOnReason(const BlockedOnOnceInit&) {}
};

static void setCurrentThreadIsWaitingFor(const BlockedOnReason* meta) {}
static void setCurrentThreadIsNoLongerWaiting() {}
#endif

namespace _ {  // private

#if KJ_USE_FUTEX
constexpr uint Mutex::EXCLUSIVE_HELD;
constexpr uint Mutex::EXCLUSIVE_REQUESTED;
constexpr uint Mutex::SHARED_COUNT_MASK;
#endif

inline void Mutex::addWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
  assertLockedByCaller(EXCLUSIVE);
#endif
  *waitersTail = waiter;
  waitersTail = &waiter.next;
}
inline void Mutex::removeWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
  assertLockedByCaller(EXCLUSIVE);
#endif
  *waiter.prev = waiter.next;
  KJ_IF_MAYBE(next, waiter.next) {
    next->prev = waiter.prev;
  } else {
    KJ_DASSERT(waitersTail == &waiter.next);
    waitersTail = waiter.prev;
  }
}

bool Mutex::checkPredicate(Waiter& waiter) {
  // Run the predicate from a thread other than the waiting thread, returning true if it's time to
  // signal the waiting thread. This is not only when the predicate passes, but also when it
  // throws, in which case we want to propagate the exception to the waiting thread.

  if (waiter.exception != nullptr) return true;  // don't run again after an exception

  bool result = false;
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    result = waiter.predicate.check();
  })) {
    // Exception thrown.
    result = true;
    waiter.exception = kj::heap(kj::mv(*exception));
  };
  return result;
}

#if !_WIN32 && !__CYGWIN__
namespace {

TimePoint toTimePoint(struct timespec ts) {
  return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
}
TimePoint now() {
  struct timespec now;
  KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
  return toTimePoint(now);
}
struct timespec toRelativeTimespec(Duration timeout) {
  struct timespec ts;
  ts.tv_sec = timeout / kj::SECONDS;
  ts.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
  return ts;
}
struct timespec toAbsoluteTimespec(TimePoint time) {
  return toRelativeTimespec(time - kj::origin<TimePoint>());
}

}  // namespace
#endif

#if KJ_USE_FUTEX
// =======================================================================================
// Futex-based implementation (Linux-only)

#if KJ_SAVE_ACQUIRED_LOCK_INFO
#if !__GLIBC_PREREQ(2, 30)
#ifndef SYS_gettid
#error SYS_gettid is unavailable on this system
#endif

#define gettid() ((pid_t)syscall(SYS_gettid))
#endif

static thread_local pid_t tlsTid = gettid();
#define TRACK_ACQUIRED_TID() tlsTid

Mutex::AcquiredMetadata Mutex::lockedInfo() const {
  auto state = __atomic_load_n(&futex, __ATOMIC_RELAXED);
  auto tid = lockedExclusivelyByThread;
  auto location = lockAcquiredLocation;

  if (state & EXCLUSIVE_HELD) {
    return HoldingExclusively{tid, location};
  } else {
    return HoldingShared{location};
  }
}

#else
#define TRACK_ACQUIRED_TID() 0
#endif

Mutex::Mutex(): futex(0) {}
Mutex::~Mutex() {
  // This will crash anyway, might as well crash with a nice error message.
  KJ_ASSERT(futex == 0, "Mutex destroyed while locked.") { break; }
}

bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, LockSourceLocationArg location) {
  BlockedOnReason blockReason = BlockedOnMutexAcquisition{*this, location};
  KJ_DEFER(setCurrentThreadIsNoLongerWaiting());

  auto spec = timeout.map([](Duration d) { return toRelativeTimespec(d); });
  struct timespec* specp = nullptr;
  KJ_IF_MAYBE(s, spec) {
    specp = s;
  }

  switch (exclusivity) {
    case EXCLUSIVE:
      for (;;) {
        uint state = 0;
        if (KJ_LIKELY(__atomic_compare_exchange_n(&futex, &state, EXCLUSIVE_HELD, false,
                                                  __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {

          // Acquired.
          break;
        }

        // The mutex is contended. Set the exclusive-requested bit and wait.
        if ((state & EXCLUSIVE_REQUESTED) == 0) {
          if (!__atomic_compare_exchange_n(&futex, &state, state | EXCLUSIVE_REQUESTED, false,
                                           __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
            // Oops, the state changed before we could set the request bit. Start over.
            continue;
          }

          state |= EXCLUSIVE_REQUESTED;
        }

        setCurrentThreadIsWaitingFor(&blockReason);

        auto result = syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, specp, nullptr, 0);
        if (result < 0) {
          if (errno == ETIMEDOUT) {
            setCurrentThreadIsNoLongerWaiting();
            // We timed out. We can't remove the exclusive request flag (since others might be
            // waiting), so we just return false.
            return false;
          }
        }
      }
      acquiredExclusive(TRACK_ACQUIRED_TID(), location);
#if KJ_CONTENTION_WARNING_THRESHOLD
      printContendedReader = false;
#endif
      break;
    case SHARED: {
#if KJ_CONTENTION_WARNING_THRESHOLD
      kj::Maybe<kj::TimePoint> contentionWaitStart;
#endif

      uint state = __atomic_add_fetch(&futex, 1, __ATOMIC_ACQUIRE);

      for (;;) {
        if (KJ_LIKELY((state & EXCLUSIVE_HELD) == 0)) {
          // Acquired.
          break;
        }

#if KJ_CONTENTION_WARNING_THRESHOLD
        if (contentionWaitStart == nullptr) {
          // We could have the exclusive mutex tell us how long it was holding the lock. That would
          // be the nicest. However, I'm hesitant to bloat the structure. I suspect having a reader
          // tell us how long it was waiting for is probably a good proxy.
          contentionWaitStart = kj::systemPreciseMonotonicClock().now();
        }
#endif

        setCurrentThreadIsWaitingFor(&blockReason);

        // The mutex is exclusively locked by another thread. Since we incremented the counter
        // already, we just have to wait for it to be unlocked.
        auto result = syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, specp, nullptr, 0);
        if (result < 0) {
          // If we time out, though, we need to signal that we're not waiting anymore.
          if (errno == ETIMEDOUT) {
            setCurrentThreadIsNoLongerWaiting();
            state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELAXED);

            // We may have unlocked since we timed out. So act like we just unlocked the mutex
            // and maybe send a wake signal if needed. See Mutex::unlock SHARED case.
            if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
              if (__atomic_compare_exchange_n(
                  &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
                // Wake all exclusive waiters. We have to wake all of them because one of them will
                // grab the lock while the others will re-establish the exclusive-requested bit.
                syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
              }
            }
            return false;
          }
        }
        state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
      }

#ifdef KJ_CONTENTION_WARNING_THRESHOLD
      KJ_IF_MAYBE(start, contentionWaitStart) {
        if (__atomic_load_n(&printContendedReader, __ATOMIC_RELAXED)) {
          // Double-checked lock avoids the CPU needing to acquire the lock in most cases.
          if (__atomic_exchange_n(&printContendedReader, false, __ATOMIC_RELAXED)) {
            auto contentionDuration = kj::systemPreciseMonotonicClock().now() - *start;
            KJ_LOG(WARNING, "Acquired contended lock", location, contentionDuration,
                   kj::getStackTrace());
          }
        }
      }
#endif

      // We just want to record the lock being acquired somewhere, but the specific location
      // doesn't matter. This does mean that race conditions could occur where a thread might read
      // this inconsistently (e.g. filename from one lock & function from another). This is
      // currently just meant to be a debugging aid for manual analysis, so it's OK for that
      // purpose. If it's ever required to be used for anything else, then this should probably be
      // changed to use an additional atomic variable that ensures only one writer updates this.
      // Or use the futex variable to ensure that this is only done for the first one to acquire
      // the lock, although there may be thundering herd problems with that whereby there's a long
      // wallclock time between when the lock is acquired and when the location is updated (since
      // the first locker isn't really guaranteed to be the first one unlocked).
      acquiredShared(location);

      break;
    }
  }
  return true;
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  switch (exclusivity) {
    case EXCLUSIVE: {
      KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");

#ifdef KJ_CONTENTION_WARNING_THRESHOLD
      auto acquiredLocation = releasingExclusive();
#endif

      // First check if there are any conditional waiters. Note we only do this when unlocking an
      // exclusive lock since under a shared lock the state couldn't have changed.
      auto nextWaiter = waitersHead;
      for (;;) {
        KJ_IF_MAYBE(waiter, nextWaiter) {
          nextWaiter = waiter->next;

          if (waiter != waiterToSkip && checkPredicate(*waiter)) {
            // This waiter's predicate now evaluates true, so wake it up.
            if (waiter->hasTimeout) {
              // In this case we need to be careful to make sure the target thread isn't already
              // processing a timeout, so we need to do an atomic CAS rather than just a store.
              uint expected = 0;
              if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
                                              __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                // Good, we set it to 1, transferring ownership of the mutex. Continue on below.
              } else {
                // Looks like the thread already timed out and set its own futex to 1. In that
                // case it is going to try to lock the mutex itself, so we should NOT attempt an
                // ownership transfer as this will deadlock.
                //
                // We have two options here: We can continue along the waiter list looking for
                // another waiter that's ready to be signaled, or we could drop out of the list
                // immediately since we know that another thread is already waiting for the lock
                // and will re-evaluate the waiter queue itself when it is done. It feels cleaner
                // to me to continue.
                continue;
              }
            } else {
              __atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
            }
            syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);

            // We transferred ownership of the lock to this waiter, so we're done now.
            return;
          }
        } else {
          // No more waiters.
          break;
        }
      }

#ifdef KJ_CONTENTION_WARNING_THRESHOLD
      uint readerCount;
      {
        uint oldState = __atomic_load_n(&futex, __ATOMIC_RELAXED);
        readerCount = oldState & SHARED_COUNT_MASK;
        if (readerCount >= KJ_CONTENTION_WARNING_THRESHOLD) {
          // Atomic not needed because we're still holding the exclusive lock.
          printContendedReader = true;
        }
      }
#endif

      // Didn't wake any waiters, so wake normally.
      uint oldState = __atomic_fetch_and(
          &futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);

      if (KJ_UNLIKELY(oldState & ~EXCLUSIVE_HELD)) {
        // Other threads are waiting. If there are any shared waiters, they now collectively hold
        // the lock, and we must wake them up. If there are any exclusive waiters, we must wake
        // them up even if readers are waiting so that at the very least they may re-establish the
        // EXCLUSIVE_REQUESTED bit that we just removed.
        syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);

#ifdef KJ_CONTENTION_WARNING_THRESHOLD
        if (readerCount >= KJ_CONTENTION_WARNING_THRESHOLD) {
          KJ_LOG(WARNING, "excessively many readers were waiting on this lock", readerCount,
                 acquiredLocation, kj::getStackTrace());
        }
#endif
      }
      break;
    }

    case SHARED: {
      KJ_DASSERT(futex & SHARED_COUNT_MASK, "Unshared a mutex that wasn't shared.");
      uint state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELEASE);

      // The only case where anyone is waiting is if EXCLUSIVE_REQUESTED is set, and the only time
      // it makes sense to wake up that waiter is if the shared count has reached zero.
      if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
        if (__atomic_compare_exchange_n(
            &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
          // Wake all exclusive waiters. We have to wake all of them because one of them will
          // grab the lock while the others will re-establish the exclusive-requested bit.
          syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
        }
      }
      break;
    }
  }
}

void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
  switch (exclusivity) {
    case EXCLUSIVE:
      KJ_ASSERT(futex & EXCLUSIVE_HELD,
                "Tried to call getAlreadyLocked*() but lock is not held.");
      break;
    case SHARED:
      KJ_ASSERT(futex & SHARED_COUNT_MASK,
                "Tried to call getAlreadyLocked*() but lock is not held.");
      break;
  }
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, LockSourceLocationArg location) {
  // Add waiter to list.
  Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
  addWaiter(waiter);

  BlockedOnReason blockReason = BlockedOnCondVarWait{*this, &waiter, location};
  KJ_DEFER(setCurrentThreadIsNoLongerWaiting());

  // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
  // currently locked.
  bool currentlyLocked = true;
  KJ_DEFER({
    // The infinite timeout when re-obtaining the lock is on purpose: the post-condition of this
    // function is that the lock state hasn't changed (and we have to be locked when we enter,
    // since that's how condvars work).
    if (!currentlyLocked) lock(EXCLUSIVE, nullptr, location);
    removeWaiter(waiter);
  });

  if (!predicate.check()) {
    unlock(EXCLUSIVE, &waiter);
    currentlyLocked = false;

    struct timespec ts;
    struct timespec* tsp = nullptr;
    KJ_IF_MAYBE(t, timeout) {
      ts = toAbsoluteTimespec(now() + *t);
      tsp = &ts;
    }

    setCurrentThreadIsWaitingFor(&blockReason);

    // Wait for someone to set our futex to 1.
    for (;;) {
      // Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
      // FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
      // CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
      // us to recompute the time after every iteration.
      KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
          &waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, nullptr, FUTEX_BITSET_MATCH_ANY)) {
        case EAGAIN:
          // Indicates that the futex was already non-zero by the time the kernel looked at it.
          // Not an error.
          break;
        case ETIMEDOUT: {
          // Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
          // transferred to us from another thread. So, we need to lock it ourselves. But, another
          // thread might be in the process of signaling us and transferring ownership. So, we
          // first must atomically take control of our destiny.
          KJ_ASSERT(timeout != nullptr);
          uint expected = 0;
          if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
                                          __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
            // OK, we set our own futex to 1. That means no other thread will, and so we won't be
            // receiving a mutex ownership transfer. We have to lock the mutex ourselves.
            setCurrentThreadIsNoLongerWaiting();
            lock(EXCLUSIVE, nullptr, location);
            currentlyLocked = true;
            return;
          } else {
            // Oh, someone else actually did signal us, apparently. Let's move on as if the futex
            // call told us so.
            break;
          }
        }
        default:
          KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_PRIVATE)", error);
      }

      setCurrentThreadIsNoLongerWaiting();

      if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
        // We received a lock ownership transfer from another thread.
        currentlyLocked = true;

        // The other thread checked the predicate before the transfer.
#ifdef KJ_DEBUG
        assertLockedByCaller(EXCLUSIVE);
#endif

        KJ_IF_MAYBE(exception, waiter.exception) {
          // The predicate threw an exception, apparently. Propagate it.
          // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
          // then want MutexGuarded::when() to skip calling the callback, but then what should it
          // return, since it normally returns the callback's result? Or maybe people who disable
          // exceptions just really should not write predicates that can throw.
          kj::throwFatalException(kj::mv(**exception));
        }

        return;
      }
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    } else {
      // No more waiters.
      break;
    }
  }
}

uint Mutex::numReadersWaitingForTest() const {
  assertLockedByCaller(EXCLUSIVE);
  return futex & SHARED_COUNT_MASK;
}

void Once::runOnce(Initializer& init, LockSourceLocationArg location) {
startOver:
  uint state = UNINITIALIZED;
  if (__atomic_compare_exchange_n(&futex, &state, INITIALIZING, false,
                                  __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    // It's our job to initialize!
    {
      KJ_ON_SCOPE_FAILURE({
        // An exception was thrown by the initializer. We have to revert.
        if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) ==
            INITIALIZING_WITH_WAITERS) {
          // Someone was waiting for us to finish.
          syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
        }
      });

      init.run();
    }
    if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) ==
        INITIALIZING_WITH_WAITERS) {
      // Someone was waiting for us to finish.
      syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    }
  } else {
    BlockedOnReason blockReason = BlockedOnOnceInit{*this, location};
    KJ_DEFER(setCurrentThreadIsNoLongerWaiting());

    for (;;) {
      if (state == INITIALIZED) {
        break;
      } else if (state == INITIALIZING) {
        // Initialization is taking place in another thread. Indicate that we're waiting.
        if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
                                         __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
          // State changed, retry.
          continue;
        }
      } else {
        KJ_DASSERT(state == INITIALIZING_WITH_WAITERS);
      }

      // Wait for initialization.
      setCurrentThreadIsWaitingFor(&blockReason);
      syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS,
              nullptr, nullptr, 0);
      state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);

      if (state == UNINITIALIZED) {
        // Oh hey, apparently whoever was trying to initialize gave up. Let's take it from the
        // top.
        goto startOver;
      }
    }
  }
}

void Once::reset() {
  uint state = INITIALIZED;
  if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED,
                                   false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    KJ_FAIL_REQUIRE("reset() called while not initialized.");
  }
}

#elif _WIN32 || __CYGWIN__
// =======================================================================================
// Win32 implementation

#define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
#define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
#define coercedCondvar(var) (*reinterpret_cast<CONDITION_VARIABLE*>(&var))

Mutex::Mutex() {
  static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?");
  InitializeSRWLock(&coercedSrwLock);
}
Mutex::~Mutex() {}

bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, NoopSourceLocation) {
  if (timeout != nullptr) {
    KJ_UNIMPLEMENTED("Locking a mutex with a timeout is only supported on Linux.");
  }
  switch (exclusivity) {
    case EXCLUSIVE:
      AcquireSRWLockExclusive(&coercedSrwLock);
      break;
    case SHARED:
      AcquireSRWLockShared(&coercedSrwLock);
      break;
  }
  return true;
}

void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
  // Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
  // one waiter because only one waiter could get the lock anyway, and once it releases that lock
  // it will awake the next waiter if necessary.

  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;

      if (waiter != waiterToSkip && checkPredicate(*waiter)) {
        // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
        // use Wake vs. WakeAll here since there's always only one thread waiting.
        WakeConditionVariable(&coercedCondvar(waiter->condvar));

        // We only need to wake one waiter. Note that unlike the futex-based implementation, we
        // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
        // that the condition is still true when that waiter finally awakes. However, if the
        // condition is no longer true at that point, the waiter will re-check all other
        // waiters' conditions and possibly wake up any other waiter who is now ready, hence we
        // still only need to wake one waiter here.
        return;
      }
    } else {
      // No more waiters.
      break;
    }
  }
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  switch (exclusivity) {
    case EXCLUSIVE: {
      KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));

      // Check if there are any conditional waiters. Note we only do this when unlocking an
      // exclusive lock since under a shared lock the state couldn't have changed.
      wakeReadyWaiter(waiterToSkip);
      break;
    }

    case SHARED:
      ReleaseSRWLockShared(&coercedSrwLock);
      break;
  }
}

void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
  // We could use TryAcquireSRWLock*() here like we do with the pthread version. However, as of
  // this writing, my version of Wine (1.6.2) doesn't implement these functions and will abort if
  // they are called. Since we were only going to use them as a hacky way to check if the lock is
  // held for debug purposes anyway, we just don't bother.
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, NoopSourceLocation) {
  // Add waiter to list.
  Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
  static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
                "CONDITION_VARIABLE is not a pointer?");
  InitializeConditionVariable(&coercedCondvar(waiter.condvar));

  addWaiter(waiter);
  KJ_DEFER(removeWaiter(waiter));

  DWORD sleepMs;

  // Only initialized if `timeout` is non-null.
  const MonotonicClock* clock = nullptr;
  kj::Maybe<kj::TimePoint> endTime;

  KJ_IF_MAYBE(t, timeout) {
    // Windows sleeps are inaccurate -- they can be longer *or shorter* than the requested amount.
    // For many use cases of our API, a too-short sleep would be unacceptable. Experimentally, it
    // seems like sleeps can be up to half a millisecond short, so we'll add half a millisecond
    // (and then we round up, below).
    *t += 500 * kj::MICROSECONDS;

    // Compute initial sleep time.
    sleepMs = *t / kj::MILLISECONDS;
    if (*t % kj::MILLISECONDS > 0 * kj::SECONDS) {
      // We guarantee we won't wake up too early.
      ++sleepMs;
    }

    clock = &systemPreciseMonotonicClock();
    endTime = clock->now() + *t;
  } else {
    sleepMs = INFINITE;
  }

  while (!predicate.check()) {
    // SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
    // waiters that are now ready.
    wakeReadyWaiter(&waiter);

    if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
      // Normal result. Continue loop to check predicate.
    } else {
      DWORD error = GetLastError();
      if (error == ERROR_TIMEOUT) {
        // Windows may have woken us up too early, so don't return yet. Instead, proceed through
        // the loop and rely on our sleep time recalculation to detect if we timed out.
      } else {
        KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
      }
    }

    KJ_IF_MAYBE(exception, waiter.exception) {
      // The predicate threw an exception, apparently. Propagate it.
      // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
      // then want MutexGuarded::when() to skip calling the callback, but then what should it
      // return, since it normally returns the callback's result? Or maybe people who disable
      // exceptions just really should not write predicates that can throw.
      kj::throwFatalException(kj::mv(**exception));
    }

    // Recompute sleep time.
    KJ_IF_MAYBE(e, endTime) {
      auto now = clock->now();

      if (*e > now) {
        auto sleepTime = *e - now;
        sleepMs = sleepTime / kj::MILLISECONDS;
        if (sleepTime % kj::MILLISECONDS > 0 * kj::SECONDS) {
          // We guarantee we won't wake up too early.
          ++sleepMs;
        }
      } else {
        // Oops, already timed out.
        return;
      }
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      WakeConditionVariable(&coercedCondvar(waiter->condvar));
    } else {
      // No more waiters.
      break;
    }
  }
}

static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) {
  return true;
}

Once::Once(bool startInitialized) {
  static_assert(sizeof(INIT_ONCE) == sizeof(initOnce), "INIT_ONCE is not a pointer?");
  InitOnceInitialize(&coercedInitOnce);
  if (startInitialized) {
    InitOnceExecuteOnce(&coercedInitOnce, &nullInitializer, nullptr, nullptr);
  }
}
Once::~Once() {}

void Once::runOnce(Initializer& init, NoopSourceLocation) {
  BOOL needInit;
  while (!InitOnceBeginInitialize(&coercedInitOnce, 0, &needInit, nullptr)) {
    // Init was occurring in another thread, but then failed with an exception. Retry.
  }

  if (needInit) {
    {
      KJ_ON_SCOPE_FAILURE(InitOnceComplete(&coercedInitOnce, INIT_ONCE_INIT_FAILED, nullptr));
      init.run();
    }

    KJ_ASSERT(InitOnceComplete(&coercedInitOnce, 0, nullptr));
  }
}

bool Once::isInitialized() noexcept {
  BOOL junk;
  return InitOnceBeginInitialize(&coercedInitOnce, INIT_ONCE_CHECK_ONLY, &junk, nullptr);
}

void Once::reset() {
  InitOnceInitialize(&coercedInitOnce);
}

#else
// =======================================================================================
// Generic pthreads-based implementation

#define KJ_PTHREAD_CALL(code) \
  { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_FAIL_SYSCALL(#code, pthreadError); \
    } \
  }

#define KJ_PTHREAD_CLEANUP(code) \
  { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_LOG(ERROR, #code, strerror(pthreadError)); \
    } \
  }

Mutex::Mutex(): mutex(PTHREAD_RWLOCK_INITIALIZER) {}
Mutex::~Mutex() {
  KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}

bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, NoopSourceLocation) {
  if (timeout != nullptr) {
    KJ_UNIMPLEMENTED("Locking a mutex with a timeout is only supported on Linux.");
  }
  switch (exclusivity) {
    case EXCLUSIVE:
      KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
      break;
    case SHARED:
      KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
      break;
  }
  return true;
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));

  if (exclusivity == EXCLUSIVE) {
    // Check if there are any conditional waiters. Note we only do this when unlocking an
    // exclusive lock since under a shared lock the state couldn't have changed.
    auto nextWaiter = waitersHead;
    for (;;) {
      KJ_IF_MAYBE(waiter, nextWaiter) {
        nextWaiter = waiter->next;

        if (waiter != waiterToSkip && checkPredicate(*waiter)) {
          // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
          // use _signal() vs. _broadcast() here since there's always only one thread waiting.
          KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
          KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
          KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));

          // We only need to wake one waiter. Note that unlike the futex-based implementation, we
          // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
          // that the condition is still true when that waiter finally awakes. However, if the
          // condition is no longer true at that point, the waiter will re-check all other waiters'
          // conditions and possibly wake up any other waiter who is now ready, hence we still only
          // need to wake one waiter here.
          break;
        }
      } else {
        // No more waiters.
        break;
      }
    }
  }
}

void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
  switch (exclusivity) {
    case EXCLUSIVE:
      // A read lock should fail if the mutex is already held for writing.
      if (pthread_rwlock_tryrdlock(&mutex) == 0) {
        pthread_rwlock_unlock(&mutex);
        KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
      }
      break;
    case SHARED:
      // A write lock should fail if the mutex is already held for reading or writing. We don't
      // have any way to prove that the lock is held only for reading.
      if (pthread_rwlock_trywrlock(&mutex) == 0) {
        pthread_rwlock_unlock(&mutex);
        KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
      }
      break;
  }
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, NoopSourceLocation) {
  // Add waiter to list.
  Waiter waiter {
    nullptr, waitersTail, predicate, nullptr,
    PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
  };
  addWaiter(waiter);

  // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
  // currently locked.
  bool currentlyLocked = true;
  KJ_DEFER({
    if (!currentlyLocked) lock(EXCLUSIVE, nullptr, NoopSourceLocation{});
    removeWaiter(waiter);

    // Destroy pthread objects.
    KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
    KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
  });

#if !__APPLE__
  if (timeout != nullptr) {
    // Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
    // clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
    // a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
    pthread_condattr_t attr;
    KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
    KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
    pthread_cond_init(&waiter.condvar, &attr);
    KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
  }
#endif

  Maybe<struct timespec> endTime = timeout.map([](Duration d) {
    return toAbsoluteTimespec(now() + d);
  });

  while (!predicate.check()) {
    // pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
    // mutex before we unlock the real mutex, and the signaling thread also needs to lock this
    // mutex, in order to ensure that this thread is actually waiting on the condvar before it is
    // signaled.
    KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));

    // OK, now we can unlock the main mutex.
    unlock(EXCLUSIVE, &waiter);
    currentlyLocked = false;

    bool timedOut = false;

    // Wait for someone to signal the condvar.
    KJ_IF_MAYBE(t, endTime) {
#if __APPLE__
      // On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
      // which means modifying the system clock will break the wait. However, macOS happens to
      // provide an alternative relative-time wait function, so I guess we'll use that. It does
      // require recomputing the time every iteration...
      struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
      int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
#else
      int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
#endif
      if (error != 0) {
        if (error == ETIMEDOUT) {
          timedOut = true;
        } else {
          KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
        }
      }
    } else {
      KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
    }

    // We have to be very careful about lock ordering here. We need to unlock stupidMutex before
    // re-locking the main mutex, because another thread may have a lock on the main mutex already
    // and be waiting for a lock on stupidMutex. Note that the other thread may signal the condvar
    // right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
    // because we've already been signaled.
    KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));

    lock(EXCLUSIVE, nullptr, NoopSourceLocation{});
    currentlyLocked = true;

    KJ_IF_MAYBE(exception, waiter.exception) {
      // The predicate threw an exception, apparently. Propagate it.
      // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
      // then want MutexGuarded::when() to skip calling the callback, but then what should it
      // return, since it normally returns the callback's result? Or maybe people who disable
      // exceptions just really should not write predicates that can throw.
      kj::throwFatalException(kj::mv(**exception));
    }

    if (timedOut) {
      return;
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
      KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
      KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
    } else {
      // No more waiters.
      break;
    }
  }
}

Once::Once(bool startInitialized)
    : state(startInitialized ? INITIALIZED : UNINITIALIZED),
      mutex(PTHREAD_MUTEX_INITIALIZER) {}
Once::~Once() {
  KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
}

void Once::runOnce(Initializer& init, NoopSourceLocation) {
  KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
  KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));

  if (state != UNINITIALIZED) {
    return;
  }

  init.run();

  __atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE);
}

void Once::reset() {
  State oldState = INITIALIZED;
  if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED,
                                   false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    KJ_FAIL_REQUIRE("reset() called while not initialized.");
  }
}

#endif

}  // namespace _ (private)
}  // namespace kj
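
// =======================================================================================
// Illustrative usage sketch (added commentary, not part of the original source file).
//
// The low-level Mutex and Once primitives implemented above are normally consumed through the
// templated wrappers declared in mutex.h, such as kj::MutexGuarded<T> and kj::Lazy<T>. Assuming
// those wrappers, typical usage looks roughly like the following; consult mutex.h for the exact
// signatures.
//
//   kj::MutexGuarded<int> counter(0);
//
//   {
//     auto locked = counter.lockExclusive();  // Mutex::lock(EXCLUSIVE, ...) under the hood.
//     *locked += 1;
//   }                                         // Destructor calls Mutex::unlock(EXCLUSIVE, ...).
//
//   // Block until a predicate holds, implemented via Mutex::wait() above; the callback runs
//   // while the exclusive lock is (still) held.
//   counter.when([](const int& v) { return v > 0; },
//                [](int& v) { v = 0; });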