qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

test-thread-pool.c (6521B)


      1 #include "qemu/osdep.h"
      2 #include "block/aio.h"
      3 #include "block/thread-pool.h"
      4 #include "block/block.h"
      5 #include "qapi/error.h"
      6 #include "qemu/timer.h"
      7 #include "qemu/error-report.h"
      8 #include "qemu/main-loop.h"
      9 
     10 static AioContext *ctx;
     11 static ThreadPool *pool;
     12 static int active;
     13 
     14 typedef struct {
     15     BlockAIOCB *aiocb;
     16     int n;
     17     int ret;
     18 } WorkerTestData;
     19 
     20 static int worker_cb(void *opaque)
     21 {
     22     WorkerTestData *data = opaque;
     23     return qatomic_fetch_inc(&data->n);
     24 }
     25 
     26 static int long_cb(void *opaque)
     27 {
     28     WorkerTestData *data = opaque;
     29     if (qatomic_cmpxchg(&data->n, 0, 1) == 0) {
     30         g_usleep(2000000);
     31         qatomic_or(&data->n, 2);
     32     }
     33     return 0;
     34 }
     35 
     36 static void done_cb(void *opaque, int ret)
     37 {
     38     WorkerTestData *data = opaque;
     39     g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED);
     40     data->ret = ret;
     41     data->aiocb = NULL;
     42 
     43     /* Callbacks are serialized, so no need to use atomic ops.  */
     44     active--;
     45 }
     46 
     47 static void test_submit(void)
     48 {
     49     WorkerTestData data = { .n = 0 };
     50     thread_pool_submit(pool, worker_cb, &data);
     51     while (data.n == 0) {
     52         aio_poll(ctx, true);
     53     }
     54     g_assert_cmpint(data.n, ==, 1);
     55 }
     56 
     57 static void test_submit_aio(void)
     58 {
     59     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
     60     data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
     61                                         done_cb, &data);
     62 
     63     /* The callbacks are not called until after the first wait.  */
     64     active = 1;
     65     g_assert_cmpint(data.ret, ==, -EINPROGRESS);
     66     while (data.ret == -EINPROGRESS) {
     67         aio_poll(ctx, true);
     68     }
     69     g_assert_cmpint(active, ==, 0);
     70     g_assert_cmpint(data.n, ==, 1);
     71     g_assert_cmpint(data.ret, ==, 0);
     72 }
     73 
     74 static void co_test_cb(void *opaque)
     75 {
     76     WorkerTestData *data = opaque;
     77 
     78     active = 1;
     79     data->n = 0;
     80     data->ret = -EINPROGRESS;
     81     thread_pool_submit_co(pool, worker_cb, data);
     82 
     83     /* The test continues in test_submit_co, after qemu_coroutine_enter... */
     84 
     85     g_assert_cmpint(data->n, ==, 1);
     86     data->ret = 0;
     87     active--;
     88 
     89     /* The test continues in test_submit_co, after aio_poll... */
     90 }
     91 
     92 static void test_submit_co(void)
     93 {
     94     WorkerTestData data;
     95     Coroutine *co = qemu_coroutine_create(co_test_cb, &data);
     96 
     97     qemu_coroutine_enter(co);
     98 
     99     /* Back here once the worker has started.  */
    100 
    101     g_assert_cmpint(active, ==, 1);
    102     g_assert_cmpint(data.ret, ==, -EINPROGRESS);
    103 
    104     /* aio_poll will execute the rest of the coroutine.  */
    105 
    106     while (data.ret == -EINPROGRESS) {
    107         aio_poll(ctx, true);
    108     }
    109 
    110     /* Back here after the coroutine has finished.  */
    111 
    112     g_assert_cmpint(active, ==, 0);
    113     g_assert_cmpint(data.ret, ==, 0);
    114 }
    115 
    116 static void test_submit_many(void)
    117 {
    118     WorkerTestData data[100];
    119     int i;
    120 
    121     /* Start more work items than there will be threads.  */
    122     for (i = 0; i < 100; i++) {
    123         data[i].n = 0;
    124         data[i].ret = -EINPROGRESS;
    125         thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
    126     }
    127 
    128     active = 100;
    129     while (active > 0) {
    130         aio_poll(ctx, true);
    131     }
    132     for (i = 0; i < 100; i++) {
    133         g_assert_cmpint(data[i].n, ==, 1);
    134         g_assert_cmpint(data[i].ret, ==, 0);
    135     }
    136 }
    137 
    138 static void do_test_cancel(bool sync)
    139 {
    140     WorkerTestData data[100];
    141     int num_canceled;
    142     int i;
    143 
    144     /* Start more work items than there will be threads, to ensure
    145      * the pool is full.
    146      */
    147     test_submit_many();
    148 
    149     /* Start long running jobs, to ensure we can cancel some.  */
    150     for (i = 0; i < 100; i++) {
    151         data[i].n = 0;
    152         data[i].ret = -EINPROGRESS;
    153         data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
    154                                                done_cb, &data[i]);
    155     }
    156 
    157     /* Starting the threads may be left to a bottom half.  Let it
    158      * run, but do not waste too much time...
    159      */
    160     active = 100;
    161     aio_notify(ctx);
    162     aio_poll(ctx, false);
    163 
    164     /* Wait some time for the threads to start, with some sanity
    165      * testing on the behavior of the scheduler...
    166      */
    167     g_assert_cmpint(active, ==, 100);
    168     g_usleep(1000000);
    169     g_assert_cmpint(active, >, 50);
    170 
    171     /* Cancel the jobs that haven't been started yet.  */
    172     num_canceled = 0;
    173     for (i = 0; i < 100; i++) {
    174         if (qatomic_cmpxchg(&data[i].n, 0, 4) == 0) {
    175             data[i].ret = -ECANCELED;
    176             if (sync) {
    177                 bdrv_aio_cancel(data[i].aiocb);
    178             } else {
    179                 bdrv_aio_cancel_async(data[i].aiocb);
    180             }
    181             num_canceled++;
    182         }
    183     }
    184     g_assert_cmpint(active, >, 0);
    185     g_assert_cmpint(num_canceled, <, 100);
    186 
    187     for (i = 0; i < 100; i++) {
    188         if (data[i].aiocb && qatomic_read(&data[i].n) < 4) {
    189             if (sync) {
    190                 /* Canceling the others will be a blocking operation.  */
    191                 bdrv_aio_cancel(data[i].aiocb);
    192             } else {
    193                 bdrv_aio_cancel_async(data[i].aiocb);
    194             }
    195         }
    196     }
    197 
    198     /* Finish execution and execute any remaining callbacks.  */
    199     while (active > 0) {
    200         aio_poll(ctx, true);
    201     }
    202     g_assert_cmpint(active, ==, 0);
    203     for (i = 0; i < 100; i++) {
    204         g_assert(data[i].aiocb == NULL);
    205         switch (data[i].n) {
    206         case 0:
    207             fprintf(stderr, "Callback not canceled but never started?\n");
    208             abort();
    209         case 3:
    210             /* Couldn't be canceled asynchronously, must have completed.  */
    211             g_assert_cmpint(data[i].ret, ==, 0);
    212             break;
    213         case 4:
    214             /* Could be canceled asynchronously, never started.  */
    215             g_assert_cmpint(data[i].ret, ==, -ECANCELED);
    216             break;
    217         default:
    218             fprintf(stderr, "Callback aborted while running?\n");
    219             abort();
    220         }
    221     }
    222 }
    223 
    224 static void test_cancel(void)
    225 {
    226     do_test_cancel(true);
    227 }
    228 
    229 static void test_cancel_async(void)
    230 {
    231     do_test_cancel(false);
    232 }
    233 
    234 int main(int argc, char **argv)
    235 {
    236     qemu_init_main_loop(&error_abort);
    237     ctx = qemu_get_current_aio_context();
    238     pool = aio_get_thread_pool(ctx);
    239 
    240     g_test_init(&argc, &argv, NULL);
    241     g_test_add_func("/thread-pool/submit", test_submit);
    242     g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
    243     g_test_add_func("/thread-pool/submit-co", test_submit_co);
    244     g_test_add_func("/thread-pool/submit-many", test_submit_many);
    245     g_test_add_func("/thread-pool/cancel", test_cancel);
    246     g_test_add_func("/thread-pool/cancel-async", test_cancel_async);
    247 
    248     return g_test_run();
    249 }