qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

multifd.c (37360B)


      1 /*
      2  * Multifd common code
      3  *
      4  * Copyright (c) 2019-2020 Red Hat Inc
      5  *
      6  * Authors:
      7  *  Juan Quintela <quintela@redhat.com>
      8  *
      9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
     10  * See the COPYING file in the top-level directory.
     11  */
     12 
     13 #include "qemu/osdep.h"
     14 #include "qemu/rcu.h"
     15 #include "exec/target_page.h"
     16 #include "sysemu/sysemu.h"
     17 #include "exec/ramblock.h"
     18 #include "qemu/error-report.h"
     19 #include "qapi/error.h"
     20 #include "ram.h"
     21 #include "migration.h"
     22 #include "socket.h"
     23 #include "tls.h"
     24 #include "qemu-file.h"
     25 #include "trace.h"
     26 #include "multifd.h"
     27 
     28 #include "qemu/yank.h"
     29 #include "io/channel-socket.h"
     30 #include "yank_functions.h"
     31 
     32 /* Multiple fd's */
     33 
/*
 * Magic number and protocol version.  Both are written big-endian into
 * the initial per-channel message and are checked again in the header
 * of every received packet; a mismatch of either is treated as an error.
 */
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1
     36 
/*
 * Initial message written by each send thread before any data packet.
 * The struct is packed and is read/written as raw bytes on the channel;
 * magic and version are converted to/from big-endian by the users.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC */
    uint32_t version;       /* MULTIFD_VERSION */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* id of the channel that sent the message */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
     45 
     46 /* Multifd without compression */
     47 
     48 /**
     49  * nocomp_send_setup: setup send side
     50  *
     51  * For no compression this function does nothing.
     52  *
     53  * Returns 0 for success or -1 for error
     54  *
     55  * @p: Params for the channel that we are using
     56  * @errp: pointer to an error
     57  */
     58 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
     59 {
     60     return 0;
     61 }
     62 
     63 /**
     64  * nocomp_send_cleanup: cleanup send side
     65  *
     66  * For no compression this function does nothing.
     67  *
     68  * @p: Params for the channel that we are using
     69  * @errp: pointer to an error
     70  */
     71 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
     72 {
     73     return;
     74 }
     75 
     76 /**
     77  * nocomp_send_prepare: prepare date to be able to send
     78  *
     79  * For no compression we just have to calculate the size of the
     80  * packet.
     81  *
     82  * Returns 0 for success or -1 for error
     83  *
     84  * @p: Params for the channel that we are using
     85  * @errp: pointer to an error
     86  */
     87 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
     88 {
     89     MultiFDPages_t *pages = p->pages;
     90     size_t page_size = qemu_target_page_size();
     91 
     92     for (int i = 0; i < p->normal_num; i++) {
     93         p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
     94         p->iov[p->iovs_num].iov_len = page_size;
     95         p->iovs_num++;
     96     }
     97 
     98     p->next_packet_size = p->normal_num * page_size;
     99     p->flags |= MULTIFD_FLAG_NOCOMP;
    100     return 0;
    101 }
    102 
    103 /**
    104  * nocomp_recv_setup: setup receive side
    105  *
    106  * For no compression this function does nothing.
    107  *
    108  * Returns 0 for success or -1 for error
    109  *
    110  * @p: Params for the channel that we are using
    111  * @errp: pointer to an error
    112  */
    113 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
    114 {
    115     return 0;
    116 }
    117 
    118 /**
    119  * nocomp_recv_cleanup: setup receive side
    120  *
    121  * For no compression this function does nothing.
    122  *
    123  * @p: Params for the channel that we are using
    124  */
    125 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
    126 {
    127 }
    128 
    129 /**
    130  * nocomp_recv_pages: read the data from the channel into actual pages
    131  *
    132  * For no compression we just need to read things into the correct place.
    133  *
    134  * Returns 0 for success or -1 for error
    135  *
    136  * @p: Params for the channel that we are using
    137  * @errp: pointer to an error
    138  */
    139 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
    140 {
    141     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
    142     size_t page_size = qemu_target_page_size();
    143 
    144     if (flags != MULTIFD_FLAG_NOCOMP) {
    145         error_setg(errp, "multifd %u: flags received %x flags expected %x",
    146                    p->id, flags, MULTIFD_FLAG_NOCOMP);
    147         return -1;
    148     }
    149     for (int i = 0; i < p->normal_num; i++) {
    150         p->iov[i].iov_base = p->host + p->normal[i];
    151         p->iov[i].iov_len = page_size;
    152     }
    153     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
    154 }
    155 
/* Method table for the uncompressed ("nocomp") multifd transport. */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};
    164 
/*
 * Method tables indexed by compression method.  Only the uncompressed
 * entry is filled in statically; the other slots are written by
 * multifd_register_ops().
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
    168 
    169 void multifd_register_ops(int method, MultiFDMethods *ops)
    170 {
    171     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    172     multifd_ops[method] = ops;
    173 }
    174 
    175 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
    176 {
    177     MultiFDInit_t msg = {};
    178     int ret;
    179 
    180     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    181     msg.version = cpu_to_be32(MULTIFD_VERSION);
    182     msg.id = p->id;
    183     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
    184 
    185     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    186     if (ret != 0) {
    187         return -1;
    188     }
    189     return 0;
    190 }
    191 
    192 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
    193 {
    194     MultiFDInit_t msg;
    195     int ret;
    196 
    197     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    198     if (ret != 0) {
    199         return -1;
    200     }
    201 
    202     msg.magic = be32_to_cpu(msg.magic);
    203     msg.version = be32_to_cpu(msg.version);
    204 
    205     if (msg.magic != MULTIFD_MAGIC) {
    206         error_setg(errp, "multifd: received packet magic %x "
    207                    "expected %x", msg.magic, MULTIFD_MAGIC);
    208         return -1;
    209     }
    210 
    211     if (msg.version != MULTIFD_VERSION) {
    212         error_setg(errp, "multifd: received packet version %u "
    213                    "expected %u", msg.version, MULTIFD_VERSION);
    214         return -1;
    215     }
    216 
    217     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
    218         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
    219         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
    220 
    221         error_setg(errp, "multifd: received uuid '%s' and expected "
    222                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
    223         g_free(uuid);
    224         g_free(msg_uuid);
    225         return -1;
    226     }
    227 
    228     if (msg.id > migrate_multifd_channels()) {
    229         error_setg(errp, "multifd: received channel version %u "
    230                    "expected %u", msg.version, MULTIFD_VERSION);
    231         return -1;
    232     }
    233 
    234     return msg.id;
    235 }
    236 
    237 static MultiFDPages_t *multifd_pages_init(size_t size)
    238 {
    239     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
    240 
    241     pages->allocated = size;
    242     pages->offset = g_new0(ram_addr_t, size);
    243 
    244     return pages;
    245 }
    246 
    247 static void multifd_pages_clear(MultiFDPages_t *pages)
    248 {
    249     pages->num = 0;
    250     pages->allocated = 0;
    251     pages->packet_num = 0;
    252     pages->block = NULL;
    253     g_free(pages->offset);
    254     pages->offset = NULL;
    255     g_free(pages);
    256 }
    257 
    258 static void multifd_send_fill_packet(MultiFDSendParams *p)
    259 {
    260     MultiFDPacket_t *packet = p->packet;
    261     int i;
    262 
    263     packet->flags = cpu_to_be32(p->flags);
    264     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    265     packet->normal_pages = cpu_to_be32(p->normal_num);
    266     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    267     packet->packet_num = cpu_to_be64(p->packet_num);
    268 
    269     if (p->pages->block) {
    270         strncpy(packet->ramblock, p->pages->block->idstr, 256);
    271     }
    272 
    273     for (i = 0; i < p->normal_num; i++) {
    274         /* there are architectures where ram_addr_t is 32 bit */
    275         uint64_t temp = p->normal[i];
    276 
    277         packet->offset[i] = cpu_to_be64(temp);
    278     }
    279 }
    280 
    281 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
    282 {
    283     MultiFDPacket_t *packet = p->packet;
    284     size_t page_size = qemu_target_page_size();
    285     uint32_t page_count = MULTIFD_PACKET_SIZE / page_size;
    286     RAMBlock *block;
    287     int i;
    288 
    289     packet->magic = be32_to_cpu(packet->magic);
    290     if (packet->magic != MULTIFD_MAGIC) {
    291         error_setg(errp, "multifd: received packet "
    292                    "magic %x and expected magic %x",
    293                    packet->magic, MULTIFD_MAGIC);
    294         return -1;
    295     }
    296 
    297     packet->version = be32_to_cpu(packet->version);
    298     if (packet->version != MULTIFD_VERSION) {
    299         error_setg(errp, "multifd: received packet "
    300                    "version %u and expected version %u",
    301                    packet->version, MULTIFD_VERSION);
    302         return -1;
    303     }
    304 
    305     p->flags = be32_to_cpu(packet->flags);
    306 
    307     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    308     /*
    309      * If we received a packet that is 100 times bigger than expected
    310      * just stop migration.  It is a magic number.
    311      */
    312     if (packet->pages_alloc > page_count) {
    313         error_setg(errp, "multifd: received packet "
    314                    "with size %u and expected a size of %u",
    315                    packet->pages_alloc, page_count) ;
    316         return -1;
    317     }
    318 
    319     p->normal_num = be32_to_cpu(packet->normal_pages);
    320     if (p->normal_num > packet->pages_alloc) {
    321         error_setg(errp, "multifd: received packet "
    322                    "with %u pages and expected maximum pages are %u",
    323                    p->normal_num, packet->pages_alloc) ;
    324         return -1;
    325     }
    326 
    327     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    328     p->packet_num = be64_to_cpu(packet->packet_num);
    329 
    330     if (p->normal_num == 0) {
    331         return 0;
    332     }
    333 
    334     /* make sure that ramblock is 0 terminated */
    335     packet->ramblock[255] = 0;
    336     block = qemu_ram_block_by_name(packet->ramblock);
    337     if (!block) {
    338         error_setg(errp, "multifd: unknown ram block %s",
    339                    packet->ramblock);
    340         return -1;
    341     }
    342 
    343     p->host = block->host;
    344     for (i = 0; i < p->normal_num; i++) {
    345         uint64_t offset = be64_to_cpu(packet->offset[i]);
    346 
    347         if (offset > (block->used_length - page_size)) {
    348             error_setg(errp, "multifd: offset too long %" PRIu64
    349                        " (max " RAM_ADDR_FMT ")",
    350                        offset, block->used_length);
    351             return -1;
    352         }
    353         p->normal[i] = offset;
    354     }
    355 
    356     return 0;
    357 }
    358 
/* Global state of the multifd send side, shared by all send channels. */
struct {
    /* per-channel parameters, indexed by channel number */
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
    376 
    377 /*
    378  * How we use multifd_send_state->pages and channel->pages?
    379  *
    380  * We create a pages for each channel, and a main one.  Each time that
    381  * we need to send a batch of pages we interchange the ones between
    382  * multifd_send_state and the channel that is sending it.  There are
    383  * two reasons for that:
    384  *    - to not have to do so many mallocs during migration
    385  *    - to make easier to know what to free at the end of migration
    386  *
    387  * This way we always know who is the owner of each "pages" struct,
    388  * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread holds the channel mutex while swapping them, and the channel
 * must already have finished with its own pages, otherwise pending_job
 * could not be false.
    393  */
    394 
/**
 * multifd_send_pages: hand the queued batch of pages to a free channel
 *
 * Waits until a channel is ready, picks the first channel without a
 * pending job (starting after the one used last time), swaps the global
 * batch with that channel's empty one and wakes the channel thread.
 *
 * Returns 1 for success or -1 if multifd is exiting or a channel has
 * already quit
 *
 * @f: QEMUFile that the transferred bytes are accounted against
 */
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    /* channel to try first on the next call; kept across calls */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    /* Scan the channels round-robin until one without a job is found. */
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            /* leave the loop with p->mutex still held */
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    /* The batch the channel gives back has to be empty. */
    assert(!p->pages->num);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    /* Swap ownership: the channel gets the full batch, we get its empty one. */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /* Account for the page payload plus the packet header. */
    transferred = ((uint64_t) pages->num) * qemu_target_page_size()
                + p->packet_len;
    qemu_file_acct_rate_limit(f, transferred);
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;
    qemu_mutex_unlock(&p->mutex);
    /* Wake the channel thread so it processes the job. */
    qemu_sem_post(&p->sem);

    return 1;
}
    446 
    447 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
    448 {
    449     MultiFDPages_t *pages = multifd_send_state->pages;
    450 
    451     if (!pages->block) {
    452         pages->block = block;
    453     }
    454 
    455     if (pages->block == block) {
    456         pages->offset[pages->num] = offset;
    457         pages->num++;
    458 
    459         if (pages->num < pages->allocated) {
    460             return 1;
    461         }
    462     }
    463 
    464     if (multifd_send_pages(f) < 0) {
    465         return -1;
    466     }
    467 
    468     if (pages->block != block) {
    469         return  multifd_queue_page(f, block, offset);
    470     }
    471 
    472     return 1;
    473 }
    474 
/**
 * multifd_send_terminate_threads: ask every send channel to stop
 *
 * When @err is set, it is recorded in the migration state and the
 * migration is moved to FAILED if it was still in progress.  Then,
 * only on the first call, every channel is marked as quit, its thread
 * is woken up and its I/O channel is shut down.
 *
 * @err: error that triggered the termination, or NULL
 */
static void multifd_send_terminate_threads(Error *err)
{
    int i;

    trace_multifd_send_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        /* Only fail a migration that has not finished yet. */
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    /*
     * We don't want to exit each threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /* wake the thread so that it notices the request */
        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}
    515 
    516 void multifd_save_cleanup(void)
    517 {
    518     int i;
    519 
    520     if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
    521         return;
    522     }
    523     multifd_send_terminate_threads(NULL);
    524     for (i = 0; i < migrate_multifd_channels(); i++) {
    525         MultiFDSendParams *p = &multifd_send_state->params[i];
    526 
    527         if (p->running) {
    528             qemu_thread_join(&p->thread);
    529         }
    530     }
    531     for (i = 0; i < migrate_multifd_channels(); i++) {
    532         MultiFDSendParams *p = &multifd_send_state->params[i];
    533         Error *local_err = NULL;
    534 
    535         if (p->registered_yank) {
    536             migration_ioc_unregister_yank(p->c);
    537         }
    538         socket_send_channel_destroy(p->c);
    539         p->c = NULL;
    540         qemu_mutex_destroy(&p->mutex);
    541         qemu_sem_destroy(&p->sem);
    542         qemu_sem_destroy(&p->sem_sync);
    543         g_free(p->name);
    544         p->name = NULL;
    545         multifd_pages_clear(p->pages);
    546         p->pages = NULL;
    547         p->packet_len = 0;
    548         g_free(p->packet);
    549         p->packet = NULL;
    550         g_free(p->iov);
    551         p->iov = NULL;
    552         g_free(p->normal);
    553         p->normal = NULL;
    554         multifd_send_state->ops->send_cleanup(p, &local_err);
    555         if (local_err) {
    556             migrate_set_error(migrate_get_current(), local_err);
    557             error_free(local_err);
    558         }
    559     }
    560     qemu_sem_destroy(&multifd_send_state->channels_ready);
    561     g_free(multifd_send_state->params);
    562     multifd_send_state->params = NULL;
    563     multifd_pages_clear(multifd_send_state->pages);
    564     multifd_send_state->pages = NULL;
    565     g_free(multifd_send_state);
    566     multifd_send_state = NULL;
    567 }
    568 
    569 static int multifd_zero_copy_flush(QIOChannel *c)
    570 {
    571     int ret;
    572     Error *err = NULL;
    573 
    574     ret = qio_channel_flush(c, &err);
    575     if (ret < 0) {
    576         error_report_err(err);
    577         return -1;
    578     }
    579     if (ret == 1) {
    580         dirty_sync_missed_zero_copy();
    581     }
    582 
    583     return ret;
    584 }
    585 
/**
 * multifd_send_sync_main: synchronise the migration thread with all channels
 *
 * Sends the batch of pages that is still queued, then queues a packet
 * flagged MULTIFD_FLAG_SYNC on every channel and waits until each
 * channel thread has posted its sem_sync.
 *
 * Returns 0 for success (or when multifd is not in use) or -1 for error
 *
 * @f: QEMUFile that the transferred bytes are accounted against
 */
int multifd_send_sync_main(QEMUFile *f)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_use_multifd()) {
        return 0;
    }
    /* Flush the partially filled batch first. */
    if (multifd_send_state->pages->num) {
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    /*
     * When using zero-copy, it's necessary to flush the pages before any of
     * the pages can be sent again, so we'll make sure the new version of the
     * pages will always arrive _later_ than the old pages.
     *
     * Currently we achieve this by flushing the zero-copy requested writes
     * per ram iteration, but in the future we could potentially optimize it
     * to be less frequent, e.g. only after we finished one whole scanning of
     * all the dirty bitmaps.
     */

    flush_zero_copy = migrate_use_zero_copy_send();

    /* First pass: queue a sync packet on every channel. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        /* a sync packet consists of the header only */
        qemu_file_acct_rate_limit(f, p->packet_len);
        ram_counters.multifd_bytes += p->packet_len;
        ram_counters.transferred += p->packet_len;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    /* Second pass: wait until every channel has handled its sync packet. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
    650 
/**
 * multifd_send_thread: body of the thread that serves one send channel
 *
 * Sends the initial packet, then loops: waits on the channel semaphore
 * and, when a job is pending, builds the packet from the channel's
 * batch of pages and writes it out.  The loop ends when multifd is
 * exiting, when the channel is asked to quit or on the first error.
 *
 * Always returns NULL
 *
 * @opaque: the MultiFDSendParams of the channel
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    bool use_zero_copy_send = migrate_use_zero_copy_send();

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            /* snapshot what is needed after the mutex is dropped */
            uint64_t packet_num = p->packet_num;
            uint32_t flags = p->flags;
            p->normal_num = 0;

            /*
             * Without zero copy, iov[0] is reserved for the packet
             * header; with zero copy the header is written separately.
             */
            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            if (p->normal_num) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            /* mark the batch as empty again */
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            /* a sync packet is acknowledged to multifd_send_sync_main() */
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            /* this channel can take a new job */
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * Error happen, I will exit, but I can't just leave, tell
     * who pay attention to me.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}
    771 
    772 static bool multifd_channel_connect(MultiFDSendParams *p,
    773                                     QIOChannel *ioc,
    774                                     Error *error);
    775 
    776 static void multifd_tls_outgoing_handshake(QIOTask *task,
    777                                            gpointer opaque)
    778 {
    779     MultiFDSendParams *p = opaque;
    780     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    781     Error *err = NULL;
    782 
    783     if (qio_task_propagate_error(task, &err)) {
    784         trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    785     } else {
    786         trace_multifd_tls_outgoing_handshake_complete(ioc);
    787     }
    788 
    789     if (!multifd_channel_connect(p, ioc, err)) {
    790         /*
    791          * Error happen, mark multifd_send_thread status as 'quit' although it
    792          * is not created, and then tell who pay attention to me.
    793          */
    794         p->quit = true;
    795         qemu_sem_post(&multifd_send_state->channels_ready);
    796         qemu_sem_post(&p->sem_sync);
    797     }
    798 }
    799 
    800 static void *multifd_tls_handshake_thread(void *opaque)
    801 {
    802     MultiFDSendParams *p = opaque;
    803     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
    804 
    805     qio_channel_tls_handshake(tioc,
    806                               multifd_tls_outgoing_handshake,
    807                               p,
    808                               NULL,
    809                               NULL);
    810     return NULL;
    811 }
    812 
    813 static void multifd_tls_channel_connect(MultiFDSendParams *p,
    814                                         QIOChannel *ioc,
    815                                         Error **errp)
    816 {
    817     MigrationState *s = migrate_get_current();
    818     const char *hostname = s->hostname;
    819     QIOChannelTLS *tioc;
    820 
    821     tioc = migration_tls_client_create(s, ioc, hostname, errp);
    822     if (!tioc) {
    823         return;
    824     }
    825 
    826     object_unref(OBJECT(ioc));
    827     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    828     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    829     p->c = QIO_CHANNEL(tioc);
    830     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
    831                        multifd_tls_handshake_thread, p,
    832                        QEMU_THREAD_JOINABLE);
    833 }
    834 
    835 static bool multifd_channel_connect(MultiFDSendParams *p,
    836                                     QIOChannel *ioc,
    837                                     Error *error)
    838 {
    839     trace_multifd_set_outgoing_channel(
    840         ioc, object_get_typename(OBJECT(ioc)),
    841         migrate_get_current()->hostname, error);
    842 
    843     if (!error) {
    844         if (migrate_channel_requires_tls_upgrade(ioc)) {
    845             multifd_tls_channel_connect(p, ioc, &error);
    846             if (!error) {
    847                 /*
    848                  * tls_channel_connect will call back to this
    849                  * function after the TLS handshake,
    850                  * so we mustn't call multifd_send_thread until then
    851                  */
    852                 return true;
    853             } else {
    854                 return false;
    855             }
    856         } else {
    857             migration_ioc_register_yank(ioc);
    858             p->registered_yank = true;
    859             p->c = ioc;
    860             qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
    861                                    QEMU_THREAD_JOINABLE);
    862        }
    863        return true;
    864     }
    865 
    866     return false;
    867 }
    868 
    869 static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
    870                                              QIOChannel *ioc, Error *err)
    871 {
    872      migrate_set_error(migrate_get_current(), err);
    873      /* Error happen, we need to tell who pay attention to me */
    874      qemu_sem_post(&multifd_send_state->channels_ready);
    875      qemu_sem_post(&p->sem_sync);
    876      /*
    877       * Although multifd_send_thread is not created, but main migration
    878       * thread neet to judge whether it is running, so we need to mark
    879       * its status.
    880       */
    881      p->quit = true;
    882      object_unref(OBJECT(ioc));
    883      error_free(err);
    884 }
    885 
    886 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
    887 {
    888     MultiFDSendParams *p = opaque;
    889     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    890     Error *local_err = NULL;
    891 
    892     trace_multifd_new_send_channel_async(p->id);
    893     if (qio_task_propagate_error(task, &local_err)) {
    894         goto cleanup;
    895     } else {
    896         p->c = QIO_CHANNEL(sioc);
    897         qio_channel_set_delay(p->c, false);
    898         p->running = true;
    899         if (!multifd_channel_connect(p, sioc, local_err)) {
    900             goto cleanup;
    901         }
    902         return;
    903     }
    904 
    905 cleanup:
    906     multifd_new_send_channel_cleanup(p, sioc, local_err);
    907 }
    908 
    909 int multifd_save_setup(Error **errp)
    910 {
    911     int thread_count;
    912     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    913     uint8_t i;
    914 
    915     if (!migrate_use_multifd()) {
    916         return 0;
    917     }
    918     if (!migrate_multi_channels_is_allowed()) {
    919         error_setg(errp, "multifd is not supported by current protocol");
    920         return -1;
    921     }
    922 
    923     thread_count = migrate_multifd_channels();
    924     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    925     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    926     multifd_send_state->pages = multifd_pages_init(page_count);
    927     qemu_sem_init(&multifd_send_state->channels_ready, 0);
    928     qatomic_set(&multifd_send_state->exiting, 0);
    929     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
    930 
    931     for (i = 0; i < thread_count; i++) {
    932         MultiFDSendParams *p = &multifd_send_state->params[i];
    933 
    934         qemu_mutex_init(&p->mutex);
    935         qemu_sem_init(&p->sem, 0);
    936         qemu_sem_init(&p->sem_sync, 0);
    937         p->quit = false;
    938         p->pending_job = 0;
    939         p->id = i;
    940         p->pages = multifd_pages_init(page_count);
    941         p->packet_len = sizeof(MultiFDPacket_t)
    942                       + sizeof(uint64_t) * page_count;
    943         p->packet = g_malloc0(p->packet_len);
    944         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
    945         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
    946         p->name = g_strdup_printf("multifdsend_%d", i);
    947         /* We need one extra place for the packet header */
    948         p->iov = g_new0(struct iovec, page_count + 1);
    949         p->normal = g_new0(ram_addr_t, page_count);
    950 
    951         if (migrate_use_zero_copy_send()) {
    952             p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    953         } else {
    954             p->write_flags = 0;
    955         }
    956 
    957         socket_send_channel_create(multifd_new_send_channel_async, p);
    958     }
    959 
    960     for (i = 0; i < thread_count; i++) {
    961         MultiFDSendParams *p = &multifd_send_state->params[i];
    962         Error *local_err = NULL;
    963         int ret;
    964 
    965         ret = multifd_send_state->ops->send_setup(p, &local_err);
    966         if (ret) {
    967             error_propagate(errp, local_err);
    968             return ret;
    969         }
    970     }
    971     return 0;
    972 }
    973 
/*
 * Global receive-side multifd state.  Allocated by multifd_load_setup()
 * and released by multifd_load_cleanup().
 */
struct {
    /* per-channel parameters, one entry per multifd channel */
    MultiFDRecvParams *params;
    /* number of created threads; accessed with qatomic_*() helpers */
    int count;
    /*
     * syncs main thread and channels: posted by a receive thread when it
     * sees MULTIFD_FLAG_SYNC, waited on by multifd_recv_sync_main()
     */
    QemuSemaphore sem_sync;
    /*
     * global number of generated multifd packets; raised to the largest
     * per-channel packet_num in multifd_recv_sync_main()
     */
    uint64_t packet_num;
    /* multifd ops, selected from multifd_ops[] by the compression method */
    MultiFDMethods *ops;
} *multifd_recv_state;
    985 
    986 static void multifd_recv_terminate_threads(Error *err)
    987 {
    988     int i;
    989 
    990     trace_multifd_recv_terminate_threads(err != NULL);
    991 
    992     if (err) {
    993         MigrationState *s = migrate_get_current();
    994         migrate_set_error(s, err);
    995         if (s->state == MIGRATION_STATUS_SETUP ||
    996             s->state == MIGRATION_STATUS_ACTIVE) {
    997             migrate_set_state(&s->state, s->state,
    998                               MIGRATION_STATUS_FAILED);
    999         }
   1000     }
   1001 
   1002     for (i = 0; i < migrate_multifd_channels(); i++) {
   1003         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1004 
   1005         qemu_mutex_lock(&p->mutex);
   1006         p->quit = true;
   1007         /*
   1008          * We could arrive here for two reasons:
   1009          *  - normal quit, i.e. everything went fine, just finished
   1010          *  - error quit: We close the channels so the channel threads
   1011          *    finish the qio_channel_read_all_eof()
   1012          */
   1013         if (p->c) {
   1014             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
   1015         }
   1016         qemu_mutex_unlock(&p->mutex);
   1017     }
   1018 }
   1019 
   1020 int multifd_load_cleanup(Error **errp)
   1021 {
   1022     int i;
   1023 
   1024     if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
   1025         return 0;
   1026     }
   1027     multifd_recv_terminate_threads(NULL);
   1028     for (i = 0; i < migrate_multifd_channels(); i++) {
   1029         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1030 
   1031         if (p->running) {
   1032             p->quit = true;
   1033             /*
   1034              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
   1035              * however try to wakeup it without harm in cleanup phase.
   1036              */
   1037             qemu_sem_post(&p->sem_sync);
   1038             qemu_thread_join(&p->thread);
   1039         }
   1040     }
   1041     for (i = 0; i < migrate_multifd_channels(); i++) {
   1042         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1043 
   1044         migration_ioc_unregister_yank(p->c);
   1045         object_unref(OBJECT(p->c));
   1046         p->c = NULL;
   1047         qemu_mutex_destroy(&p->mutex);
   1048         qemu_sem_destroy(&p->sem_sync);
   1049         g_free(p->name);
   1050         p->name = NULL;
   1051         p->packet_len = 0;
   1052         g_free(p->packet);
   1053         p->packet = NULL;
   1054         g_free(p->iov);
   1055         p->iov = NULL;
   1056         g_free(p->normal);
   1057         p->normal = NULL;
   1058         multifd_recv_state->ops->recv_cleanup(p);
   1059     }
   1060     qemu_sem_destroy(&multifd_recv_state->sem_sync);
   1061     g_free(multifd_recv_state->params);
   1062     multifd_recv_state->params = NULL;
   1063     g_free(multifd_recv_state);
   1064     multifd_recv_state = NULL;
   1065 
   1066     return 0;
   1067 }
   1068 
   1069 void multifd_recv_sync_main(void)
   1070 {
   1071     int i;
   1072 
   1073     if (!migrate_use_multifd()) {
   1074         return;
   1075     }
   1076     for (i = 0; i < migrate_multifd_channels(); i++) {
   1077         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1078 
   1079         trace_multifd_recv_sync_main_wait(p->id);
   1080         qemu_sem_wait(&multifd_recv_state->sem_sync);
   1081     }
   1082     for (i = 0; i < migrate_multifd_channels(); i++) {
   1083         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1084 
   1085         WITH_QEMU_LOCK_GUARD(&p->mutex) {
   1086             if (multifd_recv_state->packet_num < p->packet_num) {
   1087                 multifd_recv_state->packet_num = p->packet_num;
   1088             }
   1089         }
   1090         trace_multifd_recv_sync_main_signal(p->id);
   1091         qemu_sem_post(&p->sem_sync);
   1092     }
   1093     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
   1094 }
   1095 
   1096 static void *multifd_recv_thread(void *opaque)
   1097 {
   1098     MultiFDRecvParams *p = opaque;
   1099     Error *local_err = NULL;
   1100     int ret;
   1101 
   1102     trace_multifd_recv_thread_start(p->id);
   1103     rcu_register_thread();
   1104 
   1105     while (true) {
   1106         uint32_t flags;
   1107 
   1108         if (p->quit) {
   1109             break;
   1110         }
   1111 
   1112         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
   1113                                        p->packet_len, &local_err);
   1114         if (ret == 0) {   /* EOF */
   1115             break;
   1116         }
   1117         if (ret == -1) {   /* Error */
   1118             break;
   1119         }
   1120 
   1121         qemu_mutex_lock(&p->mutex);
   1122         ret = multifd_recv_unfill_packet(p, &local_err);
   1123         if (ret) {
   1124             qemu_mutex_unlock(&p->mutex);
   1125             break;
   1126         }
   1127 
   1128         flags = p->flags;
   1129         /* recv methods don't know how to handle the SYNC flag */
   1130         p->flags &= ~MULTIFD_FLAG_SYNC;
   1131         trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
   1132                            p->next_packet_size);
   1133         p->num_packets++;
   1134         p->total_normal_pages += p->normal_num;
   1135         qemu_mutex_unlock(&p->mutex);
   1136 
   1137         if (p->normal_num) {
   1138             ret = multifd_recv_state->ops->recv_pages(p, &local_err);
   1139             if (ret != 0) {
   1140                 break;
   1141             }
   1142         }
   1143 
   1144         if (flags & MULTIFD_FLAG_SYNC) {
   1145             qemu_sem_post(&multifd_recv_state->sem_sync);
   1146             qemu_sem_wait(&p->sem_sync);
   1147         }
   1148     }
   1149 
   1150     if (local_err) {
   1151         multifd_recv_terminate_threads(local_err);
   1152         error_free(local_err);
   1153     }
   1154     qemu_mutex_lock(&p->mutex);
   1155     p->running = false;
   1156     qemu_mutex_unlock(&p->mutex);
   1157 
   1158     rcu_unregister_thread();
   1159     trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);
   1160 
   1161     return NULL;
   1162 }
   1163 
   1164 int multifd_load_setup(Error **errp)
   1165 {
   1166     int thread_count;
   1167     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
   1168     uint8_t i;
   1169 
   1170     if (!migrate_use_multifd()) {
   1171         return 0;
   1172     }
   1173     if (!migrate_multi_channels_is_allowed()) {
   1174         error_setg(errp, "multifd is not supported by current protocol");
   1175         return -1;
   1176     }
   1177     thread_count = migrate_multifd_channels();
   1178     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
   1179     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
   1180     qatomic_set(&multifd_recv_state->count, 0);
   1181     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
   1182     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
   1183 
   1184     for (i = 0; i < thread_count; i++) {
   1185         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1186 
   1187         qemu_mutex_init(&p->mutex);
   1188         qemu_sem_init(&p->sem_sync, 0);
   1189         p->quit = false;
   1190         p->id = i;
   1191         p->packet_len = sizeof(MultiFDPacket_t)
   1192                       + sizeof(uint64_t) * page_count;
   1193         p->packet = g_malloc0(p->packet_len);
   1194         p->name = g_strdup_printf("multifdrecv_%d", i);
   1195         p->iov = g_new0(struct iovec, page_count);
   1196         p->normal = g_new0(ram_addr_t, page_count);
   1197     }
   1198 
   1199     for (i = 0; i < thread_count; i++) {
   1200         MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1201         Error *local_err = NULL;
   1202         int ret;
   1203 
   1204         ret = multifd_recv_state->ops->recv_setup(p, &local_err);
   1205         if (ret) {
   1206             error_propagate(errp, local_err);
   1207             return ret;
   1208         }
   1209     }
   1210     return 0;
   1211 }
   1212 
   1213 bool multifd_recv_all_channels_created(void)
   1214 {
   1215     int thread_count = migrate_multifd_channels();
   1216 
   1217     if (!migrate_use_multifd()) {
   1218         return true;
   1219     }
   1220 
   1221     if (!multifd_recv_state) {
   1222         /* Called before any connections created */
   1223         return false;
   1224     }
   1225 
   1226     return thread_count == qatomic_read(&multifd_recv_state->count);
   1227 }
   1228 
   1229 /*
   1230  * Try to receive all multifd channels to get ready for the migration.
   1231  * - Return true and do not set @errp when correctly receiving all channels;
   1232  * - Return false and do not set @errp when correctly receiving the current one;
   1233  * - Return false and set @errp when failing to receive the current channel.
   1234  */
   1235 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
   1236 {
   1237     MultiFDRecvParams *p;
   1238     Error *local_err = NULL;
   1239     int id;
   1240 
   1241     id = multifd_recv_initial_packet(ioc, &local_err);
   1242     if (id < 0) {
   1243         multifd_recv_terminate_threads(local_err);
   1244         error_propagate_prepend(errp, local_err,
   1245                                 "failed to receive packet"
   1246                                 " via multifd channel %d: ",
   1247                                 qatomic_read(&multifd_recv_state->count));
   1248         return false;
   1249     }
   1250     trace_multifd_recv_new_channel(id);
   1251 
   1252     p = &multifd_recv_state->params[id];
   1253     if (p->c != NULL) {
   1254         error_setg(&local_err, "multifd: received id '%d' already setup'",
   1255                    id);
   1256         multifd_recv_terminate_threads(local_err);
   1257         error_propagate(errp, local_err);
   1258         return false;
   1259     }
   1260     p->c = ioc;
   1261     object_ref(OBJECT(ioc));
   1262     /* initial packet */
   1263     p->num_packets = 1;
   1264 
   1265     p->running = true;
   1266     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
   1267                        QEMU_THREAD_JOINABLE);
   1268     qatomic_inc(&multifd_recv_state->count);
   1269     return qatomic_read(&multifd_recv_state->count) ==
   1270            migrate_multifd_channels();
   1271 }