qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

server.c (87575B)


      1 /*
      2  *  Copyright (C) 2016-2022 Red Hat, Inc.
      3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
      4  *
      5  *  Network Block Device Server Side
      6  *
      7  *  This program is free software; you can redistribute it and/or modify
      8  *  it under the terms of the GNU General Public License as published by
      9  *  the Free Software Foundation; under version 2 of the License.
     10  *
     11  *  This program is distributed in the hope that it will be useful,
     12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     14  *  GNU General Public License for more details.
     15  *
     16  *  You should have received a copy of the GNU General Public License
     17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
     18  */
     19 
     20 #include "qemu/osdep.h"
     21 
     22 #include "block/export.h"
     23 #include "qapi/error.h"
     24 #include "qemu/queue.h"
     25 #include "trace.h"
     26 #include "nbd-internal.h"
     27 #include "qemu/units.h"
     28 #include "qemu/memalign.h"
     29 
     30 #define NBD_META_ID_BASE_ALLOCATION 0
     31 #define NBD_META_ID_ALLOCATION_DEPTH 1
     32 /* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
     33 #define NBD_META_ID_DIRTY_BITMAP 2
     34 
     35 /*
     36  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
     37  * constant. If an increase is needed, note that the NBD protocol
      38  * recommends no larger than 32 MiB, so that the client won't consider
     39  * the reply as a denial of service attack.
     40  */
     41 #define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
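/*
 * Editorial note (added; not in upstream QEMU): each extent in an
 * NBD_REPLY_TYPE_BLOCK_STATUS chunk is 8 bytes on the wire (a 32-bit
 * length plus a 32-bit flags word), so the cap above allows
 * 1 MiB / 8 bytes = 131072 extents per reply, well under the ~32 MiB
 * ceiling the protocol recommends.
 */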
     42 
     43 static int system_errno_to_nbd_errno(int err)
     44 {
     45     switch (err) {
     46     case 0:
     47         return NBD_SUCCESS;
     48     case EPERM:
     49     case EROFS:
     50         return NBD_EPERM;
     51     case EIO:
     52         return NBD_EIO;
     53     case ENOMEM:
     54         return NBD_ENOMEM;
     55 #ifdef EDQUOT
     56     case EDQUOT:
     57 #endif
     58     case EFBIG:
     59     case ENOSPC:
     60         return NBD_ENOSPC;
     61     case EOVERFLOW:
     62         return NBD_EOVERFLOW;
     63     case ENOTSUP:
     64 #if ENOTSUP != EOPNOTSUPP
     65     case EOPNOTSUPP:
     66 #endif
     67         return NBD_ENOTSUP;
     68     case ESHUTDOWN:
     69         return NBD_ESHUTDOWN;
     70     case EINVAL:
     71     default:
     72         return NBD_EINVAL;
     73     }
     74 }
     75 
     76 /* Definitions for opaque data types */
     77 
     78 typedef struct NBDRequestData NBDRequestData;
     79 
     80 struct NBDRequestData {
     81     NBDClient *client;
     82     uint8_t *data;
     83     bool complete;
     84 };
     85 
     86 struct NBDExport {
     87     BlockExport common;
     88 
     89     char *name;
     90     char *description;
     91     uint64_t size;
     92     uint16_t nbdflags;
     93     QTAILQ_HEAD(, NBDClient) clients;
     94     QTAILQ_ENTRY(NBDExport) next;
     95 
     96     BlockBackend *eject_notifier_blk;
     97     Notifier eject_notifier;
     98 
     99     bool allocation_depth;
    100     BdrvDirtyBitmap **export_bitmaps;
    101     size_t nr_export_bitmaps;
    102 };
    103 
    104 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
    105 
    106 /* NBDExportMetaContexts represents a list of contexts to be exported,
    107  * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
    108  * NBD_OPT_LIST_META_CONTEXT. */
    109 typedef struct NBDExportMetaContexts {
    110     NBDExport *exp;
    111     size_t count; /* number of negotiated contexts */
    112     bool base_allocation; /* export base:allocation context (block status) */
    113     bool allocation_depth; /* export qemu:allocation-depth */
    114     bool *bitmaps; /*
    115                     * export qemu:dirty-bitmap:<export bitmap name>,
    116                     * sized by exp->nr_export_bitmaps
    117                     */
    118 } NBDExportMetaContexts;
    119 
    120 struct NBDClient {
    121     int refcount;
    122     void (*close_fn)(NBDClient *client, bool negotiated);
    123 
    124     NBDExport *exp;
    125     QCryptoTLSCreds *tlscreds;
    126     char *tlsauthz;
    127     QIOChannelSocket *sioc; /* The underlying data channel */
    128     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
    129 
    130     Coroutine *recv_coroutine;
    131 
    132     CoMutex send_lock;
    133     Coroutine *send_coroutine;
    134 
    135     bool read_yielding;
    136     bool quiescing;
    137 
    138     QTAILQ_ENTRY(NBDClient) next;
    139     int nb_requests;
    140     bool closing;
    141 
    142     uint32_t check_align; /* If non-zero, check for aligned client requests */
    143 
    144     bool structured_reply;
    145     NBDExportMetaContexts export_meta;
    146 
    147     uint32_t opt; /* Current option being negotiated */
    148     uint32_t optlen; /* remaining length of data in ioc for the option being
    149                         negotiated now */
    150 };
    151 
    152 static void nbd_client_receive_next_request(NBDClient *client);
    153 
    154 /* Basic flow for negotiation
    155 
    156    Server         Client
    157    Negotiate
    158 
    159    or
    160 
    161    Server         Client
    162    Negotiate #1
    163                   Option
    164    Negotiate #2
    165 
    166    ----
    167 
    168    followed by
    169 
    170    Server         Client
    171                   Request
    172    Response
    173                   Request
    174    Response
    175                   ...
    176    ...
    177                   Request (type == 2)
    178 
    179 */
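/*
 * Editorial note (added for clarity; not upstream text): the one-shot
 * exchange above corresponds to the oldstyle handshake, while the
 * Negotiate #1 / Option / Negotiate #2 form is the (fixed) newstyle
 * handshake this server implements, in which the client may send any
 * number of NBD_OPT_* requests before entering transmission.  The final
 * "Request (type == 2)" is NBD_CMD_DISC, the client's disconnect request.
 */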
    180 
    181 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
    182                                      uint32_t type, uint32_t length)
    183 {
    184     stq_be_p(&rep->magic, NBD_REP_MAGIC);
    185     stl_be_p(&rep->option, option);
    186     stl_be_p(&rep->type, type);
    187     stl_be_p(&rep->length, length);
    188 }
    189 
    190 /* Send a reply header, including length, but no payload.
    191  * Return -errno on error, 0 on success. */
    192 static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
    193                                       uint32_t len, Error **errp)
    194 {
    195     NBDOptionReply rep;
    196 
    197     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
    198                                      type, nbd_rep_lookup(type), len);
    199 
    200     assert(len < NBD_MAX_BUFFER_SIZE);
    201 
    202     set_be_option_rep(&rep, client->opt, type, len);
    203     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
    204 }
    205 
    206 /* Send a reply header with default 0 length.
    207  * Return -errno on error, 0 on success. */
    208 static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
    209                                   Error **errp)
    210 {
    211     return nbd_negotiate_send_rep_len(client, type, 0, errp);
    212 }
    213 
    214 /* Send an error reply.
    215  * Return -errno on error, 0 on success. */
    216 static int G_GNUC_PRINTF(4, 0)
    217 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
    218                             Error **errp, const char *fmt, va_list va)
    219 {
    220     ERRP_GUARD();
    221     g_autofree char *msg = NULL;
    222     int ret;
    223     size_t len;
    224 
    225     msg = g_strdup_vprintf(fmt, va);
    226     len = strlen(msg);
    227     assert(len < NBD_MAX_STRING_SIZE);
    228     trace_nbd_negotiate_send_rep_err(msg);
    229     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
    230     if (ret < 0) {
    231         return ret;
    232     }
    233     if (nbd_write(client->ioc, msg, len, errp) < 0) {
    234         error_prepend(errp, "write failed (error message): ");
    235         return -EIO;
    236     }
    237 
    238     return 0;
    239 }
    240 
    241 /*
    242  * Return a malloc'd copy of @name suitable for use in an error reply.
    243  */
    244 static char *
    245 nbd_sanitize_name(const char *name)
    246 {
    247     if (strnlen(name, 80) < 80) {
    248         return g_strdup(name);
    249     }
    250     /* XXX Should we also try to sanitize any control characters? */
    251     return g_strdup_printf("%.80s...", name);
    252 }
    253 
    254 /* Send an error reply.
    255  * Return -errno on error, 0 on success. */
    256 static int G_GNUC_PRINTF(4, 5)
    257 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
    258                            Error **errp, const char *fmt, ...)
    259 {
    260     va_list va;
    261     int ret;
    262 
    263     va_start(va, fmt);
    264     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
    265     va_end(va);
    266     return ret;
    267 }
    268 
    269 /* Drop remainder of the current option, and send a reply with the
    270  * given error type and message. Return -errno on read or write
    271  * failure; or 0 if connection is still live. */
    272 static int G_GNUC_PRINTF(4, 0)
    273 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
    274               const char *fmt, va_list va)
    275 {
    276     int ret = nbd_drop(client->ioc, client->optlen, errp);
    277 
    278     client->optlen = 0;
    279     if (!ret) {
    280         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
    281     }
    282     return ret;
    283 }
    284 
    285 static int G_GNUC_PRINTF(4, 5)
    286 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
    287              const char *fmt, ...)
    288 {
    289     int ret;
    290     va_list va;
    291 
    292     va_start(va, fmt);
    293     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
    294     va_end(va);
    295 
    296     return ret;
    297 }
    298 
    299 static int G_GNUC_PRINTF(3, 4)
    300 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
    301 {
    302     int ret;
    303     va_list va;
    304 
    305     va_start(va, fmt);
    306     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
    307     va_end(va);
    308 
    309     return ret;
    310 }
    311 
    312 /* Read size bytes from the unparsed payload of the current option.
    313  * If @check_nul, require that no NUL bytes appear in buffer.
    314  * Return -errno on I/O error, 0 if option was completely handled by
    315  * sending a reply about inconsistent lengths, or 1 on success. */
    316 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
    317                         bool check_nul, Error **errp)
    318 {
    319     if (size > client->optlen) {
    320         return nbd_opt_invalid(client, errp,
    321                                "Inconsistent lengths in option %s",
    322                                nbd_opt_lookup(client->opt));
    323     }
    324     client->optlen -= size;
    325     if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
    326         return -EIO;
    327     }
    328 
    329     if (check_nul && strnlen(buffer, size) != size) {
    330         return nbd_opt_invalid(client, errp,
    331                                "Unexpected embedded NUL in option %s",
    332                                nbd_opt_lookup(client->opt));
    333     }
    334     return 1;
    335 }
    336 
    337 /* Drop size bytes from the unparsed payload of the current option.
    338  * Return -errno on I/O error, 0 if option was completely handled by
    339  * sending a reply about inconsistent lengths, or 1 on success. */
    340 static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
    341 {
    342     if (size > client->optlen) {
    343         return nbd_opt_invalid(client, errp,
    344                                "Inconsistent lengths in option %s",
    345                                nbd_opt_lookup(client->opt));
    346     }
    347     client->optlen -= size;
    348     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
    349 }
    350 
    351 /* nbd_opt_read_name
    352  *
    353  * Read a string with the format:
    354  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
    355  *   len bytes string (not 0-terminated)
    356  *
    357  * On success, @name will be allocated.
    358  * If @length is non-null, it will be set to the actual string length.
    359  *
    360  * Return -errno on I/O error, 0 if option was completely handled by
    361  * sending a reply about inconsistent lengths, or 1 on success.
    362  */
    363 static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
    364                              Error **errp)
    365 {
    366     int ret;
    367     uint32_t len;
    368     g_autofree char *local_name = NULL;
    369 
    370     *name = NULL;
    371     ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
    372     if (ret <= 0) {
    373         return ret;
    374     }
    375     len = cpu_to_be32(len);
    376 
    377     if (len > NBD_MAX_STRING_SIZE) {
    378         return nbd_opt_invalid(client, errp,
    379                                "Invalid name length: %" PRIu32, len);
    380     }
    381 
    382     local_name = g_malloc(len + 1);
    383     ret = nbd_opt_read(client, local_name, len, true, errp);
    384     if (ret <= 0) {
    385         return ret;
    386     }
    387     local_name[len] = '\0';
    388 
    389     if (length) {
    390         *length = len;
    391     }
    392     *name = g_steal_pointer(&local_name);
    393 
    394     return 1;
    395 }
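/*
 * Editorial worked example (added; not upstream code): a client naming the
 * export "disk0" sends the 4-byte big-endian length followed by the raw,
 * non-NUL-terminated bytes:
 *
 *   00 00 00 05  64 69 73 6b 30        len = 5, "disk0"
 *
 * nbd_opt_read_name() above then returns a NUL-terminated copy in @name
 * and, when requested, stores 5 in @length.  "disk0" is a hypothetical name.
 */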
    396 
    397 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
    398  * Return -errno on error, 0 on success. */
    399 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
    400                                        Error **errp)
    401 {
    402     ERRP_GUARD();
    403     size_t name_len, desc_len;
    404     uint32_t len;
    405     const char *name = exp->name ? exp->name : "";
    406     const char *desc = exp->description ? exp->description : "";
    407     QIOChannel *ioc = client->ioc;
    408     int ret;
    409 
    410     trace_nbd_negotiate_send_rep_list(name, desc);
    411     name_len = strlen(name);
    412     desc_len = strlen(desc);
    413     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
    414     len = name_len + desc_len + sizeof(len);
    415     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
    416     if (ret < 0) {
    417         return ret;
    418     }
    419 
    420     len = cpu_to_be32(name_len);
    421     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
    422         error_prepend(errp, "write failed (name length): ");
    423         return -EINVAL;
    424     }
    425 
    426     if (nbd_write(ioc, name, name_len, errp) < 0) {
    427         error_prepend(errp, "write failed (name buffer): ");
    428         return -EINVAL;
    429     }
    430 
    431     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
    432         error_prepend(errp, "write failed (description buffer): ");
    433         return -EINVAL;
    434     }
    435 
    436     return 0;
    437 }
    438 
    439 /* Process the NBD_OPT_LIST command, with a potential series of replies.
    440  * Return -errno on error, 0 on success. */
    441 static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
    442 {
    443     NBDExport *exp;
    444     assert(client->opt == NBD_OPT_LIST);
    445 
    446     /* For each export, send a NBD_REP_SERVER reply. */
    447     QTAILQ_FOREACH(exp, &exports, next) {
    448         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
    449             return -EINVAL;
    450         }
    451     }
    452     /* Finish with a NBD_REP_ACK. */
    453     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
    454 }
    455 
    456 static void nbd_check_meta_export(NBDClient *client)
    457 {
    458     if (client->exp != client->export_meta.exp) {
    459         client->export_meta.count = 0;
    460     }
    461 }
    462 
    463 /* Send a reply to NBD_OPT_EXPORT_NAME.
    464  * Return -errno on error, 0 on success. */
    465 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
    466                                             Error **errp)
    467 {
    468     ERRP_GUARD();
    469     g_autofree char *name = NULL;
    470     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
    471     size_t len;
    472     int ret;
    473     uint16_t myflags;
    474 
    475     /* Client sends:
    476         [20 ..  xx]   export name (length bytes)
    477        Server replies:
    478         [ 0 ..   7]   size
    479         [ 8 ..   9]   export flags
    480         [10 .. 133]   reserved     (0) [unless no_zeroes]
    481      */
    482     trace_nbd_negotiate_handle_export_name();
    483     if (client->optlen > NBD_MAX_STRING_SIZE) {
    484         error_setg(errp, "Bad length received");
    485         return -EINVAL;
    486     }
    487     name = g_malloc(client->optlen + 1);
    488     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
    489         return -EIO;
    490     }
    491     name[client->optlen] = '\0';
    492     client->optlen = 0;
    493 
    494     trace_nbd_negotiate_handle_export_name_request(name);
    495 
    496     client->exp = nbd_export_find(name);
    497     if (!client->exp) {
    498         error_setg(errp, "export not found");
    499         return -EINVAL;
    500     }
    501 
    502     myflags = client->exp->nbdflags;
    503     if (client->structured_reply) {
    504         myflags |= NBD_FLAG_SEND_DF;
    505     }
    506     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
    507     stq_be_p(buf, client->exp->size);
    508     stw_be_p(buf + 8, myflags);
    509     len = no_zeroes ? 10 : sizeof(buf);
    510     ret = nbd_write(client->ioc, buf, len, errp);
    511     if (ret < 0) {
    512         error_prepend(errp, "write failed: ");
    513         return ret;
    514     }
    515 
    516     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
    517     blk_exp_ref(&client->exp->common);
    518     nbd_check_meta_export(client);
    519 
    520     return 0;
    521 }
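/*
 * Editorial worked example (added; not upstream code): for a hypothetical
 * 1 GiB read-only export, the reply assembled above carries
 *
 *   size  = 0x0000000040000000                        (bytes 0..7)
 *   flags = NBD_FLAG_HAS_FLAGS | NBD_FLAG_READ_ONLY   (bytes 8..9)
 *
 * followed by 124 zero bytes of padding, unless the client negotiated
 * NBD_FLAG_C_NO_ZEROES, in which case only the first 10 bytes are sent.
 */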
    522 
    523 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
    524  * The buffer does NOT include the info type prefix.
    525  * Return -errno on error, 0 if ready to send more. */
    526 static int nbd_negotiate_send_info(NBDClient *client,
    527                                    uint16_t info, uint32_t length, void *buf,
    528                                    Error **errp)
    529 {
    530     int rc;
    531 
    532     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
    533     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
    534                                     sizeof(info) + length, errp);
    535     if (rc < 0) {
    536         return rc;
    537     }
    538     info = cpu_to_be16(info);
    539     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
    540         return -EIO;
    541     }
    542     if (nbd_write(client->ioc, buf, length, errp) < 0) {
    543         return -EIO;
    544     }
    545     return 0;
    546 }
    547 
    548 /* nbd_reject_length: Handle any unexpected payload.
    549  * @fatal requests that we quit talking to the client, even if we are able
    550  * to successfully send an error reply.
    551  * Return:
    552  * -errno  transmission error occurred or @fatal was requested, errp is set
    553  * 0       error message successfully sent to client, errp is not set
    554  */
    555 static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
    556 {
    557     int ret;
    558 
    559     assert(client->optlen);
    560     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
    561                           nbd_opt_lookup(client->opt));
    562     if (fatal && !ret) {
    563         error_setg(errp, "option '%s' has unexpected length",
    564                    nbd_opt_lookup(client->opt));
    565         return -EINVAL;
    566     }
    567     return ret;
    568 }
    569 
    570 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
    571  * Return -errno on error, 0 if ready for next option, and 1 to move
    572  * into transmission phase.  */
    573 static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
    574 {
    575     int rc;
    576     g_autofree char *name = NULL;
    577     NBDExport *exp;
    578     uint16_t requests;
    579     uint16_t request;
    580     uint32_t namelen = 0;
    581     bool sendname = false;
    582     bool blocksize = false;
    583     uint32_t sizes[3];
    584     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
    585     uint32_t check_align = 0;
    586     uint16_t myflags;
    587 
    588     /* Client sends:
    589         4 bytes: L, name length (can be 0)
    590         L bytes: export name
    591         2 bytes: N, number of requests (can be 0)
    592         N * 2 bytes: N requests
    593     */
    594     rc = nbd_opt_read_name(client, &name, &namelen, errp);
    595     if (rc <= 0) {
    596         return rc;
    597     }
    598     trace_nbd_negotiate_handle_export_name_request(name);
    599 
    600     rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
    601     if (rc <= 0) {
    602         return rc;
    603     }
    604     requests = be16_to_cpu(requests);
    605     trace_nbd_negotiate_handle_info_requests(requests);
    606     while (requests--) {
    607         rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
    608         if (rc <= 0) {
    609             return rc;
    610         }
    611         request = be16_to_cpu(request);
    612         trace_nbd_negotiate_handle_info_request(request,
    613                                                 nbd_info_lookup(request));
    614         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
    615          * everything else is either a request we don't know or
    616          * something we send regardless of request */
    617         switch (request) {
    618         case NBD_INFO_NAME:
    619             sendname = true;
    620             break;
    621         case NBD_INFO_BLOCK_SIZE:
    622             blocksize = true;
    623             break;
    624         }
    625     }
    626     if (client->optlen) {
    627         return nbd_reject_length(client, false, errp);
    628     }
    629 
    630     exp = nbd_export_find(name);
    631     if (!exp) {
    632         g_autofree char *sane_name = nbd_sanitize_name(name);
    633 
    634         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
    635                                           errp, "export '%s' not present",
    636                                           sane_name);
    637     }
    638 
    639     /* Don't bother sending NBD_INFO_NAME unless client requested it */
    640     if (sendname) {
    641         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
    642                                      errp);
    643         if (rc < 0) {
    644             return rc;
    645         }
    646     }
    647 
    648     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
    649      * client request */
    650     if (exp->description) {
    651         size_t len = strlen(exp->description);
    652 
    653         assert(len <= NBD_MAX_STRING_SIZE);
    654         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
    655                                      len, exp->description, errp);
    656         if (rc < 0) {
    657             return rc;
    658         }
    659     }
    660 
    661     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
    662      * according to whether the client requested it, and according to
    663      * whether this is OPT_INFO or OPT_GO. */
    664     /* minimum - 1 for back-compat, or actual if client will obey it. */
    665     if (client->opt == NBD_OPT_INFO || blocksize) {
    666         check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
    667     } else {
    668         sizes[0] = 1;
    669     }
    670     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
    671     /* preferred - Hard-code to 4096 for now.
    672      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
    673     sizes[1] = MAX(4096, sizes[0]);
    674     /* maximum - At most 32M, but smaller as appropriate. */
    675     sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
    676     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
    677     sizes[0] = cpu_to_be32(sizes[0]);
    678     sizes[1] = cpu_to_be32(sizes[1]);
    679     sizes[2] = cpu_to_be32(sizes[2]);
    680     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
    681                                  sizeof(sizes), sizes, errp);
    682     if (rc < 0) {
    683         return rc;
    684     }
    685 
    686     /* Send NBD_INFO_EXPORT always */
    687     myflags = exp->nbdflags;
    688     if (client->structured_reply) {
    689         myflags |= NBD_FLAG_SEND_DF;
    690     }
    691     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
    692     stq_be_p(buf, exp->size);
    693     stw_be_p(buf + 8, myflags);
    694     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
    695                                  sizeof(buf), buf, errp);
    696     if (rc < 0) {
    697         return rc;
    698     }
    699 
    700     /*
    701      * If the client is just asking for NBD_OPT_INFO, but forgot to
    702      * request block sizes in a situation that would impact
    703      * performance, then return an error. But for NBD_OPT_GO, we
    704      * tolerate all clients, regardless of alignments.
    705      */
    706     if (client->opt == NBD_OPT_INFO && !blocksize &&
    707         blk_get_request_alignment(exp->common.blk) > 1) {
    708         return nbd_negotiate_send_rep_err(client,
    709                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
    710                                           errp,
    711                                           "request NBD_INFO_BLOCK_SIZE to "
    712                                           "use this export");
    713     }
    714 
    715     /* Final reply */
    716     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
    717     if (rc < 0) {
    718         return rc;
    719     }
    720 
    721     if (client->opt == NBD_OPT_GO) {
    722         client->exp = exp;
    723         client->check_align = check_align;
    724         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
    725         blk_exp_ref(&client->exp->common);
    726         nbd_check_meta_export(client);
    727         rc = 1;
    728     }
    729     return rc;
    730 }
    731 
    732 
    733 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
    734  * new channel for all further (now-encrypted) communication. */
    735 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
    736                                                  Error **errp)
    737 {
    738     QIOChannel *ioc;
    739     QIOChannelTLS *tioc;
    740     struct NBDTLSHandshakeData data = { 0 };
    741 
    742     assert(client->opt == NBD_OPT_STARTTLS);
    743 
    744     trace_nbd_negotiate_handle_starttls();
    745     ioc = client->ioc;
    746 
    747     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
    748         return NULL;
    749     }
    750 
    751     tioc = qio_channel_tls_new_server(ioc,
    752                                       client->tlscreds,
    753                                       client->tlsauthz,
    754                                       errp);
    755     if (!tioc) {
    756         return NULL;
    757     }
    758 
    759     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
    760     trace_nbd_negotiate_handle_starttls_handshake();
    761     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
    762     qio_channel_tls_handshake(tioc,
    763                               nbd_tls_handshake,
    764                               &data,
    765                               NULL,
    766                               NULL);
    767 
    768     if (!data.complete) {
    769         g_main_loop_run(data.loop);
    770     }
    771     g_main_loop_unref(data.loop);
    772     if (data.error) {
    773         object_unref(OBJECT(tioc));
    774         error_propagate(errp, data.error);
    775         return NULL;
    776     }
    777 
    778     return QIO_CHANNEL(tioc);
    779 }
    780 
    781 /* nbd_negotiate_send_meta_context
    782  *
    783  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
    784  *
    785  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
    786  */
    787 static int nbd_negotiate_send_meta_context(NBDClient *client,
    788                                            const char *context,
    789                                            uint32_t context_id,
    790                                            Error **errp)
    791 {
    792     NBDOptionReplyMetaContext opt;
    793     struct iovec iov[] = {
    794         {.iov_base = &opt, .iov_len = sizeof(opt)},
    795         {.iov_base = (void *)context, .iov_len = strlen(context)}
    796     };
    797 
    798     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
    799     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
    800         context_id = 0;
    801     }
    802 
    803     trace_nbd_negotiate_meta_query_reply(context, context_id);
    804     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
    805                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
    806     stl_be_p(&opt.context_id, context_id);
    807 
    808     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
    809 }
    810 
    811 /*
    812  * Return true if @query matches @pattern, or if @query is empty when
    813  * the @client is performing _LIST_.
    814  */
    815 static bool nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
    816                                       const char *query)
    817 {
    818     if (!*query) {
    819         trace_nbd_negotiate_meta_query_parse("empty");
    820         return client->opt == NBD_OPT_LIST_META_CONTEXT;
    821     }
    822     if (strcmp(query, pattern) == 0) {
    823         trace_nbd_negotiate_meta_query_parse(pattern);
    824         return true;
    825     }
    826     trace_nbd_negotiate_meta_query_skip("pattern not matched");
    827     return false;
    828 }
    829 
    830 /*
    831  * Return true and adjust @str in place if it begins with @prefix.
    832  */
    833 static bool nbd_strshift(const char **str, const char *prefix)
    834 {
    835     size_t len = strlen(prefix);
    836 
    837     if (strncmp(*str, prefix, len) == 0) {
    838         *str += len;
    839         return true;
    840     }
    841     return false;
    842 }
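/*
 * Illustrative usage sketch (added; not upstream code, names are
 * hypothetical): peeling namespaces off a metadata query the same way
 * nbd_meta_qemu_query() below does, e.g. "qemu:dirty-bitmap:bitmap0"
 * shifts to "dirty-bitmap:bitmap0" and then to "bitmap0".
 */
#if 0
static bool example_is_qemu_bitmap_query(const char *query)
{
    return nbd_strshift(&query, "qemu:") &&
           nbd_strshift(&query, "dirty-bitmap:") &&
           *query != '\0';
}
#endif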
    843 
    844 /* nbd_meta_base_query
    845  *
    846  * Handle queries to 'base' namespace. For now, only the base:allocation
    847  * context is available.  Return true if @query has been handled.
    848  */
    849 static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
    850                                 const char *query)
    851 {
    852     if (!nbd_strshift(&query, "base:")) {
    853         return false;
    854     }
    855     trace_nbd_negotiate_meta_query_parse("base:");
    856 
    857     if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
    858         meta->base_allocation = true;
    859     }
    860     return true;
    861 }
    862 
    863 /* nbd_meta_qemu_query
    864  *
    865  * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
    866  * and qemu:allocation-depth contexts are available.  Return true if @query
    867  * has been handled.
    868  */
    869 static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
    870                                 const char *query)
    871 {
    872     size_t i;
    873 
    874     if (!nbd_strshift(&query, "qemu:")) {
    875         return false;
    876     }
    877     trace_nbd_negotiate_meta_query_parse("qemu:");
    878 
    879     if (!*query) {
    880         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
    881             meta->allocation_depth = meta->exp->allocation_depth;
    882             if (meta->exp->nr_export_bitmaps) {
    883                 memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
    884             }
    885         }
    886         trace_nbd_negotiate_meta_query_parse("empty");
    887         return true;
    888     }
    889 
    890     if (strcmp(query, "allocation-depth") == 0) {
    891         trace_nbd_negotiate_meta_query_parse("allocation-depth");
    892         meta->allocation_depth = meta->exp->allocation_depth;
    893         return true;
    894     }
    895 
    896     if (nbd_strshift(&query, "dirty-bitmap:")) {
    897         trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
    898         if (!*query) {
    899             if (client->opt == NBD_OPT_LIST_META_CONTEXT &&
    900                 meta->exp->nr_export_bitmaps) {
    901                 memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
    902             }
    903             trace_nbd_negotiate_meta_query_parse("empty");
    904             return true;
    905         }
    906 
    907         for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
    908             const char *bm_name;
    909 
    910             bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
    911             if (strcmp(bm_name, query) == 0) {
    912                 meta->bitmaps[i] = true;
    913                 trace_nbd_negotiate_meta_query_parse(query);
    914                 return true;
    915             }
    916         }
    917         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
    918         return true;
    919     }
    920 
    921     trace_nbd_negotiate_meta_query_skip("unknown qemu context");
    922     return true;
    923 }
    924 
    925 /* nbd_negotiate_meta_query
    926  *
    927  * Parse namespace name and call corresponding function to parse body of the
    928  * query.
    929  *
    930  * The only supported namespaces are 'base' and 'qemu'.
    931  *
    932  * Return -errno on I/O error, 0 if option was completely handled by
    933  * sending a reply about inconsistent lengths, or 1 on success. */
    934 static int nbd_negotiate_meta_query(NBDClient *client,
    935                                     NBDExportMetaContexts *meta, Error **errp)
    936 {
    937     int ret;
    938     g_autofree char *query = NULL;
    939     uint32_t len;
    940 
    941     ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
    942     if (ret <= 0) {
    943         return ret;
    944     }
    945     len = cpu_to_be32(len);
    946 
    947     if (len > NBD_MAX_STRING_SIZE) {
    948         trace_nbd_negotiate_meta_query_skip("length too long");
    949         return nbd_opt_skip(client, len, errp);
    950     }
    951 
    952     query = g_malloc(len + 1);
    953     ret = nbd_opt_read(client, query, len, true, errp);
    954     if (ret <= 0) {
    955         return ret;
    956     }
    957     query[len] = '\0';
    958 
    959     if (nbd_meta_base_query(client, meta, query)) {
    960         return 1;
    961     }
    962     if (nbd_meta_qemu_query(client, meta, query)) {
    963         return 1;
    964     }
    965 
    966     trace_nbd_negotiate_meta_query_skip("unknown namespace");
    967     return 1;
    968 }
    969 
    970 /* nbd_negotiate_meta_queries
    971  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
    972  *
    973  * Return -errno on I/O error, or 0 if option was completely handled. */
    974 static int nbd_negotiate_meta_queries(NBDClient *client,
    975                                       NBDExportMetaContexts *meta, Error **errp)
    976 {
    977     int ret;
    978     g_autofree char *export_name = NULL;
    979     /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */
    980     g_autofree G_GNUC_UNUSED bool *bitmaps = NULL;
    981     NBDExportMetaContexts local_meta = {0};
    982     uint32_t nb_queries;
    983     size_t i;
    984     size_t count = 0;
    985 
    986     if (client->opt == NBD_OPT_SET_META_CONTEXT && !client->structured_reply) {
    987         return nbd_opt_invalid(client, errp,
    988                                "request option '%s' when structured reply "
    989                                "is not negotiated",
    990                                nbd_opt_lookup(client->opt));
    991     }
    992 
    993     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
    994         /* Only change the caller's meta on SET. */
    995         meta = &local_meta;
    996     }
    997 
    998     g_free(meta->bitmaps);
    999     memset(meta, 0, sizeof(*meta));
   1000 
   1001     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
   1002     if (ret <= 0) {
   1003         return ret;
   1004     }
   1005 
   1006     meta->exp = nbd_export_find(export_name);
   1007     if (meta->exp == NULL) {
   1008         g_autofree char *sane_name = nbd_sanitize_name(export_name);
   1009 
   1010         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
   1011                             "export '%s' not present", sane_name);
   1012     }
   1013     meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
   1014     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
   1015         bitmaps = meta->bitmaps;
   1016     }
   1017 
   1018     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
   1019     if (ret <= 0) {
   1020         return ret;
   1021     }
   1022     nb_queries = cpu_to_be32(nb_queries);
   1023     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
   1024                                      export_name, nb_queries);
   1025 
   1026     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
   1027         /* enable all known contexts */
   1028         meta->base_allocation = true;
   1029         meta->allocation_depth = meta->exp->allocation_depth;
   1030         if (meta->exp->nr_export_bitmaps) {
   1031             memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
   1032         }
   1033     } else {
   1034         for (i = 0; i < nb_queries; ++i) {
   1035             ret = nbd_negotiate_meta_query(client, meta, errp);
   1036             if (ret <= 0) {
   1037                 return ret;
   1038             }
   1039         }
   1040     }
   1041 
   1042     if (meta->base_allocation) {
   1043         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
   1044                                               NBD_META_ID_BASE_ALLOCATION,
   1045                                               errp);
   1046         if (ret < 0) {
   1047             return ret;
   1048         }
   1049         count++;
   1050     }
   1051 
   1052     if (meta->allocation_depth) {
   1053         ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
   1054                                               NBD_META_ID_ALLOCATION_DEPTH,
   1055                                               errp);
   1056         if (ret < 0) {
   1057             return ret;
   1058         }
   1059         count++;
   1060     }
   1061 
   1062     for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
   1063         const char *bm_name;
   1064         g_autofree char *context = NULL;
   1065 
   1066         if (!meta->bitmaps[i]) {
   1067             continue;
   1068         }
   1069 
   1070         bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
   1071         context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
   1072 
   1073         ret = nbd_negotiate_send_meta_context(client, context,
   1074                                               NBD_META_ID_DIRTY_BITMAP + i,
   1075                                               errp);
   1076         if (ret < 0) {
   1077             return ret;
   1078         }
   1079         count++;
   1080     }
   1081 
   1082     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
   1083     if (ret == 0) {
   1084         meta->count = count;
   1085     }
   1086 
   1087     return ret;
   1088 }
   1089 
   1090 /* nbd_negotiate_options
   1091  * Process all NBD_OPT_* client option commands, during fixed newstyle
   1092  * negotiation.
   1093  * Return:
   1094  * -errno  on error, errp is set
   1095  * 0       on successful negotiation, errp is not set
   1096  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
   1097  *         errp is not set
   1098  */
   1099 static int nbd_negotiate_options(NBDClient *client, Error **errp)
   1100 {
   1101     uint32_t flags;
   1102     bool fixedNewstyle = false;
   1103     bool no_zeroes = false;
   1104 
   1105     /* Client sends:
   1106         [ 0 ..   3]   client flags
   1107 
   1108        Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
   1109         [ 0 ..   7]   NBD_OPTS_MAGIC
   1110         [ 8 ..  11]   NBD option
   1111         [12 ..  15]   Data length
   1112         ...           Rest of request
   1113 
   1114         [ 0 ..   7]   NBD_OPTS_MAGIC
   1115         [ 8 ..  11]   Second NBD option
   1116         [12 ..  15]   Data length
   1117         ...           Rest of request
   1118     */
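    /*
     * Editorial worked example (added; not upstream text): a zero-length
     * NBD_OPT_STRUCTURED_REPLY request from the client arrives as
     *
     *   49 48 41 56 45 4f 50 54   NBD_OPTS_MAGIC ("IHAVEOPT")
     *   00 00 00 08               option = NBD_OPT_STRUCTURED_REPLY
     *   00 00 00 00               data length = 0
     */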
   1119 
   1120     if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
   1121         return -EIO;
   1122     }
   1123     trace_nbd_negotiate_options_flags(flags);
   1124     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
   1125         fixedNewstyle = true;
   1126         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
   1127     }
   1128     if (flags & NBD_FLAG_C_NO_ZEROES) {
   1129         no_zeroes = true;
   1130         flags &= ~NBD_FLAG_C_NO_ZEROES;
   1131     }
   1132     if (flags != 0) {
   1133         error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
   1134         return -EINVAL;
   1135     }
   1136 
   1137     while (1) {
   1138         int ret;
   1139         uint32_t option, length;
   1140         uint64_t magic;
   1141 
   1142         if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
   1143             return -EINVAL;
   1144         }
   1145         trace_nbd_negotiate_options_check_magic(magic);
   1146         if (magic != NBD_OPTS_MAGIC) {
   1147             error_setg(errp, "Bad magic received");
   1148             return -EINVAL;
   1149         }
   1150 
   1151         if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
   1152             return -EINVAL;
   1153         }
   1154         client->opt = option;
   1155 
   1156         if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
   1157             return -EINVAL;
   1158         }
   1159         assert(!client->optlen);
   1160         client->optlen = length;
   1161 
   1162         if (length > NBD_MAX_BUFFER_SIZE) {
    1163             error_setg(errp, "len (%" PRIu32 ") is larger than max len (%u)",
   1164                        length, NBD_MAX_BUFFER_SIZE);
   1165             return -EINVAL;
   1166         }
   1167 
   1168         trace_nbd_negotiate_options_check_option(option,
   1169                                                  nbd_opt_lookup(option));
   1170         if (client->tlscreds &&
   1171             client->ioc == (QIOChannel *)client->sioc) {
   1172             QIOChannel *tioc;
   1173             if (!fixedNewstyle) {
   1174                 error_setg(errp, "Unsupported option 0x%" PRIx32, option);
   1175                 return -EINVAL;
   1176             }
   1177             switch (option) {
   1178             case NBD_OPT_STARTTLS:
   1179                 if (length) {
   1180                     /* Unconditionally drop the connection if the client
   1181                      * can't start a TLS negotiation correctly */
   1182                     return nbd_reject_length(client, true, errp);
   1183                 }
   1184                 tioc = nbd_negotiate_handle_starttls(client, errp);
   1185                 if (!tioc) {
   1186                     return -EIO;
   1187                 }
   1188                 ret = 0;
   1189                 object_unref(OBJECT(client->ioc));
   1190                 client->ioc = QIO_CHANNEL(tioc);
   1191                 break;
   1192 
   1193             case NBD_OPT_EXPORT_NAME:
   1194                 /* No way to return an error to client, so drop connection */
   1195                 error_setg(errp, "Option 0x%x not permitted before TLS",
   1196                            option);
   1197                 return -EINVAL;
   1198 
   1199             default:
   1200                 /* Let the client keep trying, unless they asked to
   1201                  * quit. Always try to give an error back to the
   1202                  * client; but when replying to OPT_ABORT, be aware
   1203                  * that the client may hang up before receiving the
   1204                  * error, in which case we are fine ignoring the
   1205                  * resulting EPIPE. */
   1206                 ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
   1207                                    option == NBD_OPT_ABORT ? NULL : errp,
   1208                                    "Option 0x%" PRIx32
   1209                                    " not permitted before TLS", option);
   1210                 if (option == NBD_OPT_ABORT) {
   1211                     return 1;
   1212                 }
   1213                 break;
   1214             }
   1215         } else if (fixedNewstyle) {
   1216             switch (option) {
   1217             case NBD_OPT_LIST:
   1218                 if (length) {
   1219                     ret = nbd_reject_length(client, false, errp);
   1220                 } else {
   1221                     ret = nbd_negotiate_handle_list(client, errp);
   1222                 }
   1223                 break;
   1224 
   1225             case NBD_OPT_ABORT:
   1226                 /* NBD spec says we must try to reply before
   1227                  * disconnecting, but that we must also tolerate
   1228                  * guests that don't wait for our reply. */
   1229                 nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
   1230                 return 1;
   1231 
   1232             case NBD_OPT_EXPORT_NAME:
   1233                 return nbd_negotiate_handle_export_name(client, no_zeroes,
   1234                                                         errp);
   1235 
   1236             case NBD_OPT_INFO:
   1237             case NBD_OPT_GO:
   1238                 ret = nbd_negotiate_handle_info(client, errp);
   1239                 if (ret == 1) {
   1240                     assert(option == NBD_OPT_GO);
   1241                     return 0;
   1242                 }
   1243                 break;
   1244 
   1245             case NBD_OPT_STARTTLS:
   1246                 if (length) {
   1247                     ret = nbd_reject_length(client, false, errp);
   1248                 } else if (client->tlscreds) {
   1249                     ret = nbd_negotiate_send_rep_err(client,
   1250                                                      NBD_REP_ERR_INVALID, errp,
   1251                                                      "TLS already enabled");
   1252                 } else {
   1253                     ret = nbd_negotiate_send_rep_err(client,
   1254                                                      NBD_REP_ERR_POLICY, errp,
   1255                                                      "TLS not configured");
   1256                 }
   1257                 break;
   1258 
   1259             case NBD_OPT_STRUCTURED_REPLY:
   1260                 if (length) {
   1261                     ret = nbd_reject_length(client, false, errp);
   1262                 } else if (client->structured_reply) {
   1263                     ret = nbd_negotiate_send_rep_err(
   1264                         client, NBD_REP_ERR_INVALID, errp,
   1265                         "structured reply already negotiated");
   1266                 } else {
   1267                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
   1268                     client->structured_reply = true;
   1269                 }
   1270                 break;
   1271 
   1272             case NBD_OPT_LIST_META_CONTEXT:
   1273             case NBD_OPT_SET_META_CONTEXT:
   1274                 ret = nbd_negotiate_meta_queries(client, &client->export_meta,
   1275                                                  errp);
   1276                 break;
   1277 
   1278             default:
   1279                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
   1280                                    "Unsupported option %" PRIu32 " (%s)",
   1281                                    option, nbd_opt_lookup(option));
   1282                 break;
   1283             }
   1284         } else {
   1285             /*
   1286              * If broken new-style we should drop the connection
   1287              * for anything except NBD_OPT_EXPORT_NAME
   1288              */
   1289             switch (option) {
   1290             case NBD_OPT_EXPORT_NAME:
   1291                 return nbd_negotiate_handle_export_name(client, no_zeroes,
   1292                                                         errp);
   1293 
   1294             default:
   1295                 error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
   1296                            option, nbd_opt_lookup(option));
   1297                 return -EINVAL;
   1298             }
   1299         }
   1300         if (ret < 0) {
   1301             return ret;
   1302         }
   1303     }
   1304 }
   1305 
   1306 /* nbd_negotiate
   1307  * Return:
   1308  * -errno  on error, errp is set
   1309  * 0       on successful negotiation, errp is not set
   1310  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
   1311  *         errp is not set
   1312  */
   1313 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
   1314 {
   1315     ERRP_GUARD();
   1316     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
   1317     int ret;
   1318 
   1319     /* Old style negotiation header, no room for options
   1320         [ 0 ..   7]   passwd       ("NBDMAGIC")
   1321         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
   1322         [16 ..  23]   size
   1323         [24 ..  27]   export flags (zero-extended)
   1324         [28 .. 151]   reserved     (0)
   1325 
   1326        New style negotiation header, client can send options
   1327         [ 0 ..   7]   passwd       ("NBDMAGIC")
   1328         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
   1329         [16 ..  17]   server flags (0)
   1330         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
   1331      */
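    /*
     * Editorial worked example (added; not upstream text): the 18-byte
     * newstyle greeting written just below is, byte for byte,
     *
     *   4e 42 44 4d 41 47 49 43   "NBDMAGIC"
     *   49 48 41 56 45 4f 50 54   NBD_OPTS_MAGIC ("IHAVEOPT")
     *   00 03                     NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES
     */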
   1332 
   1333     qio_channel_set_blocking(client->ioc, false, NULL);
   1334 
   1335     trace_nbd_negotiate_begin();
   1336     memcpy(buf, "NBDMAGIC", 8);
   1337 
   1338     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
   1339     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
   1340 
   1341     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
   1342         error_prepend(errp, "write failed: ");
   1343         return -EINVAL;
   1344     }
   1345     ret = nbd_negotiate_options(client, errp);
   1346     if (ret != 0) {
   1347         if (ret < 0) {
   1348             error_prepend(errp, "option negotiation failed: ");
   1349         }
   1350         return ret;
   1351     }
   1352 
   1353     /* Attach the channel to the same AioContext as the export */
   1354     if (client->exp && client->exp->common.ctx) {
   1355         qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
   1356     }
   1357 
   1358     assert(!client->optlen);
   1359     trace_nbd_negotiate_success();
   1360 
   1361     return 0;
   1362 }
   1363 
   1364 /* nbd_read_eof
   1365  * Tries to read @size bytes from @ioc. This is a local implementation of
   1366  * qio_channel_readv_all_eof. We have it here because we need it to be
   1367  * interruptible and to know when the coroutine is yielding.
   1368  * Returns 1 on success
   1369  *         0 on eof, when no data was read (errp is not set)
   1370  *         negative errno on failure (errp is set)
   1371  */
   1372 static inline int coroutine_fn
   1373 nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
   1374 {
   1375     bool partial = false;
   1376 
   1377     assert(size);
   1378     while (size > 0) {
   1379         struct iovec iov = { .iov_base = buffer, .iov_len = size };
   1380         ssize_t len;
   1381 
   1382         len = qio_channel_readv(client->ioc, &iov, 1, errp);
   1383         if (len == QIO_CHANNEL_ERR_BLOCK) {
   1384             client->read_yielding = true;
   1385             qio_channel_yield(client->ioc, G_IO_IN);
   1386             client->read_yielding = false;
   1387             if (client->quiescing) {
   1388                 return -EAGAIN;
   1389             }
   1390             continue;
   1391         } else if (len < 0) {
   1392             return -EIO;
   1393         } else if (len == 0) {
   1394             if (partial) {
   1395                 error_setg(errp,
   1396                            "Unexpected end-of-file before all bytes were read");
   1397                 return -EIO;
   1398             } else {
   1399                 return 0;
   1400             }
   1401         }
   1402 
   1403         partial = true;
   1404         size -= len;
   1405         buffer = (uint8_t *) buffer + len;
   1406     }
   1407     return 1;
   1408 }
   1409 
   1410 static int nbd_receive_request(NBDClient *client, NBDRequest *request,
   1411                                Error **errp)
   1412 {
   1413     uint8_t buf[NBD_REQUEST_SIZE];
   1414     uint32_t magic;
   1415     int ret;
   1416 
   1417     ret = nbd_read_eof(client, buf, sizeof(buf), errp);
   1418     if (ret < 0) {
   1419         return ret;
   1420     }
   1421     if (ret == 0) {
   1422         return -EIO;
   1423     }
   1424 
   1425     /* Request
   1426        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
   1427        [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
   1428        [ 6 ..  7]   type    (NBD_CMD_READ, ...)
   1429        [ 8 .. 15]   handle
   1430        [16 .. 23]   from
   1431        [24 .. 27]   len
   1432      */
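    /*
     * Editorial worked example (added; not upstream text): a 4 KiB
     * NBD_CMD_READ at offset 0 with handle 1 decodes, via the loads
     * below, as
     *
     *   magic  = 0x25609513 (NBD_REQUEST_MAGIC)
     *   flags  = 0x0000
     *   type   = 0x0000 (NBD_CMD_READ)
     *   handle = 0x0000000000000001
     *   from   = 0
     *   len    = 4096
     */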
   1433 
   1434     magic = ldl_be_p(buf);
   1435     request->flags  = lduw_be_p(buf + 4);
   1436     request->type   = lduw_be_p(buf + 6);
   1437     request->handle = ldq_be_p(buf + 8);
   1438     request->from   = ldq_be_p(buf + 16);
   1439     request->len    = ldl_be_p(buf + 24);
   1440 
   1441     trace_nbd_receive_request(magic, request->flags, request->type,
   1442                               request->from, request->len);
   1443 
   1444     if (magic != NBD_REQUEST_MAGIC) {
   1445         error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
   1446         return -EINVAL;
   1447     }
   1448     return 0;
   1449 }
   1450 
   1451 #define MAX_NBD_REQUESTS 16
   1452 
   1453 void nbd_client_get(NBDClient *client)
   1454 {
   1455     client->refcount++;
   1456 }
   1457 
   1458 void nbd_client_put(NBDClient *client)
   1459 {
   1460     if (--client->refcount == 0) {
    1461         /* The last reference should be dropped by client->close_fn,
    1462          * which is called by client_close().
    1463          */
   1464         assert(client->closing);
   1465 
   1466         qio_channel_detach_aio_context(client->ioc);
   1467         object_unref(OBJECT(client->sioc));
   1468         object_unref(OBJECT(client->ioc));
   1469         if (client->tlscreds) {
   1470             object_unref(OBJECT(client->tlscreds));
   1471         }
   1472         g_free(client->tlsauthz);
   1473         if (client->exp) {
   1474             QTAILQ_REMOVE(&client->exp->clients, client, next);
   1475             blk_exp_unref(&client->exp->common);
   1476         }
   1477         g_free(client->export_meta.bitmaps);
   1478         g_free(client);
   1479     }
   1480 }
   1481 
   1482 static void client_close(NBDClient *client, bool negotiated)
   1483 {
   1484     if (client->closing) {
   1485         return;
   1486     }
   1487 
   1488     client->closing = true;
   1489 
   1490     /* Force requests to finish.  They will drop their own references,
   1491      * then we'll close the socket and free the NBDClient.
   1492      */
   1493     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
   1494                          NULL);
   1495 
   1496     /* Also tell the client, so that they release their reference.  */
   1497     if (client->close_fn) {
   1498         client->close_fn(client, negotiated);
   1499     }
   1500 }
   1501 
   1502 static NBDRequestData *nbd_request_get(NBDClient *client)
   1503 {
   1504     NBDRequestData *req;
   1505 
   1506     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
   1507     client->nb_requests++;
   1508 
   1509     req = g_new0(NBDRequestData, 1);
   1510     nbd_client_get(client);
   1511     req->client = client;
   1512     return req;
   1513 }
   1514 
   1515 static void nbd_request_put(NBDRequestData *req)
   1516 {
   1517     NBDClient *client = req->client;
   1518 
   1519     if (req->data) {
   1520         qemu_vfree(req->data);
   1521     }
   1522     g_free(req);
   1523 
   1524     client->nb_requests--;
   1525 
   1526     if (client->quiescing && client->nb_requests == 0) {
   1527         aio_wait_kick();
   1528     }
   1529 
   1530     nbd_client_receive_next_request(client);
   1531 
   1532     nbd_client_put(client);
   1533 }
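
         /*
          * Lifecycle recap for the helpers above: nbd_request_get() takes a
          * client reference and bumps nb_requests; nbd_request_put() frees the
          * request buffer, kicks a pending drain once the last request finishes
          * while quiescing, tries to schedule the next receive coroutine, and
          * drops the client reference.
          */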
   1534 
   1535 static void blk_aio_attached(AioContext *ctx, void *opaque)
   1536 {
   1537     NBDExport *exp = opaque;
   1538     NBDClient *client;
   1539 
   1540     trace_nbd_blk_aio_attached(exp->name, ctx);
   1541 
   1542     exp->common.ctx = ctx;
   1543 
   1544     QTAILQ_FOREACH(client, &exp->clients, next) {
   1545         qio_channel_attach_aio_context(client->ioc, ctx);
   1546 
   1547         assert(client->nb_requests == 0);
   1548         assert(client->recv_coroutine == NULL);
   1549         assert(client->send_coroutine == NULL);
   1550     }
   1551 }
   1552 
   1553 static void blk_aio_detach(void *opaque)
   1554 {
   1555     NBDExport *exp = opaque;
   1556     NBDClient *client;
   1557 
   1558     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
   1559 
   1560     QTAILQ_FOREACH(client, &exp->clients, next) {
   1561         qio_channel_detach_aio_context(client->ioc);
   1562     }
   1563 
   1564     exp->common.ctx = NULL;
   1565 }
   1566 
   1567 static void nbd_drained_begin(void *opaque)
   1568 {
   1569     NBDExport *exp = opaque;
   1570     NBDClient *client;
   1571 
   1572     QTAILQ_FOREACH(client, &exp->clients, next) {
   1573         client->quiescing = true;
   1574     }
   1575 }
   1576 
   1577 static void nbd_drained_end(void *opaque)
   1578 {
   1579     NBDExport *exp = opaque;
   1580     NBDClient *client;
   1581 
   1582     QTAILQ_FOREACH(client, &exp->clients, next) {
   1583         client->quiescing = false;
   1584         nbd_client_receive_next_request(client);
   1585     }
   1586 }
   1587 
   1588 static bool nbd_drained_poll(void *opaque)
   1589 {
   1590     NBDExport *exp = opaque;
   1591     NBDClient *client;
   1592 
   1593     QTAILQ_FOREACH(client, &exp->clients, next) {
   1594         if (client->nb_requests != 0) {
   1595             /*
    1596              * If there's a coroutine waiting for a request in nbd_read_eof(),
   1597              * enter it here so we don't depend on the client to wake it up.
   1598              */
   1599             if (client->recv_coroutine != NULL && client->read_yielding) {
   1600                 qemu_aio_coroutine_enter(exp->common.ctx,
   1601                                          client->recv_coroutine);
   1602             }
   1603 
   1604             return true;
   1605         }
   1606     }
   1607 
   1608     return false;
   1609 }
   1610 
   1611 static void nbd_eject_notifier(Notifier *n, void *data)
   1612 {
   1613     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
   1614 
   1615     blk_exp_request_shutdown(&exp->common);
   1616 }
   1617 
   1618 void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
   1619 {
   1620     NBDExport *nbd_exp = container_of(exp, NBDExport, common);
   1621     assert(exp->drv == &blk_exp_nbd);
   1622     assert(nbd_exp->eject_notifier_blk == NULL);
   1623 
   1624     blk_ref(blk);
   1625     nbd_exp->eject_notifier_blk = blk;
   1626     nbd_exp->eject_notifier.notify = nbd_eject_notifier;
   1627     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
   1628 }
   1629 
   1630 static const BlockDevOps nbd_block_ops = {
   1631     .drained_begin = nbd_drained_begin,
   1632     .drained_end = nbd_drained_end,
   1633     .drained_poll = nbd_drained_poll,
   1634 };
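
         /*
          * Drain protocol for NBD exports, as implemented by the callbacks
          * above: .drained_begin marks every client as quiescing, which makes a
          * blocked nbd_read_eof() return -EAGAIN instead of resuming and stops
          * new receive coroutines from being scheduled; .drained_poll keeps the
          * drain active (re-entering a coroutine parked in nbd_read_eof() if
          * necessary) until every in-flight request has completed; .drained_end
          * clears the flag and restarts request reception for each client.
          */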
   1635 
   1636 static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
   1637                              Error **errp)
   1638 {
   1639     NBDExport *exp = container_of(blk_exp, NBDExport, common);
   1640     BlockExportOptionsNbd *arg = &exp_args->u.nbd;
   1641     BlockBackend *blk = blk_exp->blk;
   1642     int64_t size;
   1643     uint64_t perm, shared_perm;
   1644     bool readonly = !exp_args->writable;
   1645     BlockDirtyBitmapOrStrList *bitmaps;
   1646     size_t i;
   1647     int ret;
   1648 
   1649     assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
   1650 
   1651     if (!nbd_server_is_running()) {
   1652         error_setg(errp, "NBD server not running");
   1653         return -EINVAL;
   1654     }
   1655 
   1656     if (!arg->has_name) {
   1657         arg->name = exp_args->node_name;
   1658     }
   1659 
   1660     if (strlen(arg->name) > NBD_MAX_STRING_SIZE) {
   1661         error_setg(errp, "export name '%s' too long", arg->name);
   1662         return -EINVAL;
   1663     }
   1664 
   1665     if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
   1666         error_setg(errp, "description '%s' too long", arg->description);
   1667         return -EINVAL;
   1668     }
   1669 
   1670     if (nbd_export_find(arg->name)) {
   1671         error_setg(errp, "NBD server already has export named '%s'", arg->name);
   1672         return -EEXIST;
   1673     }
   1674 
   1675     size = blk_getlength(blk);
   1676     if (size < 0) {
   1677         error_setg_errno(errp, -size,
   1678                          "Failed to determine the NBD export's length");
   1679         return size;
   1680     }
   1681 
    1682     /* Don't allow resize while the NBD export is active; beyond that, we
    1683      * don't care what happens with the node. */
   1684     blk_get_perm(blk, &perm, &shared_perm);
   1685     ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
   1686     if (ret < 0) {
   1687         return ret;
   1688     }
   1689 
   1690     QTAILQ_INIT(&exp->clients);
   1691     exp->name = g_strdup(arg->name);
   1692     exp->description = g_strdup(arg->description);
   1693     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
   1694                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
   1695 
   1696     if (nbd_server_max_connections() != 1) {
   1697         exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
   1698     }
   1699     if (readonly) {
   1700         exp->nbdflags |= NBD_FLAG_READ_ONLY;
   1701     } else {
   1702         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
   1703                           NBD_FLAG_SEND_FAST_ZERO);
   1704     }
   1705     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
   1706 
   1707     for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
   1708         exp->nr_export_bitmaps++;
   1709     }
   1710     exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
   1711     for (i = 0, bitmaps = arg->bitmaps; bitmaps;
   1712          i++, bitmaps = bitmaps->next)
   1713     {
   1714         const char *bitmap;
   1715         BlockDriverState *bs = blk_bs(blk);
   1716         BdrvDirtyBitmap *bm = NULL;
   1717 
   1718         switch (bitmaps->value->type) {
   1719         case QTYPE_QSTRING:
   1720             bitmap = bitmaps->value->u.local;
   1721             while (bs) {
   1722                 bm = bdrv_find_dirty_bitmap(bs, bitmap);
   1723                 if (bm != NULL) {
   1724                     break;
   1725                 }
   1726 
   1727                 bs = bdrv_filter_or_cow_bs(bs);
   1728             }
   1729 
   1730             if (bm == NULL) {
   1731                 ret = -ENOENT;
   1732                 error_setg(errp, "Bitmap '%s' is not found",
   1733                            bitmaps->value->u.local);
   1734                 goto fail;
   1735             }
   1736 
   1737             if (readonly && bdrv_is_writable(bs) &&
   1738                 bdrv_dirty_bitmap_enabled(bm)) {
   1739                 ret = -EINVAL;
   1740                 error_setg(errp, "Enabled bitmap '%s' incompatible with "
   1741                            "readonly export", bitmap);
   1742                 goto fail;
   1743             }
   1744             break;
   1745         case QTYPE_QDICT:
   1746             bitmap = bitmaps->value->u.external.name;
   1747             bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
   1748                                            bitmap, NULL, errp);
   1749             if (!bm) {
   1750                 ret = -ENOENT;
   1751                 goto fail;
   1752             }
   1753             break;
   1754         default:
   1755             abort();
   1756         }
   1757 
   1758         assert(bm);
   1759 
   1760         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
   1761             ret = -EINVAL;
   1762             goto fail;
   1763         }
   1764 
   1765         exp->export_bitmaps[i] = bm;
   1766         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
   1767     }
   1768 
   1769     /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
   1770     for (i = 0; i < exp->nr_export_bitmaps; i++) {
   1771         bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
   1772     }
   1773 
   1774     exp->allocation_depth = arg->allocation_depth;
   1775 
   1776     /*
   1777      * We need to inhibit request queuing in the block layer to ensure we can
   1778      * be properly quiesced when entering a drained section, as our coroutines
   1779      * servicing pending requests might enter blk_pread().
   1780      */
   1781     blk_set_disable_request_queuing(blk, true);
   1782 
   1783     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
   1784 
   1785     blk_set_dev_ops(blk, &nbd_block_ops, exp);
   1786 
   1787     QTAILQ_INSERT_TAIL(&exports, exp, next);
   1788 
   1789     return 0;
   1790 
   1791 fail:
   1792     g_free(exp->export_bitmaps);
   1793     g_free(exp->name);
   1794     g_free(exp->description);
   1795     return ret;
   1796 }
   1797 
   1798 NBDExport *nbd_export_find(const char *name)
   1799 {
   1800     NBDExport *exp;
   1801     QTAILQ_FOREACH(exp, &exports, next) {
   1802         if (strcmp(name, exp->name) == 0) {
   1803             return exp;
   1804         }
   1805     }
   1806 
   1807     return NULL;
   1808 }
   1809 
   1810 AioContext *
   1811 nbd_export_aio_context(NBDExport *exp)
   1812 {
   1813     return exp->common.ctx;
   1814 }
   1815 
   1816 static void nbd_export_request_shutdown(BlockExport *blk_exp)
   1817 {
   1818     NBDExport *exp = container_of(blk_exp, NBDExport, common);
   1819     NBDClient *client, *next;
   1820 
   1821     blk_exp_ref(&exp->common);
   1822     /*
   1823      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
   1824      * close mode that stops advertising the export to new clients but
   1825      * still permits existing clients to run to completion? Because of
    1826      * that possibility, this function can be called more than
    1827      * once on an export.
   1828      */
   1829     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
   1830         client_close(client, true);
   1831     }
   1832     if (exp->name) {
   1833         g_free(exp->name);
   1834         exp->name = NULL;
   1835         QTAILQ_REMOVE(&exports, exp, next);
   1836     }
   1837     blk_exp_unref(&exp->common);
   1838 }
   1839 
   1840 static void nbd_export_delete(BlockExport *blk_exp)
   1841 {
   1842     size_t i;
   1843     NBDExport *exp = container_of(blk_exp, NBDExport, common);
   1844 
   1845     assert(exp->name == NULL);
   1846     assert(QTAILQ_EMPTY(&exp->clients));
   1847 
   1848     g_free(exp->description);
   1849     exp->description = NULL;
   1850 
   1851     if (exp->common.blk) {
   1852         if (exp->eject_notifier_blk) {
   1853             notifier_remove(&exp->eject_notifier);
   1854             blk_unref(exp->eject_notifier_blk);
   1855         }
   1856         blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
   1857                                         blk_aio_detach, exp);
   1858         blk_set_disable_request_queuing(exp->common.blk, false);
   1859     }
   1860 
   1861     for (i = 0; i < exp->nr_export_bitmaps; i++) {
   1862         bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
   1863     }
   1864 }
   1865 
   1866 const BlockExportDriver blk_exp_nbd = {
   1867     .type               = BLOCK_EXPORT_TYPE_NBD,
   1868     .instance_size      = sizeof(NBDExport),
   1869     .create             = nbd_export_create,
   1870     .delete             = nbd_export_delete,
   1871     .request_shutdown   = nbd_export_request_shutdown,
   1872 };
   1873 
   1874 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
   1875                                         unsigned niov, Error **errp)
   1876 {
   1877     int ret;
   1878 
   1879     g_assert(qemu_in_coroutine());
   1880     qemu_co_mutex_lock(&client->send_lock);
   1881     client->send_coroutine = qemu_coroutine_self();
   1882 
   1883     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
   1884 
   1885     client->send_coroutine = NULL;
   1886     qemu_co_mutex_unlock(&client->send_lock);
   1887 
   1888     return ret;
   1889 }
   1890 
   1891 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
   1892                                        uint64_t handle)
   1893 {
   1894     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
   1895     stl_be_p(&reply->error, error);
   1896     stq_be_p(&reply->handle, handle);
   1897 }
   1898 
   1899 static int nbd_co_send_simple_reply(NBDClient *client,
   1900                                     uint64_t handle,
   1901                                     uint32_t error,
   1902                                     void *data,
   1903                                     size_t len,
   1904                                     Error **errp)
   1905 {
   1906     NBDSimpleReply reply;
   1907     int nbd_err = system_errno_to_nbd_errno(error);
   1908     struct iovec iov[] = {
   1909         {.iov_base = &reply, .iov_len = sizeof(reply)},
   1910         {.iov_base = data, .iov_len = len}
   1911     };
   1912 
   1913     trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err),
   1914                                    len);
   1915     set_be_simple_reply(&reply, nbd_err, handle);
   1916 
   1917     return nbd_co_send_iov(client, iov, len ? 2 : 1, errp);
   1918 }
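
         /*
          * Wire layout of the simple reply sent above (all fields big-endian):
          * a 16-byte header of magic (4 bytes), error (4 bytes) and handle
          * (8 bytes), followed by a payload only when @len is non-zero; in this
          * server a payload is attached only to successful NBD_CMD_READ replies
          * served via the simple-reply path.
          */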
   1919 
   1920 static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags,
   1921                                 uint16_t type, uint64_t handle, uint32_t length)
   1922 {
   1923     stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
   1924     stw_be_p(&chunk->flags, flags);
   1925     stw_be_p(&chunk->type, type);
   1926     stq_be_p(&chunk->handle, handle);
   1927     stl_be_p(&chunk->length, length);
   1928 }
   1929 
   1930 static int coroutine_fn nbd_co_send_structured_done(NBDClient *client,
   1931                                                     uint64_t handle,
   1932                                                     Error **errp)
   1933 {
   1934     NBDStructuredReplyChunk chunk;
   1935     struct iovec iov[] = {
   1936         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
   1937     };
   1938 
   1939     trace_nbd_co_send_structured_done(handle);
   1940     set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0);
   1941 
   1942     return nbd_co_send_iov(client, iov, 1, errp);
   1943 }
   1944 
   1945 static int coroutine_fn nbd_co_send_structured_read(NBDClient *client,
   1946                                                     uint64_t handle,
   1947                                                     uint64_t offset,
   1948                                                     void *data,
   1949                                                     size_t size,
   1950                                                     bool final,
   1951                                                     Error **errp)
   1952 {
   1953     NBDStructuredReadData chunk;
   1954     struct iovec iov[] = {
   1955         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
   1956         {.iov_base = data, .iov_len = size}
   1957     };
   1958 
   1959     assert(size);
   1960     trace_nbd_co_send_structured_read(handle, offset, data, size);
   1961     set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
   1962                  NBD_REPLY_TYPE_OFFSET_DATA, handle,
   1963                  sizeof(chunk) - sizeof(chunk.h) + size);
   1964     stq_be_p(&chunk.offset, offset);
   1965 
   1966     return nbd_co_send_iov(client, iov, 2, errp);
   1967 }
   1968 
   1969 static int coroutine_fn nbd_co_send_structured_error(NBDClient *client,
   1970                                                      uint64_t handle,
   1971                                                      uint32_t error,
   1972                                                      const char *msg,
   1973                                                      Error **errp)
   1974 {
   1975     NBDStructuredError chunk;
   1976     int nbd_err = system_errno_to_nbd_errno(error);
   1977     struct iovec iov[] = {
   1978         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
   1979         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
   1980     };
   1981 
   1982     assert(nbd_err);
   1983     trace_nbd_co_send_structured_error(handle, nbd_err,
   1984                                        nbd_err_lookup(nbd_err), msg ? msg : "");
   1985     set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle,
   1986                  sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
   1987     stl_be_p(&chunk.error, nbd_err);
   1988     stw_be_p(&chunk.message_length, iov[1].iov_len);
   1989 
   1990     return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp);
   1991 }
   1992 
    1993 /* Do a sparse read and send the structured reply to the client.
    1994  * Returns -errno if sending fails. A bdrv_block_status_above() failure is
    1995  * reported to the client as an error chunk instead, in which case this
    1996  * function still succeeds, provided that error reply could be sent. */
   1997 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
   1998                                                 uint64_t handle,
   1999                                                 uint64_t offset,
   2000                                                 uint8_t *data,
   2001                                                 size_t size,
   2002                                                 Error **errp)
   2003 {
   2004     int ret = 0;
   2005     NBDExport *exp = client->exp;
   2006     size_t progress = 0;
   2007 
   2008     while (progress < size) {
   2009         int64_t pnum;
   2010         int status = bdrv_block_status_above(blk_bs(exp->common.blk), NULL,
   2011                                              offset + progress,
   2012                                              size - progress, &pnum, NULL,
   2013                                              NULL);
   2014         bool final;
   2015 
   2016         if (status < 0) {
   2017             char *msg = g_strdup_printf("unable to check for holes: %s",
   2018                                         strerror(-status));
   2019 
   2020             ret = nbd_co_send_structured_error(client, handle, -status, msg,
   2021                                                errp);
   2022             g_free(msg);
   2023             return ret;
   2024         }
   2025         assert(pnum && pnum <= size - progress);
   2026         final = progress + pnum == size;
   2027         if (status & BDRV_BLOCK_ZERO) {
   2028             NBDStructuredReadHole chunk;
   2029             struct iovec iov[] = {
   2030                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
   2031             };
   2032 
   2033             trace_nbd_co_send_structured_read_hole(handle, offset + progress,
   2034                                                    pnum);
   2035             set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
   2036                          NBD_REPLY_TYPE_OFFSET_HOLE,
   2037                          handle, sizeof(chunk) - sizeof(chunk.h));
   2038             stq_be_p(&chunk.offset, offset + progress);
   2039             stl_be_p(&chunk.length, pnum);
   2040             ret = nbd_co_send_iov(client, iov, 1, errp);
   2041         } else {
   2042             ret = blk_pread(exp->common.blk, offset + progress, pnum,
   2043                             data + progress, 0);
   2044             if (ret < 0) {
   2045                 error_setg_errno(errp, -ret, "reading from file failed");
   2046                 break;
   2047             }
   2048             ret = nbd_co_send_structured_read(client, handle, offset + progress,
   2049                                               data + progress, pnum, final,
   2050                                               errp);
   2051         }
   2052 
   2053         if (ret < 0) {
   2054             break;
   2055         }
   2056         progress += pnum;
   2057     }
   2058     return ret;
   2059 }
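
         /*
          * Illustrative example of the sparse-read logic above (hypothetical
          * block status): for a 1 MiB NBD_CMD_READ whose first 512 KiB reads as
          * zeroes while the rest contains data, the client receives one
          * NBD_REPLY_TYPE_OFFSET_HOLE chunk covering the first half and one
          * NBD_REPLY_TYPE_OFFSET_DATA chunk, flagged NBD_REPLY_FLAG_DONE,
          * carrying the remaining 512 KiB.
          */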
   2060 
   2061 typedef struct NBDExtentArray {
   2062     NBDExtent *extents;
   2063     unsigned int nb_alloc;
   2064     unsigned int count;
   2065     uint64_t total_length;
   2066     bool can_add;
   2067     bool converted_to_be;
   2068 } NBDExtentArray;
   2069 
   2070 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
   2071 {
   2072     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
   2073 
   2074     ea->nb_alloc = nb_alloc;
   2075     ea->extents = g_new(NBDExtent, nb_alloc);
   2076     ea->can_add = true;
   2077 
   2078     return ea;
   2079 }
   2080 
   2081 static void nbd_extent_array_free(NBDExtentArray *ea)
   2082 {
   2083     g_free(ea->extents);
   2084     g_free(ea);
   2085 }
   2086 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free)
   2087 
    2088 /* Further modifications of the array after conversion are not allowed */
   2089 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
   2090 {
   2091     int i;
   2092 
   2093     assert(!ea->converted_to_be);
   2094     ea->can_add = false;
   2095     ea->converted_to_be = true;
   2096 
   2097     for (i = 0; i < ea->count; i++) {
   2098         ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
   2099         ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
   2100     }
   2101 }
   2102 
    2103 /*
    2104  * Add an extent to the NBDExtentArray. If the extent can't be added (no
    2105  * space is available), return -1.
    2106  * For safety, when returning -1 for the first time, .can_add is set to false,
    2107  * so any further call to nbd_extent_array_add() triggers an assertion failure.
    2108  * (This avoids the situation where a caller ignores a failure to add one
    2109  * extent, and a later extent that would have been squashed into the last
    2110  * array entry would then report an incorrect range to the client.)
    2111  */
   2112 static int nbd_extent_array_add(NBDExtentArray *ea,
   2113                                 uint32_t length, uint32_t flags)
   2114 {
   2115     assert(ea->can_add);
   2116 
   2117     if (!length) {
   2118         return 0;
   2119     }
   2120 
   2121     /* Extend previous extent if flags are the same */
   2122     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
   2123         uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
   2124 
   2125         if (sum <= UINT32_MAX) {
   2126             ea->extents[ea->count - 1].length = sum;
   2127             ea->total_length += length;
   2128             return 0;
   2129         }
   2130     }
   2131 
   2132     if (ea->count >= ea->nb_alloc) {
   2133         ea->can_add = false;
   2134         return -1;
   2135     }
   2136 
   2137     ea->total_length += length;
   2138     ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
   2139     ea->count++;
   2140 
   2141     return 0;
   2142 }
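
         /*
          * Illustrative example (hypothetical values): nbd_extent_array_add(ea,
          * 65536, 0) followed by nbd_extent_array_add(ea, 65536, 0) leaves a
          * single extent of length 131072 with flags 0; a subsequent add with
          * different flags (say NBD_STATE_HOLE | NBD_STATE_ZERO) starts a new
          * entry.  Merging never grows one extent beyond UINT32_MAX bytes.
          */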
   2143 
   2144 static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
   2145                                   uint64_t bytes, NBDExtentArray *ea)
   2146 {
   2147     while (bytes) {
   2148         uint32_t flags;
   2149         int64_t num;
   2150         int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num,
   2151                                           NULL, NULL);
   2152 
   2153         if (ret < 0) {
   2154             return ret;
   2155         }
   2156 
   2157         flags = (ret & BDRV_BLOCK_DATA ? 0 : NBD_STATE_HOLE) |
   2158                 (ret & BDRV_BLOCK_ZERO ? NBD_STATE_ZERO : 0);
   2159 
   2160         if (nbd_extent_array_add(ea, num, flags) < 0) {
   2161             return 0;
   2162         }
   2163 
   2164         offset += num;
   2165         bytes -= num;
   2166     }
   2167 
   2168     return 0;
   2169 }
   2170 
   2171 static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset,
   2172                                  uint64_t bytes, NBDExtentArray *ea)
   2173 {
   2174     while (bytes) {
   2175         int64_t num;
   2176         int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes,
   2177                                           &num);
   2178 
   2179         if (ret < 0) {
   2180             return ret;
   2181         }
   2182 
   2183         if (nbd_extent_array_add(ea, num, ret) < 0) {
   2184             return 0;
   2185         }
   2186 
   2187         offset += num;
   2188         bytes -= num;
   2189     }
   2190 
   2191     return 0;
   2192 }
   2193 
   2194 /*
   2195  * nbd_co_send_extents
   2196  *
   2197  * @ea is converted to BE by the function
   2198  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
   2199  */
   2200 static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
   2201                                NBDExtentArray *ea,
   2202                                bool last, uint32_t context_id, Error **errp)
   2203 {
   2204     NBDStructuredMeta chunk;
   2205     struct iovec iov[] = {
   2206         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
   2207         {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
   2208     };
   2209 
   2210     nbd_extent_array_convert_to_be(ea);
   2211 
   2212     trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length,
   2213                               last);
   2214     set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
   2215                  NBD_REPLY_TYPE_BLOCK_STATUS,
   2216                  handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
   2217     stl_be_p(&chunk.context_id, context_id);
   2218 
   2219     return nbd_co_send_iov(client, iov, 2, errp);
   2220 }
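
         /*
          * On the wire (per the NBD structured-reply spec), the chunk payload
          * sent above is the 4-byte big-endian context_id followed by the
          * extent list, each extent being a 32-bit big-endian length and a
          * 32-bit big-endian flags word (see nbd_extent_array_convert_to_be()).
          */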
   2221 
   2222 /* Get block status from the exported device and send it to the client */
   2223 static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
   2224                                     BlockDriverState *bs, uint64_t offset,
   2225                                     uint32_t length, bool dont_fragment,
   2226                                     bool last, uint32_t context_id,
   2227                                     Error **errp)
   2228 {
   2229     int ret;
   2230     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
   2231     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
   2232 
   2233     if (context_id == NBD_META_ID_BASE_ALLOCATION) {
   2234         ret = blockstatus_to_extents(bs, offset, length, ea);
   2235     } else {
   2236         ret = blockalloc_to_extents(bs, offset, length, ea);
   2237     }
   2238     if (ret < 0) {
   2239         return nbd_co_send_structured_error(
   2240                 client, handle, -ret, "can't get block status", errp);
   2241     }
   2242 
   2243     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
   2244 }
   2245 
    2246 /* Populate @es from a dirty bitmap. */
   2247 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
   2248                               uint64_t offset, uint64_t length,
   2249                               NBDExtentArray *es)
   2250 {
   2251     int64_t start, dirty_start, dirty_count;
   2252     int64_t end = offset + length;
   2253     bool full = false;
   2254 
   2255     bdrv_dirty_bitmap_lock(bitmap);
   2256 
   2257     for (start = offset;
   2258          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
   2259                                            &dirty_start, &dirty_count);
   2260          start = dirty_start + dirty_count)
   2261     {
   2262         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
   2263             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
   2264         {
   2265             full = true;
   2266             break;
   2267         }
   2268     }
   2269 
   2270     if (!full) {
    2271         /* Add the last non-dirty extent; nothing to do if the array is now full */
   2272         (void) nbd_extent_array_add(es, end - start, 0);
   2273     }
   2274 
   2275     bdrv_dirty_bitmap_unlock(bitmap);
   2276 }
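
         /*
          * Illustrative example (hypothetical bitmap): for a 1 MiB request at
          * offset 0 where only [64 KiB, 128 KiB) is dirty, the array receives
          * three extents: 64 KiB with flags 0, 64 KiB with NBD_STATE_DIRTY, and
          * a trailing 896 KiB with flags 0.
          */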
   2277 
   2278 static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
   2279                               BdrvDirtyBitmap *bitmap, uint64_t offset,
   2280                               uint32_t length, bool dont_fragment, bool last,
   2281                               uint32_t context_id, Error **errp)
   2282 {
   2283     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
   2284     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
   2285 
   2286     bitmap_to_extents(bitmap, offset, length, ea);
   2287 
   2288     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
   2289 }
   2290 
   2291 /* nbd_co_receive_request
   2292  * Collect a client request. Return 0 if request looks valid, -EIO to drop
   2293  * connection right away, -EAGAIN to indicate we were interrupted and the
   2294  * channel should be quiesced, and any other negative value to report an error
   2295  * to the client (although the caller may still need to disconnect after
   2296  * reporting the error).
   2297  */
   2298 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
   2299                                   Error **errp)
   2300 {
   2301     NBDClient *client = req->client;
   2302     int valid_flags;
   2303     int ret;
   2304 
   2305     g_assert(qemu_in_coroutine());
   2306     assert(client->recv_coroutine == qemu_coroutine_self());
   2307     ret = nbd_receive_request(client, request, errp);
   2308     if (ret < 0) {
   2309         return ret;
   2310     }
   2311 
   2312     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
   2313                                              nbd_cmd_lookup(request->type));
   2314 
   2315     if (request->type != NBD_CMD_WRITE) {
   2316         /* No payload, we are ready to read the next request.  */
   2317         req->complete = true;
   2318     }
   2319 
   2320     if (request->type == NBD_CMD_DISC) {
   2321         /* Special case: we're going to disconnect without a reply,
   2322          * whether or not flags, from, or len are bogus */
   2323         return -EIO;
   2324     }
   2325 
   2326     if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
   2327         request->type == NBD_CMD_CACHE)
   2328     {
   2329         if (request->len > NBD_MAX_BUFFER_SIZE) {
    2330             error_setg(errp, "len (%" PRIu32 ") is larger than max len (%u)",
   2331                        request->len, NBD_MAX_BUFFER_SIZE);
   2332             return -EINVAL;
   2333         }
   2334 
   2335         if (request->type != NBD_CMD_CACHE) {
   2336             req->data = blk_try_blockalign(client->exp->common.blk,
   2337                                            request->len);
   2338             if (req->data == NULL) {
   2339                 error_setg(errp, "No memory");
   2340                 return -ENOMEM;
   2341             }
   2342         }
   2343     }
   2344 
   2345     if (request->type == NBD_CMD_WRITE) {
   2346         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
   2347                      errp) < 0)
   2348         {
   2349             return -EIO;
   2350         }
   2351         req->complete = true;
   2352 
   2353         trace_nbd_co_receive_request_payload_received(request->handle,
   2354                                                       request->len);
   2355     }
   2356 
   2357     /* Sanity checks. */
   2358     if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
   2359         (request->type == NBD_CMD_WRITE ||
   2360          request->type == NBD_CMD_WRITE_ZEROES ||
   2361          request->type == NBD_CMD_TRIM)) {
   2362         error_setg(errp, "Export is read-only");
   2363         return -EROFS;
   2364     }
   2365     if (request->from > client->exp->size ||
   2366         request->len > client->exp->size - request->from) {
   2367         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
   2368                    ", Size: %" PRIu64, request->from, request->len,
   2369                    client->exp->size);
   2370         return (request->type == NBD_CMD_WRITE ||
   2371                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
   2372     }
   2373     if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
   2374                                                 client->check_align)) {
   2375         /*
   2376          * The block layer gracefully handles unaligned requests, but
   2377          * it's still worth tracing client non-compliance
   2378          */
   2379         trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
   2380                                               request->from,
   2381                                               request->len,
   2382                                               client->check_align);
   2383     }
   2384     valid_flags = NBD_CMD_FLAG_FUA;
   2385     if (request->type == NBD_CMD_READ && client->structured_reply) {
   2386         valid_flags |= NBD_CMD_FLAG_DF;
   2387     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
   2388         valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
   2389     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
   2390         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
   2391     }
   2392     if (request->flags & ~valid_flags) {
   2393         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
   2394                    nbd_cmd_lookup(request->type), request->flags);
   2395         return -EINVAL;
   2396     }
   2397 
   2398     return 0;
   2399 }
   2400 
   2401 /* Send simple reply without a payload, or a structured error
   2402  * @error_msg is ignored if @ret >= 0
   2403  * Returns 0 if connection is still live, -errno on failure to talk to client
   2404  */
   2405 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
   2406                                                uint64_t handle,
   2407                                                int ret,
   2408                                                const char *error_msg,
   2409                                                Error **errp)
   2410 {
   2411     if (client->structured_reply && ret < 0) {
   2412         return nbd_co_send_structured_error(client, handle, -ret, error_msg,
   2413                                             errp);
   2414     } else {
   2415         return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0,
   2416                                         NULL, 0, errp);
   2417     }
   2418 }
   2419 
   2420 /* Handle NBD_CMD_READ request.
   2421  * Return -errno if sending fails. Other errors are reported directly to the
   2422  * client as an error reply. */
   2423 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
   2424                                         uint8_t *data, Error **errp)
   2425 {
   2426     int ret;
   2427     NBDExport *exp = client->exp;
   2428 
   2429     assert(request->type == NBD_CMD_READ);
   2430 
   2431     /* XXX: NBD Protocol only documents use of FUA with WRITE */
   2432     if (request->flags & NBD_CMD_FLAG_FUA) {
   2433         ret = blk_co_flush(exp->common.blk);
   2434         if (ret < 0) {
   2435             return nbd_send_generic_reply(client, request->handle, ret,
   2436                                           "flush failed", errp);
   2437         }
   2438     }
   2439 
   2440     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
   2441         request->len)
   2442     {
   2443         return nbd_co_send_sparse_read(client, request->handle, request->from,
   2444                                        data, request->len, errp);
   2445     }
   2446 
   2447     ret = blk_pread(exp->common.blk, request->from, request->len, data, 0);
   2448     if (ret < 0) {
   2449         return nbd_send_generic_reply(client, request->handle, ret,
   2450                                       "reading from file failed", errp);
   2451     }
   2452 
   2453     if (client->structured_reply) {
   2454         if (request->len) {
   2455             return nbd_co_send_structured_read(client, request->handle,
   2456                                                request->from, data,
   2457                                                request->len, true, errp);
   2458         } else {
   2459             return nbd_co_send_structured_done(client, request->handle, errp);
   2460         }
   2461     } else {
   2462         return nbd_co_send_simple_reply(client, request->handle, 0,
   2463                                         data, request->len, errp);
   2464     }
   2465 }
   2466 
   2467 /*
   2468  * nbd_do_cmd_cache
   2469  *
   2470  * Handle NBD_CMD_CACHE request.
   2471  * Return -errno if sending fails. Other errors are reported directly to the
   2472  * client as an error reply.
   2473  */
   2474 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
   2475                                          Error **errp)
   2476 {
   2477     int ret;
   2478     NBDExport *exp = client->exp;
   2479 
   2480     assert(request->type == NBD_CMD_CACHE);
   2481 
   2482     ret = blk_co_preadv(exp->common.blk, request->from, request->len,
   2483                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
   2484 
   2485     return nbd_send_generic_reply(client, request->handle, ret,
   2486                                   "caching data failed", errp);
   2487 }
   2488 
   2489 /* Handle NBD request.
   2490  * Return -errno if sending fails. Other errors are reported directly to the
   2491  * client as an error reply. */
   2492 static coroutine_fn int nbd_handle_request(NBDClient *client,
   2493                                            NBDRequest *request,
   2494                                            uint8_t *data, Error **errp)
   2495 {
   2496     int ret;
   2497     int flags;
   2498     NBDExport *exp = client->exp;
   2499     char *msg;
   2500     size_t i;
   2501 
   2502     switch (request->type) {
   2503     case NBD_CMD_CACHE:
   2504         return nbd_do_cmd_cache(client, request, errp);
   2505 
   2506     case NBD_CMD_READ:
   2507         return nbd_do_cmd_read(client, request, data, errp);
   2508 
   2509     case NBD_CMD_WRITE:
   2510         flags = 0;
   2511         if (request->flags & NBD_CMD_FLAG_FUA) {
   2512             flags |= BDRV_REQ_FUA;
   2513         }
   2514         ret = blk_pwrite(exp->common.blk, request->from, request->len, data,
   2515                          flags);
   2516         return nbd_send_generic_reply(client, request->handle, ret,
   2517                                       "writing to file failed", errp);
   2518 
   2519     case NBD_CMD_WRITE_ZEROES:
   2520         flags = 0;
   2521         if (request->flags & NBD_CMD_FLAG_FUA) {
   2522             flags |= BDRV_REQ_FUA;
   2523         }
   2524         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
   2525             flags |= BDRV_REQ_MAY_UNMAP;
   2526         }
   2527         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
   2528             flags |= BDRV_REQ_NO_FALLBACK;
   2529         }
   2530         ret = blk_pwrite_zeroes(exp->common.blk, request->from, request->len,
   2531                                 flags);
   2532         return nbd_send_generic_reply(client, request->handle, ret,
   2533                                       "writing to file failed", errp);
   2534 
   2535     case NBD_CMD_DISC:
   2536         /* unreachable, thanks to special case in nbd_co_receive_request() */
   2537         abort();
   2538 
   2539     case NBD_CMD_FLUSH:
   2540         ret = blk_co_flush(exp->common.blk);
   2541         return nbd_send_generic_reply(client, request->handle, ret,
   2542                                       "flush failed", errp);
   2543 
   2544     case NBD_CMD_TRIM:
   2545         ret = blk_co_pdiscard(exp->common.blk, request->from, request->len);
   2546         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
   2547             ret = blk_co_flush(exp->common.blk);
   2548         }
   2549         return nbd_send_generic_reply(client, request->handle, ret,
   2550                                       "discard failed", errp);
   2551 
   2552     case NBD_CMD_BLOCK_STATUS:
   2553         if (!request->len) {
   2554             return nbd_send_generic_reply(client, request->handle, -EINVAL,
   2555                                           "need non-zero length", errp);
   2556         }
   2557         if (client->export_meta.count) {
   2558             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
   2559             int contexts_remaining = client->export_meta.count;
   2560 
   2561             if (client->export_meta.base_allocation) {
   2562                 ret = nbd_co_send_block_status(client, request->handle,
   2563                                                blk_bs(exp->common.blk),
   2564                                                request->from,
   2565                                                request->len, dont_fragment,
   2566                                                !--contexts_remaining,
   2567                                                NBD_META_ID_BASE_ALLOCATION,
   2568                                                errp);
   2569                 if (ret < 0) {
   2570                     return ret;
   2571                 }
   2572             }
   2573 
   2574             if (client->export_meta.allocation_depth) {
   2575                 ret = nbd_co_send_block_status(client, request->handle,
   2576                                                blk_bs(exp->common.blk),
   2577                                                request->from, request->len,
   2578                                                dont_fragment,
   2579                                                !--contexts_remaining,
   2580                                                NBD_META_ID_ALLOCATION_DEPTH,
   2581                                                errp);
   2582                 if (ret < 0) {
   2583                     return ret;
   2584                 }
   2585             }
   2586 
   2587             for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
   2588                 if (!client->export_meta.bitmaps[i]) {
   2589                     continue;
   2590                 }
   2591                 ret = nbd_co_send_bitmap(client, request->handle,
   2592                                          client->exp->export_bitmaps[i],
   2593                                          request->from, request->len,
   2594                                          dont_fragment, !--contexts_remaining,
   2595                                          NBD_META_ID_DIRTY_BITMAP + i, errp);
   2596                 if (ret < 0) {
   2597                     return ret;
   2598                 }
   2599             }
   2600 
   2601             assert(!contexts_remaining);
   2602 
   2603             return 0;
   2604         } else {
   2605             return nbd_send_generic_reply(client, request->handle, -EINVAL,
   2606                                           "CMD_BLOCK_STATUS not negotiated",
   2607                                           errp);
   2608         }
   2609 
   2610     default:
   2611         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
   2612                               request->type);
   2613         ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg,
   2614                                      errp);
   2615         g_free(msg);
   2616         return ret;
   2617     }
   2618 }
   2619 
   2620 /* Owns a reference to the NBDClient passed as opaque.  */
   2621 static coroutine_fn void nbd_trip(void *opaque)
   2622 {
   2623     NBDClient *client = opaque;
   2624     NBDRequestData *req;
   2625     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
   2626     int ret;
   2627     Error *local_err = NULL;
   2628 
   2629     trace_nbd_trip();
   2630     if (client->closing) {
   2631         nbd_client_put(client);
   2632         return;
   2633     }
   2634 
   2635     if (client->quiescing) {
   2636         /*
   2637          * We're switching between AIO contexts. Don't attempt to receive a new
   2638          * request and kick the main context which may be waiting for us.
   2639          */
   2640         nbd_client_put(client);
   2641         client->recv_coroutine = NULL;
   2642         aio_wait_kick();
   2643         return;
   2644     }
   2645 
   2646     req = nbd_request_get(client);
   2647     ret = nbd_co_receive_request(req, &request, &local_err);
   2648     client->recv_coroutine = NULL;
   2649 
   2650     if (client->closing) {
   2651         /*
   2652          * The client may be closed when we are blocked in
   2653          * nbd_co_receive_request()
   2654          */
   2655         goto done;
   2656     }
   2657 
   2658     if (ret == -EAGAIN) {
   2659         assert(client->quiescing);
   2660         goto done;
   2661     }
   2662 
   2663     nbd_client_receive_next_request(client);
   2664     if (ret == -EIO) {
   2665         goto disconnect;
   2666     }
   2667 
   2668     if (ret < 0) {
   2669         /* It wasn't -EIO, so, according to nbd_co_receive_request()
   2670          * semantics, we should return the error to the client. */
   2671         Error *export_err = local_err;
   2672 
   2673         local_err = NULL;
   2674         ret = nbd_send_generic_reply(client, request.handle, -EINVAL,
   2675                                      error_get_pretty(export_err), &local_err);
   2676         error_free(export_err);
   2677     } else {
   2678         ret = nbd_handle_request(client, &request, req->data, &local_err);
   2679     }
   2680     if (ret < 0) {
   2681         error_prepend(&local_err, "Failed to send reply: ");
   2682         goto disconnect;
   2683     }
   2684 
   2685     /* We must disconnect after NBD_CMD_WRITE if we did not
   2686      * read the payload.
   2687      */
   2688     if (!req->complete) {
   2689         error_setg(&local_err, "Request handling failed in intermediate state");
   2690         goto disconnect;
   2691     }
   2692 
   2693 done:
   2694     nbd_request_put(req);
   2695     nbd_client_put(client);
   2696     return;
   2697 
   2698 disconnect:
   2699     if (local_err) {
   2700         error_reportf_err(local_err, "Disconnect client, due to: ");
   2701     }
   2702     nbd_request_put(req);
   2703     client_close(client, true);
   2704     nbd_client_put(client);
   2705 }
   2706 
   2707 static void nbd_client_receive_next_request(NBDClient *client)
   2708 {
   2709     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
   2710         !client->quiescing) {
   2711         nbd_client_get(client);
   2712         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
   2713         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
   2714     }
   2715 }
   2716 
   2717 static coroutine_fn void nbd_co_client_start(void *opaque)
   2718 {
   2719     NBDClient *client = opaque;
   2720     Error *local_err = NULL;
   2721 
   2722     qemu_co_mutex_init(&client->send_lock);
   2723 
   2724     if (nbd_negotiate(client, &local_err)) {
   2725         if (local_err) {
   2726             error_report_err(local_err);
   2727         }
   2728         client_close(client, false);
   2729         return;
   2730     }
   2731 
   2732     nbd_client_receive_next_request(client);
   2733 }
   2734 
   2735 /*
   2736  * Create a new client listener using the given channel @sioc.
   2737  * Begin servicing it in a coroutine.  When the connection closes, call
   2738  * @close_fn with an indication of whether the client completed negotiation.
   2739  */
   2740 void nbd_client_new(QIOChannelSocket *sioc,
   2741                     QCryptoTLSCreds *tlscreds,
   2742                     const char *tlsauthz,
   2743                     void (*close_fn)(NBDClient *, bool))
   2744 {
   2745     NBDClient *client;
   2746     Coroutine *co;
   2747 
   2748     client = g_new0(NBDClient, 1);
   2749     client->refcount = 1;
   2750     client->tlscreds = tlscreds;
   2751     if (tlscreds) {
   2752         object_ref(OBJECT(client->tlscreds));
   2753     }
   2754     client->tlsauthz = g_strdup(tlsauthz);
   2755     client->sioc = sioc;
   2756     object_ref(OBJECT(client->sioc));
   2757     client->ioc = QIO_CHANNEL(sioc);
   2758     object_ref(OBJECT(client->ioc));
   2759     client->close_fn = close_fn;
   2760 
   2761     co = qemu_coroutine_create(nbd_co_client_start, client);
   2762     qemu_coroutine_enter(co);
   2763 }