qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

rdma.c (130723B)


      1 /*
      2  * RDMA protocol and interfaces
      3  *
      4  * Copyright IBM, Corp. 2010-2013
      5  * Copyright Red Hat, Inc. 2015-2016
      6  *
      7  * Authors:
      8  *  Michael R. Hines <mrhines@us.ibm.com>
      9  *  Jiuxing Liu <jl@us.ibm.com>
     10  *  Daniel P. Berrange <berrange@redhat.com>
     11  *
     12  * This work is licensed under the terms of the GNU GPL, version 2 or
     13  * later.  See the COPYING file in the top-level directory.
     14  *
     15  */
     16 
     17 #include "qemu/osdep.h"
     18 #include "qapi/error.h"
     19 #include "qemu/cutils.h"
     20 #include "rdma.h"
     21 #include "migration.h"
     22 #include "qemu-file.h"
     23 #include "ram.h"
     24 #include "qemu/error-report.h"
     25 #include "qemu/main-loop.h"
     26 #include "qemu/module.h"
     27 #include "qemu/rcu.h"
     28 #include "qemu/sockets.h"
     29 #include "qemu/bitmap.h"
     30 #include "qemu/coroutine.h"
     31 #include "exec/memory.h"
     32 #include <sys/socket.h>
     33 #include <netdb.h>
     34 #include <arpa/inet.h>
     35 #include <rdma/rdma_cma.h>
     36 #include "trace.h"
     37 #include "qom/object.h"
     38 #include <poll.h>
     39 
     40 /*
     41  * Print and error on both the Monitor and the Log file.
     42  */
     43 #define ERROR(errp, fmt, ...) \
     44     do { \
     45         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
     46         if (errp && (*(errp) == NULL)) { \
     47             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
     48         } \
     49     } while (0)
     50 
     51 #define RDMA_RESOLVE_TIMEOUT_MS 10000
     52 
     53 /* Do not merge data if larger than this. */
     54 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
     55 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
     56 
     57 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
     58 
     59 /*
     60  * This is only for non-live state being migrated.
     61  * Instead of RDMA_WRITE messages, we use RDMA_SEND
     62  * messages for that state, which requires a different
     63  * delivery design than main memory.
     64  */
     65 #define RDMA_SEND_INCREMENT 32768
     66 
     67 /*
     68  * Maximum size infiniband SEND message
     69  */
     70 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
     71 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
     72 
     73 #define RDMA_CONTROL_VERSION_CURRENT 1
     74 /*
     75  * Capabilities for negotiation.
     76  */
     77 #define RDMA_CAPABILITY_PIN_ALL 0x01
     78 
     79 /*
     80  * Add the other flags above to this list of known capabilities
     81  * as they are introduced.
     82  */
     83 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
     84 
     85 #define CHECK_ERROR_STATE() \
     86     do { \
     87         if (rdma->error_state) { \
     88             if (!rdma->error_reported) { \
     89                 error_report("RDMA is in an error state waiting migration" \
     90                                 " to abort!"); \
     91                 rdma->error_reported = 1; \
     92             } \
     93             return rdma->error_state; \
     94         } \
     95     } while (0)
     96 
     97 /*
     98  * A work request ID is 64-bits and we split up these bits
     99  * into 3 parts:
    100  *
    101  * bits 0-15 : type of control message, 2^16
    102  * bits 16-29: ram block index, 2^14
    103  * bits 30-63: ram block chunk number, 2^34
    104  *
    105  * The last two bit ranges are only used for RDMA writes,
    106  * in order to track their completion and potentially
    107  * also track unregistration status of the message.
    108  */
    109 #define RDMA_WRID_TYPE_SHIFT  0UL
    110 #define RDMA_WRID_BLOCK_SHIFT 16UL
    111 #define RDMA_WRID_CHUNK_SHIFT 30UL
    112 
    113 #define RDMA_WRID_TYPE_MASK \
    114     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
    115 
    116 #define RDMA_WRID_BLOCK_MASK \
    117     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
    118 
    119 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
    120 
    121 /*
    122  * RDMA migration protocol:
    123  * 1. RDMA Writes (data messages, i.e. RAM)
    124  * 2. IB Send/Recv (control channel messages)
    125  */
    126 enum {
    127     RDMA_WRID_NONE = 0,
    128     RDMA_WRID_RDMA_WRITE = 1,
    129     RDMA_WRID_SEND_CONTROL = 2000,
    130     RDMA_WRID_RECV_CONTROL = 4000,
    131 };
    132 
    133 static const char *wrid_desc[] = {
    134     [RDMA_WRID_NONE] = "NONE",
    135     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    136     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    137     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
    138 };
    139 
    140 /*
    141  * Work request IDs for IB SEND messages only (not RDMA writes).
    142  * This is used by the migration protocol to transmit
    143  * control messages (such as device state and registration commands)
    144  *
    145  * We could use more WRs, but we have enough for now.
    146  */
    147 enum {
    148     RDMA_WRID_READY = 0,
    149     RDMA_WRID_DATA,
    150     RDMA_WRID_CONTROL,
    151     RDMA_WRID_MAX,
    152 };
    153 
    154 /*
    155  * SEND/RECV IB Control Messages.
    156  */
    157 enum {
    158     RDMA_CONTROL_NONE = 0,
    159     RDMA_CONTROL_ERROR,
    160     RDMA_CONTROL_READY,               /* ready to receive */
    161     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    162     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    163     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    164     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    165     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    166     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    167     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    168     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    169     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
    170 };
    171 
    172 
    173 /*
    174  * Memory and MR structures used to represent an IB Send/Recv work request.
    175  * This is *not* used for RDMA writes, only IB Send/Recv.
    176  */
    177 typedef struct {
    178     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    179     struct   ibv_mr *control_mr;               /* registration metadata */
    180     size_t   control_len;                      /* length of the message */
    181     uint8_t *control_curr;                     /* start of unconsumed bytes */
    182 } RDMAWorkRequestData;
    183 
    184 /*
    185  * Negotiate RDMA capabilities during connection-setup time.
    186  */
    187 typedef struct {
    188     uint32_t version;
    189     uint32_t flags;
    190 } RDMACapabilities;
    191 
    192 static void caps_to_network(RDMACapabilities *cap)
    193 {
    194     cap->version = htonl(cap->version);
    195     cap->flags = htonl(cap->flags);
    196 }
    197 
    198 static void network_to_caps(RDMACapabilities *cap)
    199 {
    200     cap->version = ntohl(cap->version);
    201     cap->flags = ntohl(cap->flags);
    202 }
    203 
    204 /*
    205  * Representation of a RAMBlock from an RDMA perspective.
    206  * This is not transmitted, only local.
    207  * This and subsequent structures cannot be linked lists
    208  * because we're using a single IB message to transmit
    209  * the information. It's small anyway, so a list is overkill.
    210  */
    211 typedef struct RDMALocalBlock {
    212     char          *block_name;
    213     uint8_t       *local_host_addr; /* local virtual address */
    214     uint64_t       remote_host_addr; /* remote virtual address */
    215     uint64_t       offset;
    216     uint64_t       length;
    217     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    218     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    219     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    220     uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    221     int            index;           /* which block are we */
    222     unsigned int   src_index;       /* (Only used on dest) */
    223     bool           is_ram_block;
    224     int            nb_chunks;
    225     unsigned long *transit_bitmap;
    226     unsigned long *unregister_bitmap;
    227 } RDMALocalBlock;
    228 
    229 /*
    230  * Also represents a RAMblock, but only on the dest.
    231  * This gets transmitted by the dest during connection-time
    232  * to the source VM and then is used to populate the
    233  * corresponding RDMALocalBlock with
    234  * the information needed to perform the actual RDMA.
    235  */
    236 typedef struct QEMU_PACKED RDMADestBlock {
    237     uint64_t remote_host_addr;
    238     uint64_t offset;
    239     uint64_t length;
    240     uint32_t remote_rkey;
    241     uint32_t padding;
    242 } RDMADestBlock;
    243 
    244 static const char *control_desc(unsigned int rdma_control)
    245 {
    246     static const char *strs[] = {
    247         [RDMA_CONTROL_NONE] = "NONE",
    248         [RDMA_CONTROL_ERROR] = "ERROR",
    249         [RDMA_CONTROL_READY] = "READY",
    250         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    251         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    252         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    253         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    254         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    255         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    256         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    257         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    258         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
    259     };
    260 
    261     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
    262         return "??BAD CONTROL VALUE??";
    263     }
    264 
    265     return strs[rdma_control];
    266 }
    267 
    268 static uint64_t htonll(uint64_t v)
    269 {
    270     union { uint32_t lv[2]; uint64_t llv; } u;
    271     u.lv[0] = htonl(v >> 32);
    272     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    273     return u.llv;
    274 }
    275 
    276 static uint64_t ntohll(uint64_t v)
    277 {
    278     union { uint32_t lv[2]; uint64_t llv; } u;
    279     u.llv = v;
    280     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
    281 }
    282 
    283 static void dest_block_to_network(RDMADestBlock *db)
    284 {
    285     db->remote_host_addr = htonll(db->remote_host_addr);
    286     db->offset = htonll(db->offset);
    287     db->length = htonll(db->length);
    288     db->remote_rkey = htonl(db->remote_rkey);
    289 }
    290 
    291 static void network_to_dest_block(RDMADestBlock *db)
    292 {
    293     db->remote_host_addr = ntohll(db->remote_host_addr);
    294     db->offset = ntohll(db->offset);
    295     db->length = ntohll(db->length);
    296     db->remote_rkey = ntohl(db->remote_rkey);
    297 }
    298 
    299 /*
    300  * Virtual address of the above structures used for transmitting
    301  * the RAMBlock descriptions at connection-time.
    302  * This structure is *not* transmitted.
    303  */
    304 typedef struct RDMALocalBlocks {
    305     int nb_blocks;
    306     bool     init;             /* main memory init complete */
    307     RDMALocalBlock *block;
    308 } RDMALocalBlocks;
    309 
    310 /*
    311  * Main data structure for RDMA state.
    312  * While there is only one copy of this structure being allocated right now,
    313  * this is the place where one would start if you wanted to consider
    314  * having more than one RDMA connection open at the same time.
    315  */
    316 typedef struct RDMAContext {
    317     char *host;
    318     int port;
    319     char *host_port;
    320 
    321     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
    322 
    323     /*
    324      * This is used by *_exchange_send() to figure out whether or not
    325      * the initial "READY" message has already been received or not.
    326      * This is because other functions may potentially poll() and detect
    327      * the READY message before send() does, in which case we need to
    328      * know if it completed.
    329      */
    330     int control_ready_expected;
    331 
    332     /* number of outstanding writes */
    333     int nb_sent;
    334 
    335     /* store info about current buffer so that we can
    336        merge it with future sends */
    337     uint64_t current_addr;
    338     uint64_t current_length;
    339     /* index of ram block the current buffer belongs to */
    340     int current_index;
    341     /* index of the chunk in the current ram block */
    342     int current_chunk;
    343 
    344     bool pin_all;
    345 
    346     /*
    347      * infiniband-specific variables for opening the device
    348      * and maintaining connection state and so forth.
    349      *
    350      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
    351      * cm_id->verbs, cm_id->channel, and cm_id->qp.
    352      */
    353     struct rdma_cm_id *cm_id;               /* connection manager ID */
    354     struct rdma_cm_id *listen_id;
    355     bool connected;
    356 
    357     struct ibv_context          *verbs;
    358     struct rdma_event_channel   *channel;
    359     struct ibv_qp *qp;                      /* queue pair */
    360     struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
    361     struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
    362     struct ibv_pd *pd;                      /* protection domain */
    363     struct ibv_cq *recv_cq;                 /* recvieve completion queue */
    364     struct ibv_cq *send_cq;                 /* send completion queue */
    365 
    366     /*
    367      * If a previous write failed (perhaps because of a failed
    368      * memory registration, then do not attempt any future work
    369      * and remember the error state.
    370      */
    371     int error_state;
    372     int error_reported;
    373     int received_error;
    374 
    375     /*
    376      * Description of ram blocks used throughout the code.
    377      */
    378     RDMALocalBlocks local_ram_blocks;
    379     RDMADestBlock  *dest_blocks;
    380 
    381     /* Index of the next RAMBlock received during block registration */
    382     unsigned int    next_src_index;
    383 
    384     /*
    385      * Migration on *destination* started.
    386      * Then use coroutine yield function.
    387      * Source runs in a thread, so we don't care.
    388      */
    389     int migration_started_on_destination;
    390 
    391     int total_registrations;
    392     int total_writes;
    393 
    394     int unregister_current, unregister_next;
    395     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
    396 
    397     GHashTable *blockmap;
    398 
    399     /* the RDMAContext for return path */
    400     struct RDMAContext *return_path;
    401     bool is_return_path;
    402 } RDMAContext;
    403 
    404 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
    405 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
    406 
    407 
    408 
    409 struct QIOChannelRDMA {
    410     QIOChannel parent;
    411     RDMAContext *rdmain;
    412     RDMAContext *rdmaout;
    413     QEMUFile *file;
    414     bool blocking; /* XXX we don't actually honour this yet */
    415 };
    416 
    417 /*
    418  * Main structure for IB Send/Recv control messages.
    419  * This gets prepended at the beginning of every Send/Recv.
    420  */
    421 typedef struct QEMU_PACKED {
    422     uint32_t len;     /* Total length of data portion */
    423     uint32_t type;    /* which control command to perform */
    424     uint32_t repeat;  /* number of commands in data portion of same type */
    425     uint32_t padding;
    426 } RDMAControlHeader;
    427 
    428 static void control_to_network(RDMAControlHeader *control)
    429 {
    430     control->type = htonl(control->type);
    431     control->len = htonl(control->len);
    432     control->repeat = htonl(control->repeat);
    433 }
    434 
    435 static void network_to_control(RDMAControlHeader *control)
    436 {
    437     control->type = ntohl(control->type);
    438     control->len = ntohl(control->len);
    439     control->repeat = ntohl(control->repeat);
    440 }
    441 
    442 /*
    443  * Register a single Chunk.
    444  * Information sent by the source VM to inform the dest
    445  * to register an single chunk of memory before we can perform
    446  * the actual RDMA operation.
    447  */
    448 typedef struct QEMU_PACKED {
    449     union QEMU_PACKED {
    450         uint64_t current_addr;  /* offset into the ram_addr_t space */
    451         uint64_t chunk;         /* chunk to lookup if unregistering */
    452     } key;
    453     uint32_t current_index; /* which ramblock the chunk belongs to */
    454     uint32_t padding;
    455     uint64_t chunks;            /* how many sequential chunks to register */
    456 } RDMARegister;
    457 
    458 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
    459 {
    460     RDMALocalBlock *local_block;
    461     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
    462 
    463     if (local_block->is_ram_block) {
    464         /*
    465          * current_addr as passed in is an address in the local ram_addr_t
    466          * space, we need to translate this for the destination
    467          */
    468         reg->key.current_addr -= local_block->offset;
    469         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
    470     }
    471     reg->key.current_addr = htonll(reg->key.current_addr);
    472     reg->current_index = htonl(reg->current_index);
    473     reg->chunks = htonll(reg->chunks);
    474 }
    475 
    476 static void network_to_register(RDMARegister *reg)
    477 {
    478     reg->key.current_addr = ntohll(reg->key.current_addr);
    479     reg->current_index = ntohl(reg->current_index);
    480     reg->chunks = ntohll(reg->chunks);
    481 }
    482 
    483 typedef struct QEMU_PACKED {
    484     uint32_t value;     /* if zero, we will madvise() */
    485     uint32_t block_idx; /* which ram block index */
    486     uint64_t offset;    /* Address in remote ram_addr_t space */
    487     uint64_t length;    /* length of the chunk */
    488 } RDMACompress;
    489 
    490 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
    491 {
    492     comp->value = htonl(comp->value);
    493     /*
    494      * comp->offset as passed in is an address in the local ram_addr_t
    495      * space, we need to translate this for the destination
    496      */
    497     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
    498     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
    499     comp->block_idx = htonl(comp->block_idx);
    500     comp->offset = htonll(comp->offset);
    501     comp->length = htonll(comp->length);
    502 }
    503 
    504 static void network_to_compress(RDMACompress *comp)
    505 {
    506     comp->value = ntohl(comp->value);
    507     comp->block_idx = ntohl(comp->block_idx);
    508     comp->offset = ntohll(comp->offset);
    509     comp->length = ntohll(comp->length);
    510 }
    511 
    512 /*
    513  * The result of the dest's memory registration produces an "rkey"
    514  * which the source VM must reference in order to perform
    515  * the RDMA operation.
    516  */
    517 typedef struct QEMU_PACKED {
    518     uint32_t rkey;
    519     uint32_t padding;
    520     uint64_t host_addr;
    521 } RDMARegisterResult;
    522 
    523 static void result_to_network(RDMARegisterResult *result)
    524 {
    525     result->rkey = htonl(result->rkey);
    526     result->host_addr = htonll(result->host_addr);
    527 };
    528 
    529 static void network_to_result(RDMARegisterResult *result)
    530 {
    531     result->rkey = ntohl(result->rkey);
    532     result->host_addr = ntohll(result->host_addr);
    533 };
    534 
    535 const char *print_wrid(int wrid);
    536 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
    537                                    uint8_t *data, RDMAControlHeader *resp,
    538                                    int *resp_idx,
    539                                    int (*callback)(RDMAContext *rdma));
    540 
    541 static inline uint64_t ram_chunk_index(const uint8_t *start,
    542                                        const uint8_t *host)
    543 {
    544     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
    545 }
    546 
    547 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
    548                                        uint64_t i)
    549 {
    550     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
    551                                   (i << RDMA_REG_CHUNK_SHIFT));
    552 }
    553 
    554 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
    555                                      uint64_t i)
    556 {
    557     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
    558                                          (1UL << RDMA_REG_CHUNK_SHIFT);
    559 
    560     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
    561         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
    562     }
    563 
    564     return result;
    565 }
    566 
    567 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
    568                          void *host_addr,
    569                          ram_addr_t block_offset, uint64_t length)
    570 {
    571     RDMALocalBlocks *local = &rdma->local_ram_blocks;
    572     RDMALocalBlock *block;
    573     RDMALocalBlock *old = local->block;
    574 
    575     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
    576 
    577     if (local->nb_blocks) {
    578         int x;
    579 
    580         if (rdma->blockmap) {
    581             for (x = 0; x < local->nb_blocks; x++) {
    582                 g_hash_table_remove(rdma->blockmap,
    583                                     (void *)(uintptr_t)old[x].offset);
    584                 g_hash_table_insert(rdma->blockmap,
    585                                     (void *)(uintptr_t)old[x].offset,
    586                                     &local->block[x]);
    587             }
    588         }
    589         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
    590         g_free(old);
    591     }
    592 
    593     block = &local->block[local->nb_blocks];
    594 
    595     block->block_name = g_strdup(block_name);
    596     block->local_host_addr = host_addr;
    597     block->offset = block_offset;
    598     block->length = length;
    599     block->index = local->nb_blocks;
    600     block->src_index = ~0U; /* Filled in by the receipt of the block list */
    601     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
    602     block->transit_bitmap = bitmap_new(block->nb_chunks);
    603     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
    604     block->unregister_bitmap = bitmap_new(block->nb_chunks);
    605     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
    606     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
    607 
    608     block->is_ram_block = local->init ? false : true;
    609 
    610     if (rdma->blockmap) {
    611         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
    612     }
    613 
    614     trace_rdma_add_block(block_name, local->nb_blocks,
    615                          (uintptr_t) block->local_host_addr,
    616                          block->offset, block->length,
    617                          (uintptr_t) (block->local_host_addr + block->length),
    618                          BITS_TO_LONGS(block->nb_chunks) *
    619                              sizeof(unsigned long) * 8,
    620                          block->nb_chunks);
    621 
    622     local->nb_blocks++;
    623 
    624     return 0;
    625 }
    626 
    627 /*
    628  * Memory regions need to be registered with the device and queue pairs setup
    629  * in advanced before the migration starts. This tells us where the RAM blocks
    630  * are so that we can register them individually.
    631  */
    632 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
    633 {
    634     const char *block_name = qemu_ram_get_idstr(rb);
    635     void *host_addr = qemu_ram_get_host_addr(rb);
    636     ram_addr_t block_offset = qemu_ram_get_offset(rb);
    637     ram_addr_t length = qemu_ram_get_used_length(rb);
    638     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
    639 }
    640 
    641 /*
    642  * Identify the RAMBlocks and their quantity. They will be references to
    643  * identify chunk boundaries inside each RAMBlock and also be referenced
    644  * during dynamic page registration.
    645  */
    646 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
    647 {
    648     RDMALocalBlocks *local = &rdma->local_ram_blocks;
    649     int ret;
    650 
    651     assert(rdma->blockmap == NULL);
    652     memset(local, 0, sizeof *local);
    653     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
    654     if (ret) {
    655         return ret;
    656     }
    657     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
    658     rdma->dest_blocks = g_new0(RDMADestBlock,
    659                                rdma->local_ram_blocks.nb_blocks);
    660     local->init = true;
    661     return 0;
    662 }
    663 
    664 /*
    665  * Note: If used outside of cleanup, the caller must ensure that the destination
    666  * block structures are also updated
    667  */
    668 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
    669 {
    670     RDMALocalBlocks *local = &rdma->local_ram_blocks;
    671     RDMALocalBlock *old = local->block;
    672     int x;
    673 
    674     if (rdma->blockmap) {
    675         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
    676     }
    677     if (block->pmr) {
    678         int j;
    679 
    680         for (j = 0; j < block->nb_chunks; j++) {
    681             if (!block->pmr[j]) {
    682                 continue;
    683             }
    684             ibv_dereg_mr(block->pmr[j]);
    685             rdma->total_registrations--;
    686         }
    687         g_free(block->pmr);
    688         block->pmr = NULL;
    689     }
    690 
    691     if (block->mr) {
    692         ibv_dereg_mr(block->mr);
    693         rdma->total_registrations--;
    694         block->mr = NULL;
    695     }
    696 
    697     g_free(block->transit_bitmap);
    698     block->transit_bitmap = NULL;
    699 
    700     g_free(block->unregister_bitmap);
    701     block->unregister_bitmap = NULL;
    702 
    703     g_free(block->remote_keys);
    704     block->remote_keys = NULL;
    705 
    706     g_free(block->block_name);
    707     block->block_name = NULL;
    708 
    709     if (rdma->blockmap) {
    710         for (x = 0; x < local->nb_blocks; x++) {
    711             g_hash_table_remove(rdma->blockmap,
    712                                 (void *)(uintptr_t)old[x].offset);
    713         }
    714     }
    715 
    716     if (local->nb_blocks > 1) {
    717 
    718         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
    719 
    720         if (block->index) {
    721             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
    722         }
    723 
    724         if (block->index < (local->nb_blocks - 1)) {
    725             memcpy(local->block + block->index, old + (block->index + 1),
    726                 sizeof(RDMALocalBlock) *
    727                     (local->nb_blocks - (block->index + 1)));
    728             for (x = block->index; x < local->nb_blocks - 1; x++) {
    729                 local->block[x].index--;
    730             }
    731         }
    732     } else {
    733         assert(block == local->block);
    734         local->block = NULL;
    735     }
    736 
    737     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
    738                            block->offset, block->length,
    739                             (uintptr_t)(block->local_host_addr + block->length),
    740                            BITS_TO_LONGS(block->nb_chunks) *
    741                                sizeof(unsigned long) * 8, block->nb_chunks);
    742 
    743     g_free(old);
    744 
    745     local->nb_blocks--;
    746 
    747     if (local->nb_blocks && rdma->blockmap) {
    748         for (x = 0; x < local->nb_blocks; x++) {
    749             g_hash_table_insert(rdma->blockmap,
    750                                 (void *)(uintptr_t)local->block[x].offset,
    751                                 &local->block[x]);
    752         }
    753     }
    754 
    755     return 0;
    756 }
    757 
    758 /*
    759  * Put in the log file which RDMA device was opened and the details
    760  * associated with that device.
    761  */
    762 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
    763 {
    764     struct ibv_port_attr port;
    765 
    766     if (ibv_query_port(verbs, 1, &port)) {
    767         error_report("Failed to query port information");
    768         return;
    769     }
    770 
    771     printf("%s RDMA Device opened: kernel name %s "
    772            "uverbs device name %s, "
    773            "infiniband_verbs class device path %s, "
    774            "infiniband class device path %s, "
    775            "transport: (%d) %s\n",
    776                 who,
    777                 verbs->device->name,
    778                 verbs->device->dev_name,
    779                 verbs->device->dev_path,
    780                 verbs->device->ibdev_path,
    781                 port.link_layer,
    782                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
    783                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
    784                     ? "Ethernet" : "Unknown"));
    785 }
    786 
    787 /*
    788  * Put in the log file the RDMA gid addressing information,
    789  * useful for folks who have trouble understanding the
    790  * RDMA device hierarchy in the kernel.
    791  */
    792 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
    793 {
    794     char sgid[33];
    795     char dgid[33];
    796     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    797     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    798     trace_qemu_rdma_dump_gid(who, sgid, dgid);
    799 }
    800 
    801 /*
    802  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
    803  * We will try the next addrinfo struct, and fail if there are
    804  * no other valid addresses to bind against.
    805  *
    806  * If user is listening on '[::]', then we will not have a opened a device
    807  * yet and have no way of verifying if the device is RoCE or not.
    808  *
    809  * In this case, the source VM will throw an error for ALL types of
    810  * connections (both IPv4 and IPv6) if the destination machine does not have
    811  * a regular infiniband network available for use.
    812  *
    813  * The only way to guarantee that an error is thrown for broken kernels is
    814  * for the management software to choose a *specific* interface at bind time
    815  * and validate what time of hardware it is.
    816  *
    817  * Unfortunately, this puts the user in a fix:
    818  *
    819  *  If the source VM connects with an IPv4 address without knowing that the
    820  *  destination has bound to '[::]' the migration will unconditionally fail
    821  *  unless the management software is explicitly listening on the IPv4
    822  *  address while using a RoCE-based device.
    823  *
    824  *  If the source VM connects with an IPv6 address, then we're OK because we can
    825  *  throw an error on the source (and similarly on the destination).
    826  *
    827  *  But in mixed environments, this will be broken for a while until it is fixed
    828  *  inside linux.
    829  *
    830  * We do provide a *tiny* bit of help in this function: We can list all of the
    831  * devices in the system and check to see if all the devices are RoCE or
    832  * Infiniband.
    833  *
    834  * If we detect that we have a *pure* RoCE environment, then we can safely
    835  * thrown an error even if the management software has specified '[::]' as the
    836  * bind address.
    837  *
    838  * However, if there is are multiple hetergeneous devices, then we cannot make
    839  * this assumption and the user just has to be sure they know what they are
    840  * doing.
    841  *
    842  * Patches are being reviewed on linux-rdma.
    843  */
    844 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
    845 {
    846     /* This bug only exists in linux, to our knowledge. */
    847 #ifdef CONFIG_LINUX
    848     struct ibv_port_attr port_attr;
    849 
    850     /*
    851      * Verbs are only NULL if management has bound to '[::]'.
    852      *
    853      * Let's iterate through all the devices and see if there any pure IB
    854      * devices (non-ethernet).
    855      *
    856      * If not, then we can safely proceed with the migration.
    857      * Otherwise, there are no guarantees until the bug is fixed in linux.
    858      */
    859     if (!verbs) {
    860         int num_devices, x;
    861         struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
    862         bool roce_found = false;
    863         bool ib_found = false;
    864 
    865         for (x = 0; x < num_devices; x++) {
    866             verbs = ibv_open_device(dev_list[x]);
    867             if (!verbs) {
    868                 if (errno == EPERM) {
    869                     continue;
    870                 } else {
    871                     return -EINVAL;
    872                 }
    873             }
    874 
    875             if (ibv_query_port(verbs, 1, &port_attr)) {
    876                 ibv_close_device(verbs);
    877                 ERROR(errp, "Could not query initial IB port");
    878                 return -EINVAL;
    879             }
    880 
    881             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
    882                 ib_found = true;
    883             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
    884                 roce_found = true;
    885             }
    886 
    887             ibv_close_device(verbs);
    888 
    889         }
    890 
    891         if (roce_found) {
    892             if (ib_found) {
    893                 fprintf(stderr, "WARN: migrations may fail:"
    894                                 " IPv6 over RoCE / iWARP in linux"
    895                                 " is broken. But since you appear to have a"
    896                                 " mixed RoCE / IB environment, be sure to only"
    897                                 " migrate over the IB fabric until the kernel "
    898                                 " fixes the bug.\n");
    899             } else {
    900                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
    901                             " and your management software has specified '[::]'"
    902                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
    903                 return -ENONET;
    904             }
    905         }
    906 
    907         return 0;
    908     }
    909 
    910     /*
    911      * If we have a verbs context, that means that some other than '[::]' was
    912      * used by the management software for binding. In which case we can
    913      * actually warn the user about a potentially broken kernel.
    914      */
    915 
    916     /* IB ports start with 1, not 0 */
    917     if (ibv_query_port(verbs, 1, &port_attr)) {
    918         ERROR(errp, "Could not query initial IB port");
    919         return -EINVAL;
    920     }
    921 
    922     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
    923         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
    924                     "(but patches on linux-rdma in progress)");
    925         return -ENONET;
    926     }
    927 
    928 #endif
    929 
    930     return 0;
    931 }
    932 
    933 /*
    934  * Figure out which RDMA device corresponds to the requested IP hostname
    935  * Also create the initial connection manager identifiers for opening
    936  * the connection.
    937  */
    938 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
    939 {
    940     int ret;
    941     struct rdma_addrinfo *res;
    942     char port_str[16];
    943     struct rdma_cm_event *cm_event;
    944     char ip[40] = "unknown";
    945     struct rdma_addrinfo *e;
    946 
    947     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
    948         ERROR(errp, "RDMA hostname has not been set");
    949         return -EINVAL;
    950     }
    951 
    952     /* create CM channel */
    953     rdma->channel = rdma_create_event_channel();
    954     if (!rdma->channel) {
    955         ERROR(errp, "could not create CM channel");
    956         return -EINVAL;
    957     }
    958 
    959     /* create CM id */
    960     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
    961     if (ret) {
    962         ERROR(errp, "could not create channel id");
    963         goto err_resolve_create_id;
    964     }
    965 
    966     snprintf(port_str, 16, "%d", rdma->port);
    967     port_str[15] = '\0';
    968 
    969     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
    970     if (ret < 0) {
    971         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
    972         goto err_resolve_get_addr;
    973     }
    974 
    975     for (e = res; e != NULL; e = e->ai_next) {
    976         inet_ntop(e->ai_family,
    977             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
    978         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
    979 
    980         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
    981                 RDMA_RESOLVE_TIMEOUT_MS);
    982         if (!ret) {
    983             if (e->ai_family == AF_INET6) {
    984                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
    985                 if (ret) {
    986                     continue;
    987                 }
    988             }
    989             goto route;
    990         }
    991     }
    992 
    993     rdma_freeaddrinfo(res);
    994     ERROR(errp, "could not resolve address %s", rdma->host);
    995     goto err_resolve_get_addr;
    996 
    997 route:
    998     rdma_freeaddrinfo(res);
    999     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
   1000 
   1001     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1002     if (ret) {
   1003         ERROR(errp, "could not perform event_addr_resolved");
   1004         goto err_resolve_get_addr;
   1005     }
   1006 
   1007     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
   1008         ERROR(errp, "result not equal to event_addr_resolved %s",
   1009                 rdma_event_str(cm_event->event));
   1010         error_report("rdma_resolve_addr");
   1011         rdma_ack_cm_event(cm_event);
   1012         ret = -EINVAL;
   1013         goto err_resolve_get_addr;
   1014     }
   1015     rdma_ack_cm_event(cm_event);
   1016 
   1017     /* resolve route */
   1018     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
   1019     if (ret) {
   1020         ERROR(errp, "could not resolve rdma route");
   1021         goto err_resolve_get_addr;
   1022     }
   1023 
   1024     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1025     if (ret) {
   1026         ERROR(errp, "could not perform event_route_resolved");
   1027         goto err_resolve_get_addr;
   1028     }
   1029     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
   1030         ERROR(errp, "result not equal to event_route_resolved: %s",
   1031                         rdma_event_str(cm_event->event));
   1032         rdma_ack_cm_event(cm_event);
   1033         ret = -EINVAL;
   1034         goto err_resolve_get_addr;
   1035     }
   1036     rdma_ack_cm_event(cm_event);
   1037     rdma->verbs = rdma->cm_id->verbs;
   1038     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
   1039     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
   1040     return 0;
   1041 
   1042 err_resolve_get_addr:
   1043     rdma_destroy_id(rdma->cm_id);
   1044     rdma->cm_id = NULL;
   1045 err_resolve_create_id:
   1046     rdma_destroy_event_channel(rdma->channel);
   1047     rdma->channel = NULL;
   1048     return ret;
   1049 }
   1050 
   1051 /*
   1052  * Create protection domain and completion queues
   1053  */
   1054 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
   1055 {
   1056     /* allocate pd */
   1057     rdma->pd = ibv_alloc_pd(rdma->verbs);
   1058     if (!rdma->pd) {
   1059         error_report("failed to allocate protection domain");
   1060         return -1;
   1061     }
   1062 
   1063     /* create receive completion channel */
   1064     rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
   1065     if (!rdma->recv_comp_channel) {
   1066         error_report("failed to allocate receive completion channel");
   1067         goto err_alloc_pd_cq;
   1068     }
   1069 
   1070     /*
   1071      * Completion queue can be filled by read work requests.
   1072      */
   1073     rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
   1074                                   NULL, rdma->recv_comp_channel, 0);
   1075     if (!rdma->recv_cq) {
   1076         error_report("failed to allocate receive completion queue");
   1077         goto err_alloc_pd_cq;
   1078     }
   1079 
   1080     /* create send completion channel */
   1081     rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
   1082     if (!rdma->send_comp_channel) {
   1083         error_report("failed to allocate send completion channel");
   1084         goto err_alloc_pd_cq;
   1085     }
   1086 
   1087     rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
   1088                                   NULL, rdma->send_comp_channel, 0);
   1089     if (!rdma->send_cq) {
   1090         error_report("failed to allocate send completion queue");
   1091         goto err_alloc_pd_cq;
   1092     }
   1093 
   1094     return 0;
   1095 
   1096 err_alloc_pd_cq:
   1097     if (rdma->pd) {
   1098         ibv_dealloc_pd(rdma->pd);
   1099     }
   1100     if (rdma->recv_comp_channel) {
   1101         ibv_destroy_comp_channel(rdma->recv_comp_channel);
   1102     }
   1103     if (rdma->send_comp_channel) {
   1104         ibv_destroy_comp_channel(rdma->send_comp_channel);
   1105     }
   1106     if (rdma->recv_cq) {
   1107         ibv_destroy_cq(rdma->recv_cq);
   1108         rdma->recv_cq = NULL;
   1109     }
   1110     rdma->pd = NULL;
   1111     rdma->recv_comp_channel = NULL;
   1112     rdma->send_comp_channel = NULL;
   1113     return -1;
   1114 
   1115 }
   1116 
   1117 /*
   1118  * Create queue pairs.
   1119  */
   1120 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
   1121 {
   1122     struct ibv_qp_init_attr attr = { 0 };
   1123     int ret;
   1124 
   1125     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
   1126     attr.cap.max_recv_wr = 3;
   1127     attr.cap.max_send_sge = 1;
   1128     attr.cap.max_recv_sge = 1;
   1129     attr.send_cq = rdma->send_cq;
   1130     attr.recv_cq = rdma->recv_cq;
   1131     attr.qp_type = IBV_QPT_RC;
   1132 
   1133     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
   1134     if (ret) {
   1135         return -1;
   1136     }
   1137 
   1138     rdma->qp = rdma->cm_id->qp;
   1139     return 0;
   1140 }
   1141 
   1142 /* Check whether On-Demand Paging is supported by RDAM device */
   1143 static bool rdma_support_odp(struct ibv_context *dev)
   1144 {
   1145     struct ibv_device_attr_ex attr = {0};
   1146     int ret = ibv_query_device_ex(dev, NULL, &attr);
   1147     if (ret) {
   1148         return false;
   1149     }
   1150 
   1151     if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
   1152         return true;
   1153     }
   1154 
   1155     return false;
   1156 }
   1157 
   1158 /*
   1159  * ibv_advise_mr to avoid RNR NAK error as far as possible.
   1160  * The responder mr registering with ODP will sent RNR NAK back to
   1161  * the requester in the face of the page fault.
   1162  */
   1163 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
   1164                                          uint32_t len,  uint32_t lkey,
   1165                                          const char *name, bool wr)
   1166 {
   1167 #ifdef HAVE_IBV_ADVISE_MR
   1168     int ret;
   1169     int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
   1170                  IBV_ADVISE_MR_ADVICE_PREFETCH;
   1171     struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
   1172 
   1173     ret = ibv_advise_mr(pd, advice,
   1174                         IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
   1175     /* ignore the error */
   1176     if (ret) {
   1177         trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno));
   1178     } else {
   1179         trace_qemu_rdma_advise_mr(name, len, addr, "successed");
   1180     }
   1181 #endif
   1182 }
   1183 
   1184 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
   1185 {
   1186     int i;
   1187     RDMALocalBlocks *local = &rdma->local_ram_blocks;
   1188 
   1189     for (i = 0; i < local->nb_blocks; i++) {
   1190         int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
   1191 
   1192         local->block[i].mr =
   1193             ibv_reg_mr(rdma->pd,
   1194                     local->block[i].local_host_addr,
   1195                     local->block[i].length, access
   1196                     );
   1197 
   1198         if (!local->block[i].mr &&
   1199             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
   1200                 access |= IBV_ACCESS_ON_DEMAND;
   1201                 /* register ODP mr */
   1202                 local->block[i].mr =
   1203                     ibv_reg_mr(rdma->pd,
   1204                                local->block[i].local_host_addr,
   1205                                local->block[i].length, access);
   1206                 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
   1207 
   1208                 if (local->block[i].mr) {
   1209                     qemu_rdma_advise_prefetch_mr(rdma->pd,
   1210                                     (uintptr_t)local->block[i].local_host_addr,
   1211                                     local->block[i].length,
   1212                                     local->block[i].mr->lkey,
   1213                                     local->block[i].block_name,
   1214                                     true);
   1215                 }
   1216         }
   1217 
   1218         if (!local->block[i].mr) {
   1219             perror("Failed to register local dest ram block!");
   1220             break;
   1221         }
   1222         rdma->total_registrations++;
   1223     }
   1224 
   1225     if (i >= local->nb_blocks) {
   1226         return 0;
   1227     }
   1228 
   1229     for (i--; i >= 0; i--) {
   1230         ibv_dereg_mr(local->block[i].mr);
   1231         local->block[i].mr = NULL;
   1232         rdma->total_registrations--;
   1233     }
   1234 
   1235     return -1;
   1236 
   1237 }
   1238 
   1239 /*
   1240  * Find the ram block that corresponds to the page requested to be
   1241  * transmitted by QEMU.
   1242  *
   1243  * Once the block is found, also identify which 'chunk' within that
   1244  * block that the page belongs to.
   1245  *
   1246  * This search cannot fail or the migration will fail.
   1247  */
   1248 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
   1249                                       uintptr_t block_offset,
   1250                                       uint64_t offset,
   1251                                       uint64_t length,
   1252                                       uint64_t *block_index,
   1253                                       uint64_t *chunk_index)
   1254 {
   1255     uint64_t current_addr = block_offset + offset;
   1256     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
   1257                                                 (void *) block_offset);
   1258     assert(block);
   1259     assert(current_addr >= block->offset);
   1260     assert((current_addr + length) <= (block->offset + block->length));
   1261 
   1262     *block_index = block->index;
   1263     *chunk_index = ram_chunk_index(block->local_host_addr,
   1264                 block->local_host_addr + (current_addr - block->offset));
   1265 
   1266     return 0;
   1267 }
   1268 
   1269 /*
   1270  * Register a chunk with IB. If the chunk was already registered
   1271  * previously, then skip.
   1272  *
   1273  * Also return the keys associated with the registration needed
   1274  * to perform the actual RDMA operation.
   1275  */
   1276 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
   1277         RDMALocalBlock *block, uintptr_t host_addr,
   1278         uint32_t *lkey, uint32_t *rkey, int chunk,
   1279         uint8_t *chunk_start, uint8_t *chunk_end)
   1280 {
   1281     if (block->mr) {
   1282         if (lkey) {
   1283             *lkey = block->mr->lkey;
   1284         }
   1285         if (rkey) {
   1286             *rkey = block->mr->rkey;
   1287         }
   1288         return 0;
   1289     }
   1290 
   1291     /* allocate memory to store chunk MRs */
   1292     if (!block->pmr) {
   1293         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
   1294     }
   1295 
   1296     /*
   1297      * If 'rkey', then we're the destination, so grant access to the source.
   1298      *
   1299      * If 'lkey', then we're the source VM, so grant access only to ourselves.
   1300      */
   1301     if (!block->pmr[chunk]) {
   1302         uint64_t len = chunk_end - chunk_start;
   1303         int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
   1304                      0;
   1305 
   1306         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
   1307 
   1308         block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
   1309         if (!block->pmr[chunk] &&
   1310             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
   1311             access |= IBV_ACCESS_ON_DEMAND;
   1312             /* register ODP mr */
   1313             block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
   1314             trace_qemu_rdma_register_odp_mr(block->block_name);
   1315 
   1316             if (block->pmr[chunk]) {
   1317                 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
   1318                                             len, block->pmr[chunk]->lkey,
   1319                                             block->block_name, rkey);
   1320 
   1321             }
   1322         }
   1323     }
   1324     if (!block->pmr[chunk]) {
   1325         perror("Failed to register chunk!");
   1326         fprintf(stderr, "Chunk details: block: %d chunk index %d"
   1327                         " start %" PRIuPTR " end %" PRIuPTR
   1328                         " host %" PRIuPTR
   1329                         " local %" PRIuPTR " registrations: %d\n",
   1330                         block->index, chunk, (uintptr_t)chunk_start,
   1331                         (uintptr_t)chunk_end, host_addr,
   1332                         (uintptr_t)block->local_host_addr,
   1333                         rdma->total_registrations);
   1334         return -1;
   1335     }
   1336     rdma->total_registrations++;
   1337 
   1338     if (lkey) {
   1339         *lkey = block->pmr[chunk]->lkey;
   1340     }
   1341     if (rkey) {
   1342         *rkey = block->pmr[chunk]->rkey;
   1343     }
   1344     return 0;
   1345 }
   1346 
   1347 /*
   1348  * Register (at connection time) the memory used for control
   1349  * channel messages.
   1350  */
   1351 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
   1352 {
   1353     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
   1354             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
   1355             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
   1356     if (rdma->wr_data[idx].control_mr) {
   1357         rdma->total_registrations++;
   1358         return 0;
   1359     }
   1360     error_report("qemu_rdma_reg_control failed");
   1361     return -1;
   1362 }
   1363 
   1364 const char *print_wrid(int wrid)
   1365 {
   1366     if (wrid >= RDMA_WRID_RECV_CONTROL) {
   1367         return wrid_desc[RDMA_WRID_RECV_CONTROL];
   1368     }
   1369     return wrid_desc[wrid];
   1370 }
   1371 
   1372 /*
   1373  * Perform a non-optimized memory unregistration after every transfer
   1374  * for demonstration purposes, only if pin-all is not requested.
   1375  *
   1376  * Potential optimizations:
   1377  * 1. Start a new thread to run this function continuously
   1378         - for bit clearing
   1379         - and for receipt of unregister messages
   1380  * 2. Use an LRU.
   1381  * 3. Use workload hints.
   1382  */
   1383 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
   1384 {
   1385     while (rdma->unregistrations[rdma->unregister_current]) {
   1386         int ret;
   1387         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
   1388         uint64_t chunk =
   1389             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
   1390         uint64_t index =
   1391             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
   1392         RDMALocalBlock *block =
   1393             &(rdma->local_ram_blocks.block[index]);
   1394         RDMARegister reg = { .current_index = index };
   1395         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
   1396                                  };
   1397         RDMAControlHeader head = { .len = sizeof(RDMARegister),
   1398                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
   1399                                    .repeat = 1,
   1400                                  };
   1401 
   1402         trace_qemu_rdma_unregister_waiting_proc(chunk,
   1403                                                 rdma->unregister_current);
   1404 
   1405         rdma->unregistrations[rdma->unregister_current] = 0;
   1406         rdma->unregister_current++;
   1407 
   1408         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
   1409             rdma->unregister_current = 0;
   1410         }
   1411 
   1412 
   1413         /*
   1414          * Unregistration is speculative (because migration is single-threaded
   1415          * and we cannot break the protocol's inifinband message ordering).
   1416          * Thus, if the memory is currently being used for transmission,
   1417          * then abort the attempt to unregister and try again
   1418          * later the next time a completion is received for this memory.
   1419          */
   1420         clear_bit(chunk, block->unregister_bitmap);
   1421 
   1422         if (test_bit(chunk, block->transit_bitmap)) {
   1423             trace_qemu_rdma_unregister_waiting_inflight(chunk);
   1424             continue;
   1425         }
   1426 
   1427         trace_qemu_rdma_unregister_waiting_send(chunk);
   1428 
   1429         ret = ibv_dereg_mr(block->pmr[chunk]);
   1430         block->pmr[chunk] = NULL;
   1431         block->remote_keys[chunk] = 0;
   1432 
   1433         if (ret != 0) {
   1434             perror("unregistration chunk failed");
   1435             return -ret;
   1436         }
   1437         rdma->total_registrations--;
   1438 
   1439         reg.key.chunk = chunk;
   1440         register_to_network(rdma, &reg);
   1441         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
   1442                                 &resp, NULL, NULL);
   1443         if (ret < 0) {
   1444             return ret;
   1445         }
   1446 
   1447         trace_qemu_rdma_unregister_waiting_complete(chunk);
   1448     }
   1449 
   1450     return 0;
   1451 }
   1452 
   1453 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
   1454                                          uint64_t chunk)
   1455 {
   1456     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
   1457 
   1458     result |= (index << RDMA_WRID_BLOCK_SHIFT);
   1459     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
   1460 
   1461     return result;
   1462 }
   1463 
   1464 /*
   1465  * Consult the connection manager to see a work request
   1466  * (of any kind) has completed.
   1467  * Return the work request ID that completed.
   1468  */
   1469 static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
   1470                                uint64_t *wr_id_out, uint32_t *byte_len)
   1471 {
   1472     int ret;
   1473     struct ibv_wc wc;
   1474     uint64_t wr_id;
   1475 
   1476     ret = ibv_poll_cq(cq, 1, &wc);
   1477 
   1478     if (!ret) {
   1479         *wr_id_out = RDMA_WRID_NONE;
   1480         return 0;
   1481     }
   1482 
   1483     if (ret < 0) {
   1484         error_report("ibv_poll_cq return %d", ret);
   1485         return ret;
   1486     }
   1487 
   1488     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
   1489 
   1490     if (wc.status != IBV_WC_SUCCESS) {
   1491         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
   1492                         wc.status, ibv_wc_status_str(wc.status));
   1493         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
   1494 
   1495         return -1;
   1496     }
   1497 
   1498     if (rdma->control_ready_expected &&
   1499         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
   1500         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
   1501                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
   1502         rdma->control_ready_expected = 0;
   1503     }
   1504 
   1505     if (wr_id == RDMA_WRID_RDMA_WRITE) {
   1506         uint64_t chunk =
   1507             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
   1508         uint64_t index =
   1509             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
   1510         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
   1511 
   1512         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
   1513                                    index, chunk, block->local_host_addr,
   1514                                    (void *)(uintptr_t)block->remote_host_addr);
   1515 
   1516         clear_bit(chunk, block->transit_bitmap);
   1517 
   1518         if (rdma->nb_sent > 0) {
   1519             rdma->nb_sent--;
   1520         }
   1521     } else {
   1522         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
   1523     }
   1524 
   1525     *wr_id_out = wc.wr_id;
   1526     if (byte_len) {
   1527         *byte_len = wc.byte_len;
   1528     }
   1529 
   1530     return  0;
   1531 }
   1532 
   1533 /* Wait for activity on the completion channel.
   1534  * Returns 0 on success, none-0 on error.
   1535  */
   1536 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
   1537                                        struct ibv_comp_channel *comp_channel)
   1538 {
   1539     struct rdma_cm_event *cm_event;
   1540     int ret = -1;
   1541 
   1542     /*
   1543      * Coroutine doesn't start until migration_fd_process_incoming()
   1544      * so don't yield unless we know we're running inside of a coroutine.
   1545      */
   1546     if (rdma->migration_started_on_destination &&
   1547         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
   1548         yield_until_fd_readable(comp_channel->fd);
   1549     } else {
   1550         /* This is the source side, we're in a separate thread
   1551          * or destination prior to migration_fd_process_incoming()
   1552          * after postcopy, the destination also in a separate thread.
   1553          * we can't yield; so we have to poll the fd.
   1554          * But we need to be able to handle 'cancel' or an error
   1555          * without hanging forever.
   1556          */
   1557         while (!rdma->error_state  && !rdma->received_error) {
   1558             GPollFD pfds[2];
   1559             pfds[0].fd = comp_channel->fd;
   1560             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
   1561             pfds[0].revents = 0;
   1562 
   1563             pfds[1].fd = rdma->channel->fd;
   1564             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
   1565             pfds[1].revents = 0;
   1566 
   1567             /* 0.1s timeout, should be fine for a 'cancel' */
   1568             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
   1569             case 2:
   1570             case 1: /* fd active */
   1571                 if (pfds[0].revents) {
   1572                     return 0;
   1573                 }
   1574 
   1575                 if (pfds[1].revents) {
   1576                     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1577                     if (ret) {
   1578                         error_report("failed to get cm event while wait "
   1579                                      "completion channel");
   1580                         return -EPIPE;
   1581                     }
   1582 
   1583                     error_report("receive cm event while wait comp channel,"
   1584                                  "cm event is %d", cm_event->event);
   1585                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
   1586                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
   1587                         rdma_ack_cm_event(cm_event);
   1588                         return -EPIPE;
   1589                     }
   1590                     rdma_ack_cm_event(cm_event);
   1591                 }
   1592                 break;
   1593 
   1594             case 0: /* Timeout, go around again */
   1595                 break;
   1596 
   1597             default: /* Error of some type -
   1598                       * I don't trust errno from qemu_poll_ns
   1599                      */
   1600                 error_report("%s: poll failed", __func__);
   1601                 return -EPIPE;
   1602             }
   1603 
   1604             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
   1605                 /* Bail out and let the cancellation happen */
   1606                 return -EPIPE;
   1607             }
   1608         }
   1609     }
   1610 
   1611     if (rdma->received_error) {
   1612         return -EPIPE;
   1613     }
   1614     return rdma->error_state;
   1615 }
   1616 
   1617 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid)
   1618 {
   1619     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
   1620            rdma->recv_comp_channel;
   1621 }
   1622 
   1623 static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid)
   1624 {
   1625     return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
   1626 }
   1627 
   1628 /*
   1629  * Block until the next work request has completed.
   1630  *
   1631  * First poll to see if a work request has already completed,
   1632  * otherwise block.
   1633  *
   1634  * If we encounter completed work requests for IDs other than
   1635  * the one we're interested in, then that's generally an error.
   1636  *
   1637  * The only exception is actual RDMA Write completions. These
   1638  * completions only need to be recorded, but do not actually
   1639  * need further processing.
   1640  */
   1641 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
   1642                                     uint32_t *byte_len)
   1643 {
   1644     int num_cq_events = 0, ret = 0;
   1645     struct ibv_cq *cq;
   1646     void *cq_ctx;
   1647     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
   1648     struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
   1649     struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
   1650 
   1651     if (ibv_req_notify_cq(poll_cq, 0)) {
   1652         return -1;
   1653     }
   1654     /* poll cq first */
   1655     while (wr_id != wrid_requested) {
   1656         ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
   1657         if (ret < 0) {
   1658             return ret;
   1659         }
   1660 
   1661         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   1662 
   1663         if (wr_id == RDMA_WRID_NONE) {
   1664             break;
   1665         }
   1666         if (wr_id != wrid_requested) {
   1667             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
   1668                        wrid_requested, print_wrid(wr_id), wr_id);
   1669         }
   1670     }
   1671 
   1672     if (wr_id == wrid_requested) {
   1673         return 0;
   1674     }
   1675 
   1676     while (1) {
   1677         ret = qemu_rdma_wait_comp_channel(rdma, ch);
   1678         if (ret) {
   1679             goto err_block_for_wrid;
   1680         }
   1681 
   1682         ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
   1683         if (ret) {
   1684             perror("ibv_get_cq_event");
   1685             goto err_block_for_wrid;
   1686         }
   1687 
   1688         num_cq_events++;
   1689 
   1690         ret = -ibv_req_notify_cq(cq, 0);
   1691         if (ret) {
   1692             goto err_block_for_wrid;
   1693         }
   1694 
   1695         while (wr_id != wrid_requested) {
   1696             ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
   1697             if (ret < 0) {
   1698                 goto err_block_for_wrid;
   1699             }
   1700 
   1701             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   1702 
   1703             if (wr_id == RDMA_WRID_NONE) {
   1704                 break;
   1705             }
   1706             if (wr_id != wrid_requested) {
   1707                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
   1708                                    wrid_requested, print_wrid(wr_id), wr_id);
   1709             }
   1710         }
   1711 
   1712         if (wr_id == wrid_requested) {
   1713             goto success_block_for_wrid;
   1714         }
   1715     }
   1716 
   1717 success_block_for_wrid:
   1718     if (num_cq_events) {
   1719         ibv_ack_cq_events(cq, num_cq_events);
   1720     }
   1721     return 0;
   1722 
   1723 err_block_for_wrid:
   1724     if (num_cq_events) {
   1725         ibv_ack_cq_events(cq, num_cq_events);
   1726     }
   1727 
   1728     rdma->error_state = ret;
   1729     return ret;
   1730 }
   1731 
   1732 /*
   1733  * Post a SEND message work request for the control channel
   1734  * containing some data and block until the post completes.
   1735  */
   1736 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
   1737                                        RDMAControlHeader *head)
   1738 {
   1739     int ret = 0;
   1740     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
   1741     struct ibv_send_wr *bad_wr;
   1742     struct ibv_sge sge = {
   1743                            .addr = (uintptr_t)(wr->control),
   1744                            .length = head->len + sizeof(RDMAControlHeader),
   1745                            .lkey = wr->control_mr->lkey,
   1746                          };
   1747     struct ibv_send_wr send_wr = {
   1748                                    .wr_id = RDMA_WRID_SEND_CONTROL,
   1749                                    .opcode = IBV_WR_SEND,
   1750                                    .send_flags = IBV_SEND_SIGNALED,
   1751                                    .sg_list = &sge,
   1752                                    .num_sge = 1,
   1753                                 };
   1754 
   1755     trace_qemu_rdma_post_send_control(control_desc(head->type));
   1756 
   1757     /*
   1758      * We don't actually need to do a memcpy() in here if we used
   1759      * the "sge" properly, but since we're only sending control messages
   1760      * (not RAM in a performance-critical path), then its OK for now.
   1761      *
   1762      * The copy makes the RDMAControlHeader simpler to manipulate
   1763      * for the time being.
   1764      */
   1765     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
   1766     memcpy(wr->control, head, sizeof(RDMAControlHeader));
   1767     control_to_network((void *) wr->control);
   1768 
   1769     if (buf) {
   1770         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
   1771     }
   1772 
   1773 
   1774     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
   1775 
   1776     if (ret > 0) {
   1777         error_report("Failed to use post IB SEND for control");
   1778         return -ret;
   1779     }
   1780 
   1781     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
   1782     if (ret < 0) {
   1783         error_report("rdma migration: send polling control error");
   1784     }
   1785 
   1786     return ret;
   1787 }
   1788 
   1789 /*
   1790  * Post a RECV work request in anticipation of some future receipt
   1791  * of data on the control channel.
   1792  */
   1793 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
   1794 {
   1795     struct ibv_recv_wr *bad_wr;
   1796     struct ibv_sge sge = {
   1797                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
   1798                             .length = RDMA_CONTROL_MAX_BUFFER,
   1799                             .lkey = rdma->wr_data[idx].control_mr->lkey,
   1800                          };
   1801 
   1802     struct ibv_recv_wr recv_wr = {
   1803                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
   1804                                     .sg_list = &sge,
   1805                                     .num_sge = 1,
   1806                                  };
   1807 
   1808 
   1809     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
   1810         return -1;
   1811     }
   1812 
   1813     return 0;
   1814 }
   1815 
   1816 /*
   1817  * Block and wait for a RECV control channel message to arrive.
   1818  */
   1819 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
   1820                 RDMAControlHeader *head, int expecting, int idx)
   1821 {
   1822     uint32_t byte_len;
   1823     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
   1824                                        &byte_len);
   1825 
   1826     if (ret < 0) {
   1827         error_report("rdma migration: recv polling control error!");
   1828         return ret;
   1829     }
   1830 
   1831     network_to_control((void *) rdma->wr_data[idx].control);
   1832     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
   1833 
   1834     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
   1835 
   1836     if (expecting == RDMA_CONTROL_NONE) {
   1837         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
   1838                                              head->type);
   1839     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
   1840         error_report("Was expecting a %s (%d) control message"
   1841                 ", but got: %s (%d), length: %d",
   1842                 control_desc(expecting), expecting,
   1843                 control_desc(head->type), head->type, head->len);
   1844         if (head->type == RDMA_CONTROL_ERROR) {
   1845             rdma->received_error = true;
   1846         }
   1847         return -EIO;
   1848     }
   1849     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
   1850         error_report("too long length: %d", head->len);
   1851         return -EINVAL;
   1852     }
   1853     if (sizeof(*head) + head->len != byte_len) {
   1854         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
   1855         return -EINVAL;
   1856     }
   1857 
   1858     return 0;
   1859 }
   1860 
   1861 /*
   1862  * When a RECV work request has completed, the work request's
   1863  * buffer is pointed at the header.
   1864  *
   1865  * This will advance the pointer to the data portion
   1866  * of the control message of the work request's buffer that
   1867  * was populated after the work request finished.
   1868  */
   1869 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
   1870                                   RDMAControlHeader *head)
   1871 {
   1872     rdma->wr_data[idx].control_len = head->len;
   1873     rdma->wr_data[idx].control_curr =
   1874         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
   1875 }
   1876 
   1877 /*
   1878  * This is an 'atomic' high-level operation to deliver a single, unified
   1879  * control-channel message.
   1880  *
   1881  * Additionally, if the user is expecting some kind of reply to this message,
   1882  * they can request a 'resp' response message be filled in by posting an
   1883  * additional work request on behalf of the user and waiting for an additional
   1884  * completion.
   1885  *
   1886  * The extra (optional) response is used during registration to us from having
   1887  * to perform an *additional* exchange of message just to provide a response by
   1888  * instead piggy-backing on the acknowledgement.
   1889  */
   1890 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
   1891                                    uint8_t *data, RDMAControlHeader *resp,
   1892                                    int *resp_idx,
   1893                                    int (*callback)(RDMAContext *rdma))
   1894 {
   1895     int ret = 0;
   1896 
   1897     /*
   1898      * Wait until the dest is ready before attempting to deliver the message
   1899      * by waiting for a READY message.
   1900      */
   1901     if (rdma->control_ready_expected) {
   1902         RDMAControlHeader resp;
   1903         ret = qemu_rdma_exchange_get_response(rdma,
   1904                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
   1905         if (ret < 0) {
   1906             return ret;
   1907         }
   1908     }
   1909 
   1910     /*
   1911      * If the user is expecting a response, post a WR in anticipation of it.
   1912      */
   1913     if (resp) {
   1914         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
   1915         if (ret) {
   1916             error_report("rdma migration: error posting"
   1917                     " extra control recv for anticipated result!");
   1918             return ret;
   1919         }
   1920     }
   1921 
   1922     /*
   1923      * Post a WR to replace the one we just consumed for the READY message.
   1924      */
   1925     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   1926     if (ret) {
   1927         error_report("rdma migration: error posting first control recv!");
   1928         return ret;
   1929     }
   1930 
   1931     /*
   1932      * Deliver the control message that was requested.
   1933      */
   1934     ret = qemu_rdma_post_send_control(rdma, data, head);
   1935 
   1936     if (ret < 0) {
   1937         error_report("Failed to send control buffer!");
   1938         return ret;
   1939     }
   1940 
   1941     /*
   1942      * If we're expecting a response, block and wait for it.
   1943      */
   1944     if (resp) {
   1945         if (callback) {
   1946             trace_qemu_rdma_exchange_send_issue_callback();
   1947             ret = callback(rdma);
   1948             if (ret < 0) {
   1949                 return ret;
   1950             }
   1951         }
   1952 
   1953         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
   1954         ret = qemu_rdma_exchange_get_response(rdma, resp,
   1955                                               resp->type, RDMA_WRID_DATA);
   1956 
   1957         if (ret < 0) {
   1958             return ret;
   1959         }
   1960 
   1961         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
   1962         if (resp_idx) {
   1963             *resp_idx = RDMA_WRID_DATA;
   1964         }
   1965         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
   1966     }
   1967 
   1968     rdma->control_ready_expected = 1;
   1969 
   1970     return 0;
   1971 }
   1972 
   1973 /*
   1974  * This is an 'atomic' high-level operation to receive a single, unified
   1975  * control-channel message.
   1976  */
   1977 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
   1978                                 int expecting)
   1979 {
   1980     RDMAControlHeader ready = {
   1981                                 .len = 0,
   1982                                 .type = RDMA_CONTROL_READY,
   1983                                 .repeat = 1,
   1984                               };
   1985     int ret;
   1986 
   1987     /*
   1988      * Inform the source that we're ready to receive a message.
   1989      */
   1990     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
   1991 
   1992     if (ret < 0) {
   1993         error_report("Failed to send control buffer!");
   1994         return ret;
   1995     }
   1996 
   1997     /*
   1998      * Block and wait for the message.
   1999      */
   2000     ret = qemu_rdma_exchange_get_response(rdma, head,
   2001                                           expecting, RDMA_WRID_READY);
   2002 
   2003     if (ret < 0) {
   2004         return ret;
   2005     }
   2006 
   2007     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
   2008 
   2009     /*
   2010      * Post a new RECV work request to replace the one we just consumed.
   2011      */
   2012     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   2013     if (ret) {
   2014         error_report("rdma migration: error posting second control recv!");
   2015         return ret;
   2016     }
   2017 
   2018     return 0;
   2019 }
   2020 
   2021 /*
   2022  * Write an actual chunk of memory using RDMA.
   2023  *
   2024  * If we're using dynamic registration on the dest-side, we have to
   2025  * send a registration command first.
   2026  */
   2027 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
   2028                                int current_index, uint64_t current_addr,
   2029                                uint64_t length)
   2030 {
   2031     struct ibv_sge sge;
   2032     struct ibv_send_wr send_wr = { 0 };
   2033     struct ibv_send_wr *bad_wr;
   2034     int reg_result_idx, ret, count = 0;
   2035     uint64_t chunk, chunks;
   2036     uint8_t *chunk_start, *chunk_end;
   2037     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
   2038     RDMARegister reg;
   2039     RDMARegisterResult *reg_result;
   2040     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
   2041     RDMAControlHeader head = { .len = sizeof(RDMARegister),
   2042                                .type = RDMA_CONTROL_REGISTER_REQUEST,
   2043                                .repeat = 1,
   2044                              };
   2045 
   2046 retry:
   2047     sge.addr = (uintptr_t)(block->local_host_addr +
   2048                             (current_addr - block->offset));
   2049     sge.length = length;
   2050 
   2051     chunk = ram_chunk_index(block->local_host_addr,
   2052                             (uint8_t *)(uintptr_t)sge.addr);
   2053     chunk_start = ram_chunk_start(block, chunk);
   2054 
   2055     if (block->is_ram_block) {
   2056         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
   2057 
   2058         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
   2059             chunks--;
   2060         }
   2061     } else {
   2062         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
   2063 
   2064         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
   2065             chunks--;
   2066         }
   2067     }
   2068 
   2069     trace_qemu_rdma_write_one_top(chunks + 1,
   2070                                   (chunks + 1) *
   2071                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
   2072 
   2073     chunk_end = ram_chunk_end(block, chunk + chunks);
   2074 
   2075 
   2076     while (test_bit(chunk, block->transit_bitmap)) {
   2077         (void)count;
   2078         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
   2079                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
   2080 
   2081         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2082 
   2083         if (ret < 0) {
   2084             error_report("Failed to Wait for previous write to complete "
   2085                     "block %d chunk %" PRIu64
   2086                     " current %" PRIu64 " len %" PRIu64 " %d",
   2087                     current_index, chunk, sge.addr, length, rdma->nb_sent);
   2088             return ret;
   2089         }
   2090     }
   2091 
   2092     if (!rdma->pin_all || !block->is_ram_block) {
   2093         if (!block->remote_keys[chunk]) {
   2094             /*
   2095              * This chunk has not yet been registered, so first check to see
   2096              * if the entire chunk is zero. If so, tell the other size to
   2097              * memset() + madvise() the entire chunk without RDMA.
   2098              */
   2099 
   2100             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
   2101                 RDMACompress comp = {
   2102                                         .offset = current_addr,
   2103                                         .value = 0,
   2104                                         .block_idx = current_index,
   2105                                         .length = length,
   2106                                     };
   2107 
   2108                 head.len = sizeof(comp);
   2109                 head.type = RDMA_CONTROL_COMPRESS;
   2110 
   2111                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
   2112                                                current_index, current_addr);
   2113 
   2114                 compress_to_network(rdma, &comp);
   2115                 ret = qemu_rdma_exchange_send(rdma, &head,
   2116                                 (uint8_t *) &comp, NULL, NULL, NULL);
   2117 
   2118                 if (ret < 0) {
   2119                     return -EIO;
   2120                 }
   2121 
   2122                 acct_update_position(f, sge.length, true);
   2123 
   2124                 return 1;
   2125             }
   2126 
   2127             /*
   2128              * Otherwise, tell other side to register.
   2129              */
   2130             reg.current_index = current_index;
   2131             if (block->is_ram_block) {
   2132                 reg.key.current_addr = current_addr;
   2133             } else {
   2134                 reg.key.chunk = chunk;
   2135             }
   2136             reg.chunks = chunks;
   2137 
   2138             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
   2139                                               current_addr);
   2140 
   2141             register_to_network(rdma, &reg);
   2142             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
   2143                                     &resp, &reg_result_idx, NULL);
   2144             if (ret < 0) {
   2145                 return ret;
   2146             }
   2147 
   2148             /* try to overlap this single registration with the one we sent. */
   2149             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2150                                                 &sge.lkey, NULL, chunk,
   2151                                                 chunk_start, chunk_end)) {
   2152                 error_report("cannot get lkey");
   2153                 return -EINVAL;
   2154             }
   2155 
   2156             reg_result = (RDMARegisterResult *)
   2157                     rdma->wr_data[reg_result_idx].control_curr;
   2158 
   2159             network_to_result(reg_result);
   2160 
   2161             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
   2162                                                  reg_result->rkey, chunk);
   2163 
   2164             block->remote_keys[chunk] = reg_result->rkey;
   2165             block->remote_host_addr = reg_result->host_addr;
   2166         } else {
   2167             /* already registered before */
   2168             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2169                                                 &sge.lkey, NULL, chunk,
   2170                                                 chunk_start, chunk_end)) {
   2171                 error_report("cannot get lkey!");
   2172                 return -EINVAL;
   2173             }
   2174         }
   2175 
   2176         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
   2177     } else {
   2178         send_wr.wr.rdma.rkey = block->remote_rkey;
   2179 
   2180         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2181                                                      &sge.lkey, NULL, chunk,
   2182                                                      chunk_start, chunk_end)) {
   2183             error_report("cannot get lkey!");
   2184             return -EINVAL;
   2185         }
   2186     }
   2187 
   2188     /*
   2189      * Encode the ram block index and chunk within this wrid.
   2190      * We will use this information at the time of completion
   2191      * to figure out which bitmap to check against and then which
   2192      * chunk in the bitmap to look for.
   2193      */
   2194     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
   2195                                         current_index, chunk);
   2196 
   2197     send_wr.opcode = IBV_WR_RDMA_WRITE;
   2198     send_wr.send_flags = IBV_SEND_SIGNALED;
   2199     send_wr.sg_list = &sge;
   2200     send_wr.num_sge = 1;
   2201     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
   2202                                 (current_addr - block->offset);
   2203 
   2204     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
   2205                                    sge.length);
   2206 
   2207     /*
   2208      * ibv_post_send() does not return negative error numbers,
   2209      * per the specification they are positive - no idea why.
   2210      */
   2211     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
   2212 
   2213     if (ret == ENOMEM) {
   2214         trace_qemu_rdma_write_one_queue_full();
   2215         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2216         if (ret < 0) {
   2217             error_report("rdma migration: failed to make "
   2218                          "room in full send queue! %d", ret);
   2219             return ret;
   2220         }
   2221 
   2222         goto retry;
   2223 
   2224     } else if (ret > 0) {
   2225         perror("rdma migration: post rdma write failed");
   2226         return -ret;
   2227     }
   2228 
   2229     set_bit(chunk, block->transit_bitmap);
   2230     acct_update_position(f, sge.length, false);
   2231     rdma->total_writes++;
   2232 
   2233     return 0;
   2234 }
   2235 
   2236 /*
   2237  * Push out any unwritten RDMA operations.
   2238  *
   2239  * We support sending out multiple chunks at the same time.
   2240  * Not all of them need to get signaled in the completion queue.
   2241  */
   2242 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
   2243 {
   2244     int ret;
   2245 
   2246     if (!rdma->current_length) {
   2247         return 0;
   2248     }
   2249 
   2250     ret = qemu_rdma_write_one(f, rdma,
   2251             rdma->current_index, rdma->current_addr, rdma->current_length);
   2252 
   2253     if (ret < 0) {
   2254         return ret;
   2255     }
   2256 
   2257     if (ret == 0) {
   2258         rdma->nb_sent++;
   2259         trace_qemu_rdma_write_flush(rdma->nb_sent);
   2260     }
   2261 
   2262     rdma->current_length = 0;
   2263     rdma->current_addr = 0;
   2264 
   2265     return 0;
   2266 }
   2267 
   2268 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
   2269                     uint64_t offset, uint64_t len)
   2270 {
   2271     RDMALocalBlock *block;
   2272     uint8_t *host_addr;
   2273     uint8_t *chunk_end;
   2274 
   2275     if (rdma->current_index < 0) {
   2276         return 0;
   2277     }
   2278 
   2279     if (rdma->current_chunk < 0) {
   2280         return 0;
   2281     }
   2282 
   2283     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
   2284     host_addr = block->local_host_addr + (offset - block->offset);
   2285     chunk_end = ram_chunk_end(block, rdma->current_chunk);
   2286 
   2287     if (rdma->current_length == 0) {
   2288         return 0;
   2289     }
   2290 
   2291     /*
   2292      * Only merge into chunk sequentially.
   2293      */
   2294     if (offset != (rdma->current_addr + rdma->current_length)) {
   2295         return 0;
   2296     }
   2297 
   2298     if (offset < block->offset) {
   2299         return 0;
   2300     }
   2301 
   2302     if ((offset + len) > (block->offset + block->length)) {
   2303         return 0;
   2304     }
   2305 
   2306     if ((host_addr + len) > chunk_end) {
   2307         return 0;
   2308     }
   2309 
   2310     return 1;
   2311 }
   2312 
   2313 /*
   2314  * We're not actually writing here, but doing three things:
   2315  *
   2316  * 1. Identify the chunk the buffer belongs to.
   2317  * 2. If the chunk is full or the buffer doesn't belong to the current
   2318  *    chunk, then start a new chunk and flush() the old chunk.
   2319  * 3. To keep the hardware busy, we also group chunks into batches
   2320  *    and only require that a batch gets acknowledged in the completion
   2321  *    queue instead of each individual chunk.
   2322  */
   2323 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
   2324                            uint64_t block_offset, uint64_t offset,
   2325                            uint64_t len)
   2326 {
   2327     uint64_t current_addr = block_offset + offset;
   2328     uint64_t index = rdma->current_index;
   2329     uint64_t chunk = rdma->current_chunk;
   2330     int ret;
   2331 
   2332     /* If we cannot merge it, we flush the current buffer first. */
   2333     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
   2334         ret = qemu_rdma_write_flush(f, rdma);
   2335         if (ret) {
   2336             return ret;
   2337         }
   2338         rdma->current_length = 0;
   2339         rdma->current_addr = current_addr;
   2340 
   2341         ret = qemu_rdma_search_ram_block(rdma, block_offset,
   2342                                          offset, len, &index, &chunk);
   2343         if (ret) {
   2344             error_report("ram block search failed");
   2345             return ret;
   2346         }
   2347         rdma->current_index = index;
   2348         rdma->current_chunk = chunk;
   2349     }
   2350 
   2351     /* merge it */
   2352     rdma->current_length += len;
   2353 
   2354     /* flush it if buffer is too large */
   2355     if (rdma->current_length >= RDMA_MERGE_MAX) {
   2356         return qemu_rdma_write_flush(f, rdma);
   2357     }
   2358 
   2359     return 0;
   2360 }
   2361 
   2362 static void qemu_rdma_cleanup(RDMAContext *rdma)
   2363 {
   2364     int idx;
   2365 
   2366     if (rdma->cm_id && rdma->connected) {
   2367         if ((rdma->error_state ||
   2368              migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
   2369             !rdma->received_error) {
   2370             RDMAControlHeader head = { .len = 0,
   2371                                        .type = RDMA_CONTROL_ERROR,
   2372                                        .repeat = 1,
   2373                                      };
   2374             error_report("Early error. Sending error.");
   2375             qemu_rdma_post_send_control(rdma, NULL, &head);
   2376         }
   2377 
   2378         rdma_disconnect(rdma->cm_id);
   2379         trace_qemu_rdma_cleanup_disconnect();
   2380         rdma->connected = false;
   2381     }
   2382 
   2383     if (rdma->channel) {
   2384         qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
   2385     }
   2386     g_free(rdma->dest_blocks);
   2387     rdma->dest_blocks = NULL;
   2388 
   2389     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2390         if (rdma->wr_data[idx].control_mr) {
   2391             rdma->total_registrations--;
   2392             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
   2393         }
   2394         rdma->wr_data[idx].control_mr = NULL;
   2395     }
   2396 
   2397     if (rdma->local_ram_blocks.block) {
   2398         while (rdma->local_ram_blocks.nb_blocks) {
   2399             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
   2400         }
   2401     }
   2402 
   2403     if (rdma->qp) {
   2404         rdma_destroy_qp(rdma->cm_id);
   2405         rdma->qp = NULL;
   2406     }
   2407     if (rdma->recv_cq) {
   2408         ibv_destroy_cq(rdma->recv_cq);
   2409         rdma->recv_cq = NULL;
   2410     }
   2411     if (rdma->send_cq) {
   2412         ibv_destroy_cq(rdma->send_cq);
   2413         rdma->send_cq = NULL;
   2414     }
   2415     if (rdma->recv_comp_channel) {
   2416         ibv_destroy_comp_channel(rdma->recv_comp_channel);
   2417         rdma->recv_comp_channel = NULL;
   2418     }
   2419     if (rdma->send_comp_channel) {
   2420         ibv_destroy_comp_channel(rdma->send_comp_channel);
   2421         rdma->send_comp_channel = NULL;
   2422     }
   2423     if (rdma->pd) {
   2424         ibv_dealloc_pd(rdma->pd);
   2425         rdma->pd = NULL;
   2426     }
   2427     if (rdma->cm_id) {
   2428         rdma_destroy_id(rdma->cm_id);
   2429         rdma->cm_id = NULL;
   2430     }
   2431 
   2432     /* the destination side, listen_id and channel is shared */
   2433     if (rdma->listen_id) {
   2434         if (!rdma->is_return_path) {
   2435             rdma_destroy_id(rdma->listen_id);
   2436         }
   2437         rdma->listen_id = NULL;
   2438 
   2439         if (rdma->channel) {
   2440             if (!rdma->is_return_path) {
   2441                 rdma_destroy_event_channel(rdma->channel);
   2442             }
   2443             rdma->channel = NULL;
   2444         }
   2445     }
   2446 
   2447     if (rdma->channel) {
   2448         rdma_destroy_event_channel(rdma->channel);
   2449         rdma->channel = NULL;
   2450     }
   2451     g_free(rdma->host);
   2452     g_free(rdma->host_port);
   2453     rdma->host = NULL;
   2454     rdma->host_port = NULL;
   2455 }
   2456 
   2457 
   2458 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
   2459 {
   2460     int ret, idx;
   2461     Error *local_err = NULL, **temp = &local_err;
   2462 
   2463     /*
   2464      * Will be validated against destination's actual capabilities
   2465      * after the connect() completes.
   2466      */
   2467     rdma->pin_all = pin_all;
   2468 
   2469     ret = qemu_rdma_resolve_host(rdma, temp);
   2470     if (ret) {
   2471         goto err_rdma_source_init;
   2472     }
   2473 
   2474     ret = qemu_rdma_alloc_pd_cq(rdma);
   2475     if (ret) {
   2476         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
   2477                     " limits may be too low. Please check $ ulimit -a # and "
   2478                     "search for 'ulimit -l' in the output");
   2479         goto err_rdma_source_init;
   2480     }
   2481 
   2482     ret = qemu_rdma_alloc_qp(rdma);
   2483     if (ret) {
   2484         ERROR(temp, "rdma migration: error allocating qp!");
   2485         goto err_rdma_source_init;
   2486     }
   2487 
   2488     ret = qemu_rdma_init_ram_blocks(rdma);
   2489     if (ret) {
   2490         ERROR(temp, "rdma migration: error initializing ram blocks!");
   2491         goto err_rdma_source_init;
   2492     }
   2493 
   2494     /* Build the hash that maps from offset to RAMBlock */
   2495     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
   2496     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
   2497         g_hash_table_insert(rdma->blockmap,
   2498                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
   2499                 &rdma->local_ram_blocks.block[idx]);
   2500     }
   2501 
   2502     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2503         ret = qemu_rdma_reg_control(rdma, idx);
   2504         if (ret) {
   2505             ERROR(temp, "rdma migration: error registering %d control!",
   2506                                                             idx);
   2507             goto err_rdma_source_init;
   2508         }
   2509     }
   2510 
   2511     return 0;
   2512 
   2513 err_rdma_source_init:
   2514     error_propagate(errp, local_err);
   2515     qemu_rdma_cleanup(rdma);
   2516     return -1;
   2517 }
   2518 
   2519 static int qemu_get_cm_event_timeout(RDMAContext *rdma,
   2520                                      struct rdma_cm_event **cm_event,
   2521                                      long msec, Error **errp)
   2522 {
   2523     int ret;
   2524     struct pollfd poll_fd = {
   2525                                 .fd = rdma->channel->fd,
   2526                                 .events = POLLIN,
   2527                                 .revents = 0
   2528                             };
   2529 
   2530     do {
   2531         ret = poll(&poll_fd, 1, msec);
   2532     } while (ret < 0 && errno == EINTR);
   2533 
   2534     if (ret == 0) {
   2535         ERROR(errp, "poll cm event timeout");
   2536         return -1;
   2537     } else if (ret < 0) {
   2538         ERROR(errp, "failed to poll cm event, errno=%i", errno);
   2539         return -1;
   2540     } else if (poll_fd.revents & POLLIN) {
   2541         return rdma_get_cm_event(rdma->channel, cm_event);
   2542     } else {
   2543         ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents);
   2544         return -1;
   2545     }
   2546 }
   2547 
   2548 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
   2549 {
   2550     RDMACapabilities cap = {
   2551                                 .version = RDMA_CONTROL_VERSION_CURRENT,
   2552                                 .flags = 0,
   2553                            };
   2554     struct rdma_conn_param conn_param = { .initiator_depth = 2,
   2555                                           .retry_count = 5,
   2556                                           .private_data = &cap,
   2557                                           .private_data_len = sizeof(cap),
   2558                                         };
   2559     struct rdma_cm_event *cm_event;
   2560     int ret;
   2561 
   2562     /*
   2563      * Only negotiate the capability with destination if the user
   2564      * on the source first requested the capability.
   2565      */
   2566     if (rdma->pin_all) {
   2567         trace_qemu_rdma_connect_pin_all_requested();
   2568         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
   2569     }
   2570 
   2571     caps_to_network(&cap);
   2572 
   2573     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   2574     if (ret) {
   2575         ERROR(errp, "posting second control recv");
   2576         goto err_rdma_source_connect;
   2577     }
   2578 
   2579     ret = rdma_connect(rdma->cm_id, &conn_param);
   2580     if (ret) {
   2581         perror("rdma_connect");
   2582         ERROR(errp, "connecting to destination!");
   2583         goto err_rdma_source_connect;
   2584     }
   2585 
   2586     if (return_path) {
   2587         ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
   2588     } else {
   2589         ret = rdma_get_cm_event(rdma->channel, &cm_event);
   2590     }
   2591     if (ret) {
   2592         perror("rdma_get_cm_event after rdma_connect");
   2593         ERROR(errp, "connecting to destination!");
   2594         goto err_rdma_source_connect;
   2595     }
   2596 
   2597     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
   2598         error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
   2599         ERROR(errp, "connecting to destination!");
   2600         rdma_ack_cm_event(cm_event);
   2601         goto err_rdma_source_connect;
   2602     }
   2603     rdma->connected = true;
   2604 
   2605     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
   2606     network_to_caps(&cap);
   2607 
   2608     /*
   2609      * Verify that the *requested* capabilities are supported by the destination
   2610      * and disable them otherwise.
   2611      */
   2612     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
   2613         ERROR(errp, "Server cannot support pinning all memory. "
   2614                         "Will register memory dynamically.");
   2615         rdma->pin_all = false;
   2616     }
   2617 
   2618     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
   2619 
   2620     rdma_ack_cm_event(cm_event);
   2621 
   2622     rdma->control_ready_expected = 1;
   2623     rdma->nb_sent = 0;
   2624     return 0;
   2625 
   2626 err_rdma_source_connect:
   2627     qemu_rdma_cleanup(rdma);
   2628     return -1;
   2629 }
   2630 
   2631 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
   2632 {
   2633     int ret, idx;
   2634     struct rdma_cm_id *listen_id;
   2635     char ip[40] = "unknown";
   2636     struct rdma_addrinfo *res, *e;
   2637     char port_str[16];
   2638     int reuse = 1;
   2639 
   2640     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2641         rdma->wr_data[idx].control_len = 0;
   2642         rdma->wr_data[idx].control_curr = NULL;
   2643     }
   2644 
   2645     if (!rdma->host || !rdma->host[0]) {
   2646         ERROR(errp, "RDMA host is not set!");
   2647         rdma->error_state = -EINVAL;
   2648         return -1;
   2649     }
   2650     /* create CM channel */
   2651     rdma->channel = rdma_create_event_channel();
   2652     if (!rdma->channel) {
   2653         ERROR(errp, "could not create rdma event channel");
   2654         rdma->error_state = -EINVAL;
   2655         return -1;
   2656     }
   2657 
   2658     /* create CM id */
   2659     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
   2660     if (ret) {
   2661         ERROR(errp, "could not create cm_id!");
   2662         goto err_dest_init_create_listen_id;
   2663     }
   2664 
   2665     snprintf(port_str, 16, "%d", rdma->port);
   2666     port_str[15] = '\0';
   2667 
   2668     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
   2669     if (ret < 0) {
   2670         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
   2671         goto err_dest_init_bind_addr;
   2672     }
   2673 
   2674     ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
   2675                           &reuse, sizeof reuse);
   2676     if (ret) {
   2677         ERROR(errp, "Error: could not set REUSEADDR option");
   2678         goto err_dest_init_bind_addr;
   2679     }
   2680     for (e = res; e != NULL; e = e->ai_next) {
   2681         inet_ntop(e->ai_family,
   2682             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
   2683         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
   2684         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
   2685         if (ret) {
   2686             continue;
   2687         }
   2688         if (e->ai_family == AF_INET6) {
   2689             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
   2690             if (ret) {
   2691                 continue;
   2692             }
   2693         }
   2694         break;
   2695     }
   2696 
   2697     rdma_freeaddrinfo(res);
   2698     if (!e) {
   2699         ERROR(errp, "Error: could not rdma_bind_addr!");
   2700         goto err_dest_init_bind_addr;
   2701     }
   2702 
   2703     rdma->listen_id = listen_id;
   2704     qemu_rdma_dump_gid("dest_init", listen_id);
   2705     return 0;
   2706 
   2707 err_dest_init_bind_addr:
   2708     rdma_destroy_id(listen_id);
   2709 err_dest_init_create_listen_id:
   2710     rdma_destroy_event_channel(rdma->channel);
   2711     rdma->channel = NULL;
   2712     rdma->error_state = ret;
   2713     return ret;
   2714 
   2715 }
   2716 
   2717 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
   2718                                             RDMAContext *rdma)
   2719 {
   2720     int idx;
   2721 
   2722     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2723         rdma_return_path->wr_data[idx].control_len = 0;
   2724         rdma_return_path->wr_data[idx].control_curr = NULL;
   2725     }
   2726 
   2727     /*the CM channel and CM id is shared*/
   2728     rdma_return_path->channel = rdma->channel;
   2729     rdma_return_path->listen_id = rdma->listen_id;
   2730 
   2731     rdma->return_path = rdma_return_path;
   2732     rdma_return_path->return_path = rdma;
   2733     rdma_return_path->is_return_path = true;
   2734 }
   2735 
   2736 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
   2737 {
   2738     RDMAContext *rdma = NULL;
   2739     InetSocketAddress *addr;
   2740 
   2741     if (host_port) {
   2742         rdma = g_new0(RDMAContext, 1);
   2743         rdma->current_index = -1;
   2744         rdma->current_chunk = -1;
   2745 
   2746         addr = g_new(InetSocketAddress, 1);
   2747         if (!inet_parse(addr, host_port, NULL)) {
   2748             rdma->port = atoi(addr->port);
   2749             rdma->host = g_strdup(addr->host);
   2750             rdma->host_port = g_strdup(host_port);
   2751         } else {
   2752             ERROR(errp, "bad RDMA migration address '%s'", host_port);
   2753             g_free(rdma);
   2754             rdma = NULL;
   2755         }
   2756 
   2757         qapi_free_InetSocketAddress(addr);
   2758     }
   2759 
   2760     return rdma;
   2761 }
   2762 
   2763 /*
   2764  * QEMUFile interface to the control channel.
   2765  * SEND messages for control only.
   2766  * VM's ram is handled with regular RDMA messages.
   2767  */
   2768 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
   2769                                        const struct iovec *iov,
   2770                                        size_t niov,
   2771                                        int *fds,
   2772                                        size_t nfds,
   2773                                        int flags,
   2774                                        Error **errp)
   2775 {
   2776     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2777     QEMUFile *f = rioc->file;
   2778     RDMAContext *rdma;
   2779     int ret;
   2780     ssize_t done = 0;
   2781     size_t i;
   2782     size_t len = 0;
   2783 
   2784     RCU_READ_LOCK_GUARD();
   2785     rdma = qatomic_rcu_read(&rioc->rdmaout);
   2786 
   2787     if (!rdma) {
   2788         return -EIO;
   2789     }
   2790 
   2791     CHECK_ERROR_STATE();
   2792 
   2793     /*
   2794      * Push out any writes that
   2795      * we're queued up for VM's ram.
   2796      */
   2797     ret = qemu_rdma_write_flush(f, rdma);
   2798     if (ret < 0) {
   2799         rdma->error_state = ret;
   2800         return ret;
   2801     }
   2802 
   2803     for (i = 0; i < niov; i++) {
   2804         size_t remaining = iov[i].iov_len;
   2805         uint8_t * data = (void *)iov[i].iov_base;
   2806         while (remaining) {
   2807             RDMAControlHeader head;
   2808 
   2809             len = MIN(remaining, RDMA_SEND_INCREMENT);
   2810             remaining -= len;
   2811 
   2812             head.len = len;
   2813             head.type = RDMA_CONTROL_QEMU_FILE;
   2814 
   2815             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
   2816 
   2817             if (ret < 0) {
   2818                 rdma->error_state = ret;
   2819                 return ret;
   2820             }
   2821 
   2822             data += len;
   2823             done += len;
   2824         }
   2825     }
   2826 
   2827     return done;
   2828 }
   2829 
   2830 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
   2831                              size_t size, int idx)
   2832 {
   2833     size_t len = 0;
   2834 
   2835     if (rdma->wr_data[idx].control_len) {
   2836         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
   2837 
   2838         len = MIN(size, rdma->wr_data[idx].control_len);
   2839         memcpy(buf, rdma->wr_data[idx].control_curr, len);
   2840         rdma->wr_data[idx].control_curr += len;
   2841         rdma->wr_data[idx].control_len -= len;
   2842     }
   2843 
   2844     return len;
   2845 }
   2846 
   2847 /*
   2848  * QEMUFile interface to the control channel.
   2849  * RDMA links don't use bytestreams, so we have to
   2850  * return bytes to QEMUFile opportunistically.
   2851  */
   2852 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
   2853                                       const struct iovec *iov,
   2854                                       size_t niov,
   2855                                       int **fds,
   2856                                       size_t *nfds,
   2857                                       Error **errp)
   2858 {
   2859     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2860     RDMAContext *rdma;
   2861     RDMAControlHeader head;
   2862     int ret = 0;
   2863     ssize_t i;
   2864     size_t done = 0;
   2865 
   2866     RCU_READ_LOCK_GUARD();
   2867     rdma = qatomic_rcu_read(&rioc->rdmain);
   2868 
   2869     if (!rdma) {
   2870         return -EIO;
   2871     }
   2872 
   2873     CHECK_ERROR_STATE();
   2874 
   2875     for (i = 0; i < niov; i++) {
   2876         size_t want = iov[i].iov_len;
   2877         uint8_t *data = (void *)iov[i].iov_base;
   2878 
   2879         /*
   2880          * First, we hold on to the last SEND message we
   2881          * were given and dish out the bytes until we run
   2882          * out of bytes.
   2883          */
   2884         ret = qemu_rdma_fill(rdma, data, want, 0);
   2885         done += ret;
   2886         want -= ret;
   2887         /* Got what we needed, so go to next iovec */
   2888         if (want == 0) {
   2889             continue;
   2890         }
   2891 
   2892         /* If we got any data so far, then don't wait
   2893          * for more, just return what we have */
   2894         if (done > 0) {
   2895             break;
   2896         }
   2897 
   2898 
   2899         /* We've got nothing at all, so lets wait for
   2900          * more to arrive
   2901          */
   2902         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
   2903 
   2904         if (ret < 0) {
   2905             rdma->error_state = ret;
   2906             return ret;
   2907         }
   2908 
   2909         /*
   2910          * SEND was received with new bytes, now try again.
   2911          */
   2912         ret = qemu_rdma_fill(rdma, data, want, 0);
   2913         done += ret;
   2914         want -= ret;
   2915 
   2916         /* Still didn't get enough, so lets just return */
   2917         if (want) {
   2918             if (done == 0) {
   2919                 return QIO_CHANNEL_ERR_BLOCK;
   2920             } else {
   2921                 break;
   2922             }
   2923         }
   2924     }
   2925     return done;
   2926 }
   2927 
   2928 /*
   2929  * Block until all the outstanding chunks have been delivered by the hardware.
   2930  */
   2931 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
   2932 {
   2933     int ret;
   2934 
   2935     if (qemu_rdma_write_flush(f, rdma) < 0) {
   2936         return -EIO;
   2937     }
   2938 
   2939     while (rdma->nb_sent) {
   2940         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2941         if (ret < 0) {
   2942             error_report("rdma migration: complete polling error!");
   2943             return -EIO;
   2944         }
   2945     }
   2946 
   2947     qemu_rdma_unregister_waiting(rdma);
   2948 
   2949     return 0;
   2950 }
   2951 
   2952 
   2953 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
   2954                                          bool blocking,
   2955                                          Error **errp)
   2956 {
   2957     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2958     /* XXX we should make readv/writev actually honour this :-) */
   2959     rioc->blocking = blocking;
   2960     return 0;
   2961 }
   2962 
   2963 
   2964 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
   2965 struct QIOChannelRDMASource {
   2966     GSource parent;
   2967     QIOChannelRDMA *rioc;
   2968     GIOCondition condition;
   2969 };
   2970 
   2971 static gboolean
   2972 qio_channel_rdma_source_prepare(GSource *source,
   2973                                 gint *timeout)
   2974 {
   2975     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   2976     RDMAContext *rdma;
   2977     GIOCondition cond = 0;
   2978     *timeout = -1;
   2979 
   2980     RCU_READ_LOCK_GUARD();
   2981     if (rsource->condition == G_IO_IN) {
   2982         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   2983     } else {
   2984         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   2985     }
   2986 
   2987     if (!rdma) {
   2988         error_report("RDMAContext is NULL when prepare Gsource");
   2989         return FALSE;
   2990     }
   2991 
   2992     if (rdma->wr_data[0].control_len) {
   2993         cond |= G_IO_IN;
   2994     }
   2995     cond |= G_IO_OUT;
   2996 
   2997     return cond & rsource->condition;
   2998 }
   2999 
   3000 static gboolean
   3001 qio_channel_rdma_source_check(GSource *source)
   3002 {
   3003     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   3004     RDMAContext *rdma;
   3005     GIOCondition cond = 0;
   3006 
   3007     RCU_READ_LOCK_GUARD();
   3008     if (rsource->condition == G_IO_IN) {
   3009         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   3010     } else {
   3011         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   3012     }
   3013 
   3014     if (!rdma) {
   3015         error_report("RDMAContext is NULL when check Gsource");
   3016         return FALSE;
   3017     }
   3018 
   3019     if (rdma->wr_data[0].control_len) {
   3020         cond |= G_IO_IN;
   3021     }
   3022     cond |= G_IO_OUT;
   3023 
   3024     return cond & rsource->condition;
   3025 }
   3026 
   3027 static gboolean
   3028 qio_channel_rdma_source_dispatch(GSource *source,
   3029                                  GSourceFunc callback,
   3030                                  gpointer user_data)
   3031 {
   3032     QIOChannelFunc func = (QIOChannelFunc)callback;
   3033     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   3034     RDMAContext *rdma;
   3035     GIOCondition cond = 0;
   3036 
   3037     RCU_READ_LOCK_GUARD();
   3038     if (rsource->condition == G_IO_IN) {
   3039         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   3040     } else {
   3041         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   3042     }
   3043 
   3044     if (!rdma) {
   3045         error_report("RDMAContext is NULL when dispatch Gsource");
   3046         return FALSE;
   3047     }
   3048 
   3049     if (rdma->wr_data[0].control_len) {
   3050         cond |= G_IO_IN;
   3051     }
   3052     cond |= G_IO_OUT;
   3053 
   3054     return (*func)(QIO_CHANNEL(rsource->rioc),
   3055                    (cond & rsource->condition),
   3056                    user_data);
   3057 }
   3058 
   3059 static void
   3060 qio_channel_rdma_source_finalize(GSource *source)
   3061 {
   3062     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
   3063 
   3064     object_unref(OBJECT(ssource->rioc));
   3065 }
   3066 
   3067 GSourceFuncs qio_channel_rdma_source_funcs = {
   3068     qio_channel_rdma_source_prepare,
   3069     qio_channel_rdma_source_check,
   3070     qio_channel_rdma_source_dispatch,
   3071     qio_channel_rdma_source_finalize
   3072 };
   3073 
   3074 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
   3075                                               GIOCondition condition)
   3076 {
   3077     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3078     QIOChannelRDMASource *ssource;
   3079     GSource *source;
   3080 
   3081     source = g_source_new(&qio_channel_rdma_source_funcs,
   3082                           sizeof(QIOChannelRDMASource));
   3083     ssource = (QIOChannelRDMASource *)source;
   3084 
   3085     ssource->rioc = rioc;
   3086     object_ref(OBJECT(rioc));
   3087 
   3088     ssource->condition = condition;
   3089 
   3090     return source;
   3091 }
   3092 
   3093 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
   3094                                                   AioContext *ctx,
   3095                                                   IOHandler *io_read,
   3096                                                   IOHandler *io_write,
   3097                                                   void *opaque)
   3098 {
   3099     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3100     if (io_read) {
   3101         aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
   3102                            false, io_read, io_write, NULL, NULL, opaque);
   3103         aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
   3104                            false, io_read, io_write, NULL, NULL, opaque);
   3105     } else {
   3106         aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
   3107                            false, io_read, io_write, NULL, NULL, opaque);
   3108         aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
   3109                            false, io_read, io_write, NULL, NULL, opaque);
   3110     }
   3111 }
   3112 
   3113 struct rdma_close_rcu {
   3114     struct rcu_head rcu;
   3115     RDMAContext *rdmain;
   3116     RDMAContext *rdmaout;
   3117 };
   3118 
   3119 /* callback from qio_channel_rdma_close via call_rcu */
   3120 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
   3121 {
   3122     if (rcu->rdmain) {
   3123         qemu_rdma_cleanup(rcu->rdmain);
   3124     }
   3125 
   3126     if (rcu->rdmaout) {
   3127         qemu_rdma_cleanup(rcu->rdmaout);
   3128     }
   3129 
   3130     g_free(rcu->rdmain);
   3131     g_free(rcu->rdmaout);
   3132     g_free(rcu);
   3133 }
   3134 
   3135 static int qio_channel_rdma_close(QIOChannel *ioc,
   3136                                   Error **errp)
   3137 {
   3138     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3139     RDMAContext *rdmain, *rdmaout;
   3140     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
   3141 
   3142     trace_qemu_rdma_close();
   3143 
   3144     rdmain = rioc->rdmain;
   3145     if (rdmain) {
   3146         qatomic_rcu_set(&rioc->rdmain, NULL);
   3147     }
   3148 
   3149     rdmaout = rioc->rdmaout;
   3150     if (rdmaout) {
   3151         qatomic_rcu_set(&rioc->rdmaout, NULL);
   3152     }
   3153 
   3154     rcu->rdmain = rdmain;
   3155     rcu->rdmaout = rdmaout;
   3156     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
   3157 
   3158     return 0;
   3159 }
   3160 
   3161 static int
   3162 qio_channel_rdma_shutdown(QIOChannel *ioc,
   3163                             QIOChannelShutdown how,
   3164                             Error **errp)
   3165 {
   3166     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3167     RDMAContext *rdmain, *rdmaout;
   3168 
   3169     RCU_READ_LOCK_GUARD();
   3170 
   3171     rdmain = qatomic_rcu_read(&rioc->rdmain);
   3172     rdmaout = qatomic_rcu_read(&rioc->rdmain);
   3173 
   3174     switch (how) {
   3175     case QIO_CHANNEL_SHUTDOWN_READ:
   3176         if (rdmain) {
   3177             rdmain->error_state = -1;
   3178         }
   3179         break;
   3180     case QIO_CHANNEL_SHUTDOWN_WRITE:
   3181         if (rdmaout) {
   3182             rdmaout->error_state = -1;
   3183         }
   3184         break;
   3185     case QIO_CHANNEL_SHUTDOWN_BOTH:
   3186     default:
   3187         if (rdmain) {
   3188             rdmain->error_state = -1;
   3189         }
   3190         if (rdmaout) {
   3191             rdmaout->error_state = -1;
   3192         }
   3193         break;
   3194     }
   3195 
   3196     return 0;
   3197 }
   3198 
   3199 /*
   3200  * Parameters:
   3201  *    @offset == 0 :
   3202  *        This means that 'block_offset' is a full virtual address that does not
   3203  *        belong to a RAMBlock of the virtual machine and instead
   3204  *        represents a private malloc'd memory area that the caller wishes to
   3205  *        transfer.
   3206  *
   3207  *    @offset != 0 :
   3208  *        Offset is an offset to be added to block_offset and used
   3209  *        to also lookup the corresponding RAMBlock.
   3210  *
   3211  *    @size : Number of bytes to transfer
   3212  *
   3213  *    @bytes_sent : User-specificed pointer to indicate how many bytes were
   3214  *                  sent. Usually, this will not be more than a few bytes of
   3215  *                  the protocol because most transfers are sent asynchronously.
   3216  */
   3217 static size_t qemu_rdma_save_page(QEMUFile *f,
   3218                                   ram_addr_t block_offset, ram_addr_t offset,
   3219                                   size_t size, uint64_t *bytes_sent)
   3220 {
   3221     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
   3222     RDMAContext *rdma;
   3223     int ret;
   3224 
   3225     RCU_READ_LOCK_GUARD();
   3226     rdma = qatomic_rcu_read(&rioc->rdmaout);
   3227 
   3228     if (!rdma) {
   3229         return -EIO;
   3230     }
   3231 
   3232     CHECK_ERROR_STATE();
   3233 
   3234     if (migration_in_postcopy()) {
   3235         return RAM_SAVE_CONTROL_NOT_SUPP;
   3236     }
   3237 
   3238     qemu_fflush(f);
   3239 
   3240     /*
   3241      * Add this page to the current 'chunk'. If the chunk
   3242      * is full, or the page doesn't belong to the current chunk,
   3243      * an actual RDMA write will occur and a new chunk will be formed.
   3244      */
   3245     ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
   3246     if (ret < 0) {
   3247         error_report("rdma migration: write error! %d", ret);
   3248         goto err;
   3249     }
   3250 
   3251     /*
   3252      * We always return 1 bytes because the RDMA
   3253      * protocol is completely asynchronous. We do not yet know
   3254      * whether an  identified chunk is zero or not because we're
   3255      * waiting for other pages to potentially be merged with
   3256      * the current chunk. So, we have to call qemu_update_position()
   3257      * later on when the actual write occurs.
   3258      */
   3259     if (bytes_sent) {
   3260         *bytes_sent = 1;
   3261     }
   3262 
   3263     /*
   3264      * Drain the Completion Queue if possible, but do not block,
   3265      * just poll.
   3266      *
   3267      * If nothing to poll, the end of the iteration will do this
   3268      * again to make sure we don't overflow the request queue.
   3269      */
   3270     while (1) {
   3271         uint64_t wr_id, wr_id_in;
   3272         int ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
   3273         if (ret < 0) {
   3274             error_report("rdma migration: polling error! %d", ret);
   3275             goto err;
   3276         }
   3277 
   3278         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   3279 
   3280         if (wr_id == RDMA_WRID_NONE) {
   3281             break;
   3282         }
   3283     }
   3284 
   3285     while (1) {
   3286         uint64_t wr_id, wr_id_in;
   3287         int ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
   3288         if (ret < 0) {
   3289             error_report("rdma migration: polling error! %d", ret);
   3290             goto err;
   3291         }
   3292 
   3293         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   3294 
   3295         if (wr_id == RDMA_WRID_NONE) {
   3296             break;
   3297         }
   3298     }
   3299 
   3300     return RAM_SAVE_CONTROL_DELAYED;
   3301 err:
   3302     rdma->error_state = ret;
   3303     return ret;
   3304 }
   3305 
   3306 static void rdma_accept_incoming_migration(void *opaque);
   3307 
   3308 static void rdma_cm_poll_handler(void *opaque)
   3309 {
   3310     RDMAContext *rdma = opaque;
   3311     int ret;
   3312     struct rdma_cm_event *cm_event;
   3313     MigrationIncomingState *mis = migration_incoming_get_current();
   3314 
   3315     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3316     if (ret) {
   3317         error_report("get_cm_event failed %d", errno);
   3318         return;
   3319     }
   3320 
   3321     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
   3322         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
   3323         if (!rdma->error_state &&
   3324             migration_incoming_get_current()->state !=
   3325               MIGRATION_STATUS_COMPLETED) {
   3326             error_report("receive cm event, cm event is %d", cm_event->event);
   3327             rdma->error_state = -EPIPE;
   3328             if (rdma->return_path) {
   3329                 rdma->return_path->error_state = -EPIPE;
   3330             }
   3331         }
   3332         rdma_ack_cm_event(cm_event);
   3333 
   3334         if (mis->migration_incoming_co) {
   3335             qemu_coroutine_enter(mis->migration_incoming_co);
   3336         }
   3337         return;
   3338     }
   3339     rdma_ack_cm_event(cm_event);
   3340 }
   3341 
   3342 static int qemu_rdma_accept(RDMAContext *rdma)
   3343 {
   3344     RDMACapabilities cap;
   3345     struct rdma_conn_param conn_param = {
   3346                                             .responder_resources = 2,
   3347                                             .private_data = &cap,
   3348                                             .private_data_len = sizeof(cap),
   3349                                          };
   3350     RDMAContext *rdma_return_path = NULL;
   3351     struct rdma_cm_event *cm_event;
   3352     struct ibv_context *verbs;
   3353     int ret = -EINVAL;
   3354     int idx;
   3355 
   3356     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3357     if (ret) {
   3358         goto err_rdma_dest_wait;
   3359     }
   3360 
   3361     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
   3362         rdma_ack_cm_event(cm_event);
   3363         goto err_rdma_dest_wait;
   3364     }
   3365 
   3366     /*
   3367      * initialize the RDMAContext for return path for postcopy after first
   3368      * connection request reached.
   3369      */
   3370     if (migrate_postcopy() && !rdma->is_return_path) {
   3371         rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
   3372         if (rdma_return_path == NULL) {
   3373             rdma_ack_cm_event(cm_event);
   3374             goto err_rdma_dest_wait;
   3375         }
   3376 
   3377         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
   3378     }
   3379 
   3380     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
   3381 
   3382     network_to_caps(&cap);
   3383 
   3384     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
   3385             error_report("Unknown source RDMA version: %d, bailing...",
   3386                             cap.version);
   3387             rdma_ack_cm_event(cm_event);
   3388             goto err_rdma_dest_wait;
   3389     }
   3390 
   3391     /*
   3392      * Respond with only the capabilities this version of QEMU knows about.
   3393      */
   3394     cap.flags &= known_capabilities;
   3395 
   3396     /*
   3397      * Enable the ones that we do know about.
   3398      * Add other checks here as new ones are introduced.
   3399      */
   3400     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
   3401         rdma->pin_all = true;
   3402     }
   3403 
   3404     rdma->cm_id = cm_event->id;
   3405     verbs = cm_event->id->verbs;
   3406 
   3407     rdma_ack_cm_event(cm_event);
   3408 
   3409     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
   3410 
   3411     caps_to_network(&cap);
   3412 
   3413     trace_qemu_rdma_accept_pin_verbsc(verbs);
   3414 
   3415     if (!rdma->verbs) {
   3416         rdma->verbs = verbs;
   3417     } else if (rdma->verbs != verbs) {
   3418             error_report("ibv context not matching %p, %p!", rdma->verbs,
   3419                          verbs);
   3420             goto err_rdma_dest_wait;
   3421     }
   3422 
   3423     qemu_rdma_dump_id("dest_init", verbs);
   3424 
   3425     ret = qemu_rdma_alloc_pd_cq(rdma);
   3426     if (ret) {
   3427         error_report("rdma migration: error allocating pd and cq!");
   3428         goto err_rdma_dest_wait;
   3429     }
   3430 
   3431     ret = qemu_rdma_alloc_qp(rdma);
   3432     if (ret) {
   3433         error_report("rdma migration: error allocating qp!");
   3434         goto err_rdma_dest_wait;
   3435     }
   3436 
   3437     ret = qemu_rdma_init_ram_blocks(rdma);
   3438     if (ret) {
   3439         error_report("rdma migration: error initializing ram blocks!");
   3440         goto err_rdma_dest_wait;
   3441     }
   3442 
   3443     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   3444         ret = qemu_rdma_reg_control(rdma, idx);
   3445         if (ret) {
   3446             error_report("rdma: error registering %d control", idx);
   3447             goto err_rdma_dest_wait;
   3448         }
   3449     }
   3450 
   3451     /* Accept the second connection request for return path */
   3452     if (migrate_postcopy() && !rdma->is_return_path) {
   3453         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
   3454                             NULL,
   3455                             (void *)(intptr_t)rdma->return_path);
   3456     } else {
   3457         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
   3458                             NULL, rdma);
   3459     }
   3460 
   3461     ret = rdma_accept(rdma->cm_id, &conn_param);
   3462     if (ret) {
   3463         error_report("rdma_accept returns %d", ret);
   3464         goto err_rdma_dest_wait;
   3465     }
   3466 
   3467     ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3468     if (ret) {
   3469         error_report("rdma_accept get_cm_event failed %d", ret);
   3470         goto err_rdma_dest_wait;
   3471     }
   3472 
   3473     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
   3474         error_report("rdma_accept not event established");
   3475         rdma_ack_cm_event(cm_event);
   3476         goto err_rdma_dest_wait;
   3477     }
   3478 
   3479     rdma_ack_cm_event(cm_event);
   3480     rdma->connected = true;
   3481 
   3482     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   3483     if (ret) {
   3484         error_report("rdma migration: error posting second control recv");
   3485         goto err_rdma_dest_wait;
   3486     }
   3487 
   3488     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
   3489 
   3490     return 0;
   3491 
   3492 err_rdma_dest_wait:
   3493     rdma->error_state = ret;
   3494     qemu_rdma_cleanup(rdma);
   3495     g_free(rdma_return_path);
   3496     return ret;
   3497 }
   3498 
   3499 static int dest_ram_sort_func(const void *a, const void *b)
   3500 {
   3501     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
   3502     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
   3503 
   3504     return (a_index < b_index) ? -1 : (a_index != b_index);
   3505 }
   3506 
   3507 /*
   3508  * During each iteration of the migration, we listen for instructions
   3509  * by the source VM to perform dynamic page registrations before they
   3510  * can perform RDMA operations.
   3511  *
   3512  * We respond with the 'rkey'.
   3513  *
   3514  * Keep doing this until the source tells us to stop.
   3515  */
   3516 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
   3517 {
   3518     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
   3519                                .type = RDMA_CONTROL_REGISTER_RESULT,
   3520                                .repeat = 0,
   3521                              };
   3522     RDMAControlHeader unreg_resp = { .len = 0,
   3523                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
   3524                                .repeat = 0,
   3525                              };
   3526     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
   3527                                  .repeat = 1 };
   3528     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
   3529     RDMAContext *rdma;
   3530     RDMALocalBlocks *local;
   3531     RDMAControlHeader head;
   3532     RDMARegister *reg, *registers;
   3533     RDMACompress *comp;
   3534     RDMARegisterResult *reg_result;
   3535     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
   3536     RDMALocalBlock *block;
   3537     void *host_addr;
   3538     int ret = 0;
   3539     int idx = 0;
   3540     int count = 0;
   3541     int i = 0;
   3542 
   3543     RCU_READ_LOCK_GUARD();
   3544     rdma = qatomic_rcu_read(&rioc->rdmain);
   3545 
   3546     if (!rdma) {
   3547         return -EIO;
   3548     }
   3549 
   3550     CHECK_ERROR_STATE();
   3551 
   3552     local = &rdma->local_ram_blocks;
   3553     do {
   3554         trace_qemu_rdma_registration_handle_wait();
   3555 
   3556         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
   3557 
   3558         if (ret < 0) {
   3559             break;
   3560         }
   3561 
   3562         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
   3563             error_report("rdma: Too many requests in this message (%d)."
   3564                             "Bailing.", head.repeat);
   3565             ret = -EIO;
   3566             break;
   3567         }
   3568 
   3569         switch (head.type) {
   3570         case RDMA_CONTROL_COMPRESS:
   3571             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
   3572             network_to_compress(comp);
   3573 
   3574             trace_qemu_rdma_registration_handle_compress(comp->length,
   3575                                                          comp->block_idx,
   3576                                                          comp->offset);
   3577             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
   3578                 error_report("rdma: 'compress' bad block index %u (vs %d)",
   3579                              (unsigned int)comp->block_idx,
   3580                              rdma->local_ram_blocks.nb_blocks);
   3581                 ret = -EIO;
   3582                 goto out;
   3583             }
   3584             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
   3585 
   3586             host_addr = block->local_host_addr +
   3587                             (comp->offset - block->offset);
   3588 
   3589             ram_handle_compressed(host_addr, comp->value, comp->length);
   3590             break;
   3591 
   3592         case RDMA_CONTROL_REGISTER_FINISHED:
   3593             trace_qemu_rdma_registration_handle_finished();
   3594             goto out;
   3595 
   3596         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
   3597             trace_qemu_rdma_registration_handle_ram_blocks();
   3598 
   3599             /* Sort our local RAM Block list so it's the same as the source,
   3600              * we can do this since we've filled in a src_index in the list
   3601              * as we received the RAMBlock list earlier.
   3602              */
   3603             qsort(rdma->local_ram_blocks.block,
   3604                   rdma->local_ram_blocks.nb_blocks,
   3605                   sizeof(RDMALocalBlock), dest_ram_sort_func);
   3606             for (i = 0; i < local->nb_blocks; i++) {
   3607                 local->block[i].index = i;
   3608             }
   3609 
   3610             if (rdma->pin_all) {
   3611                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
   3612                 if (ret) {
   3613                     error_report("rdma migration: error dest "
   3614                                     "registering ram blocks");
   3615                     goto out;
   3616                 }
   3617             }
   3618 
   3619             /*
   3620              * Dest uses this to prepare to transmit the RAMBlock descriptions
   3621              * to the source VM after connection setup.
   3622              * Both sides use the "remote" structure to communicate and update
   3623              * their "local" descriptions with what was sent.
   3624              */
   3625             for (i = 0; i < local->nb_blocks; i++) {
   3626                 rdma->dest_blocks[i].remote_host_addr =
   3627                     (uintptr_t)(local->block[i].local_host_addr);
   3628 
   3629                 if (rdma->pin_all) {
   3630                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
   3631                 }
   3632 
   3633                 rdma->dest_blocks[i].offset = local->block[i].offset;
   3634                 rdma->dest_blocks[i].length = local->block[i].length;
   3635 
   3636                 dest_block_to_network(&rdma->dest_blocks[i]);
   3637                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
   3638                     local->block[i].block_name,
   3639                     local->block[i].offset,
   3640                     local->block[i].length,
   3641                     local->block[i].local_host_addr,
   3642                     local->block[i].src_index);
   3643             }
   3644 
   3645             blocks.len = rdma->local_ram_blocks.nb_blocks
   3646                                                 * sizeof(RDMADestBlock);
   3647 
   3648 
   3649             ret = qemu_rdma_post_send_control(rdma,
   3650                                         (uint8_t *) rdma->dest_blocks, &blocks);
   3651 
   3652             if (ret < 0) {
   3653                 error_report("rdma migration: error sending remote info");
   3654                 goto out;
   3655             }
   3656 
   3657             break;
   3658         case RDMA_CONTROL_REGISTER_REQUEST:
   3659             trace_qemu_rdma_registration_handle_register(head.repeat);
   3660 
   3661             reg_resp.repeat = head.repeat;
   3662             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
   3663 
   3664             for (count = 0; count < head.repeat; count++) {
   3665                 uint64_t chunk;
   3666                 uint8_t *chunk_start, *chunk_end;
   3667 
   3668                 reg = &registers[count];
   3669                 network_to_register(reg);
   3670 
   3671                 reg_result = &results[count];
   3672 
   3673                 trace_qemu_rdma_registration_handle_register_loop(count,
   3674                          reg->current_index, reg->key.current_addr, reg->chunks);
   3675 
   3676                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
   3677                     error_report("rdma: 'register' bad block index %u (vs %d)",
   3678                                  (unsigned int)reg->current_index,
   3679                                  rdma->local_ram_blocks.nb_blocks);
   3680                     ret = -ENOENT;
   3681                     goto out;
   3682                 }
   3683                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
   3684                 if (block->is_ram_block) {
   3685                     if (block->offset > reg->key.current_addr) {
   3686                         error_report("rdma: bad register address for block %s"
   3687                             " offset: %" PRIx64 " current_addr: %" PRIx64,
   3688                             block->block_name, block->offset,
   3689                             reg->key.current_addr);
   3690                         ret = -ERANGE;
   3691                         goto out;
   3692                     }
   3693                     host_addr = (block->local_host_addr +
   3694                                 (reg->key.current_addr - block->offset));
   3695                     chunk = ram_chunk_index(block->local_host_addr,
   3696                                             (uint8_t *) host_addr);
   3697                 } else {
   3698                     chunk = reg->key.chunk;
   3699                     host_addr = block->local_host_addr +
   3700                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
   3701                     /* Check for particularly bad chunk value */
   3702                     if (host_addr < (void *)block->local_host_addr) {
   3703                         error_report("rdma: bad chunk for block %s"
   3704                             " chunk: %" PRIx64,
   3705                             block->block_name, reg->key.chunk);
   3706                         ret = -ERANGE;
   3707                         goto out;
   3708                     }
   3709                 }
   3710                 chunk_start = ram_chunk_start(block, chunk);
   3711                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
   3712                 /* avoid "-Waddress-of-packed-member" warning */
   3713                 uint32_t tmp_rkey = 0;
   3714                 if (qemu_rdma_register_and_get_keys(rdma, block,
   3715                             (uintptr_t)host_addr, NULL, &tmp_rkey,
   3716                             chunk, chunk_start, chunk_end)) {
   3717                     error_report("cannot get rkey");
   3718                     ret = -EINVAL;
   3719                     goto out;
   3720                 }
   3721                 reg_result->rkey = tmp_rkey;
   3722 
   3723                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
   3724 
   3725                 trace_qemu_rdma_registration_handle_register_rkey(
   3726                                                            reg_result->rkey);
   3727 
   3728                 result_to_network(reg_result);
   3729             }
   3730 
   3731             ret = qemu_rdma_post_send_control(rdma,
   3732                             (uint8_t *) results, &reg_resp);
   3733 
   3734             if (ret < 0) {
   3735                 error_report("Failed to send control buffer");
   3736                 goto out;
   3737             }
   3738             break;
   3739         case RDMA_CONTROL_UNREGISTER_REQUEST:
   3740             trace_qemu_rdma_registration_handle_unregister(head.repeat);
   3741             unreg_resp.repeat = head.repeat;
   3742             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
   3743 
   3744             for (count = 0; count < head.repeat; count++) {
   3745                 reg = &registers[count];
   3746                 network_to_register(reg);
   3747 
   3748                 trace_qemu_rdma_registration_handle_unregister_loop(count,
   3749                            reg->current_index, reg->key.chunk);
   3750 
   3751                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
   3752 
   3753                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
   3754                 block->pmr[reg->key.chunk] = NULL;
   3755 
   3756                 if (ret != 0) {
   3757                     perror("rdma unregistration chunk failed");
   3758                     ret = -ret;
   3759                     goto out;
   3760                 }
   3761 
   3762                 rdma->total_registrations--;
   3763 
   3764                 trace_qemu_rdma_registration_handle_unregister_success(
   3765                                                        reg->key.chunk);
   3766             }
   3767 
   3768             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
   3769 
   3770             if (ret < 0) {
   3771                 error_report("Failed to send control buffer");
   3772                 goto out;
   3773             }
   3774             break;
   3775         case RDMA_CONTROL_REGISTER_RESULT:
   3776             error_report("Invalid RESULT message at dest.");
   3777             ret = -EIO;
   3778             goto out;
   3779         default:
   3780             error_report("Unknown control message %s", control_desc(head.type));
   3781             ret = -EIO;
   3782             goto out;
   3783         }
   3784     } while (1);
   3785 out:
   3786     if (ret < 0) {
   3787         rdma->error_state = ret;
   3788     }
   3789     return ret;
   3790 }
   3791 
   3792 /* Destination:
   3793  * Called via a ram_control_load_hook during the initial RAM load section which
   3794  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
   3795  * on the source.
   3796  * We've already built our local RAMBlock list, but not yet sent the list to
   3797  * the source.
   3798  */
   3799 static int
   3800 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
   3801 {
   3802     RDMAContext *rdma;
   3803     int curr;
   3804     int found = -1;
   3805 
   3806     RCU_READ_LOCK_GUARD();
   3807     rdma = qatomic_rcu_read(&rioc->rdmain);
   3808 
   3809     if (!rdma) {
   3810         return -EIO;
   3811     }
   3812 
   3813     /* Find the matching RAMBlock in our local list */
   3814     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
   3815         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
   3816             found = curr;
   3817             break;
   3818         }
   3819     }
   3820 
   3821     if (found == -1) {
   3822         error_report("RAMBlock '%s' not found on destination", name);
   3823         return -ENOENT;
   3824     }
   3825 
   3826     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
   3827     trace_rdma_block_notification_handle(name, rdma->next_src_index);
   3828     rdma->next_src_index++;
   3829 
   3830     return 0;
   3831 }
   3832 
   3833 static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
   3834 {
   3835     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
   3836     switch (flags) {
   3837     case RAM_CONTROL_BLOCK_REG:
   3838         return rdma_block_notification_handle(rioc, data);
   3839 
   3840     case RAM_CONTROL_HOOK:
   3841         return qemu_rdma_registration_handle(f, rioc);
   3842 
   3843     default:
   3844         /* Shouldn't be called with any other values */
   3845         abort();
   3846     }
   3847 }
   3848 
   3849 static int qemu_rdma_registration_start(QEMUFile *f,
   3850                                         uint64_t flags, void *data)
   3851 {
   3852     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
   3853     RDMAContext *rdma;
   3854 
   3855     RCU_READ_LOCK_GUARD();
   3856     rdma = qatomic_rcu_read(&rioc->rdmaout);
   3857     if (!rdma) {
   3858         return -EIO;
   3859     }
   3860 
   3861     CHECK_ERROR_STATE();
   3862 
   3863     if (migration_in_postcopy()) {
   3864         return 0;
   3865     }
   3866 
   3867     trace_qemu_rdma_registration_start(flags);
   3868     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
   3869     qemu_fflush(f);
   3870 
   3871     return 0;
   3872 }
   3873 
   3874 /*
   3875  * Inform dest that dynamic registrations are done for now.
   3876  * First, flush writes, if any.
   3877  */
   3878 static int qemu_rdma_registration_stop(QEMUFile *f,
   3879                                        uint64_t flags, void *data)
   3880 {
   3881     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
   3882     RDMAContext *rdma;
   3883     RDMAControlHeader head = { .len = 0, .repeat = 1 };
   3884     int ret = 0;
   3885 
   3886     RCU_READ_LOCK_GUARD();
   3887     rdma = qatomic_rcu_read(&rioc->rdmaout);
   3888     if (!rdma) {
   3889         return -EIO;
   3890     }
   3891 
   3892     CHECK_ERROR_STATE();
   3893 
   3894     if (migration_in_postcopy()) {
   3895         return 0;
   3896     }
   3897 
   3898     qemu_fflush(f);
   3899     ret = qemu_rdma_drain_cq(f, rdma);
   3900 
   3901     if (ret < 0) {
   3902         goto err;
   3903     }
   3904 
   3905     if (flags == RAM_CONTROL_SETUP) {
   3906         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
   3907         RDMALocalBlocks *local = &rdma->local_ram_blocks;
   3908         int reg_result_idx, i, nb_dest_blocks;
   3909 
   3910         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
   3911         trace_qemu_rdma_registration_stop_ram();
   3912 
   3913         /*
   3914          * Make sure that we parallelize the pinning on both sides.
   3915          * For very large guests, doing this serially takes a really
   3916          * long time, so we have to 'interleave' the pinning locally
   3917          * with the control messages by performing the pinning on this
   3918          * side before we receive the control response from the other
   3919          * side that the pinning has completed.
   3920          */
   3921         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
   3922                     &reg_result_idx, rdma->pin_all ?
   3923                     qemu_rdma_reg_whole_ram_blocks : NULL);
   3924         if (ret < 0) {
   3925             fprintf(stderr, "receiving remote info!");
   3926             return ret;
   3927         }
   3928 
   3929         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
   3930 
   3931         /*
   3932          * The protocol uses two different sets of rkeys (mutually exclusive):
   3933          * 1. One key to represent the virtual address of the entire ram block.
   3934          *    (dynamic chunk registration disabled - pin everything with one rkey.)
   3935          * 2. One to represent individual chunks within a ram block.
   3936          *    (dynamic chunk registration enabled - pin individual chunks.)
   3937          *
   3938          * Once the capability is successfully negotiated, the destination transmits
   3939          * the keys to use (or sends them later) including the virtual addresses
   3940          * and then propagates the remote ram block descriptions to his local copy.
   3941          */
   3942 
   3943         if (local->nb_blocks != nb_dest_blocks) {
   3944             fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) "
   3945                     "Your QEMU command line parameters are probably "
   3946                     "not identical on both the source and destination.",
   3947                     local->nb_blocks, nb_dest_blocks);
   3948             rdma->error_state = -EINVAL;
   3949             return -EINVAL;
   3950         }
   3951 
   3952         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
   3953         memcpy(rdma->dest_blocks,
   3954             rdma->wr_data[reg_result_idx].control_curr, resp.len);
   3955         for (i = 0; i < nb_dest_blocks; i++) {
   3956             network_to_dest_block(&rdma->dest_blocks[i]);
   3957 
   3958             /* We require that the blocks are in the same order */
   3959             if (rdma->dest_blocks[i].length != local->block[i].length) {
   3960                 fprintf(stderr, "Block %s/%d has a different length %" PRIu64
   3961                         "vs %" PRIu64, local->block[i].block_name, i,
   3962                         local->block[i].length,
   3963                         rdma->dest_blocks[i].length);
   3964                 rdma->error_state = -EINVAL;
   3965                 return -EINVAL;
   3966             }
   3967             local->block[i].remote_host_addr =
   3968                     rdma->dest_blocks[i].remote_host_addr;
   3969             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
   3970         }
   3971     }
   3972 
   3973     trace_qemu_rdma_registration_stop(flags);
   3974 
   3975     head.type = RDMA_CONTROL_REGISTER_FINISHED;
   3976     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
   3977 
   3978     if (ret < 0) {
   3979         goto err;
   3980     }
   3981 
   3982     return 0;
   3983 err:
   3984     rdma->error_state = ret;
   3985     return ret;
   3986 }
   3987 
   3988 static const QEMUFileHooks rdma_read_hooks = {
   3989     .hook_ram_load = rdma_load_hook,
   3990 };
   3991 
   3992 static const QEMUFileHooks rdma_write_hooks = {
   3993     .before_ram_iterate = qemu_rdma_registration_start,
   3994     .after_ram_iterate  = qemu_rdma_registration_stop,
   3995     .save_page          = qemu_rdma_save_page,
   3996 };
   3997 
   3998 
   3999 static void qio_channel_rdma_finalize(Object *obj)
   4000 {
   4001     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
   4002     if (rioc->rdmain) {
   4003         qemu_rdma_cleanup(rioc->rdmain);
   4004         g_free(rioc->rdmain);
   4005         rioc->rdmain = NULL;
   4006     }
   4007     if (rioc->rdmaout) {
   4008         qemu_rdma_cleanup(rioc->rdmaout);
   4009         g_free(rioc->rdmaout);
   4010         rioc->rdmaout = NULL;
   4011     }
   4012 }
   4013 
   4014 static void qio_channel_rdma_class_init(ObjectClass *klass,
   4015                                         void *class_data G_GNUC_UNUSED)
   4016 {
   4017     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
   4018 
   4019     ioc_klass->io_writev = qio_channel_rdma_writev;
   4020     ioc_klass->io_readv = qio_channel_rdma_readv;
   4021     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
   4022     ioc_klass->io_close = qio_channel_rdma_close;
   4023     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
   4024     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
   4025     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
   4026 }
   4027 
   4028 static const TypeInfo qio_channel_rdma_info = {
   4029     .parent = TYPE_QIO_CHANNEL,
   4030     .name = TYPE_QIO_CHANNEL_RDMA,
   4031     .instance_size = sizeof(QIOChannelRDMA),
   4032     .instance_finalize = qio_channel_rdma_finalize,
   4033     .class_init = qio_channel_rdma_class_init,
   4034 };
   4035 
   4036 static void qio_channel_rdma_register_types(void)
   4037 {
   4038     type_register_static(&qio_channel_rdma_info);
   4039 }
   4040 
   4041 type_init(qio_channel_rdma_register_types);
   4042 
   4043 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
   4044 {
   4045     QIOChannelRDMA *rioc;
   4046 
   4047     if (qemu_file_mode_is_not_valid(mode)) {
   4048         return NULL;
   4049     }
   4050 
   4051     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
   4052 
   4053     if (mode[0] == 'w') {
   4054         rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
   4055         rioc->rdmaout = rdma;
   4056         rioc->rdmain = rdma->return_path;
   4057         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
   4058     } else {
   4059         rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
   4060         rioc->rdmain = rdma;
   4061         rioc->rdmaout = rdma->return_path;
   4062         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
   4063     }
   4064 
   4065     return rioc->file;
   4066 }
   4067 
   4068 static void rdma_accept_incoming_migration(void *opaque)
   4069 {
   4070     RDMAContext *rdma = opaque;
   4071     int ret;
   4072     QEMUFile *f;
   4073     Error *local_err = NULL;
   4074 
   4075     trace_qemu_rdma_accept_incoming_migration();
   4076     ret = qemu_rdma_accept(rdma);
   4077 
   4078     if (ret) {
   4079         fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
   4080         return;
   4081     }
   4082 
   4083     trace_qemu_rdma_accept_incoming_migration_accepted();
   4084 
   4085     if (rdma->is_return_path) {
   4086         return;
   4087     }
   4088 
   4089     f = qemu_fopen_rdma(rdma, "rb");
   4090     if (f == NULL) {
   4091         fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
   4092         qemu_rdma_cleanup(rdma);
   4093         return;
   4094     }
   4095 
   4096     rdma->migration_started_on_destination = 1;
   4097     migration_fd_process_incoming(f, &local_err);
   4098     if (local_err) {
   4099         error_reportf_err(local_err, "RDMA ERROR:");
   4100     }
   4101 }
   4102 
   4103 void rdma_start_incoming_migration(const char *host_port, Error **errp)
   4104 {
   4105     int ret;
   4106     RDMAContext *rdma, *rdma_return_path = NULL;
   4107     Error *local_err = NULL;
   4108 
   4109     trace_rdma_start_incoming_migration();
   4110 
   4111     /* Avoid ram_block_discard_disable(), cannot change during migration. */
   4112     if (ram_block_discard_is_required()) {
   4113         error_setg(errp, "RDMA: cannot disable RAM discard");
   4114         return;
   4115     }
   4116 
   4117     rdma = qemu_rdma_data_init(host_port, &local_err);
   4118     if (rdma == NULL) {
   4119         goto err;
   4120     }
   4121 
   4122     ret = qemu_rdma_dest_init(rdma, &local_err);
   4123 
   4124     if (ret) {
   4125         goto err;
   4126     }
   4127 
   4128     trace_rdma_start_incoming_migration_after_dest_init();
   4129 
   4130     ret = rdma_listen(rdma->listen_id, 5);
   4131 
   4132     if (ret) {
   4133         ERROR(errp, "listening on socket!");
   4134         goto cleanup_rdma;
   4135     }
   4136 
   4137     trace_rdma_start_incoming_migration_after_rdma_listen();
   4138 
   4139     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
   4140                         NULL, (void *)(intptr_t)rdma);
   4141     return;
   4142 
   4143 cleanup_rdma:
   4144     qemu_rdma_cleanup(rdma);
   4145 err:
   4146     error_propagate(errp, local_err);
   4147     if (rdma) {
   4148         g_free(rdma->host);
   4149         g_free(rdma->host_port);
   4150     }
   4151     g_free(rdma);
   4152     g_free(rdma_return_path);
   4153 }
   4154 
   4155 void rdma_start_outgoing_migration(void *opaque,
   4156                             const char *host_port, Error **errp)
   4157 {
   4158     MigrationState *s = opaque;
   4159     RDMAContext *rdma_return_path = NULL;
   4160     RDMAContext *rdma;
   4161     int ret = 0;
   4162 
   4163     /* Avoid ram_block_discard_disable(), cannot change during migration. */
   4164     if (ram_block_discard_is_required()) {
   4165         error_setg(errp, "RDMA: cannot disable RAM discard");
   4166         return;
   4167     }
   4168 
   4169     rdma = qemu_rdma_data_init(host_port, errp);
   4170     if (rdma == NULL) {
   4171         goto err;
   4172     }
   4173 
   4174     ret = qemu_rdma_source_init(rdma,
   4175         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
   4176 
   4177     if (ret) {
   4178         goto err;
   4179     }
   4180 
   4181     trace_rdma_start_outgoing_migration_after_rdma_source_init();
   4182     ret = qemu_rdma_connect(rdma, errp, false);
   4183 
   4184     if (ret) {
   4185         goto err;
   4186     }
   4187 
   4188     /* RDMA postcopy need a separate queue pair for return path */
   4189     if (migrate_postcopy()) {
   4190         rdma_return_path = qemu_rdma_data_init(host_port, errp);
   4191 
   4192         if (rdma_return_path == NULL) {
   4193             goto return_path_err;
   4194         }
   4195 
   4196         ret = qemu_rdma_source_init(rdma_return_path,
   4197             s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
   4198 
   4199         if (ret) {
   4200             goto return_path_err;
   4201         }
   4202 
   4203         ret = qemu_rdma_connect(rdma_return_path, errp, true);
   4204 
   4205         if (ret) {
   4206             goto return_path_err;
   4207         }
   4208 
   4209         rdma->return_path = rdma_return_path;
   4210         rdma_return_path->return_path = rdma;
   4211         rdma_return_path->is_return_path = true;
   4212     }
   4213 
   4214     trace_rdma_start_outgoing_migration_after_rdma_connect();
   4215 
   4216     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
   4217     migrate_fd_connect(s, NULL);
   4218     return;
   4219 return_path_err:
   4220     qemu_rdma_cleanup(rdma);
   4221 err:
   4222     g_free(rdma);
   4223     g_free(rdma_return_path);
   4224 }