qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

nbd.c (68618B)


      1 /*
      2  * QEMU Block driver for  NBD
      3  *
      4  * Copyright (c) 2019 Virtuozzo International GmbH.
      5  * Copyright (C) 2016 Red Hat, Inc.
      6  * Copyright (C) 2008 Bull S.A.S.
      7  *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
      8  *
      9  * Some parts:
     10  *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
     11  *
     12  * Permission is hereby granted, free of charge, to any person obtaining a copy
     13  * of this software and associated documentation files (the "Software"), to deal
     14  * in the Software without restriction, including without limitation the rights
     15  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     16  * copies of the Software, and to permit persons to whom the Software is
     17  * furnished to do so, subject to the following conditions:
     18  *
     19  * The above copyright notice and this permission notice shall be included in
     20  * all copies or substantial portions of the Software.
     21  *
     22  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     23  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     24  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
     25  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     26  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     27  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     28  * THE SOFTWARE.
     29  */
     30 
     31 #include "qemu/osdep.h"
     32 
     33 #include "trace.h"
     34 #include "qemu/uri.h"
     35 #include "qemu/option.h"
     36 #include "qemu/cutils.h"
     37 #include "qemu/main-loop.h"
     38 
     39 #include "qapi/qapi-visit-sockets.h"
     40 #include "qapi/qmp/qstring.h"
     41 #include "qapi/clone-visitor.h"
     42 
     43 #include "block/qdict.h"
     44 #include "block/nbd.h"
     45 #include "block/block_int.h"
     46 #include "block/coroutines.h"
     47 
     48 #include "qemu/yank.h"
     49 
/* Literal option-string marker; the code that parses it is not in view here. */
#define EN_OPTSTR ":exportname="
/* Size of BDRVNBDState.requests[] and upper bound for BDRVNBDState.in_flight. */
#define MAX_NBD_REQUESTS    16

/*
 * Convert between a slot index in requests[] and the handle placed in an
 * NBD request.  Both directions XOR the value with the pointer passed as
 * @bs, so the two macros are inverses of each other.  The pointer is used
 * purely as a key: callers in this file pass the BDRVNBDState pointer.
 */
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
     55 
/* Bookkeeping for one slot of BDRVNBDState.requests[]. */
typedef struct {
    Coroutine *coroutine;   /* owner of the slot; NULL while the slot is free */
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* sleeping in the yield in nbd_receive_replies */
} NBDClientRequest;
     61 
/* Connection state of the NBD client; stored in BDRVNBDState.state. */
typedef enum NBDClientState {
    /* Reconnecting; nbd_reconnect_attempt() connects in blocking mode. */
    NBD_CLIENT_CONNECTING_WAIT,
    /* Reconnecting; nbd_reconnect_attempt() connects in non-blocking mode. */
    NBD_CLIENT_CONNECTING_NOWAIT,
    /* A channel is established and negotiation has completed. */
    NBD_CLIENT_CONNECTED,
    /* Set on teardown and on channel errors other than -EIO. */
    NBD_CLIENT_QUIT
} NBDClientState;
     68 
/* Per-node driver state of the NBD block driver (reached via bs->opaque). */
typedef struct BDRVNBDState {
    QIOChannel *ioc; /* The current I/O channel */
    NBDExportInfo info; /* filled in by nbd_co_establish_connection() */

    /*
     * Protects state, free_sema, in_flight, requests[].coroutine,
     * reconnect_delay_timer.
     */
    QemuMutex requests_lock;
    NBDClientState state;
    CoQueue free_sema; /* senders wait here for a free slot / reconnect */
    unsigned in_flight; /* number of requests currently counted as active */
    NBDClientRequest requests[MAX_NBD_REQUESTS];
    QEMUTimer *reconnect_delay_timer;

    /* Protects sending data on the socket.  */
    CoMutex send_mutex;

    /*
     * Protects receiving reply headers from the socket, as well as the
     * fields reply and requests[].receiving
     */
    CoMutex receive_mutex;
    NBDReply reply; /* header being processed; handle == 0 means none */

    QEMUTimer *open_timer;

    BlockDriverState *bs; /* node used for AioContext and yank lookups */

    /* Connection parameters */
    uint32_t reconnect_delay; /* in seconds; 0 selects CONNECTING_NOWAIT */
    uint32_t open_timeout; /* NOTE(review): unit not visible here - confirm */
    SocketAddress *saddr;
    char *export;
    char *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    char *tlshostname;
    char *x_dirty_bitmap;
    bool alloc_depth; /* x_dirty_bitmap is "qemu:allocation-depth" */

    NBDClientConnection *conn;
} BDRVNBDState;
    111 
/*
 * Forward declaration: nbd_yank is passed to yank_register_function() and
 * yank_unregister_function() before its definition appears in the file.
 */
static void nbd_yank(void *opaque);
    113 
    114 static void nbd_clear_bdrvstate(BlockDriverState *bs)
    115 {
    116     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    117 
    118     nbd_client_connection_release(s->conn);
    119     s->conn = NULL;
    120 
    121     yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
    122 
    123     /* Must not leave timers behind that would access freed data */
    124     assert(!s->reconnect_delay_timer);
    125     assert(!s->open_timer);
    126 
    127     object_unref(OBJECT(s->tlscreds));
    128     qapi_free_SocketAddress(s->saddr);
    129     s->saddr = NULL;
    130     g_free(s->export);
    131     s->export = NULL;
    132     g_free(s->tlscredsid);
    133     s->tlscredsid = NULL;
    134     g_free(s->tlshostname);
    135     s->tlshostname = NULL;
    136     g_free(s->x_dirty_bitmap);
    137     s->x_dirty_bitmap = NULL;
    138 }
    139 
    140 /* Called with s->receive_mutex taken.  */
    141 static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
    142 {
    143     if (req->receiving) {
    144         req->receiving = false;
    145         aio_co_wake(req->coroutine);
    146         return true;
    147     }
    148 
    149     return false;
    150 }
    151 
    152 static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
    153 {
    154     int i;
    155 
    156     QEMU_LOCK_GUARD(&s->receive_mutex);
    157     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
    158         if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
    159             return;
    160         }
    161     }
    162 }
    163 
    164 /* Called with s->requests_lock held.  */
    165 static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
    166 {
    167     if (s->state == NBD_CLIENT_CONNECTED) {
    168         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    169     }
    170 
    171     if (ret == -EIO) {
    172         if (s->state == NBD_CLIENT_CONNECTED) {
    173             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
    174                                             NBD_CLIENT_CONNECTING_NOWAIT;
    175         }
    176     } else {
    177         s->state = NBD_CLIENT_QUIT;
    178     }
    179 }
    180 
    181 static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
    182 {
    183     QEMU_LOCK_GUARD(&s->requests_lock);
    184     nbd_channel_error_locked(s, ret);
    185 }
    186 
    187 static void reconnect_delay_timer_del(BDRVNBDState *s)
    188 {
    189     if (s->reconnect_delay_timer) {
    190         timer_free(s->reconnect_delay_timer);
    191         s->reconnect_delay_timer = NULL;
    192     }
    193 }
    194 
    195 static void reconnect_delay_timer_cb(void *opaque)
    196 {
    197     BDRVNBDState *s = opaque;
    198 
    199     reconnect_delay_timer_del(s);
    200     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
    201         if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
    202             return;
    203         }
    204         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
    205     }
    206     nbd_co_establish_connection_cancel(s->conn);
    207 }
    208 
    209 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
    210 {
    211     assert(!s->reconnect_delay_timer);
    212     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
    213                                              QEMU_CLOCK_REALTIME,
    214                                              SCALE_NS,
    215                                              reconnect_delay_timer_cb, s);
    216     timer_mod(s->reconnect_delay_timer, expire_time_ns);
    217 }
    218 
    219 static void nbd_teardown_connection(BlockDriverState *bs)
    220 {
    221     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    222 
    223     assert(!s->in_flight);
    224 
    225     if (s->ioc) {
    226         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    227         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
    228                                  nbd_yank, s->bs);
    229         object_unref(OBJECT(s->ioc));
    230         s->ioc = NULL;
    231     }
    232 
    233     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
    234         s->state = NBD_CLIENT_QUIT;
    235     }
    236 }
    237 
    238 static void open_timer_del(BDRVNBDState *s)
    239 {
    240     if (s->open_timer) {
    241         timer_free(s->open_timer);
    242         s->open_timer = NULL;
    243     }
    244 }
    245 
    246 static void open_timer_cb(void *opaque)
    247 {
    248     BDRVNBDState *s = opaque;
    249 
    250     nbd_co_establish_connection_cancel(s->conn);
    251     open_timer_del(s);
    252 }
    253 
    254 static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
    255 {
    256     assert(!s->open_timer);
    257     s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
    258                                   QEMU_CLOCK_REALTIME,
    259                                   SCALE_NS,
    260                                   open_timer_cb, s);
    261     timer_mod(s->open_timer, expire_time_ns);
    262 }
    263 
    264 static bool nbd_client_will_reconnect(BDRVNBDState *s)
    265 {
    266     /*
    267      * Called only after a socket error, so this is not performance sensitive.
    268      */
    269     QEMU_LOCK_GUARD(&s->requests_lock);
    270     return s->state == NBD_CLIENT_CONNECTING_WAIT;
    271 }
    272 
    273 /*
    274  * Update @bs with information learned during a completed negotiation process.
    275  * Return failure if the server's advertised options are incompatible with the
    276  * client's needs.
    277  */
    278 static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
    279 {
    280     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    281     int ret;
    282 
    283     if (s->x_dirty_bitmap) {
    284         if (!s->info.base_allocation) {
    285             error_setg(errp, "requested x-dirty-bitmap %s not found",
    286                        s->x_dirty_bitmap);
    287             return -EINVAL;
    288         }
    289         if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
    290             s->alloc_depth = true;
    291         }
    292     }
    293 
    294     if (s->info.flags & NBD_FLAG_READ_ONLY) {
    295         ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
    296         if (ret < 0) {
    297             return ret;
    298         }
    299     }
    300 
    301     if (s->info.flags & NBD_FLAG_SEND_FUA) {
    302         bs->supported_write_flags = BDRV_REQ_FUA;
    303         bs->supported_zero_flags |= BDRV_REQ_FUA;
    304     }
    305 
    306     if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
    307         bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    308         if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
    309             bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
    310         }
    311     }
    312 
    313     trace_nbd_client_handshake_success(s->export);
    314 
    315     return 0;
    316 }
    317 
    318 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
    319                                                 bool blocking, Error **errp)
    320 {
    321     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    322     int ret;
    323     IO_CODE();
    324 
    325     assert(!s->ioc);
    326 
    327     s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
    328     if (!s->ioc) {
    329         return -ECONNREFUSED;
    330     }
    331 
    332     yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
    333                            bs);
    334 
    335     ret = nbd_handle_updated_info(s->bs, NULL);
    336     if (ret < 0) {
    337         /*
    338          * We have connected, but must fail for other reasons.
    339          * Send NBD_CMD_DISC as a courtesy to the server.
    340          */
    341         NBDRequest request = { .type = NBD_CMD_DISC };
    342 
    343         nbd_send_request(s->ioc, &request);
    344 
    345         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
    346                                  nbd_yank, bs);
    347         object_unref(OBJECT(s->ioc));
    348         s->ioc = NULL;
    349 
    350         return ret;
    351     }
    352 
    353     qio_channel_set_blocking(s->ioc, false, NULL);
    354     qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
    355 
    356     /* successfully connected */
    357     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
    358         s->state = NBD_CLIENT_CONNECTED;
    359     }
    360 
    361     return 0;
    362 }
    363 
    364 /* Called with s->requests_lock held.  */
    365 static bool nbd_client_connecting(BDRVNBDState *s)
    366 {
    367     return s->state == NBD_CLIENT_CONNECTING_WAIT ||
    368         s->state == NBD_CLIENT_CONNECTING_NOWAIT;
    369 }
    370 
    371 /* Called with s->requests_lock taken.  */
    372 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
    373 {
    374     int ret;
    375     bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
    376 
    377     /*
    378      * Now we are sure that nobody is accessing the channel, and no one will
    379      * try until we set the state to CONNECTED.
    380      */
    381     assert(nbd_client_connecting(s));
    382     assert(s->in_flight == 1);
    383 
    384     trace_nbd_reconnect_attempt(s->bs->in_flight);
    385 
    386     if (blocking && !s->reconnect_delay_timer) {
    387         /*
    388          * It's the first reconnect attempt after switching to
    389          * NBD_CLIENT_CONNECTING_WAIT
    390          */
    391         g_assert(s->reconnect_delay);
    392         reconnect_delay_timer_init(s,
    393             qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
    394             s->reconnect_delay * NANOSECONDS_PER_SECOND);
    395     }
    396 
    397     /* Finalize previous connection if any */
    398     if (s->ioc) {
    399         qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
    400         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
    401                                  nbd_yank, s->bs);
    402         object_unref(OBJECT(s->ioc));
    403         s->ioc = NULL;
    404     }
    405 
    406     qemu_mutex_unlock(&s->requests_lock);
    407     ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
    408     trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
    409     qemu_mutex_lock(&s->requests_lock);
    410 
    411     /*
    412      * The reconnect attempt is done (maybe successfully, maybe not), so
    413      * we no longer need this timer.  Delete it so it will not outlive
    414      * this I/O request (so draining removes all timers).
    415      */
    416     reconnect_delay_timer_del(s);
    417 }
    418 
    419 static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
    420 {
    421     int ret;
    422     uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2;
    423     QEMU_LOCK_GUARD(&s->receive_mutex);
    424 
    425     while (true) {
    426         if (s->reply.handle == handle) {
    427             /* We are done */
    428             return 0;
    429         }
    430 
    431         if (s->reply.handle != 0) {
    432             /*
    433              * Some other request is being handled now. It should already be
    434              * woken by whoever set s->reply.handle (or never wait in this
    435              * yield). So, we should not wake it here.
    436              */
    437             ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
    438             assert(!s->requests[ind2].receiving);
    439 
    440             s->requests[ind].receiving = true;
    441             qemu_co_mutex_unlock(&s->receive_mutex);
    442 
    443             qemu_coroutine_yield();
    444             /*
    445              * We may be woken for 2 reasons:
    446              * 1. From this function, executing in parallel coroutine, when our
    447              *    handle is received.
    448              * 2. From nbd_co_receive_one_chunk(), when previous request is
    449              *    finished and s->reply.handle set to 0.
    450              * Anyway, it's OK to lock the mutex and go to the next iteration.
    451              */
    452 
    453             qemu_co_mutex_lock(&s->receive_mutex);
    454             assert(!s->requests[ind].receiving);
    455             continue;
    456         }
    457 
    458         /* We are under mutex and handle is 0. We have to do the dirty work. */
    459         assert(s->reply.handle == 0);
    460         ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
    461         if (ret <= 0) {
    462             ret = ret ? ret : -EIO;
    463             nbd_channel_error(s, ret);
    464             return ret;
    465         }
    466         if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
    467             nbd_channel_error(s, -EINVAL);
    468             return -EINVAL;
    469         }
    470         ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
    471         if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
    472             nbd_channel_error(s, -EINVAL);
    473             return -EINVAL;
    474         }
    475         if (s->reply.handle == handle) {
    476             /* We are done */
    477             return 0;
    478         }
    479         nbd_recv_coroutine_wake_one(&s->requests[ind2]);
    480     }
    481 }
    482 
    483 static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
    484                                             NBDRequest *request,
    485                                             QEMUIOVector *qiov)
    486 {
    487     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    488     int rc, i = -1;
    489 
    490     qemu_mutex_lock(&s->requests_lock);
    491     while (s->in_flight == MAX_NBD_REQUESTS ||
    492            (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
    493         qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
    494     }
    495 
    496     s->in_flight++;
    497     if (s->state != NBD_CLIENT_CONNECTED) {
    498         if (nbd_client_connecting(s)) {
    499             nbd_reconnect_attempt(s);
    500             qemu_co_queue_restart_all(&s->free_sema);
    501         }
    502         if (s->state != NBD_CLIENT_CONNECTED) {
    503             rc = -EIO;
    504             goto err;
    505         }
    506     }
    507 
    508     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
    509         if (s->requests[i].coroutine == NULL) {
    510             break;
    511         }
    512     }
    513 
    514     assert(i < MAX_NBD_REQUESTS);
    515     s->requests[i].coroutine = qemu_coroutine_self();
    516     s->requests[i].offset = request->from;
    517     s->requests[i].receiving = false;
    518     qemu_mutex_unlock(&s->requests_lock);
    519 
    520     qemu_co_mutex_lock(&s->send_mutex);
    521     request->handle = INDEX_TO_HANDLE(s, i);
    522 
    523     assert(s->ioc);
    524 
    525     if (qiov) {
    526         qio_channel_set_cork(s->ioc, true);
    527         rc = nbd_send_request(s->ioc, request);
    528         if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
    529                                               NULL) < 0) {
    530             rc = -EIO;
    531         }
    532         qio_channel_set_cork(s->ioc, false);
    533     } else {
    534         rc = nbd_send_request(s->ioc, request);
    535     }
    536     qemu_co_mutex_unlock(&s->send_mutex);
    537 
    538     if (rc < 0) {
    539         qemu_mutex_lock(&s->requests_lock);
    540 err:
    541         nbd_channel_error_locked(s, rc);
    542         if (i != -1) {
    543             s->requests[i].coroutine = NULL;
    544         }
    545         s->in_flight--;
    546         qemu_co_queue_next(&s->free_sema);
    547         qemu_mutex_unlock(&s->requests_lock);
    548     }
    549     return rc;
    550 }
    551 
    552 static inline uint16_t payload_advance16(uint8_t **payload)
    553 {
    554     *payload += 2;
    555     return lduw_be_p(*payload - 2);
    556 }
    557 
    558 static inline uint32_t payload_advance32(uint8_t **payload)
    559 {
    560     *payload += 4;
    561     return ldl_be_p(*payload - 4);
    562 }
    563 
    564 static inline uint64_t payload_advance64(uint8_t **payload)
    565 {
    566     *payload += 8;
    567     return ldq_be_p(*payload - 8);
    568 }
    569 
    570 static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
    571                                          NBDStructuredReplyChunk *chunk,
    572                                          uint8_t *payload, uint64_t orig_offset,
    573                                          QEMUIOVector *qiov, Error **errp)
    574 {
    575     uint64_t offset;
    576     uint32_t hole_size;
    577 
    578     if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
    579         error_setg(errp, "Protocol error: invalid payload for "
    580                          "NBD_REPLY_TYPE_OFFSET_HOLE");
    581         return -EINVAL;
    582     }
    583 
    584     offset = payload_advance64(&payload);
    585     hole_size = payload_advance32(&payload);
    586 
    587     if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
    588         offset > orig_offset + qiov->size - hole_size) {
    589         error_setg(errp, "Protocol error: server sent chunk exceeding requested"
    590                          " region");
    591         return -EINVAL;
    592     }
    593     if (s->info.min_block &&
    594         !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
    595         trace_nbd_structured_read_compliance("hole");
    596     }
    597 
    598     qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
    599 
    600     return 0;
    601 }
    602 
    603 /*
    604  * nbd_parse_blockstatus_payload
    605  * Based on our request, we expect only one extent in reply, for the
    606  * base:allocation context.
    607  */
    608 static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
    609                                          NBDStructuredReplyChunk *chunk,
    610                                          uint8_t *payload, uint64_t orig_length,
    611                                          NBDExtent *extent, Error **errp)
    612 {
    613     uint32_t context_id;
    614 
    615     /* The server succeeded, so it must have sent [at least] one extent */
    616     if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
    617         error_setg(errp, "Protocol error: invalid payload for "
    618                          "NBD_REPLY_TYPE_BLOCK_STATUS");
    619         return -EINVAL;
    620     }
    621 
    622     context_id = payload_advance32(&payload);
    623     if (s->info.context_id != context_id) {
    624         error_setg(errp, "Protocol error: unexpected context id %d for "
    625                          "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
    626                          "id is %d", context_id,
    627                          s->info.context_id);
    628         return -EINVAL;
    629     }
    630 
    631     extent->length = payload_advance32(&payload);
    632     extent->flags = payload_advance32(&payload);
    633 
    634     if (extent->length == 0) {
    635         error_setg(errp, "Protocol error: server sent status chunk with "
    636                    "zero length");
    637         return -EINVAL;
    638     }
    639 
    640     /*
    641      * A server sending unaligned block status is in violation of the
    642      * protocol, but as qemu-nbd 3.1 is such a server (at least for
    643      * POSIX files that are not a multiple of 512 bytes, since qemu
    644      * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
    645      * still sees an implicit hole beyond the real EOF), it's nicer to
    646      * work around the misbehaving server. If the request included
    647      * more than the final unaligned block, truncate it back to an
    648      * aligned result; if the request was only the final block, round
    649      * up to the full block and change the status to fully-allocated
    650      * (always a safe status, even if it loses information).
    651      */
    652     if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
    653                                                    s->info.min_block)) {
    654         trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
    655         if (extent->length > s->info.min_block) {
    656             extent->length = QEMU_ALIGN_DOWN(extent->length,
    657                                              s->info.min_block);
    658         } else {
    659             extent->length = s->info.min_block;
    660             extent->flags = 0;
    661         }
    662     }
    663 
    664     /*
    665      * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
    666      * sent us any more than one extent, nor should it have included
    667      * status beyond our request in that extent. However, it's easy
    668      * enough to ignore the server's noncompliance without killing the
    669      * connection; just ignore trailing extents, and clamp things to
    670      * the length of our request.
    671      */
    672     if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
    673         trace_nbd_parse_blockstatus_compliance("more than one extent");
    674     }
    675     if (extent->length > orig_length) {
    676         extent->length = orig_length;
    677         trace_nbd_parse_blockstatus_compliance("extent length too large");
    678     }
    679 
    680     /*
    681      * HACK: if we are using x-dirty-bitmaps to access
    682      * qemu:allocation-depth, treat all depths > 2 the same as 2,
    683      * since nbd_client_co_block_status is only expecting the low two
    684      * bits to be set.
    685      */
    686     if (s->alloc_depth && extent->flags > 2) {
    687         extent->flags = 2;
    688     }
    689 
    690     return 0;
    691 }
    692 
    693 /*
    694  * nbd_parse_error_payload
    695  * on success @errp contains message describing nbd error reply
    696  */
    697 static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
    698                                    uint8_t *payload, int *request_ret,
    699                                    Error **errp)
    700 {
    701     uint32_t error;
    702     uint16_t message_size;
    703 
    704     assert(chunk->type & (1 << 15));
    705 
    706     if (chunk->length < sizeof(error) + sizeof(message_size)) {
    707         error_setg(errp,
    708                    "Protocol error: invalid payload for structured error");
    709         return -EINVAL;
    710     }
    711 
    712     error = nbd_errno_to_system_errno(payload_advance32(&payload));
    713     if (error == 0) {
    714         error_setg(errp, "Protocol error: server sent structured error chunk "
    715                          "with error = 0");
    716         return -EINVAL;
    717     }
    718 
    719     *request_ret = -error;
    720     message_size = payload_advance16(&payload);
    721 
    722     if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
    723         error_setg(errp, "Protocol error: server sent structured error chunk "
    724                          "with incorrect message size");
    725         return -EINVAL;
    726     }
    727 
    728     /* TODO: Add a trace point to mention the server complaint */
    729 
    730     /* TODO handle ERROR_OFFSET */
    731 
    732     return 0;
    733 }
    734 
    735 static int coroutine_fn
    736 nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
    737                                    QEMUIOVector *qiov, Error **errp)
    738 {
    739     QEMUIOVector sub_qiov;
    740     uint64_t offset;
    741     size_t data_size;
    742     int ret;
    743     NBDStructuredReplyChunk *chunk = &s->reply.structured;
    744 
    745     assert(nbd_reply_is_structured(&s->reply));
    746 
    747     /* The NBD spec requires at least one byte of payload */
    748     if (chunk->length <= sizeof(offset)) {
    749         error_setg(errp, "Protocol error: invalid payload for "
    750                          "NBD_REPLY_TYPE_OFFSET_DATA");
    751         return -EINVAL;
    752     }
    753 
    754     if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
    755         return -EIO;
    756     }
    757 
    758     data_size = chunk->length - sizeof(offset);
    759     assert(data_size);
    760     if (offset < orig_offset || data_size > qiov->size ||
    761         offset > orig_offset + qiov->size - data_size) {
    762         error_setg(errp, "Protocol error: server sent chunk exceeding requested"
    763                          " region");
    764         return -EINVAL;
    765     }
    766     if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
    767         trace_nbd_structured_read_compliance("data");
    768     }
    769 
    770     qemu_iovec_init(&sub_qiov, qiov->niov);
    771     qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    772     ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    773     qemu_iovec_destroy(&sub_qiov);
    774 
    775     return ret < 0 ? -EIO : 0;
    776 }
    777 
    778 #define NBD_MAX_MALLOC_PAYLOAD 1000
    779 static coroutine_fn int nbd_co_receive_structured_payload(
    780         BDRVNBDState *s, void **payload, Error **errp)
    781 {
    782     int ret;
    783     uint32_t len;
    784 
    785     assert(nbd_reply_is_structured(&s->reply));
    786 
    787     len = s->reply.structured.length;
    788 
    789     if (len == 0) {
    790         return 0;
    791     }
    792 
    793     if (payload == NULL) {
    794         error_setg(errp, "Unexpected structured payload");
    795         return -EINVAL;
    796     }
    797 
    798     if (len > NBD_MAX_MALLOC_PAYLOAD) {
    799         error_setg(errp, "Payload too large");
    800         return -EINVAL;
    801     }
    802 
    803     *payload = g_new(char, len);
    804     ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    805     if (ret < 0) {
    806         g_free(*payload);
    807         *payload = NULL;
    808         return ret;
    809     }
    810 
    811     return 0;
    812 }
    813 
    814 /*
    815  * nbd_co_do_receive_one_chunk
    816  * for simple reply:
    817  *   set request_ret to received reply error
    818  *   if qiov is not NULL: read payload to @qiov
    819  * for structured reply chunk:
    820  *   if error chunk: read payload, set @request_ret, do not set @payload
    821  *   else if offset_data chunk: read payload data to @qiov, do not set @payload
    822  *   else: read payload to @payload
    823  *
    824  * If function fails, @errp contains corresponding error message, and the
    825  * connection with the server is suspect.  If it returns 0, then the
    826  * transaction succeeded (although @request_ret may be a negative errno
    827  * corresponding to the server's error reply), and errp is unchanged.
    828  */
    829 static coroutine_fn int nbd_co_do_receive_one_chunk(
    830         BDRVNBDState *s, uint64_t handle, bool only_structured,
    831         int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
    832 {
    833     int ret;
    834     int i = HANDLE_TO_INDEX(s, handle);
    835     void *local_payload = NULL;
    836     NBDStructuredReplyChunk *chunk;
    837 
    838     if (payload) {
    839         *payload = NULL;
    840     }
    841     *request_ret = 0;
    842 
    843     ret = nbd_receive_replies(s, handle);
    844     if (ret < 0) {
    845         error_setg(errp, "Connection closed");
    846         return -EIO;
    847     }
    848     assert(s->ioc);
    849 
    850     assert(s->reply.handle == handle);
    851 
    852     if (nbd_reply_is_simple(&s->reply)) {
    853         if (only_structured) {
    854             error_setg(errp, "Protocol error: simple reply when structured "
    855                              "reply chunk was expected");
    856             return -EINVAL;
    857         }
    858 
    859         *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
    860         if (*request_ret < 0 || !qiov) {
    861             return 0;
    862         }
    863 
    864         return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
    865                                      errp) < 0 ? -EIO : 0;
    866     }
    867 
    868     /* handle structured reply chunk */
    869     assert(s->info.structured_reply);
    870     chunk = &s->reply.structured;
    871 
    872     if (chunk->type == NBD_REPLY_TYPE_NONE) {
    873         if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
    874             error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
    875                        " NBD_REPLY_FLAG_DONE flag set");
    876             return -EINVAL;
    877         }
    878         if (chunk->length) {
    879             error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
    880                        " nonzero length");
    881             return -EINVAL;
    882         }
    883         return 0;
    884     }
    885 
    886     if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
    887         if (!qiov) {
    888             error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
    889             return -EINVAL;
    890         }
    891 
    892         return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
    893                                                   qiov, errp);
    894     }
    895 
    896     if (nbd_reply_type_is_error(chunk->type)) {
    897         payload = &local_payload;
    898     }
    899 
    900     ret = nbd_co_receive_structured_payload(s, payload, errp);
    901     if (ret < 0) {
    902         return ret;
    903     }
    904 
    905     if (nbd_reply_type_is_error(chunk->type)) {
    906         ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
    907         g_free(local_payload);
    908         return ret;
    909     }
    910 
    911     return 0;
    912 }
    913 
    914 /*
    915  * nbd_co_receive_one_chunk
    916  * Read reply, wake up connection_co and set s->quit if needed.
    917  * Return value is a fatal error code or normal nbd reply error code
    918  */
    919 static coroutine_fn int nbd_co_receive_one_chunk(
    920         BDRVNBDState *s, uint64_t handle, bool only_structured,
    921         int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
    922         Error **errp)
    923 {
    924     int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
    925                                           request_ret, qiov, payload, errp);
    926 
    927     if (ret < 0) {
    928         memset(reply, 0, sizeof(*reply));
    929         nbd_channel_error(s, ret);
    930     } else {
    931         /* For assert at loop start in nbd_connection_entry */
    932         *reply = s->reply;
    933     }
    934     s->reply.handle = 0;
    935 
    936     nbd_recv_coroutines_wake(s);
    937 
    938     return ret;
    939 }
    940 
    941 typedef struct NBDReplyChunkIter {
    942     int ret;
    943     int request_ret;
    944     Error *err;
    945     bool done, only_structured;
    946 } NBDReplyChunkIter;
    947 
    948 static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
    949                                    int ret, Error **local_err)
    950 {
    951     assert(local_err && *local_err);
    952     assert(ret < 0);
    953 
    954     if (!iter->ret) {
    955         iter->ret = ret;
    956         error_propagate(&iter->err, *local_err);
    957     } else {
    958         error_free(*local_err);
    959     }
    960 
    961     *local_err = NULL;
    962 }
    963 
    964 static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
    965 {
    966     assert(ret < 0);
    967 
    968     if (!iter->request_ret) {
    969         iter->request_ret = ret;
    970     }
    971 }
    972 
/*
 * NBD_FOREACH_REPLY_CHUNK
 * Loop over the reply chunks of the request identified by @handle.
 * @iter is reset, with its only_structured field set to @structured, and
 * nbd_reply_chunk_iter_receive() is used as the loop condition, so the
 * loop body runs once for each call of that function that returns true.
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
    981 
    982 /*
    983  * nbd_reply_chunk_iter_receive
    984  * The pointer stored in @payload requires g_free() to free it.
    985  */
    986 static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
    987                                                       NBDReplyChunkIter *iter,
    988                                                       uint64_t handle,
    989                                                       QEMUIOVector *qiov,
    990                                                       NBDReply *reply,
    991                                                       void **payload)
    992 {
    993     int ret, request_ret;
    994     NBDReply local_reply;
    995     NBDStructuredReplyChunk *chunk;
    996     Error *local_err = NULL;
    997 
    998     if (iter->done) {
    999         /* Previous iteration was last. */
   1000         goto break_loop;
   1001     }
   1002 
   1003     if (reply == NULL) {
   1004         reply = &local_reply;
   1005     }
   1006 
   1007     ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
   1008                                    &request_ret, qiov, reply, payload,
   1009                                    &local_err);
   1010     if (ret < 0) {
   1011         nbd_iter_channel_error(iter, ret, &local_err);
   1012     } else if (request_ret < 0) {
   1013         nbd_iter_request_error(iter, request_ret);
   1014     }
   1015 
   1016     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
   1017     if (nbd_reply_is_simple(reply) || iter->ret < 0) {
   1018         goto break_loop;
   1019     }
   1020 
   1021     chunk = &reply->structured;
   1022     iter->only_structured = true;
   1023 
   1024     if (chunk->type == NBD_REPLY_TYPE_NONE) {
   1025         /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
   1026         assert(chunk->flags & NBD_REPLY_FLAG_DONE);
   1027         goto break_loop;
   1028     }
   1029 
   1030     if (chunk->flags & NBD_REPLY_FLAG_DONE) {
   1031         /* This iteration is last. */
   1032         iter->done = true;
   1033     }
   1034 
   1035     /* Execute the loop body */
   1036     return true;
   1037 
   1038 break_loop:
   1039     qemu_mutex_lock(&s->requests_lock);
   1040     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
   1041     s->in_flight--;
   1042     qemu_co_queue_next(&s->free_sema);
   1043     qemu_mutex_unlock(&s->requests_lock);
   1044 
   1045     return false;
   1046 }
   1047 
   1048 static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
   1049                                                    int *request_ret, Error **errp)
   1050 {
   1051     NBDReplyChunkIter iter;
   1052 
   1053     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
   1054         /* nbd_reply_chunk_iter_receive does all the work */
   1055     }
   1056 
   1057     error_propagate(errp, iter.err);
   1058     *request_ret = iter.request_ret;
   1059     return iter.ret;
   1060 }
   1061 
   1062 static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
   1063                                                      uint64_t offset, QEMUIOVector *qiov,
   1064                                                      int *request_ret, Error **errp)
   1065 {
   1066     NBDReplyChunkIter iter;
   1067     NBDReply reply;
   1068     void *payload = NULL;
   1069     Error *local_err = NULL;
   1070 
   1071     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
   1072                             qiov, &reply, &payload)
   1073     {
   1074         int ret;
   1075         NBDStructuredReplyChunk *chunk = &reply.structured;
   1076 
   1077         assert(nbd_reply_is_structured(&reply));
   1078 
   1079         switch (chunk->type) {
   1080         case NBD_REPLY_TYPE_OFFSET_DATA:
   1081             /*
   1082              * special cased in nbd_co_receive_one_chunk, data is already
   1083              * in qiov
   1084              */
   1085             break;
   1086         case NBD_REPLY_TYPE_OFFSET_HOLE:
   1087             ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
   1088                                                 offset, qiov, &local_err);
   1089             if (ret < 0) {
   1090                 nbd_channel_error(s, ret);
   1091                 nbd_iter_channel_error(&iter, ret, &local_err);
   1092             }
   1093             break;
   1094         default:
   1095             if (!nbd_reply_type_is_error(chunk->type)) {
   1096                 /* not allowed reply type */
   1097                 nbd_channel_error(s, -EINVAL);
   1098                 error_setg(&local_err,
   1099                            "Unexpected reply type: %d (%s) for CMD_READ",
   1100                            chunk->type, nbd_reply_type_lookup(chunk->type));
   1101                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
   1102             }
   1103         }
   1104 
   1105         g_free(payload);
   1106         payload = NULL;
   1107     }
   1108 
   1109     error_propagate(errp, iter.err);
   1110     *request_ret = iter.request_ret;
   1111     return iter.ret;
   1112 }
   1113 
   1114 static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
   1115                                                          uint64_t handle, uint64_t length,
   1116                                                          NBDExtent *extent,
   1117                                                          int *request_ret, Error **errp)
   1118 {
   1119     NBDReplyChunkIter iter;
   1120     NBDReply reply;
   1121     void *payload = NULL;
   1122     Error *local_err = NULL;
   1123     bool received = false;
   1124 
   1125     assert(!extent->length);
   1126     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
   1127         int ret;
   1128         NBDStructuredReplyChunk *chunk = &reply.structured;
   1129 
   1130         assert(nbd_reply_is_structured(&reply));
   1131 
   1132         switch (chunk->type) {
   1133         case NBD_REPLY_TYPE_BLOCK_STATUS:
   1134             if (received) {
   1135                 nbd_channel_error(s, -EINVAL);
   1136                 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
   1137                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
   1138             }
   1139             received = true;
   1140 
   1141             ret = nbd_parse_blockstatus_payload(s, &reply.structured,
   1142                                                 payload, length, extent,
   1143                                                 &local_err);
   1144             if (ret < 0) {
   1145                 nbd_channel_error(s, ret);
   1146                 nbd_iter_channel_error(&iter, ret, &local_err);
   1147             }
   1148             break;
   1149         default:
   1150             if (!nbd_reply_type_is_error(chunk->type)) {
   1151                 nbd_channel_error(s, -EINVAL);
   1152                 error_setg(&local_err,
   1153                            "Unexpected reply type: %d (%s) "
   1154                            "for CMD_BLOCK_STATUS",
   1155                            chunk->type, nbd_reply_type_lookup(chunk->type));
   1156                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
   1157             }
   1158         }
   1159 
   1160         g_free(payload);
   1161         payload = NULL;
   1162     }
   1163 
   1164     if (!extent->length && !iter.request_ret) {
   1165         error_setg(&local_err, "Server did not reply with any status extents");
   1166         nbd_iter_channel_error(&iter, -EIO, &local_err);
   1167     }
   1168 
   1169     error_propagate(errp, iter.err);
   1170     *request_ret = iter.request_ret;
   1171     return iter.ret;
   1172 }
   1173 
   1174 static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request,
   1175                                        QEMUIOVector *write_qiov)
   1176 {
   1177     int ret, request_ret;
   1178     Error *local_err = NULL;
   1179     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1180 
   1181     assert(request->type != NBD_CMD_READ);
   1182     if (write_qiov) {
   1183         assert(request->type == NBD_CMD_WRITE);
   1184         assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
   1185     } else {
   1186         assert(request->type != NBD_CMD_WRITE);
   1187     }
   1188 
   1189     do {
   1190         ret = nbd_co_send_request(bs, request, write_qiov);
   1191         if (ret < 0) {
   1192             continue;
   1193         }
   1194 
   1195         ret = nbd_co_receive_return_code(s, request->handle,
   1196                                          &request_ret, &local_err);
   1197         if (local_err) {
   1198             trace_nbd_co_request_fail(request->from, request->len,
   1199                                       request->handle, request->flags,
   1200                                       request->type,
   1201                                       nbd_cmd_lookup(request->type),
   1202                                       ret, error_get_pretty(local_err));
   1203             error_free(local_err);
   1204             local_err = NULL;
   1205         }
   1206     } while (ret < 0 && nbd_client_will_reconnect(s));
   1207 
   1208     return ret ? ret : request_ret;
   1209 }
   1210 
   1211 static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
   1212                                              int64_t bytes, QEMUIOVector *qiov,
   1213                                              BdrvRequestFlags flags)
   1214 {
   1215     int ret, request_ret;
   1216     Error *local_err = NULL;
   1217     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1218     NBDRequest request = {
   1219         .type = NBD_CMD_READ,
   1220         .from = offset,
   1221         .len = bytes,
   1222     };
   1223 
   1224     assert(bytes <= NBD_MAX_BUFFER_SIZE);
   1225 
   1226     if (!bytes) {
   1227         return 0;
   1228     }
   1229     /*
   1230      * Work around the fact that the block layer doesn't do
   1231      * byte-accurate sizing yet - if the read exceeds the server's
   1232      * advertised size because the block layer rounded size up, then
   1233      * truncate the request to the server and tail-pad with zero.
   1234      */
   1235     if (offset >= s->info.size) {
   1236         assert(bytes < BDRV_SECTOR_SIZE);
   1237         qemu_iovec_memset(qiov, 0, 0, bytes);
   1238         return 0;
   1239     }
   1240     if (offset + bytes > s->info.size) {
   1241         uint64_t slop = offset + bytes - s->info.size;
   1242 
   1243         assert(slop < BDRV_SECTOR_SIZE);
   1244         qemu_iovec_memset(qiov, bytes - slop, 0, slop);
   1245         request.len -= slop;
   1246     }
   1247 
   1248     do {
   1249         ret = nbd_co_send_request(bs, &request, NULL);
   1250         if (ret < 0) {
   1251             continue;
   1252         }
   1253 
   1254         ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
   1255                                            &request_ret, &local_err);
   1256         if (local_err) {
   1257             trace_nbd_co_request_fail(request.from, request.len, request.handle,
   1258                                       request.flags, request.type,
   1259                                       nbd_cmd_lookup(request.type),
   1260                                       ret, error_get_pretty(local_err));
   1261             error_free(local_err);
   1262             local_err = NULL;
   1263         }
   1264     } while (ret < 0 && nbd_client_will_reconnect(s));
   1265 
   1266     return ret ? ret : request_ret;
   1267 }
   1268 
   1269 static int coroutine_fn nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
   1270                                               int64_t bytes, QEMUIOVector *qiov,
   1271                                               BdrvRequestFlags flags)
   1272 {
   1273     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1274     NBDRequest request = {
   1275         .type = NBD_CMD_WRITE,
   1276         .from = offset,
   1277         .len = bytes,
   1278     };
   1279 
   1280     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
   1281     if (flags & BDRV_REQ_FUA) {
   1282         assert(s->info.flags & NBD_FLAG_SEND_FUA);
   1283         request.flags |= NBD_CMD_FLAG_FUA;
   1284     }
   1285 
   1286     assert(bytes <= NBD_MAX_BUFFER_SIZE);
   1287 
   1288     if (!bytes) {
   1289         return 0;
   1290     }
   1291     return nbd_co_request(bs, &request, qiov);
   1292 }
   1293 
   1294 static int coroutine_fn nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
   1295                                                     int64_t bytes, BdrvRequestFlags flags)
   1296 {
   1297     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1298     NBDRequest request = {
   1299         .type = NBD_CMD_WRITE_ZEROES,
   1300         .from = offset,
   1301         .len = bytes,  /* .len is uint32_t actually */
   1302     };
   1303 
   1304     assert(bytes <= UINT32_MAX); /* rely on max_pwrite_zeroes */
   1305 
   1306     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
   1307     if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
   1308         return -ENOTSUP;
   1309     }
   1310 
   1311     if (flags & BDRV_REQ_FUA) {
   1312         assert(s->info.flags & NBD_FLAG_SEND_FUA);
   1313         request.flags |= NBD_CMD_FLAG_FUA;
   1314     }
   1315     if (!(flags & BDRV_REQ_MAY_UNMAP)) {
   1316         request.flags |= NBD_CMD_FLAG_NO_HOLE;
   1317     }
   1318     if (flags & BDRV_REQ_NO_FALLBACK) {
   1319         assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
   1320         request.flags |= NBD_CMD_FLAG_FAST_ZERO;
   1321     }
   1322 
   1323     if (!bytes) {
   1324         return 0;
   1325     }
   1326     return nbd_co_request(bs, &request, NULL);
   1327 }
   1328 
   1329 static int coroutine_fn nbd_client_co_flush(BlockDriverState *bs)
   1330 {
   1331     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1332     NBDRequest request = { .type = NBD_CMD_FLUSH };
   1333 
   1334     if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
   1335         return 0;
   1336     }
   1337 
   1338     request.from = 0;
   1339     request.len = 0;
   1340 
   1341     return nbd_co_request(bs, &request, NULL);
   1342 }
   1343 
   1344 static int coroutine_fn nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
   1345                                                int64_t bytes)
   1346 {
   1347     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1348     NBDRequest request = {
   1349         .type = NBD_CMD_TRIM,
   1350         .from = offset,
   1351         .len = bytes, /* len is uint32_t */
   1352     };
   1353 
   1354     assert(bytes <= UINT32_MAX); /* rely on max_pdiscard */
   1355 
   1356     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
   1357     if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
   1358         return 0;
   1359     }
   1360 
   1361     return nbd_co_request(bs, &request, NULL);
   1362 }
   1363 
   1364 static int coroutine_fn nbd_client_co_block_status(
   1365         BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
   1366         int64_t *pnum, int64_t *map, BlockDriverState **file)
   1367 {
   1368     int ret, request_ret;
   1369     NBDExtent extent = { 0 };
   1370     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1371     Error *local_err = NULL;
   1372 
   1373     NBDRequest request = {
   1374         .type = NBD_CMD_BLOCK_STATUS,
   1375         .from = offset,
   1376         .len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
   1377                    MIN(bytes, s->info.size - offset)),
   1378         .flags = NBD_CMD_FLAG_REQ_ONE,
   1379     };
   1380 
   1381     if (!s->info.base_allocation) {
   1382         *pnum = bytes;
   1383         *map = offset;
   1384         *file = bs;
   1385         return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
   1386     }
   1387 
   1388     /*
   1389      * Work around the fact that the block layer doesn't do
   1390      * byte-accurate sizing yet - if the status request exceeds the
   1391      * server's advertised size because the block layer rounded size
   1392      * up, we truncated the request to the server (above), or are
   1393      * called on just the hole.
   1394      */
   1395     if (offset >= s->info.size) {
   1396         *pnum = bytes;
   1397         assert(bytes < BDRV_SECTOR_SIZE);
   1398         /* Intentionally don't report offset_valid for the hole */
   1399         return BDRV_BLOCK_ZERO;
   1400     }
   1401 
   1402     if (s->info.min_block) {
   1403         assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
   1404     }
   1405     do {
   1406         ret = nbd_co_send_request(bs, &request, NULL);
   1407         if (ret < 0) {
   1408             continue;
   1409         }
   1410 
   1411         ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
   1412                                                &extent, &request_ret,
   1413                                                &local_err);
   1414         if (local_err) {
   1415             trace_nbd_co_request_fail(request.from, request.len, request.handle,
   1416                                       request.flags, request.type,
   1417                                       nbd_cmd_lookup(request.type),
   1418                                       ret, error_get_pretty(local_err));
   1419             error_free(local_err);
   1420             local_err = NULL;
   1421         }
   1422     } while (ret < 0 && nbd_client_will_reconnect(s));
   1423 
   1424     if (ret < 0 || request_ret < 0) {
   1425         return ret ? ret : request_ret;
   1426     }
   1427 
   1428     assert(extent.length);
   1429     *pnum = extent.length;
   1430     *map = offset;
   1431     *file = bs;
   1432     return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
   1433         (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
   1434         BDRV_BLOCK_OFFSET_VALID;
   1435 }
   1436 
   1437 static int nbd_client_reopen_prepare(BDRVReopenState *state,
   1438                                      BlockReopenQueue *queue, Error **errp)
   1439 {
   1440     BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;
   1441 
   1442     if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
   1443         error_setg(errp, "Can't reopen read-only NBD mount as read/write");
   1444         return -EACCES;
   1445     }
   1446     return 0;
   1447 }
   1448 
   1449 static void nbd_yank(void *opaque)
   1450 {
   1451     BlockDriverState *bs = opaque;
   1452     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1453 
   1454     QEMU_LOCK_GUARD(&s->requests_lock);
   1455     qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
   1456     s->state = NBD_CLIENT_QUIT;
   1457 }
   1458 
   1459 static void nbd_client_close(BlockDriverState *bs)
   1460 {
   1461     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1462     NBDRequest request = { .type = NBD_CMD_DISC };
   1463 
   1464     if (s->ioc) {
   1465         nbd_send_request(s->ioc, &request);
   1466     }
   1467 
   1468     nbd_teardown_connection(bs);
   1469 }
   1470 
   1471 
   1472 /*
   1473  * Parse nbd_open options
   1474  */
   1475 
   1476 static int nbd_parse_uri(const char *filename, QDict *options)
   1477 {
   1478     URI *uri;
   1479     const char *p;
   1480     QueryParams *qp = NULL;
   1481     int ret = 0;
   1482     bool is_unix;
   1483 
   1484     uri = uri_parse(filename);
   1485     if (!uri) {
   1486         return -EINVAL;
   1487     }
   1488 
   1489     /* transport */
   1490     if (!g_strcmp0(uri->scheme, "nbd")) {
   1491         is_unix = false;
   1492     } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
   1493         is_unix = false;
   1494     } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
   1495         is_unix = true;
   1496     } else {
   1497         ret = -EINVAL;
   1498         goto out;
   1499     }
   1500 
   1501     p = uri->path ? uri->path : "";
   1502     if (p[0] == '/') {
   1503         p++;
   1504     }
   1505     if (p[0]) {
   1506         qdict_put_str(options, "export", p);
   1507     }
   1508 
   1509     qp = query_params_parse(uri->query);
   1510     if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
   1511         ret = -EINVAL;
   1512         goto out;
   1513     }
   1514 
   1515     if (is_unix) {
   1516         /* nbd+unix:///export?socket=path */
   1517         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
   1518             ret = -EINVAL;
   1519             goto out;
   1520         }
   1521         qdict_put_str(options, "server.type", "unix");
   1522         qdict_put_str(options, "server.path", qp->p[0].value);
   1523     } else {
   1524         QString *host;
   1525         char *port_str;
   1526 
   1527         /* nbd[+tcp]://host[:port]/export */
   1528         if (!uri->server) {
   1529             ret = -EINVAL;
   1530             goto out;
   1531         }
   1532 
   1533         /* strip braces from literal IPv6 address */
   1534         if (uri->server[0] == '[') {
   1535             host = qstring_from_substr(uri->server, 1,
   1536                                        strlen(uri->server) - 1);
   1537         } else {
   1538             host = qstring_from_str(uri->server);
   1539         }
   1540 
   1541         qdict_put_str(options, "server.type", "inet");
   1542         qdict_put(options, "server.host", host);
   1543 
   1544         port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
   1545         qdict_put_str(options, "server.port", port_str);
   1546         g_free(port_str);
   1547     }
   1548 
   1549 out:
   1550     if (qp) {
   1551         query_params_free(qp);
   1552     }
   1553     uri_free(uri);
   1554     return ret;
   1555 }
   1556 
   1557 static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
   1558 {
   1559     const QDictEntry *e;
   1560 
   1561     for (e = qdict_first(options); e; e = qdict_next(options, e)) {
   1562         if (!strcmp(e->key, "host") ||
   1563             !strcmp(e->key, "port") ||
   1564             !strcmp(e->key, "path") ||
   1565             !strcmp(e->key, "export") ||
   1566             strstart(e->key, "server.", NULL))
   1567         {
   1568             error_setg(errp, "Option '%s' cannot be used with a file name",
   1569                        e->key);
   1570             return true;
   1571         }
   1572     }
   1573 
   1574     return false;
   1575 }
   1576 
   1577 static void nbd_parse_filename(const char *filename, QDict *options,
   1578                                Error **errp)
   1579 {
   1580     g_autofree char *file = NULL;
   1581     char *export_name;
   1582     const char *host_spec;
   1583     const char *unixpath;
   1584 
   1585     if (nbd_has_filename_options_conflict(options, errp)) {
   1586         return;
   1587     }
   1588 
   1589     if (strstr(filename, "://")) {
   1590         int ret = nbd_parse_uri(filename, options);
   1591         if (ret < 0) {
   1592             error_setg(errp, "No valid URL specified");
   1593         }
   1594         return;
   1595     }
   1596 
   1597     file = g_strdup(filename);
   1598 
   1599     export_name = strstr(file, EN_OPTSTR);
   1600     if (export_name) {
   1601         if (export_name[strlen(EN_OPTSTR)] == 0) {
   1602             return;
   1603         }
   1604         export_name[0] = 0; /* truncate 'file' */
   1605         export_name += strlen(EN_OPTSTR);
   1606 
   1607         qdict_put_str(options, "export", export_name);
   1608     }
   1609 
   1610     /* extract the host_spec - fail if it's not nbd:... */
   1611     if (!strstart(file, "nbd:", &host_spec)) {
   1612         error_setg(errp, "File name string for NBD must start with 'nbd:'");
   1613         return;
   1614     }
   1615 
   1616     if (!*host_spec) {
   1617         return;
   1618     }
   1619 
   1620     /* are we a UNIX or TCP socket? */
   1621     if (strstart(host_spec, "unix:", &unixpath)) {
   1622         qdict_put_str(options, "server.type", "unix");
   1623         qdict_put_str(options, "server.path", unixpath);
   1624     } else {
   1625         InetSocketAddress *addr = g_new(InetSocketAddress, 1);
   1626 
   1627         if (inet_parse(addr, host_spec, errp)) {
   1628             goto out_inet;
   1629         }
   1630 
   1631         qdict_put_str(options, "server.type", "inet");
   1632         qdict_put_str(options, "server.host", addr->host);
   1633         qdict_put_str(options, "server.port", addr->port);
   1634     out_inet:
   1635         qapi_free_InetSocketAddress(addr);
   1636     }
   1637 }
   1638 
   1639 static bool nbd_process_legacy_socket_options(QDict *output_options,
   1640                                               QemuOpts *legacy_opts,
   1641                                               Error **errp)
   1642 {
   1643     const char *path = qemu_opt_get(legacy_opts, "path");
   1644     const char *host = qemu_opt_get(legacy_opts, "host");
   1645     const char *port = qemu_opt_get(legacy_opts, "port");
   1646     const QDictEntry *e;
   1647 
   1648     if (!path && !host && !port) {
   1649         return true;
   1650     }
   1651 
   1652     for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
   1653     {
   1654         if (strstart(e->key, "server.", NULL)) {
   1655             error_setg(errp, "Cannot use 'server' and path/host/port at the "
   1656                        "same time");
   1657             return false;
   1658         }
   1659     }
   1660 
   1661     if (path && host) {
   1662         error_setg(errp, "path and host may not be used at the same time");
   1663         return false;
   1664     } else if (path) {
   1665         if (port) {
   1666             error_setg(errp, "port may not be used without host");
   1667             return false;
   1668         }
   1669 
   1670         qdict_put_str(output_options, "server.type", "unix");
   1671         qdict_put_str(output_options, "server.path", path);
   1672     } else if (host) {
   1673         qdict_put_str(output_options, "server.type", "inet");
   1674         qdict_put_str(output_options, "server.host", host);
   1675         qdict_put_str(output_options, "server.port",
   1676                       port ?: stringify(NBD_DEFAULT_PORT));
   1677     }
   1678 
   1679     return true;
   1680 }
   1681 
   1682 static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
   1683                                  Error **errp)
   1684 {
   1685     SocketAddress *saddr = NULL;
   1686     QDict *addr = NULL;
   1687     Visitor *iv = NULL;
   1688 
   1689     qdict_extract_subqdict(options, &addr, "server.");
   1690     if (!qdict_size(addr)) {
   1691         error_setg(errp, "NBD server address missing");
   1692         goto done;
   1693     }
   1694 
   1695     iv = qobject_input_visitor_new_flat_confused(addr, errp);
   1696     if (!iv) {
   1697         goto done;
   1698     }
   1699 
   1700     if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
   1701         goto done;
   1702     }
   1703 
   1704     if (socket_address_parse_named_fd(saddr, errp) < 0) {
   1705         qapi_free_SocketAddress(saddr);
   1706         saddr = NULL;
   1707         goto done;
   1708     }
   1709 
   1710 done:
   1711     qobject_unref(addr);
   1712     visit_free(iv);
   1713     return saddr;
   1714 }
   1715 
   1716 static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
   1717 {
   1718     Object *obj;
   1719     QCryptoTLSCreds *creds;
   1720 
   1721     obj = object_resolve_path_component(
   1722         object_get_objects_root(), id);
   1723     if (!obj) {
   1724         error_setg(errp, "No TLS credentials with id '%s'",
   1725                    id);
   1726         return NULL;
   1727     }
   1728     creds = (QCryptoTLSCreds *)
   1729         object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
   1730     if (!creds) {
   1731         error_setg(errp, "Object with id '%s' is not TLS credentials",
   1732                    id);
   1733         return NULL;
   1734     }
   1735 
   1736     if (!qcrypto_tls_creds_check_endpoint(creds,
   1737                                           QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
   1738                                           errp)) {
   1739         return NULL;
   1740     }
   1741     object_ref(obj);
   1742     return creds;
   1743 }
   1744 
   1745 
   1746 static QemuOptsList nbd_runtime_opts = {
   1747     .name = "nbd",
   1748     .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
   1749     .desc = {
   1750         {
   1751             .name = "host",
   1752             .type = QEMU_OPT_STRING,
   1753             .help = "TCP host to connect to",
   1754         },
   1755         {
   1756             .name = "port",
   1757             .type = QEMU_OPT_STRING,
   1758             .help = "TCP port to connect to",
   1759         },
   1760         {
   1761             .name = "path",
   1762             .type = QEMU_OPT_STRING,
   1763             .help = "Unix socket path to connect to",
   1764         },
   1765         {
   1766             .name = "export",
   1767             .type = QEMU_OPT_STRING,
   1768             .help = "Name of the NBD export to open",
   1769         },
   1770         {
   1771             .name = "tls-creds",
   1772             .type = QEMU_OPT_STRING,
   1773             .help = "ID of the TLS credentials to use",
   1774         },
   1775         {
   1776             .name = "tls-hostname",
   1777             .type = QEMU_OPT_STRING,
   1778             .help = "Override hostname for validating TLS x509 certificate",
   1779         },
   1780         {
   1781             .name = "x-dirty-bitmap",
   1782             .type = QEMU_OPT_STRING,
   1783             .help = "experimental: expose named dirty bitmap in place of "
   1784                     "block status",
   1785         },
   1786         {
   1787             .name = "reconnect-delay",
   1788             .type = QEMU_OPT_NUMBER,
   1789             .help = "On an unexpected disconnect, the nbd client tries to "
   1790                     "connect again until succeeding or encountering a serious "
   1791                     "error.  During the first @reconnect-delay seconds, all "
   1792                     "requests are paused and will be rerun on a successful "
   1793                     "reconnect. After that time, any delayed requests and all "
   1794                     "future requests before a successful reconnect will "
   1795                     "immediately fail. Default 0",
   1796         },
   1797         {
   1798             .name = "open-timeout",
   1799             .type = QEMU_OPT_NUMBER,
   1800             .help = "In seconds. If zero, the nbd driver tries the connection "
   1801                     "only once, and fails to open if the connection fails. "
   1802                     "If non-zero, the nbd driver will repeat connection "
   1803                     "attempts until successful or until @open-timeout seconds "
   1804                     "have elapsed. Default 0",
   1805         },
   1806         { /* end of list */ }
   1807     },
   1808 };
   1809 
   1810 static int nbd_process_options(BlockDriverState *bs, QDict *options,
   1811                                Error **errp)
   1812 {
   1813     BDRVNBDState *s = bs->opaque;
   1814     QemuOpts *opts;
   1815     int ret = -EINVAL;
   1816 
   1817     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
   1818     if (!qemu_opts_absorb_qdict(opts, options, errp)) {
   1819         goto error;
   1820     }
   1821 
   1822     /* Translate @host, @port, and @path to a SocketAddress */
   1823     if (!nbd_process_legacy_socket_options(options, opts, errp)) {
   1824         goto error;
   1825     }
   1826 
   1827     /* Pop the config into our state object. Exit if invalid. */
   1828     s->saddr = nbd_config(s, options, errp);
   1829     if (!s->saddr) {
   1830         goto error;
   1831     }
   1832 
   1833     s->export = g_strdup(qemu_opt_get(opts, "export"));
   1834     if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
   1835         error_setg(errp, "export name too long to send to server");
   1836         goto error;
   1837     }
   1838 
   1839     s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
   1840     if (s->tlscredsid) {
   1841         s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
   1842         if (!s->tlscreds) {
   1843             goto error;
   1844         }
   1845 
   1846         s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
   1847         if (!s->tlshostname &&
   1848             s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
   1849             s->tlshostname = g_strdup(s->saddr->u.inet.host);
   1850         }
   1851     }
   1852 
   1853     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
   1854     if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
   1855         error_setg(errp, "x-dirty-bitmap query too long to send to server");
   1856         goto error;
   1857     }
   1858 
   1859     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
   1860     s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);
   1861 
   1862     ret = 0;
   1863 
   1864  error:
   1865     qemu_opts_del(opts);
   1866     return ret;
   1867 }
   1868 
   1869 static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
   1870                     Error **errp)
   1871 {
   1872     int ret;
   1873     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1874 
   1875     s->bs = bs;
   1876     qemu_mutex_init(&s->requests_lock);
   1877     qemu_co_queue_init(&s->free_sema);
   1878     qemu_co_mutex_init(&s->send_mutex);
   1879     qemu_co_mutex_init(&s->receive_mutex);
   1880 
   1881     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
   1882         return -EEXIST;
   1883     }
   1884 
   1885     ret = nbd_process_options(bs, options, errp);
   1886     if (ret < 0) {
   1887         goto fail;
   1888     }
   1889 
   1890     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
   1891                                         s->x_dirty_bitmap, s->tlscreds,
   1892                                         s->tlshostname);
   1893 
   1894     if (s->open_timeout) {
   1895         nbd_client_connection_enable_retry(s->conn);
   1896         open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
   1897                         s->open_timeout * NANOSECONDS_PER_SECOND);
   1898     }
   1899 
   1900     s->state = NBD_CLIENT_CONNECTING_WAIT;
   1901     ret = nbd_do_establish_connection(bs, true, errp);
   1902     if (ret < 0) {
   1903         goto fail;
   1904     }
   1905 
   1906     /*
   1907      * The connect attempt is done, so we no longer need this timer.
   1908      * Delete it, because we do not want it to be around when this node
   1909      * is drained or closed.
   1910      */
   1911     open_timer_del(s);
   1912 
   1913     nbd_client_connection_enable_retry(s->conn);
   1914 
   1915     return 0;
   1916 
   1917 fail:
   1918     open_timer_del(s);
   1919     nbd_clear_bdrvstate(bs);
   1920     return ret;
   1921 }
   1922 
   1923 static int coroutine_fn nbd_co_flush(BlockDriverState *bs)
   1924 {
   1925     return nbd_client_co_flush(bs);
   1926 }
   1927 
   1928 static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
   1929 {
   1930     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   1931     uint32_t min = s->info.min_block;
   1932     uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);
   1933 
   1934     /*
   1935      * If the server did not advertise an alignment:
   1936      * - a size that is not sector-aligned implies that an alignment
   1937      *   of 1 can be used to access those tail bytes
   1938      * - advertisement of block status requires an alignment of 1, so
   1939      *   that we don't violate block layer constraints that block
   1940      *   status is always aligned (as we can't control whether the
   1941      *   server will report sub-sector extents, such as a hole at EOF
   1942      *   on an unaligned POSIX file)
   1943      * - otherwise, assume the server is so old that we are safer avoiding
   1944      *   sub-sector requests
   1945      */
   1946     if (!min) {
   1947         min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
   1948                s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
   1949     }
   1950 
   1951     bs->bl.request_alignment = min;
   1952     bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
   1953     bs->bl.max_pwrite_zeroes = max;
   1954     bs->bl.max_transfer = max;
   1955 
   1956     if (s->info.opt_block &&
   1957         s->info.opt_block > bs->bl.opt_transfer) {
   1958         bs->bl.opt_transfer = s->info.opt_block;
   1959     }
   1960 }
   1961 
   1962 static void nbd_close(BlockDriverState *bs)
   1963 {
   1964     nbd_client_close(bs);
   1965     nbd_clear_bdrvstate(bs);
   1966 }
   1967 
   1968 /*
   1969  * NBD cannot truncate, but if the caller asks to truncate to the same size, or
   1970  * to a smaller size with exact=false, there is no reason to fail the
   1971  * operation.
   1972  *
   1973  * Preallocation mode is ignored since it does not seems useful to fail when
   1974  * we never change anything.
   1975  */
   1976 static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
   1977                                         bool exact, PreallocMode prealloc,
   1978                                         BdrvRequestFlags flags, Error **errp)
   1979 {
   1980     BDRVNBDState *s = bs->opaque;
   1981 
   1982     if (offset != s->info.size && exact) {
   1983         error_setg(errp, "Cannot resize NBD nodes");
   1984         return -ENOTSUP;
   1985     }
   1986 
   1987     if (offset > s->info.size) {
   1988         error_setg(errp, "Cannot grow NBD nodes");
   1989         return -EINVAL;
   1990     }
   1991 
   1992     return 0;
   1993 }
   1994 
   1995 static int64_t nbd_getlength(BlockDriverState *bs)
   1996 {
   1997     BDRVNBDState *s = bs->opaque;
   1998 
   1999     return s->info.size;
   2000 }
   2001 
   2002 static void nbd_refresh_filename(BlockDriverState *bs)
   2003 {
   2004     BDRVNBDState *s = bs->opaque;
   2005     const char *host = NULL, *port = NULL, *path = NULL;
   2006     size_t len = 0;
   2007 
   2008     if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
   2009         const InetSocketAddress *inet = &s->saddr->u.inet;
   2010         if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
   2011             host = inet->host;
   2012             port = inet->port;
   2013         }
   2014     } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
   2015         path = s->saddr->u.q_unix.path;
   2016     } /* else can't represent as pseudo-filename */
   2017 
   2018     if (path && s->export) {
   2019         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
   2020                        "nbd+unix:///%s?socket=%s", s->export, path);
   2021     } else if (path && !s->export) {
   2022         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
   2023                        "nbd+unix://?socket=%s", path);
   2024     } else if (host && s->export) {
   2025         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
   2026                        "nbd://%s:%s/%s", host, port, s->export);
   2027     } else if (host && !s->export) {
   2028         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
   2029                        "nbd://%s:%s", host, port);
   2030     }
   2031     if (len >= sizeof(bs->exact_filename)) {
   2032         /* Name is too long to represent exactly, so leave it empty. */
   2033         bs->exact_filename[0] = '\0';
   2034     }
   2035 }
   2036 
   2037 static char *nbd_dirname(BlockDriverState *bs, Error **errp)
   2038 {
   2039     /* The generic bdrv_dirname() implementation is able to work out some
   2040      * directory name for NBD nodes, but that would be wrong. So far there is no
   2041      * specification for how "export paths" would work, so NBD does not have
   2042      * directory names. */
   2043     error_setg(errp, "Cannot generate a base directory for NBD nodes");
   2044     return NULL;
   2045 }
   2046 
   /*
    * NULL-terminated list of option names referenced by the drivers'
    * .strong_runtime_opts field.
    */
   static const char *const nbd_strong_runtime_opts[] = {
       /* Server address, legacy and structured form */
       "path", "host", "port",

       /* Export and TLS settings */
       "export", "tls-creds", "tls-hostname",

       "server.",

       NULL
   };
   2058 
   2059 static void nbd_cancel_in_flight(BlockDriverState *bs)
   2060 {
   2061     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
   2062 
   2063     reconnect_delay_timer_del(s);
   2064 
   2065     qemu_mutex_lock(&s->requests_lock);
   2066     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
   2067         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
   2068     }
   2069     qemu_mutex_unlock(&s->requests_lock);
   2070 
   2071     nbd_co_establish_connection_cancel(s->conn);
   2072 }
   2073 
   2074 static void nbd_attach_aio_context(BlockDriverState *bs,
   2075                                    AioContext *new_context)
   2076 {
   2077     BDRVNBDState *s = bs->opaque;
   2078 
   2079     /* The open_timer is used only during nbd_open() */
   2080     assert(!s->open_timer);
   2081 
   2082     /*
   2083      * The reconnect_delay_timer is scheduled in I/O paths when the
   2084      * connection is lost, to cancel the reconnection attempt after a
   2085      * given time.  Once this attempt is done (successfully or not),
   2086      * nbd_reconnect_attempt() ensures the timer is deleted before the
   2087      * respective I/O request is resumed.
   2088      * Since the AioContext can only be changed when a node is drained,
   2089      * the reconnect_delay_timer cannot be active here.
   2090      */
   2091     assert(!s->reconnect_delay_timer);
   2092 
   2093     if (s->ioc) {
   2094         qio_channel_attach_aio_context(s->ioc, new_context);
   2095     }
   2096 }
   2097 
   2098 static void nbd_detach_aio_context(BlockDriverState *bs)
   2099 {
   2100     BDRVNBDState *s = bs->opaque;
   2101 
   2102     assert(!s->open_timer);
   2103     assert(!s->reconnect_delay_timer);
   2104 
   2105     if (s->ioc) {
   2106         qio_channel_detach_aio_context(s->ioc);
   2107     }
   2108 }
   2109 
   2110 static BlockDriver bdrv_nbd = {
   2111     .format_name                = "nbd",
   2112     .protocol_name              = "nbd",
   2113     .instance_size              = sizeof(BDRVNBDState),
   2114     .bdrv_parse_filename        = nbd_parse_filename,
   2115     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
   2116     .create_opts                = &bdrv_create_opts_simple,
   2117     .bdrv_file_open             = nbd_open,
   2118     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
   2119     .bdrv_co_preadv             = nbd_client_co_preadv,
   2120     .bdrv_co_pwritev            = nbd_client_co_pwritev,
   2121     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
   2122     .bdrv_close                 = nbd_close,
   2123     .bdrv_co_flush_to_os        = nbd_co_flush,
   2124     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
   2125     .bdrv_refresh_limits        = nbd_refresh_limits,
   2126     .bdrv_co_truncate           = nbd_co_truncate,
   2127     .bdrv_getlength             = nbd_getlength,
   2128     .bdrv_refresh_filename      = nbd_refresh_filename,
   2129     .bdrv_co_block_status       = nbd_client_co_block_status,
   2130     .bdrv_dirname               = nbd_dirname,
   2131     .strong_runtime_opts        = nbd_strong_runtime_opts,
   2132     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
   2133 
   2134     .bdrv_attach_aio_context    = nbd_attach_aio_context,
   2135     .bdrv_detach_aio_context    = nbd_detach_aio_context,
   2136 };
   2137 
   2138 static BlockDriver bdrv_nbd_tcp = {
   2139     .format_name                = "nbd",
   2140     .protocol_name              = "nbd+tcp",
   2141     .instance_size              = sizeof(BDRVNBDState),
   2142     .bdrv_parse_filename        = nbd_parse_filename,
   2143     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
   2144     .create_opts                = &bdrv_create_opts_simple,
   2145     .bdrv_file_open             = nbd_open,
   2146     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
   2147     .bdrv_co_preadv             = nbd_client_co_preadv,
   2148     .bdrv_co_pwritev            = nbd_client_co_pwritev,
   2149     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
   2150     .bdrv_close                 = nbd_close,
   2151     .bdrv_co_flush_to_os        = nbd_co_flush,
   2152     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
   2153     .bdrv_refresh_limits        = nbd_refresh_limits,
   2154     .bdrv_co_truncate           = nbd_co_truncate,
   2155     .bdrv_getlength             = nbd_getlength,
   2156     .bdrv_refresh_filename      = nbd_refresh_filename,
   2157     .bdrv_co_block_status       = nbd_client_co_block_status,
   2158     .bdrv_dirname               = nbd_dirname,
   2159     .strong_runtime_opts        = nbd_strong_runtime_opts,
   2160     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
   2161 
   2162     .bdrv_attach_aio_context    = nbd_attach_aio_context,
   2163     .bdrv_detach_aio_context    = nbd_detach_aio_context,
   2164 };
   2165 
   2166 static BlockDriver bdrv_nbd_unix = {
   2167     .format_name                = "nbd",
   2168     .protocol_name              = "nbd+unix",
   2169     .instance_size              = sizeof(BDRVNBDState),
   2170     .bdrv_parse_filename        = nbd_parse_filename,
   2171     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
   2172     .create_opts                = &bdrv_create_opts_simple,
   2173     .bdrv_file_open             = nbd_open,
   2174     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
   2175     .bdrv_co_preadv             = nbd_client_co_preadv,
   2176     .bdrv_co_pwritev            = nbd_client_co_pwritev,
   2177     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
   2178     .bdrv_close                 = nbd_close,
   2179     .bdrv_co_flush_to_os        = nbd_co_flush,
   2180     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
   2181     .bdrv_refresh_limits        = nbd_refresh_limits,
   2182     .bdrv_co_truncate           = nbd_co_truncate,
   2183     .bdrv_getlength             = nbd_getlength,
   2184     .bdrv_refresh_filename      = nbd_refresh_filename,
   2185     .bdrv_co_block_status       = nbd_client_co_block_status,
   2186     .bdrv_dirname               = nbd_dirname,
   2187     .strong_runtime_opts        = nbd_strong_runtime_opts,
   2188     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
   2189 
   2190     .bdrv_attach_aio_context    = nbd_attach_aio_context,
   2191     .bdrv_detach_aio_context    = nbd_detach_aio_context,
   2192 };
   2193 
   2194 static void bdrv_nbd_init(void)
   2195 {
   2196     bdrv_register(&bdrv_nbd);
   2197     bdrv_register(&bdrv_nbd_tcp);
   2198     bdrv_register(&bdrv_nbd_unix);
   2199 }
   2200 
   2201 block_init(bdrv_nbd_init);