qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

vhost-user-server.c (13805B)


      1 /*
      2  * Sharing QEMU devices via vhost-user protocol
      3  *
      4  * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
      5  * Copyright (c) 2020 Red Hat, Inc.
      6  *
      7  * This work is licensed under the terms of the GNU GPL, version 2 or
      8  * later.  See the COPYING file in the top-level directory.
      9  */
     10 #include "qemu/osdep.h"
     11 #include "qemu/main-loop.h"
     12 #include "qemu/vhost-user-server.h"
     13 #include "block/aio-wait.h"
     14 
     15 /*
     16  * Theory of operation:
     17  *
     18  * VuServer is started and stopped by vhost_user_server_start() and
     19  * vhost_user_server_stop() from the main loop thread. Starting the server
     20  * opens a vhost-user UNIX domain socket and listens for incoming connections.
     21  * Only one connection is allowed at a time.
     22  *
     23  * The connection is handled by the vu_client_trip() coroutine in the
     24  * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
     25  * where libvhost-user calls vu_message_read() to receive the next vhost-user
     26  * protocol message over the UNIX domain socket.
     27  *
     28  * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
     29  * fds. These fds are also handled in the VuServer->ctx AioContext.
     30  *
     31  * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
     32  * the socket connection. Shutting down the socket connection causes
     33  * vu_message_read() to fail since no more data can be received from the socket.
     34  * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
     35  * libvhost-user before terminating the coroutine. vu_deinit() calls
     36  * remove_watch() to stop monitoring kick fds and this stops virtqueue
     37  * processing.
     38  *
     39  * When vu_client_trip() has finished cleaning up it schedules a BH in the main
     40  * loop thread to accept the next client connection.
     41  *
     42  * When libvhost-user detects an error it calls panic_cb() and sets the
     43  * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
     44  * the dev->broken flag is set.
     45  *
     46  * It is possible to switch AioContexts using
     47  * vhost_user_server_detach_aio_context() and
     48  * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
     49  * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
     50  * coroutine remains in a yielded state during the switch. This is made
     51  * possible by QIOChannel's support for spurious coroutine re-entry in
     52  * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
     53  * new AioContext.
     54  */
     55 
     56 static void vmsg_close_fds(VhostUserMsg *vmsg)
     57 {
     58     int i;
     59     for (i = 0; i < vmsg->fd_num; i++) {
     60         close(vmsg->fds[i]);
     61     }
     62 }
     63 
     64 static void vmsg_unblock_fds(VhostUserMsg *vmsg)
     65 {
     66     int i;
     67     for (i = 0; i < vmsg->fd_num; i++) {
     68         qemu_socket_set_nonblock(vmsg->fds[i]);
     69     }
     70 }
     71 
     72 static void panic_cb(VuDev *vu_dev, const char *buf)
     73 {
     74     error_report("vu_panic: %s", buf);
     75 }
     76 
     77 void vhost_user_server_ref(VuServer *server)
     78 {
     79     assert(!server->wait_idle);
     80     server->refcount++;
     81 }
     82 
     83 void vhost_user_server_unref(VuServer *server)
     84 {
     85     server->refcount--;
     86     if (server->wait_idle && !server->refcount) {
     87         aio_co_wake(server->co_trip);
     88     }
     89 }
     90 
     91 static bool coroutine_fn
     92 vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
     93 {
     94     struct iovec iov = {
     95         .iov_base = (char *)vmsg,
     96         .iov_len = VHOST_USER_HDR_SIZE,
     97     };
     98     int rc, read_bytes = 0;
     99     Error *local_err = NULL;
    100     const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
    101     VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    102     QIOChannel *ioc = server->ioc;
    103 
    104     vmsg->fd_num = 0;
    105     if (!ioc) {
    106         error_report_err(local_err);
    107         goto fail;
    108     }
    109 
    110     assert(qemu_in_coroutine());
    111     do {
    112         size_t nfds = 0;
    113         int *fds = NULL;
    114 
    115         /*
    116          * qio_channel_readv_full may have short reads, keeping calling it
    117          * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
    118          */
    119         rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, &local_err);
    120         if (rc < 0) {
    121             if (rc == QIO_CHANNEL_ERR_BLOCK) {
    122                 assert(local_err == NULL);
    123                 qio_channel_yield(ioc, G_IO_IN);
    124                 continue;
    125             } else {
    126                 error_report_err(local_err);
    127                 goto fail;
    128             }
    129         }
    130 
    131         if (nfds > 0) {
    132             if (vmsg->fd_num + nfds > max_fds) {
    133                 error_report("A maximum of %zu fds are allowed, "
    134                              "however got %zu fds now",
    135                              max_fds, vmsg->fd_num + nfds);
    136                 g_free(fds);
    137                 goto fail;
    138             }
    139             memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0]));
    140             vmsg->fd_num += nfds;
    141             g_free(fds);
    142         }
    143 
    144         if (rc == 0) { /* socket closed */
    145             goto fail;
    146         }
    147 
    148         iov.iov_base += rc;
    149         iov.iov_len -= rc;
    150         read_bytes += rc;
    151     } while (read_bytes != VHOST_USER_HDR_SIZE);
    152 
    153     /* qio_channel_readv_full will make socket fds blocking, unblock them */
    154     vmsg_unblock_fds(vmsg);
    155     if (vmsg->size > sizeof(vmsg->payload)) {
    156         error_report("Error: too big message request: %d, "
    157                      "size: vmsg->size: %u, "
    158                      "while sizeof(vmsg->payload) = %zu",
    159                      vmsg->request, vmsg->size, sizeof(vmsg->payload));
    160         goto fail;
    161     }
    162 
    163     struct iovec iov_payload = {
    164         .iov_base = (char *)&vmsg->payload,
    165         .iov_len = vmsg->size,
    166     };
    167     if (vmsg->size) {
    168         rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
    169         if (rc != 1) {
    170             if (local_err) {
    171                 error_report_err(local_err);
    172             }
    173             goto fail;
    174         }
    175     }
    176 
    177     return true;
    178 
    179 fail:
    180     vmsg_close_fds(vmsg);
    181 
    182     return false;
    183 }
    184 
/*
 * Coroutine serving one connected vhost-user client.
 *
 * Runs vu_dispatch() until it fails (e.g. after the channel was shut down)
 * or libvhost-user marks the device broken. It then waits until every
 * reference taken with vhost_user_server_ref() is dropped, calls
 * vu_deinit(), and releases the server's references to the channel.
 * Finally it schedules restart_listener_bh (unless the server is being
 * stopped) so the next client can be accepted.
 */
static coroutine_fn void vu_client_trip(void *opaque)
{
    VuServer *server = opaque;
    VuDev *vu_dev = &server->vu_dev;

    while (!vu_dev->broken && vu_dispatch(vu_dev)) {
        /* Keep running */
    }

    if (server->refcount) {
        /* Wait for requests to complete before we can unmap the memory */
        /* vhost_user_server_unref() wakes us when refcount reaches 0 */
        server->wait_idle = true;
        qemu_coroutine_yield();
        server->wait_idle = false;
    }
    assert(server->refcount == 0);

    vu_deinit(vu_dev);

    /* vu_deinit() should have called remove_watch() */
    assert(QTAILQ_EMPTY(&server->vu_fd_watches));

    /* Drop the two references taken in vu_accept() */
    object_unref(OBJECT(server->sioc));
    server->sioc = NULL;

    object_unref(OBJECT(server->ioc));
    server->ioc = NULL;

    /*
     * Clearing co_trip signals that the connection is gone.
     * vhost_user_server_stop() polls it with AIO_WAIT_WHILE(), and
     * aio_wait_kick() below wakes that wait. restart_listener_bh is NULL
     * when vhost_user_server_stop() has already run, so no new client is
     * accepted in that case.
     */
    server->co_trip = NULL;
    if (server->restart_listener_bh) {
        qemu_bh_schedule(server->restart_listener_bh);
    }
    aio_wait_kick();
}
    219 
    220 /*
    221  * a wrapper for vu_kick_cb
    222  *
    223  * since aio_dispatch can only pass one user data pointer to the
    224  * callback function, pack VuDev and pvt into a struct. Then unpack it
    225  * and pass them to vu_kick_cb
    226  */
    227 static void kick_handler(void *opaque)
    228 {
    229     VuFdWatch *vu_fd_watch = opaque;
    230     VuDev *vu_dev = vu_fd_watch->vu_dev;
    231 
    232     vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);
    233 
    234     /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
    235     if (vu_dev->broken) {
    236         VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    237 
    238         qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    239     }
    240 }
    241 
    242 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
    243 {
    244 
    245     VuFdWatch *vu_fd_watch, *next;
    246     QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
    247         if (vu_fd_watch->fd == fd) {
    248             return vu_fd_watch;
    249         }
    250     }
    251     return NULL;
    252 }
    253 
    254 static void
    255 set_watch(VuDev *vu_dev, int fd, int vu_evt,
    256           vu_watch_cb cb, void *pvt)
    257 {
    258 
    259     VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    260     g_assert(vu_dev);
    261     g_assert(fd >= 0);
    262     g_assert(cb);
    263 
    264     VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
    265 
    266     if (!vu_fd_watch) {
    267         VuFdWatch *vu_fd_watch = g_new0(VuFdWatch, 1);
    268 
    269         QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);
    270 
    271         vu_fd_watch->fd = fd;
    272         vu_fd_watch->cb = cb;
    273         qemu_socket_set_nonblock(fd);
    274         aio_set_fd_handler(server->ioc->ctx, fd, true, kick_handler,
    275                            NULL, NULL, NULL, vu_fd_watch);
    276         vu_fd_watch->vu_dev = vu_dev;
    277         vu_fd_watch->pvt = pvt;
    278     }
    279 }
    280 
    281 
    282 static void remove_watch(VuDev *vu_dev, int fd)
    283 {
    284     VuServer *server;
    285     g_assert(vu_dev);
    286     g_assert(fd >= 0);
    287 
    288     server = container_of(vu_dev, VuServer, vu_dev);
    289 
    290     VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
    291 
    292     if (!vu_fd_watch) {
    293         return;
    294     }
    295     aio_set_fd_handler(server->ioc->ctx, fd, true,
    296                        NULL, NULL, NULL, NULL, NULL);
    297 
    298     QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
    299     g_free(vu_fd_watch);
    300 }
    301 
    302 
    303 static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
    304                       gpointer opaque)
    305 {
    306     VuServer *server = opaque;
    307 
    308     if (server->sioc) {
    309         warn_report("Only one vhost-user client is allowed to "
    310                     "connect the server one time");
    311         return;
    312     }
    313 
    314     if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
    315                  vu_message_read, set_watch, remove_watch, server->vu_iface)) {
    316         error_report("Failed to initialize libvhost-user");
    317         return;
    318     }
    319 
    320     /*
    321      * Unset the callback function for network listener to make another
    322      * vhost-user client keeping waiting until this client disconnects
    323      */
    324     qio_net_listener_set_client_func(server->listener,
    325                                      NULL,
    326                                      NULL,
    327                                      NULL);
    328     server->sioc = sioc;
    329     /*
    330      * Increase the object reference, so sioc will not freed by
    331      * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
    332      */
    333     object_ref(OBJECT(server->sioc));
    334     qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
    335     server->ioc = QIO_CHANNEL(sioc);
    336     object_ref(OBJECT(server->ioc));
    337 
    338     /* TODO vu_message_write() spins if non-blocking! */
    339     qio_channel_set_blocking(server->ioc, false, NULL);
    340 
    341     server->co_trip = qemu_coroutine_create(vu_client_trip, server);
    342 
    343     aio_context_acquire(server->ctx);
    344     vhost_user_server_attach_aio_context(server, server->ctx);
    345     aio_context_release(server->ctx);
    346 }
    347 
    348 void vhost_user_server_stop(VuServer *server)
    349 {
    350     aio_context_acquire(server->ctx);
    351 
    352     qemu_bh_delete(server->restart_listener_bh);
    353     server->restart_listener_bh = NULL;
    354 
    355     if (server->sioc) {
    356         VuFdWatch *vu_fd_watch;
    357 
    358         QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
    359             aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
    360                                NULL, NULL, NULL, NULL, vu_fd_watch);
    361         }
    362 
    363         qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    364 
    365         AIO_WAIT_WHILE(server->ctx, server->co_trip);
    366     }
    367 
    368     aio_context_release(server->ctx);
    369 
    370     if (server->listener) {
    371         qio_net_listener_disconnect(server->listener);
    372         object_unref(OBJECT(server->listener));
    373     }
    374 }
    375 
    376 /*
    377  * Allow the next client to connect to the server. Called from a BH in the main
    378  * loop.
    379  */
    380 static void restart_listener_bh(void *opaque)
    381 {
    382     VuServer *server = opaque;
    383 
    384     qio_net_listener_set_client_func(server->listener, vu_accept, server,
    385                                      NULL);
    386 }
    387 
    388 /* Called with ctx acquired */
    389 void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
    390 {
    391     VuFdWatch *vu_fd_watch;
    392 
    393     server->ctx = ctx;
    394 
    395     if (!server->sioc) {
    396         return;
    397     }
    398 
    399     qio_channel_attach_aio_context(server->ioc, ctx);
    400 
    401     QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
    402         aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL,
    403                            NULL, NULL, vu_fd_watch);
    404     }
    405 
    406     aio_co_schedule(ctx, server->co_trip);
    407 }
    408 
    409 /* Called with server->ctx acquired */
    410 void vhost_user_server_detach_aio_context(VuServer *server)
    411 {
    412     if (server->sioc) {
    413         VuFdWatch *vu_fd_watch;
    414 
    415         QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
    416             aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
    417                                NULL, NULL, NULL, NULL, vu_fd_watch);
    418         }
    419 
    420         qio_channel_detach_aio_context(server->ioc);
    421     }
    422 
    423     server->ctx = NULL;
    424 }
    425 
    426 bool vhost_user_server_start(VuServer *server,
    427                              SocketAddress *socket_addr,
    428                              AioContext *ctx,
    429                              uint16_t max_queues,
    430                              const VuDevIface *vu_iface,
    431                              Error **errp)
    432 {
    433     QEMUBH *bh;
    434     QIONetListener *listener;
    435 
    436     if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX &&
    437         socket_addr->type != SOCKET_ADDRESS_TYPE_FD) {
    438         error_setg(errp, "Only socket address types 'unix' and 'fd' are supported");
    439         return false;
    440     }
    441 
    442     listener = qio_net_listener_new();
    443     if (qio_net_listener_open_sync(listener, socket_addr, 1,
    444                                    errp) < 0) {
    445         object_unref(OBJECT(listener));
    446         return false;
    447     }
    448 
    449     bh = qemu_bh_new(restart_listener_bh, server);
    450 
    451     /* zero out unspecified fields */
    452     *server = (VuServer) {
    453         .listener              = listener,
    454         .restart_listener_bh   = bh,
    455         .vu_iface              = vu_iface,
    456         .max_queues            = max_queues,
    457         .ctx                   = ctx,
    458     };
    459 
    460     qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
    461 
    462     qio_net_listener_set_client_func(server->listener,
    463                                      vu_accept,
    464                                      server,
    465                                      NULL);
    466 
    467     QTAILQ_INIT(&server->vu_fd_watches);
    468     return true;
    469 }