cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

job.c (27578B)


      1/*
      2 * Background jobs (long-running operations)
      3 *
      4 * Copyright (c) 2011 IBM Corp.
      5 * Copyright (c) 2012, 2018 Red Hat, Inc.
      6 *
      7 * Permission is hereby granted, free of charge, to any person obtaining a copy
      8 * of this software and associated documentation files (the "Software"), to deal
      9 * in the Software without restriction, including without limitation the rights
     10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     11 * copies of the Software, and to permit persons to whom the Software is
     12 * furnished to do so, subject to the following conditions:
     13 *
     14 * The above copyright notice and this permission notice shall be included in
     15 * all copies or substantial portions of the Software.
     16 *
     17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
     20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     23 * THE SOFTWARE.
     24 */
     25
     26#include "qemu/osdep.h"
     27#include "qapi/error.h"
     28#include "qemu/job.h"
     29#include "qemu/id.h"
     30#include "qemu/main-loop.h"
     31#include "block/aio-wait.h"
     32#include "trace/trace-root.h"
     33#include "qapi/qapi-events-job.h"
     34
/* File-wide list of jobs, linked through Job.job_list: job_create() inserts
 * at the head and job_unref() removes the entry when the job is freed. */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
     36
/*
 * Job State Transition Table
 *
 * Indexed as JobSTT[current status][new status]; a non-zero entry means the
 * transition is permitted (checked by job_state_transition()).  The column
 * letters use the same abbreviations as the row labels below.
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
     52
/*
 * Verb permission table, indexed as JobVerbTable[verb][current status].
 * A non-zero entry means job_apply_verb() accepts the verb in that status.
 * Column letters follow the status abbreviations used in JobSTT.
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
     63
/* Transactional group of jobs */
struct JobTxn {

    /* Is this txn being cancelled?  Set by job_completed_txn_abort(). */
    bool aborting;

    /* List of member jobs, linked through Job.txn_list */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; the txn is freed when it drops to zero
     * (see job_txn_unref()) */
    int refcnt;
};
     76
/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to job_do_yield and
 * job_enter.  It is taken and released via job_lock()/job_unlock(). */
static QemuMutex job_mutex;
     81
     82static void job_lock(void)
     83{
     84    qemu_mutex_lock(&job_mutex);
     85}
     86
     87static void job_unlock(void)
     88{
     89    qemu_mutex_unlock(&job_mutex);
     90}
     91
     92static void __attribute__((__constructor__)) job_init(void)
     93{
     94    qemu_mutex_init(&job_mutex);
     95}
     96
     97JobTxn *job_txn_new(void)
     98{
     99    JobTxn *txn = g_new0(JobTxn, 1);
    100    QLIST_INIT(&txn->jobs);
    101    txn->refcnt = 1;
    102    return txn;
    103}
    104
    105static void job_txn_ref(JobTxn *txn)
    106{
    107    txn->refcnt++;
    108}
    109
    110void job_txn_unref(JobTxn *txn)
    111{
    112    if (txn && --txn->refcnt == 0) {
    113        g_free(txn);
    114    }
    115}
    116
    117void job_txn_add_job(JobTxn *txn, Job *job)
    118{
    119    if (!txn) {
    120        return;
    121    }
    122
    123    assert(!job->txn);
    124    job->txn = txn;
    125
    126    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    127    job_txn_ref(txn);
    128}
    129
    130static void job_txn_del_job(Job *job)
    131{
    132    if (job->txn) {
    133        QLIST_REMOVE(job, txn_list);
    134        job_txn_unref(job->txn);
    135        job->txn = NULL;
    136    }
    137}
    138
/*
 * Call @fn on every job in @job's transaction, each under that job's own
 * AioContext lock.
 *
 * Iteration stops at the first non-zero return value of @fn.
 *
 * Returns: the last value returned by @fn (0 if every call returned 0 or the
 * transaction has no jobs).
 */
static int job_txn_apply(Job *job, int fn(Job *))
{
    AioContext *inner_ctx;
    Job *other_job, *next;
    /* Cached up front: fn may detach @job from the transaction. */
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Similar to job_completed_txn_abort, we take each job's lock before
     * applying fn, but since we assume that job->aio_context is held by the
     * caller, we need to release it here to avoid holding the lock twice -
     * which would break AIO_WAIT_WHILE from within fn.
     */
    job_ref(job);
    aio_context_release(job->aio_context);

    /* _SAFE variant: fn may unlink other_job from the list. */
    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        inner_ctx = other_job->aio_context;
        aio_context_acquire(inner_ctx);
        rc = fn(other_job);
        aio_context_release(inner_ctx);
        if (rc) {
            break;
        }
    }

    /*
     * Note that job->aio_context might have been changed by calling fn, so we
     * can't use a local variable to cache it.
     */
    aio_context_acquire(job->aio_context);
    job_unref(job);
    return rc;
}
    173
    174bool job_is_internal(Job *job)
    175{
    176    return (job->id == NULL);
    177}
    178
    179static void job_state_transition(Job *job, JobStatus s1)
    180{
    181    JobStatus s0 = job->status;
    182    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    183    trace_job_state_transition(job, job->ret,
    184                               JobSTT[s0][s1] ? "allowed" : "disallowed",
    185                               JobStatus_str(s0), JobStatus_str(s1));
    186    assert(JobSTT[s0][s1]);
    187    job->status = s1;
    188
    189    if (!job_is_internal(job) && s1 != s0) {
    190        qapi_event_send_job_status_change(job->id, job->status);
    191    }
    192}
    193
    194int job_apply_verb(Job *job, JobVerb verb, Error **errp)
    195{
    196    JobStatus s0 = job->status;
    197    assert(verb >= 0 && verb < JOB_VERB__MAX);
    198    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
    199                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    200    if (JobVerbTable[verb][s0]) {
    201        return 0;
    202    }
    203    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
    204               job->id, JobStatus_str(s0), JobVerb_str(verb));
    205    return -EPERM;
    206}
    207
    208JobType job_type(const Job *job)
    209{
    210    return job->driver->job_type;
    211}
    212
    213const char *job_type_str(const Job *job)
    214{
    215    return JobType_str(job_type(job));
    216}
    217
    218bool job_is_cancelled(Job *job)
    219{
    220    /* force_cancel may be true only if cancelled is true, too */
    221    assert(job->cancelled || !job->force_cancel);
    222    return job->force_cancel;
    223}
    224
    225bool job_cancel_requested(Job *job)
    226{
    227    return job->cancelled;
    228}
    229
    230bool job_is_ready(Job *job)
    231{
    232    switch (job->status) {
    233    case JOB_STATUS_UNDEFINED:
    234    case JOB_STATUS_CREATED:
    235    case JOB_STATUS_RUNNING:
    236    case JOB_STATUS_PAUSED:
    237    case JOB_STATUS_WAITING:
    238    case JOB_STATUS_PENDING:
    239    case JOB_STATUS_ABORTING:
    240    case JOB_STATUS_CONCLUDED:
    241    case JOB_STATUS_NULL:
    242        return false;
    243    case JOB_STATUS_READY:
    244    case JOB_STATUS_STANDBY:
    245        return true;
    246    default:
    247        g_assert_not_reached();
    248    }
    249    return false;
    250}
    251
    252bool job_is_completed(Job *job)
    253{
    254    switch (job->status) {
    255    case JOB_STATUS_UNDEFINED:
    256    case JOB_STATUS_CREATED:
    257    case JOB_STATUS_RUNNING:
    258    case JOB_STATUS_PAUSED:
    259    case JOB_STATUS_READY:
    260    case JOB_STATUS_STANDBY:
    261        return false;
    262    case JOB_STATUS_WAITING:
    263    case JOB_STATUS_PENDING:
    264    case JOB_STATUS_ABORTING:
    265    case JOB_STATUS_CONCLUDED:
    266    case JOB_STATUS_NULL:
    267        return true;
    268    default:
    269        g_assert_not_reached();
    270    }
    271    return false;
    272}
    273
    274static bool job_started(Job *job)
    275{
    276    return job->co;
    277}
    278
    279static bool job_should_pause(Job *job)
    280{
    281    return job->pause_count > 0;
    282}
    283
    284Job *job_next(Job *job)
    285{
    286    if (!job) {
    287        return QLIST_FIRST(&jobs);
    288    }
    289    return QLIST_NEXT(job, job_list);
    290}
    291
    292Job *job_get(const char *id)
    293{
    294    Job *job;
    295
    296    QLIST_FOREACH(job, &jobs, job_list) {
    297        if (job->id && !strcmp(id, job->id)) {
    298            return job;
    299        }
    300    }
    301
    302    return NULL;
    303}
    304
    305static void job_sleep_timer_cb(void *opaque)
    306{
    307    Job *job = opaque;
    308
    309    job_enter(job);
    310}
    311
    312void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
    313                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
    314                 void *opaque, Error **errp)
    315{
    316    Job *job;
    317
    318    if (job_id) {
    319        if (flags & JOB_INTERNAL) {
    320            error_setg(errp, "Cannot specify job ID for internal job");
    321            return NULL;
    322        }
    323        if (!id_wellformed(job_id)) {
    324            error_setg(errp, "Invalid job ID '%s'", job_id);
    325            return NULL;
    326        }
    327        if (job_get(job_id)) {
    328            error_setg(errp, "Job ID '%s' already in use", job_id);
    329            return NULL;
    330        }
    331    } else if (!(flags & JOB_INTERNAL)) {
    332        error_setg(errp, "An explicit job ID is required");
    333        return NULL;
    334    }
    335
    336    job = g_malloc0(driver->instance_size);
    337    job->driver        = driver;
    338    job->id            = g_strdup(job_id);
    339    job->refcnt        = 1;
    340    job->aio_context   = ctx;
    341    job->busy          = false;
    342    job->paused        = true;
    343    job->pause_count   = 1;
    344    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    345    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    346    job->cb            = cb;
    347    job->opaque        = opaque;
    348
    349    progress_init(&job->progress);
    350
    351    notifier_list_init(&job->on_finalize_cancelled);
    352    notifier_list_init(&job->on_finalize_completed);
    353    notifier_list_init(&job->on_pending);
    354    notifier_list_init(&job->on_ready);
    355
    356    job_state_transition(job, JOB_STATUS_CREATED);
    357    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
    358                   QEMU_CLOCK_REALTIME, SCALE_NS,
    359                   job_sleep_timer_cb, job);
    360
    361    QLIST_INSERT_HEAD(&jobs, job, job_list);
    362
    363    /* Single jobs are modeled as single-job transactions for sake of
    364     * consolidating the job management logic */
    365    if (!txn) {
    366        txn = job_txn_new();
    367        job_txn_add_job(txn, job);
    368        job_txn_unref(txn);
    369    } else {
    370        job_txn_add_job(txn, job);
    371    }
    372
    373    return job;
    374}
    375
    376void job_ref(Job *job)
    377{
    378    ++job->refcnt;
    379}
    380
    381void job_unref(Job *job)
    382{
    383    if (--job->refcnt == 0) {
    384        assert(job->status == JOB_STATUS_NULL);
    385        assert(!timer_pending(&job->sleep_timer));
    386        assert(!job->txn);
    387
    388        if (job->driver->free) {
    389            job->driver->free(job);
    390        }
    391
    392        QLIST_REMOVE(job, job_list);
    393
    394        progress_destroy(&job->progress);
    395        error_free(job->err);
    396        g_free(job->id);
    397        g_free(job);
    398    }
    399}
    400
    401void job_progress_update(Job *job, uint64_t done)
    402{
    403    progress_work_done(&job->progress, done);
    404}
    405
    406void job_progress_set_remaining(Job *job, uint64_t remaining)
    407{
    408    progress_set_remaining(&job->progress, remaining);
    409}
    410
    411void job_progress_increase_remaining(Job *job, uint64_t delta)
    412{
    413    progress_increase_remaining(&job->progress, delta);
    414}
    415
    416void job_event_cancelled(Job *job)
    417{
    418    notifier_list_notify(&job->on_finalize_cancelled, job);
    419}
    420
    421void job_event_completed(Job *job)
    422{
    423    notifier_list_notify(&job->on_finalize_completed, job);
    424}
    425
    426static void job_event_pending(Job *job)
    427{
    428    notifier_list_notify(&job->on_pending, job);
    429}
    430
    431static void job_event_ready(Job *job)
    432{
    433    notifier_list_notify(&job->on_ready, job);
    434}
    435
    436static void job_event_idle(Job *job)
    437{
    438    notifier_list_notify(&job->on_idle, job);
    439}
    440
    441void job_enter_cond(Job *job, bool(*fn)(Job *job))
    442{
    443    if (!job_started(job)) {
    444        return;
    445    }
    446    if (job->deferred_to_main_loop) {
    447        return;
    448    }
    449
    450    job_lock();
    451    if (job->busy) {
    452        job_unlock();
    453        return;
    454    }
    455
    456    if (fn && !fn(job)) {
    457        job_unlock();
    458        return;
    459    }
    460
    461    assert(!job->deferred_to_main_loop);
    462    timer_del(&job->sleep_timer);
    463    job->busy = true;
    464    job_unlock();
    465    aio_co_enter(job->aio_context, job->co);
    466}
    467
    468void job_enter(Job *job)
    469{
    470    job_enter_cond(job, NULL);
    471}
    472
    473/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
    474 * Reentering the job coroutine with job_enter() before the timer has expired
    475 * is allowed and cancels the timer.
    476 *
    477 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
    478 * called explicitly. */
    479static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
    480{
    481    job_lock();
    482    if (ns != -1) {
    483        timer_mod(&job->sleep_timer, ns);
    484    }
    485    job->busy = false;
    486    job_event_idle(job);
    487    job_unlock();
    488    qemu_coroutine_yield();
    489
    490    /* Set by job_enter_cond() before re-entering the coroutine.  */
    491    assert(job->busy);
    492}
    493
    494void coroutine_fn job_pause_point(Job *job)
    495{
    496    assert(job && job_started(job));
    497
    498    if (!job_should_pause(job)) {
    499        return;
    500    }
    501    if (job_is_cancelled(job)) {
    502        return;
    503    }
    504
    505    if (job->driver->pause) {
    506        job->driver->pause(job);
    507    }
    508
    509    if (job_should_pause(job) && !job_is_cancelled(job)) {
    510        JobStatus status = job->status;
    511        job_state_transition(job, status == JOB_STATUS_READY
    512                                  ? JOB_STATUS_STANDBY
    513                                  : JOB_STATUS_PAUSED);
    514        job->paused = true;
    515        job_do_yield(job, -1);
    516        job->paused = false;
    517        job_state_transition(job, status);
    518    }
    519
    520    if (job->driver->resume) {
    521        job->driver->resume(job);
    522    }
    523}
    524
    525void job_yield(Job *job)
    526{
    527    assert(job->busy);
    528
    529    /* Check cancellation *before* setting busy = false, too!  */
    530    if (job_is_cancelled(job)) {
    531        return;
    532    }
    533
    534    if (!job_should_pause(job)) {
    535        job_do_yield(job, -1);
    536    }
    537
    538    job_pause_point(job);
    539}
    540
    541void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
    542{
    543    assert(job->busy);
    544
    545    /* Check cancellation *before* setting busy = false, too!  */
    546    if (job_is_cancelled(job)) {
    547        return;
    548    }
    549
    550    if (!job_should_pause(job)) {
    551        job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    552    }
    553
    554    job_pause_point(job);
    555}
    556
    557/* Assumes the block_job_mutex is held */
    558static bool job_timer_not_pending(Job *job)
    559{
    560    return !timer_pending(&job->sleep_timer);
    561}
    562
    563void job_pause(Job *job)
    564{
    565    job->pause_count++;
    566    if (!job->paused) {
    567        job_enter(job);
    568    }
    569}
    570
    571void job_resume(Job *job)
    572{
    573    assert(job->pause_count > 0);
    574    job->pause_count--;
    575    if (job->pause_count) {
    576        return;
    577    }
    578
    579    /* kick only if no timer is pending */
    580    job_enter_cond(job, job_timer_not_pending);
    581}
    582
    583void job_user_pause(Job *job, Error **errp)
    584{
    585    if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
    586        return;
    587    }
    588    if (job->user_paused) {
    589        error_setg(errp, "Job is already paused");
    590        return;
    591    }
    592    job->user_paused = true;
    593    job_pause(job);
    594}
    595
    596bool job_user_paused(Job *job)
    597{
    598    return job->user_paused;
    599}
    600
    601void job_user_resume(Job *job, Error **errp)
    602{
    603    assert(job);
    604    if (!job->user_paused || job->pause_count <= 0) {
    605        error_setg(errp, "Can't resume a job that was not paused");
    606        return;
    607    }
    608    if (job_apply_verb(job, JOB_VERB_RESUME, errp)) {
    609        return;
    610    }
    611    if (job->driver->user_resume) {
    612        job->driver->user_resume(job);
    613    }
    614    job->user_paused = false;
    615    job_resume(job);
    616}
    617
    618static void job_do_dismiss(Job *job)
    619{
    620    assert(job);
    621    job->busy = false;
    622    job->paused = false;
    623    job->deferred_to_main_loop = true;
    624
    625    job_txn_del_job(job);
    626
    627    job_state_transition(job, JOB_STATUS_NULL);
    628    job_unref(job);
    629}
    630
    631void job_dismiss(Job **jobptr, Error **errp)
    632{
    633    Job *job = *jobptr;
    634    /* similarly to _complete, this is QMP-interface only. */
    635    assert(job->id);
    636    if (job_apply_verb(job, JOB_VERB_DISMISS, errp)) {
    637        return;
    638    }
    639
    640    job_do_dismiss(job);
    641    *jobptr = NULL;
    642}
    643
    644void job_early_fail(Job *job)
    645{
    646    assert(job->status == JOB_STATUS_CREATED);
    647    job_do_dismiss(job);
    648}
    649
    650static void job_conclude(Job *job)
    651{
    652    job_state_transition(job, JOB_STATUS_CONCLUDED);
    653    if (job->auto_dismiss || !job_started(job)) {
    654        job_do_dismiss(job);
    655    }
    656}
    657
    658static void job_update_rc(Job *job)
    659{
    660    if (!job->ret && job_is_cancelled(job)) {
    661        job->ret = -ECANCELED;
    662    }
    663    if (job->ret) {
    664        if (!job->err) {
    665            error_setg(&job->err, "%s", strerror(-job->ret));
    666        }
    667        job_state_transition(job, JOB_STATUS_ABORTING);
    668    }
    669}
    670
    671static void job_commit(Job *job)
    672{
    673    assert(!job->ret);
    674    if (job->driver->commit) {
    675        job->driver->commit(job);
    676    }
    677}
    678
    679static void job_abort(Job *job)
    680{
    681    assert(job->ret);
    682    if (job->driver->abort) {
    683        job->driver->abort(job);
    684    }
    685}
    686
    687static void job_clean(Job *job)
    688{
    689    if (job->driver->clean) {
    690        job->driver->clean(job);
    691    }
    692}
    693
    694static int job_finalize_single(Job *job)
    695{
    696    assert(job_is_completed(job));
    697
    698    /* Ensure abort is called for late-transactional failures */
    699    job_update_rc(job);
    700
    701    if (!job->ret) {
    702        job_commit(job);
    703    } else {
    704        job_abort(job);
    705    }
    706    job_clean(job);
    707
    708    if (job->cb) {
    709        job->cb(job->opaque, job->ret);
    710    }
    711
    712    /* Emit events only if we actually started */
    713    if (job_started(job)) {
    714        if (job_is_cancelled(job)) {
    715            job_event_cancelled(job);
    716        } else {
    717            job_event_completed(job);
    718        }
    719    }
    720
    721    job_txn_del_job(job);
    722    job_conclude(job);
    723    return 0;
    724}
    725
    726static void job_cancel_async(Job *job, bool force)
    727{
    728    if (job->driver->cancel) {
    729        force = job->driver->cancel(job, force);
    730    } else {
    731        /* No .cancel() means the job will behave as if force-cancelled */
    732        force = true;
    733    }
    734
    735    if (job->user_paused) {
    736        /* Do not call job_enter here, the caller will handle it.  */
    737        if (job->driver->user_resume) {
    738            job->driver->user_resume(job);
    739        }
    740        job->user_paused = false;
    741        assert(job->pause_count > 0);
    742        job->pause_count--;
    743    }
    744
    745    /*
    746     * Ignore soft cancel requests after the job is already done
    747     * (We will still invoke job->driver->cancel() above, but if the
    748     * job driver supports soft cancelling and the job is done, that
    749     * should be a no-op, too.  We still call it so it can override
    750     * @force.)
    751     */
    752    if (force || !job->deferred_to_main_loop) {
    753        job->cancelled = true;
    754        /* To prevent 'force == false' overriding a previous 'force == true' */
    755        job->force_cancel |= force;
    756    }
    757}
    758
/*
 * Abort the whole transaction @job belongs to.
 *
 * If the transaction is already aborting this is a no-op.  Otherwise every
 * other job in the transaction is force-cancelled, then each job still in
 * the transaction is waited for (if not yet completed) and finalized.
 *
 * @job's AioContext is released while other jobs are processed and the
 * (possibly changed) job->aio_context is acquired again before returning.
 */
static void job_completed_txn_abort(Job *job)
{
    AioContext *ctx;
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* Keep the txn alive while its jobs detach themselves below. */
    job_txn_ref(txn);

    /*
     * We can only hold the single job's AioContext lock while calling
     * job_finalize_single() because the finalization callbacks can involve
     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
     * Note that the job's AioContext may change when it is finalized.
     */
    job_ref(job);
    aio_context_release(job->aio_context);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            ctx = other_job->aio_context;
            aio_context_acquire(ctx);
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async(other_job, true);
            aio_context_release(ctx);
        }
    }
    /* job_finalize_single() unlinks each job, so the list drains. */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        /*
         * The job's AioContext may change, so store it in @ctx so we
         * release the same context that we have acquired before.
         */
        ctx = other_job->aio_context;
        aio_context_acquire(ctx);
        if (!job_is_completed(other_job)) {
            assert(job_cancel_requested(other_job));
            job_finish_sync(other_job, NULL, NULL);
        }
        job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    /*
     * Use job_ref()/job_unref() so we can read the AioContext here
     * even if the job went away during job_finalize_single().
     */
    aio_context_acquire(job->aio_context);
    job_unref(job);

    job_txn_unref(txn);
}
    824
    825static int job_prepare(Job *job)
    826{
    827    if (job->ret == 0 && job->driver->prepare) {
    828        job->ret = job->driver->prepare(job);
    829        job_update_rc(job);
    830    }
    831    return job->ret;
    832}
    833
    834static int job_needs_finalize(Job *job)
    835{
    836    return !job->auto_finalize;
    837}
    838
    839static void job_do_finalize(Job *job)
    840{
    841    int rc;
    842    assert(job && job->txn);
    843
    844    /* prepare the transaction to complete */
    845    rc = job_txn_apply(job, job_prepare);
    846    if (rc) {
    847        job_completed_txn_abort(job);
    848    } else {
    849        job_txn_apply(job, job_finalize_single);
    850    }
    851}
    852
    853void job_finalize(Job *job, Error **errp)
    854{
    855    assert(job && job->id);
    856    if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
    857        return;
    858    }
    859    job_do_finalize(job);
    860}
    861
    862static int job_transition_to_pending(Job *job)
    863{
    864    job_state_transition(job, JOB_STATUS_PENDING);
    865    if (!job->auto_finalize) {
    866        job_event_pending(job);
    867    }
    868    return 0;
    869}
    870
    871void job_transition_to_ready(Job *job)
    872{
    873    job_state_transition(job, JOB_STATUS_READY);
    874    job_event_ready(job);
    875}
    876
    877static void job_completed_txn_success(Job *job)
    878{
    879    JobTxn *txn = job->txn;
    880    Job *other_job;
    881
    882    job_state_transition(job, JOB_STATUS_WAITING);
    883
    884    /*
    885     * Successful completion, see if there are other running jobs in this
    886     * txn.
    887     */
    888    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
    889        if (!job_is_completed(other_job)) {
    890            return;
    891        }
    892        assert(other_job->ret == 0);
    893    }
    894
    895    job_txn_apply(job, job_transition_to_pending);
    896
    897    /* If no jobs need manual finalization, automatically do so */
    898    if (job_txn_apply(job, job_needs_finalize) == 0) {
    899        job_do_finalize(job);
    900    }
    901}
    902
    903static void job_completed(Job *job)
    904{
    905    assert(job && job->txn && !job_is_completed(job));
    906
    907    job_update_rc(job);
    908    trace_job_completed(job, job->ret);
    909    if (job->ret) {
    910        job_completed_txn_abort(job);
    911    } else {
    912        job_completed_txn_success(job);
    913    }
    914}
    915
/**
 * Bottom half scheduled by job_co_entry() once the driver's run() has
 * returned; @opaque is the Job.  It marks the job idle and runs the
 * completion logic under the job's AioContext lock.  The void * signature
 * exists only to match what aio_bh_schedule_oneshot() expects.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    AioContext *ctx;

    job_ref(job);
    aio_context_acquire(job->aio_context);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle(job);

    job_completed(job);

    /*
     * Note that calling job_completed can move the job to a different
     * aio_context, so we cannot cache from above. job_txn_apply takes care of
     * acquiring the new lock, and we ref/unref to avoid job_completed freeing
     * the job underneath us.
     */
    /* Read the context before job_unref(), which may free the job. */
    ctx = job->aio_context;
    job_unref(job);
    aio_context_release(ctx);
}
    944
    945/**
    946 * All jobs must allow a pause point before entering their job proper. This
    947 * ensures that jobs can be paused prior to being started, then resumed later.
    948 */
    949static void coroutine_fn job_co_entry(void *opaque)
    950{
    951    Job *job = opaque;
    952
    953    assert(job && job->driver && job->driver->run);
    954    job_pause_point(job);
    955    job->ret = job->driver->run(job, &job->err);
    956    job->deferred_to_main_loop = true;
    957    job->busy = true;
    958    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
    959}
    960
    961void job_start(Job *job)
    962{
    963    assert(job && !job_started(job) && job->paused &&
    964           job->driver && job->driver->run);
    965    job->co = qemu_coroutine_create(job_co_entry, job);
    966    job->pause_count--;
    967    job->busy = true;
    968    job->paused = false;
    969    job_state_transition(job, JOB_STATUS_RUNNING);
    970    aio_co_enter(job->aio_context, job->co);
    971}
    972
    973void job_cancel(Job *job, bool force)
    974{
    975    if (job->status == JOB_STATUS_CONCLUDED) {
    976        job_do_dismiss(job);
    977        return;
    978    }
    979    job_cancel_async(job, force);
    980    if (!job_started(job)) {
    981        job_completed(job);
    982    } else if (job->deferred_to_main_loop) {
    983        /*
    984         * job_cancel_async() ignores soft-cancel requests for jobs
    985         * that are already done (i.e. deferred to the main loop).  We
    986         * have to check again whether the job is really cancelled.
    987         * (job_cancel_requested() and job_is_cancelled() are equivalent
    988         * here, because job_cancel_async() will make soft-cancel
    989         * requests no-ops when deferred_to_main_loop is true.  We
    990         * choose to call job_is_cancelled() to show that we invoke
    991         * job_completed_txn_abort() only for force-cancelled jobs.)
    992         */
    993        if (job_is_cancelled(job)) {
    994            job_completed_txn_abort(job);
    995        }
    996    } else {
    997        job_enter(job);
    998    }
    999}
   1000
   1001void job_user_cancel(Job *job, bool force, Error **errp)
   1002{
   1003    if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
   1004        return;
   1005    }
   1006    job_cancel(job, force);
   1007}
   1008
   1009/* A wrapper around job_cancel() taking an Error ** parameter so it may be
   1010 * used with job_finish_sync() without the need for (rather nasty) function
   1011 * pointer casts there. */
   1012static void job_cancel_err(Job *job, Error **errp)
   1013{
   1014    job_cancel(job, false);
   1015}
   1016
   1017/**
   1018 * Same as job_cancel_err(), but force-cancel.
   1019 */
   1020static void job_force_cancel_err(Job *job, Error **errp)
   1021{
   1022    job_cancel(job, true);
   1023}
   1024
   1025int job_cancel_sync(Job *job, bool force)
   1026{
   1027    if (force) {
   1028        return job_finish_sync(job, &job_force_cancel_err, NULL);
   1029    } else {
   1030        return job_finish_sync(job, &job_cancel_err, NULL);
   1031    }
   1032}
   1033
   1034void job_cancel_sync_all(void)
   1035{
   1036    Job *job;
   1037    AioContext *aio_context;
   1038
   1039    while ((job = job_next(NULL))) {
   1040        aio_context = job->aio_context;
   1041        aio_context_acquire(aio_context);
   1042        job_cancel_sync(job, true);
   1043        aio_context_release(aio_context);
   1044    }
   1045}
   1046
   1047int job_complete_sync(Job *job, Error **errp)
   1048{
   1049    return job_finish_sync(job, job_complete, errp);
   1050}
   1051
   1052void job_complete(Job *job, Error **errp)
   1053{
   1054    /* Should not be reachable via external interface for internal jobs */
   1055    assert(job->id);
   1056    if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
   1057        return;
   1058    }
   1059    if (job_cancel_requested(job) || !job->driver->complete) {
   1060        error_setg(errp, "The active block job '%s' cannot be completed",
   1061                   job->id);
   1062        return;
   1063    }
   1064
   1065    job->driver->complete(job, errp);
   1066}
   1067
   1068int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
   1069{
   1070    Error *local_err = NULL;
   1071    int ret;
   1072
   1073    job_ref(job);
   1074
   1075    if (finish) {
   1076        finish(job, &local_err);
   1077    }
   1078    if (local_err) {
   1079        error_propagate(errp, local_err);
   1080        job_unref(job);
   1081        return -EBUSY;
   1082    }
   1083
   1084    AIO_WAIT_WHILE(job->aio_context,
   1085                   (job_enter(job), !job_is_completed(job)));
   1086
   1087    ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
   1088    job_unref(job);
   1089    return ret;
   1090}