qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

qemu-coroutine-lock.c (13806B)


      1 /*
      2  * coroutine queues and locks
      3  *
      4  * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
      5  *
      6  * Permission is hereby granted, free of charge, to any person obtaining a copy
      7  * of this software and associated documentation files (the "Software"), to deal
      8  * in the Software without restriction, including without limitation the rights
      9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     10  * copies of the Software, and to permit persons to whom the Software is
     11  * furnished to do so, subject to the following conditions:
     12  *
     13  * The above copyright notice and this permission notice shall be included in
     14  * all copies or substantial portions of the Software.
     15  *
     16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
     19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     22  * THE SOFTWARE.
     23  *
     24  * The lock-free mutex implementation is based on OSv
     25  * (core/lfmutex.cc, include/lockfree/mutex.hh).
     26  * Copyright (C) 2013 Cloudius Systems, Ltd.
     27  */
     28 
     29 #include "qemu/osdep.h"
     30 #include "qemu/coroutine.h"
     31 #include "qemu/coroutine_int.h"
     32 #include "qemu/processor.h"
     33 #include "qemu/queue.h"
     34 #include "block/aio.h"
     35 #include "trace.h"
     36 
     37 void qemu_co_queue_init(CoQueue *queue)
     38 {
     39     QSIMPLEQ_INIT(&queue->entries);
     40 }
     41 
     42 void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,
     43                                           CoQueueWaitFlags flags)
     44 {
     45     Coroutine *self = qemu_coroutine_self();
     46     if (flags & CO_QUEUE_WAIT_FRONT) {
     47         QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
     48     } else {
     49         QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
     50     }
     51 
     52     if (lock) {
     53         qemu_lockable_unlock(lock);
     54     }
     55 
     56     /* There is no race condition here.  Other threads will call
     57      * aio_co_schedule on our AioContext, which can reenter this
     58      * coroutine but only after this yield and after the main loop
     59      * has gone through the next iteration.
     60      */
     61     qemu_coroutine_yield();
     62     assert(qemu_in_coroutine());
     63 
     64     /* TODO: OSv implements wait morphing here, where the wakeup
     65      * primitive automatically places the woken coroutine on the
     66      * mutex's queue.  This avoids the thundering herd effect.
     67      * This could be implemented for CoMutexes, but not really for
     68      * other cases of QemuLockable.
     69      */
     70     if (lock) {
     71         qemu_lockable_lock(lock);
     72     }
     73 }
     74 
     75 bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
     76 {
     77     Coroutine *next;
     78 
     79     next = QSIMPLEQ_FIRST(&queue->entries);
     80     if (!next) {
     81         return false;
     82     }
     83 
     84     QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
     85     if (lock) {
     86         qemu_lockable_unlock(lock);
     87     }
     88     aio_co_wake(next);
     89     if (lock) {
     90         qemu_lockable_lock(lock);
     91     }
     92     return true;
     93 }
     94 
     95 bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
     96 {
     97     /* No unlock/lock needed in coroutine context.  */
     98     return qemu_co_enter_next_impl(queue, NULL);
     99 }
    100 
    101 void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)
    102 {
    103     while (qemu_co_enter_next_impl(queue, lock)) {
    104         /* just loop */
    105     }
    106 }
    107 
    108 void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
    109 {
    110     /* No unlock/lock needed in coroutine context.  */
    111     qemu_co_enter_all_impl(queue, NULL);
    112 }
    113 
    114 bool qemu_co_queue_empty(CoQueue *queue)
    115 {
    116     return QSIMPLEQ_FIRST(&queue->entries) == NULL;
    117 }
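
Editor's note: the CoQueue helpers above behave like a coroutine-level condition
variable.  The sketch below is not part of this file; it assumes coroutine
context (so the lock argument may be NULL, as in qemu_co_queue_next() above),
and the FooState type and function names are purely illustrative.

/* Hypothetical example, assuming the same headers as this file. */
typedef struct FooState {
    CoQueue waiters;        /* consumers blocked waiting for data  */
    int data_available;     /* the condition the consumers wait on */
} FooState;

static void foo_init(FooState *s)
{
    qemu_co_queue_init(&s->waiters);
    s->data_available = 0;
}

static void coroutine_fn foo_consume(FooState *s)
{
    /* Re-check the condition after every wakeup, as with a condvar. */
    while (!s->data_available) {
        qemu_co_queue_wait_impl(&s->waiters, NULL, 0);
    }
    s->data_available--;
}

static void coroutine_fn foo_produce(FooState *s)
{
    s->data_available++;
    /* Wake one waiter; qemu_co_queue_restart_all() would wake them all. */
    qemu_co_queue_next(&s->waiters);
}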
    118 
    119 /* The wait records are handled with a multiple-producer, single-consumer
    120  * lock-free queue.  There cannot be two concurrent pop_waiter() calls
    121  * because pop_waiter() can only be called while mutex->handoff is zero.
    122  * This can happen in three cases:
    123  * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
    124  *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
    125  *   not take part in the handoff.
    126  * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
    127  *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
    128  *   the cmpxchg (it will see either 0 or the next sequence value) and
    129  *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
    130  *   woken up someone.
    131  * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
    132  *   In this case another iteration starts with mutex->handoff == 0;
    133  *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
    134  *   qemu_co_mutex_unlock will go back to case (1).
    135  *
    136  * The following functions manage this queue.
    137  */
    138 typedef struct CoWaitRecord {
    139     Coroutine *co;
    140     QSLIST_ENTRY(CoWaitRecord) next;
    141 } CoWaitRecord;
    142 
    143 static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
    144 {
    145     w->co = qemu_coroutine_self();
    146     QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
    147 }
    148 
    149 static void move_waiters(CoMutex *mutex)
    150 {
    151     QSLIST_HEAD(, CoWaitRecord) reversed;
    152     QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    153     while (!QSLIST_EMPTY(&reversed)) {
    154         CoWaitRecord *w = QSLIST_FIRST(&reversed);
    155         QSLIST_REMOVE_HEAD(&reversed, next);
    156         QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    157     }
    158 }
    159 
    160 static CoWaitRecord *pop_waiter(CoMutex *mutex)
    161 {
    162     CoWaitRecord *w;
    163 
    164     if (QSLIST_EMPTY(&mutex->to_pop)) {
    165         move_waiters(mutex);
    166         if (QSLIST_EMPTY(&mutex->to_pop)) {
    167             return NULL;
    168         }
    169     }
    170     w = QSLIST_FIRST(&mutex->to_pop);
    171     QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    172     return w;
    173 }
    174 
    175 static bool has_waiters(CoMutex *mutex)
    176 {
     177     return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
    178 }
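
Editor's note: for readers who do not know the QSLIST_*_ATOMIC macros, the
standalone C11 sketch below (not QEMU code) shows the same two-list idea used
by push_waiter()/move_waiters()/pop_waiter(): producers push onto one shared
atomic LIFO head, and the single consumer detaches the whole list in one
exchange and reverses it into a private FIFO.

#include <stdatomic.h>
#include <stddef.h>

typedef struct Node { struct Node *next; } Node;

/* Multiple producers: lock-free push onto the shared LIFO head
 * (the counterpart of QSLIST_INSERT_HEAD_ATOMIC on from_push). */
static void mpsc_push(_Atomic(Node *) *head, Node *n)
{
    Node *old = atomic_load(head);
    do {
        n->next = old;
    } while (!atomic_compare_exchange_weak(head, &old, n));
}

/* Single consumer: detach everything at once (QSLIST_MOVE_ATOMIC), then
 * reverse so the oldest entry comes out first, as move_waiters() does. */
static Node *mpsc_drain_fifo(_Atomic(Node *) *head)
{
    Node *lifo = atomic_exchange(head, NULL);
    Node *fifo = NULL;

    while (lifo) {
        Node *next = lifo->next;
        lifo->next = fifo;
        fifo = lifo;
        lifo = next;
    }
    return fifo;
}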
    179 
    180 void qemu_co_mutex_init(CoMutex *mutex)
    181 {
    182     memset(mutex, 0, sizeof(*mutex));
    183 }
    184 
    185 static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
    186 {
    187     /* Read co before co->ctx; pairs with smp_wmb() in
    188      * qemu_coroutine_enter().
    189      */
    190     smp_read_barrier_depends();
    191     mutex->ctx = co->ctx;
    192     aio_co_wake(co);
    193 }
    194 
    195 static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
    196                                                      CoMutex *mutex)
    197 {
    198     Coroutine *self = qemu_coroutine_self();
    199     CoWaitRecord w;
    200     unsigned old_handoff;
    201 
    202     trace_qemu_co_mutex_lock_entry(mutex, self);
    203     push_waiter(mutex, &w);
    204 
    205     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
    206      * a concurrent unlock() the responsibility of waking somebody up.
    207      */
    208     old_handoff = qatomic_mb_read(&mutex->handoff);
    209     if (old_handoff &&
    210         has_waiters(mutex) &&
    211         qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
    212         /* There can be no concurrent pops, because there can be only
    213          * one active handoff at a time.
    214          */
    215         CoWaitRecord *to_wake = pop_waiter(mutex);
    216         Coroutine *co = to_wake->co;
    217         if (co == self) {
    218             /* We got the lock ourselves!  */
    219             assert(to_wake == &w);
    220             mutex->ctx = ctx;
    221             return;
    222         }
    223 
    224         qemu_co_mutex_wake(mutex, co);
    225     }
    226 
    227     qemu_coroutine_yield();
    228     trace_qemu_co_mutex_lock_return(mutex, self);
    229 }
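
Editor's note: the trace below is an editorial walkthrough of the
"Responsibility Hand-Off" protocol, reconstructed from the code above and from
qemu_co_mutex_unlock() further down; it is not part of the original file.

/*
 * Worked example: coroutine A holds the mutex, coroutine B wants it.
 *
 *   B  qemu_co_mutex_lock():   fetch_inc(locked) -> locked == 2, slow path
 *   A  qemu_co_mutex_unlock(): fetch_dec(locked) returns 2, so a lock() is
 *      in flight, but pop_waiter() returns NULL because B has not pushed
 *      its CoWaitRecord yet
 *   A  sets handoff = sequence (say 1)
 *   B  push_waiter(); reads handoff == 1; has_waiters() is true;
 *      cmpxchg(handoff, 1, 0) succeeds, so B pops a waiter, finds its own
 *      record and owns the mutex without ever yielding
 *   A  either finds has_waiters() false or loses its own cmpxchg on
 *      handoff; both paths break out of the unlock loop
 *
 * This is case (2) of the comment above the CoWaitRecord definition: the
 * lock() side stole the hand-off responsibility from unlock().
 */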
    230 
    231 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
    232 {
    233     AioContext *ctx = qemu_get_current_aio_context();
    234     Coroutine *self = qemu_coroutine_self();
    235     int waiters, i;
    236 
    237     /* Running a very small critical section on pthread_mutex_t and CoMutex
    238      * shows that pthread_mutex_t is much faster because it doesn't actually
    239      * go to sleep.  What happens is that the critical section is shorter
    240      * than the latency of entering the kernel and thus FUTEX_WAIT always
    241      * fails.  With CoMutex there is no such latency but you still want to
    242      * avoid wait and wakeup.  So introduce it artificially.
    243      */
    244     i = 0;
    245 retry_fast_path:
    246     waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    247     if (waiters != 0) {
    248         while (waiters == 1 && ++i < 1000) {
    249             if (qatomic_read(&mutex->ctx) == ctx) {
    250                 break;
    251             }
    252             if (qatomic_read(&mutex->locked) == 0) {
    253                 goto retry_fast_path;
    254             }
    255             cpu_relax();
    256         }
    257         waiters = qatomic_fetch_inc(&mutex->locked);
    258     }
    259 
    260     if (waiters == 0) {
    261         /* Uncontended.  */
    262         trace_qemu_co_mutex_lock_uncontended(mutex, self);
    263         mutex->ctx = ctx;
    264     } else {
    265         qemu_co_mutex_lock_slowpath(ctx, mutex);
    266     }
    267     mutex->holder = self;
    268     self->locks_held++;
    269 }
    270 
    271 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
    272 {
    273     Coroutine *self = qemu_coroutine_self();
    274 
    275     trace_qemu_co_mutex_unlock_entry(mutex, self);
    276 
    277     assert(mutex->locked);
    278     assert(mutex->holder == self);
    279     assert(qemu_in_coroutine());
    280 
    281     mutex->ctx = NULL;
    282     mutex->holder = NULL;
    283     self->locks_held--;
    284     if (qatomic_fetch_dec(&mutex->locked) == 1) {
    285         /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
    286         return;
    287     }
    288 
    289     for (;;) {
    290         CoWaitRecord *to_wake = pop_waiter(mutex);
    291         unsigned our_handoff;
    292 
    293         if (to_wake) {
    294             qemu_co_mutex_wake(mutex, to_wake->co);
    295             break;
    296         }
    297 
    298         /* Some concurrent lock() is in progress (we know this because
    299          * mutex->locked was >1) but it hasn't yet put itself on the wait
    300          * queue.  Pick a sequence number for the handoff protocol (not 0).
    301          */
    302         if (++mutex->sequence == 0) {
    303             mutex->sequence = 1;
    304         }
    305 
    306         our_handoff = mutex->sequence;
    307         qatomic_mb_set(&mutex->handoff, our_handoff);
    308         if (!has_waiters(mutex)) {
    309             /* The concurrent lock has not added itself yet, so it
    310              * will be able to pick our handoff.
    311              */
    312             break;
    313         }
    314 
    315         /* Try to do the handoff protocol ourselves; if somebody else has
    316          * already taken it, however, we're done and they're responsible.
    317          */
    318         if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
    319             break;
    320         }
    321     }
    322 
    323     trace_qemu_co_mutex_unlock_return(mutex, self);
    324 }
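
Editor's note: a minimal CoMutex usage sketch, not part of this file.  It
assumes it runs inside a QEMU coroutine; the Counter type and function names
are hypothetical.

/* Hypothetical example: protecting shared state between coroutines. */
typedef struct Counter {
    CoMutex lock;
    uint64_t value;
} Counter;

static void counter_init(Counter *c)
{
    qemu_co_mutex_init(&c->lock);
    c->value = 0;
}

static void coroutine_fn counter_add(Counter *c, uint64_t n)
{
    qemu_co_mutex_lock(&c->lock);       /* may yield if contended */
    c->value += n;
    qemu_co_mutex_unlock(&c->lock);
}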
    325 
    326 struct CoRwTicket {
    327     bool read;
    328     Coroutine *co;
    329     QSIMPLEQ_ENTRY(CoRwTicket) next;
    330 };
    331 
    332 void qemu_co_rwlock_init(CoRwlock *lock)
    333 {
    334     qemu_co_mutex_init(&lock->mutex);
    335     lock->owners = 0;
    336     QSIMPLEQ_INIT(&lock->tickets);
    337 }
    338 
    339 /* Releases the internal CoMutex.  */
    340 static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
    341 {
    342     CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
    343     Coroutine *co = NULL;
    344 
    345     /*
    346      * Setting lock->owners here prevents rdlock and wrlock from
    347      * sneaking in between unlock and wake.
    348      */
    349 
    350     if (tkt) {
    351         if (tkt->read) {
    352             if (lock->owners >= 0) {
    353                 lock->owners++;
    354                 co = tkt->co;
    355             }
    356         } else {
    357             if (lock->owners == 0) {
    358                 lock->owners = -1;
    359                 co = tkt->co;
    360             }
    361         }
    362     }
    363 
    364     if (co) {
    365         QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
    366         qemu_co_mutex_unlock(&lock->mutex);
    367         aio_co_wake(co);
    368     } else {
    369         qemu_co_mutex_unlock(&lock->mutex);
    370     }
    371 }
    372 
    373 void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
    374 {
    375     Coroutine *self = qemu_coroutine_self();
    376 
    377     qemu_co_mutex_lock(&lock->mutex);
    378     /* For fairness, wait if a writer is in line.  */
    379     if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
    380         lock->owners++;
    381         qemu_co_mutex_unlock(&lock->mutex);
    382     } else {
    383         CoRwTicket my_ticket = { true, self };
    384 
    385         QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
    386         qemu_co_mutex_unlock(&lock->mutex);
    387         qemu_coroutine_yield();
    388         assert(lock->owners >= 1);
    389 
    390         /* Possibly wake another reader, which will wake the next in line.  */
    391         qemu_co_mutex_lock(&lock->mutex);
    392         qemu_co_rwlock_maybe_wake_one(lock);
    393     }
    394 
    395     self->locks_held++;
    396 }
    397 
    398 void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
    399 {
    400     Coroutine *self = qemu_coroutine_self();
    401 
    402     assert(qemu_in_coroutine());
    403     self->locks_held--;
    404 
    405     qemu_co_mutex_lock(&lock->mutex);
    406     if (lock->owners > 0) {
    407         lock->owners--;
    408     } else {
    409         assert(lock->owners == -1);
    410         lock->owners = 0;
    411     }
    412 
    413     qemu_co_rwlock_maybe_wake_one(lock);
    414 }
    415 
    416 void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
    417 {
    418     qemu_co_mutex_lock(&lock->mutex);
    419     assert(lock->owners == -1);
    420     lock->owners = 1;
    421 
    422     /* Possibly wake another reader, which will wake the next in line.  */
    423     qemu_co_rwlock_maybe_wake_one(lock);
    424 }
    425 
    426 void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
    427 {
    428     Coroutine *self = qemu_coroutine_self();
    429 
    430     qemu_co_mutex_lock(&lock->mutex);
    431     if (lock->owners == 0) {
    432         lock->owners = -1;
    433         qemu_co_mutex_unlock(&lock->mutex);
    434     } else {
    435         CoRwTicket my_ticket = { false, qemu_coroutine_self() };
    436 
    437         QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
    438         qemu_co_mutex_unlock(&lock->mutex);
    439         qemu_coroutine_yield();
    440         assert(lock->owners == -1);
    441     }
    442 
    443     self->locks_held++;
    444 }
    445 
    446 void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
    447 {
    448     qemu_co_mutex_lock(&lock->mutex);
    449     assert(lock->owners > 0);
    450     /* For fairness, wait if a writer is in line.  */
    451     if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
    452         lock->owners = -1;
    453         qemu_co_mutex_unlock(&lock->mutex);
    454     } else {
    455         CoRwTicket my_ticket = { false, qemu_coroutine_self() };
    456 
    457         lock->owners--;
    458         QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
    459         qemu_co_rwlock_maybe_wake_one(lock);
    460         qemu_coroutine_yield();
    461         assert(lock->owners == -1);
    462     }
    463 }