qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

io_uring.c (12904B)


/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission.  Protected by AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
} LuringState;

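/*
 * Request flow at a glance (summary comment, not from the original source):
 * luring_co_submit() preps an sqe inside a stack-allocated LuringAIOCB and
 * appends it to io_q.submit_queue; ioq_submit() copies queued sqes into the
 * ring and calls io_uring_submit(); luring_process_completions() reaps cqes
 * and wakes each request's coroutine via aio_co_wake().
 */
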
/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Short reads are rare but may occur. The remaining read request needs to be
 * resubmitted.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

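/*
 * Worked example (illustrative, not part of the original source): suppose a
 * 64 KiB readv completes with nread == 16384.  total_read becomes 16384,
 * remaining = 65536 - 16384 = 49152, and resubmit_qiov is rebuilt to cover
 * only the unread tail of the original qiov.  The stored sqe is then
 * re-pointed at that tail:
 *
 *     sqeq.off  += 16384                  advance the file offset
 *     sqeq.addr  = resubmit_qiov.iov      tail iovec array
 *     sqeq.len   = resubmit_qiov.niov     tail iovec count
 *
 * so the resubmitted request reads exactly the missing bytes.
 */
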
/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules the completion BH so it can be called again in a
 * nested event loop.  When there are no events left to complete, the BH is
 * canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        } else {
            /* Short read/write; only read/write requests get here */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

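/*
 * Concrete scenario for the BH dance above (assumed example, not from the
 * original comments): the callback for request A calls aio_poll() to wait
 * for request B.  B's cqe may already sit in the completion queue, so the
 * ring fd never becomes readable again and the nested aio_poll() would
 * sleep forever.  The completion_bh scheduled at the top of
 * luring_process_completions() fires in the nested loop, re-enters this
 * function, and consumes B's cqe; once the outermost call drains the
 * queue, qemu_bh_cancel() removes the now-unneeded BH.
 */
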
static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * Try to complete requests right away while some are still
         * in flight.
         */
        luring_process_completions(s);
    }
    return ret;
}

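/*
 * Note (summary, not from the original comments): io_q.blocked is set when
 * the kernel refused part of the batch and entries remain on submit_queue.
 * While blocked, luring_do_submit() stops calling ioq_submit() and new
 * requests simply queue up; the FIFO submit_queue preserves submission
 * order until a later completion retries the submit.
 */
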
static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    return io_uring_cq_ready(&s->ring);
}

static void qemu_luring_poll_ready(void *opaque)
{
    LuringState *s = opaque;

    luring_process_completions_and_submit(s);
}

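/*
 * Note (summary, not from the original comments): qemu_luring_poll_cb()
 * backs the AioContext polling mode; it checks io_uring_cq_ready() in
 * userspace so busy-polling can detect completions without a ppoll()
 * syscall, and qemu_luring_poll_ready() then reaps them.
 */
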
static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

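/*
 * Batching sketch (hypothetical caller, not from this file): a device
 * emulation loop plugs the queue, spawns one coroutine per request and
 * unplugs once the guest's batch has been queued.  While plugged, each
 * coroutine's luring_co_submit() only appends its sqe and yields, so the
 * final unplug flushes the whole batch with a single io_uring_submit()
 * syscall:
 *
 *     luring_io_plug(bs, s);
 *     ...spawn request coroutines, each calling luring_co_submit()...
 *     luring_io_unplug(bs, s);
 *
 * luring_do_submit() still flushes early while plugged if the ring would
 * otherwise overflow (in_flight + in_queue >= MAX_ENTRIES).
 */
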
/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the sqe embedded in @luringcb, appends it to the pending queue
 * and submits via ioq_submit() unless submission is plugged or blocked.
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

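/*
 * A minimal caller sketch, assuming an initialized LuringState and a
 * coroutine context; example_co_read() and its buffer handling are
 * assumptions for illustration, not part of this file (disabled so the
 * listing compiles unchanged):
 */
#if 0
static int coroutine_fn example_co_read(BlockDriverState *bs, LuringState *s,
                                        int fd, uint64_t offset,
                                        void *buf, size_t bytes)
{
    QEMUIOVector qiov;

    /* Wrap the caller's buffer in a single-element QEMUIOVector */
    qemu_iovec_init_buf(&qiov, buf, bytes);

    /* Yields until the cqe arrives; returns 0 or a negative errno */
    return luring_co_submit(bs, s, fd, offset, &qiov, QEMU_AIO_READ);
}
#endif
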
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false,
                       NULL, NULL, NULL, NULL, s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL,
                       qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        /* io_uring_queue_init() returns a negative errno; use it directly */
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}
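
/*
 * Lifecycle sketch (illustrative; the example_* wrappers are assumptions,
 * only the luring_* calls come from this file; disabled so the listing
 * compiles unchanged):
 */
#if 0
static LuringState *example_setup(AioContext *ctx, Error **errp)
{
    LuringState *s = luring_init(errp);   /* creates a MAX_ENTRIES ring */
    if (!s) {
        return NULL;
    }
    luring_attach_aio_context(s, ctx);    /* fd handler, poll cb and BH */
    return s;
}

static void example_teardown(LuringState *s, AioContext *ctx)
{
    luring_detach_aio_context(s, ctx);    /* unregister before freeing */
    luring_cleanup(s);                    /* io_uring_queue_exit() + g_free() */
}
#endif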