cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

io_uring.c (13060B)


/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;            /* coroutine waiting on this request */
    struct io_uring_sqe sqeq; /* prepared sqe, copied into the ring on submit */
    ssize_t ret;              /* result; -EINPROGRESS while pending */
    QEMUIOVector *qiov;       /* caller's I/O vector, NULL for flushes */
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;             /* bytes read so far across resubmissions */
    QEMUIOVector resubmit_qiov; /* iovecs covering the unread remainder */
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;            /* >0 while submission is batched by io_plug */
    unsigned int in_queue;  /* requests waiting in submit_queue */
    unsigned int in_flight; /* requests submitted to the kernel */
    bool blocked;           /* set when submission could not drain the queue */
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission.  Protected by AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
 * context") a buffered I/O request with the start of the file range in the
 * page cache could result in a short read.  Applications need to resubmit the
 * remaining read request.
 *
 * This is a slow path but recent kernels never take it.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position; total_read accumulates across resubmissions */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe: advance the file offset past the bytes already read */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules the completion BH so that it can be called again in
 * a nested event loop.  When there are no events left to complete, the BH is
 * canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

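/**
 * ioq_submit:
 * @s: AIO state
 *
 * Copies prepared sqes from the pending queue into the ring and submits them
 * to the kernel.  Sets io_q.blocked if the queue could not be drained, and
 * processes any completions that are already available before returning.
 */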
static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * We can try to complete something just right away if there are
         * still requests in-flight.
         */
        luring_process_completions(s);
    }
    return ret;
}

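/**
 * luring_process_completions_and_submit:
 * @s: AIO state
 *
 * Processes ready cqes and then submits any requests that were queued while
 * the completion callbacks ran, all under the AioContext lock.
 */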
static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

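/* Bottom half scheduled by luring_process_completions() for nested loops */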
static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

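/* Read handler for the ring fd, registered in luring_attach_aio_context() */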
static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

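/* Polling callback: checks for ready cqes without sleeping on the ring fd */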
static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    if (io_uring_cq_ready(&s->ring)) {
        luring_process_completions_and_submit(s);
        return true;
    }

    return false;
}

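/* Reset queue counters and the pending request list to an empty state */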
static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

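/*
 * Plugging delays submission so that requests queued while plugged can be
 * submitted to the kernel as a single batch when the last plug is released.
 */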
void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

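/* Counterpart of luring_io_plug(): submit batched requests on last unplug */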
void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the sqe for this request and appends it to the pending queue.  The
 * queue is submitted unless it is blocked or plugged; a plugged queue is
 * still submitted once MAX_ENTRIES requests accumulate.
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

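/**
 * luring_co_submit:
 * @bs: block driver state
 * @s: AIO state
 * @fd: file descriptor for I/O
 * @offset: offset for request
 * @qiov: I/O vector, NULL for flush requests
 * @type: QEMU_AIO_READ, QEMU_AIO_WRITE or QEMU_AIO_FLUSH
 *
 * Submits a request from coroutine context and yields until it completes.
 * Returns 0 on success or a negative errno value on failure.
 */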
int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

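/*
 * Unregister the ring fd from @old_context and delete the completion BH
 * before this state is attached to another AioContext.
 */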
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
                       s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

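/*
 * Create the completion BH in @new_context and register the ring fd's read
 * and poll handlers there so completions are processed in that AioContext.
 */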
void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL, qemu_luring_poll_cb, s);
}

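/*
 * Allocate a LuringState and initialize a ring with MAX_ENTRIES entries.
 * Returns NULL and sets @errp on failure.
 */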
LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        /* io_uring_queue_init() returns a negative errno value */
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

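/* Tear down the ring and free @s; callers should detach the AioContext first */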
void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}
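
/*
 * Typical call sequence (a sketch for illustration only, not part of this
 * file; error handling is omitted and ctx, bs, fd and qiov are assumed to
 * come from the caller):
 *
 *     Error *local_err = NULL;
 *     LuringState *s = luring_init(&local_err);
 *     luring_attach_aio_context(s, ctx);
 *
 *     (from coroutine context:)
 *     ret = luring_co_submit(bs, s, fd, offset, &qiov, QEMU_AIO_READ);
 *
 *     luring_detach_aio_context(s, ctx);
 *     luring_cleanup(s);
 */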