cachepc-qemu

Fork of AMDESE/qemu with changes for the cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

thread-pool.c (9591B)


/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

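/* Per-request state; see ThreadPoolElement below for the locking rules. */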
enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    bool stopping;
};

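/* Worker loop: wait on the semaphore for queued requests, run them, and
 * schedule the completion bottom half.  A worker that stays idle for ten
 * seconds, or observes pool->stopping, exits on its own.
 */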
static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);

        qemu_bh_schedule(pool->completion_bh);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

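/* Bottom half that creates backlogged workers; it runs in the pool's home
 * AioContext so that new threads do not inherit vcpu thread affinity.
 */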
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

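/* Bottom half that runs in the pool's AioContext and invokes the completion
 * callback of every request that has reached THREAD_DONE.
 */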
static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

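/* Asynchronous cancellation: if the request is still queued and we can grab
 * its semaphore slot, steal it from the workers and complete it with
 * -ECANCELED; otherwise it runs to completion normally.
 */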
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem.  We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

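/* Queue func(arg) for execution in a worker thread.  cb(opaque, ret) is
 * called from the pool's AioContext once the work finishes.  A new worker
 * is spawned if none are idle and the pool is below max_threads.
 */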
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

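/* Coroutine-friendly wrapper: submit the work, yield until the completion
 * callback wakes us, then return the worker function's result.
 */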
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

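/* Fire-and-forget submission: no completion callback, and the return value
 * of func is discarded.
 */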
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

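/* Initialize a pool bound to ctx (the global AioContext if ctx is NULL),
 * with a default limit of 64 worker threads.
 */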
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

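/* Tear down a pool: all requests must have completed already.  Workers are
 * woken so they observe pool->stopping and exit, then resources are freed.
 */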
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
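
A typical caller submits a blocking function to the thread pool of its current
AioContext and waits for the result from a coroutine. The sketch below is not
part of this file and only illustrates the API; it assumes the
aio_get_thread_pool() and qemu_get_current_aio_context() helpers from QEMU's
block/aio.h, and do_blocking_work / run_in_pool are hypothetical names.

/* Hypothetical worker: runs in a pool thread and must not touch
 * AioContext state; its return value becomes the submission's result. */
static int do_blocking_work(void *opaque)
{
    int *value = opaque;
    return *value > 0 ? 0 : -EINVAL;
}

/* From coroutine context: submit the work and yield until it completes. */
static int coroutine_fn run_in_pool(int *value)
{
    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
    return thread_pool_submit_co(pool, do_blocking_work, value);
}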