qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

thread-pool.c (10637B)


/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit.  */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}
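
/*
 * Usage sketch (hypothetical, kept under #if 0): a caller could offload a
 * blocking function to the pool and pick up its return value in a completion
 * callback that runs from thread_pool_completion_bh in the pool's AioContext.
 * The example_* names are made up for illustration and are not part of any
 * QEMU API.
 */
#if 0
static int example_blocking_fn(void *opaque)
{
    /* Runs in a worker thread; the return value becomes elem->ret. */
    return 0;
}

static void example_complete(void *opaque, int ret)
{
    /* Runs in the pool's AioContext; ret is -ECANCELED if cancelled. */
}

static void example_submit(ThreadPool *pool)
{
    thread_pool_submit_aio(pool, example_blocking_fn, NULL,
                           example_complete, NULL);
}
#endif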

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}
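
/*
 * Usage sketch (hypothetical, kept under #if 0): from coroutine context the
 * same work can be submitted synchronously; the coroutine yields here and is
 * woken by thread_pool_co_cb with the worker's return value.
 * example_blocking_fn is the made-up helper from the sketch above.
 */
#if 0
static int coroutine_fn example_co_offload(ThreadPool *pool, void *arg)
{
    return thread_pool_submit_co(pool, example_blocking_fn, arg);
}
#endif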

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until it is over the
     *    min_threads threshold.
     *  - Bump the worker threads so that they exit, until we are under the
     *    max_threads threshold.
     *  - Do nothing. The current number of threads falls between the min and
     *    max thresholds; we let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}
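
/*
 * Usage sketch (hypothetical, kept under #if 0): assuming the caller is
 * responsible for storing the desired bounds in ctx->thread_pool_min and
 * ctx->thread_pool_max (how those fields are normally set is outside this
 * file), re-applying them grows the pool up to the new minimum and nudges
 * excess workers to exit.  The values below are arbitrary.
 */
#if 0
static void example_resize(ThreadPool *pool, AioContext *ctx)
{
    ctx->thread_pool_min = 4;
    ctx->thread_pool_max = 64;
    thread_pool_update_params(pool, ctx);
}
#endif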

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
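
/*
 * Lifecycle sketch (hypothetical, kept under #if 0): a pool is created per
 * AioContext with thread_pool_new(), work is submitted through one of the
 * thread_pool_submit* entry points, and the pool is torn down with
 * thread_pool_free() once every element has completed; thread_pool_free()
 * asserts that pool->head is empty, so the sketch drains the context with
 * aio_poll() before freeing.  example_blocking_fn/example_complete are the
 * made-up helpers from the earlier sketch.
 */
#if 0
static void example_lifecycle(void)
{
    AioContext *ctx = qemu_get_aio_context();
    ThreadPool *pool = thread_pool_new(ctx);

    thread_pool_submit_aio(pool, example_blocking_fn, NULL,
                           example_complete, NULL);

    while (!QLIST_EMPTY(&pool->head)) {
        aio_poll(ctx, true);   /* run completions until the pool drains */
    }
    thread_pool_free(pool);
}
#endif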