qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

rbd.c (50071B)


      1 /*
      2  * QEMU Block driver for RADOS (Ceph)
      3  *
      4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
      5  *                         Josh Durgin <josh.durgin@dreamhost.com>
      6  *
      7  * This work is licensed under the terms of the GNU GPL, version 2.  See
      8  * the COPYING file in the top-level directory.
      9  *
     10  * Contributions after 2012-01-13 are licensed under the terms of the
     11  * GNU GPL, version 2 or (at your option) any later version.
     12  */
     13 
     14 #include "qemu/osdep.h"
     15 
     16 #include <rbd/librbd.h>
     17 #include "qapi/error.h"
     18 #include "qemu/error-report.h"
     19 #include "qemu/module.h"
     20 #include "qemu/option.h"
     21 #include "block/block_int.h"
     22 #include "block/qdict.h"
     23 #include "crypto/secret.h"
     24 #include "qemu/cutils.h"
     25 #include "sysemu/replay.h"
     26 #include "qapi/qmp/qstring.h"
     27 #include "qapi/qmp/qdict.h"
     28 #include "qapi/qmp/qjson.h"
     29 #include "qapi/qmp/qlist.h"
     30 #include "qapi/qobject-input-visitor.h"
     31 #include "qapi/qapi-visit-block-core.h"
     32 
     33 /*
     34  * When specifying the image filename use:
     35  *
     36  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
     37  *
     38  * poolname must be the name of an existing rados pool.
     39  *
     40  * devicename is the name of the rbd image.
     41  *
     42  * Each option given is used to configure rados, and may be any valid
     43  * Ceph option, "id", or "conf".
     44  *
     45  * The "id" option indicates what user we should authenticate as to
     46  * the Ceph cluster.  If it is excluded we will use the Ceph default
     47  * (normally 'admin').
     48  *
     49  * The "conf" option specifies a Ceph configuration file to read.  If
     50  * it is not specified, we will read from the default Ceph locations
     51  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
     52  * file, specify conf=/dev/null.
     53  *
     54  * Configuration values containing :, @, or = can be escaped with a
     55  * leading "\".
     56  */
     57 
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file; this macro only expands correctly if it is defined elsewhere.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* NOTE(review): presumably a cap on snapshot count; no user is visible here. */
#define RBD_MAX_SNAPS 100

/* Length, in bytes, of each LUKS header signature array below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * Header signature for the "luks" format: the ASCII bytes "LUKS", then
 * 0xBA 0xBE, then the two bytes 0, 1.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/*
 * Header signature for the "luks2" format: identical to the one above
 * except that the final byte is 2 instead of 1.
 */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
     73 
/*
 * Kinds of I/O request distinguished by this driver.
 * NOTE(review): the code that dispatches on these values is outside the
 * visible part of the file.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
     81 
/*
 * Per-device driver state.
 * NOTE(review): field meanings below are inferred from names and types;
 * the code that fills them in is outside the visible part of the file.
 */
typedef struct BDRVRBDState {
    rados_t cluster;        /* librados cluster handle */
    rados_ioctx_t io_ctx;   /* librados I/O context */
    rbd_image_t image;      /* librbd image handle */
    char *image_name;
    char *snap;
    char *namespace;
    uint64_t image_size;
    uint64_t object_size;
} BDRVRBDState;
     92 
/*
 * Bookkeeping for one request.
 * NOTE(review): presumably @co is the coroutine waiting on the request,
 * @complete flags its completion and @ret carries its result; the users
 * of this struct are outside the visible part of the file.
 */
typedef struct RBDTask {
    BlockDriverState *bs;
    Coroutine *co;
    bool complete;
    int64_t ret;
} RBDTask;
     99 
/*
 * State for a diff-iterate request.
 * NOTE(review): presumably @offs/@bytes describe an extent and @exists
 * whether it holds data; the users are outside the visible part of the file.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
    105 
/* Forward declaration: used by the image creation code before its definition. */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
    110 
    111 static char *qemu_rbd_strchr(char *src, char delim)
    112 {
    113     char *p;
    114 
    115     for (p = src; *p; ++p) {
    116         if (*p == delim) {
    117             return p;
    118         }
    119         if (*p == '\\' && p[1] != '\0') {
    120             ++p;
    121         }
    122     }
    123 
    124     return NULL;
    125 }
    126 
    127 
    128 static char *qemu_rbd_next_tok(char *src, char delim, char **p)
    129 {
    130     char *end;
    131 
    132     *p = NULL;
    133 
    134     end = qemu_rbd_strchr(src, delim);
    135     if (end) {
    136         *p = end + 1;
    137         *end = '\0';
    138     }
    139     return src;
    140 }
    141 
    142 static void qemu_rbd_unescape(char *src)
    143 {
    144     char *p;
    145 
    146     for (p = src; *src; ++src, ++p) {
    147         if (*src == '\\' && src[1] != '\0') {
    148             src++;
    149         }
    150         *p = *src;
    151     }
    152     *p = '\0';
    153 }
    154 
/*
 * Parse a legacy "rbd:" pseudo-filename into individual entries of @options.
 *
 * The text after the "rbd:" prefix is copied and tokenised in place:
 *   pool '/' [namespace '/'] image ['@' snapshot] [':' key '=' value]...
 *
 * Entries stored in @options:
 *   "pool", "image"    - always (when parsing gets that far)
 *   "namespace"        - the parsed namespace, or "" if none was given
 *   "snapshot"         - only if an unescaped '@' is present
 *   "conf"             - value of the "conf" key
 *   "user"             - value of the "id" key
 *   "=keyvalue-pairs"  - every other key/value pair, serialised as a JSON
 *                        list [ "key1", "value1", "key2", "value2", ... ]
 *
 * Errors reported through @errp: missing "rbd:" prefix, missing '/' after
 * the pool name, and a key without '='.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *start;
    char *p, *buf;
    QList *keypairs = NULL;
    char *found_str, *image_name;

    if (!strstart(filename, "rbd:", &start)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* Work on a private copy: the tokeniser writes NULs into the string. */
    buf = g_strdup(start);
    p = buf;

    /* p is NULL afterwards if no unescaped '/' was found. */
    found_str = qemu_rbd_next_tok(p, '/', &p);
    if (!p) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(found_str);
    qdict_put_str(options, "pool", found_str);

    /*
     * Note: the '@' search covers the whole remainder of the string,
     * including any trailing key=value options.
     */
    if (qemu_rbd_strchr(p, '@')) {
        image_name = qemu_rbd_next_tok(p, '@', &p);

        found_str = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "snapshot", found_str);
    } else {
        image_name = qemu_rbd_next_tok(p, ':', &p);
    }
    /* Check for namespace in the image_name */
    if (qemu_rbd_strchr(image_name, '/')) {
        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "namespace", found_str);
    } else {
        qdict_put_str(options, "namespace", "");
    }
    qemu_rbd_unescape(image_name);
    qdict_put_str(options, "image", image_name);
    /* No ':' after the image/snapshot part means there are no options. */
    if (!p) {
        goto done;
    }

    /* The following are essentially all key/value pairs, and we treat
     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
    while (p) {
        char *name, *value;
        name = qemu_rbd_next_tok(p, '=', &p);
        if (!p) {
            /*
             * Leaving the loop here still stores any pairs collected so
             * far under "=keyvalue-pairs" below.
             */
            error_setg(errp, "conf option %s has no value", name);
            break;
        }

        qemu_rbd_unescape(name);

        /* p becomes NULL after the last value, which ends the loop. */
        value = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(value);

        if (!strcmp(name, "conf")) {
            qdict_put_str(options, "conf", value);
        } else if (!strcmp(name, "id")) {
            qdict_put_str(options, "user", value);
        } else {
            /*
             * We pass these internally to qemu_rbd_set_keypairs(), so
             * we can get away with the simpler list of [ "key1",
             * "value1", "key2", "value2" ] rather than a raw dict
             * { "key1": "value1", "key2": "value2" } where we can't
             * guarantee order, or even a more correct but complex
             * [ { "key1": "value1" }, { "key2": "value2" } ]
             */
            if (!keypairs) {
                keypairs = qlist_new();
            }
            qlist_append_str(keypairs, name);
            qlist_append_str(keypairs, value);
        }
    }

    if (keypairs) {
        /* Store the list as its JSON text; the list itself is dropped below. */
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
    }

done:
    g_free(buf);
    qobject_unref(keypairs);
    return;
}
    248 
    249 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
    250                              Error **errp)
    251 {
    252     char *key, *acr;
    253     int r;
    254     GString *accu;
    255     RbdAuthModeList *auth;
    256 
    257     if (opts->key_secret) {
    258         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
    259         if (!key) {
    260             return -EIO;
    261         }
    262         r = rados_conf_set(cluster, "key", key);
    263         g_free(key);
    264         if (r < 0) {
    265             error_setg_errno(errp, -r, "Could not set 'key'");
    266             return r;
    267         }
    268     }
    269 
    270     if (opts->has_auth_client_required) {
    271         accu = g_string_new("");
    272         for (auth = opts->auth_client_required; auth; auth = auth->next) {
    273             if (accu->str[0]) {
    274                 g_string_append_c(accu, ';');
    275             }
    276             g_string_append(accu, RbdAuthMode_str(auth->value));
    277         }
    278         acr = g_string_free(accu, FALSE);
    279         r = rados_conf_set(cluster, "auth_client_required", acr);
    280         g_free(acr);
    281         if (r < 0) {
    282             error_setg_errno(errp, -r,
    283                              "Could not set 'auth_client_required'");
    284             return r;
    285         }
    286     }
    287 
    288     return 0;
    289 }
    290 
    291 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
    292                                  Error **errp)
    293 {
    294     QList *keypairs;
    295     QString *name;
    296     QString *value;
    297     const char *key;
    298     size_t remaining;
    299     int ret = 0;
    300 
    301     if (!keypairs_json) {
    302         return ret;
    303     }
    304     keypairs = qobject_to(QList,
    305                           qobject_from_json(keypairs_json, &error_abort));
    306     remaining = qlist_size(keypairs) / 2;
    307     assert(remaining);
    308 
    309     while (remaining--) {
    310         name = qobject_to(QString, qlist_pop(keypairs));
    311         value = qobject_to(QString, qlist_pop(keypairs));
    312         assert(name && value);
    313         key = qstring_get_str(name);
    314 
    315         ret = rados_conf_set(cluster, key, qstring_get_str(value));
    316         qobject_unref(value);
    317         if (ret < 0) {
    318             error_setg_errno(errp, -ret, "invalid conf option %s", key);
    319             qobject_unref(name);
    320             ret = -EINVAL;
    321             break;
    322         }
    323         qobject_unref(name);
    324     }
    325 
    326     qobject_unref(keypairs);
    327     return ret;
    328 }
    329 
    330 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
    331 static int qemu_rbd_convert_luks_options(
    332         RbdEncryptionOptionsLUKSBase *luks_opts,
    333         char **passphrase,
    334         size_t *passphrase_len,
    335         Error **errp)
    336 {
    337     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
    338                                  passphrase_len, errp);
    339 }
    340 
    341 static int qemu_rbd_convert_luks_create_options(
    342         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
    343         rbd_encryption_algorithm_t *alg,
    344         char **passphrase,
    345         size_t *passphrase_len,
    346         Error **errp)
    347 {
    348     int r = 0;
    349 
    350     r = qemu_rbd_convert_luks_options(
    351             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
    352             passphrase, passphrase_len, errp);
    353     if (r < 0) {
    354         return r;
    355     }
    356 
    357     if (luks_opts->has_cipher_alg) {
    358         switch (luks_opts->cipher_alg) {
    359             case QCRYPTO_CIPHER_ALG_AES_128: {
    360                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
    361                 break;
    362             }
    363             case QCRYPTO_CIPHER_ALG_AES_256: {
    364                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
    365                 break;
    366             }
    367             default: {
    368                 r = -ENOTSUP;
    369                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
    370                                  luks_opts->cipher_alg);
    371                 return r;
    372             }
    373         }
    374     } else {
    375         /* default alg */
    376         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
    377     }
    378 
    379     return 0;
    380 }
    381 
    382 static int qemu_rbd_encryption_format(rbd_image_t image,
    383                                       RbdEncryptionCreateOptions *encrypt,
    384                                       Error **errp)
    385 {
    386     int r = 0;
    387     g_autofree char *passphrase = NULL;
    388     size_t passphrase_len;
    389     rbd_encryption_format_t format;
    390     rbd_encryption_options_t opts;
    391     rbd_encryption_luks1_format_options_t luks_opts;
    392     rbd_encryption_luks2_format_options_t luks2_opts;
    393     size_t opts_size;
    394     uint64_t raw_size, effective_size;
    395 
    396     r = rbd_get_size(image, &raw_size);
    397     if (r < 0) {
    398         error_setg_errno(errp, -r, "cannot get raw image size");
    399         return r;
    400     }
    401 
    402     switch (encrypt->format) {
    403         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
    404             memset(&luks_opts, 0, sizeof(luks_opts));
    405             format = RBD_ENCRYPTION_FORMAT_LUKS1;
    406             opts = &luks_opts;
    407             opts_size = sizeof(luks_opts);
    408             r = qemu_rbd_convert_luks_create_options(
    409                     qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
    410                     &luks_opts.alg, &passphrase, &passphrase_len, errp);
    411             if (r < 0) {
    412                 return r;
    413             }
    414             luks_opts.passphrase = passphrase;
    415             luks_opts.passphrase_size = passphrase_len;
    416             break;
    417         }
    418         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
    419             memset(&luks2_opts, 0, sizeof(luks2_opts));
    420             format = RBD_ENCRYPTION_FORMAT_LUKS2;
    421             opts = &luks2_opts;
    422             opts_size = sizeof(luks2_opts);
    423             r = qemu_rbd_convert_luks_create_options(
    424                     qapi_RbdEncryptionCreateOptionsLUKS2_base(
    425                             &encrypt->u.luks2),
    426                     &luks2_opts.alg, &passphrase, &passphrase_len, errp);
    427             if (r < 0) {
    428                 return r;
    429             }
    430             luks2_opts.passphrase = passphrase;
    431             luks2_opts.passphrase_size = passphrase_len;
    432             break;
    433         }
    434         default: {
    435             r = -ENOTSUP;
    436             error_setg_errno(
    437                     errp, -r, "unknown image encryption format: %u",
    438                     encrypt->format);
    439             return r;
    440         }
    441     }
    442 
    443     r = rbd_encryption_format(image, format, opts, opts_size);
    444     if (r < 0) {
    445         error_setg_errno(errp, -r, "encryption format fail");
    446         return r;
    447     }
    448 
    449     r = rbd_get_size(image, &effective_size);
    450     if (r < 0) {
    451         error_setg_errno(errp, -r, "cannot get effective image size");
    452         return r;
    453     }
    454 
    455     r = rbd_resize(image, raw_size + (raw_size - effective_size));
    456     if (r < 0) {
    457         error_setg_errno(errp, -r, "cannot resize image after format");
    458         return r;
    459     }
    460 
    461     return 0;
    462 }
    463 
    464 static int qemu_rbd_encryption_load(rbd_image_t image,
    465                                     RbdEncryptionOptions *encrypt,
    466                                     Error **errp)
    467 {
    468     int r = 0;
    469     g_autofree char *passphrase = NULL;
    470     size_t passphrase_len;
    471     rbd_encryption_luks1_format_options_t luks_opts;
    472     rbd_encryption_luks2_format_options_t luks2_opts;
    473     rbd_encryption_format_t format;
    474     rbd_encryption_options_t opts;
    475     size_t opts_size;
    476 
    477     switch (encrypt->format) {
    478         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
    479             memset(&luks_opts, 0, sizeof(luks_opts));
    480             format = RBD_ENCRYPTION_FORMAT_LUKS1;
    481             opts = &luks_opts;
    482             opts_size = sizeof(luks_opts);
    483             r = qemu_rbd_convert_luks_options(
    484                     qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
    485                     &passphrase, &passphrase_len, errp);
    486             if (r < 0) {
    487                 return r;
    488             }
    489             luks_opts.passphrase = passphrase;
    490             luks_opts.passphrase_size = passphrase_len;
    491             break;
    492         }
    493         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
    494             memset(&luks2_opts, 0, sizeof(luks2_opts));
    495             format = RBD_ENCRYPTION_FORMAT_LUKS2;
    496             opts = &luks2_opts;
    497             opts_size = sizeof(luks2_opts);
    498             r = qemu_rbd_convert_luks_options(
    499                     qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
    500                     &passphrase, &passphrase_len, errp);
    501             if (r < 0) {
    502                 return r;
    503             }
    504             luks2_opts.passphrase = passphrase;
    505             luks2_opts.passphrase_size = passphrase_len;
    506             break;
    507         }
    508         default: {
    509             r = -ENOTSUP;
    510             error_setg_errno(
    511                     errp, -r, "unknown image encryption format: %u",
    512                     encrypt->format);
    513             return r;
    514         }
    515     }
    516 
    517     r = rbd_encryption_load(image, format, opts, opts_size);
    518     if (r < 0) {
    519         error_setg_errno(errp, -r, "encryption load fail");
    520         return r;
    521     }
    522 
    523     return 0;
    524 }
    525 #endif
    526 
    527 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
    528 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
    529                               const char *keypairs, const char *password_secret,
    530                               Error **errp)
    531 {
    532     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
    533     rados_t cluster;
    534     rados_ioctx_t io_ctx;
    535     int obj_order = 0;
    536     int ret;
    537 
    538     assert(options->driver == BLOCKDEV_DRIVER_RBD);
    539     if (opts->location->has_snapshot) {
    540         error_setg(errp, "Can't use snapshot name for image creation");
    541         return -EINVAL;
    542     }
    543 
    544 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
    545     if (opts->has_encrypt) {
    546         error_setg(errp, "RBD library does not support image encryption");
    547         return -ENOTSUP;
    548     }
    549 #endif
    550 
    551     if (opts->has_cluster_size) {
    552         int64_t objsize = opts->cluster_size;
    553         if ((objsize - 1) & objsize) {    /* not a power of 2? */
    554             error_setg(errp, "obj size needs to be power of 2");
    555             return -EINVAL;
    556         }
    557         if (objsize < 4096) {
    558             error_setg(errp, "obj size too small");
    559             return -EINVAL;
    560         }
    561         obj_order = ctz32(objsize);
    562     }
    563 
    564     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
    565                            password_secret, errp);
    566     if (ret < 0) {
    567         return ret;
    568     }
    569 
    570     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
    571     if (ret < 0) {
    572         error_setg_errno(errp, -ret, "error rbd create");
    573         goto out;
    574     }
    575 
    576 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
    577     if (opts->has_encrypt) {
    578         rbd_image_t image;
    579 
    580         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
    581         if (ret < 0) {
    582             error_setg_errno(errp, -ret,
    583                              "error opening image '%s' for encryption format",
    584                              opts->location->image);
    585             goto out;
    586         }
    587 
    588         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
    589         rbd_close(image);
    590         if (ret < 0) {
    591             /* encryption format fail, try removing the image */
    592             rbd_remove(io_ctx, opts->location->image);
    593             goto out;
    594         }
    595     }
    596 #endif
    597 
    598     ret = 0;
    599 out:
    600     rados_ioctx_destroy(io_ctx);
    601     rados_shutdown(cluster);
    602     return ret;
    603 }
    604 
    605 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
    606 {
    607     return qemu_rbd_do_create(options, NULL, NULL, errp);
    608 }
    609 
    610 static int qemu_rbd_extract_encryption_create_options(
    611         QemuOpts *opts,
    612         RbdEncryptionCreateOptions **spec,
    613         Error **errp)
    614 {
    615     QDict *opts_qdict;
    616     QDict *encrypt_qdict;
    617     Visitor *v;
    618     int ret = 0;
    619 
    620     opts_qdict = qemu_opts_to_qdict(opts, NULL);
    621     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
    622     qobject_unref(opts_qdict);
    623     if (!qdict_size(encrypt_qdict)) {
    624         *spec = NULL;
    625         goto exit;
    626     }
    627 
    628     /* Convert options into a QAPI object */
    629     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
    630     if (!v) {
    631         ret = -EINVAL;
    632         goto exit;
    633     }
    634 
    635     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
    636     visit_free(v);
    637     if (!*spec) {
    638         ret = -EINVAL;
    639         goto exit;
    640     }
    641 
    642 exit:
    643     qobject_unref(encrypt_qdict);
    644     return ret;
    645 }
    646 
    647 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
    648                                                 const char *filename,
    649                                                 QemuOpts *opts,
    650                                                 Error **errp)
    651 {
    652     BlockdevCreateOptions *create_options;
    653     BlockdevCreateOptionsRbd *rbd_opts;
    654     BlockdevOptionsRbd *loc;
    655     RbdEncryptionCreateOptions *encrypt = NULL;
    656     Error *local_err = NULL;
    657     const char *keypairs, *password_secret;
    658     QDict *options = NULL;
    659     int ret = 0;
    660 
    661     create_options = g_new0(BlockdevCreateOptions, 1);
    662     create_options->driver = BLOCKDEV_DRIVER_RBD;
    663     rbd_opts = &create_options->u.rbd;
    664 
    665     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
    666 
    667     password_secret = qemu_opt_get(opts, "password-secret");
    668 
    669     /* Read out options */
    670     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
    671                               BDRV_SECTOR_SIZE);
    672     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
    673                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
    674     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
    675 
    676     options = qdict_new();
    677     qemu_rbd_parse_filename(filename, options, &local_err);
    678     if (local_err) {
    679         ret = -EINVAL;
    680         error_propagate(errp, local_err);
    681         goto exit;
    682     }
    683 
    684     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
    685     if (ret < 0) {
    686         goto exit;
    687     }
    688     rbd_opts->encrypt     = encrypt;
    689     rbd_opts->has_encrypt = !!encrypt;
    690 
    691     /*
    692      * Caution: while qdict_get_try_str() is fine, getting non-string
    693      * types would require more care.  When @options come from -blockdev
    694      * or blockdev_add, its members are typed according to the QAPI
    695      * schema, but when they come from -drive, they're all QString.
    696      */
    697     loc = rbd_opts->location;
    698     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
    699     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
    700     loc->has_conf    = !!loc->conf;
    701     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
    702     loc->has_user    = !!loc->user;
    703     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
    704     loc->has_q_namespace = !!loc->q_namespace;
    705     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
    706     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
    707 
    708     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
    709     if (ret < 0) {
    710         goto exit;
    711     }
    712 
    713 exit:
    714     qobject_unref(options);
    715     qapi_free_BlockdevCreateOptions(create_options);
    716     return ret;
    717 }
    718 
    719 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
    720 {
    721     const char **vals;
    722     const char *host, *port;
    723     char *rados_str;
    724     InetSocketAddressBaseList *p;
    725     int i, cnt;
    726 
    727     if (!opts->has_server) {
    728         return NULL;
    729     }
    730 
    731     for (cnt = 0, p = opts->server; p; p = p->next) {
    732         cnt++;
    733     }
    734 
    735     vals = g_new(const char *, cnt + 1);
    736 
    737     for (i = 0, p = opts->server; p; p = p->next, i++) {
    738         host = p->value->host;
    739         port = p->value->port;
    740 
    741         if (strchr(host, ':')) {
    742             vals[i] = g_strdup_printf("[%s]:%s", host, port);
    743         } else {
    744             vals[i] = g_strdup_printf("%s:%s", host, port);
    745         }
    746     }
    747     vals[i] = NULL;
    748 
    749     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
    750     g_strfreev((char **)vals);
    751     return rados_str;
    752 }
    753 
    754 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
    755                             BlockdevOptionsRbd *opts, bool cache,
    756                             const char *keypairs, const char *secretid,
    757                             Error **errp)
    758 {
    759     char *mon_host = NULL;
    760     Error *local_err = NULL;
    761     int r;
    762 
    763     if (secretid) {
    764         if (opts->key_secret) {
    765             error_setg(errp,
    766                        "Legacy 'password-secret' clashes with 'key-secret'");
    767             return -EINVAL;
    768         }
    769         opts->key_secret = g_strdup(secretid);
    770         opts->has_key_secret = true;
    771     }
    772 
    773     mon_host = qemu_rbd_mon_host(opts, &local_err);
    774     if (local_err) {
    775         error_propagate(errp, local_err);
    776         r = -EINVAL;
    777         goto out;
    778     }
    779 
    780     r = rados_create(cluster, opts->user);
    781     if (r < 0) {
    782         error_setg_errno(errp, -r, "error initializing");
    783         goto out;
    784     }
    785 
    786     /* try default location when conf=NULL, but ignore failure */
    787     r = rados_conf_read_file(*cluster, opts->conf);
    788     if (opts->has_conf && r < 0) {
    789         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
    790         goto failed_shutdown;
    791     }
    792 
    793     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
    794     if (r < 0) {
    795         goto failed_shutdown;
    796     }
    797 
    798     if (mon_host) {
    799         r = rados_conf_set(*cluster, "mon_host", mon_host);
    800         if (r < 0) {
    801             goto failed_shutdown;
    802         }
    803     }
    804 
    805     r = qemu_rbd_set_auth(*cluster, opts, errp);
    806     if (r < 0) {
    807         goto failed_shutdown;
    808     }
    809 
    810     /*
    811      * Fallback to more conservative semantics if setting cache
    812      * options fails. Ignore errors from setting rbd_cache because the
    813      * only possible error is that the option does not exist, and
    814      * librbd defaults to no caching. If write through caching cannot
    815      * be set up, fall back to no caching.
    816      */
    817     if (cache) {
    818         rados_conf_set(*cluster, "rbd_cache", "true");
    819     } else {
    820         rados_conf_set(*cluster, "rbd_cache", "false");
    821     }
    822 
    823     r = rados_connect(*cluster);
    824     if (r < 0) {
    825         error_setg_errno(errp, -r, "error connecting");
    826         goto failed_shutdown;
    827     }
    828 
    829     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
    830     if (r < 0) {
    831         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
    832         goto failed_shutdown;
    833     }
    834 
    835 #ifdef HAVE_RBD_NAMESPACE_EXISTS
    836     if (opts->has_q_namespace && strlen(opts->q_namespace) > 0) {
    837         bool exists;
    838 
    839         r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
    840         if (r < 0) {
    841             error_setg_errno(errp, -r, "error checking namespace");
    842             goto failed_ioctx_destroy;
    843         }
    844 
    845         if (!exists) {
    846             error_setg(errp, "namespace '%s' does not exist",
    847                        opts->q_namespace);
    848             r = -ENOENT;
    849             goto failed_ioctx_destroy;
    850         }
    851     }
    852 #endif
    853 
    854     /*
    855      * Set the namespace after opening the io context on the pool,
    856      * if nspace == NULL or if nspace == "", it is just as we did nothing
    857      */
    858     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
    859 
    860     r = 0;
    861     goto out;
    862 
    863 #ifdef HAVE_RBD_NAMESPACE_EXISTS
    864 failed_ioctx_destroy:
    865     rados_ioctx_destroy(*io_ctx);
    866 #endif
    867 failed_shutdown:
    868     rados_shutdown(*cluster);
    869 out:
    870     g_free(mon_host);
    871     return r;
    872 }
    873 
    874 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
    875                                     Error **errp)
    876 {
    877     Visitor *v;
    878 
    879     /* Convert the remaining options into a QAPI object */
    880     v = qobject_input_visitor_new_flat_confused(options, errp);
    881     if (!v) {
    882         return -EINVAL;
    883     }
    884 
    885     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
    886     visit_free(v);
    887     if (!opts) {
    888         return -EINVAL;
    889     }
    890 
    891     return 0;
    892 }
    893 
    894 static int qemu_rbd_attempt_legacy_options(QDict *options,
    895                                            BlockdevOptionsRbd **opts,
    896                                            char **keypairs)
    897 {
    898     char *filename;
    899     int r;
    900 
    901     filename = g_strdup(qdict_get_try_str(options, "filename"));
    902     if (!filename) {
    903         return -EINVAL;
    904     }
    905     qdict_del(options, "filename");
    906 
    907     qemu_rbd_parse_filename(filename, options, NULL);
    908 
    909     /* keypairs freed by caller */
    910     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    911     if (*keypairs) {
    912         qdict_del(options, "=keyvalue-pairs");
    913     }
    914 
    915     r = qemu_rbd_convert_options(options, opts, NULL);
    916 
    917     g_free(filename);
    918     return r;
    919 }
    920 
    921 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
    922                          Error **errp)
    923 {
    924     BDRVRBDState *s = bs->opaque;
    925     BlockdevOptionsRbd *opts = NULL;
    926     const QDictEntry *e;
    927     Error *local_err = NULL;
    928     char *keypairs, *secretid;
    929     rbd_image_info_t info;
    930     int r;
    931 
    932     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    933     if (keypairs) {
    934         qdict_del(options, "=keyvalue-pairs");
    935     }
    936 
    937     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
    938     if (secretid) {
    939         qdict_del(options, "password-secret");
    940     }
    941 
    942     r = qemu_rbd_convert_options(options, &opts, &local_err);
    943     if (local_err) {
    944         /* If keypairs are present, that means some options are present in
    945          * the modern option format.  Don't attempt to parse legacy option
    946          * formats, as we won't support mixed usage. */
    947         if (keypairs) {
    948             error_propagate(errp, local_err);
    949             goto out;
    950         }
    951 
    952         /* If the initial attempt to convert and process the options failed,
    953          * we may be attempting to open an image file that has the rbd options
    954          * specified in the older format consisting of all key/value pairs
    955          * encoded in the filename.  Go ahead and attempt to parse the
    956          * filename, and see if we can pull out the required options. */
    957         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
    958         if (r < 0) {
    959             /* Propagate the original error, not the legacy parsing fallback
    960              * error, as the latter was just a best-effort attempt. */
    961             error_propagate(errp, local_err);
    962             goto out;
    963         }
    964         /* Take care whenever deciding to actually deprecate; once this ability
    965          * is removed, we will not be able to open any images with legacy-styled
    966          * backing image strings. */
    967         warn_report("RBD options encoded in the filename as keyvalue pairs "
    968                     "is deprecated");
    969     }
    970 
    971     /* Remove the processed options from the QDict (the visitor processes
    972      * _all_ options in the QDict) */
    973     while ((e = qdict_first(options))) {
    974         qdict_del(options, e->key);
    975     }
    976 
    977     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
    978                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
    979     if (r < 0) {
    980         goto out;
    981     }
    982 
    983     s->snap = g_strdup(opts->snapshot);
    984     s->image_name = g_strdup(opts->image);
    985 
    986     /* rbd_open is always r/w */
    987     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
    988     if (r < 0) {
    989         error_setg_errno(errp, -r, "error reading header from %s",
    990                          s->image_name);
    991         goto failed_open;
    992     }
    993 
    994     if (opts->has_encrypt) {
    995 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
    996         r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
    997         if (r < 0) {
    998             goto failed_post_open;
    999         }
   1000 #else
   1001         r = -ENOTSUP;
   1002         error_setg(errp, "RBD library does not support image encryption");
   1003         goto failed_post_open;
   1004 #endif
   1005     }
   1006 
   1007     r = rbd_stat(s->image, &info, sizeof(info));
   1008     if (r < 0) {
   1009         error_setg_errno(errp, -r, "error getting image info from %s",
   1010                          s->image_name);
   1011         goto failed_post_open;
   1012     }
   1013     s->image_size = info.size;
   1014     s->object_size = info.obj_size;
   1015 
   1016     /* If we are using an rbd snapshot, we must be r/o, otherwise
   1017      * leave as-is */
   1018     if (s->snap != NULL) {
   1019         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
   1020         if (r < 0) {
   1021             goto failed_post_open;
   1022         }
   1023     }
   1024 
   1025 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1026     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
   1027 #endif
   1028 
   1029     /* When extending regular files, we get zeros from the OS */
   1030     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
   1031 
   1032     r = 0;
   1033     goto out;
   1034 
   1035 failed_post_open:
   1036     rbd_close(s->image);
   1037 failed_open:
   1038     rados_ioctx_destroy(s->io_ctx);
   1039     g_free(s->snap);
   1040     g_free(s->image_name);
   1041     rados_shutdown(s->cluster);
   1042 out:
   1043     qapi_free_BlockdevOptionsRbd(opts);
   1044     g_free(keypairs);
   1045     g_free(secretid);
   1046     return r;
   1047 }
   1048 
   1049 
   1050 /* Since RBD is currently always opened R/W via the API,
   1051  * we just need to check if we are using a snapshot or not, in
   1052  * order to determine if we will allow it to be R/W */
   1053 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
   1054                                    BlockReopenQueue *queue, Error **errp)
   1055 {
   1056     BDRVRBDState *s = state->bs->opaque;
   1057     int ret = 0;
   1058 
   1059     if (s->snap && state->flags & BDRV_O_RDWR) {
   1060         error_setg(errp,
   1061                    "Cannot change node '%s' to r/w when using RBD snapshot",
   1062                    bdrv_get_device_or_node_name(state->bs));
   1063         ret = -EINVAL;
   1064     }
   1065 
   1066     return ret;
   1067 }
   1068 
   1069 static void qemu_rbd_close(BlockDriverState *bs)
   1070 {
   1071     BDRVRBDState *s = bs->opaque;
   1072 
   1073     rbd_close(s->image);
   1074     rados_ioctx_destroy(s->io_ctx);
   1075     g_free(s->snap);
   1076     g_free(s->image_name);
   1077     rados_shutdown(s->cluster);
   1078 }
   1079 
   1080 /* Resize the RBD image and update the 'image_size' with the current size */
   1081 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
   1082 {
   1083     BDRVRBDState *s = bs->opaque;
   1084     int r;
   1085 
   1086     r = rbd_resize(s->image, size);
   1087     if (r < 0) {
   1088         return r;
   1089     }
   1090 
   1091     s->image_size = size;
   1092 
   1093     return 0;
   1094 }
   1095 
   1096 static void qemu_rbd_finish_bh(void *opaque)
   1097 {
   1098     RBDTask *task = opaque;
   1099     task->complete = true;
   1100     aio_co_wake(task->co);
   1101 }
   1102 
   1103 /*
   1104  * This is the completion callback function for all rbd aio calls
   1105  * started from qemu_rbd_start_co().
   1106  *
   1107  * Note: this function is being called from a non qemu thread so
   1108  * we need to be careful about what we do here. Generally we only
   1109  * schedule a BH, and do the rest of the io completion handling
   1110  * from qemu_rbd_finish_bh() which runs in a qemu context.
   1111  */
   1112 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
   1113 {
   1114     task->ret = rbd_aio_get_return_value(c);
   1115     rbd_aio_release(c);
   1116     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
   1117                             qemu_rbd_finish_bh, task);
   1118 }
   1119 
   1120 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
   1121                                           uint64_t offset,
   1122                                           uint64_t bytes,
   1123                                           QEMUIOVector *qiov,
   1124                                           int flags,
   1125                                           RBDAIOCmd cmd)
   1126 {
   1127     BDRVRBDState *s = bs->opaque;
   1128     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
   1129     rbd_completion_t c;
   1130     int r;
   1131 
   1132     assert(!qiov || qiov->size == bytes);
   1133 
   1134     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
   1135         /*
   1136          * RBD APIs don't allow us to write more than actual size, so in order
   1137          * to support growing images, we resize the image before write
   1138          * operations that exceed the current size.
   1139          */
   1140         if (offset + bytes > s->image_size) {
   1141             int r = qemu_rbd_resize(bs, offset + bytes);
   1142             if (r < 0) {
   1143                 return r;
   1144             }
   1145         }
   1146     }
   1147 
   1148     r = rbd_aio_create_completion(&task,
   1149                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
   1150     if (r < 0) {
   1151         return r;
   1152     }
   1153 
   1154     switch (cmd) {
   1155     case RBD_AIO_READ:
   1156         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
   1157         break;
   1158     case RBD_AIO_WRITE:
   1159         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
   1160         break;
   1161     case RBD_AIO_DISCARD:
   1162         r = rbd_aio_discard(s->image, offset, bytes, c);
   1163         break;
   1164     case RBD_AIO_FLUSH:
   1165         r = rbd_aio_flush(s->image, c);
   1166         break;
   1167 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1168     case RBD_AIO_WRITE_ZEROES: {
   1169         int zero_flags = 0;
   1170 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
   1171         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
   1172             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
   1173         }
   1174 #endif
   1175         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
   1176         break;
   1177     }
   1178 #endif
   1179     default:
   1180         r = -EINVAL;
   1181     }
   1182 
   1183     if (r < 0) {
   1184         error_report("rbd request failed early: cmd %d offset %" PRIu64
   1185                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
   1186                      bytes, flags, r, strerror(-r));
   1187         rbd_aio_release(c);
   1188         return r;
   1189     }
   1190 
   1191     while (!task.complete) {
   1192         qemu_coroutine_yield();
   1193     }
   1194 
   1195     if (task.ret < 0) {
   1196         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
   1197                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
   1198                      bytes, flags, task.ret, strerror(-task.ret));
   1199         return task.ret;
   1200     }
   1201 
   1202     /* zero pad short reads */
   1203     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
   1204         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
   1205     }
   1206 
   1207     return 0;
   1208 }
   1209 
   1210 static int
   1211 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
   1212                                 int64_t bytes, QEMUIOVector *qiov,
   1213                                 BdrvRequestFlags flags)
   1214 {
   1215     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
   1216 }
   1217 
   1218 static int
   1219 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
   1220                                  int64_t bytes, QEMUIOVector *qiov,
   1221                                  BdrvRequestFlags flags)
   1222 {
   1223     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
   1224 }
   1225 
   1226 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
   1227 {
   1228     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
   1229 }
   1230 
   1231 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
   1232                                              int64_t offset, int64_t bytes)
   1233 {
   1234     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
   1235 }
   1236 
   1237 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1238 static int
   1239 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
   1240                                        int64_t bytes, BdrvRequestFlags flags)
   1241 {
   1242     return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
   1243                              RBD_AIO_WRITE_ZEROES);
   1244 }
   1245 #endif
   1246 
   1247 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
   1248 {
   1249     BDRVRBDState *s = bs->opaque;
   1250     bdi->cluster_size = s->object_size;
   1251     return 0;
   1252 }
   1253 
   1254 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
   1255                                                      Error **errp)
   1256 {
   1257     BDRVRBDState *s = bs->opaque;
   1258     ImageInfoSpecific *spec_info;
   1259     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
   1260     int r;
   1261 
   1262     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
   1263         r = rbd_read(s->image, 0,
   1264                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
   1265         if (r < 0) {
   1266             error_setg_errno(errp, -r, "cannot read image start for probe");
   1267             return NULL;
   1268         }
   1269     }
   1270 
   1271     spec_info = g_new(ImageInfoSpecific, 1);
   1272     *spec_info = (ImageInfoSpecific){
   1273         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
   1274         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
   1275     };
   1276 
   1277     if (memcmp(buf, rbd_luks_header_verification,
   1278                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
   1279         spec_info->u.rbd.data->encryption_format =
   1280                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
   1281         spec_info->u.rbd.data->has_encryption_format = true;
   1282     } else if (memcmp(buf, rbd_luks2_header_verification,
   1283                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
   1284         spec_info->u.rbd.data->encryption_format =
   1285                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
   1286         spec_info->u.rbd.data->has_encryption_format = true;
   1287     } else {
   1288         spec_info->u.rbd.data->has_encryption_format = false;
   1289     }
   1290 
   1291     return spec_info;
   1292 }
   1293 
   1294 /*
   1295  * rbd_diff_iterate2 allows to interrupt the exection by returning a negative
   1296  * value in the callback routine. Choose a value that does not conflict with
   1297  * an existing exitcode and return it if we want to prematurely stop the
   1298  * execution because we detected a change in the allocation status.
   1299  */
   1300 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
   1301 
   1302 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
   1303                                     int exists, void *opaque)
   1304 {
   1305     RBDDiffIterateReq *req = opaque;
   1306 
   1307     assert(req->offs + req->bytes <= offs);
   1308 
   1309     /* treat a hole like an unallocated area and bail out */
   1310     if (!exists) {
   1311         return 0;
   1312     }
   1313 
   1314     if (!req->exists && offs > req->offs) {
   1315         /*
   1316          * we started in an unallocated area and hit the first allocated
   1317          * block. req->bytes must be set to the length of the unallocated area
   1318          * before the allocated area. stop further processing.
   1319          */
   1320         req->bytes = offs - req->offs;
   1321         return QEMU_RBD_EXIT_DIFF_ITERATE2;
   1322     }
   1323 
   1324     if (req->exists && offs > req->offs + req->bytes) {
   1325         /*
   1326          * we started in an allocated area and jumped over an unallocated area,
   1327          * req->bytes contains the length of the allocated area before the
   1328          * unallocated area. stop further processing.
   1329          */
   1330         return QEMU_RBD_EXIT_DIFF_ITERATE2;
   1331     }
   1332 
   1333     req->bytes += len;
   1334     req->exists = true;
   1335 
   1336     return 0;
   1337 }
   1338 
   1339 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
   1340                                                  bool want_zero, int64_t offset,
   1341                                                  int64_t bytes, int64_t *pnum,
   1342                                                  int64_t *map,
   1343                                                  BlockDriverState **file)
   1344 {
   1345     BDRVRBDState *s = bs->opaque;
   1346     int status, r;
   1347     RBDDiffIterateReq req = { .offs = offset };
   1348     uint64_t features, flags;
   1349     uint64_t head = 0;
   1350 
   1351     assert(offset + bytes <= s->image_size);
   1352 
   1353     /* default to all sectors allocated */
   1354     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
   1355     *map = offset;
   1356     *file = bs;
   1357     *pnum = bytes;
   1358 
   1359     /* check if RBD image supports fast-diff */
   1360     r = rbd_get_features(s->image, &features);
   1361     if (r < 0) {
   1362         return status;
   1363     }
   1364     if (!(features & RBD_FEATURE_FAST_DIFF)) {
   1365         return status;
   1366     }
   1367 
   1368     /* check if RBD fast-diff result is valid */
   1369     r = rbd_get_flags(s->image, &flags);
   1370     if (r < 0) {
   1371         return status;
   1372     }
   1373     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
   1374         return status;
   1375     }
   1376 
   1377 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
   1378     /*
   1379      * librbd had a bug until early 2022 that affected all versions of ceph that
   1380      * supported fast-diff. This bug results in reporting of incorrect offsets
   1381      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
   1382      * Work around this bug by rounding down the offset to object boundaries.
   1383      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
   1384      * However, this workaround only works for non cloned images with default
   1385      * striping.
   1386      *
   1387      * See: https://tracker.ceph.com/issues/53784
   1388      */
   1389 
   1390     /* check if RBD image has non-default striping enabled */
   1391     if (features & RBD_FEATURE_STRIPINGV2) {
   1392         return status;
   1393     }
   1394 
   1395 #pragma GCC diagnostic push
   1396 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
   1397     /*
   1398      * check if RBD image is a clone (= has a parent).
   1399      *
   1400      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
   1401      * replacement rbd_get_parent is not present in Luminous and Mimic.
   1402      */
   1403     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
   1404         return status;
   1405     }
   1406 #pragma GCC diagnostic pop
   1407 
   1408     head = req.offs & (s->object_size - 1);
   1409     req.offs -= head;
   1410     bytes += head;
   1411 #endif
   1412 
   1413     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
   1414                           qemu_rbd_diff_iterate_cb, &req);
   1415     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
   1416         return status;
   1417     }
   1418     assert(req.bytes <= bytes);
   1419     if (!req.exists) {
   1420         if (r == 0) {
   1421             /*
   1422              * rbd_diff_iterate2 does not invoke callbacks for unallocated
   1423              * areas. This here catches the case where no callback was
   1424              * invoked at all (req.bytes == 0).
   1425              */
   1426             assert(req.bytes == 0);
   1427             req.bytes = bytes;
   1428         }
   1429         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
   1430     }
   1431 
   1432     assert(req.bytes > head);
   1433     *pnum = req.bytes - head;
   1434     return status;
   1435 }
   1436 
   1437 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
   1438 {
   1439     BDRVRBDState *s = bs->opaque;
   1440     int r;
   1441 
   1442     r = rbd_get_size(s->image, &s->image_size);
   1443     if (r < 0) {
   1444         return r;
   1445     }
   1446 
   1447     return s->image_size;
   1448 }
   1449 
   1450 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
   1451                                              int64_t offset,
   1452                                              bool exact,
   1453                                              PreallocMode prealloc,
   1454                                              BdrvRequestFlags flags,
   1455                                              Error **errp)
   1456 {
   1457     int r;
   1458 
   1459     if (prealloc != PREALLOC_MODE_OFF) {
   1460         error_setg(errp, "Unsupported preallocation mode '%s'",
   1461                    PreallocMode_str(prealloc));
   1462         return -ENOTSUP;
   1463     }
   1464 
   1465     r = qemu_rbd_resize(bs, offset);
   1466     if (r < 0) {
   1467         error_setg_errno(errp, -r, "Failed to resize file");
   1468         return r;
   1469     }
   1470 
   1471     return 0;
   1472 }
   1473 
   1474 static int qemu_rbd_snap_create(BlockDriverState *bs,
   1475                                 QEMUSnapshotInfo *sn_info)
   1476 {
   1477     BDRVRBDState *s = bs->opaque;
   1478     int r;
   1479 
   1480     if (sn_info->name[0] == '\0') {
   1481         return -EINVAL; /* we need a name for rbd snapshots */
   1482     }
   1483 
   1484     /*
   1485      * rbd snapshots are using the name as the user controlled unique identifier
   1486      * we can't use the rbd snapid for that purpose, as it can't be set
   1487      */
   1488     if (sn_info->id_str[0] != '\0' &&
   1489         strcmp(sn_info->id_str, sn_info->name) != 0) {
   1490         return -EINVAL;
   1491     }
   1492 
   1493     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
   1494         return -ERANGE;
   1495     }
   1496 
   1497     r = rbd_snap_create(s->image, sn_info->name);
   1498     if (r < 0) {
   1499         error_report("failed to create snap: %s", strerror(-r));
   1500         return r;
   1501     }
   1502 
   1503     return 0;
   1504 }
   1505 
   1506 static int qemu_rbd_snap_remove(BlockDriverState *bs,
   1507                                 const char *snapshot_id,
   1508                                 const char *snapshot_name,
   1509                                 Error **errp)
   1510 {
   1511     BDRVRBDState *s = bs->opaque;
   1512     int r;
   1513 
   1514     if (!snapshot_name) {
   1515         error_setg(errp, "rbd need a valid snapshot name");
   1516         return -EINVAL;
   1517     }
   1518 
   1519     /* If snapshot_id is specified, it must be equal to name, see
   1520        qemu_rbd_snap_list() */
   1521     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
   1522         error_setg(errp,
   1523                    "rbd do not support snapshot id, it should be NULL or "
   1524                    "equal to snapshot name");
   1525         return -EINVAL;
   1526     }
   1527 
   1528     r = rbd_snap_remove(s->image, snapshot_name);
   1529     if (r < 0) {
   1530         error_setg_errno(errp, -r, "Failed to remove the snapshot");
   1531     }
   1532     return r;
   1533 }
   1534 
   1535 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
   1536                                   const char *snapshot_name)
   1537 {
   1538     BDRVRBDState *s = bs->opaque;
   1539 
   1540     return rbd_snap_rollback(s->image, snapshot_name);
   1541 }
   1542 
   1543 static int qemu_rbd_snap_list(BlockDriverState *bs,
   1544                               QEMUSnapshotInfo **psn_tab)
   1545 {
   1546     BDRVRBDState *s = bs->opaque;
   1547     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
   1548     int i, snap_count;
   1549     rbd_snap_info_t *snaps;
   1550     int max_snaps = RBD_MAX_SNAPS;
   1551 
   1552     do {
   1553         snaps = g_new(rbd_snap_info_t, max_snaps);
   1554         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
   1555         if (snap_count <= 0) {
   1556             g_free(snaps);
   1557         }
   1558     } while (snap_count == -ERANGE);
   1559 
   1560     if (snap_count <= 0) {
   1561         goto done;
   1562     }
   1563 
   1564     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
   1565 
   1566     for (i = 0; i < snap_count; i++) {
   1567         const char *snap_name = snaps[i].name;
   1568 
   1569         sn_info = sn_tab + i;
   1570         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
   1571         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
   1572 
   1573         sn_info->vm_state_size = snaps[i].size;
   1574         sn_info->date_sec = 0;
   1575         sn_info->date_nsec = 0;
   1576         sn_info->vm_clock_nsec = 0;
   1577     }
   1578     rbd_snap_list_end(snaps);
   1579     g_free(snaps);
   1580 
   1581  done:
   1582     *psn_tab = sn_tab;
   1583     return snap_count;
   1584 }
   1585 
   1586 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
   1587                                                       Error **errp)
   1588 {
   1589     BDRVRBDState *s = bs->opaque;
   1590     int r = rbd_invalidate_cache(s->image);
   1591     if (r < 0) {
   1592         error_setg_errno(errp, -r, "Failed to invalidate the cache");
   1593     }
   1594 }
   1595 
   1596 static QemuOptsList qemu_rbd_create_opts = {
   1597     .name = "rbd-create-opts",
   1598     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
   1599     .desc = {
   1600         {
   1601             .name = BLOCK_OPT_SIZE,
   1602             .type = QEMU_OPT_SIZE,
   1603             .help = "Virtual disk size"
   1604         },
   1605         {
   1606             .name = BLOCK_OPT_CLUSTER_SIZE,
   1607             .type = QEMU_OPT_SIZE,
   1608             .help = "RBD object size"
   1609         },
   1610         {
   1611             .name = "password-secret",
   1612             .type = QEMU_OPT_STRING,
   1613             .help = "ID of secret providing the password",
   1614         },
   1615         {
   1616             .name = "encrypt.format",
   1617             .type = QEMU_OPT_STRING,
   1618             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
   1619         },
   1620         {
   1621             .name = "encrypt.cipher-alg",
   1622             .type = QEMU_OPT_STRING,
   1623             .help = "Name of encryption cipher algorithm"
   1624                     " (allowed values: aes-128, aes-256)",
   1625         },
   1626         {
   1627             .name = "encrypt.key-secret",
   1628             .type = QEMU_OPT_STRING,
   1629             .help = "ID of secret providing LUKS passphrase",
   1630         },
   1631         { /* end of list */ }
   1632     }
   1633 };
   1634 
   1635 static const char *const qemu_rbd_strong_runtime_opts[] = {
   1636     "pool",
   1637     "namespace",
   1638     "image",
   1639     "conf",
   1640     "snapshot",
   1641     "user",
   1642     "server.",
   1643     "password-secret",
   1644 
   1645     NULL
   1646 };
   1647 
   1648 static BlockDriver bdrv_rbd = {
   1649     .format_name            = "rbd",
   1650     .instance_size          = sizeof(BDRVRBDState),
   1651     .bdrv_parse_filename    = qemu_rbd_parse_filename,
   1652     .bdrv_file_open         = qemu_rbd_open,
   1653     .bdrv_close             = qemu_rbd_close,
   1654     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
   1655     .bdrv_co_create         = qemu_rbd_co_create,
   1656     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
   1657     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
   1658     .bdrv_get_info          = qemu_rbd_getinfo,
   1659     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
   1660     .create_opts            = &qemu_rbd_create_opts,
   1661     .bdrv_getlength         = qemu_rbd_getlength,
   1662     .bdrv_co_truncate       = qemu_rbd_co_truncate,
   1663     .protocol_name          = "rbd",
   1664 
   1665     .bdrv_co_preadv         = qemu_rbd_co_preadv,
   1666     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
   1667     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
   1668     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
   1669 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
   1670     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
   1671 #endif
   1672     .bdrv_co_block_status   = qemu_rbd_co_block_status,
   1673 
   1674     .bdrv_snapshot_create   = qemu_rbd_snap_create,
   1675     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
   1676     .bdrv_snapshot_list     = qemu_rbd_snap_list,
   1677     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
   1678     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
   1679 
   1680     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
   1681 };
   1682 
   1683 static void bdrv_rbd_init(void)
   1684 {
   1685     bdrv_register(&bdrv_rbd);
   1686 }
   1687 
   1688 block_init(bdrv_rbd_init);