cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

quorum.c (40011B)


      1/*
      2 * Quorum Block filter
      3 *
      4 * Copyright (C) 2012-2014 Nodalink, EURL.
      5 *
      6 * Author:
      7 *   Benoît Canet <benoit.canet@irqsave.net>
      8 *
      9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
     10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
     11 *
     12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
     13 * See the COPYING file in the top-level directory.
     14 */
     15
     16#include "qemu/osdep.h"
     17#include "qemu/cutils.h"
     18#include "qemu/module.h"
     19#include "qemu/option.h"
     20#include "block/block_int.h"
     21#include "block/coroutines.h"
     22#include "block/qdict.h"
     23#include "qapi/error.h"
     24#include "qapi/qapi-events-block.h"
     25#include "qapi/qmp/qdict.h"
     26#include "qapi/qmp/qerror.h"
     27#include "qapi/qmp/qlist.h"
     28#include "qapi/qmp/qstring.h"
     29#include "crypto/hash.h"
     30
     /* Size in bytes of the SHA-256 digest stored in QuorumVoteValue.h */
     #define HASH_LENGTH 32

     /*
      * NOTE(review): not referenced in the visible part of this file --
      * presumably the size of a child-index string buffer; confirm at the
      * use site.
      */
     #define INDEXSTR_LEN 32

     /* Names of the runtime options listed in quorum_runtime_opts */
     #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
     #define QUORUM_OPT_BLKVERIFY      "blkverify"
     #define QUORUM_OPT_REWRITE        "rewrite-corrupted"
     #define QUORUM_OPT_READ_PATTERN   "read-pattern"
     39
     40/* This union holds a vote hash value */
     41typedef union QuorumVoteValue {
     42    uint8_t h[HASH_LENGTH];    /* SHA-256 hash */
     43    int64_t l;                 /* simpler 64 bits hash */
     44} QuorumVoteValue;
     45
     46/* A vote item */
     47typedef struct QuorumVoteItem {
     48    int index;
     49    QLIST_ENTRY(QuorumVoteItem) next;
     50} QuorumVoteItem;
     51
     52/* this structure is a vote version. A version is the set of votes sharing the
     53 * same vote value.
     54 * The set of votes will be tracked with the items field and its cardinality is
     55 * vote_count.
     56 */
     57typedef struct QuorumVoteVersion {
     58    QuorumVoteValue value;
     59    int index;
     60    int vote_count;
     61    QLIST_HEAD(, QuorumVoteItem) items;
     62    QLIST_ENTRY(QuorumVoteVersion) next;
     63} QuorumVoteVersion;
     64
     65/* this structure holds a group of vote versions together */
     66typedef struct QuorumVotes {
     67    QLIST_HEAD(, QuorumVoteVersion) vote_list;
     68    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
     69} QuorumVotes;
     70
     71/* the following structure holds the state of one quorum instance */
     72typedef struct BDRVQuorumState {
     73    BdrvChild **children;  /* children BlockDriverStates */
     74    int num_children;      /* children count */
     75    unsigned next_child_index;  /* the index of the next child that should
     76                                 * be added
     77                                 */
     78    int threshold;         /* if less than threshold children reads gave the
     79                            * same result a quorum error occurs.
     80                            */
     81    bool is_blkverify;     /* true if the driver is in blkverify mode
     82                            * Writes are mirrored on two children devices.
     83                            * On reads the two children devices' contents are
     84                            * compared and if a difference is spotted its
     85                            * location is printed and the code aborts.
     86                            * It is useful to debug other block drivers by
     87                            * comparing them with a reference one.
     88                            */
     89    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
     90                            * block if Quorum is reached.
     91                            */
     92
     93    QuorumReadPattern read_pattern;
     94} BDRVQuorumState;
     95
     96typedef struct QuorumAIOCB QuorumAIOCB;
     97
     98/* Quorum will create one instance of the following structure per operation it
     99 * performs on its children.
    100 * So for each read/write operation coming from the upper layer there will be
    101 * $children_count QuorumChildRequest.
    102 */
    103typedef struct QuorumChildRequest {
    104    BlockDriverState *bs;
    105    QEMUIOVector qiov;
    106    uint8_t *buf;
    107    int ret;
    108    QuorumAIOCB *parent;
    109} QuorumChildRequest;
    110
    111/* Quorum will use the following structure to track progress of each read/write
    112 * operation received by the upper layer.
    113 * This structure hold pointers to the QuorumChildRequest structures instances
    114 * used to do operations on each children and track overall progress.
    115 */
    116struct QuorumAIOCB {
    117    BlockDriverState *bs;
    118    Coroutine *co;
    119
    120    /* Request metadata */
    121    uint64_t offset;
    122    uint64_t bytes;
    123    int flags;
    124
    125    QEMUIOVector *qiov;         /* calling IOV */
    126
    127    QuorumChildRequest *qcrs;   /* individual child requests */
    128    int count;                  /* number of completed AIOCB */
    129    int success_count;          /* number of successfully completed AIOCB */
    130
    131    int rewrite_count;          /* number of replica to rewrite: count down to
    132                                 * zero once writes are fired
    133                                 */
    134
    135    QuorumVotes votes;
    136
    137    bool is_read;
    138    int vote_ret;
    139    int children_read;          /* how many children have been read from */
    140};
    141
    142typedef struct QuorumCo {
    143    QuorumAIOCB *acb;
    144    int idx;
    145} QuorumCo;
    146
    147static void quorum_aio_finalize(QuorumAIOCB *acb)
    148{
    149    g_free(acb->qcrs);
    150    g_free(acb);
    151}
    152
    153static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
    154{
    155    return !memcmp(a->h, b->h, HASH_LENGTH);
    156}
    157
    158static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
    159{
    160    return a->l == b->l;
    161}
    162
    163static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
    164                                   QEMUIOVector *qiov,
    165                                   uint64_t offset,
    166                                   uint64_t bytes,
    167                                   int flags)
    168{
    169    BDRVQuorumState *s = bs->opaque;
    170    QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
    171    int i;
    172
    173    *acb = (QuorumAIOCB) {
    174        .co                 = qemu_coroutine_self(),
    175        .bs                 = bs,
    176        .offset             = offset,
    177        .bytes              = bytes,
    178        .flags              = flags,
    179        .qiov               = qiov,
    180        .votes.compare      = quorum_sha256_compare,
    181        .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
    182    };
    183
    184    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
    185    for (i = 0; i < s->num_children; i++) {
    186        acb->qcrs[i].buf = NULL;
    187        acb->qcrs[i].ret = 0;
    188        acb->qcrs[i].parent = acb;
    189    }
    190
    191    return acb;
    192}
    193
    194static void quorum_report_bad(QuorumOpType type, uint64_t offset,
    195                              uint64_t bytes, char *node_name, int ret)
    196{
    197    const char *msg = NULL;
    198    int64_t start_sector = offset / BDRV_SECTOR_SIZE;
    199    int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
    200
    201    if (ret < 0) {
    202        msg = strerror(-ret);
    203    }
    204
    205    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
    206                                      end_sector - start_sector);
    207}
    208
    209static void quorum_report_failure(QuorumAIOCB *acb)
    210{
    211    const char *reference = bdrv_get_device_or_node_name(acb->bs);
    212    int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
    213    int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
    214                                      BDRV_SECTOR_SIZE);
    215
    216    qapi_event_send_quorum_failure(reference, start_sector,
    217                                   end_sector - start_sector);
    218}
    219
    220static int quorum_vote_error(QuorumAIOCB *acb);
    221
    222static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
    223{
    224    BDRVQuorumState *s = acb->bs->opaque;
    225
    226    if (acb->success_count < s->threshold) {
    227        acb->vote_ret = quorum_vote_error(acb);
    228        quorum_report_failure(acb);
    229        return true;
    230    }
    231
    232    return false;
    233}
    234
    235static int read_fifo_child(QuorumAIOCB *acb);
    236
    237static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
    238{
    239    int i;
    240    assert(dest->niov == source->niov);
    241    assert(dest->size == source->size);
    242    for (i = 0; i < source->niov; i++) {
    243        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
    244        memcpy(dest->iov[i].iov_base,
    245               source->iov[i].iov_base,
    246               source->iov[i].iov_len);
    247    }
    248}
    249
    250static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
    251{
    252    QuorumAIOCB *acb = sacb->parent;
    253    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
    254    quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
    255}
    256
    257static void quorum_report_bad_versions(BDRVQuorumState *s,
    258                                       QuorumAIOCB *acb,
    259                                       QuorumVoteValue *value)
    260{
    261    QuorumVoteVersion *version;
    262    QuorumVoteItem *item;
    263
    264    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
    265        if (acb->votes.compare(&version->value, value)) {
    266            continue;
    267        }
    268        QLIST_FOREACH(item, &version->items, next) {
    269            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
    270                              s->children[item->index]->bs->node_name, 0);
    271        }
    272    }
    273}
    274
    275static void quorum_rewrite_entry(void *opaque)
    276{
    277    QuorumCo *co = opaque;
    278    QuorumAIOCB *acb = co->acb;
    279    BDRVQuorumState *s = acb->bs->opaque;
    280
    281    /* Ignore any errors, it's just a correction attempt for already
    282     * corrupted data.
    283     * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
    284     * area with different data from the other children. */
    285    bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
    286                    acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
    287
    288    /* Wake up the caller after the last rewrite */
    289    acb->rewrite_count--;
    290    if (!acb->rewrite_count) {
    291        qemu_coroutine_enter_if_inactive(acb->co);
    292    }
    293}
    294
    295static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
    296                                        QuorumVoteValue *value)
    297{
    298    QuorumVoteVersion *version;
    299    QuorumVoteItem *item;
    300    int count = 0;
    301
    302    /* first count the number of bad versions: done first to avoid concurrency
    303     * issues.
    304     */
    305    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
    306        if (acb->votes.compare(&version->value, value)) {
    307            continue;
    308        }
    309        QLIST_FOREACH(item, &version->items, next) {
    310            count++;
    311        }
    312    }
    313
    314    /* quorum_rewrite_entry will count down this to zero */
    315    acb->rewrite_count = count;
    316
    317    /* now fire the correcting rewrites */
    318    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
    319        if (acb->votes.compare(&version->value, value)) {
    320            continue;
    321        }
    322        QLIST_FOREACH(item, &version->items, next) {
    323            Coroutine *co;
    324            QuorumCo data = {
    325                .acb = acb,
    326                .idx = item->index,
    327            };
    328
    329            co = qemu_coroutine_create(quorum_rewrite_entry, &data);
    330            qemu_coroutine_enter(co);
    331        }
    332    }
    333
    334    /* return true if any rewrite is done else false */
    335    return count;
    336}
    337
    338static void quorum_count_vote(QuorumVotes *votes,
    339                              QuorumVoteValue *value,
    340                              int index)
    341{
    342    QuorumVoteVersion *v = NULL, *version = NULL;
    343    QuorumVoteItem *item;
    344
    345    /* look if we have something with this hash */
    346    QLIST_FOREACH(v, &votes->vote_list, next) {
    347        if (votes->compare(&v->value, value)) {
    348            version = v;
    349            break;
    350        }
    351    }
    352
    353    /* It's a version not yet in the list add it */
    354    if (!version) {
    355        version = g_new0(QuorumVoteVersion, 1);
    356        QLIST_INIT(&version->items);
    357        memcpy(&version->value, value, sizeof(version->value));
    358        version->index = index;
    359        version->vote_count = 0;
    360        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
    361    }
    362
    363    version->vote_count++;
    364
    365    item = g_new0(QuorumVoteItem, 1);
    366    item->index = index;
    367    QLIST_INSERT_HEAD(&version->items, item, next);
    368}
    369
    370static void quorum_free_vote_list(QuorumVotes *votes)
    371{
    372    QuorumVoteVersion *version, *next_version;
    373    QuorumVoteItem *item, *next_item;
    374
    375    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
    376        QLIST_REMOVE(version, next);
    377        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
    378            QLIST_REMOVE(item, next);
    379            g_free(item);
    380        }
    381        g_free(version);
    382    }
    383}
    384
    385static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
    386{
    387    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
    388    size_t len = sizeof(hash->h);
    389    uint8_t *data = hash->h;
    390
    391    /* XXX - would be nice if we could pass in the Error **
    392     * and propagate that back, but this quorum code is
    393     * restricted to just errno values currently */
    394    if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256,
    395                            qiov->iov, qiov->niov,
    396                            &data, &len,
    397                            NULL) < 0) {
    398        return -EINVAL;
    399    }
    400
    401    return 0;
    402}
    403
    404static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
    405{
    406    int max = 0;
    407    QuorumVoteVersion *candidate, *winner = NULL;
    408
    409    QLIST_FOREACH(candidate, &votes->vote_list, next) {
    410        if (candidate->vote_count > max) {
    411            max = candidate->vote_count;
    412            winner = candidate;
    413        }
    414    }
    415
    416    return winner;
    417}
    418
    419/* qemu_iovec_compare is handy for blkverify mode because it returns the first
    420 * differing byte location. Yet it is handcoded to compare vectors one byte
    421 * after another so it does not benefit from the libc SIMD optimizations.
    422 * quorum_iovec_compare is written for speed and should be used in the non
    423 * blkverify mode of quorum.
    424 */
    425static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
    426{
    427    int i;
    428    int result;
    429
    430    assert(a->niov == b->niov);
    431    for (i = 0; i < a->niov; i++) {
    432        assert(a->iov[i].iov_len == b->iov[i].iov_len);
    433        result = memcmp(a->iov[i].iov_base,
    434                        b->iov[i].iov_base,
    435                        a->iov[i].iov_len);
    436        if (result) {
    437            return false;
    438        }
    439    }
    440
    441    return true;
    442}
    443
    444static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
    445{
    446    BDRVQuorumState *s = acb->bs->opaque;
    447    ssize_t offset;
    448
    449    /* This driver will replace blkverify in this particular case */
    450    if (s->is_blkverify) {
    451        offset = qemu_iovec_compare(a, b);
    452        if (offset != -1) {
    453            fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
    454                    " contents mismatch at offset %" PRIu64 "\n",
    455                    acb->offset, acb->bytes, acb->offset + offset);
    456            exit(1);
    457        }
    458        return true;
    459    }
    460
    461    return quorum_iovec_compare(a, b);
    462}
    463
    464/* Do a vote to get the error code */
    465static int quorum_vote_error(QuorumAIOCB *acb)
    466{
    467    BDRVQuorumState *s = acb->bs->opaque;
    468    QuorumVoteVersion *winner = NULL;
    469    QuorumVotes error_votes;
    470    QuorumVoteValue result_value;
    471    int i, ret = 0;
    472    bool error = false;
    473
    474    QLIST_INIT(&error_votes.vote_list);
    475    error_votes.compare = quorum_64bits_compare;
    476
    477    for (i = 0; i < s->num_children; i++) {
    478        ret = acb->qcrs[i].ret;
    479        if (ret) {
    480            error = true;
    481            result_value.l = ret;
    482            quorum_count_vote(&error_votes, &result_value, i);
    483        }
    484    }
    485
    486    if (error) {
    487        winner = quorum_get_vote_winner(&error_votes);
    488        ret = winner->value.l;
    489    }
    490
    491    quorum_free_vote_list(&error_votes);
    492
    493    return ret;
    494}
    495
    496static void quorum_vote(QuorumAIOCB *acb)
    497{
    498    bool quorum = true;
    499    int i, j, ret;
    500    QuorumVoteValue hash;
    501    BDRVQuorumState *s = acb->bs->opaque;
    502    QuorumVoteVersion *winner;
    503
    504    if (quorum_has_too_much_io_failed(acb)) {
    505        return;
    506    }
    507
    508    /* get the index of the first successful read */
    509    for (i = 0; i < s->num_children; i++) {
    510        if (!acb->qcrs[i].ret) {
    511            break;
    512        }
    513    }
    514
    515    assert(i < s->num_children);
    516
    517    /* compare this read with all other successful reads stopping at quorum
    518     * failure
    519     */
    520    for (j = i + 1; j < s->num_children; j++) {
    521        if (acb->qcrs[j].ret) {
    522            continue;
    523        }
    524        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
    525        if (!quorum) {
    526            break;
    527       }
    528    }
    529
    530    /* Every successful read agrees */
    531    if (quorum) {
    532        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
    533        return;
    534    }
    535
    536    /* compute hashes for each successful read, also store indexes */
    537    for (i = 0; i < s->num_children; i++) {
    538        if (acb->qcrs[i].ret) {
    539            continue;
    540        }
    541        ret = quorum_compute_hash(acb, i, &hash);
    542        /* if ever the hash computation failed */
    543        if (ret < 0) {
    544            acb->vote_ret = ret;
    545            goto free_exit;
    546        }
    547        quorum_count_vote(&acb->votes, &hash, i);
    548    }
    549
    550    /* vote to select the most represented version */
    551    winner = quorum_get_vote_winner(&acb->votes);
    552
    553    /* if the winner count is smaller than threshold the read fails */
    554    if (winner->vote_count < s->threshold) {
    555        quorum_report_failure(acb);
    556        acb->vote_ret = -EIO;
    557        goto free_exit;
    558    }
    559
    560    /* we have a winner: copy it */
    561    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
    562
    563    /* some versions are bad print them */
    564    quorum_report_bad_versions(s, acb, &winner->value);
    565
    566    /* corruption correction is enabled */
    567    if (s->rewrite_corrupted) {
    568        quorum_rewrite_bad_versions(acb, &winner->value);
    569    }
    570
    571free_exit:
    572    /* free lists */
    573    quorum_free_vote_list(&acb->votes);
    574}
    575
    576static void read_quorum_children_entry(void *opaque)
    577{
    578    QuorumCo *co = opaque;
    579    QuorumAIOCB *acb = co->acb;
    580    BDRVQuorumState *s = acb->bs->opaque;
    581    int i = co->idx;
    582    QuorumChildRequest *sacb = &acb->qcrs[i];
    583
    584    sacb->bs = s->children[i]->bs;
    585    sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
    586                               &acb->qcrs[i].qiov, 0);
    587
    588    if (sacb->ret == 0) {
    589        acb->success_count++;
    590    } else {
    591        quorum_report_bad_acb(sacb, sacb->ret);
    592    }
    593
    594    acb->count++;
    595    assert(acb->count <= s->num_children);
    596    assert(acb->success_count <= s->num_children);
    597
    598    /* Wake up the caller after the last read */
    599    if (acb->count == s->num_children) {
    600        qemu_coroutine_enter_if_inactive(acb->co);
    601    }
    602}
    603
    604static int read_quorum_children(QuorumAIOCB *acb)
    605{
    606    BDRVQuorumState *s = acb->bs->opaque;
    607    int i;
    608
    609    acb->children_read = s->num_children;
    610    for (i = 0; i < s->num_children; i++) {
    611        acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
    612        qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
    613        qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
    614    }
    615
    616    for (i = 0; i < s->num_children; i++) {
    617        Coroutine *co;
    618        QuorumCo data = {
    619            .acb = acb,
    620            .idx = i,
    621        };
    622
    623        co = qemu_coroutine_create(read_quorum_children_entry, &data);
    624        qemu_coroutine_enter(co);
    625    }
    626
    627    while (acb->count < s->num_children) {
    628        qemu_coroutine_yield();
    629    }
    630
    631    /* Do the vote on read */
    632    quorum_vote(acb);
    633    for (i = 0; i < s->num_children; i++) {
    634        qemu_vfree(acb->qcrs[i].buf);
    635        qemu_iovec_destroy(&acb->qcrs[i].qiov);
    636    }
    637
    638    while (acb->rewrite_count) {
    639        qemu_coroutine_yield();
    640    }
    641
    642    return acb->vote_ret;
    643}
    644
    645static int read_fifo_child(QuorumAIOCB *acb)
    646{
    647    BDRVQuorumState *s = acb->bs->opaque;
    648    int n, ret;
    649
    650    /* We try to read the next child in FIFO order if we failed to read */
    651    do {
    652        n = acb->children_read++;
    653        acb->qcrs[n].bs = s->children[n]->bs;
    654        ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
    655                             acb->qiov, 0);
    656        if (ret < 0) {
    657            quorum_report_bad_acb(&acb->qcrs[n], ret);
    658        }
    659    } while (ret < 0 && acb->children_read < s->num_children);
    660
    661    /* FIXME: rewrite failed children if acb->children_read > 1? */
    662
    663    return ret;
    664}
    665
    666static int quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
    667                            QEMUIOVector *qiov, BdrvRequestFlags flags)
    668{
    669    BDRVQuorumState *s = bs->opaque;
    670    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
    671    int ret;
    672
    673    acb->is_read = true;
    674    acb->children_read = 0;
    675
    676    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
    677        ret = read_quorum_children(acb);
    678    } else {
    679        ret = read_fifo_child(acb);
    680    }
    681    quorum_aio_finalize(acb);
    682
    683    return ret;
    684}
    685
    686static void write_quorum_entry(void *opaque)
    687{
    688    QuorumCo *co = opaque;
    689    QuorumAIOCB *acb = co->acb;
    690    BDRVQuorumState *s = acb->bs->opaque;
    691    int i = co->idx;
    692    QuorumChildRequest *sacb = &acb->qcrs[i];
    693
    694    sacb->bs = s->children[i]->bs;
    695    if (acb->flags & BDRV_REQ_ZERO_WRITE) {
    696        sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
    697                                          acb->bytes, acb->flags);
    698    } else {
    699        sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
    700                                    acb->qiov, acb->flags);
    701    }
    702    if (sacb->ret == 0) {
    703        acb->success_count++;
    704    } else {
    705        quorum_report_bad_acb(sacb, sacb->ret);
    706    }
    707    acb->count++;
    708    assert(acb->count <= s->num_children);
    709    assert(acb->success_count <= s->num_children);
    710
    711    /* Wake up the caller after the last write */
    712    if (acb->count == s->num_children) {
    713        qemu_coroutine_enter_if_inactive(acb->co);
    714    }
    715}
    716
    717static int quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
    718                             int64_t bytes, QEMUIOVector *qiov,
    719                             BdrvRequestFlags flags)
    720{
    721    BDRVQuorumState *s = bs->opaque;
    722    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
    723    int i, ret;
    724
    725    for (i = 0; i < s->num_children; i++) {
    726        Coroutine *co;
    727        QuorumCo data = {
    728            .acb = acb,
    729            .idx = i,
    730        };
    731
    732        co = qemu_coroutine_create(write_quorum_entry, &data);
    733        qemu_coroutine_enter(co);
    734    }
    735
    736    while (acb->count < s->num_children) {
    737        qemu_coroutine_yield();
    738    }
    739
    740    quorum_has_too_much_io_failed(acb);
    741
    742    ret = acb->vote_ret;
    743    quorum_aio_finalize(acb);
    744
    745    return ret;
    746}
    747
    748static int quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
    749                                   int64_t bytes, BdrvRequestFlags flags)
    750
    751{
    752    return quorum_co_pwritev(bs, offset, bytes, NULL,
    753                             flags | BDRV_REQ_ZERO_WRITE);
    754}
    755
    756static int64_t quorum_getlength(BlockDriverState *bs)
    757{
    758    BDRVQuorumState *s = bs->opaque;
    759    int64_t result;
    760    int i;
    761
    762    /* check that all file have the same length */
    763    result = bdrv_getlength(s->children[0]->bs);
    764    if (result < 0) {
    765        return result;
    766    }
    767    for (i = 1; i < s->num_children; i++) {
    768        int64_t value = bdrv_getlength(s->children[i]->bs);
    769        if (value < 0) {
    770            return value;
    771        }
    772        if (value != result) {
    773            return -EIO;
    774        }
    775    }
    776
    777    return result;
    778}
    779
    780static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
    781{
    782    BDRVQuorumState *s = bs->opaque;
    783    QuorumVoteVersion *winner = NULL;
    784    QuorumVotes error_votes;
    785    QuorumVoteValue result_value;
    786    int i;
    787    int result = 0;
    788    int success_count = 0;
    789
    790    QLIST_INIT(&error_votes.vote_list);
    791    error_votes.compare = quorum_64bits_compare;
    792
    793    for (i = 0; i < s->num_children; i++) {
    794        result = bdrv_co_flush(s->children[i]->bs);
    795        if (result) {
    796            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0,
    797                              s->children[i]->bs->node_name, result);
    798            result_value.l = result;
    799            quorum_count_vote(&error_votes, &result_value, i);
    800        } else {
    801            success_count++;
    802        }
    803    }
    804
    805    if (success_count >= s->threshold) {
    806        result = 0;
    807    } else {
    808        winner = quorum_get_vote_winner(&error_votes);
    809        result = winner->value.l;
    810    }
    811    quorum_free_vote_list(&error_votes);
    812
    813    return result;
    814}
    815
    816static bool quorum_recurse_can_replace(BlockDriverState *bs,
    817                                       BlockDriverState *to_replace)
    818{
    819    BDRVQuorumState *s = bs->opaque;
    820    int i;
    821
    822    for (i = 0; i < s->num_children; i++) {
    823        /*
    824         * We have no idea whether our children show the same data as
    825         * this node (@bs).  It is actually highly likely that
    826         * @to_replace does not, because replacing a broken child is
    827         * one of the main use cases here.
    828         *
    829         * We do know that the new BDS will match @bs, so replacing
    830         * any of our children by it will be safe.  It cannot change
    831         * the data this quorum node presents to its parents.
    832         *
    833         * However, replacing @to_replace by @bs in any of our
    834         * children's chains may change visible data somewhere in
    835         * there.  We therefore cannot recurse down those chains with
    836         * bdrv_recurse_can_replace().
    837         * (More formally, bdrv_recurse_can_replace() requires that
    838         * @to_replace will be replaced by something matching the @bs
    839         * passed to it.  We cannot guarantee that.)
    840         *
    841         * Thus, we can only check whether any of our immediate
    842         * children matches @to_replace.
    843         *
    844         * (In the future, we might add a function to recurse down a
    845         * chain that checks that nothing there cares about a change
    846         * in data from the respective child in question.  For
    847         * example, most filters do not care when their child's data
    848         * suddenly changes, as long as their parents do not care.)
    849         */
    850        if (s->children[i]->bs == to_replace) {
    851            /*
    852             * We now have to ensure that there is no other parent
    853             * that cares about replacing this child by a node with
    854             * potentially different data.
    855             * We do so by checking whether there are any other parents
    856             * at all, which is stricter than necessary, but also very
    857             * simple.  (We may decide to implement something more
    858             * complex and permissive when there is an actual need for
    859             * it.)
    860             */
    861            return QLIST_FIRST(&to_replace->parents) == s->children[i] &&
    862                QLIST_NEXT(s->children[i], next_parent) == NULL;
    863        }
    864    }
    865
    866    return false;
    867}
    868
    869static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
    870{
    871
    872    if (threshold < 1) {
    873        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
    874                   "vote-threshold", "a value >= 1");
    875        return -ERANGE;
    876    }
    877
    878    if (threshold > num_children) {
    879        error_setg(errp, "threshold may not exceed children count");
    880        return -ERANGE;
    881    }
    882
    883    return 0;
    884}
    885
    886static QemuOptsList quorum_runtime_opts = {
    887    .name = "quorum",
    888    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
    889    .desc = {
    890        {
    891            .name = QUORUM_OPT_VOTE_THRESHOLD,
    892            .type = QEMU_OPT_NUMBER,
    893            .help = "The number of vote needed for reaching quorum",
    894        },
    895        {
    896            .name = QUORUM_OPT_BLKVERIFY,
    897            .type = QEMU_OPT_BOOL,
    898            .help = "Trigger block verify mode if set",
    899        },
    900        {
    901            .name = QUORUM_OPT_REWRITE,
    902            .type = QEMU_OPT_BOOL,
    903            .help = "Rewrite corrupted block on read quorum",
    904        },
    905        {
    906            .name = QUORUM_OPT_READ_PATTERN,
    907            .type = QEMU_OPT_STRING,
    908            .help = "Allowed pattern: quorum, fifo. Quorum is default",
    909        },
    910        { /* end of list */ }
    911    },
    912};
    913
    914static void quorum_refresh_flags(BlockDriverState *bs)
    915{
    916    BDRVQuorumState *s = bs->opaque;
    917    int i;
    918
    919    bs->supported_zero_flags =
    920        BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
    921
    922    for (i = 0; i < s->num_children; i++) {
    923        bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
    924    }
    925
    926    bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
    927}
    928
    929static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
    930                       Error **errp)
    931{
    932    BDRVQuorumState *s = bs->opaque;
    933    QemuOpts *opts = NULL;
    934    const char *pattern_str;
    935    bool *opened;
    936    int i;
    937    int ret = 0;
    938
    939    qdict_flatten(options);
    940
    941    /* count how many different children are present */
    942    s->num_children = qdict_array_entries(options, "children.");
    943    if (s->num_children < 0) {
    944        error_setg(errp, "Option children is not a valid array");
    945        ret = -EINVAL;
    946        goto exit;
    947    }
    948    if (s->num_children < 1) {
    949        error_setg(errp, "Number of provided children must be 1 or more");
    950        ret = -EINVAL;
    951        goto exit;
    952    }
    953
    954    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
    955    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
    956        ret = -EINVAL;
    957        goto exit;
    958    }
    959
    960    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
    961    /* and validate it against s->num_children */
    962    ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
    963    if (ret < 0) {
    964        goto exit;
    965    }
    966
    967    pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN);
    968    if (!pattern_str) {
    969        ret = QUORUM_READ_PATTERN_QUORUM;
    970    } else {
    971        ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str,
    972                              -EINVAL, NULL);
    973    }
    974    if (ret < 0) {
    975        error_setg(errp, "Please set read-pattern as fifo or quorum");
    976        goto exit;
    977    }
    978    s->read_pattern = ret;
    979
    980    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
    981        s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
    982        if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
    983            error_setg(errp, "blkverify=on can only be set if there are "
    984                       "exactly two files and vote-threshold is 2");
    985            ret = -EINVAL;
    986            goto exit;
    987        }
    988
    989        s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
    990                                                 false);
    991        if (s->rewrite_corrupted && s->is_blkverify) {
    992            error_setg(errp,
    993                       "rewrite-corrupted=on cannot be used with blkverify=on");
    994            ret = -EINVAL;
    995            goto exit;
    996        }
    997    }
    998
    999    /* allocate the children array */
   1000    s->children = g_new0(BdrvChild *, s->num_children);
   1001    opened = g_new0(bool, s->num_children);
   1002
   1003    for (i = 0; i < s->num_children; i++) {
   1004        char indexstr[INDEXSTR_LEN];
   1005        ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
   1006        assert(ret < INDEXSTR_LEN);
   1007
   1008        s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
   1009                                         &child_of_bds, BDRV_CHILD_DATA, false,
   1010                                         errp);
   1011        if (!s->children[i]) {
   1012            ret = -EINVAL;
   1013            goto close_exit;
   1014        }
   1015
   1016        opened[i] = true;
   1017    }
   1018    s->next_child_index = s->num_children;
   1019
   1020    bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
   1021    quorum_refresh_flags(bs);
   1022
   1023    g_free(opened);
   1024    goto exit;
   1025
   1026close_exit:
   1027    /* cleanup on error */
   1028    for (i = 0; i < s->num_children; i++) {
   1029        if (!opened[i]) {
   1030            continue;
   1031        }
   1032        bdrv_unref_child(bs, s->children[i]);
   1033    }
   1034    g_free(s->children);
   1035    g_free(opened);
   1036exit:
   1037    qemu_opts_del(opts);
   1038    return ret;
   1039}
   1040
   1041static void quorum_close(BlockDriverState *bs)
   1042{
   1043    BDRVQuorumState *s = bs->opaque;
   1044    int i;
   1045
   1046    for (i = 0; i < s->num_children; i++) {
   1047        bdrv_unref_child(bs, s->children[i]);
   1048    }
   1049
   1050    g_free(s->children);
   1051}
   1052
   1053static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
   1054                             Error **errp)
   1055{
   1056    BDRVQuorumState *s = bs->opaque;
   1057    BdrvChild *child;
   1058    char indexstr[INDEXSTR_LEN];
   1059    int ret;
   1060
   1061    if (s->is_blkverify) {
   1062        error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
   1063        return;
   1064    }
   1065
   1066    assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
   1067    if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
   1068        s->next_child_index == UINT_MAX) {
   1069        error_setg(errp, "Too many children");
   1070        return;
   1071    }
   1072
   1073    ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
   1074    if (ret < 0 || ret >= INDEXSTR_LEN) {
   1075        error_setg(errp, "cannot generate child name");
   1076        return;
   1077    }
   1078    s->next_child_index++;
   1079
   1080    bdrv_drained_begin(bs);
   1081
   1082    /* We can safely add the child now */
   1083    bdrv_ref(child_bs);
   1084
   1085    child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds,
   1086                              BDRV_CHILD_DATA, errp);
   1087    if (child == NULL) {
   1088        s->next_child_index--;
   1089        goto out;
   1090    }
   1091    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
   1092    s->children[s->num_children++] = child;
   1093    quorum_refresh_flags(bs);
   1094
   1095out:
   1096    bdrv_drained_end(bs);
   1097}
   1098
   1099static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
   1100                             Error **errp)
   1101{
   1102    BDRVQuorumState *s = bs->opaque;
   1103    char indexstr[INDEXSTR_LEN];
   1104    int i;
   1105
   1106    for (i = 0; i < s->num_children; i++) {
   1107        if (s->children[i] == child) {
   1108            break;
   1109        }
   1110    }
   1111
   1112    /* we have checked it in bdrv_del_child() */
   1113    assert(i < s->num_children);
   1114
   1115    if (s->num_children <= s->threshold) {
   1116        error_setg(errp,
   1117            "The number of children cannot be lower than the vote threshold %d",
   1118            s->threshold);
   1119        return;
   1120    }
   1121
   1122    /* We know now that num_children > threshold, so blkverify must be false */
   1123    assert(!s->is_blkverify);
   1124
   1125    snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
   1126    if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
   1127        s->next_child_index--;
   1128    }
   1129
   1130    bdrv_drained_begin(bs);
   1131
   1132    /* We can safely remove this child now */
   1133    memmove(&s->children[i], &s->children[i + 1],
   1134            (s->num_children - i - 1) * sizeof(BdrvChild *));
   1135    s->children = g_renew(BdrvChild *, s->children, --s->num_children);
   1136    bdrv_unref_child(bs, child);
   1137
   1138    quorum_refresh_flags(bs);
   1139    bdrv_drained_end(bs);
   1140}
   1141
   1142static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
   1143                                        bool backing_overridden)
   1144{
   1145    BDRVQuorumState *s = bs->opaque;
   1146    QList *children_list;
   1147    int i;
   1148
   1149    /*
   1150     * The generic implementation for gathering child options in
   1151     * bdrv_refresh_filename() would use the names of the children
   1152     * as specified for bdrv_open_child() or bdrv_attach_child(),
   1153     * which is "children.%u" with %u being a value
   1154     * (s->next_child_index) that is incremented each time a new child
   1155     * is added (and never decremented).  Since children can be
   1156     * deleted at runtime, there may be gaps in that enumeration.
   1157     * When creating a new quorum BDS and specifying the children for
   1158     * it through runtime options, the enumeration used there may not
   1159     * have any gaps, though.
   1160     *
   1161     * Therefore, we have to create a new gap-less enumeration here
   1162     * (which we can achieve by simply putting all of the children's
   1163     * full_open_options into a QList).
   1164     *
   1165     * XXX: Note that there are issues with the current child option
   1166     *      structure quorum uses (such as the fact that children do
   1167     *      not really have unique permanent names).  Therefore, this
   1168     *      is going to have to change in the future and ideally we
   1169     *      want quorum to be covered by the generic implementation.
   1170     */
   1171
   1172    children_list = qlist_new();
   1173    qdict_put(target, "children", children_list);
   1174
   1175    for (i = 0; i < s->num_children; i++) {
   1176        qlist_append(children_list,
   1177                     qobject_ref(s->children[i]->bs->full_open_options));
   1178    }
   1179}
   1180
   1181static char *quorum_dirname(BlockDriverState *bs, Error **errp)
   1182{
   1183    /* In general, there are multiple BDSs with different dirnames below this
   1184     * one; so there is no unique dirname we could return (unless all are equal
   1185     * by chance, or there is only one). Therefore, to be consistent, just
   1186     * always return NULL. */
   1187    error_setg(errp, "Cannot generate a base directory for quorum nodes");
   1188    return NULL;
   1189}
   1190
   1191static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
   1192                              BdrvChildRole role,
   1193                              BlockReopenQueue *reopen_queue,
   1194                              uint64_t perm, uint64_t shared,
   1195                              uint64_t *nperm, uint64_t *nshared)
   1196{
   1197    BDRVQuorumState *s = bs->opaque;
   1198
   1199    *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
   1200    if (s->rewrite_corrupted) {
   1201        *nperm |= BLK_PERM_WRITE;
   1202    }
   1203
   1204    /*
   1205     * We cannot share RESIZE or WRITE, as this would make the
   1206     * children differ from each other.
   1207     */
   1208    *nshared = (shared & (BLK_PERM_CONSISTENT_READ |
   1209                          BLK_PERM_WRITE_UNCHANGED))
   1210             | DEFAULT_PERM_UNCHANGED;
   1211}
   1212
   1213/*
   1214 * Each one of the children can report different status flags even
   1215 * when they contain the same data, so what this function does is
   1216 * return BDRV_BLOCK_ZERO if *all* children agree that a certain
   1217 * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
   1218 */
   1219static int coroutine_fn quorum_co_block_status(BlockDriverState *bs,
   1220                                               bool want_zero,
   1221                                               int64_t offset, int64_t count,
   1222                                               int64_t *pnum, int64_t *map,
   1223                                               BlockDriverState **file)
   1224{
   1225    BDRVQuorumState *s = bs->opaque;
   1226    int i, ret;
   1227    int64_t pnum_zero = count;
   1228    int64_t pnum_data = 0;
   1229
   1230    for (i = 0; i < s->num_children; i++) {
   1231        int64_t bytes;
   1232        ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
   1233                                                want_zero, offset, count,
   1234                                                &bytes, NULL, NULL, NULL);
   1235        if (ret < 0) {
   1236            quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
   1237                              s->children[i]->bs->node_name, ret);
   1238            pnum_data = count;
   1239            break;
   1240        }
   1241        /*
   1242         * Even if all children agree about whether there are zeroes
   1243         * or not at @offset they might disagree on the size, so use
   1244         * the smallest when reporting BDRV_BLOCK_ZERO and the largest
   1245         * when reporting BDRV_BLOCK_DATA.
   1246         */
   1247        if (ret & BDRV_BLOCK_ZERO) {
   1248            pnum_zero = MIN(pnum_zero, bytes);
   1249        } else {
   1250            pnum_data = MAX(pnum_data, bytes);
   1251        }
   1252    }
   1253
   1254    if (pnum_data) {
   1255        *pnum = pnum_data;
   1256        return BDRV_BLOCK_DATA;
   1257    } else {
   1258        *pnum = pnum_zero;
   1259        return BDRV_BLOCK_ZERO;
   1260    }
   1261}
   1262
   1263static const char *const quorum_strong_runtime_opts[] = {
   1264    QUORUM_OPT_VOTE_THRESHOLD,
   1265    QUORUM_OPT_BLKVERIFY,
   1266    QUORUM_OPT_REWRITE,
   1267    QUORUM_OPT_READ_PATTERN,
   1268
   1269    NULL
   1270};
   1271
   1272static BlockDriver bdrv_quorum = {
   1273    .format_name                        = "quorum",
   1274
   1275    .instance_size                      = sizeof(BDRVQuorumState),
   1276
   1277    .bdrv_open                          = quorum_open,
   1278    .bdrv_close                         = quorum_close,
   1279    .bdrv_gather_child_options          = quorum_gather_child_options,
   1280    .bdrv_dirname                       = quorum_dirname,
   1281    .bdrv_co_block_status               = quorum_co_block_status,
   1282
   1283    .bdrv_co_flush                      = quorum_co_flush,
   1284
   1285    .bdrv_getlength                     = quorum_getlength,
   1286
   1287    .bdrv_co_preadv                     = quorum_co_preadv,
   1288    .bdrv_co_pwritev                    = quorum_co_pwritev,
   1289    .bdrv_co_pwrite_zeroes              = quorum_co_pwrite_zeroes,
   1290
   1291    .bdrv_add_child                     = quorum_add_child,
   1292    .bdrv_del_child                     = quorum_del_child,
   1293
   1294    .bdrv_child_perm                    = quorum_child_perm,
   1295
   1296    .bdrv_recurse_can_replace           = quorum_recurse_can_replace,
   1297
   1298    .strong_runtime_opts                = quorum_strong_runtime_opts,
   1299};
   1300
   1301static void bdrv_quorum_init(void)
   1302{
   1303    if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) {
   1304        /* SHA256 hash support is required for quorum device */
   1305        return;
   1306    }
   1307    bdrv_register(&bdrv_quorum);
   1308}
   1309
   1310block_init(bdrv_quorum_init);