cachepc-qemu

Fork of AMDESE/qemu with changes for the cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

fuse_virtio.c (32758B)


      1/*
      2 * virtio-fs glue for FUSE
      3 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
      4 *
      5 * Authors:
      6 *   Dave Gilbert  <dgilbert@redhat.com>
      7 *
      8 * Implements the glue between libfuse and libvhost-user
      9 *
     10 * This program can be distributed under the terms of the GNU LGPLv2.
     11 * See the file COPYING.LIB
     12 */
     13
     14#include "qemu/osdep.h"
     15#include "qemu/iov.h"
     16#include "qapi/error.h"
     17#include "fuse_i.h"
     18#include "standard-headers/linux/fuse.h"
     19#include "fuse_misc.h"
     20#include "fuse_opt.h"
     21#include "fuse_virtio.h"
     22
     23#include <sys/eventfd.h>
     24#include <sys/socket.h>
     25#include <sys/un.h>
     26#include <grp.h>
     27
     28#include "libvhost-user.h"
     29
     30struct fv_VuDev;
     31struct fv_QueueInfo {
     32    pthread_t thread;
     33    /*
     34     * This lock protects the VuVirtq preventing races between
     35     * fv_queue_thread() and fv_queue_worker().
     36     */
     37    pthread_mutex_t vq_lock;
     38
     39    struct fv_VuDev *virtio_dev;
     40
     41    /* Our queue index, corresponds to array position */
     42    int qidx;
     43    int kick_fd;
     44    int kill_fd; /* For killing the thread */
     45};
     46
     47/* A FUSE request */
     48typedef struct {
     49    VuVirtqElement elem;
     50    struct fuse_chan ch;
     51
     52    /* Used to complete requests that involve no reply */
     53    bool reply_sent;
     54} FVRequest;
     55
     56/*
     57 * We pass the dev element into libvhost-user
     58 * and then use it to get back to the outer
     59 * container for other data.
     60 */
     61struct fv_VuDev {
     62    VuDev dev;
     63    struct fuse_session *se;
     64
     65    /*
     66     * Either handle virtqueues or vhost-user protocol messages.  Don't do
     67     * both at the same time since that could lead to race conditions if
     68     * virtqueues or memory tables change while another thread is accessing
     69     * them.
     70     *
     71     * The assumptions are:
     72     * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev.
     73     * 2. virtio_loop() reads/writes virtqueues and VuDev.
     74     */
     75    pthread_rwlock_t vu_dispatch_rwlock;
     76
     77    /*
     78     * The following pair of fields are only accessed in the main
     79     * virtio_loop
     80     */
     81    size_t nqueues;
     82    struct fv_QueueInfo **qi;
     83};
     84
     85/* From spec */
     86struct virtio_fs_config {
     87    char tag[36];
     88    uint32_t num_queues;
     89};
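
/*
 * Example (illustrative sketch): per the virtio-fs spec, "tag" is padded
 * with NUL bytes but is not NUL-terminated when all 36 bytes are used, so
 * a reader must bound the copy itself:
 *
 *     char name[37];
 *     memcpy(name, cfg.tag, 36);
 *     name[36] = '\0';
 */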
     90
     91/* Callback from libvhost-user */
     92static uint64_t fv_get_features(VuDev *dev)
     93{
     94    return 1ULL << VIRTIO_F_VERSION_1;
     95}
     96
     97/* Callback from libvhost-user */
     98static void fv_set_features(VuDev *dev, uint64_t features)
     99{
    100}
    101
    102/*
    103 * Callback from libvhost-user if there's a new fd we're supposed to listen
    104 * to, typically a queue kick?
    105 */
    106static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
    107                         void *data)
    108{
    109    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
    110}
    111
    112/*
    113 * Callback from libvhost-user if we're no longer supposed to listen on an fd
    114 */
    115static void fv_remove_watch(VuDev *dev, int fd)
    116{
    117    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
    118}
    119
    120/* Callback from libvhost-user to panic */
    121static void fv_panic(VuDev *dev, const char *err)
    122{
    123    fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    124    /* TODO: Allow reconnects?? */
    125    exit(EXIT_FAILURE);
    126}
    127
    128/*
    129 * Copy from an iovec into a fuse_buf (memory only)
    130 * Caller must ensure there is space
    131 */
    132static size_t copy_from_iov(struct fuse_buf *buf, size_t out_num,
    133                            const struct iovec *out_sg,
    134                            size_t max)
    135{
    136    void *dest = buf->mem;
    137    size_t copied = 0;
    138
    139    while (out_num && max) {
    140        size_t onelen = out_sg->iov_len;
    141        onelen = MIN(onelen, max);
    142        memcpy(dest, out_sg->iov_base, onelen);
    143        dest += onelen;
    144        copied += onelen;
    145        out_sg++;
    146        out_num--;
    147        max -= onelen;
    148    }
    149
    150    return copied;
    151}
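
/*
 * Example (illustrative, hypothetical caller): with
 * out_sg = { {.iov_len = 4}, {.iov_len = 8} } and max = 6, this copies
 * 4 bytes from the first entry and 2 from the second into buf->mem and
 * returns 6:
 *
 *     struct fuse_buf fbuf = { .mem = g_malloc(6) };
 *     size_t n = copy_from_iov(&fbuf, 2, out_sg, 6);
 *     assert(n == 6);
 */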
    152
    153/*
    154 * Skip 'skip' bytes in the iov; 'sg_1stindex' is set as
    155 * the index for the 1st iovec to read data from, and
    156 * 'sg_1stskip' is the number of bytes to skip in that entry.
    157 *
     158 * Returns true if there are at least 'skip' bytes in the iovec
    159 *
    160 */
    161static bool skip_iov(const struct iovec *sg, size_t sg_size,
    162                     size_t skip,
    163                     size_t *sg_1stindex, size_t *sg_1stskip)
    164{
    165    size_t vec;
    166
    167    for (vec = 0; vec < sg_size; vec++) {
    168        if (sg[vec].iov_len > skip) {
    169            *sg_1stskip = skip;
    170            *sg_1stindex = vec;
    171
    172            return true;
    173        }
    174
    175        skip -= sg[vec].iov_len;
    176    }
    177
    178    *sg_1stindex = vec;
    179    *sg_1stskip = 0;
    180    return skip == 0;
    181}
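
/*
 * Worked example (illustrative): for sg = { {.iov_len = 4}, {.iov_len = 8} }:
 *
 *     skip = 6  -> *sg_1stindex = 1, *sg_1stskip = 2, returns true
 *     skip = 12 -> *sg_1stindex = 2, *sg_1stskip = 0, returns true
 *     skip = 13 -> returns false (fewer than 'skip' bytes available)
 */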
    182
    183/*
    184 * Copy from one iov to another, the given number of bytes
    185 * The caller must have checked sizes.
    186 */
    187static void copy_iov(struct iovec *src_iov, int src_count,
    188                     struct iovec *dst_iov, int dst_count, size_t to_copy)
    189{
    190    size_t dst_offset = 0;
    191    /* Outer loop copies 'src' elements */
    192    while (to_copy) {
    193        assert(src_count);
    194        size_t src_len = src_iov[0].iov_len;
    195        size_t src_offset = 0;
    196
    197        if (src_len > to_copy) {
    198            src_len = to_copy;
    199        }
    200        /* Inner loop copies contents of one 'src' to maybe multiple dst. */
    201        while (src_len) {
    202            assert(dst_count);
    203            size_t dst_len = dst_iov[0].iov_len - dst_offset;
    204            if (dst_len > src_len) {
    205                dst_len = src_len;
    206            }
    207
    208            memcpy(dst_iov[0].iov_base + dst_offset,
    209                   src_iov[0].iov_base + src_offset, dst_len);
    210            src_len -= dst_len;
    211            to_copy -= dst_len;
    212            src_offset += dst_len;
    213            dst_offset += dst_len;
    214
    215            assert(dst_offset <= dst_iov[0].iov_len);
    216            if (dst_offset == dst_iov[0].iov_len) {
    217                dst_offset = 0;
    218                dst_iov++;
    219                dst_count--;
    220            }
    221        }
    222        src_iov++;
    223        src_count--;
    224    }
    225}
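
/*
 * Worked example (illustrative): copying 10 bytes from
 * src = { {.iov_len = 6}, {.iov_len = 6} } into
 * dst = { {.iov_len = 4}, {.iov_len = 8} } performs three memcpy() calls:
 * src[0][0..4) -> dst[0], src[0][4..6) -> dst[1][0..2), and
 * src[1][0..4) -> dst[1][2..6).
 */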
    226
    227/*
    228 * pthread_rwlock_rdlock() and pthread_rwlock_wrlock can fail if
    229 * a deadlock condition is detected or the current thread already
    230 * owns the lock. They can also fail, like pthread_rwlock_unlock(),
    231 * if the mutex wasn't properly initialized. None of these are ever
    232 * expected to happen.
    233 */
    234static void vu_dispatch_rdlock(struct fv_VuDev *vud)
    235{
    236    int ret = pthread_rwlock_rdlock(&vud->vu_dispatch_rwlock);
    237    assert(ret == 0);
    238}
    239
    240static void vu_dispatch_wrlock(struct fv_VuDev *vud)
    241{
    242    int ret = pthread_rwlock_wrlock(&vud->vu_dispatch_rwlock);
    243    assert(ret == 0);
    244}
    245
    246static void vu_dispatch_unlock(struct fv_VuDev *vud)
    247{
    248    int ret = pthread_rwlock_unlock(&vud->vu_dispatch_rwlock);
    249    assert(ret == 0);
    250}
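
/*
 * Lock-discipline sketch: every virtqueue access outside virtio_loop()
 * follows the same pattern, taking the dispatch rwlock as a reader and
 * then the per-queue mutex:
 *
 *     vu_dispatch_rdlock(qi->virtio_dev);
 *     pthread_mutex_lock(&qi->vq_lock);
 *     ... vu_queue_pop() / vu_queue_push() / vu_queue_notify() ...
 *     pthread_mutex_unlock(&qi->vq_lock);
 *     vu_dispatch_unlock(qi->virtio_dev);
 *
 * virtio_loop() instead takes vu_dispatch_wrlock(), so vhost-user messages
 * that reconfigure queues or memory tables exclude all queue threads.
 */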
    251
    252/*
    253 * Called back by ll whenever it wants to send a reply/message back
    254 * The 1st element of the iov starts with the fuse_out_header
    255 * 'unique'==0 means it's a notify message.
    256 */
    257int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
    258                    struct iovec *iov, int count)
    259{
    260    FVRequest *req = container_of(ch, FVRequest, ch);
    261    struct fv_QueueInfo *qi = ch->qi;
    262    VuDev *dev = &se->virtio_dev->dev;
    263    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    264    VuVirtqElement *elem = &req->elem;
    265    int ret = 0;
    266
    267    assert(count >= 1);
    268    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
    269
    270    struct fuse_out_header *out = iov[0].iov_base;
    271    /* TODO: Endianness! */
    272
    273    size_t tosend_len = iov_size(iov, count);
    274
    275    /* unique == 0 is notification, which we don't support */
    276    assert(out->unique);
    277    assert(!req->reply_sent);
    278
    279    /* The 'in' part of the elem is to qemu */
    280    unsigned int in_num = elem->in_num;
    281    struct iovec *in_sg = elem->in_sg;
    282    size_t in_len = iov_size(in_sg, in_num);
    283    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
    284             __func__, elem->index, in_num, in_len);
    285
    286    /*
    287     * The elem should have room for a 'fuse_out_header' (out from fuse)
    288     * plus the data based on the len in the header.
    289     */
    290    if (in_len < sizeof(struct fuse_out_header)) {
    291        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
    292                 __func__, elem->index);
    293        ret = -E2BIG;
    294        goto err;
    295    }
    296    if (in_len < tosend_len) {
    297        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
    298                 __func__, elem->index, tosend_len);
    299        ret = -E2BIG;
    300        goto err;
    301    }
    302
    303    copy_iov(iov, count, in_sg, in_num, tosend_len);
    304
    305    vu_dispatch_rdlock(qi->virtio_dev);
    306    pthread_mutex_lock(&qi->vq_lock);
    307    vu_queue_push(dev, q, elem, tosend_len);
    308    vu_queue_notify(dev, q);
    309    pthread_mutex_unlock(&qi->vq_lock);
    310    vu_dispatch_unlock(qi->virtio_dev);
    311
    312    req->reply_sent = true;
    313
    314err:
    315    return ret;
    316}
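
/*
 * Example (illustrative, with hypothetical req_unique/payload names): a
 * reply puts the fuse_out_header first and the payload behind it; 'unique'
 * must be non-zero, since unique == 0 would mean a notification:
 *
 *     struct fuse_out_header out = {
 *         .unique = req_unique,
 *         .len = sizeof(out) + payload_len,
 *     };
 *     struct iovec iov[2] = {
 *         { .iov_base = &out,    .iov_len = sizeof(out) },
 *         { .iov_base = payload, .iov_len = payload_len },
 *     };
 *     virtio_send_msg(se, ch, iov, 2);
 */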
    317
    318/*
    319 * Callback from fuse_send_data_iov_* when it's virtio and the buffer
    320 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
     321 * We need to send the iov and then the buffer.
    322 * Return 0 on success
    323 */
    324int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
    325                         struct iovec *iov, int count, struct fuse_bufvec *buf,
    326                         size_t len)
    327{
    328    FVRequest *req = container_of(ch, FVRequest, ch);
    329    struct fv_QueueInfo *qi = ch->qi;
    330    VuDev *dev = &se->virtio_dev->dev;
    331    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    332    VuVirtqElement *elem = &req->elem;
    333    int ret = 0;
    334    g_autofree struct iovec *in_sg_cpy = NULL;
    335
    336    assert(count >= 1);
    337    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
    338
    339    struct fuse_out_header *out = iov[0].iov_base;
    340    /* TODO: Endianness! */
    341
    342    size_t iov_len = iov_size(iov, count);
    343    size_t tosend_len = iov_len + len;
    344
    345    out->len = tosend_len;
    346
    347    fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
    348             count, len, iov_len);
    349
    350    /* unique == 0 is notification which we don't support */
    351    assert(out->unique);
    352
    353    assert(!req->reply_sent);
    354
    355    /* The 'in' part of the elem is to qemu */
    356    unsigned int in_num = elem->in_num;
    357    struct iovec *in_sg = elem->in_sg;
    358    size_t in_len = iov_size(in_sg, in_num);
    359    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
    360             __func__, elem->index, in_num, in_len);
    361
    362    /*
    363     * The elem should have room for a 'fuse_out_header' (out from fuse)
    364     * plus the data based on the len in the header.
    365     */
    366    if (in_len < sizeof(struct fuse_out_header)) {
    367        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
    368                 __func__, elem->index);
    369        return E2BIG;
    370    }
    371    if (in_len < tosend_len) {
    372        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
    373                 __func__, elem->index, tosend_len);
    374        return E2BIG;
    375    }
    376
    377    /* TODO: Limit to 'len' */
    378
    379    /* First copy the header data from iov->in_sg */
    380    copy_iov(iov, count, in_sg, in_num, iov_len);
    381
    382    /*
     383     * Build a copy of the in_sg iov so we can skip bits in it,
    384     * including changing the offsets
    385     */
    386    in_sg_cpy = g_new(struct iovec, in_num);
    387    memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    388    /* These get updated as we skip */
    389    struct iovec *in_sg_ptr = in_sg_cpy;
    390    unsigned int in_sg_cpy_count = in_num;
    391
    392    /* skip over parts of in_sg that contained the header iov */
    393    iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, iov_len);
    394
    395    do {
    396        fuse_log(FUSE_LOG_DEBUG, "%s: in_sg_cpy_count=%d len remaining=%zd\n",
    397                 __func__, in_sg_cpy_count, len);
    398
    399        ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
    400                     buf->buf[0].pos);
    401
    402        if (ret == -1) {
    403            ret = errno;
    404            if (ret == EINTR) {
    405                continue;
    406            }
    407            fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
    408                     __func__, len);
    409            return ret;
    410        }
    411
    412        if (!ret) {
    413            /* EOF case? */
    414            fuse_log(FUSE_LOG_DEBUG, "%s: !ret len remaining=%zd\n", __func__,
    415                     len);
    416            break;
    417        }
    418        fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
    419                 ret, len);
    420
    421        len -= ret;
    422        /* Short read. Retry reading remaining bytes */
    423        if (len) {
    424            fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
    425            /* Skip over this much next time around */
    426            iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, ret);
    427            buf->buf[0].pos += ret;
    428        }
    429    } while (len);
    430
    431    /* Need to fix out->len on EOF */
    432    if (len) {
    433        struct fuse_out_header *out_sg = in_sg[0].iov_base;
    434
    435        tosend_len -= len;
    436        out_sg->len = tosend_len;
    437    }
    438
    439    vu_dispatch_rdlock(qi->virtio_dev);
    440    pthread_mutex_lock(&qi->vq_lock);
    441    vu_queue_push(dev, q, elem, tosend_len);
    442    vu_queue_notify(dev, q);
    443    pthread_mutex_unlock(&qi->vq_lock);
    444    vu_dispatch_unlock(qi->virtio_dev);
    445    req->reply_sent = true;
    446    return 0;
    447}
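
/*
 * Example (illustrative, hypothetical file_fd/off names): the expected
 * 'buf' is a single FD-backed entry, e.g. replying to a read of 'len'
 * bytes at offset 'off':
 *
 *     struct fuse_bufvec bufv = FUSE_BUFVEC_INIT(len);
 *     bufv.buf[0].flags = FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK;
 *     bufv.buf[0].fd = file_fd;
 *     bufv.buf[0].pos = off;
 *     virtio_send_data_iov(se, ch, iov, count, &bufv, len);
 */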
    448
    449static __thread bool clone_fs_called;
    450
    451/* Process one FVRequest in a thread pool */
    452static void fv_queue_worker(gpointer data, gpointer user_data)
    453{
    454    struct fv_QueueInfo *qi = user_data;
    455    struct fuse_session *se = qi->virtio_dev->se;
    456    struct VuDev *dev = &qi->virtio_dev->dev;
    457    FVRequest *req = data;
    458    VuVirtqElement *elem = &req->elem;
    459    struct fuse_buf fbuf = {};
    460    bool allocated_bufv = false;
    461    struct fuse_bufvec bufv;
    462    struct fuse_bufvec *pbufv;
    463    struct fuse_in_header inh;
    464
    465    assert(se->bufsize > sizeof(struct fuse_in_header));
    466
    467    if (!clone_fs_called) {
    468        int ret;
    469
    470        /* unshare FS for xattr operation */
    471        ret = unshare(CLONE_FS);
    472        /* should not fail */
    473        assert(ret == 0);
    474
    475        clone_fs_called = true;
    476    }
    477
    478    /*
    479     * An element contains one request and the space to send our response
    480     * They're spread over multiple descriptors in a scatter/gather set
    481     * and we can't trust the guest to keep them still; so copy in/out.
    482     */
    483    fbuf.mem = g_malloc(se->bufsize);
    484
    485    fuse_mutex_init(&req->ch.lock);
    486    req->ch.fd = -1;
    487    req->ch.qi = qi;
    488
    489    /* The 'out' part of the elem is from qemu */
    490    unsigned int out_num = elem->out_num;
    491    struct iovec *out_sg = elem->out_sg;
    492    size_t out_len = iov_size(out_sg, out_num);
    493    fuse_log(FUSE_LOG_DEBUG,
    494             "%s: elem %d: with %d out desc of length %zd\n",
    495             __func__, elem->index, out_num, out_len);
    496
    497    /*
    498     * The elem should contain a 'fuse_in_header' (in to fuse)
    499     * plus the data based on the len in the header.
    500     */
    501    if (out_len < sizeof(struct fuse_in_header)) {
    502        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
    503                 __func__, elem->index);
    504        assert(0); /* TODO */
    505    }
    506    if (out_len > se->bufsize) {
    507        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
    508                 elem->index);
    509        assert(0); /* TODO */
    510    }
    511    /* Copy just the fuse_in_header and look at it */
    512    copy_from_iov(&fbuf, out_num, out_sg,
    513                  sizeof(struct fuse_in_header));
    514    memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));
    515
     516    pbufv = NULL; /* Compiler thinks an uninitialised path */
    517    if (inh.opcode == FUSE_WRITE &&
    518        out_len >= (sizeof(struct fuse_in_header) +
    519                    sizeof(struct fuse_write_in))) {
    520        /*
    521         * For a write we don't actually need to copy the
    522         * data, we can just do it straight out of guest memory
    523         * but we must still copy the headers in case the guest
    524         * was nasty and changed them while we were using them.
    525         */
    526        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
    527
    528        fbuf.size = copy_from_iov(&fbuf, out_num, out_sg,
    529                                  sizeof(struct fuse_in_header) +
    530                                  sizeof(struct fuse_write_in));
     531        /* That copy reread the in_header; make sure we use the original */
    532        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
    533
    534        /* Allocate the bufv, with space for the rest of the iov */
    535        pbufv = g_try_malloc(sizeof(struct fuse_bufvec) +
    536                             sizeof(struct fuse_buf) * out_num);
    537        if (!pbufv) {
    538            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
    539                    __func__);
    540            goto out;
    541        }
    542
    543        allocated_bufv = true;
    544        pbufv->count = 1;
    545        pbufv->buf[0] = fbuf;
    546
    547        size_t iovindex, pbufvindex, iov_bytes_skip;
    548        pbufvindex = 1; /* 2 headers, 1 fusebuf */
    549
    550        if (!skip_iov(out_sg, out_num,
    551                      sizeof(struct fuse_in_header) +
    552                      sizeof(struct fuse_write_in),
    553                      &iovindex, &iov_bytes_skip)) {
    554            fuse_log(FUSE_LOG_ERR, "%s: skip failed\n",
    555                    __func__);
    556            goto out;
    557        }
    558
    559        for (; iovindex < out_num; iovindex++, pbufvindex++) {
    560            pbufv->count++;
    561            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
    562            pbufv->buf[pbufvindex].flags = 0;
    563            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
    564            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
    565
    566            if (iov_bytes_skip) {
    567                pbufv->buf[pbufvindex].mem += iov_bytes_skip;
    568                pbufv->buf[pbufvindex].size -= iov_bytes_skip;
    569                iov_bytes_skip = 0;
    570            }
    571        }
    572    } else {
    573        /* Normal (non fast write) path */
    574
    575        copy_from_iov(&fbuf, out_num, out_sg, se->bufsize);
     576        /* That copy reread the in_header; make sure we use the original */
    577        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
    578        fbuf.size = out_len;
    579
    580        /* TODO! Endianness of header */
    581
    582        /* TODO: Add checks for fuse_session_exited */
    583        bufv.buf[0] = fbuf;
    584        bufv.count = 1;
    585        pbufv = &bufv;
    586    }
    587    pbufv->idx = 0;
    588    pbufv->off = 0;
    589    fuse_session_process_buf_int(se, pbufv, &req->ch);
    590
    591out:
    592    if (allocated_bufv) {
    593        g_free(pbufv);
    594    }
    595
    596    /* If the request has no reply, still recycle the virtqueue element */
    597    if (!req->reply_sent) {
    598        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    599
    600        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
    601                 elem->index);
    602
    603        vu_dispatch_rdlock(qi->virtio_dev);
    604        pthread_mutex_lock(&qi->vq_lock);
    605        vu_queue_push(dev, q, elem, 0);
    606        vu_queue_notify(dev, q);
    607        pthread_mutex_unlock(&qi->vq_lock);
    608        vu_dispatch_unlock(qi->virtio_dev);
    609    }
    610
    611    pthread_mutex_destroy(&req->ch.lock);
    612    g_free(fbuf.mem);
    613    free(req);
    614}
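
/*
 * Layout sketch (illustrative) of pbufv on the FUSE_WRITE fast path:
 *
 *     buf[0]           copied fuse_in_header + fuse_write_in (memory)
 *     buf[1..count-1]  write payload, pointing directly into the guest's
 *                      scatter-gather entries (no copy)
 *
 * so fuse_session_process_buf_int() can hand the payload straight to the
 * backing file without staging it in fbuf.
 */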
    615
    616/* Thread function for individual queues, created when a queue is 'started' */
    617static void *fv_queue_thread(void *opaque)
    618{
    619    struct fv_QueueInfo *qi = opaque;
    620    struct VuDev *dev = &qi->virtio_dev->dev;
    621    struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    622    struct fuse_session *se = qi->virtio_dev->se;
    623    GThreadPool *pool = NULL;
    624    GList *req_list = NULL;
    625
    626    if (se->thread_pool_size) {
    627        fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
    628                 __func__, qi->qidx);
    629        pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
    630                                 FALSE, NULL);
    631        if (!pool) {
    632            fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
    633            return NULL;
    634        }
    635    }
    636
    637    fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
    638             qi->qidx, qi->kick_fd);
    639    while (1) {
    640        struct pollfd pf[2];
    641
    642        pf[0].fd = qi->kick_fd;
    643        pf[0].events = POLLIN;
    644        pf[0].revents = 0;
    645        pf[1].fd = qi->kill_fd;
    646        pf[1].events = POLLIN;
    647        pf[1].revents = 0;
    648
    649        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
    650                 qi->qidx);
    651        int poll_res = ppoll(pf, 2, NULL, NULL);
    652
    653        if (poll_res == -1) {
    654            if (errno == EINTR) {
    655                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
    656                         __func__);
    657                continue;
    658            }
    659            fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
    660            break;
    661        }
    662        assert(poll_res >= 1);
    663        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    664            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
    665                     __func__, pf[0].revents, qi->qidx);
    666            break;
    667        }
    668        if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    669            fuse_log(FUSE_LOG_ERR,
    670                     "%s: Unexpected poll revents %x Queue %d killfd\n",
    671                     __func__, pf[1].revents, qi->qidx);
    672            break;
    673        }
    674        if (pf[1].revents) {
    675            fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
    676                     __func__, qi->qidx);
    677            break;
    678        }
    679        assert(pf[0].revents & POLLIN);
    680        fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
    681                 qi->qidx);
    682
    683        eventfd_t evalue;
    684        if (eventfd_read(qi->kick_fd, &evalue)) {
    685            fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
    686            break;
    687        }
    688        /* Mutual exclusion with virtio_loop() */
    689        vu_dispatch_rdlock(qi->virtio_dev);
    690        pthread_mutex_lock(&qi->vq_lock);
     691        /* 'out' is from the guest, 'in' is to the guest */
    692        unsigned int in_bytes, out_bytes;
    693        vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
    694
    695        fuse_log(FUSE_LOG_DEBUG,
    696                 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
    697                 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
    698
    699        while (1) {
    700            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
    701            if (!req) {
    702                break;
    703            }
    704
    705            req->reply_sent = false;
    706
    707            if (!se->thread_pool_size) {
    708                req_list = g_list_prepend(req_list, req);
    709            } else {
    710                g_thread_pool_push(pool, req, NULL);
    711            }
    712        }
    713
    714        pthread_mutex_unlock(&qi->vq_lock);
    715        vu_dispatch_unlock(qi->virtio_dev);
    716
    717        /* Process all the requests. */
    718        if (!se->thread_pool_size && req_list != NULL) {
    719            req_list = g_list_reverse(req_list);
    720            g_list_foreach(req_list, fv_queue_worker, qi);
    721            g_list_free(req_list);
    722            req_list = NULL;
    723        }
    724    }
    725
    726    if (pool) {
    727        g_thread_pool_free(pool, FALSE, TRUE);
    728    }
    729
    730    return NULL;
    731}
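
/*
 * Example (illustrative): kick_fd is an eventfd, so a kick from the
 * front-end is just a counter increment that ppoll() reports as POLLIN:
 *
 *     eventfd_write(kick_fd, 1);       (QEMU side: "the queue has work")
 *
 *     eventfd_t evalue;
 *     eventfd_read(kick_fd, &evalue);  (here: drain the counter, then
 *                                       vu_queue_pop() until empty)
 */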
    732
    733static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
    734{
    735    int ret;
    736    struct fv_QueueInfo *ourqi;
    737
    738    assert(qidx < vud->nqueues);
    739    ourqi = vud->qi[qidx];
    740
    741    /* Kill the thread */
    742    if (eventfd_write(ourqi->kill_fd, 1)) {
    743        fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
    744                 qidx, strerror(errno));
    745    }
    746    ret = pthread_join(ourqi->thread, NULL);
    747    if (ret) {
    748        fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
    749                 __func__, qidx, ret);
    750    }
    751    pthread_mutex_destroy(&ourqi->vq_lock);
    752    close(ourqi->kill_fd);
    753    ourqi->kick_fd = -1;
    754    g_free(vud->qi[qidx]);
    755    vud->qi[qidx] = NULL;
    756}
    757
    758/* Callback from libvhost-user on start or stop of a queue */
    759static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
    760{
    761    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    762    struct fv_QueueInfo *ourqi;
    763
    764    fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
    765             started);
    766    assert(qidx >= 0);
    767
    768    /*
    769     * Ignore additional request queues for now.  passthrough_ll.c must be
    770     * audited for thread-safety issues first.  It was written with a
    771     * well-behaved client in mind and may not protect against all types of
    772     * races yet.
    773     */
    774    if (qidx > 1) {
    775        fuse_log(FUSE_LOG_ERR,
    776                 "%s: multiple request queues not yet implemented, please only "
    777                 "configure 1 request queue\n",
    778                 __func__);
    779        exit(EXIT_FAILURE);
    780    }
    781
    782    if (started) {
    783        /* Fire up a thread to watch this queue */
    784        if (qidx >= vud->nqueues) {
    785            vud->qi = g_realloc_n(vud->qi, qidx + 1, sizeof(vud->qi[0]));
    786            memset(vud->qi + vud->nqueues, 0,
    787                   sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
    788            vud->nqueues = qidx + 1;
    789        }
    790        if (!vud->qi[qidx]) {
    791            vud->qi[qidx] = g_new0(struct fv_QueueInfo, 1);
    792            vud->qi[qidx]->virtio_dev = vud;
    793            vud->qi[qidx]->qidx = qidx;
    794        } else {
    795            /* Shouldn't have been started */
    796            assert(vud->qi[qidx]->kick_fd == -1);
    797        }
    798        ourqi = vud->qi[qidx];
    799        ourqi->kick_fd = dev->vq[qidx].kick_fd;
    800
    801        ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
    802        assert(ourqi->kill_fd != -1);
    803        pthread_mutex_init(&ourqi->vq_lock, NULL);
    804
    805        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
    806            fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
    807                     __func__, qidx);
    808            assert(0);
    809        }
    810    } else {
    811        /*
    812         * Temporarily drop write-lock taken in virtio_loop() so that
    813         * the queue thread doesn't block in virtio_send_msg().
    814         */
    815        vu_dispatch_unlock(vud);
    816        fv_queue_cleanup_thread(vud, qidx);
    817        vu_dispatch_wrlock(vud);
    818    }
    819}
    820
    821static bool fv_queue_order(VuDev *dev, int qidx)
    822{
    823    return false;
    824}
    825
    826static const VuDevIface fv_iface = {
    827    .get_features = fv_get_features,
    828    .set_features = fv_set_features,
    829
    830    /* Don't need process message, we've not got any at vhost-user level */
    831    .queue_set_started = fv_queue_set_started,
    832
    833    .queue_is_processed_in_order = fv_queue_order,
    834};
    835
    836/*
    837 * Main loop; this mostly deals with events on the vhost-user
    838 * socket itself, and not actual fuse data.
    839 */
    840int virtio_loop(struct fuse_session *se)
    841{
    842    fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);
    843
    844    while (!fuse_session_exited(se)) {
    845        struct pollfd pf[1];
    846        bool ok;
    847        pf[0].fd = se->vu_socketfd;
    848        pf[0].events = POLLIN;
    849        pf[0].revents = 0;
    850
    851        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
    852        int poll_res = ppoll(pf, 1, NULL, NULL);
    853
    854        if (poll_res == -1) {
    855            if (errno == EINTR) {
    856                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
    857                         __func__);
    858                continue;
    859            }
    860            fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
    861            break;
    862        }
    863        assert(poll_res == 1);
    864        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
    865            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
    866                     pf[0].revents);
    867            break;
    868        }
    869        assert(pf[0].revents & POLLIN);
    870        fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
    871        /* Mutual exclusion with fv_queue_thread() */
    872        vu_dispatch_wrlock(se->virtio_dev);
    873
    874        ok = vu_dispatch(&se->virtio_dev->dev);
    875
    876        vu_dispatch_unlock(se->virtio_dev);
    877
    878        if (!ok) {
    879            fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
    880            break;
    881        }
    882    }
    883
    884    /*
    885     * Make sure all fv_queue_thread()s quit on exit, as we're about to
    886     * free virtio dev and fuse session, no one should access them anymore.
    887     */
    888    for (int i = 0; i < se->virtio_dev->nqueues; i++) {
    889        if (!se->virtio_dev->qi[i]) {
    890            continue;
    891        }
    892
    893        fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i);
    894        fv_queue_cleanup_thread(se->virtio_dev, i);
    895    }
    896
    897    fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);
    898
    899    return 0;
    900}
    901
    902static void strreplace(char *s, char old, char new)
    903{
    904    for (; *s; ++s) {
    905        if (*s == old) {
    906            *s = new;
    907        }
    908    }
    909}
    910
    911static bool fv_socket_lock(struct fuse_session *se)
    912{
    913    g_autofree gchar *sk_name = NULL;
    914    g_autofree gchar *pidfile = NULL;
    915    g_autofree gchar *dir = NULL;
    916    Error *local_err = NULL;
    917
    918    dir = qemu_get_local_state_pathname("run/virtiofsd");
    919
    920    if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
    921        fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s\n",
    922                 __func__, dir, strerror(errno));
    923        return false;
    924    }
    925
    926    sk_name = g_strdup(se->vu_socket_path);
    927    strreplace(sk_name, '/', '.');
    928    pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);
    929
    930    if (!qemu_write_pidfile(pidfile, &local_err)) {
    931        error_report_err(local_err);
    932        return false;
    933    }
    934
    935    return true;
    936}
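
/*
 * Example (illustrative): for a socket path of /run/user/1000/vhostqemu
 * this builds a pidfile name such as
 *
 *     <local state dir>/run/virtiofsd/.run.user.1000.vhostqemu.pid
 *
 * so a second daemon started on the same socket path fails in
 * qemu_write_pidfile() instead of silently stealing the socket.
 */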
    937
    938static int fv_create_listen_socket(struct fuse_session *se)
    939{
    940    struct sockaddr_un un;
    941    mode_t old_umask;
    942
    943    /* Nothing to do if fd is already initialized */
    944    if (se->vu_listen_fd >= 0) {
    945        return 0;
    946    }
    947
    948    if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
    949        fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
    950        return -1;
    951    }
    952
    953    if (!strlen(se->vu_socket_path)) {
    954        fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
    955        return -1;
    956    }
    957
     958    /* Check whether the vu_socket_path is already in use */
    959    if (!fv_socket_lock(se)) {
    960        return -1;
    961    }
    962
    963    /*
    964     * Create the Unix socket to communicate with qemu
    965     * based on QEMU's vhost-user-bridge
    966     */
    967    unlink(se->vu_socket_path);
    968    strcpy(un.sun_path, se->vu_socket_path);
    969    size_t addr_len = sizeof(un);
    970
    971    int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    972    if (listen_sock == -1) {
    973        fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
    974        return -1;
    975    }
    976    un.sun_family = AF_UNIX;
    977
    978    /*
    979     * Unfortunately bind doesn't let you set the mask on the socket,
    980     * so set umask appropriately and restore it later.
    981     */
    982    if (se->vu_socket_group) {
    983        old_umask = umask(S_IROTH | S_IWOTH | S_IXOTH);
    984    } else {
    985        old_umask = umask(S_IRGRP | S_IWGRP | S_IXGRP |
    986                          S_IROTH | S_IWOTH | S_IXOTH);
    987    }
    988    if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
    989        fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
    990        close(listen_sock);
    991        umask(old_umask);
    992        return -1;
    993    }
    994    if (se->vu_socket_group) {
    995        struct group *g = getgrnam(se->vu_socket_group);
    996        if (g) {
    997            if (chown(se->vu_socket_path, -1, g->gr_gid) == -1) {
    998                fuse_log(FUSE_LOG_WARNING,
    999                         "vhost socket failed to set group to %s (%d): %m\n",
   1000                         se->vu_socket_group, g->gr_gid);
   1001            }
   1002        }
   1003    }
   1004    umask(old_umask);
   1005
   1006    if (listen(listen_sock, 1) == -1) {
   1007        fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
   1008        close(listen_sock);
   1009        return -1;
   1010    }
   1011
   1012    se->vu_listen_fd = listen_sock;
   1013    return 0;
   1014}
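
/*
 * Usage sketch (illustrative): a typical pairing with QEMU looks like
 *
 *     virtiofsd --socket-path=/tmp/vhostqemu -o source=/path/to/shared
 *     qemu ... -chardev socket,id=char0,path=/tmp/vhostqemu \
 *              -device vhost-user-fs-pci,chardev=char0,tag=myfs
 *
 * and the guest then mounts the share with: mount -t virtiofs myfs /mnt
 */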
   1015
   1016int virtio_session_mount(struct fuse_session *se)
   1017{
   1018    int ret;
   1019
   1020    /*
   1021     * Test that unshare(CLONE_FS) works. fv_queue_worker() will need it. It's
   1022     * an unprivileged system call but some Docker/Moby versions are known to
   1023     * reject it via seccomp when CAP_SYS_ADMIN is not given.
   1024     *
   1025     * Note that the program is single-threaded here so this syscall has no
   1026     * visible effect and is safe to make.
   1027     */
   1028    ret = unshare(CLONE_FS);
   1029    if (ret == -1 && errno == EPERM) {
   1030        fuse_log(FUSE_LOG_ERR, "unshare(CLONE_FS) failed with EPERM. If "
   1031                "running in a container please check that the container "
   1032                "runtime seccomp policy allows unshare.\n");
   1033        return -1;
   1034    }
   1035
   1036    ret = fv_create_listen_socket(se);
   1037    if (ret < 0) {
   1038        return ret;
   1039    }
   1040
   1041    se->fd = -1;
   1042
   1043    fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
   1044             __func__);
   1045    int data_sock = accept(se->vu_listen_fd, NULL, NULL);
   1046    if (data_sock == -1) {
   1047        fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
   1048        close(se->vu_listen_fd);
   1049        return -1;
   1050    }
   1051    close(se->vu_listen_fd);
   1052    se->vu_listen_fd = -1;
   1053    fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
   1054             __func__);
   1055
   1056    /* TODO: Some cleanup/deallocation! */
   1057    se->virtio_dev = g_new0(struct fv_VuDev, 1);
   1058
   1059    se->vu_socketfd = data_sock;
   1060    se->virtio_dev->se = se;
   1061    pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
   1062    if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
   1063                 fv_set_watch, fv_remove_watch, &fv_iface)) {
   1064        fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
   1065        return -1;
   1066    }
   1067
   1068    return 0;
   1069}
   1070
   1071void virtio_session_close(struct fuse_session *se)
   1072{
   1073    close(se->vu_socketfd);
   1074
   1075    if (!se->virtio_dev) {
   1076        return;
   1077    }
   1078
   1079    g_free(se->virtio_dev->qi);
   1080    pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock);
   1081    g_free(se->virtio_dev);
   1082    se->virtio_dev = NULL;
   1083}