qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

fuse_virtio.c (32667B)


      1 /*
      2  * virtio-fs glue for FUSE
      3  * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
      4  *
      5  * Authors:
      6  *   Dave Gilbert  <dgilbert@redhat.com>
      7  *
      8  * Implements the glue between libfuse and libvhost-user
      9  *
     10  * This program can be distributed under the terms of the GNU LGPLv2.
     11  * See the file COPYING.LIB
     12  */
     13 
     14 #include "qemu/osdep.h"
     15 #include "qemu/iov.h"
     16 #include "qapi/error.h"
     17 #include "fuse_i.h"
     18 #include "standard-headers/linux/fuse.h"
     19 #include "fuse_misc.h"
     20 #include "fuse_opt.h"
     21 #include "fuse_virtio.h"
     22 
     23 #include <sys/eventfd.h>
     24 #include <sys/socket.h>
     25 #include <sys/un.h>
     26 #include <grp.h>
     27 
     28 #include "libvhost-user.h"
     29 
     30 struct fv_VuDev;
     31 struct fv_QueueInfo {
     32     pthread_t thread;
     33     /*
     34      * This lock protects the VuVirtq preventing races between
     35      * fv_queue_thread() and fv_queue_worker().
     36      */
     37     pthread_mutex_t vq_lock;
     38 
     39     struct fv_VuDev *virtio_dev;
     40 
     41     /* Our queue index, corresponds to array position */
     42     int qidx;
     43     int kick_fd;
     44     int kill_fd; /* For killing the thread */
     45 };
     46 
     47 /* A FUSE request */
     48 typedef struct {
     49     VuVirtqElement elem;
     50     struct fuse_chan ch;
     51 
     52     /* Used to complete requests that involve no reply */
     53     bool reply_sent;
     54 } FVRequest;
     55 
     56 /*
     57  * We pass the dev element into libvhost-user
     58  * and then use it to get back to the outer
     59  * container for other data.
     60  */
     61 struct fv_VuDev {
     62     VuDev dev;
     63     struct fuse_session *se;
     64 
     65     /*
     66      * Either handle virtqueues or vhost-user protocol messages.  Don't do
     67      * both at the same time since that could lead to race conditions if
     68      * virtqueues or memory tables change while another thread is accessing
     69      * them.
     70      *
     71      * The assumptions are:
     72      * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev.
     73      * 2. virtio_loop() reads/writes virtqueues and VuDev.
     74      */
     75     pthread_rwlock_t vu_dispatch_rwlock;
     76 
     77     /*
     78      * The following pair of fields are only accessed in the main
     79      * virtio_loop
     80      */
     81     size_t nqueues;
     82     struct fv_QueueInfo **qi;
     83 };
     84 
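        /*
         * Illustration (not part of the original source): libvhost-user
         * callbacks only receive the embedded VuDev, so the enclosing state
         * is recovered with container_of(), e.g.:
         *
         *     static struct fuse_session *session_of(VuDev *dev)
         *     {
         *         struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
         *         return vud->se;
         *     }
         *
         * (session_of is a hypothetical helper; fv_queue_set_started() below
         * uses the same pattern.)
         */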
     85 /* Callback from libvhost-user */
     86 static uint64_t fv_get_features(VuDev *dev)
     87 {
     88     return 1ULL << VIRTIO_F_VERSION_1;
     89 }
     90 
     91 /* Callback from libvhost-user */
     92 static void fv_set_features(VuDev *dev, uint64_t features)
     93 {
     94 }
     95 
     96 /*
     97  * Callback from libvhost-user if there's a new fd we're supposed to listen
     98  * to, typically a queue kick?
     99  */
    100 static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
    101                          void *data)
    102 {
    103     fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
    104 }
    105 
    106 /*
    107  * Callback from libvhost-user if we're no longer supposed to listen on an fd
    108  */
    109 static void fv_remove_watch(VuDev *dev, int fd)
    110 {
    111     fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
    112 }
    113 
    114 /* Callback from libvhost-user to panic */
    115 static void fv_panic(VuDev *dev, const char *err)
    116 {
    117     fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    118     /* TODO: Allow reconnects?? */
    119     exit(EXIT_FAILURE);
    120 }
    121 
    122 /*
    123  * Copy from an iovec into a fuse_buf (memory only)
    124  * Caller must ensure there is space
    125  */
    126 static size_t copy_from_iov(struct fuse_buf *buf, size_t out_num,
    127                             const struct iovec *out_sg,
    128                             size_t max)
    129 {
    130     void *dest = buf->mem;
    131     size_t copied = 0;
    132 
    133     while (out_num && max) {
    134         size_t onelen = out_sg->iov_len;
    135         onelen = MIN(onelen, max);
    136         memcpy(dest, out_sg->iov_base, onelen);
    137         dest += onelen;
    138         copied += onelen;
    139         out_sg++;
    140         out_num--;
    141         max -= onelen;
    142     }
    143 
    144     return copied;
    145 }
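        /*
         * Worked example (values illustrative): with out_sg = { {a, 4},
         * {b, 8} } and max = 6, the loop copies 4 bytes from a and then
         * 2 bytes from b into buf->mem, and returns 6.
         */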
    146 
    147 /*
    148  * Skip 'skip' bytes in the iov; 'sg_1stindex' is set as
    149  * the index for the 1st iovec to read data from, and
    150  * 'sg_1stskip' is the number of bytes to skip in that entry.
    151  *
    152  * Returns true if there are at least 'skip' bytes in the iovec
    153  *
    154  */
    155 static bool skip_iov(const struct iovec *sg, size_t sg_size,
    156                      size_t skip,
    157                      size_t *sg_1stindex, size_t *sg_1stskip)
    158 {
    159     size_t vec;
    160 
    161     for (vec = 0; vec < sg_size; vec++) {
    162         if (sg[vec].iov_len > skip) {
    163             *sg_1stskip = skip;
    164             *sg_1stindex = vec;
    165 
    166             return true;
    167         }
    168 
    169         skip -= sg[vec].iov_len;
    170     }
    171 
    172     *sg_1stindex = vec;
    173     *sg_1stskip = 0;
    174     return skip == 0;
    175 }
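        /*
         * Worked example (values illustrative): with iovec lengths
         * {4, 8, 16} and skip = 10, the whole first entry and 6 bytes of
         * the second are skipped, so *sg_1stindex = 1, *sg_1stskip = 6 and
         * the function returns true.
         */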
    176 
    177 /*
    178  * Copy from one iov to another, the given number of bytes
    179  * The caller must have checked sizes.
    180  */
    181 static void copy_iov(struct iovec *src_iov, int src_count,
    182                      struct iovec *dst_iov, int dst_count, size_t to_copy)
    183 {
    184     size_t dst_offset = 0;
    185     /* Outer loop copies 'src' elements */
    186     while (to_copy) {
    187         assert(src_count);
    188         size_t src_len = src_iov[0].iov_len;
    189         size_t src_offset = 0;
    190 
    191         if (src_len > to_copy) {
    192             src_len = to_copy;
    193         }
    194         /* Inner loop copies contents of one 'src' to maybe multiple dst. */
    195         while (src_len) {
    196             assert(dst_count);
    197             size_t dst_len = dst_iov[0].iov_len - dst_offset;
    198             if (dst_len > src_len) {
    199                 dst_len = src_len;
    200             }
    201 
    202             memcpy(dst_iov[0].iov_base + dst_offset,
    203                    src_iov[0].iov_base + src_offset, dst_len);
    204             src_len -= dst_len;
    205             to_copy -= dst_len;
    206             src_offset += dst_len;
    207             dst_offset += dst_len;
    208 
    209             assert(dst_offset <= dst_iov[0].iov_len);
    210             if (dst_offset == dst_iov[0].iov_len) {
    211                 dst_offset = 0;
    212                 dst_iov++;
    213                 dst_count--;
    214             }
    215         }
    216         src_iov++;
    217         src_count--;
    218     }
    219 }
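        /*
         * Worked example (values illustrative): copying 6 bytes from
         * src = { {a, 4}, {b, 4} } into dst = { {c, 2}, {d, 8} } writes
         * a[0..1] into c, a[2..3] into d[0..1], then b[0..1] into d[2..3].
         */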
    220 
    221 /*
    222  * pthread_rwlock_rdlock() and pthread_rwlock_wrlock can fail if
    223  * a deadlock condition is detected or the current thread already
    224  * owns the lock. They can also fail, like pthread_rwlock_unlock(),
    225  * if the mutex wasn't properly initialized. None of these are ever
    226  * expected to happen.
    227  */
    228 static void vu_dispatch_rdlock(struct fv_VuDev *vud)
    229 {
    230     int ret = pthread_rwlock_rdlock(&vud->vu_dispatch_rwlock);
    231     assert(ret == 0);
    232 }
    233 
    234 static void vu_dispatch_wrlock(struct fv_VuDev *vud)
    235 {
    236     int ret = pthread_rwlock_wrlock(&vud->vu_dispatch_rwlock);
    237     assert(ret == 0);
    238 }
    239 
    240 static void vu_dispatch_unlock(struct fv_VuDev *vud)
    241 {
    242     int ret = pthread_rwlock_unlock(&vud->vu_dispatch_rwlock);
    243     assert(ret == 0);
    244 }
    245 
    246 static void vq_send_element(struct fv_QueueInfo *qi, VuVirtqElement *elem,
    247                             ssize_t len)
    248 {
    249     struct fuse_session *se = qi->virtio_dev->se;
    250     VuDev *dev = &se->virtio_dev->dev;
    251     VuVirtq *q = vu_get_queue(dev, qi->qidx);
    252 
    253     vu_dispatch_rdlock(qi->virtio_dev);
    254     pthread_mutex_lock(&qi->vq_lock);
    255     vu_queue_push(dev, q, elem, len);
    256     vu_queue_notify(dev, q);
    257     pthread_mutex_unlock(&qi->vq_lock);
    258     vu_dispatch_unlock(qi->virtio_dev);
    259 }
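        /*
         * Note the lock order above: the vu_dispatch rwlock is taken before
         * the per-queue vq_lock.  fv_queue_thread() acquires them in the
         * same order, so the two paths cannot deadlock against each other.
         */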
    260 
    261 /*
    262  * Called back by ll whenever it wants to send a reply/message back
    263  * The 1st element of the iov starts with the fuse_out_header
    264  * 'unique'==0 means it's a notify message.
    265  */
    266 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
    267                     struct iovec *iov, int count)
    268 {
    269     FVRequest *req = container_of(ch, FVRequest, ch);
    270     struct fv_QueueInfo *qi = ch->qi;
    271     VuVirtqElement *elem = &req->elem;
    272     int ret = 0;
    273 
    274     assert(count >= 1);
    275     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
    276 
    277     struct fuse_out_header *out = iov[0].iov_base;
    278     /* TODO: Endianness! */
    279 
    280     size_t tosend_len = iov_size(iov, count);
    281 
    282     /* unique == 0 is notification, which we don't support */
    283     assert(out->unique);
    284     assert(!req->reply_sent);
    285 
    286     /* The 'in' part of the elem is to qemu */
    287     unsigned int in_num = elem->in_num;
    288     struct iovec *in_sg = elem->in_sg;
    289     size_t in_len = iov_size(in_sg, in_num);
    290     fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
    291              __func__, elem->index, in_num, in_len);
    292 
    293     /*
    294      * The elem should have room for a 'fuse_out_header' (out from fuse)
    295      * plus the data based on the len in the header.
    296      */
    297     if (in_len < sizeof(struct fuse_out_header)) {
    298         fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
    299                  __func__, elem->index);
    300         ret = -E2BIG;
    301         goto err;
    302     }
    303     if (in_len < tosend_len) {
    304         fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
    305                  __func__, elem->index, tosend_len);
    306         ret = -E2BIG;
    307         goto err;
    308     }
    309 
    310     copy_iov(iov, count, in_sg, in_num, tosend_len);
    311 
    312     vq_send_element(qi, elem, tosend_len);
    313     req->reply_sent = true;
    314 
    315 err:
    316     return ret;
    317 }
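        /*
         * A minimal sketch (not from this file) of how a caller might lay
         * out a reply for virtio_send_msg(); 'in', 'payload' and
         * 'payload_len' are hypothetical:
         *
         *     struct fuse_out_header out = {
         *         .len    = sizeof(out) + payload_len,
         *         .error  = 0,
         *         .unique = in->unique,   // echo the request; must be != 0
         *     };
         *     struct iovec iov[2] = {
         *         { .iov_base = &out,    .iov_len = sizeof(out) },
         *         { .iov_base = payload, .iov_len = payload_len },
         *     };
         *     virtio_send_msg(se, ch, iov, 2);
         */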
    318 
    319 /*
    320  * Callback from fuse_send_data_iov_* when it's virtio and the buffer
    321  * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
    322  * We need to send the iov and then the buffer.
    323  * Return 0 on success
    324  */
    325 int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
    326                          struct iovec *iov, int count, struct fuse_bufvec *buf,
    327                          size_t len)
    328 {
    329     FVRequest *req = container_of(ch, FVRequest, ch);
    330     struct fv_QueueInfo *qi = ch->qi;
    331     VuVirtqElement *elem = &req->elem;
    332     int ret = 0;
    333     g_autofree struct iovec *in_sg_cpy = NULL;
    334 
    335     assert(count >= 1);
    336     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
    337 
    338     struct fuse_out_header *out = iov[0].iov_base;
    339     /* TODO: Endianness! */
    340 
    341     size_t iov_len = iov_size(iov, count);
    342     size_t tosend_len = iov_len + len;
    343 
    344     out->len = tosend_len;
    345 
    346     fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
    347              count, len, iov_len);
    348 
    349     /* unique == 0 is notification which we don't support */
    350     assert(out->unique);
    351 
    352     assert(!req->reply_sent);
    353 
    354     /* The 'in' part of the elem is to qemu */
    355     unsigned int in_num = elem->in_num;
    356     struct iovec *in_sg = elem->in_sg;
    357     size_t in_len = iov_size(in_sg, in_num);
    358     fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
    359              __func__, elem->index, in_num, in_len);
    360 
    361     /*
    362      * The elem should have room for a 'fuse_out_header' (out from fuse)
    363      * plus the data based on the len in the header.
    364      */
    365     if (in_len < sizeof(struct fuse_out_header)) {
    366         fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
    367                  __func__, elem->index);
    368         return E2BIG;
    369     }
    370     if (in_len < tosend_len) {
    371         fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
    372                  __func__, elem->index, tosend_len);
    373         return E2BIG;
    374     }
    375 
    376     /* TODO: Limit to 'len' */
    377 
    378     /* First copy the header data from iov->in_sg */
    379     copy_iov(iov, count, in_sg, in_num, iov_len);
    380 
    381     /*
    382      * Build a copy of the in_sg iov so we can skip bits in it,
    383      * including changing the offsets
    384      */
    385     in_sg_cpy = g_new(struct iovec, in_num);
    386     memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    387     /* These get updated as we skip */
    388     struct iovec *in_sg_ptr = in_sg_cpy;
    389     unsigned int in_sg_cpy_count = in_num;
    390 
    391     /* skip over parts of in_sg that contained the header iov */
    392     iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, iov_len);
    393 
    394     do {
    395         fuse_log(FUSE_LOG_DEBUG, "%s: in_sg_cpy_count=%d len remaining=%zd\n",
    396                  __func__, in_sg_cpy_count, len);
    397 
    398         ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
    399                      buf->buf[0].pos);
    400 
    401         if (ret == -1) {
    402             ret = errno;
    403             if (ret == EINTR) {
    404                 continue;
    405             }
    406             fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
    407                      __func__, len);
    408             return ret;
    409         }
    410 
    411         if (!ret) {
    412             /* EOF case? */
    413             fuse_log(FUSE_LOG_DEBUG, "%s: !ret len remaining=%zd\n", __func__,
    414                      len);
    415             break;
    416         }
    417         fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
    418                  ret, len);
    419 
    420         len -= ret;
    421         /* Short read. Retry reading remaining bytes */
    422         if (len) {
    423             fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
    424             /* Skip over this much next time around */
    425             iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, ret);
    426             buf->buf[0].pos += ret;
    427         }
    428     } while (len);
    429 
    430     /* Need to fix out->len on EOF */
    431     if (len) {
    432         struct fuse_out_header *out_sg = in_sg[0].iov_base;
    433 
    434         tosend_len -= len;
    435         out_sg->len = tosend_len;
    436     }
    437 
    438     vq_send_element(qi, elem, tosend_len);
    439     req->reply_sent = true;
    440     return 0;
    441 }
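        /*
         * iov_discard_front() used above trims the vector in place.  For
         * example (values illustrative), discarding 150 bytes from
         * { {p, 100}, {q, 200} } leaves a single entry { q + 50, 150 } and
         * drops the count from 2 to 1.
         */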
    442 
    443 static __thread bool clone_fs_called;
    444 
    445 /* Process one FVRequest in a thread pool */
    446 static void fv_queue_worker(gpointer data, gpointer user_data)
    447 {
    448     struct fv_QueueInfo *qi = user_data;
    449     struct fuse_session *se = qi->virtio_dev->se;
    450     FVRequest *req = data;
    451     VuVirtqElement *elem = &req->elem;
    452     struct fuse_buf fbuf = {};
    453     bool allocated_bufv = false;
    454     struct fuse_bufvec bufv;
    455     struct fuse_bufvec *pbufv;
    456     struct fuse_in_header inh;
    457 
    458     assert(se->bufsize > sizeof(struct fuse_in_header));
    459 
    460     if (!clone_fs_called) {
    461         int ret;
    462 
    463         /* unshare FS for xattr operation */
    464         ret = unshare(CLONE_FS);
    465         /* should not fail */
    466         assert(ret == 0);
    467 
    468         clone_fs_called = true;
    469     }
    470 
    471     /*
    472      * An element contains one request and the space to send our response.
    473      * They're spread over multiple descriptors in a scatter/gather set
    474      * and we can't trust the guest to keep them still, so copy in/out.
    475      */
    476     fbuf.mem = g_malloc(se->bufsize);
    477 
    478     fuse_mutex_init(&req->ch.lock);
    479     req->ch.fd = -1;
    480     req->ch.qi = qi;
    481 
    482     /* The 'out' part of the elem is from qemu */
    483     unsigned int out_num = elem->out_num;
    484     struct iovec *out_sg = elem->out_sg;
    485     size_t out_len = iov_size(out_sg, out_num);
    486     fuse_log(FUSE_LOG_DEBUG,
    487              "%s: elem %d: with %d out desc of length %zd\n",
    488              __func__, elem->index, out_num, out_len);
    489 
    490     /*
    491      * The elem should contain a 'fuse_in_header' (in to fuse)
    492      * plus the data based on the len in the header.
    493      */
    494     if (out_len < sizeof(struct fuse_in_header)) {
    495         fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
    496                  __func__, elem->index);
    497         assert(0); /* TODO */
    498     }
    499     if (out_len > se->bufsize) {
    500         fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
    501                  elem->index);
    502         assert(0); /* TODO */
    503     }
    504     /* Copy just the fuse_in_header and look at it */
    505     copy_from_iov(&fbuf, out_num, out_sg,
    506                   sizeof(struct fuse_in_header));
    507     memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));
    508 
    509     pbufv = NULL; /* Compiler thinks there's an uninitialised path */
    510     if (inh.opcode == FUSE_WRITE &&
    511         out_len >= (sizeof(struct fuse_in_header) +
    512                     sizeof(struct fuse_write_in))) {
    513         /*
    514          * For a write we don't actually need to copy the
    515          * data, we can just do it straight out of guest memory
    516          * but we must still copy the headers in case the guest
    517          * was nasty and changed them while we were using them.
    518          */
    519         fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
    520 
    521         fbuf.size = copy_from_iov(&fbuf, out_num, out_sg,
    522                                   sizeof(struct fuse_in_header) +
    523                                   sizeof(struct fuse_write_in));
    524         /* That copy re-read the in_header; make sure we use the original */
    525         memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
    526 
    527         /* Allocate the bufv, with space for the rest of the iov */
    528         pbufv = g_try_malloc(sizeof(struct fuse_bufvec) +
    529                              sizeof(struct fuse_buf) * out_num);
    530         if (!pbufv) {
    531             fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
    532                     __func__);
    533             goto out;
    534         }
    535 
    536         allocated_bufv = true;
    537         pbufv->count = 1;
    538         pbufv->buf[0] = fbuf;
    539 
    540         size_t iovindex, pbufvindex, iov_bytes_skip;
    541         pbufvindex = 1; /* 2 headers, 1 fusebuf */
    542 
    543         if (!skip_iov(out_sg, out_num,
    544                       sizeof(struct fuse_in_header) +
    545                       sizeof(struct fuse_write_in),
    546                       &iovindex, &iov_bytes_skip)) {
    547             fuse_log(FUSE_LOG_ERR, "%s: skip failed\n",
    548                     __func__);
    549             goto out;
    550         }
    551 
    552         for (; iovindex < out_num; iovindex++, pbufvindex++) {
    553             pbufv->count++;
    554             pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
    555             pbufv->buf[pbufvindex].flags = 0;
    556             pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
    557             pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
    558 
    559             if (iov_bytes_skip) {
    560                 pbufv->buf[pbufvindex].mem += iov_bytes_skip;
    561                 pbufv->buf[pbufvindex].size -= iov_bytes_skip;
    562                 iov_bytes_skip = 0;
    563             }
    564         }
    565     } else {
    566         /* Normal (non fast write) path */
    567 
    568         copy_from_iov(&fbuf, out_num, out_sg, se->bufsize);
    569         /* That copy re-read the in_header; make sure we use the original */
    570         memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
    571         fbuf.size = out_len;
    572 
    573         /* TODO! Endianness of header */
    574 
    575         /* TODO: Add checks for fuse_session_exited */
    576         bufv.buf[0] = fbuf;
    577         bufv.count = 1;
    578         pbufv = &bufv;
    579     }
    580     pbufv->idx = 0;
    581     pbufv->off = 0;
    582     fuse_session_process_buf_int(se, pbufv, &req->ch);
    583 
    584 out:
    585     if (allocated_bufv) {
    586         g_free(pbufv);
    587     }
    588 
    589     /* If the request has no reply, still recycle the virtqueue element */
    590     if (!req->reply_sent) {
    591         fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
    592                  elem->index);
    593         vq_send_element(qi, elem, 0);
    594     }
    595 
    596     pthread_mutex_destroy(&req->ch.lock);
    597     g_free(fbuf.mem);
    598     free(req);
    599 }
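        /*
         * For the FUSE_WRITE fast path above, the bufvec ends up shaped
         * like this (illustrative):
         *
         *     pbufv->buf[0]     copied fuse_in_header + fuse_write_in (fbuf.mem)
         *     pbufv->buf[1..n]  pointers straight into the guest-mapped
         *                       out_sg entries, the first advanced past any
         *                       remaining header bytes
         */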
    600 
    601 /* Thread function for individual queues, created when a queue is 'started' */
    602 static void *fv_queue_thread(void *opaque)
    603 {
    604     struct fv_QueueInfo *qi = opaque;
    605     struct VuDev *dev = &qi->virtio_dev->dev;
    606     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    607     struct fuse_session *se = qi->virtio_dev->se;
    608     GThreadPool *pool = NULL;
    609     GList *req_list = NULL;
    610 
    611     if (se->thread_pool_size) {
    612         fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
    613                  __func__, qi->qidx);
    614         pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
    615                                  FALSE, NULL);
    616         if (!pool) {
    617             fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
    618             return NULL;
    619         }
    620     }
    621 
    622     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
    623              qi->qidx, qi->kick_fd);
    624     while (1) {
    625         struct pollfd pf[2];
    626 
    627         pf[0].fd = qi->kick_fd;
    628         pf[0].events = POLLIN;
    629         pf[0].revents = 0;
    630         pf[1].fd = qi->kill_fd;
    631         pf[1].events = POLLIN;
    632         pf[1].revents = 0;
    633 
    634         fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
    635                  qi->qidx);
    636         int poll_res = ppoll(pf, 2, NULL, NULL);
    637 
    638         if (poll_res == -1) {
    639             if (errno == EINTR) {
    640                 fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
    641                          __func__);
    642                 continue;
    643             }
    644             fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
    645             break;
    646         }
    647         assert(poll_res >= 1);
    648         if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    649             fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
    650                      __func__, pf[0].revents, qi->qidx);
    651             break;
    652         }
    653         if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    654             fuse_log(FUSE_LOG_ERR,
    655                      "%s: Unexpected poll revents %x Queue %d killfd\n",
    656                      __func__, pf[1].revents, qi->qidx);
    657             break;
    658         }
    659         if (pf[1].revents) {
    660             fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
    661                      __func__, qi->qidx);
    662             break;
    663         }
    664         assert(pf[0].revents & POLLIN);
    665         fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
    666                  qi->qidx);
    667 
    668         eventfd_t evalue;
    669         if (eventfd_read(qi->kick_fd, &evalue)) {
    670             fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
    671             break;
    672         }
    673         /* Mutual exclusion with virtio_loop() */
    674         vu_dispatch_rdlock(qi->virtio_dev);
    675         pthread_mutex_lock(&qi->vq_lock);
    676         /* out is from guest, in is to guest */
    677         unsigned int in_bytes, out_bytes;
    678         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
    679 
    680         fuse_log(FUSE_LOG_DEBUG,
    681                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
    682                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
    683 
    684         while (1) {
    685             FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
    686             if (!req) {
    687                 break;
    688             }
    689 
    690             req->reply_sent = false;
    691 
    692             if (!se->thread_pool_size) {
    693                 req_list = g_list_prepend(req_list, req);
    694             } else {
    695                 g_thread_pool_push(pool, req, NULL);
    696             }
    697         }
    698 
    699         pthread_mutex_unlock(&qi->vq_lock);
    700         vu_dispatch_unlock(qi->virtio_dev);
    701 
    702         /* Process all the requests. */
    703         if (!se->thread_pool_size && req_list != NULL) {
    704             req_list = g_list_reverse(req_list);
    705             g_list_foreach(req_list, fv_queue_worker, qi);
    706             g_list_free(req_list);
    707             req_list = NULL;
    708         }
    709     }
    710 
    711     if (pool) {
    712         g_thread_pool_free(pool, FALSE, TRUE);
    713     }
    714 
    715     return NULL;
    716 }
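        /*
         * The kick_fd is an eventfd: a single eventfd_read() above returns
         * the accumulated counter and resets it, so one wakeup can cover
         * several pending kicks, which is why the inner loop keeps calling
         * vu_queue_pop() until the queue is drained.
         */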
    717 
    718 static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
    719 {
    720     int ret;
    721     struct fv_QueueInfo *ourqi;
    722 
    723     assert(qidx < vud->nqueues);
    724     ourqi = vud->qi[qidx];
    725 
    726     /* Kill the thread */
    727     if (eventfd_write(ourqi->kill_fd, 1)) {
    728         fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
    729                  qidx, strerror(errno));
    730     }
    731     ret = pthread_join(ourqi->thread, NULL);
    732     if (ret) {
    733         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
    734                  __func__, qidx, ret);
    735     }
    736     pthread_mutex_destroy(&ourqi->vq_lock);
    737     close(ourqi->kill_fd);
    738     ourqi->kick_fd = -1;
    739     g_free(vud->qi[qidx]);
    740     vud->qi[qidx] = NULL;
    741 }
    742 
    743 static void stop_all_queues(struct fv_VuDev *vud)
    744 {
    745     for (int i = 0; i < vud->nqueues; i++) {
    746         if (!vud->qi[i]) {
    747             continue;
    748         }
    749 
    750         fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i);
    751         fv_queue_cleanup_thread(vud, i);
    752     }
    753 }
    754 
    755 /* Callback from libvhost-user on start or stop of a queue */
    756 static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
    757 {
    758     struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    759     struct fv_QueueInfo *ourqi;
    760 
    761     fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
    762              started);
    763     assert(qidx >= 0);
    764 
    765     /*
    766      * Ignore additional request queues for now.  passthrough_ll.c must be
    767      * audited for thread-safety issues first.  It was written with a
    768      * well-behaved client in mind and may not protect against all types of
    769      * races yet.
    770      */
    771     if (qidx > 1) {
    772         fuse_log(FUSE_LOG_ERR,
    773                  "%s: multiple request queues not yet implemented, please only "
    774                  "configure 1 request queue\n",
    775                  __func__);
    776         exit(EXIT_FAILURE);
    777     }
    778 
    779     if (started) {
    780         /* Fire up a thread to watch this queue */
    781         if (qidx >= vud->nqueues) {
    782             vud->qi = g_realloc_n(vud->qi, qidx + 1, sizeof(vud->qi[0]));
    783             memset(vud->qi + vud->nqueues, 0,
    784                    sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
    785             vud->nqueues = qidx + 1;
    786         }
    787         if (!vud->qi[qidx]) {
    788             vud->qi[qidx] = g_new0(struct fv_QueueInfo, 1);
    789             vud->qi[qidx]->virtio_dev = vud;
    790             vud->qi[qidx]->qidx = qidx;
    791         } else {
    792             /* Shouldn't have been started */
    793             assert(vud->qi[qidx]->kick_fd == -1);
    794         }
    795         ourqi = vud->qi[qidx];
    796         ourqi->kick_fd = dev->vq[qidx].kick_fd;
    797 
    798         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
    799         assert(ourqi->kill_fd != -1);
    800         pthread_mutex_init(&ourqi->vq_lock, NULL);
    801 
    802         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
    803             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
    804                      __func__, qidx);
    805             assert(0);
    806         }
    807     } else {
    808         /*
    809          * Temporarily drop write-lock taken in virtio_loop() so that
    810          * the queue thread doesn't block in virtio_send_msg().
    811          */
    812         vu_dispatch_unlock(vud);
    813         fv_queue_cleanup_thread(vud, qidx);
    814         vu_dispatch_wrlock(vud);
    815     }
    816 }
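        /*
         * Queue layout note: in virtio-fs, queue 0 is the hiprio queue and
         * queue 1 is the (single) request queue, hence the qidx > 1 check
         * above.
         */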
    817 
    818 static bool fv_queue_order(VuDev *dev, int qidx)
    819 {
    820     return false;
    821 }
    822 
    823 static const VuDevIface fv_iface = {
    824     .get_features = fv_get_features,
    825     .set_features = fv_set_features,
    826 
    827     /* Don't need a process-message handler, we've not got any at vhost-user level */
    828     .queue_set_started = fv_queue_set_started,
    829 
    830     .queue_is_processed_in_order = fv_queue_order,
    831 };
    832 
    833 /*
    834  * Main loop; this mostly deals with events on the vhost-user
    835  * socket itself, and not actual fuse data.
    836  */
    837 int virtio_loop(struct fuse_session *se)
    838 {
    839     fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);
    840 
    841     while (!fuse_session_exited(se)) {
    842         struct pollfd pf[1];
    843         bool ok;
    844         pf[0].fd = se->vu_socketfd;
    845         pf[0].events = POLLIN;
    846         pf[0].revents = 0;
    847 
    848         fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
    849         int poll_res = ppoll(pf, 1, NULL, NULL);
    850 
    851         if (poll_res == -1) {
    852             if (errno == EINTR) {
    853                 fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
    854                          __func__);
    855                 continue;
    856             }
    857             fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
    858             break;
    859         }
    860         assert(poll_res == 1);
    861         if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    862             fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
    863                      pf[0].revents);
    864             break;
    865         }
    866         assert(pf[0].revents & POLLIN);
    867         fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
    868         /* Mutual exclusion with fv_queue_thread() */
    869         vu_dispatch_wrlock(se->virtio_dev);
    870 
    871         ok = vu_dispatch(&se->virtio_dev->dev);
    872 
    873         vu_dispatch_unlock(se->virtio_dev);
    874 
    875         if (!ok) {
    876             fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
    877             break;
    878         }
    879     }
    880 
    881     /*
    882      * Make sure all fv_queue_thread()s quit on exit, as we're about to
    883      * free the virtio dev and fuse session; no one should access them anymore.
    884      */
    885     stop_all_queues(se->virtio_dev);
    886     fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);
    887 
    888     return 0;
    889 }
    890 
    891 static void strreplace(char *s, char old, char new)
    892 {
    893     for (; *s; ++s) {
    894         if (*s == old) {
    895             *s = new;
    896         }
    897     }
    898 }
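        /*
         * Example (path illustrative): strreplace(sk_name, '/', '.') below
         * turns "/run/virtiofsd/vhost.sock" into ".run.virtiofsd.vhost.sock"
         * so the socket path can be embedded in a pidfile name.
         */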
    899 
    900 static bool fv_socket_lock(struct fuse_session *se)
    901 {
    902     g_autofree gchar *sk_name = NULL;
    903     g_autofree gchar *pidfile = NULL;
    904     g_autofree gchar *state = NULL;
    905     g_autofree gchar *dir = NULL;
    906     Error *local_err = NULL;
    907 
    908     state = qemu_get_local_state_dir();
    909     dir = g_build_filename(state, "run", "virtiofsd", NULL);
    910 
    911     if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
    912         fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s\n",
    913                  __func__, dir, strerror(errno));
    914         return false;
    915     }
    916 
    917     sk_name = g_strdup(se->vu_socket_path);
    918     strreplace(sk_name, '/', '.');
    919     pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);
    920 
    921     if (!qemu_write_pidfile(pidfile, &local_err)) {
    922         error_report_err(local_err);
    923         return false;
    924     }
    925 
    926     return true;
    927 }
    928 
    929 static int fv_create_listen_socket(struct fuse_session *se)
    930 {
    931     struct sockaddr_un un;
    932     mode_t old_umask;
    933 
    934     /* Nothing to do if fd is already initialized */
    935     if (se->vu_listen_fd >= 0) {
    936         return 0;
    937     }
    938 
    939     if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
    940         fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
    941         return -1;
    942     }
    943 
    944     if (!strlen(se->vu_socket_path)) {
    945         fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
    946         return -1;
    947     }
    948 
    949     /* Check whether the vu_socket_path is already in use */
    950     if (!fv_socket_lock(se)) {
    951         return -1;
    952     }
    953 
    954     /*
    955      * Create the Unix socket to communicate with qemu
    956      * based on QEMU's vhost-user-bridge
    957      */
    958     unlink(se->vu_socket_path);
    959     strcpy(un.sun_path, se->vu_socket_path);
    960     size_t addr_len = sizeof(un);
    961 
    962     int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    963     if (listen_sock == -1) {
    964         fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
    965         return -1;
    966     }
    967     un.sun_family = AF_UNIX;
    968 
    969     /*
    970      * Unfortunately bind doesn't let you set the mask on the socket,
    971      * so set umask appropriately and restore it later.
    972      */
    973     if (se->vu_socket_group) {
    974         old_umask = umask(S_IROTH | S_IWOTH | S_IXOTH);
    975     } else {
    976         old_umask = umask(S_IRGRP | S_IWGRP | S_IXGRP |
    977                           S_IROTH | S_IWOTH | S_IXOTH);
    978     }
    979     if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
    980         fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
    981         close(listen_sock);
    982         umask(old_umask);
    983         return -1;
    984     }
    985     if (se->vu_socket_group) {
    986         struct group *g = getgrnam(se->vu_socket_group);
    987         if (g) {
    988             if (chown(se->vu_socket_path, -1, g->gr_gid) == -1) {
    989                 fuse_log(FUSE_LOG_WARNING,
    990                          "vhost socket failed to set group to %s (%d): %m\n",
    991                          se->vu_socket_group, g->gr_gid);
    992             }
    993         } else {
    994             fuse_log(FUSE_LOG_ERR,
    995                      "vhost socket: unable to find group '%s'\n",
    996                      se->vu_socket_group);
    997             close(listen_sock);
    998             umask(old_umask);
    999             return -1;
   1000         }
   1001     }
   1002     umask(old_umask);
   1003 
   1004     if (listen(listen_sock, 1) == -1) {
   1005         fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
   1006         close(listen_sock);
   1007         return -1;
   1008     }
   1009 
   1010     se->vu_listen_fd = listen_sock;
   1011     return 0;
   1012 }
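        /*
         * The umask values above work out to (assuming the usual 0777 base
         * mode for newly bound unix sockets):
         *
         *     with vu_socket_group:     umask(0007) -> socket mode 0770,
         *                               then chown to the requested group
         *     without vu_socket_group:  umask(0077) -> socket mode 0700
         */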
   1013 
   1014 int virtio_session_mount(struct fuse_session *se)
   1015 {
   1016     int ret;
   1017 
   1018     /*
   1019      * Test that unshare(CLONE_FS) works. fv_queue_worker() will need it. It's
   1020      * an unprivileged system call but some Docker/Moby versions are known to
   1021      * reject it via seccomp when CAP_SYS_ADMIN is not given.
   1022      *
   1023      * Note that the program is single-threaded here so this syscall has no
   1024      * visible effect and is safe to make.
   1025      */
   1026     ret = unshare(CLONE_FS);
   1027     if (ret == -1 && errno == EPERM) {
   1028         fuse_log(FUSE_LOG_ERR, "unshare(CLONE_FS) failed with EPERM. If "
   1029                 "running in a container please check that the container "
   1030                 "runtime seccomp policy allows unshare.\n");
   1031         return -1;
   1032     }
   1033 
   1034     ret = fv_create_listen_socket(se);
   1035     if (ret < 0) {
   1036         return ret;
   1037     }
   1038 
   1039     se->fd = -1;
   1040 
   1041     fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
   1042              __func__);
   1043     int data_sock = accept(se->vu_listen_fd, NULL, NULL);
   1044     if (data_sock == -1) {
   1045         fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
   1046         close(se->vu_listen_fd);
   1047         return -1;
   1048     }
   1049     close(se->vu_listen_fd);
   1050     se->vu_listen_fd = -1;
   1051     fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
   1052              __func__);
   1053 
   1054     /* TODO: Some cleanup/deallocation! */
   1055     se->virtio_dev = g_new0(struct fv_VuDev, 1);
   1056 
   1057     se->vu_socketfd = data_sock;
   1058     se->virtio_dev->se = se;
   1059     pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
   1060     if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
   1061                  fv_set_watch, fv_remove_watch, &fv_iface)) {
   1062         fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
   1063         return -1;
   1064     }
   1065 
   1066     return 0;
   1067 }
   1068 
   1069 void virtio_session_close(struct fuse_session *se)
   1070 {
   1071     close(se->vu_socketfd);
   1072 
   1073     if (!se->virtio_dev) {
   1074         return;
   1075     }
   1076 
   1077     g_free(se->virtio_dev->qi);
   1078     pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock);
   1079     g_free(se->virtio_dev);
   1080     se->virtio_dev = NULL;
   1081 }