qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

test-aio-multithread.c (10867B)


      1 /*
      2  * AioContext multithreading tests
      3  *
      4  * Copyright Red Hat, Inc. 2016
      5  *
      6  * Authors:
      7  *  Paolo Bonzini    <pbonzini@redhat.com>
      8  *
      9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
     10  * See the COPYING.LIB file in the top-level directory.
     11  */
     12 
     13 #include "qemu/osdep.h"
     14 #include "block/aio.h"
     15 #include "qemu/coroutine.h"
     16 #include "qemu/thread.h"
     17 #include "qemu/error-report.h"
     18 #include "iothread.h"
     19 
     20 /* AioContext management */
     21 
     22 #define NUM_CONTEXTS 5
     23 
     24 static IOThread *threads[NUM_CONTEXTS];
     25 static AioContext *ctx[NUM_CONTEXTS];
     26 static __thread int id = -1;
     27 
     28 static QemuEvent done_event;
     29 
     30 /* Run a function synchronously on a remote iothread. */
     31 
     32 typedef struct CtxRunData {
     33     QEMUBHFunc *cb;
     34     void *arg;
     35 } CtxRunData;
     36 
     37 static void ctx_run_bh_cb(void *opaque)
     38 {
     39     CtxRunData *data = opaque;
     40 
     41     data->cb(data->arg);
     42     qemu_event_set(&done_event);
     43 }
     44 
     45 static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
     46 {
     47     CtxRunData data = {
     48         .cb = cb,
     49         .arg = opaque
     50     };
     51 
     52     qemu_event_reset(&done_event);
     53     aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
     54     qemu_event_wait(&done_event);
     55 }
     56 
     57 /* Starting the iothreads. */
     58 
     59 static void set_id_cb(void *opaque)
     60 {
     61     int *i = opaque;
     62 
     63     id = *i;
     64 }
     65 
     66 static void create_aio_contexts(void)
     67 {
     68     int i;
     69 
     70     for (i = 0; i < NUM_CONTEXTS; i++) {
     71         threads[i] = iothread_new();
     72         ctx[i] = iothread_get_aio_context(threads[i]);
     73     }
     74 
     75     qemu_event_init(&done_event, false);
     76     for (i = 0; i < NUM_CONTEXTS; i++) {
     77         ctx_run(i, set_id_cb, &i);
     78     }
     79 }
     80 
     81 /* Stopping the iothreads. */
     82 
     83 static void join_aio_contexts(void)
     84 {
     85     int i;
     86 
     87     for (i = 0; i < NUM_CONTEXTS; i++) {
     88         aio_context_ref(ctx[i]);
     89     }
     90     for (i = 0; i < NUM_CONTEXTS; i++) {
     91         iothread_join(threads[i]);
     92     }
     93     for (i = 0; i < NUM_CONTEXTS; i++) {
     94         aio_context_unref(ctx[i]);
     95     }
     96     qemu_event_destroy(&done_event);
     97 }
     98 
     99 /* Basic test for the stuff above. */
    100 
    101 static void test_lifecycle(void)
    102 {
    103     create_aio_contexts();
    104     join_aio_contexts();
    105 }
    106 
    107 /* aio_co_schedule test.  */
    108 
    109 static Coroutine *to_schedule[NUM_CONTEXTS];
    110 
    111 static bool now_stopping;
    112 
    113 static int count_retry;
    114 static int count_here;
    115 static int count_other;
    116 
    117 static bool schedule_next(int n)
    118 {
    119     Coroutine *co;
    120 
    121     co = qatomic_xchg(&to_schedule[n], NULL);
    122     if (!co) {
    123         qatomic_inc(&count_retry);
    124         return false;
    125     }
    126 
    127     if (n == id) {
    128         qatomic_inc(&count_here);
    129     } else {
    130         qatomic_inc(&count_other);
    131     }
    132 
    133     aio_co_schedule(ctx[n], co);
    134     return true;
    135 }
    136 
    137 static void finish_cb(void *opaque)
    138 {
    139     schedule_next(id);
    140 }
    141 
    142 static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
    143 {
    144     g_assert(to_schedule[id] == NULL);
    145 
    146     while (!qatomic_mb_read(&now_stopping)) {
    147         int n;
    148 
    149         n = g_test_rand_int_range(0, NUM_CONTEXTS);
    150         schedule_next(n);
    151 
    152         qatomic_mb_set(&to_schedule[id], qemu_coroutine_self());
    153         qemu_coroutine_yield();
    154         g_assert(to_schedule[id] == NULL);
    155     }
    156 }
    157 
    158 
    159 static void test_multi_co_schedule(int seconds)
    160 {
    161     int i;
    162 
    163     count_here = count_other = count_retry = 0;
    164     now_stopping = false;
    165 
    166     create_aio_contexts();
    167     for (i = 0; i < NUM_CONTEXTS; i++) {
    168         Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
    169         aio_co_schedule(ctx[i], co1);
    170     }
    171 
    172     g_usleep(seconds * 1000000);
    173 
    174     qatomic_mb_set(&now_stopping, true);
    175     for (i = 0; i < NUM_CONTEXTS; i++) {
    176         ctx_run(i, finish_cb, NULL);
    177         to_schedule[i] = NULL;
    178     }
    179 
    180     join_aio_contexts();
    181     g_test_message("scheduled %d, queued %d, retry %d, total %d",
    182                   count_other, count_here, count_retry,
    183                   count_here + count_other + count_retry);
    184 }
    185 
    186 static void test_multi_co_schedule_1(void)
    187 {
    188     test_multi_co_schedule(1);
    189 }
    190 
    191 static void test_multi_co_schedule_10(void)
    192 {
    193     test_multi_co_schedule(10);
    194 }
    195 
    196 /* CoMutex thread-safety.  */
    197 
    198 static uint32_t atomic_counter;
    199 static uint32_t running;
    200 static uint32_t counter;
    201 static CoMutex comutex;
    202 
    203 static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
    204 {
    205     while (!qatomic_mb_read(&now_stopping)) {
    206         qemu_co_mutex_lock(&comutex);
    207         counter++;
    208         qemu_co_mutex_unlock(&comutex);
    209 
    210         /* Increase atomic_counter *after* releasing the mutex.  Otherwise
    211          * there is a chance (it happens about 1 in 3 runs) that the iothread
    212          * exits before the coroutine is woken up, causing a spurious
    213          * assertion failure.
    214          */
    215         qatomic_inc(&atomic_counter);
    216     }
    217     qatomic_dec(&running);
    218 }
    219 
    220 static void test_multi_co_mutex(int threads, int seconds)
    221 {
    222     int i;
    223 
    224     qemu_co_mutex_init(&comutex);
    225     counter = 0;
    226     atomic_counter = 0;
    227     now_stopping = false;
    228 
    229     create_aio_contexts();
    230     assert(threads <= NUM_CONTEXTS);
    231     running = threads;
    232     for (i = 0; i < threads; i++) {
    233         Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
    234         aio_co_schedule(ctx[i], co1);
    235     }
    236 
    237     g_usleep(seconds * 1000000);
    238 
    239     qatomic_mb_set(&now_stopping, true);
    240     while (running > 0) {
    241         g_usleep(100000);
    242     }
    243 
    244     join_aio_contexts();
    245     g_test_message("%d iterations/second", counter / seconds);
    246     g_assert_cmpint(counter, ==, atomic_counter);
    247 }
    248 
    249 /* Testing with NUM_CONTEXTS threads focuses on the queue.  The mutex however
    250  * is too contended (and the threads spend too much time in aio_poll)
    251  * to actually stress the handoff protocol.
    252  */
    253 static void test_multi_co_mutex_1(void)
    254 {
    255     test_multi_co_mutex(NUM_CONTEXTS, 1);
    256 }
    257 
    258 static void test_multi_co_mutex_10(void)
    259 {
    260     test_multi_co_mutex(NUM_CONTEXTS, 10);
    261 }
    262 
    263 /* Testing with fewer threads stresses the handoff protocol too.  Still, the
    264  * case where the locker _can_ pick up a handoff is very rare, happening
    265  * about 10 times in 1 million, so increase the runtime a bit compared to
    266  * other "quick" testcases that only run for 1 second.
    267  */
    268 static void test_multi_co_mutex_2_3(void)
    269 {
    270     test_multi_co_mutex(2, 3);
    271 }
    272 
    273 static void test_multi_co_mutex_2_30(void)
    274 {
    275     test_multi_co_mutex(2, 30);
    276 }
    277 
    278 /* Same test with fair mutexes, for performance comparison.  */
    279 
    280 #ifdef CONFIG_LINUX
    281 #include "qemu/futex.h"
    282 
    283 /* The nodes for the mutex reside in this structure (on which we try to avoid
    284  * false sharing).  The head of the mutex is in the "mutex_head" variable.
    285  */
    286 static struct {
    287     int next, locked;
    288     int padding[14];
    289 } nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
    290 
    291 static int mutex_head = -1;
    292 
    293 static void mcs_mutex_lock(void)
    294 {
    295     int prev;
    296 
    297     nodes[id].next = -1;
    298     nodes[id].locked = 1;
    299     prev = qatomic_xchg(&mutex_head, id);
    300     if (prev != -1) {
    301         qatomic_set(&nodes[prev].next, id);
    302         qemu_futex_wait(&nodes[id].locked, 1);
    303     }
    304 }
    305 
    306 static void mcs_mutex_unlock(void)
    307 {
    308     int next;
    309     if (qatomic_read(&nodes[id].next) == -1) {
    310         if (qatomic_read(&mutex_head) == id &&
    311             qatomic_cmpxchg(&mutex_head, id, -1) == id) {
    312             /* Last item in the list, exit.  */
    313             return;
    314         }
    315         while (qatomic_read(&nodes[id].next) == -1) {
    316             /* mcs_mutex_lock did the xchg, but has not updated
    317              * nodes[prev].next yet.
    318              */
    319         }
    320     }
    321 
    322     /* Wake up the next in line.  */
    323     next = qatomic_read(&nodes[id].next);
    324     nodes[next].locked = 0;
    325     qemu_futex_wake(&nodes[next].locked, 1);
    326 }
    327 
    328 static void test_multi_fair_mutex_entry(void *opaque)
    329 {
    330     while (!qatomic_mb_read(&now_stopping)) {
    331         mcs_mutex_lock();
    332         counter++;
    333         mcs_mutex_unlock();
    334         qatomic_inc(&atomic_counter);
    335     }
    336     qatomic_dec(&running);
    337 }
    338 
    339 static void test_multi_fair_mutex(int threads, int seconds)
    340 {
    341     int i;
    342 
    343     assert(mutex_head == -1);
    344     counter = 0;
    345     atomic_counter = 0;
    346     now_stopping = false;
    347 
    348     create_aio_contexts();
    349     assert(threads <= NUM_CONTEXTS);
    350     running = threads;
    351     for (i = 0; i < threads; i++) {
    352         Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
    353         aio_co_schedule(ctx[i], co1);
    354     }
    355 
    356     g_usleep(seconds * 1000000);
    357 
    358     qatomic_mb_set(&now_stopping, true);
    359     while (running > 0) {
    360         g_usleep(100000);
    361     }
    362 
    363     join_aio_contexts();
    364     g_test_message("%d iterations/second", counter / seconds);
    365     g_assert_cmpint(counter, ==, atomic_counter);
    366 }
    367 
    368 static void test_multi_fair_mutex_1(void)
    369 {
    370     test_multi_fair_mutex(NUM_CONTEXTS, 1);
    371 }
    372 
    373 static void test_multi_fair_mutex_10(void)
    374 {
    375     test_multi_fair_mutex(NUM_CONTEXTS, 10);
    376 }
    377 #endif
    378 
    379 /* Same test with pthread mutexes, for performance comparison and
    380  * portability.  */
    381 
    382 static QemuMutex mutex;
    383 
    384 static void test_multi_mutex_entry(void *opaque)
    385 {
    386     while (!qatomic_mb_read(&now_stopping)) {
    387         qemu_mutex_lock(&mutex);
    388         counter++;
    389         qemu_mutex_unlock(&mutex);
    390         qatomic_inc(&atomic_counter);
    391     }
    392     qatomic_dec(&running);
    393 }
    394 
    395 static void test_multi_mutex(int threads, int seconds)
    396 {
    397     int i;
    398 
    399     qemu_mutex_init(&mutex);
    400     counter = 0;
    401     atomic_counter = 0;
    402     now_stopping = false;
    403 
    404     create_aio_contexts();
    405     assert(threads <= NUM_CONTEXTS);
    406     running = threads;
    407     for (i = 0; i < threads; i++) {
    408         Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
    409         aio_co_schedule(ctx[i], co1);
    410     }
    411 
    412     g_usleep(seconds * 1000000);
    413 
    414     qatomic_mb_set(&now_stopping, true);
    415     while (running > 0) {
    416         g_usleep(100000);
    417     }
    418 
    419     join_aio_contexts();
    420     g_test_message("%d iterations/second", counter / seconds);
    421     g_assert_cmpint(counter, ==, atomic_counter);
    422 }
    423 
    424 static void test_multi_mutex_1(void)
    425 {
    426     test_multi_mutex(NUM_CONTEXTS, 1);
    427 }
    428 
    429 static void test_multi_mutex_10(void)
    430 {
    431     test_multi_mutex(NUM_CONTEXTS, 10);
    432 }
    433 
    434 /* End of tests.  */
    435 
    436 int main(int argc, char **argv)
    437 {
    438     init_clocks(NULL);
    439 
    440     g_test_init(&argc, &argv, NULL);
    441     g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
    442     if (g_test_quick()) {
    443         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
    444         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
    445         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
    446 #ifdef CONFIG_LINUX
    447         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
    448 #endif
    449         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
    450     } else {
    451         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
    452         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
    453         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
    454 #ifdef CONFIG_LINUX
    455         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
    456 #endif
    457         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
    458     }
    459     return g_test_run();
    460 }