capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git

mutex.c++ (40017B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if _WIN32 || __CYGWIN__
     23 #include "win32-api-version.h"
     24 #endif
     25 
     26 #include "mutex.h"
     27 #include "debug.h"
     28 
     29 #if !_WIN32 && !__CYGWIN__
     30 #include <time.h>
     31 #include <errno.h>
     32 #endif
     33 
     34 #if KJ_USE_FUTEX
     35 #include <unistd.h>
     36 #include <sys/syscall.h>
     37 #include <linux/futex.h>
     38 #include <limits.h>
     39 
     40 #ifndef SYS_futex
     41 // Missing on Android/Bionic.
     42 #ifdef __NR_futex
     43 #define SYS_futex __NR_futex
     44 #elif defined(SYS_futex_time64)
     45 #define SYS_futex SYS_futex_time64
     46 #else
     47 #error "Need working SYS_futex"
     48 #endif
     49 #endif
     50 
     51 #ifndef FUTEX_WAIT_PRIVATE
     52 // Missing on Android/Bionic.
     53 #define FUTEX_WAIT_PRIVATE FUTEX_WAIT
     54 #define FUTEX_WAKE_PRIVATE FUTEX_WAKE
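        // (The *_PRIVATE variants only skip cross-process futex matching; falling back
        // to the plain shared forms is functionally equivalent, just slightly slower.)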
     55 #endif
     56 
     57 #elif _WIN32 || __CYGWIN__
     58 #include <windows.h>
     59 #endif
     60 
     61 namespace kj {
     62 #if KJ_TRACK_LOCK_BLOCKING
     63 static thread_local const BlockedOnReason* tlsBlockReason __attribute__((tls_model("initial-exec")));
     64 // The initial-exec model ensures that even if this code is part of a shared library built as PIC,
     65 // the variable is still placed in the appropriate ELF section so that __tls_get_addr is avoided.
     66 // It's unclear whether __tls_get_addr is async-signal-safe in glibc. The only downside of this
     67 // approach is that a shared library built with kj & lock tracking will fail if dlopen'ed, which
     68 // isn't an intended use-case for the initial implementation.
     69 
     70 Maybe<const BlockedOnReason&> blockedReason() noexcept {
     71   if (tlsBlockReason == nullptr) {
     72     return nullptr;
     73   }
     74   return *tlsBlockReason;
     75 }
     76 
     77 static void setCurrentThreadIsWaitingFor(const BlockedOnReason* meta) {
     78   tlsBlockReason = meta;
     79 }
     80 
     81 static void setCurrentThreadIsNoLongerWaiting() {
     82   tlsBlockReason = nullptr;
     83 }
     84 #elif KJ_USE_FUTEX
     85 struct BlockedOnMutexAcquisition {
     86   constexpr BlockedOnMutexAcquisition(const _::Mutex& mutex, LockSourceLocationArg) {}
     87 };
     88 
     89 struct BlockedOnCondVarWait {
     90   constexpr BlockedOnCondVarWait(const _::Mutex& mutex, const void *waiter,
     91       LockSourceLocationArg) {}
     92 };
     93 
     94 struct BlockedOnOnceInit {
     95   constexpr BlockedOnOnceInit(const _::Once& once, LockSourceLocationArg) {}
     96 };
     97 
     98 struct BlockedOnReason {
     99   constexpr BlockedOnReason(const BlockedOnMutexAcquisition&) {}
    100   constexpr BlockedOnReason(const BlockedOnCondVarWait&) {}
    101   constexpr BlockedOnReason(const BlockedOnOnceInit&) {}
    102 };
    103 
    104 static void setCurrentThreadIsWaitingFor(const BlockedOnReason* meta) {}
    105 static void setCurrentThreadIsNoLongerWaiting() {}
    106 #endif
    107 
    108 namespace _ {  // private
    109 
    110 #if KJ_USE_FUTEX
    111 constexpr uint Mutex::EXCLUSIVE_HELD;
    112 constexpr uint Mutex::EXCLUSIVE_REQUESTED;
    113 constexpr uint Mutex::SHARED_COUNT_MASK;
    114 #endif
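        // A sketch of the futex word layout these constants describe (their definitions
        // live in mutex.h, which is authoritative; values shown here for illustration):
        // the top bit marks an exclusive hold, the next bit records that a writer is
        // waiting, and the low bits count shared (reader) holds.
        //
        //   EXCLUSIVE_HELD      = 1u << 31
        //   EXCLUSIVE_REQUESTED = 1u << 30
        //   SHARED_COUNT_MASK   = EXCLUSIVE_REQUESTED - 1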
    115 
    116 inline void Mutex::addWaiter(Waiter& waiter) {
    117 #ifdef KJ_DEBUG
    118   assertLockedByCaller(EXCLUSIVE);
    119 #endif
    120   *waitersTail = waiter;
    121   waitersTail = &waiter.next;
    122 }
    123 inline void Mutex::removeWaiter(Waiter& waiter) {
    124 #ifdef KJ_DEBUG
    125   assertLockedByCaller(EXCLUSIVE);
    126 #endif
    127   *waiter.prev = waiter.next;
    128   KJ_IF_MAYBE(next, waiter.next) {
    129     next->prev = waiter.prev;
    130   } else {
    131     KJ_DASSERT(waitersTail == &waiter.next);
    132     waitersTail = waiter.prev;
    133   }
    134 }
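        // Note on the waiter queue: it is an intrusive linked list in which
        // `waiter.prev` points at whichever link (`waitersHead` or a predecessor's
        // `next`) currently refers to this waiter, letting removeWaiter() unlink in
        // O(1). Both helpers run only while the mutex is held exclusively, so no
        // further synchronization is needed here.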
    135 
    136 bool Mutex::checkPredicate(Waiter& waiter) {
    137   // Run the predicate from a thread other than the waiting thread, returning true if it's time to
    138   // signal the waiting thread. This is not only when the predicate passes, but also when it
    139   // throws, in which case we want to propagate the exception to the waiting thread.
    140 
    141   if (waiter.exception != nullptr) return true;  // don't run again after an exception
    142 
    143   bool result = false;
    144   KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    145     result = waiter.predicate.check();
    146   })) {
    147     // Exception thrown.
    148     result = true;
    149     waiter.exception = kj::heap(kj::mv(*exception));
    150   }
    151   return result;
    152 }
    153 
    154 #if !_WIN32 && !__CYGWIN__
    155 namespace {
    156 
    157 TimePoint toTimePoint(struct timespec ts) {
    158   return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
    159 }
    160 TimePoint now() {
    161   struct timespec now;
    162   KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
    163   return toTimePoint(now);
    164 }
    165 struct timespec toRelativeTimespec(Duration timeout) {
    166   struct timespec ts;
    167   ts.tv_sec = timeout / kj::SECONDS;
    168   ts.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
    169   return ts;
    170 }
    171 struct timespec toAbsoluteTimespec(TimePoint time) {
    172   return toRelativeTimespec(time - kj::origin<TimePoint>());
    173 }
    174 
    175 }  // namespace
    176 #endif
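        // For example, toRelativeTimespec(1500 * kj::MILLISECONDS) yields
        // { tv_sec = 1, tv_nsec = 500000000 }, and toAbsoluteTimespec(now() + d)
        // produces an absolute deadline on CLOCK_MONOTONIC, the clock used by now().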
    177 
    178 #if KJ_USE_FUTEX
    179 // =======================================================================================
    180 // Futex-based implementation (Linux-only)
    181 
    182 #if KJ_SAVE_ACQUIRED_LOCK_INFO
    183 #if !__GLIBC_PREREQ(2, 30)
    184 #ifndef SYS_gettid
    185 #error SYS_gettid is unavailable on this system
    186 #endif
    187 
    188 #define gettid() ((pid_t)syscall(SYS_gettid))
    189 #endif
    190 
    191 static thread_local pid_t tlsTid = gettid();
    192 #define TRACK_ACQUIRED_TID() tlsTid
    193 
    194 Mutex::AcquiredMetadata Mutex::lockedInfo() const {
    195   auto state = __atomic_load_n(&futex, __ATOMIC_RELAXED);
    196   auto tid = lockedExclusivelyByThread;
    197   auto location = lockAcquiredLocation;
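          // These reads are deliberately unsynchronized with one another; like the
          // acquiredShared() note later in this file, this metadata is a best-effort
          // debugging aid, so an inconsistent tid/location pair is acceptable.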
    198 
    199   if (state & EXCLUSIVE_HELD) {
    200     return HoldingExclusively{tid, location};
    201   } else {
    202     return HoldingShared{location};
    203   }
    204 }
    205 
    206 #else
    207 #define TRACK_ACQUIRED_TID() 0
    208 #endif
    209 
    210 Mutex::Mutex(): futex(0) {}
    211 Mutex::~Mutex() {
    212   // This will crash anyway; we might as well crash with a nice error message.
    213   KJ_ASSERT(futex == 0, "Mutex destroyed while locked.") { break; }
    214 }
    215 
    216 bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, LockSourceLocationArg location) {
    217   BlockedOnReason blockReason = BlockedOnMutexAcquisition{*this, location};
    218   KJ_DEFER(setCurrentThreadIsNoLongerWaiting());
    219 
    220   auto spec = timeout.map([](Duration d) { return toRelativeTimespec(d); });
    221   struct timespec* specp = nullptr;
    222   KJ_IF_MAYBE(s, spec) {
    223     specp = s;
    224   }
    225 
    226   switch (exclusivity) {
    227     case EXCLUSIVE:
    228       for (;;) {
    229         uint state = 0;
    230         if (KJ_LIKELY(__atomic_compare_exchange_n(&futex, &state, EXCLUSIVE_HELD, false,
    231                                                   __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
    232 
    233           // Acquired.
    234           break;
    235         }
    236 
    237         // The mutex is contended.  Set the exclusive-requested bit and wait.
    238         if ((state & EXCLUSIVE_REQUESTED) == 0) {
    239           if (!__atomic_compare_exchange_n(&futex, &state, state | EXCLUSIVE_REQUESTED, false,
    240                                            __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    241             // Oops, the state changed before we could set the request bit.  Start over.
    242             continue;
    243           }
    244 
    245           state |= EXCLUSIVE_REQUESTED;
    246         }
    247 
    248         setCurrentThreadIsWaitingFor(&blockReason);
    249 
    250         auto result = syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, specp, nullptr, 0);
    251         if (result < 0) {
    252           if (errno == ETIMEDOUT) {
    253             setCurrentThreadIsNoLongerWaiting();
    254             // We timed out. We can't remove the exclusive-request flag (since other
    255             // threads might still be waiting), so we just return false.
    256             return false;
    257           }
    258         }
    259       }
    260       acquiredExclusive(TRACK_ACQUIRED_TID(), location);
    261 #if KJ_CONTENTION_WARNING_THRESHOLD
    262       printContendedReader = false;
    263 #endif
    264       break;
    265     case SHARED: {
    266 #if KJ_CONTENTION_WARNING_THRESHOLD
    267       kj::Maybe<kj::TimePoint> contentionWaitStart;
    268 #endif
    269 
    270       uint state = __atomic_add_fetch(&futex, 1, __ATOMIC_ACQUIRE);
    271 
    272       for (;;) {
    273         if (KJ_LIKELY((state & EXCLUSIVE_HELD) == 0)) {
    274           // Acquired.
    275           break;
    276         }
    277 
    278 #if KJ_CONTENTION_WARNING_THRESHOLD
    279         if (contentionWaitStart == nullptr) {
    280           // We could have the exclusive mutex tell us how long it was holding the lock. That would
    281           // be the nicest. However, I'm hesitant to bloat the structure. I suspect having a reader
    282           // tell us how long it waited is probably a good proxy.
    283           contentionWaitStart = kj::systemPreciseMonotonicClock().now();
    284         }
    285 #endif
    286 
    287         setCurrentThreadIsWaitingFor(&blockReason);
    288 
    289         // The mutex is exclusively locked by another thread.  Since we incremented the counter
    290         // already, we just have to wait for it to be unlocked.
    291         auto result = syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, specp, nullptr, 0);
    292         if (result < 0) {
    293           // If we time out, though, we need to signal that we're not waiting anymore.
    294           if (errno == ETIMEDOUT) {
    295             setCurrentThreadIsNoLongerWaiting();
    296             state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELAXED);
    297 
    298             // Our decrement may have released the last shared hold. So act as if we just
    299             // unlocked the mutex and send a wake signal if needed. See Mutex::unlock, SHARED case.
    300             if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
    301               if (__atomic_compare_exchange_n(
    302                   &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    303                 // Wake all exclusive waiters.  We have to wake all of them because one of them will
    304                 // grab the lock while the others will re-establish the exclusive-requested bit.
    305                 syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    306               }
    307             }
    308             return false;
    309           }
    310         }
    311         state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
    312       }
    313 
    314 #ifdef KJ_CONTENTION_WARNING_THRESHOLD
    315       KJ_IF_MAYBE(start, contentionWaitStart) {
    316         if (__atomic_load_n(&printContendedReader, __ATOMIC_RELAXED)) {
    317           // Double-checked: the relaxed load lets most threads skip the atomic exchange below.
    318           if (__atomic_exchange_n(&printContendedReader, false, __ATOMIC_RELAXED)) {
    319             auto contentionDuration = kj::systemPreciseMonotonicClock().now() - *start;
    320             KJ_LOG(WARNING, "Acquired contended lock", location, contentionDuration,
    321                 kj::getStackTrace());
    322           }
    323         }
    324       }
    325 #endif
    326 
    327       // We just want to record the lock being acquired somewhere but the specific location doesn't
    328       // matter. This does mean that race conditions could occur where a thread might read this
    329       // inconsistently (e.g. filename from 1 lock & function from another). This currently is just
    330       // meant to be a debugging aid for manual analysis so it's OK for that purpose. If it's ever
    331       // required for this to be used for anything else, then this should probably be changed to
    332       // use an additional atomic variable that can ensure only one writer updates this. Or use the
    333       // futex variable to ensure that this is only done for the first one to acquire the lock,
    334       // although there may be thundering herd problems with that whereby there's a long wallclock
    335       // time between when the lock is acquired and when the location is updated (since the first
    336       // locker isn't really guaranteed to be the first one unlocked).
    337       acquiredShared(location);
    338 
    339       break;
    340     }
    341   }
    342   return true;
    343 }
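        // Illustrative usage (not part of this file): callers normally reach lock()
        // and unlock() through the public kj::MutexGuarded<T> wrapper rather than
        // calling them directly, e.g.:
        //
        //   kj::MutexGuarded<int> value(0);
        //   {
        //     auto locked = value.lockExclusive();  // -> lock(EXCLUSIVE, nullptr, ...)
        //     *locked = 42;
        //   }                                       // -> unlock(EXCLUSIVE, nullptr)
        //   auto reader = value.lockShared();       // -> lock(SHARED, nullptr, ...)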
    344 
    345 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
    346   switch (exclusivity) {
    347     case EXCLUSIVE: {
    348       KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");
    349 
    350 #ifdef KJ_CONTENTION_WARNING_THRESHOLD
    351       auto acquiredLocation = releasingExclusive();
    352 #endif
    353 
    354       // First check if there are any conditional waiters. Note we only do this when unlocking an
    355       // exclusive lock since under a shared lock the state couldn't have changed.
    356       auto nextWaiter = waitersHead;
    357       for (;;) {
    358         KJ_IF_MAYBE(waiter, nextWaiter) {
    359           nextWaiter = waiter->next;
    360 
    361           if (waiter != waiterToSkip && checkPredicate(*waiter)) {
    362             // This waiter's predicate now evaluates true, so wake it up.
    363             if (waiter->hasTimeout) {
    364               // In this case we need to be careful to make sure the target thread isn't already
    365               // processing a timeout, so we need to do an atomic CAS rather than just a store.
    366               uint expected = 0;
    367               if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
    368                                               __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    369                 // Good, we set it to 1, transferring ownership of the mutex. Continue on below.
    370               } else {
    371                 // Looks like the thread already timed out and set its own futex to 1. In that
    372                 // case it is going to try to lock the mutex itself, so we should NOT attempt an
    373                 // ownership transfer as this will deadlock.
    374                 //
    375                 // We have two options here: We can continue along the waiter list looking for
    376                 // another waiter that's ready to be signaled, or we could drop out of the list
    377                 // immediately since we know that another thread is already waiting for the lock
    378                 // and will re-evaluate the waiter queue itself when it is done. It feels cleaner
    379                 // to me to continue.
    380                 continue;
    381               }
    382             } else {
    383               __atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
    384             }
    385             syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    386 
    387             // We transferred ownership of the lock to this waiter, so we're done now.
    388             return;
    389           }
    390         } else {
    391           // No more waiters.
    392           break;
    393         }
    394       }
    395 
    396 #ifdef KJ_CONTENTION_WARNING_THRESHOLD
    397       uint readerCount;
    398       {
    399         uint oldState = __atomic_load_n(&futex, __ATOMIC_RELAXED);
    400         readerCount = oldState & SHARED_COUNT_MASK;
    401         if (readerCount >= KJ_CONTENTION_WARNING_THRESHOLD) {
    402           // Atomic not needed because we're still holding the exclusive lock.
    403           printContendedReader = true;
    404         }
    405       }
    406 #endif
    407 
    408       // Didn't wake any waiters, so wake normally.
    409       uint oldState = __atomic_fetch_and(
    410           &futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);
    411 
    412       if (KJ_UNLIKELY(oldState & ~EXCLUSIVE_HELD)) {
    413         // Other threads are waiting.  If there are any shared waiters, they now collectively hold
    414         // the lock, and we must wake them up.  If there are any exclusive waiters, we must wake
    415         // them up even if readers are waiting so that at the very least they may re-establish the
    416         // EXCLUSIVE_REQUESTED bit that we just removed.
    417         syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    418 
    419 #ifdef KJ_CONTENTION_WARNING_THRESHOLD
    420         if (readerCount >= KJ_CONTENTION_WARNING_THRESHOLD) {
    421           KJ_LOG(WARNING, "excessively many readers were waiting on this lock", readerCount,
    422               acquiredLocation, kj::getStackTrace());
    423         }
    424 #endif
    425       }
    426       break;
    427     }
    428 
    429     case SHARED: {
    430       KJ_DASSERT(futex & SHARED_COUNT_MASK, "Unshared a mutex that wasn't shared.");
    431       uint state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELEASE);
    432 
    433       // The only case where anyone is waiting is if EXCLUSIVE_REQUESTED is set, and the only time
    434       // it makes sense to wake up that waiter is if the shared count has reached zero.
    435       if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
    436         if (__atomic_compare_exchange_n(
    437             &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    438           // Wake all exclusive waiters.  We have to wake all of them because one of them will
    439           // grab the lock while the others will re-establish the exclusive-requested bit.
    440           syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    441         }
    442       }
    443       break;
    444     }
    445   }
    446 }
    447 
    448 void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
    449   switch (exclusivity) {
    450     case EXCLUSIVE:
    451       KJ_ASSERT(futex & EXCLUSIVE_HELD,
    452                 "Tried to call getAlreadyLocked*() but lock is not held.");
    453       break;
    454     case SHARED:
    455       KJ_ASSERT(futex & SHARED_COUNT_MASK,
    456                 "Tried to call getAlreadyLocked*() but lock is not held.");
    457       break;
    458   }
    459 }
    460 
    461 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, LockSourceLocationArg location) {
    462   // Add waiter to list.
    463   Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
    464   addWaiter(waiter);
    465 
    466   BlockedOnReason blockReason = BlockedOnCondVarWait{*this, &waiter, location};
    467   KJ_DEFER(setCurrentThreadIsNoLongerWaiting());
    468 
    469   // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
    470   // currently locked.
    471   bool currentlyLocked = true;
    472   KJ_DEFER({
    473     // Infinite timeout for re-obtaining the lock is on purpose because the post-condition for this
    474     // function has to be that the lock state hasn't changed (& we have to be locked when we enter
    475     // since that's how condvars work).
    476     if (!currentlyLocked) lock(EXCLUSIVE, nullptr, location);
    477     removeWaiter(waiter);
    478   });
    479 
    480   if (!predicate.check()) {
    481     unlock(EXCLUSIVE, &waiter);
    482     currentlyLocked = false;
    483 
    484     struct timespec ts;
    485     struct timespec* tsp = nullptr;
    486     KJ_IF_MAYBE(t, timeout) {
    487       ts = toAbsoluteTimespec(now() + *t);
    488       tsp = &ts;
    489     }
    490 
    491     setCurrentThreadIsWaitingFor(&blockReason);
    492 
    493     // Wait for someone to set our futex to 1.
    494     for (;;) {
    495       // Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
    496       // FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
    497       // CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
    498       // us to recompute the time after every iteration.
    499       KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
    500           &waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, nullptr, FUTEX_BITSET_MATCH_ANY)) {
    501         case EAGAIN:
    502           // Indicates that the futex was already non-zero by the time the kernel looked at it.
    503           // Not an error.
    504           break;
    505         case ETIMEDOUT: {
    506           // Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
    507           // transferred to us from another thread. So, we need to lock it ourselves. But, another
    508           // thread might be in the process of signaling us and transferring ownership. So, we
    509           // first must atomically take control of our destiny.
    510           KJ_ASSERT(timeout != nullptr);
    511           uint expected = 0;
    512           if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
    513                                           __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
    514             // OK, we set our own futex to 1. That means no other thread will, and so we won't be
    515             // receiving a mutex ownership transfer. We have to lock the mutex ourselves.
    516             setCurrentThreadIsNoLongerWaiting();
    517             lock(EXCLUSIVE, nullptr, location);
    518             currentlyLocked = true;
    519             return;
    520           } else {
    521             // Oh, someone else actually did signal us, apparently. Let's move on as if the futex
    522             // call told us so.
    523             break;
    524           }
    525         }
    526         default:
    527           KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_BITSET_PRIVATE)", error);
    528       }
    529 
    530       setCurrentThreadIsNoLongerWaiting();
    531 
    532       if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
    533         // We received a lock ownership transfer from another thread.
    534         currentlyLocked = true;
    535 
    536         // The other thread checked the predicate before the transfer.
    537 #ifdef KJ_DEBUG
    538         assertLockedByCaller(EXCLUSIVE);
    539 #endif
    540 
    541         KJ_IF_MAYBE(exception, waiter.exception) {
    542           // The predicate threw an exception, apparently. Propagate it.
    543           // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
    544           //   then want MutexGuarded::when() to skip calling the callback, but then what should it
    545           //   return, since it normally returns the callback's result? Or maybe people who disable
    546           //   exceptions just really should not write predicates that can throw.
    547           kj::throwFatalException(kj::mv(**exception));
    548         }
    549 
    550         return;
    551       }
    552     }
    553   }
    554 }
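        // Illustrative usage (not part of this file): this wait() underlies
        // kj::MutexGuarded<T>::when(), which blocks until a predicate over the guarded
        // value holds and then runs a callback with the lock still held, e.g.:
        //
        //   kj::MutexGuarded<int> value(0);
        //   value.when([](const int& v) { return v > 0; },
        //              [](int& v) { v = 0; });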
    555 
    556 void Mutex::induceSpuriousWakeupForTest() {
    557   auto nextWaiter = waitersHead;
    558   for (;;) {
    559     KJ_IF_MAYBE(waiter, nextWaiter) {
    560       nextWaiter = waiter->next;
    561       syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    562     } else {
    563       // No more waiters.
    564       break;
    565     }
    566   }
    567 }
    568 
    569 uint Mutex::numReadersWaitingForTest() const {
    570   assertLockedByCaller(EXCLUSIVE);
    571   return futex & SHARED_COUNT_MASK;
    572 }
    573 
    574 void Once::runOnce(Initializer& init, LockSourceLocationArg location) {
    575 startOver:
    576   uint state = UNINITIALIZED;
    577   if (__atomic_compare_exchange_n(&futex, &state, INITIALIZING, false,
    578                                   __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    579     // It's our job to initialize!
    580     {
    581       KJ_ON_SCOPE_FAILURE({
    582         // An exception was thrown by the initializer.  We have to revert.
    583         if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) ==
    584             INITIALIZING_WITH_WAITERS) {
    585           // Someone was waiting for us to finish.
    586           syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    587         }
    588       });
    589 
    590       init.run();
    591     }
    592     if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) ==
    593         INITIALIZING_WITH_WAITERS) {
    594       // Someone was waiting for us to finish.
    595       syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    596     }
    597   } else {
    598     BlockedOnReason blockReason = BlockedOnOnceInit{*this, location};
    599     KJ_DEFER(setCurrentThreadIsNoLongerWaiting());
    600 
    601     for (;;) {
    602       if (state == INITIALIZED) {
    603         break;
    604       } else if (state == INITIALIZING) {
    605         // Initialization is taking place in another thread.  Indicate that we're waiting.
    606         if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
    607                                          __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
    608           // State changed, retry.
    609           continue;
    610         }
    611       } else {
    612         KJ_DASSERT(state == INITIALIZING_WITH_WAITERS);
    613       }
    614 
    615       // Wait for initialization.
    616       setCurrentThreadIsWaitingFor(&blockReason);
    617       syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS,
    618                          nullptr, nullptr, 0);
    619       state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
    620 
    621       if (state == UNINITIALIZED) {
    622         // Oh hey, apparently whoever was trying to initialize gave up.  Let's take it from the
    623         // top.
    624         goto startOver;
    625       }
    626     }
    627   }
    628 }
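        // State machine summary: UNINITIALIZED -> INITIALIZING (the CAS winner) ->
        // INITIALIZED on success, with INITIALIZING upgraded to
        // INITIALIZING_WITH_WAITERS when another thread arrives and sleeps. If the
        // initializer throws, the state reverts to UNINITIALIZED and all waiters are
        // woken so that one of them can start over.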
    629 
    630 void Once::reset() {
    631   uint state = INITIALIZED;
    632   if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED,
    633                                    false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    634     KJ_FAIL_REQUIRE("reset() called while not initialized.");
    635   }
    636 }
    637 
    638 #elif _WIN32 || __CYGWIN__
    639 // =======================================================================================
    640 // Win32 implementation
    641 
    642 #define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
    643 #define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
    644 #define coercedCondvar(var) (*reinterpret_cast<CONDITION_VARIABLE*>(&var))
    645 
    646 Mutex::Mutex() {
    647   static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?");
    648   InitializeSRWLock(&coercedSrwLock);
    649 }
    650 Mutex::~Mutex() {}
    651 
    652 bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, NoopSourceLocation) {
    653   if (timeout != nullptr) {
    654     KJ_UNIMPLEMENTED("Locking a mutex with a timeout is only supported on Linux.");
    655   }
    656   switch (exclusivity) {
    657     case EXCLUSIVE:
    658       AcquireSRWLockExclusive(&coercedSrwLock);
    659       break;
    660     case SHARED:
    661       AcquireSRWLockShared(&coercedSrwLock);
    662       break;
    663   }
    664   return true;
    665 }
    666 
    667 void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
    668   // Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
    669   // one waiter because only one waiter could get the lock anyway, and once it releases that lock
    670   // it will awake the next waiter if necessary.
    671 
    672   auto nextWaiter = waitersHead;
    673   for (;;) {
    674     KJ_IF_MAYBE(waiter, nextWaiter) {
    675       nextWaiter = waiter->next;
    676 
    677       if (waiter != waiterToSkip && checkPredicate(*waiter)) {
    678         // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
    679         // use Wake vs. WakeAll here since there's always only one thread waiting.
    680         WakeConditionVariable(&coercedCondvar(waiter->condvar));
    681 
    682         // We only need to wake one waiter. Note that unlike the futex-based implementation, we
    683         // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
    684         // that the condition is still true when that waiter finally awakes. However, if the
    685         // condition is no longer true at that point, the waiter will re-check all other
    686         // waiters' conditions and possibly wake up any other waiter who is now ready, hence we
    687         // still only need to wake one waiter here.
    688         return;
    689       }
    690     } else {
    691       // No more waiters.
    692       break;
    693     }
    694   }
    695 }
    696 
    697 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
    698   switch (exclusivity) {
    699     case EXCLUSIVE: {
    700       KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));
    701 
    702       // Check if there are any conditional waiters. Note we only do this when unlocking an
    703       // exclusive lock since under a shared lock the state couldn't have changed.
    704       wakeReadyWaiter(waiterToSkip);
    705       break;
    706     }
    707 
    708     case SHARED:
    709       ReleaseSRWLockShared(&coercedSrwLock);
    710       break;
    711   }
    712 }
    713 
    714 void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
    715   // We could use TryAcquireSRWLock*() here like we do with the pthread version. However, as of
    716   // this writing, my version of Wine (1.6.2) doesn't implement these functions and will abort if
    717   // they are called. Since we were only going to use them as a hacky way to check if the lock is
    718   // held for debug purposes anyway, we just don't bother.
    719 }
    720 
    721 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, NoopSourceLocation) {
    722   // Add waiter to list.
    723   Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
    724   static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
    725                 "CONDITION_VARIABLE is not a pointer?");
    726   InitializeConditionVariable(&coercedCondvar(waiter.condvar));
    727 
    728   addWaiter(waiter);
    729   KJ_DEFER(removeWaiter(waiter));
    730 
    731   DWORD sleepMs;
    732 
    733   // Only initialized if `timeout` is non-null.
    734   const MonotonicClock* clock = nullptr;
    735   kj::Maybe<kj::TimePoint> endTime;
    736 
    737   KJ_IF_MAYBE(t, timeout) {
    738     // Windows sleeps are inaccurate -- they can be longer *or shorter* than the requested amount.
    739     // For many use cases of our API, a too-short sleep would be unacceptable. Experimentally, it
    740     // seems like sleeps can be up to half a millisecond short, so we'll add half a millisecond
    741     // (and then we round up, below).
    742     *t += 500 * kj::MICROSECONDS;
    743 
    744     // Compute initial sleep time.
    745     sleepMs = *t / kj::MILLISECONDS;
    746     if (*t % kj::MILLISECONDS > 0 * kj::SECONDS) {
    747       // We guarantee we won't wake up too early.
    748       ++sleepMs;
    749     }
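            // (Example: a requested 2 ms timeout becomes 2.5 ms after the fudge factor
            // above, then rounds up to a 3 ms sleep.)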
    750 
    751     clock = &systemPreciseMonotonicClock();
    752     endTime = clock->now() + *t;
    753   } else {
    754     sleepMs = INFINITE;
    755   }
    756 
    757   while (!predicate.check()) {
    758     // SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
    759     // waiters that are now ready.
    760     wakeReadyWaiter(&waiter);
    761 
    762     if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
    763       // Normal result. Continue loop to check predicate.
    764     } else {
    765       DWORD error = GetLastError();
    766       if (error == ERROR_TIMEOUT) {
    767         // Windows may have woken us up too early, so don't return yet. Instead, proceed through the
    768         // loop and rely on our sleep time recalculation to detect if we timed out.
    769       } else {
    770         KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
    771       }
    772     }
    773 
    774     KJ_IF_MAYBE(exception, waiter.exception) {
    775       // The predicate threw an exception, apparently. Propagate it.
    776       // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
    777       //   then want MutexGuarded::when() to skip calling the callback, but then what should it
    778       //   return, since it normally returns the callback's result? Or maybe people who disable
    779       //   exceptions just really should not write predicates that can throw.
    780       kj::throwFatalException(kj::mv(**exception));
    781     }
    782 
    783     // Recompute sleep time.
    784     KJ_IF_MAYBE(e, endTime) {
    785       auto now = clock->now();
    786 
    787       if (*e > now) {
    788         auto sleepTime = *e - now;
    789         sleepMs = sleepTime / kj::MILLISECONDS;
    790         if (sleepTime % kj::MILLISECONDS > 0 * kj::SECONDS) {
    791           // We guarantee we won't wake up too early.
    792           ++sleepMs;
    793         }
    794       } else {
    795         // Oops, already timed out.
    796         return;
    797       }
    798     }
    799   }
    800 }
    801 
    802 void Mutex::induceSpuriousWakeupForTest() {
    803   auto nextWaiter = waitersHead;
    804   for (;;) {
    805     KJ_IF_MAYBE(waiter, nextWaiter) {
    806       nextWaiter = waiter->next;
    807       WakeConditionVariable(&coercedCondvar(waiter->condvar));
    808     } else {
    809       // No more waiters.
    810       break;
    811     }
    812   }
    813 }
    814 
    815 static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) {
    816   return true;
    817 }
    818 
    819 Once::Once(bool startInitialized) {
    820   static_assert(sizeof(INIT_ONCE) == sizeof(initOnce), "INIT_ONCE is not a pointer?");
    821   InitOnceInitialize(&coercedInitOnce);
    822   if (startInitialized) {
    823     InitOnceExecuteOnce(&coercedInitOnce, &nullInitializer, nullptr, nullptr);
    824   }
    825 }
    826 Once::~Once() {}
    827 
    828 void Once::runOnce(Initializer& init, NoopSourceLocation) {
    829   BOOL needInit;
    830   while (!InitOnceBeginInitialize(&coercedInitOnce, 0, &needInit, nullptr)) {
    831     // Init was occurring in another thread, but then failed with an exception. Retry.
    832   }
    833 
    834   if (needInit) {
    835     {
    836       KJ_ON_SCOPE_FAILURE(InitOnceComplete(&coercedInitOnce, INIT_ONCE_INIT_FAILED, nullptr));
    837       init.run();
    838     }
    839 
    840     KJ_ASSERT(InitOnceComplete(&coercedInitOnce, 0, nullptr));
    841   }
    842 }
    843 
    844 bool Once::isInitialized() noexcept {
    845   BOOL junk;
    846   return InitOnceBeginInitialize(&coercedInitOnce, INIT_ONCE_CHECK_ONLY, &junk, nullptr);
    847 }
    848 
    849 void Once::reset() {
    850   InitOnceInitialize(&coercedInitOnce);
    851 }
    852 
    853 #else
    854 // =======================================================================================
    855 // Generic pthreads-based implementation
    856 
    857 #define KJ_PTHREAD_CALL(code) \
    858   { \
    859     int pthreadError = code; \
    860     if (pthreadError != 0) { \
    861       KJ_FAIL_SYSCALL(#code, pthreadError); \
    862     } \
    863   }
    864 
    865 #define KJ_PTHREAD_CLEANUP(code) \
    866   { \
    867     int pthreadError = code; \
    868     if (pthreadError != 0) { \
    869       KJ_LOG(ERROR, #code, strerror(pthreadError)); \
    870     } \
    871   }
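        // KJ_PTHREAD_CALL treats a nonzero pthread return code as a fatal syscall
        // error, while KJ_PTHREAD_CLEANUP only logs it; the latter is used on cleanup
        // paths such as destructors, where throwing would be inappropriate.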
    872 
    873 Mutex::Mutex(): mutex(PTHREAD_RWLOCK_INITIALIZER) {}
    874 Mutex::~Mutex() {
    875   KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
    876 }
    877 
    878 bool Mutex::lock(Exclusivity exclusivity, Maybe<Duration> timeout, NoopSourceLocation) {
    879   if (timeout != nullptr) {
    880     KJ_UNIMPLEMENTED("Locking a mutex with a timeout is only supported on Linux.");
    881   }
    882   switch (exclusivity) {
    883     case EXCLUSIVE:
    884       KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
    885       break;
    886     case SHARED:
    887       KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
    888       break;
    889   }
    890   return true;
    891 }
    892 
    893 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
    894   KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
    895 
    896   if (exclusivity == EXCLUSIVE) {
    897     // Check if there are any conditional waiters. Note we only do this when unlocking an
    898     // exclusive lock since under a shared lock the state couldn't have changed.
    899     auto nextWaiter = waitersHead;
    900     for (;;) {
    901       KJ_IF_MAYBE(waiter, nextWaiter) {
    902         nextWaiter = waiter->next;
    903 
    904         if (waiter != waiterToSkip && checkPredicate(*waiter)) {
    905           // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
    906           // use _signal() vs. _broadcast() here since there's always only one thread waiting.
    907           KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
    908           KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
    909           KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
    910 
    911           // We only need to wake one waiter. Note that unlike the futex-based implementation, we
    912           // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
    913           // that the condition is still true when that waiter finally awakes. However, if the
    914           // condition is no longer true at that point, the waiter will re-check all other waiters'
    915           // conditions and possibly wake up any other waiter who is now ready, hence we still only
    916           // need to wake one waiter here.
    917           break;
    918         }
    919       } else {
    920         // No more waiters.
    921         break;
    922       }
    923     }
    924   }
    925 }
    926 
    927 void Mutex::assertLockedByCaller(Exclusivity exclusivity) const {
    928   switch (exclusivity) {
    929     case EXCLUSIVE:
    930       // A read lock should fail if the mutex is already held for writing.
    931       if (pthread_rwlock_tryrdlock(&mutex) == 0) {
    932         pthread_rwlock_unlock(&mutex);
    933         KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
    934       }
    935       break;
    936     case SHARED:
    937       // A write lock should fail if the mutex is already held for reading or writing.  We don't
    938       // have any way to prove that the lock is held only for reading.
    939       if (pthread_rwlock_trywrlock(&mutex) == 0) {
    940         pthread_rwlock_unlock(&mutex);
    941         KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
    942       }
    943       break;
    944   }
    945 }
    946 
    947 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout, NoopSourceLocation) {
    948   // Add waiter to list.
    949   Waiter waiter {
    950     nullptr, waitersTail, predicate, nullptr,
    951     PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
    952   };
    953   addWaiter(waiter);
    954 
    955   // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
    956   // currently locked.
    957   bool currentlyLocked = true;
    958   KJ_DEFER({
    959     if (!currentlyLocked) lock(EXCLUSIVE, nullptr, NoopSourceLocation{});
    960     removeWaiter(waiter);
    961 
    962     // Destroy pthread objects.
    963     KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
    964     KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
    965   });
    966 
    967 #if !__APPLE__
    968   if (timeout != nullptr) {
    969     // Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
    970     // clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
    971     // a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
    972     pthread_condattr_t attr;
    973     KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
    974     KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
    975     KJ_PTHREAD_CALL(pthread_cond_init(&waiter.condvar, &attr));
    976     KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
    977   }
    978 #endif
    979 
    980   Maybe<struct timespec> endTime = timeout.map([](Duration d) {
    981     return toAbsoluteTimespec(now() + d);
    982   });
    983 
    984   while (!predicate.check()) {
    985     // pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
    986     // mutex before we unlock the real mutex, and the signaling thread also needs to lock this
    987     // mutex, in order to ensure that this thread is actually waiting on the condvar before it is
    988     // signaled.
    989     KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
    990 
    991     // OK, now we can unlock the main mutex.
    992     unlock(EXCLUSIVE, &waiter);
    993     currentlyLocked = false;
    994 
    995     bool timedOut = false;
    996 
    997     // Wait for someone to signal the condvar.
    998     KJ_IF_MAYBE(t, endTime) {
    999 #if __APPLE__
   1000       // On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
   1001       // which means modifying the system clock will break the wait. However, macOS happens to
   1002       // provide an alternative relative-time wait function, so I guess we'll use that. It does
   1003       // require recomputing the time every iteration...
   1004       struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
   1005       int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
   1006 #else
   1007       int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
   1008 #endif
   1009       if (error != 0) {
   1010         if (error == ETIMEDOUT) {
   1011           timedOut = true;
   1012         } else {
   1013           KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
   1014         }
   1015       }
   1016     } else {
   1017       KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
   1018     }
   1019 
   1020     // We have to be very careful about lock ordering here. We need to unlock stupidMutex before
   1021     // re-locking the main mutex, because another thread may have a lock on the main mutex already
   1022     // and be waiting for a lock on stupidMutex. Note that the other thread may signal the condvar
   1023     // right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
   1024     // because we've already been signaled.
   1025     KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
   1026 
   1027     lock(EXCLUSIVE, nullptr, NoopSourceLocation{});
   1028     currentlyLocked = true;
   1029 
   1030     KJ_IF_MAYBE(exception, waiter.exception) {
   1031       // The predicate threw an exception, apparently. Propagate it.
   1032       // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
   1033       //   then want MutexGuarded::when() to skip calling the callback, but then what should it
   1034       //   return, since it normally returns the callback's result? Or maybe people who disable
   1035       //   exceptions just really should not write predicates that can throw.
   1036       kj::throwFatalException(kj::mv(**exception));
   1037     }
   1038 
   1039     if (timedOut) {
   1040       return;
   1041     }
   1042   }
   1043 }
   1044 
   1045 void Mutex::induceSpuriousWakeupForTest() {
   1046   auto nextWaiter = waitersHead;
   1047   for (;;) {
   1048     KJ_IF_MAYBE(waiter, nextWaiter) {
   1049       nextWaiter = waiter->next;
   1050       KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
   1051       KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
   1052       KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
   1053     } else {
   1054       // No more waiters.
   1055       break;
   1056     }
   1057   }
   1058 }
   1059 
   1060 Once::Once(bool startInitialized)
   1061     : state(startInitialized ? INITIALIZED : UNINITIALIZED),
   1062       mutex(PTHREAD_MUTEX_INITIALIZER) {}
   1063 Once::~Once() {
   1064   KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
   1065 }
   1066 
   1067 void Once::runOnce(Initializer& init, NoopSourceLocation) {
   1068   KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
   1069   KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));
   1070 
   1071   if (state != UNINITIALIZED) {
   1072     return;
   1073   }
   1074 
   1075   init.run();
   1076 
   1077   __atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE);
   1078 }
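        // The release store above presumably pairs with an acquire load in
        // Once::isInitialized() (declared in mutex.h), letting callers observe a
        // completed initialization without taking the mutex.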
   1079 
   1080 void Once::reset() {
   1081   State oldState = INITIALIZED;
   1082   if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED,
   1083                                    false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
   1084     KJ_FAIL_REQUIRE("reset() called while not initialized.");
   1085   }
   1086 }
   1087 
   1088 #endif
   1089 
   1090 }  // namespace _ (private)
   1091 }  // namespace kj