cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

rbd.c (44562B)


      1/*
      2 * QEMU Block driver for RADOS (Ceph)
      3 *
      4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
      5 *                         Josh Durgin <josh.durgin@dreamhost.com>
      6 *
      7 * This work is licensed under the terms of the GNU GPL, version 2.  See
      8 * the COPYING file in the top-level directory.
      9 *
     10 * Contributions after 2012-01-13 are licensed under the terms of the
     11 * GNU GPL, version 2 or (at your option) any later version.
     12 */
     13
     14#include "qemu/osdep.h"
     15
     16#include <rbd/librbd.h>
     17#include "qapi/error.h"
     18#include "qemu/error-report.h"
     19#include "qemu/module.h"
     20#include "qemu/option.h"
     21#include "block/block_int.h"
     22#include "block/qdict.h"
     23#include "crypto/secret.h"
     24#include "qemu/cutils.h"
     25#include "sysemu/replay.h"
     26#include "qapi/qmp/qstring.h"
     27#include "qapi/qmp/qdict.h"
     28#include "qapi/qmp/qjson.h"
     29#include "qapi/qmp/qlist.h"
     30#include "qapi/qobject-input-visitor.h"
     31#include "qapi/qapi-visit-block-core.h"
     32
     33/*
     34 * When specifying the image filename use:
     35 *
     36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
     37 *
     38 * poolname must be the name of an existing rados pool.
     39 *
     40 * devicename is the name of the rbd image.
     41 *
     42 * Each option given is used to configure rados, and may be any valid
     43 * Ceph option, "id", or "conf".
     44 *
     45 * The "id" option indicates what user we should authenticate as to
     46 * the Ceph cluster.  If it is excluded we will use the Ceph default
     47 * (normally 'admin').
     48 *
     49 * The "conf" option specifies a Ceph configuration file to read.  If
     50 * it is not specified, we will read from the default Ceph locations
     51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
     52 * file, specify conf=/dev/null.
     53 *
     54 * Configuration values containing :, @, or = can be escaped with a
     55 * leading "\".
     56 */
     57
     /* NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of the file. */
     #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

     #define RBD_MAX_SNAPS 100

     /* Number of leading bytes in each LUKS header signature below. */
     #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

     /* "LUKS", 0xBA, 0xBE, followed by the two bytes 0x00 0x01. */
     static const char rbd_luks_header_verification[
             RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
         'L', 'U', 'K', 'S', (char)0xBA, (char)0xBE, 0x00, 0x01
     };

     /* "LUKS", 0xBA, 0xBE, followed by the two bytes 0x00 0x02. */
     static const char rbd_luks2_header_verification[
             RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
         'L', 'U', 'K', 'S', (char)0xBA, (char)0xBE, 0x00, 0x02
     };
     73
     /*
      * Request kinds for the driver's asynchronous I/O path.
      * NOTE(review): the code consuming these values is outside this chunk.
      */
     typedef enum {
         RBD_AIO_READ = 0,
         RBD_AIO_WRITE = 1,
         RBD_AIO_DISCARD = 2,
         RBD_AIO_FLUSH = 3,
         RBD_AIO_WRITE_ZEROES = 4
     } RBDAIOCmd;
     81
     82typedef struct BDRVRBDState {
     83    rados_t cluster;
     84    rados_ioctx_t io_ctx;
     85    rbd_image_t image;
     86    char *image_name;
     87    char *snap;
     88    char *namespace;
     89    uint64_t image_size;
     90    uint64_t object_size;
     91} BDRVRBDState;
     92
     93typedef struct RBDTask {
     94    BlockDriverState *bs;
     95    Coroutine *co;
     96    bool complete;
     97    int64_t ret;
     98} RBDTask;
     99
    100static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
    101                            BlockdevOptionsRbd *opts, bool cache,
    102                            const char *keypairs, const char *secretid,
    103                            Error **errp);
    104
    /*
     * Find the first unescaped occurrence of @delim in @src.
     *
     * A backslash followed by another character is skipped as a pair, so an
     * escaped delimiter is never reported.  A backslash at the very end of the
     * string is treated as an ordinary character.
     *
     * Returns a pointer to the delimiter, or NULL if the string ends first.
     */
    static char *qemu_rbd_strchr(char *src, char delim)
    {
        char *cur = src;

        while (*cur != '\0') {
            if (*cur == delim) {
                return cur;
            }
            if (*cur == '\\' && cur[1] != '\0') {
                /* step over the backslash and the character it escapes */
                cur += 2;
            } else {
                cur += 1;
            }
        }

        return NULL;
    }
    120
    121
    /*
     * Split off the token at the start of @src, ending at the first unescaped
     * @delim.
     *
     * When a delimiter is found it is overwritten with NUL and *@p is set to
     * the character following it; otherwise *@p is set to NULL and @src is
     * left untouched.  The return value is always @src.
     */
    static char *qemu_rbd_next_tok(char *src, char delim, char **p)
    {
        char *sep = qemu_rbd_strchr(src, delim);

        if (sep == NULL) {
            *p = NULL;
            return src;
        }

        *sep = '\0';
        *p = sep + 1;
        return src;
    }
    135
    /*
     * Remove backslash escapes from @src in place.
     *
     * Each backslash that is followed by another character is dropped and the
     * following character is kept verbatim.  A backslash that is the last
     * character of the string is kept.
     */
    static void qemu_rbd_unescape(char *src)
    {
        const char *in = src;
        char *out = src;

        while (*in != '\0') {
            if (in[0] == '\\' && in[1] != '\0') {
                in++;
            }
            *out++ = *in++;
        }
        *out = '\0';
    }
    148
    149static void qemu_rbd_parse_filename(const char *filename, QDict *options,
    150                                    Error **errp)
    151{
    152    const char *start;
    153    char *p, *buf;
    154    QList *keypairs = NULL;
    155    char *found_str, *image_name;
    156
    157    if (!strstart(filename, "rbd:", &start)) {
    158        error_setg(errp, "File name must start with 'rbd:'");
    159        return;
    160    }
    161
    162    buf = g_strdup(start);
    163    p = buf;
    164
    165    found_str = qemu_rbd_next_tok(p, '/', &p);
    166    if (!p) {
    167        error_setg(errp, "Pool name is required");
    168        goto done;
    169    }
    170    qemu_rbd_unescape(found_str);
    171    qdict_put_str(options, "pool", found_str);
    172
    173    if (qemu_rbd_strchr(p, '@')) {
    174        image_name = qemu_rbd_next_tok(p, '@', &p);
    175
    176        found_str = qemu_rbd_next_tok(p, ':', &p);
    177        qemu_rbd_unescape(found_str);
    178        qdict_put_str(options, "snapshot", found_str);
    179    } else {
    180        image_name = qemu_rbd_next_tok(p, ':', &p);
    181    }
    182    /* Check for namespace in the image_name */
    183    if (qemu_rbd_strchr(image_name, '/')) {
    184        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
    185        qemu_rbd_unescape(found_str);
    186        qdict_put_str(options, "namespace", found_str);
    187    } else {
    188        qdict_put_str(options, "namespace", "");
    189    }
    190    qemu_rbd_unescape(image_name);
    191    qdict_put_str(options, "image", image_name);
    192    if (!p) {
    193        goto done;
    194    }
    195
    196    /* The following are essentially all key/value pairs, and we treat
    197     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
    198    while (p) {
    199        char *name, *value;
    200        name = qemu_rbd_next_tok(p, '=', &p);
    201        if (!p) {
    202            error_setg(errp, "conf option %s has no value", name);
    203            break;
    204        }
    205
    206        qemu_rbd_unescape(name);
    207
    208        value = qemu_rbd_next_tok(p, ':', &p);
    209        qemu_rbd_unescape(value);
    210
    211        if (!strcmp(name, "conf")) {
    212            qdict_put_str(options, "conf", value);
    213        } else if (!strcmp(name, "id")) {
    214            qdict_put_str(options, "user", value);
    215        } else {
    216            /*
    217             * We pass these internally to qemu_rbd_set_keypairs(), so
    218             * we can get away with the simpler list of [ "key1",
    219             * "value1", "key2", "value2" ] rather than a raw dict
    220             * { "key1": "value1", "key2": "value2" } where we can't
    221             * guarantee order, or even a more correct but complex
    222             * [ { "key1": "value1" }, { "key2": "value2" } ]
    223             */
    224            if (!keypairs) {
    225                keypairs = qlist_new();
    226            }
    227            qlist_append_str(keypairs, name);
    228            qlist_append_str(keypairs, value);
    229        }
    230    }
    231
    232    if (keypairs) {
    233        qdict_put(options, "=keyvalue-pairs",
    234                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
    235    }
    236
    237done:
    238    g_free(buf);
    239    qobject_unref(keypairs);
    240    return;
    241}
    242
    243static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
    244                             Error **errp)
    245{
    246    char *key, *acr;
    247    int r;
    248    GString *accu;
    249    RbdAuthModeList *auth;
    250
    251    if (opts->key_secret) {
    252        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
    253        if (!key) {
    254            return -EIO;
    255        }
    256        r = rados_conf_set(cluster, "key", key);
    257        g_free(key);
    258        if (r < 0) {
    259            error_setg_errno(errp, -r, "Could not set 'key'");
    260            return r;
    261        }
    262    }
    263
    264    if (opts->has_auth_client_required) {
    265        accu = g_string_new("");
    266        for (auth = opts->auth_client_required; auth; auth = auth->next) {
    267            if (accu->str[0]) {
    268                g_string_append_c(accu, ';');
    269            }
    270            g_string_append(accu, RbdAuthMode_str(auth->value));
    271        }
    272        acr = g_string_free(accu, FALSE);
    273        r = rados_conf_set(cluster, "auth_client_required", acr);
    274        g_free(acr);
    275        if (r < 0) {
    276            error_setg_errno(errp, -r,
    277                             "Could not set 'auth_client_required'");
    278            return r;
    279        }
    280    }
    281
    282    return 0;
    283}
    284
    285static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
    286                                 Error **errp)
    287{
    288    QList *keypairs;
    289    QString *name;
    290    QString *value;
    291    const char *key;
    292    size_t remaining;
    293    int ret = 0;
    294
    295    if (!keypairs_json) {
    296        return ret;
    297    }
    298    keypairs = qobject_to(QList,
    299                          qobject_from_json(keypairs_json, &error_abort));
    300    remaining = qlist_size(keypairs) / 2;
    301    assert(remaining);
    302
    303    while (remaining--) {
    304        name = qobject_to(QString, qlist_pop(keypairs));
    305        value = qobject_to(QString, qlist_pop(keypairs));
    306        assert(name && value);
    307        key = qstring_get_str(name);
    308
    309        ret = rados_conf_set(cluster, key, qstring_get_str(value));
    310        qobject_unref(value);
    311        if (ret < 0) {
    312            error_setg_errno(errp, -ret, "invalid conf option %s", key);
    313            qobject_unref(name);
    314            ret = -EINVAL;
    315            break;
    316        }
    317        qobject_unref(name);
    318    }
    319
    320    qobject_unref(keypairs);
    321    return ret;
    322}
    323
    324#ifdef LIBRBD_SUPPORTS_ENCRYPTION
    325static int qemu_rbd_convert_luks_options(
    326        RbdEncryptionOptionsLUKSBase *luks_opts,
    327        char **passphrase,
    328        size_t *passphrase_len,
    329        Error **errp)
    330{
    331    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
    332                                 passphrase_len, errp);
    333}
    334
    335static int qemu_rbd_convert_luks_create_options(
    336        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
    337        rbd_encryption_algorithm_t *alg,
    338        char **passphrase,
    339        size_t *passphrase_len,
    340        Error **errp)
    341{
    342    int r = 0;
    343
    344    r = qemu_rbd_convert_luks_options(
    345            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
    346            passphrase, passphrase_len, errp);
    347    if (r < 0) {
    348        return r;
    349    }
    350
    351    if (luks_opts->has_cipher_alg) {
    352        switch (luks_opts->cipher_alg) {
    353            case QCRYPTO_CIPHER_ALG_AES_128: {
    354                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
    355                break;
    356            }
    357            case QCRYPTO_CIPHER_ALG_AES_256: {
    358                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
    359                break;
    360            }
    361            default: {
    362                r = -ENOTSUP;
    363                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
    364                                 luks_opts->cipher_alg);
    365                return r;
    366            }
    367        }
    368    } else {
    369        /* default alg */
    370        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
    371    }
    372
    373    return 0;
    374}
    375
    376static int qemu_rbd_encryption_format(rbd_image_t image,
    377                                      RbdEncryptionCreateOptions *encrypt,
    378                                      Error **errp)
    379{
    380    int r = 0;
    381    g_autofree char *passphrase = NULL;
    382    size_t passphrase_len;
    383    rbd_encryption_format_t format;
    384    rbd_encryption_options_t opts;
    385    rbd_encryption_luks1_format_options_t luks_opts;
    386    rbd_encryption_luks2_format_options_t luks2_opts;
    387    size_t opts_size;
    388    uint64_t raw_size, effective_size;
    389
    390    r = rbd_get_size(image, &raw_size);
    391    if (r < 0) {
    392        error_setg_errno(errp, -r, "cannot get raw image size");
    393        return r;
    394    }
    395
    396    switch (encrypt->format) {
    397        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
    398            memset(&luks_opts, 0, sizeof(luks_opts));
    399            format = RBD_ENCRYPTION_FORMAT_LUKS1;
    400            opts = &luks_opts;
    401            opts_size = sizeof(luks_opts);
    402            r = qemu_rbd_convert_luks_create_options(
    403                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
    404                    &luks_opts.alg, &passphrase, &passphrase_len, errp);
    405            if (r < 0) {
    406                return r;
    407            }
    408            luks_opts.passphrase = passphrase;
    409            luks_opts.passphrase_size = passphrase_len;
    410            break;
    411        }
    412        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
    413            memset(&luks2_opts, 0, sizeof(luks2_opts));
    414            format = RBD_ENCRYPTION_FORMAT_LUKS2;
    415            opts = &luks2_opts;
    416            opts_size = sizeof(luks2_opts);
    417            r = qemu_rbd_convert_luks_create_options(
    418                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
    419                            &encrypt->u.luks2),
    420                    &luks2_opts.alg, &passphrase, &passphrase_len, errp);
    421            if (r < 0) {
    422                return r;
    423            }
    424            luks2_opts.passphrase = passphrase;
    425            luks2_opts.passphrase_size = passphrase_len;
    426            break;
    427        }
    428        default: {
    429            r = -ENOTSUP;
    430            error_setg_errno(
    431                    errp, -r, "unknown image encryption format: %u",
    432                    encrypt->format);
    433            return r;
    434        }
    435    }
    436
    437    r = rbd_encryption_format(image, format, opts, opts_size);
    438    if (r < 0) {
    439        error_setg_errno(errp, -r, "encryption format fail");
    440        return r;
    441    }
    442
    443    r = rbd_get_size(image, &effective_size);
    444    if (r < 0) {
    445        error_setg_errno(errp, -r, "cannot get effective image size");
    446        return r;
    447    }
    448
    449    r = rbd_resize(image, raw_size + (raw_size - effective_size));
    450    if (r < 0) {
    451        error_setg_errno(errp, -r, "cannot resize image after format");
    452        return r;
    453    }
    454
    455    return 0;
    456}
    457
    458static int qemu_rbd_encryption_load(rbd_image_t image,
    459                                    RbdEncryptionOptions *encrypt,
    460                                    Error **errp)
    461{
    462    int r = 0;
    463    g_autofree char *passphrase = NULL;
    464    size_t passphrase_len;
    465    rbd_encryption_luks1_format_options_t luks_opts;
    466    rbd_encryption_luks2_format_options_t luks2_opts;
    467    rbd_encryption_format_t format;
    468    rbd_encryption_options_t opts;
    469    size_t opts_size;
    470
    471    switch (encrypt->format) {
    472        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
    473            memset(&luks_opts, 0, sizeof(luks_opts));
    474            format = RBD_ENCRYPTION_FORMAT_LUKS1;
    475            opts = &luks_opts;
    476            opts_size = sizeof(luks_opts);
    477            r = qemu_rbd_convert_luks_options(
    478                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
    479                    &passphrase, &passphrase_len, errp);
    480            if (r < 0) {
    481                return r;
    482            }
    483            luks_opts.passphrase = passphrase;
    484            luks_opts.passphrase_size = passphrase_len;
    485            break;
    486        }
    487        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
    488            memset(&luks2_opts, 0, sizeof(luks2_opts));
    489            format = RBD_ENCRYPTION_FORMAT_LUKS2;
    490            opts = &luks2_opts;
    491            opts_size = sizeof(luks2_opts);
    492            r = qemu_rbd_convert_luks_options(
    493                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
    494                    &passphrase, &passphrase_len, errp);
    495            if (r < 0) {
    496                return r;
    497            }
    498            luks2_opts.passphrase = passphrase;
    499            luks2_opts.passphrase_size = passphrase_len;
    500            break;
    501        }
    502        default: {
    503            r = -ENOTSUP;
    504            error_setg_errno(
    505                    errp, -r, "unknown image encryption format: %u",
    506                    encrypt->format);
    507            return r;
    508        }
    509    }
    510
    511    r = rbd_encryption_load(image, format, opts, opts_size);
    512    if (r < 0) {
    513        error_setg_errno(errp, -r, "encryption load fail");
    514        return r;
    515    }
    516
    517    return 0;
    518}
    519#endif
    520
    521/* FIXME Deprecate and remove keypairs or make it available in QMP. */
    522static int qemu_rbd_do_create(BlockdevCreateOptions *options,
    523                              const char *keypairs, const char *password_secret,
    524                              Error **errp)
    525{
    526    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
    527    rados_t cluster;
    528    rados_ioctx_t io_ctx;
    529    int obj_order = 0;
    530    int ret;
    531
    532    assert(options->driver == BLOCKDEV_DRIVER_RBD);
    533    if (opts->location->has_snapshot) {
    534        error_setg(errp, "Can't use snapshot name for image creation");
    535        return -EINVAL;
    536    }
    537
    538#ifndef LIBRBD_SUPPORTS_ENCRYPTION
    539    if (opts->has_encrypt) {
    540        error_setg(errp, "RBD library does not support image encryption");
    541        return -ENOTSUP;
    542    }
    543#endif
    544
    545    if (opts->has_cluster_size) {
    546        int64_t objsize = opts->cluster_size;
    547        if ((objsize - 1) & objsize) {    /* not a power of 2? */
    548            error_setg(errp, "obj size needs to be power of 2");
    549            return -EINVAL;
    550        }
    551        if (objsize < 4096) {
    552            error_setg(errp, "obj size too small");
    553            return -EINVAL;
    554        }
    555        obj_order = ctz32(objsize);
    556    }
    557
    558    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
    559                           password_secret, errp);
    560    if (ret < 0) {
    561        return ret;
    562    }
    563
    564    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
    565    if (ret < 0) {
    566        error_setg_errno(errp, -ret, "error rbd create");
    567        goto out;
    568    }
    569
    570#ifdef LIBRBD_SUPPORTS_ENCRYPTION
    571    if (opts->has_encrypt) {
    572        rbd_image_t image;
    573
    574        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
    575        if (ret < 0) {
    576            error_setg_errno(errp, -ret,
    577                             "error opening image '%s' for encryption format",
    578                             opts->location->image);
    579            goto out;
    580        }
    581
    582        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
    583        rbd_close(image);
    584        if (ret < 0) {
    585            /* encryption format fail, try removing the image */
    586            rbd_remove(io_ctx, opts->location->image);
    587            goto out;
    588        }
    589    }
    590#endif
    591
    592    ret = 0;
    593out:
    594    rados_ioctx_destroy(io_ctx);
    595    rados_shutdown(cluster);
    596    return ret;
    597}
    598
    599static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
    600{
    601    return qemu_rbd_do_create(options, NULL, NULL, errp);
    602}
    603
    604static int qemu_rbd_extract_encryption_create_options(
    605        QemuOpts *opts,
    606        RbdEncryptionCreateOptions **spec,
    607        Error **errp)
    608{
    609    QDict *opts_qdict;
    610    QDict *encrypt_qdict;
    611    Visitor *v;
    612    int ret = 0;
    613
    614    opts_qdict = qemu_opts_to_qdict(opts, NULL);
    615    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
    616    qobject_unref(opts_qdict);
    617    if (!qdict_size(encrypt_qdict)) {
    618        *spec = NULL;
    619        goto exit;
    620    }
    621
    622    /* Convert options into a QAPI object */
    623    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
    624    if (!v) {
    625        ret = -EINVAL;
    626        goto exit;
    627    }
    628
    629    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
    630    visit_free(v);
    631    if (!*spec) {
    632        ret = -EINVAL;
    633        goto exit;
    634    }
    635
    636exit:
    637    qobject_unref(encrypt_qdict);
    638    return ret;
    639}
    640
    641static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
    642                                                const char *filename,
    643                                                QemuOpts *opts,
    644                                                Error **errp)
    645{
    646    BlockdevCreateOptions *create_options;
    647    BlockdevCreateOptionsRbd *rbd_opts;
    648    BlockdevOptionsRbd *loc;
    649    RbdEncryptionCreateOptions *encrypt = NULL;
    650    Error *local_err = NULL;
    651    const char *keypairs, *password_secret;
    652    QDict *options = NULL;
    653    int ret = 0;
    654
    655    create_options = g_new0(BlockdevCreateOptions, 1);
    656    create_options->driver = BLOCKDEV_DRIVER_RBD;
    657    rbd_opts = &create_options->u.rbd;
    658
    659    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
    660
    661    password_secret = qemu_opt_get(opts, "password-secret");
    662
    663    /* Read out options */
    664    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
    665                              BDRV_SECTOR_SIZE);
    666    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
    667                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
    668    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
    669
    670    options = qdict_new();
    671    qemu_rbd_parse_filename(filename, options, &local_err);
    672    if (local_err) {
    673        ret = -EINVAL;
    674        error_propagate(errp, local_err);
    675        goto exit;
    676    }
    677
    678    ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
    679    if (ret < 0) {
    680        goto exit;
    681    }
    682    rbd_opts->encrypt     = encrypt;
    683    rbd_opts->has_encrypt = !!encrypt;
    684
    685    /*
    686     * Caution: while qdict_get_try_str() is fine, getting non-string
    687     * types would require more care.  When @options come from -blockdev
    688     * or blockdev_add, its members are typed according to the QAPI
    689     * schema, but when they come from -drive, they're all QString.
    690     */
    691    loc = rbd_opts->location;
    692    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
    693    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
    694    loc->has_conf    = !!loc->conf;
    695    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
    696    loc->has_user    = !!loc->user;
    697    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
    698    loc->has_q_namespace = !!loc->q_namespace;
    699    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
    700    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
    701
    702    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
    703    if (ret < 0) {
    704        goto exit;
    705    }
    706
    707exit:
    708    qobject_unref(options);
    709    qapi_free_BlockdevCreateOptions(create_options);
    710    return ret;
    711}
    712
    713static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
    714{
    715    const char **vals;
    716    const char *host, *port;
    717    char *rados_str;
    718    InetSocketAddressBaseList *p;
    719    int i, cnt;
    720
    721    if (!opts->has_server) {
    722        return NULL;
    723    }
    724
    725    for (cnt = 0, p = opts->server; p; p = p->next) {
    726        cnt++;
    727    }
    728
    729    vals = g_new(const char *, cnt + 1);
    730
    731    for (i = 0, p = opts->server; p; p = p->next, i++) {
    732        host = p->value->host;
    733        port = p->value->port;
    734
    735        if (strchr(host, ':')) {
    736            vals[i] = g_strdup_printf("[%s]:%s", host, port);
    737        } else {
    738            vals[i] = g_strdup_printf("%s:%s", host, port);
    739        }
    740    }
    741    vals[i] = NULL;
    742
    743    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
    744    g_strfreev((char **)vals);
    745    return rados_str;
    746}
    747
    748static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
    749                            BlockdevOptionsRbd *opts, bool cache,
    750                            const char *keypairs, const char *secretid,
    751                            Error **errp)
    752{
    753    char *mon_host = NULL;
    754    Error *local_err = NULL;
    755    int r;
    756
    757    if (secretid) {
    758        if (opts->key_secret) {
    759            error_setg(errp,
    760                       "Legacy 'password-secret' clashes with 'key-secret'");
    761            return -EINVAL;
    762        }
    763        opts->key_secret = g_strdup(secretid);
    764        opts->has_key_secret = true;
    765    }
    766
    767    mon_host = qemu_rbd_mon_host(opts, &local_err);
    768    if (local_err) {
    769        error_propagate(errp, local_err);
    770        r = -EINVAL;
    771        goto out;
    772    }
    773
    774    r = rados_create(cluster, opts->user);
    775    if (r < 0) {
    776        error_setg_errno(errp, -r, "error initializing");
    777        goto out;
    778    }
    779
    780    /* try default location when conf=NULL, but ignore failure */
    781    r = rados_conf_read_file(*cluster, opts->conf);
    782    if (opts->has_conf && r < 0) {
    783        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
    784        goto failed_shutdown;
    785    }
    786
    787    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
    788    if (r < 0) {
    789        goto failed_shutdown;
    790    }
    791
    792    if (mon_host) {
    793        r = rados_conf_set(*cluster, "mon_host", mon_host);
    794        if (r < 0) {
    795            goto failed_shutdown;
    796        }
    797    }
    798
    799    r = qemu_rbd_set_auth(*cluster, opts, errp);
    800    if (r < 0) {
    801        goto failed_shutdown;
    802    }
    803
    804    /*
    805     * Fallback to more conservative semantics if setting cache
    806     * options fails. Ignore errors from setting rbd_cache because the
    807     * only possible error is that the option does not exist, and
    808     * librbd defaults to no caching. If write through caching cannot
    809     * be set up, fall back to no caching.
    810     */
    811    if (cache) {
    812        rados_conf_set(*cluster, "rbd_cache", "true");
    813    } else {
    814        rados_conf_set(*cluster, "rbd_cache", "false");
    815    }
    816
    817    r = rados_connect(*cluster);
    818    if (r < 0) {
    819        error_setg_errno(errp, -r, "error connecting");
    820        goto failed_shutdown;
    821    }
    822
    823    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
    824    if (r < 0) {
    825        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
    826        goto failed_shutdown;
    827    }
    828    /*
    829     * Set the namespace after opening the io context on the pool,
    830     * if nspace == NULL or if nspace == "", it is just as we did nothing
    831     */
    832    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
    833
    834    r = 0;
    835    goto out;
    836
    837failed_shutdown:
    838    rados_shutdown(*cluster);
    839out:
    840    g_free(mon_host);
    841    return r;
    842}
    843
    844static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
    845                                    Error **errp)
    846{
    847    Visitor *v;
    848
    849    /* Convert the remaining options into a QAPI object */
    850    v = qobject_input_visitor_new_flat_confused(options, errp);
    851    if (!v) {
    852        return -EINVAL;
    853    }
    854
    855    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
    856    visit_free(v);
    857    if (!opts) {
    858        return -EINVAL;
    859    }
    860
    861    return 0;
    862}
    863
    864static int qemu_rbd_attempt_legacy_options(QDict *options,
    865                                           BlockdevOptionsRbd **opts,
    866                                           char **keypairs)
    867{
    868    char *filename;
    869    int r;
    870
    871    filename = g_strdup(qdict_get_try_str(options, "filename"));
    872    if (!filename) {
    873        return -EINVAL;
    874    }
    875    qdict_del(options, "filename");
    876
    877    qemu_rbd_parse_filename(filename, options, NULL);
    878
    879    /* keypairs freed by caller */
    880    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    881    if (*keypairs) {
    882        qdict_del(options, "=keyvalue-pairs");
    883    }
    884
    885    r = qemu_rbd_convert_options(options, opts, NULL);
    886
    887    g_free(filename);
    888    return r;
    889}
    890
    891static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
    892                         Error **errp)
    893{
    894    BDRVRBDState *s = bs->opaque;
    895    BlockdevOptionsRbd *opts = NULL;
    896    const QDictEntry *e;
    897    Error *local_err = NULL;
    898    char *keypairs, *secretid;
    899    rbd_image_info_t info;
    900    int r;
    901
    902    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    903    if (keypairs) {
    904        qdict_del(options, "=keyvalue-pairs");
    905    }
    906
    907    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
    908    if (secretid) {
    909        qdict_del(options, "password-secret");
    910    }
    911
    912    r = qemu_rbd_convert_options(options, &opts, &local_err);
    913    if (local_err) {
    914        /* If keypairs are present, that means some options are present in
    915         * the modern option format.  Don't attempt to parse legacy option
    916         * formats, as we won't support mixed usage. */
    917        if (keypairs) {
    918            error_propagate(errp, local_err);
    919            goto out;
    920        }
    921
    922        /* If the initial attempt to convert and process the options failed,
    923         * we may be attempting to open an image file that has the rbd options
    924         * specified in the older format consisting of all key/value pairs
    925         * encoded in the filename.  Go ahead and attempt to parse the
    926         * filename, and see if we can pull out the required options. */
    927        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
    928        if (r < 0) {
    929            /* Propagate the original error, not the legacy parsing fallback
    930             * error, as the latter was just a best-effort attempt. */
    931            error_propagate(errp, local_err);
    932            goto out;
    933        }
    934        /* Take care whenever deciding to actually deprecate; once this ability
    935         * is removed, we will not be able to open any images with legacy-styled
    936         * backing image strings. */
    937        warn_report("RBD options encoded in the filename as keyvalue pairs "
    938                    "is deprecated");
    939    }
    940
    941    /* Remove the processed options from the QDict (the visitor processes
    942     * _all_ options in the QDict) */
    943    while ((e = qdict_first(options))) {
    944        qdict_del(options, e->key);
    945    }
    946
    947    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
    948                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
    949    if (r < 0) {
    950        goto out;
    951    }
    952
    953    s->snap = g_strdup(opts->snapshot);
    954    s->image_name = g_strdup(opts->image);
    955
    956    /* rbd_open is always r/w */
    957    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
    958    if (r < 0) {
    959        error_setg_errno(errp, -r, "error reading header from %s",
    960                         s->image_name);
    961        goto failed_open;
    962    }
    963
    964    if (opts->has_encrypt) {
    965#ifdef LIBRBD_SUPPORTS_ENCRYPTION
    966        r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
    967        if (r < 0) {
    968            goto failed_post_open;
    969        }
    970#else
    971        r = -ENOTSUP;
    972        error_setg(errp, "RBD library does not support image encryption");
    973        goto failed_post_open;
    974#endif
    975    }
    976
    977    r = rbd_stat(s->image, &info, sizeof(info));
    978    if (r < 0) {
    979        error_setg_errno(errp, -r, "error getting image info from %s",
    980                         s->image_name);
    981        goto failed_post_open;
    982    }
    983    s->image_size = info.size;
    984    s->object_size = info.obj_size;
    985
    986    /* If we are using an rbd snapshot, we must be r/o, otherwise
    987     * leave as-is */
    988    if (s->snap != NULL) {
    989        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
    990        if (r < 0) {
    991            goto failed_post_open;
    992        }
    993    }
    994
    995#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    996    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
    997#endif
    998
    999    /* When extending regular files, we get zeros from the OS */
   1000    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
   1001
   1002    r = 0;
   1003    goto out;
   1004
   1005failed_post_open:
   1006    rbd_close(s->image);
   1007failed_open:
   1008    rados_ioctx_destroy(s->io_ctx);
   1009    g_free(s->snap);
   1010    g_free(s->image_name);
   1011    rados_shutdown(s->cluster);
   1012out:
   1013    qapi_free_BlockdevOptionsRbd(opts);
   1014    g_free(keypairs);
   1015    g_free(secretid);
   1016    return r;
   1017}
   1018
   1019
   1020/* Since RBD is currently always opened R/W via the API,
   1021 * we just need to check if we are using a snapshot or not, in
   1022 * order to determine if we will allow it to be R/W */
   1023static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
   1024                                   BlockReopenQueue *queue, Error **errp)
   1025{
   1026    BDRVRBDState *s = state->bs->opaque;
   1027    int ret = 0;
   1028
   1029    if (s->snap && state->flags & BDRV_O_RDWR) {
   1030        error_setg(errp,
   1031                   "Cannot change node '%s' to r/w when using RBD snapshot",
   1032                   bdrv_get_device_or_node_name(state->bs));
   1033        ret = -EINVAL;
   1034    }
   1035
   1036    return ret;
   1037}
   1038
   1039static void qemu_rbd_close(BlockDriverState *bs)
   1040{
   1041    BDRVRBDState *s = bs->opaque;
   1042
   1043    rbd_close(s->image);
   1044    rados_ioctx_destroy(s->io_ctx);
   1045    g_free(s->snap);
   1046    g_free(s->image_name);
   1047    rados_shutdown(s->cluster);
   1048}
   1049
   1050/* Resize the RBD image and update the 'image_size' with the current size */
   1051static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
   1052{
   1053    BDRVRBDState *s = bs->opaque;
   1054    int r;
   1055
   1056    r = rbd_resize(s->image, size);
   1057    if (r < 0) {
   1058        return r;
   1059    }
   1060
   1061    s->image_size = size;
   1062
   1063    return 0;
   1064}
   1065
   1066static void qemu_rbd_finish_bh(void *opaque)
   1067{
   1068    RBDTask *task = opaque;
   1069    task->complete = true;
   1070    aio_co_wake(task->co);
   1071}
   1072
   1073/*
   1074 * This is the completion callback function for all rbd aio calls
   1075 * started from qemu_rbd_start_co().
   1076 *
   1077 * Note: this function is being called from a non qemu thread so
   1078 * we need to be careful about what we do here. Generally we only
   1079 * schedule a BH, and do the rest of the io completion handling
   1080 * from qemu_rbd_finish_bh() which runs in a qemu context.
   1081 */
   1082static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
   1083{
   1084    task->ret = rbd_aio_get_return_value(c);
   1085    rbd_aio_release(c);
   1086    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
   1087                            qemu_rbd_finish_bh, task);
   1088}
   1089
   1090static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
   1091                                          uint64_t offset,
   1092                                          uint64_t bytes,
   1093                                          QEMUIOVector *qiov,
   1094                                          int flags,
   1095                                          RBDAIOCmd cmd)
   1096{
   1097    BDRVRBDState *s = bs->opaque;
   1098    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
   1099    rbd_completion_t c;
   1100    int r;
   1101
   1102    assert(!qiov || qiov->size == bytes);
   1103
   1104    r = rbd_aio_create_completion(&task,
   1105                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
   1106    if (r < 0) {
   1107        return r;
   1108    }
   1109
   1110    switch (cmd) {
   1111    case RBD_AIO_READ:
   1112        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
   1113        break;
   1114    case RBD_AIO_WRITE:
   1115        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
   1116        break;
   1117    case RBD_AIO_DISCARD:
   1118        r = rbd_aio_discard(s->image, offset, bytes, c);
   1119        break;
   1120    case RBD_AIO_FLUSH:
   1121        r = rbd_aio_flush(s->image, c);
   1122        break;
   1123#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1124    case RBD_AIO_WRITE_ZEROES: {
   1125        int zero_flags = 0;
   1126#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
   1127        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
   1128            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
   1129        }
   1130#endif
   1131        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
   1132        break;
   1133    }
   1134#endif
   1135    default:
   1136        r = -EINVAL;
   1137    }
   1138
   1139    if (r < 0) {
   1140        error_report("rbd request failed early: cmd %d offset %" PRIu64
   1141                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
   1142                     bytes, flags, r, strerror(-r));
   1143        rbd_aio_release(c);
   1144        return r;
   1145    }
   1146
   1147    while (!task.complete) {
   1148        qemu_coroutine_yield();
   1149    }
   1150
   1151    if (task.ret < 0) {
   1152        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
   1153                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
   1154                     bytes, flags, task.ret, strerror(-task.ret));
   1155        return task.ret;
   1156    }
   1157
   1158    /* zero pad short reads */
   1159    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
   1160        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
   1161    }
   1162
   1163    return 0;
   1164}
   1165
   1166static int
   1167coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
   1168                                int64_t bytes, QEMUIOVector *qiov,
   1169                                BdrvRequestFlags flags)
   1170{
   1171    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
   1172}
   1173
   1174static int
   1175coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
   1176                                 int64_t bytes, QEMUIOVector *qiov,
   1177                                 BdrvRequestFlags flags)
   1178{
   1179    BDRVRBDState *s = bs->opaque;
   1180    /*
   1181     * RBD APIs don't allow us to write more than actual size, so in order
   1182     * to support growing images, we resize the image before write
   1183     * operations that exceed the current size.
   1184     */
   1185    if (offset + bytes > s->image_size) {
   1186        int r = qemu_rbd_resize(bs, offset + bytes);
   1187        if (r < 0) {
   1188            return r;
   1189        }
   1190    }
   1191    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
   1192}
   1193
   1194static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
   1195{
   1196    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
   1197}
   1198
   1199static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
   1200                                             int64_t offset, int64_t bytes)
   1201{
   1202    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
   1203}
   1204
   1205#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1206static int
   1207coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
   1208                                       int64_t bytes, BdrvRequestFlags flags)
   1209{
   1210    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
   1211                             RBD_AIO_WRITE_ZEROES);
   1212}
   1213#endif
   1214
   1215static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
   1216{
   1217    BDRVRBDState *s = bs->opaque;
   1218    bdi->cluster_size = s->object_size;
   1219    return 0;
   1220}
   1221
   1222static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
   1223                                                     Error **errp)
   1224{
   1225    BDRVRBDState *s = bs->opaque;
   1226    ImageInfoSpecific *spec_info;
   1227    char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
   1228    int r;
   1229
   1230    if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
   1231        r = rbd_read(s->image, 0,
   1232                     RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
   1233        if (r < 0) {
   1234            error_setg_errno(errp, -r, "cannot read image start for probe");
   1235            return NULL;
   1236        }
   1237    }
   1238
   1239    spec_info = g_new(ImageInfoSpecific, 1);
   1240    *spec_info = (ImageInfoSpecific){
   1241        .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
   1242        .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
   1243    };
   1244
   1245    if (memcmp(buf, rbd_luks_header_verification,
   1246               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
   1247        spec_info->u.rbd.data->encryption_format =
   1248                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
   1249        spec_info->u.rbd.data->has_encryption_format = true;
   1250    } else if (memcmp(buf, rbd_luks2_header_verification,
   1251               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
   1252        spec_info->u.rbd.data->encryption_format =
   1253                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
   1254        spec_info->u.rbd.data->has_encryption_format = true;
   1255    } else {
   1256        spec_info->u.rbd.data->has_encryption_format = false;
   1257    }
   1258
   1259    return spec_info;
   1260}
   1261
   1262static int64_t qemu_rbd_getlength(BlockDriverState *bs)
   1263{
   1264    BDRVRBDState *s = bs->opaque;
   1265    int r;
   1266
   1267    r = rbd_get_size(s->image, &s->image_size);
   1268    if (r < 0) {
   1269        return r;
   1270    }
   1271
   1272    return s->image_size;
   1273}
   1274
   1275static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
   1276                                             int64_t offset,
   1277                                             bool exact,
   1278                                             PreallocMode prealloc,
   1279                                             BdrvRequestFlags flags,
   1280                                             Error **errp)
   1281{
   1282    int r;
   1283
   1284    if (prealloc != PREALLOC_MODE_OFF) {
   1285        error_setg(errp, "Unsupported preallocation mode '%s'",
   1286                   PreallocMode_str(prealloc));
   1287        return -ENOTSUP;
   1288    }
   1289
   1290    r = qemu_rbd_resize(bs, offset);
   1291    if (r < 0) {
   1292        error_setg_errno(errp, -r, "Failed to resize file");
   1293        return r;
   1294    }
   1295
   1296    return 0;
   1297}
   1298
   1299static int qemu_rbd_snap_create(BlockDriverState *bs,
   1300                                QEMUSnapshotInfo *sn_info)
   1301{
   1302    BDRVRBDState *s = bs->opaque;
   1303    int r;
   1304
   1305    if (sn_info->name[0] == '\0') {
   1306        return -EINVAL; /* we need a name for rbd snapshots */
   1307    }
   1308
   1309    /*
   1310     * rbd snapshots are using the name as the user controlled unique identifier
   1311     * we can't use the rbd snapid for that purpose, as it can't be set
   1312     */
   1313    if (sn_info->id_str[0] != '\0' &&
   1314        strcmp(sn_info->id_str, sn_info->name) != 0) {
   1315        return -EINVAL;
   1316    }
   1317
   1318    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
   1319        return -ERANGE;
   1320    }
   1321
   1322    r = rbd_snap_create(s->image, sn_info->name);
   1323    if (r < 0) {
   1324        error_report("failed to create snap: %s", strerror(-r));
   1325        return r;
   1326    }
   1327
   1328    return 0;
   1329}
   1330
   1331static int qemu_rbd_snap_remove(BlockDriverState *bs,
   1332                                const char *snapshot_id,
   1333                                const char *snapshot_name,
   1334                                Error **errp)
   1335{
   1336    BDRVRBDState *s = bs->opaque;
   1337    int r;
   1338
   1339    if (!snapshot_name) {
   1340        error_setg(errp, "rbd need a valid snapshot name");
   1341        return -EINVAL;
   1342    }
   1343
   1344    /* If snapshot_id is specified, it must be equal to name, see
   1345       qemu_rbd_snap_list() */
   1346    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
   1347        error_setg(errp,
   1348                   "rbd do not support snapshot id, it should be NULL or "
   1349                   "equal to snapshot name");
   1350        return -EINVAL;
   1351    }
   1352
   1353    r = rbd_snap_remove(s->image, snapshot_name);
   1354    if (r < 0) {
   1355        error_setg_errno(errp, -r, "Failed to remove the snapshot");
   1356    }
   1357    return r;
   1358}
   1359
   1360static int qemu_rbd_snap_rollback(BlockDriverState *bs,
   1361                                  const char *snapshot_name)
   1362{
   1363    BDRVRBDState *s = bs->opaque;
   1364
   1365    return rbd_snap_rollback(s->image, snapshot_name);
   1366}
   1367
   1368static int qemu_rbd_snap_list(BlockDriverState *bs,
   1369                              QEMUSnapshotInfo **psn_tab)
   1370{
   1371    BDRVRBDState *s = bs->opaque;
   1372    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
   1373    int i, snap_count;
   1374    rbd_snap_info_t *snaps;
   1375    int max_snaps = RBD_MAX_SNAPS;
   1376
   1377    do {
   1378        snaps = g_new(rbd_snap_info_t, max_snaps);
   1379        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
   1380        if (snap_count <= 0) {
   1381            g_free(snaps);
   1382        }
   1383    } while (snap_count == -ERANGE);
   1384
   1385    if (snap_count <= 0) {
   1386        goto done;
   1387    }
   1388
   1389    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
   1390
   1391    for (i = 0; i < snap_count; i++) {
   1392        const char *snap_name = snaps[i].name;
   1393
   1394        sn_info = sn_tab + i;
   1395        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
   1396        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
   1397
   1398        sn_info->vm_state_size = snaps[i].size;
   1399        sn_info->date_sec = 0;
   1400        sn_info->date_nsec = 0;
   1401        sn_info->vm_clock_nsec = 0;
   1402    }
   1403    rbd_snap_list_end(snaps);
   1404    g_free(snaps);
   1405
   1406 done:
   1407    *psn_tab = sn_tab;
   1408    return snap_count;
   1409}
   1410
   1411static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
   1412                                                      Error **errp)
   1413{
   1414    BDRVRBDState *s = bs->opaque;
   1415    int r = rbd_invalidate_cache(s->image);
   1416    if (r < 0) {
   1417        error_setg_errno(errp, -r, "Failed to invalidate the cache");
   1418    }
   1419}
   1420
   1421static QemuOptsList qemu_rbd_create_opts = {
   1422    .name = "rbd-create-opts",
   1423    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
   1424    .desc = {
   1425        {
   1426            .name = BLOCK_OPT_SIZE,
   1427            .type = QEMU_OPT_SIZE,
   1428            .help = "Virtual disk size"
   1429        },
   1430        {
   1431            .name = BLOCK_OPT_CLUSTER_SIZE,
   1432            .type = QEMU_OPT_SIZE,
   1433            .help = "RBD object size"
   1434        },
   1435        {
   1436            .name = "password-secret",
   1437            .type = QEMU_OPT_STRING,
   1438            .help = "ID of secret providing the password",
   1439        },
   1440        {
   1441            .name = "encrypt.format",
   1442            .type = QEMU_OPT_STRING,
   1443            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
   1444        },
   1445        {
   1446            .name = "encrypt.cipher-alg",
   1447            .type = QEMU_OPT_STRING,
   1448            .help = "Name of encryption cipher algorithm"
   1449                    " (allowed values: aes-128, aes-256)",
   1450        },
   1451        {
   1452            .name = "encrypt.key-secret",
   1453            .type = QEMU_OPT_STRING,
   1454            .help = "ID of secret providing LUKS passphrase",
   1455        },
   1456        { /* end of list */ }
   1457    }
   1458};
   1459
   1460static const char *const qemu_rbd_strong_runtime_opts[] = {
   1461    "pool",
   1462    "namespace",
   1463    "image",
   1464    "conf",
   1465    "snapshot",
   1466    "user",
   1467    "server.",
   1468    "password-secret",
   1469
   1470    NULL
   1471};
   1472
   1473static BlockDriver bdrv_rbd = {
   1474    .format_name            = "rbd",
   1475    .instance_size          = sizeof(BDRVRBDState),
   1476    .bdrv_parse_filename    = qemu_rbd_parse_filename,
   1477    .bdrv_file_open         = qemu_rbd_open,
   1478    .bdrv_close             = qemu_rbd_close,
   1479    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
   1480    .bdrv_co_create         = qemu_rbd_co_create,
   1481    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
   1482    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
   1483    .bdrv_get_info          = qemu_rbd_getinfo,
   1484    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
   1485    .create_opts            = &qemu_rbd_create_opts,
   1486    .bdrv_getlength         = qemu_rbd_getlength,
   1487    .bdrv_co_truncate       = qemu_rbd_co_truncate,
   1488    .protocol_name          = "rbd",
   1489
   1490    .bdrv_co_preadv         = qemu_rbd_co_preadv,
   1491    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
   1492    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
   1493    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
   1494#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1495    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
   1496#endif
   1497
   1498    .bdrv_snapshot_create   = qemu_rbd_snap_create,
   1499    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
   1500    .bdrv_snapshot_list     = qemu_rbd_snap_list,
   1501    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
   1502    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
   1503
   1504    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
   1505};
   1506
   1507static void bdrv_rbd_init(void)
   1508{
   1509    bdrv_register(&bdrv_rbd);
   1510}
   1511
   1512block_init(bdrv_rbd_init);