qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git

block-copy.c (32331B)


      1 /*
      2  * block_copy API
      3  *
      4  * Copyright (C) 2013 Proxmox Server Solutions
      5  * Copyright (c) 2019 Virtuozzo International GmbH.
      6  *
      7  * Authors:
      8  *  Dietmar Maurer (dietmar@proxmox.com)
      9  *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
     10  *
     11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
     12  * See the COPYING file in the top-level directory.
     13  */
     14 
     15 #include "qemu/osdep.h"
     16 
     17 #include "trace.h"
     18 #include "qapi/error.h"
     19 #include "block/block-copy.h"
     20 #include "block/reqlist.h"
     21 #include "sysemu/block-backend.h"
     22 #include "qemu/units.h"
     23 #include "qemu/coroutine.h"
     24 #include "block/aio_task.h"
     25 #include "qemu/error-report.h"
     26 #include "qemu/memalign.h"
     27 
     28 #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
     29 #define BLOCK_COPY_MAX_BUFFER (1 * MiB)
     30 #define BLOCK_COPY_MAX_MEM (128 * MiB)
     31 #define BLOCK_COPY_MAX_WORKERS 64
     32 #define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
     33 #define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16)
     34 
     35 typedef enum {
     36     COPY_READ_WRITE_CLUSTER,
     37     COPY_READ_WRITE,
     38     COPY_WRITE_ZEROES,
     39     COPY_RANGE_SMALL,
     40     COPY_RANGE_FULL
     41 } BlockCopyMethod;
     42 
     43 static coroutine_fn int block_copy_task_entry(AioTask *task);
     44 
     45 typedef struct BlockCopyCallState {
     46     /* Fields initialized in block_copy_async() and never changed. */
     47     BlockCopyState *s;
     48     int64_t offset;
     49     int64_t bytes;
     50     int max_workers;
     51     int64_t max_chunk;
     52     bool ignore_ratelimit;
     53     BlockCopyAsyncCallbackFunc cb;
     54     void *cb_opaque;
     55     /* Coroutine where async block-copy is running */
     56     Coroutine *co;
     57 
     58     /* Fields whose state changes throughout the execution */
     59     bool finished; /* atomic */
     60     QemuCoSleep sleep; /* TODO: protect API with a lock */
     61     bool cancelled; /* atomic */
     62     /* To reference all call states from BlockCopyState */
     63     QLIST_ENTRY(BlockCopyCallState) list;
     64 
     65     /*
     66      * Fields that report information about return values and errors.
     67      * Protected by lock in BlockCopyState.
     68      */
     69     bool error_is_read;
     70     /*
     71      * @ret is set concurrently by tasks under mutex. Only set once by first
     72      * failed task (and untouched if no task failed).
     73      * After finishing (call_state->finished is true), it is not modified
     74      * anymore and may be safely read without mutex.
     75      */
     76     int ret;
     77 } BlockCopyCallState;
     78 
     79 typedef struct BlockCopyTask {
     80     AioTask task;
     81 
     82     /*
     83      * Fields initialized in block_copy_task_create()
     84      * and never changed.
     85      */
     86     BlockCopyState *s;
     87     BlockCopyCallState *call_state;
     88     /*
     89      * @method can also be set again in the while loop of
     90      * block_copy_dirty_clusters(), but it is never accessed concurrently
     91      * because the only other function that reads it is
     92      * block_copy_task_entry() and it is invoked afterwards in the same
     93      * iteration.
     94      */
     95     BlockCopyMethod method;
     96 
     97     /*
     98      * Generally, req is protected by the lock in BlockCopyState. Still, req.offset
     99      * is only set at task creation, so it may be read concurrently after creation.
    100      * req.bytes is changed at most once; only a read racing with the @bytes update
    101      * in block_copy_task_shrink() needs protection.
    102      */
    103     BlockReq req;
    104 } BlockCopyTask;
    105 
    106 static int64_t task_end(BlockCopyTask *task)
    107 {
    108     return task->req.offset + task->req.bytes;
    109 }
    110 
    111 typedef struct BlockCopyState {
    112     /*
    113      * BdrvChild objects are not owned or managed by block-copy. They are
    114      * provided by block-copy user and user is responsible for appropriate
    115      * permissions on these children.
    116      */
    117     BdrvChild *source;
    118     BdrvChild *target;
    119 
    120     /*
    121      * Fields initialized in block_copy_state_new()
    122      * and never changed.
    123      */
    124     int64_t cluster_size;
    125     int64_t max_transfer;
    126     uint64_t len;
    127     BdrvRequestFlags write_flags;
    128 
    129     /*
    130      * Fields whose state changes throughout the execution
    131      * Protected by lock.
    132      */
    133     CoMutex lock;
    134     int64_t in_flight_bytes;
    135     BlockCopyMethod method;
    136     BlockReqList reqs;
    137     QLIST_HEAD(, BlockCopyCallState) calls;
    138     /*
    139      * skip_unallocated:
    140      *
    141      * Used by sync=top jobs, which first scan the source node for unallocated
    142      * areas and clear them in the copy_bitmap.  During this process, the bitmap
    143      * is thus not fully initialized: It may still have bits set for areas that
    144      * are unallocated and should actually not be copied.
    145      *
    146      * This is indicated by skip_unallocated.
    147      *
    148      * In this case, block_copy() will query the source’s allocation status,
    149      * skip unallocated regions, clear them in the copy_bitmap, and invoke
    150      * block_copy_reset_unallocated() every time it does.
    151      */
    152     bool skip_unallocated; /* atomic */
    153     /* State fields that use a thread-safe API */
    154     BdrvDirtyBitmap *copy_bitmap;
    155     ProgressMeter *progress;
    156     SharedResource *mem;
    157     RateLimit rate_limit;
    158 } BlockCopyState;
    159 
    160 /* Called with lock held */
    161 static int64_t block_copy_chunk_size(BlockCopyState *s)
    162 {
    163     switch (s->method) {
    164     case COPY_READ_WRITE_CLUSTER:
    165         return s->cluster_size;
    166     case COPY_READ_WRITE:
    167     case COPY_RANGE_SMALL:
    168         return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
    169                    s->max_transfer);
    170     case COPY_RANGE_FULL:
    171         return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
    172                    s->max_transfer);
    173     default:
    174         /* Cannot have COPY_WRITE_ZEROES here.  */
    175         abort();
    176     }
    177 }
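
With the constants defined at the top of this file and the default 64 KiB cluster size, the chunk sizes above work out as follows (before the max_transfer cap): COPY_READ_WRITE_CLUSTER copies one 64 KiB cluster at a time; COPY_READ_WRITE and COPY_RANGE_SMALL use MIN(MAX(64 KiB, 1 MiB), max_transfer), i.e. 1 MiB whenever max_transfer allows it; COPY_RANGE_FULL uses MIN(MAX(64 KiB, 16 MiB), max_transfer), i.e. up to 16 MiB. COPY_WRITE_ZEROES never reaches this function, hence the abort() in the default branch.
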
    178 
    179 /*
    180  * Search for the first dirty area in the offset/bytes range and create a task
    181  * at the beginning of it.
    182  */
    183 static coroutine_fn BlockCopyTask *
    184 block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
    185                        int64_t offset, int64_t bytes)
    186 {
    187     BlockCopyTask *task;
    188     int64_t max_chunk;
    189 
    190     QEMU_LOCK_GUARD(&s->lock);
    191     max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
    192     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
    193                                            offset, offset + bytes,
    194                                            max_chunk, &offset, &bytes))
    195     {
    196         return NULL;
    197     }
    198 
    199     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    200     bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);
    201 
    202     /* region is dirty, so no existing tasks are possible in it */
    203     assert(!reqlist_find_conflict(&s->reqs, offset, bytes));
    204 
    205     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    206     s->in_flight_bytes += bytes;
    207 
    208     task = g_new(BlockCopyTask, 1);
    209     *task = (BlockCopyTask) {
    210         .task.func = block_copy_task_entry,
    211         .s = s,
    212         .call_state = call_state,
    213         .method = s->method,
    214     };
    215     reqlist_init_req(&s->reqs, &task->req, offset, bytes);
    216 
    217     return task;
    218 }
    219 
    220 /*
    221  * block_copy_task_shrink
    222  *
    223  * Drop the tail of the task to be handled later. Set the dirty bits back and
    224  * wake up all tasks waiting for us (some of them may no longer intersect with
    225  * the shrunk task).
    226  */
    227 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
    228                                                 int64_t new_bytes)
    229 {
    230     QEMU_LOCK_GUARD(&task->s->lock);
    231     if (new_bytes == task->req.bytes) {
    232         return;
    233     }
    234 
    235     assert(new_bytes > 0 && new_bytes < task->req.bytes);
    236 
    237     task->s->in_flight_bytes -= task->req.bytes - new_bytes;
    238     bdrv_set_dirty_bitmap(task->s->copy_bitmap,
    239                           task->req.offset + new_bytes,
    240                           task->req.bytes - new_bytes);
    241 
    242     reqlist_shrink_req(&task->req, new_bytes);
    243 }
    244 
    245 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
    246 {
    247     QEMU_LOCK_GUARD(&task->s->lock);
    248     task->s->in_flight_bytes -= task->req.bytes;
    249     if (ret < 0) {
    250         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset,
    251                               task->req.bytes);
    252     }
    253     if (task->s->progress) {
    254         progress_set_remaining(task->s->progress,
    255                                bdrv_get_dirty_count(task->s->copy_bitmap) +
    256                                task->s->in_flight_bytes);
    257     }
    258     reqlist_remove_req(&task->req);
    259 }
    260 
    261 void block_copy_state_free(BlockCopyState *s)
    262 {
    263     if (!s) {
    264         return;
    265     }
    266 
    267     ratelimit_destroy(&s->rate_limit);
    268     bdrv_release_dirty_bitmap(s->copy_bitmap);
    269     shres_destroy(s->mem);
    270     g_free(s);
    271 }
    272 
    273 static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
    274 {
    275     return MIN_NON_ZERO(INT_MAX,
    276                         MIN_NON_ZERO(source->bs->bl.max_transfer,
    277                                      target->bs->bl.max_transfer));
    278 }
    279 
    280 void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range,
    281                               bool compress)
    282 {
    283     /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */
    284     s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) |
    285         (compress ? BDRV_REQ_WRITE_COMPRESSED : 0);
    286 
    287     if (s->max_transfer < s->cluster_size) {
    288         /*
    289          * copy_range does not respect max_transfer. We don't want to bother
    290          * with requests smaller than block-copy cluster size, so fallback to
    291          * buffered copying (read and write respect max_transfer on their
    292          * behalf).
    293          */
    294         s->method = COPY_READ_WRITE_CLUSTER;
    295     } else if (compress) {
    296         /* Compression supports only cluster-size writes and no copy-range. */
    297         s->method = COPY_READ_WRITE_CLUSTER;
    298     } else {
    299         /*
    300          * If copy range enabled, start with COPY_RANGE_SMALL, until first
    301          * successful copy_range (look at block_copy_do_copy).
    302          */
    303         s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
    304     }
    305 }
    306 
    307 static int64_t block_copy_calculate_cluster_size(BlockDriverState *target,
    308                                                  Error **errp)
    309 {
    310     int ret;
    311     BlockDriverInfo bdi;
    312     bool target_does_cow = bdrv_backing_chain_next(target);
    313 
    314     /*
    315      * If there is no backing file on the target, we cannot rely on COW if our
    316      * backup cluster size is smaller than the target cluster size. Even for
    317      * targets with a backing file, try to avoid COW if possible.
    318      */
    319     ret = bdrv_get_info(target, &bdi);
    320     if (ret == -ENOTSUP && !target_does_cow) {
    321         /* Cluster size is not defined */
    322         warn_report("The target block device doesn't provide "
    323                     "information about the block size and it doesn't have a "
    324                     "backing file. The default block size of %u bytes is "
    325                     "used. If the actual block size of the target exceeds "
    326                     "this default, the backup may be unusable",
    327                     BLOCK_COPY_CLUSTER_SIZE_DEFAULT);
    328         return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    329     } else if (ret < 0 && !target_does_cow) {
    330         error_setg_errno(errp, -ret,
    331             "Couldn't determine the cluster size of the target image, "
    332             "which has no backing file");
    333         error_append_hint(errp,
    334             "Aborting, since this may create an unusable destination image\n");
    335         return ret;
    336     } else if (ret < 0 && target_does_cow) {
    337         /* Not fatal; just trudge on ahead. */
    338         return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    339     }
    340 
    341     return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
    342 }
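
As a concrete reading of the fallback logic above: if bdrv_get_info() reports a 128 KiB cluster size for the target, the result is MAX(64 KiB, 128 KiB) = 128 KiB; if it reports 4 KiB, the 64 KiB BLOCK_COPY_CLUSTER_SIZE_DEFAULT wins. If bdrv_get_info() fails with -ENOTSUP on a target without a backing file, the 64 KiB default is used but a warning is printed, since a larger real block size could make the backup unusable; any other failure on such a target is treated as fatal, while failures on a COW-capable target silently fall back to the default.
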
    343 
    344 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
    345                                      const BdrvDirtyBitmap *bitmap,
    346                                      Error **errp)
    347 {
    348     ERRP_GUARD();
    349     BlockCopyState *s;
    350     int64_t cluster_size;
    351     BdrvDirtyBitmap *copy_bitmap;
    352     bool is_fleecing;
    353 
    354     cluster_size = block_copy_calculate_cluster_size(target->bs, errp);
    355     if (cluster_size < 0) {
    356         return NULL;
    357     }
    358 
    359     copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
    360                                            errp);
    361     if (!copy_bitmap) {
    362         return NULL;
    363     }
    364     bdrv_disable_dirty_bitmap(copy_bitmap);
    365     if (bitmap) {
    366         if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) {
    367             error_prepend(errp, "Failed to merge bitmap '%s' to internal "
    368                           "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap));
    369             bdrv_release_dirty_bitmap(copy_bitmap);
    370             return NULL;
    371         }
    372     } else {
    373         bdrv_set_dirty_bitmap(copy_bitmap, 0,
    374                               bdrv_dirty_bitmap_size(copy_bitmap));
    375     }
    376 
    377     /*
    378      * If the source is in the backing chain of the target, assume that the target
    379      * is going to be used for "image fleecing", i.e. it should represent a kind
    380      * of snapshot of the source at the backup-start point in time, and that it is
    381      * going to be read by somebody (e.g. as an NBD export) during the backup job.
    382      *
    383      * In this case, we need to add the BDRV_REQ_SERIALISING write flag to avoid
    384      * intersection of backup writes and third-party reads from the target;
    385      * otherwise, when reading from the target, we may occasionally read data that
    386      * has already been updated by the guest.
    387      *
    388      * For more information see commit f8d59dfb40bb and test
    389      * tests/qemu-iotests/222
    390      */
    391     is_fleecing = bdrv_chain_contains(target->bs, source->bs);
    392 
    393     s = g_new(BlockCopyState, 1);
    394     *s = (BlockCopyState) {
    395         .source = source,
    396         .target = target,
    397         .copy_bitmap = copy_bitmap,
    398         .cluster_size = cluster_size,
    399         .len = bdrv_dirty_bitmap_size(copy_bitmap),
    400         .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0),
    401         .mem = shres_create(BLOCK_COPY_MAX_MEM),
    402         .max_transfer = QEMU_ALIGN_DOWN(
    403                                     block_copy_max_transfer(source, target),
    404                                     cluster_size),
    405     };
    406 
    407     block_copy_set_copy_opts(s, false, false);
    408 
    409     ratelimit_init(&s->rate_limit);
    410     qemu_co_mutex_init(&s->lock);
    411     QLIST_INIT(&s->reqs);
    412     QLIST_INIT(&s->calls);
    413 
    414     return s;
    415 }
    416 
    417 /* Only set before running the job, no need for locking. */
    418 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
    419 {
    420     s->progress = pm;
    421 }
    422 
    423 /*
    424  * Takes ownership of @task
    425  *
    426  * If pool is NULL, run the task directly; otherwise, schedule it into the pool.
    427  *
    428  * Returns: task.func return code if pool is NULL
    429  *          otherwise -ECANCELED if pool status is bad
    430  *          otherwise 0 (successfully scheduled)
    431  */
    432 static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
    433                                             BlockCopyTask *task)
    434 {
    435     if (!pool) {
    436         int ret = task->task.func(&task->task);
    437 
    438         g_free(task);
    439         return ret;
    440     }
    441 
    442     aio_task_pool_wait_slot(pool);
    443     if (aio_task_pool_status(pool) < 0) {
    444         co_put_to_shres(task->s->mem, task->req.bytes);
    445         block_copy_task_end(task, -ECANCELED);
    446         g_free(task);
    447         return -ECANCELED;
    448     }
    449 
    450     aio_task_pool_start_task(pool, &task->task);
    451 
    452     return 0;
    453 }
    454 
    455 /*
    456  * block_copy_do_copy
    457  *
    458  * Copy a cluster-aligned chunk. The requested region is allowed to exceed
    459  * s->len only to cover the last cluster when s->len is not aligned to clusters.
    460  *
    461  * No sync here: neither bitmap nor intersecting-request handling, only the copy.
    462  *
    463  * @method is an in-out argument, so that copy_range can be either extended to
    464  * a full-size buffer or disabled if the copy_range attempt fails.  The output
    465  * value of @method should be used for subsequent tasks.
    466  * Returns 0 on success.
    467  */
    468 static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
    469                                            int64_t offset, int64_t bytes,
    470                                            BlockCopyMethod *method,
    471                                            bool *error_is_read)
    472 {
    473     int ret;
    474     int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    475     void *bounce_buffer = NULL;
    476 
    477     assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    478     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    479     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    480     assert(offset < s->len);
    481     assert(offset + bytes <= s->len ||
    482            offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    483     assert(nbytes < INT_MAX);
    484 
    485     switch (*method) {
    486     case COPY_WRITE_ZEROES:
    487         ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
    488                                     ~BDRV_REQ_WRITE_COMPRESSED);
    489         if (ret < 0) {
    490             trace_block_copy_write_zeroes_fail(s, offset, ret);
    491             *error_is_read = false;
    492         }
    493         return ret;
    494 
    495     case COPY_RANGE_SMALL:
    496     case COPY_RANGE_FULL:
    497         ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
    498                                  0, s->write_flags);
    499         if (ret >= 0) {
    500             /* Successful copy-range, increase chunk size.  */
    501             *method = COPY_RANGE_FULL;
    502             return 0;
    503         }
    504 
    505         trace_block_copy_copy_range_fail(s, offset, ret);
    506         *method = COPY_READ_WRITE;
    507         /* Fall through to read+write with allocated buffer */
    508 
    509     case COPY_READ_WRITE_CLUSTER:
    510     case COPY_READ_WRITE:
    511         /*
    512          * If the copy_range request above failed, we may proceed with a
    513          * buffered request larger than BLOCK_COPY_MAX_BUFFER.
    514          * Still, further requests will be properly limited, so we don't care
    515          * too much. Moreover, the most likely case (copy_range is unsupported
    516          * for this configuration, so the very first copy_range request fails)
    517          * is handled by switching to the large copy_size only after the first
    518          * successful copy_range.
    519          */
    520 
    521         bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
    522 
    523         ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
    524         if (ret < 0) {
    525             trace_block_copy_read_fail(s, offset, ret);
    526             *error_is_read = true;
    527             goto out;
    528         }
    529 
    530         ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
    531                              s->write_flags);
    532         if (ret < 0) {
    533             trace_block_copy_write_fail(s, offset, ret);
    534             *error_is_read = false;
    535             goto out;
    536         }
    537 
    538     out:
    539         qemu_vfree(bounce_buffer);
    540         break;
    541 
    542     default:
    543         abort();
    544     }
    545 
    546     return ret;
    547 }
    548 
    549 static coroutine_fn int block_copy_task_entry(AioTask *task)
    550 {
    551     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    552     BlockCopyState *s = t->s;
    553     bool error_is_read = false;
    554     BlockCopyMethod method = t->method;
    555     int ret;
    556 
    557     ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method,
    558                              &error_is_read);
    559 
    560     WITH_QEMU_LOCK_GUARD(&s->lock) {
    561         if (s->method == t->method) {
    562             s->method = method;
    563         }
    564 
    565         if (ret < 0) {
    566             if (!t->call_state->ret) {
    567                 t->call_state->ret = ret;
    568                 t->call_state->error_is_read = error_is_read;
    569             }
    570         } else if (s->progress) {
    571             progress_work_done(s->progress, t->req.bytes);
    572         }
    573     }
    574     co_put_to_shres(s->mem, t->req.bytes);
    575     block_copy_task_end(t, ret);
    576 
    577     return ret;
    578 }
    579 
    580 static int block_copy_block_status(BlockCopyState *s, int64_t offset,
    581                                    int64_t bytes, int64_t *pnum)
    582 {
    583     int64_t num;
    584     BlockDriverState *base;
    585     int ret;
    586 
    587     if (qatomic_read(&s->skip_unallocated)) {
    588         base = bdrv_backing_chain_next(s->source->bs);
    589     } else {
    590         base = NULL;
    591     }
    592 
    593     ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
    594                                   NULL, NULL);
    595     if (ret < 0 || num < s->cluster_size) {
    596         /*
    597          * On error, or if we failed to obtain a large enough chunk, just fall
    598          * back to copying one cluster.
    599          */
    600         num = s->cluster_size;
    601         ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    602     } else if (offset + num == s->len) {
    603         num = QEMU_ALIGN_UP(num, s->cluster_size);
    604     } else {
    605         num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    606     }
    607 
    608     *pnum = num;
    609     return ret;
    610 }
    611 
    612 /*
    613  * Check if the cluster starting at offset is allocated or not.
    614  * return via pnum the number of contiguous clusters sharing this allocation.
    615  */
    616 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
    617                                            int64_t *pnum)
    618 {
    619     BlockDriverState *bs = s->source->bs;
    620     int64_t count, total_count = 0;
    621     int64_t bytes = s->len - offset;
    622     int ret;
    623 
    624     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    625 
    626     while (true) {
    627         ret = bdrv_is_allocated(bs, offset, bytes, &count);
    628         if (ret < 0) {
    629             return ret;
    630         }
    631 
    632         total_count += count;
    633 
    634         if (ret || count == 0) {
    635             /*
    636              * If ret is set: partial segment(s) are considered allocated.
    637              * Otherwise: the unallocated tail is treated as an entire segment.
    638              */
    639             *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
    640             return ret;
    641         }
    642 
    643         /* Unallocated segment(s) with uncertain following segment(s) */
    644         if (total_count >= s->cluster_size) {
    645             *pnum = total_count / s->cluster_size;
    646             return 0;
    647         }
    648 
    649         offset += count;
    650         bytes -= count;
    651     }
    652 }
    653 
    654 void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
    655 {
    656     QEMU_LOCK_GUARD(&s->lock);
    657 
    658     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    659     if (s->progress) {
    660         progress_set_remaining(s->progress,
    661                                bdrv_get_dirty_count(s->copy_bitmap) +
    662                                s->in_flight_bytes);
    663     }
    664 }
    665 
    666 /*
    667  * Reset bits in copy_bitmap starting at offset if they represent unallocated
    668  * data in the image. May reset subsequent contiguous bits.
    669  * @return 0 when the cluster at @offset was unallocated,
    670  *         1 otherwise, and a negative errno value on error.
    671  */
    672 int64_t block_copy_reset_unallocated(BlockCopyState *s,
    673                                      int64_t offset, int64_t *count)
    674 {
    675     int ret;
    676     int64_t clusters, bytes;
    677 
    678     ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    679     if (ret < 0) {
    680         return ret;
    681     }
    682 
    683     bytes = clusters * s->cluster_size;
    684 
    685     if (!ret) {
    686         block_copy_reset(s, offset, bytes);
    687     }
    688 
    689     *count = bytes;
    690     return ret;
    691 }
    692 
    693 /*
    694  * block_copy_dirty_clusters
    695  *
    696  * Copy dirty clusters in @offset/@bytes range.
    698  * Returns 1 if dirty clusters were found and successfully copied, 0 if no dirty
    699  * clusters were found, and -errno on failure.
    699  */
    700 static int coroutine_fn
    701 block_copy_dirty_clusters(BlockCopyCallState *call_state)
    702 {
    703     BlockCopyState *s = call_state->s;
    704     int64_t offset = call_state->offset;
    705     int64_t bytes = call_state->bytes;
    706 
    707     int ret = 0;
    708     bool found_dirty = false;
    709     int64_t end = offset + bytes;
    710     AioTaskPool *aio = NULL;
    711 
    712     /*
    713      * The block_copy() user is responsible for keeping source and target in
    714      * the same AioContext.
    715      */
    716     assert(bdrv_get_aio_context(s->source->bs) ==
    717            bdrv_get_aio_context(s->target->bs));
    718 
    719     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    720     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    721 
    722     while (bytes && aio_task_pool_status(aio) == 0 &&
    723            !qatomic_read(&call_state->cancelled)) {
    724         BlockCopyTask *task;
    725         int64_t status_bytes;
    726 
    727         task = block_copy_task_create(s, call_state, offset, bytes);
    728         if (!task) {
    729             /* No more dirty bits in the bitmap */
    730             trace_block_copy_skip_range(s, offset, bytes);
    731             break;
    732         }
    733         if (task->req.offset > offset) {
    734             trace_block_copy_skip_range(s, offset, task->req.offset - offset);
    735         }
    736 
    737         found_dirty = true;
    738 
    739         ret = block_copy_block_status(s, task->req.offset, task->req.bytes,
    740                                       &status_bytes);
    741         assert(ret >= 0); /* never fail */
    742         if (status_bytes < task->req.bytes) {
    743             block_copy_task_shrink(task, status_bytes);
    744         }
    745         if (qatomic_read(&s->skip_unallocated) &&
    746             !(ret & BDRV_BLOCK_ALLOCATED)) {
    747             block_copy_task_end(task, 0);
    748             trace_block_copy_skip_range(s, task->req.offset, task->req.bytes);
    749             offset = task_end(task);
    750             bytes = end - offset;
    751             g_free(task);
    752             continue;
    753         }
    754         if (ret & BDRV_BLOCK_ZERO) {
    755             task->method = COPY_WRITE_ZEROES;
    756         }
    757 
    758         if (!call_state->ignore_ratelimit) {
    759             uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
    760             if (ns > 0) {
    761                 block_copy_task_end(task, -EAGAIN);
    762                 g_free(task);
    763                 qemu_co_sleep_ns_wakeable(&call_state->sleep,
    764                                           QEMU_CLOCK_REALTIME, ns);
    765                 continue;
    766             }
    767         }
    768 
    769         ratelimit_calculate_delay(&s->rate_limit, task->req.bytes);
    770 
    771         trace_block_copy_process(s, task->req.offset);
    772 
    773         co_get_from_shres(s->mem, task->req.bytes);
    774 
    775         offset = task_end(task);
    776         bytes = end - offset;
    777 
    778         if (!aio && bytes) {
    779             aio = aio_task_pool_new(call_state->max_workers);
    780         }
    781 
    782         ret = block_copy_task_run(aio, task);
    783         if (ret < 0) {
    784             goto out;
    785         }
    786     }
    787 
    788 out:
    789     if (aio) {
    790         aio_task_pool_wait_all(aio);
    791 
    792         /*
    793          * We are not really interested in -ECANCELED returned from
    794          * block_copy_task_run. If it fails, it means some task already failed
    795          * for a real reason, so let's return the first failure.
    796          * Still, assert that we don't rewrite failure by success.
    797          *
    798          * Note: ret may be positive here because of block-status result.
    799          */
    800         assert(ret >= 0 || aio_task_pool_status(aio) < 0);
    801         ret = aio_task_pool_status(aio);
    802 
    803         aio_task_pool_free(aio);
    804     }
    805 
    806     return ret < 0 ? ret : found_dirty;
    807 }
    808 
    809 void block_copy_kick(BlockCopyCallState *call_state)
    810 {
    811     qemu_co_sleep_wake(&call_state->sleep);
    812 }
    813 
    814 /*
    815  * block_copy_common
    816  *
    817  * Copy the requested region according to the dirty bitmap.
    818  * Collaborate with parallel block_copy requests: if they succeed, they help
    819  * us. If they fail, we will retry the regions that were not copied. So, if we
    820  * return an error, it means that some I/O operation failed in the context of
    821  * _this_ block_copy call, not of some parallel operation.
    822  */
    823 static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
    824 {
    825     int ret;
    826     BlockCopyState *s = call_state->s;
    827 
    828     qemu_co_mutex_lock(&s->lock);
    829     QLIST_INSERT_HEAD(&s->calls, call_state, list);
    830     qemu_co_mutex_unlock(&s->lock);
    831 
    832     do {
    833         ret = block_copy_dirty_clusters(call_state);
    834 
    835         if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
    836             WITH_QEMU_LOCK_GUARD(&s->lock) {
    837                 /*
    838                  * Check that there is no remaining task that we
    839                  * still need to wait for.
    840                  */
    841                 ret = reqlist_wait_one(&s->reqs, call_state->offset,
    842                                        call_state->bytes, &s->lock);
    843                 if (ret == 0) {
    844                     /*
    845                      * No pending tasks, but check again the bitmap in this
    846                      * same critical section, since a task might have failed
    847                      * between this and the critical section in
    848                      * block_copy_dirty_clusters().
    849                      *
    850                      * reqlist_wait_one return value 0 also means that it
    851                      * didn't release the lock. So, we are still in the same
    852                      * critical section, not interrupted by any concurrent
    853                      * access to state.
    854                      */
    855                     ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
    856                                                        call_state->offset,
    857                                                        call_state->bytes) >= 0;
    858                 }
    859             }
    860         }
    861 
    862         /*
    863          * We retry in two cases:
    864          * 1. Some progress done
    865          *    Something was copied, which means that there were yield points
    866          *    and some new dirty bits may have appeared (due to failed parallel
    867          *    block-copy requests).
    868          * 2. We have waited for some intersecting block-copy request
    869          *    It may have failed and produced new dirty bits.
    870          */
    871     } while (ret > 0 && !qatomic_read(&call_state->cancelled));
    872 
    873     qatomic_store_release(&call_state->finished, true);
    874 
    875     if (call_state->cb) {
    876         call_state->cb(call_state->cb_opaque);
    877     }
    878 
    879     qemu_co_mutex_lock(&s->lock);
    880     QLIST_REMOVE(call_state, list);
    881     qemu_co_mutex_unlock(&s->lock);
    882 
    883     return ret;
    884 }
    885 
    886 static void coroutine_fn block_copy_async_co_entry(void *opaque)
    887 {
    888     block_copy_common(opaque);
    889 }
    890 
    891 int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
    892                             bool ignore_ratelimit, uint64_t timeout_ns,
    893                             BlockCopyAsyncCallbackFunc cb,
    894                             void *cb_opaque)
    895 {
    896     int ret;
    897     BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
    898 
    899     *call_state = (BlockCopyCallState) {
    900         .s = s,
    901         .offset = start,
    902         .bytes = bytes,
    903         .ignore_ratelimit = ignore_ratelimit,
    904         .max_workers = BLOCK_COPY_MAX_WORKERS,
    905         .cb = cb,
    906         .cb_opaque = cb_opaque,
    907     };
    908 
    909     ret = qemu_co_timeout(block_copy_async_co_entry, call_state, timeout_ns,
    910                           g_free);
    911     if (ret < 0) {
    912         assert(ret == -ETIMEDOUT);
    913         block_copy_call_cancel(call_state);
    914         /* call_state will be freed by running coroutine. */
    915         return ret;
    916     }
    917 
    918     ret = call_state->ret;
    919     g_free(call_state);
    920 
    921     return ret;
    922 }
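
To make the synchronous entry point above concrete, here is a minimal caller sketch. It is hypothetical and not part of this file: the helper name, the assumption that the BdrvChild pair is already set up with suitable permissions, and the use of 0 for timeout_ns (assumed here to mean "no timeout" for qemu_co_timeout(); verify that before relying on it) are illustrative only. It must run in coroutine context, since block_copy() is a coroutine_fn.

/* Hypothetical usage sketch -- not part of block-copy.c. */
#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "qemu/units.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "block/dirty-bitmap.h"

/*
 * Copy a whole device from @source to @target with the synchronous API.
 * @source and @target are BdrvChild objects the caller has already set up
 * with appropriate permissions (see the BlockCopyState comment above).
 */
static int coroutine_fn example_copy_all(BdrvChild *source, BdrvChild *target,
                                         Error **errp)
{
    BlockCopyState *s;
    int64_t len;
    int ret;

    /* NULL bitmap: every cluster starts out dirty, i.e. copy everything */
    s = block_copy_state_new(source, target, NULL, errp);
    if (!s) {
        return -EINVAL; /* arbitrary; @errp already describes the failure */
    }

    /* Optional: throttle to roughly 64 MiB/s via the built-in rate limit */
    block_copy_set_speed(s, 64 * MiB);

    /* block_copy() wants a cluster-aligned range; align the length up */
    len = QEMU_ALIGN_UP(bdrv_dirty_bitmap_size(block_copy_dirty_bitmap(s)),
                        block_copy_cluster_size(s));

    /* timeout_ns == 0 is assumed to mean "no timeout" (see lead-in note) */
    ret = block_copy(s, 0, len, false, 0, NULL, NULL);

    block_copy_state_free(s);
    return ret;
}
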
    923 
    924 BlockCopyCallState *block_copy_async(BlockCopyState *s,
    925                                      int64_t offset, int64_t bytes,
    926                                      int max_workers, int64_t max_chunk,
    927                                      BlockCopyAsyncCallbackFunc cb,
    928                                      void *cb_opaque)
    929 {
    930     BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
    931 
    932     *call_state = (BlockCopyCallState) {
    933         .s = s,
    934         .offset = offset,
    935         .bytes = bytes,
    936         .max_workers = max_workers,
    937         .max_chunk = max_chunk,
    938         .cb = cb,
    939         .cb_opaque = cb_opaque,
    940 
    941         .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    942     };
    943 
    944     qemu_coroutine_enter(call_state->co);
    945 
    946     return call_state;
    947 }
    948 
    949 void block_copy_call_free(BlockCopyCallState *call_state)
    950 {
    951     if (!call_state) {
    952         return;
    953     }
    954 
    955     assert(qatomic_read(&call_state->finished));
    956     g_free(call_state);
    957 }
    958 
    959 bool block_copy_call_finished(BlockCopyCallState *call_state)
    960 {
    961     return qatomic_read(&call_state->finished);
    962 }
    963 
    964 bool block_copy_call_succeeded(BlockCopyCallState *call_state)
    965 {
    966     return qatomic_load_acquire(&call_state->finished) &&
    967            !qatomic_read(&call_state->cancelled) &&
    968            call_state->ret == 0;
    969 }
    970 
    971 bool block_copy_call_failed(BlockCopyCallState *call_state)
    972 {
    973     return qatomic_load_acquire(&call_state->finished) &&
    974            !qatomic_read(&call_state->cancelled) &&
    975            call_state->ret < 0;
    976 }
    977 
    978 bool block_copy_call_cancelled(BlockCopyCallState *call_state)
    979 {
    980     return qatomic_read(&call_state->cancelled);
    981 }
    982 
    983 int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
    984 {
    985     assert(qatomic_load_acquire(&call_state->finished));
    986     if (error_is_read) {
    987         *error_is_read = call_state->error_is_read;
    988     }
    989     return call_state->ret;
    990 }
    991 
    992 /*
    993  * Note that cancelling and finishing are racy.
    994  * User can cancel a block-copy that is already finished.
    995  */
    996 void block_copy_call_cancel(BlockCopyCallState *call_state)
    997 {
    998     qatomic_set(&call_state->cancelled, true);
    999     block_copy_kick(call_state);
   1000 }
   1001 
   1002 BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
   1003 {
   1004     return s->copy_bitmap;
   1005 }
   1006 
   1007 int64_t block_copy_cluster_size(BlockCopyState *s)
   1008 {
   1009     return s->cluster_size;
   1010 }
   1011 
   1012 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
   1013 {
   1014     qatomic_set(&s->skip_unallocated, skip);
   1015 }
   1016 
   1017 void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
   1018 {
   1019     ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);
   1020 
   1021     /*
   1022      * Note: it would be good to kick all call states from here, but that should
   1023      * be done only from a coroutine, so as not to crash if the s->calls list
   1024      * changes while entering one call. So for now, the only user of this function
   1025      * kicks its single call_state by hand.
   1026      */
   1027 }
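
A matching sketch for the asynchronous path, tying together block_copy_async(), the block_copy_call_*() accessors and block_copy_call_free(). This is likewise hypothetical: the callback, the polling loop on the main AioContext and the literal worker count (mirroring BLOCK_COPY_MAX_WORKERS, which is private to this file) are illustrative assumptions, not upstream code.

/* Hypothetical usage sketch -- not part of block-copy.c. */
#include "qemu/osdep.h"
#include "block/aio.h"
#include "block/block-copy.h"

/* Completion callback: invoked once call_state->finished becomes true. */
static void example_copy_done(void *opaque)
{
    bool *done = opaque;

    *done = true;
}

/*
 * Start an asynchronous copy of [offset, offset + bytes) and wait for it.
 * @s comes from block_copy_state_new(); the range must be aligned to
 * block_copy_cluster_size(s).
 */
static int example_copy_async_range(BlockCopyState *s,
                                    int64_t offset, int64_t bytes)
{
    bool done = false;
    bool error_is_read;
    BlockCopyCallState *call;
    int ret;

    /* 64 workers mirrors BLOCK_COPY_MAX_WORKERS; max_chunk 0 keeps the default */
    call = block_copy_async(s, offset, bytes, 64, 0, example_copy_done, &done);

    /* Drive the event loop until the copy finishes (or is cancelled) */
    while (!block_copy_call_finished(call)) {
        aio_poll(qemu_get_aio_context(), true);
    }

    if (block_copy_call_cancelled(call)) {
        ret = -ECANCELED;
    } else {
        ret = block_copy_call_status(call, &error_is_read);
    }

    block_copy_call_free(call);
    return ret;
}

The done flag is redundant with block_copy_call_finished() here; it is only there to show how cb and cb_opaque are delivered on completion.
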