cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

rdma.c (129923B)


      1/*
      2 * RDMA protocol and interfaces
      3 *
      4 * Copyright IBM, Corp. 2010-2013
      5 * Copyright Red Hat, Inc. 2015-2016
      6 *
      7 * Authors:
      8 *  Michael R. Hines <mrhines@us.ibm.com>
      9 *  Jiuxing Liu <jl@us.ibm.com>
     10 *  Daniel P. Berrange <berrange@redhat.com>
     11 *
     12 * This work is licensed under the terms of the GNU GPL, version 2 or
     13 * later.  See the COPYING file in the top-level directory.
     14 *
     15 */
     16
     17#include "qemu/osdep.h"
     18#include "qapi/error.h"
     19#include "qemu/cutils.h"
     20#include "rdma.h"
     21#include "migration.h"
     22#include "qemu-file.h"
     23#include "ram.h"
     24#include "qemu-file-channel.h"
     25#include "qemu/error-report.h"
     26#include "qemu/main-loop.h"
     27#include "qemu/module.h"
     28#include "qemu/rcu.h"
     29#include "qemu/sockets.h"
     30#include "qemu/bitmap.h"
     31#include "qemu/coroutine.h"
     32#include "exec/memory.h"
     33#include <sys/socket.h>
     34#include <netdb.h>
     35#include <arpa/inet.h>
     36#include <rdma/rdma_cma.h>
     37#include "trace.h"
     38#include "qom/object.h"
     39#include <poll.h>
     40
     41/*
     42 * Print an error on both the Monitor and the Log file.
     43 */
     44#define ERROR(errp, fmt, ...) \
     45    do { \
     46        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
     47        if (errp && (*(errp) == NULL)) { \
     48            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
     49        } \
     50    } while (0)
     51
     52#define RDMA_RESOLVE_TIMEOUT_MS 10000
     53
     54/* Do not merge data if larger than this. */
     55#define RDMA_MERGE_MAX (2 * 1024 * 1024)
     56#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
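       /*
        * For illustration: RDMA_MERGE_MAX (2 MB) divided by 4096 gives an
        * RDMA_SIGNALED_SEND_MAX of 512, which bounds the unregistrations[]
        * ring in RDMAContext and max_send_wr in qemu_rdma_alloc_qp() below.
        */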
     57
     58#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
     59
     60/*
     61 * This is only for non-live state being migrated.
     62 * Instead of RDMA_WRITE messages, we use RDMA_SEND
     63 * messages for that state, which requires a different
     64 * delivery design than main memory.
     65 */
     66#define RDMA_SEND_INCREMENT 32768
     67
     68/*
     69 * Maximum size of an infiniband SEND message
     70 */
     71#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
     72#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
     73
     74#define RDMA_CONTROL_VERSION_CURRENT 1
     75/*
     76 * Capabilities for negotiation.
     77 */
     78#define RDMA_CAPABILITY_PIN_ALL 0x01
     79
     80/*
     81 * Add the other flags above to this list of known capabilities
     82 * as they are introduced.
     83 */
     84static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
     85
     86#define CHECK_ERROR_STATE() \
     87    do { \
     88        if (rdma->error_state) { \
     89            if (!rdma->error_reported) { \
     90                error_report("RDMA is in an error state waiting migration" \
     91                                " to abort!"); \
     92                rdma->error_reported = 1; \
     93            } \
     94            return rdma->error_state; \
     95        } \
     96    } while (0)
     97
     98/*
     99 * A work request ID is 64-bits and we split up these bits
    100 * into 3 parts:
    101 *
    102 * bits 0-15 : type of control message, 2^16
    103 * bits 16-29: ram block index, 2^14
    104 * bits 30-63: ram block chunk number, 2^34
    105 *
    106 * The last two bit ranges are only used for RDMA writes,
    107 * in order to track their completion and potentially
    108 * also track unregistration status of the message.
    109 */
    110#define RDMA_WRID_TYPE_SHIFT  0UL
    111#define RDMA_WRID_BLOCK_SHIFT 16UL
    112#define RDMA_WRID_CHUNK_SHIFT 30UL
    113
    114#define RDMA_WRID_TYPE_MASK \
    115    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
    116
    117#define RDMA_WRID_BLOCK_MASK \
    118    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
    119
    120#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
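       /*
        * For illustration, a completed write for ram block index 3, chunk 5
        * would carry roughly this work request ID (see qemu_rdma_make_wrid()
        * further down):
        *
        *   wr_id = RDMA_WRID_RDMA_WRITE
        *         | (3UL << RDMA_WRID_BLOCK_SHIFT)
        *         | (5UL << RDMA_WRID_CHUNK_SHIFT);
        *
        * and the three masks above recover the type, block index and chunk.
        */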
    121
    122/*
    123 * RDMA migration protocol:
    124 * 1. RDMA Writes (data messages, i.e. RAM)
    125 * 2. IB Send/Recv (control channel messages)
    126 */
    127enum {
    128    RDMA_WRID_NONE = 0,
    129    RDMA_WRID_RDMA_WRITE = 1,
    130    RDMA_WRID_SEND_CONTROL = 2000,
    131    RDMA_WRID_RECV_CONTROL = 4000,
    132};
    133
    134static const char *wrid_desc[] = {
    135    [RDMA_WRID_NONE] = "NONE",
    136    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    137    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    138    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
    139};
    140
    141/*
    142 * Work request IDs for IB SEND messages only (not RDMA writes).
    143 * This is used by the migration protocol to transmit
    144 * control messages (such as device state and registration commands).
    145 *
    146 * We could use more WRs, but we have enough for now.
    147 */
    148enum {
    149    RDMA_WRID_READY = 0,
    150    RDMA_WRID_DATA,
    151    RDMA_WRID_CONTROL,
    152    RDMA_WRID_MAX,
    153};
    154
    155/*
    156 * SEND/RECV IB Control Messages.
    157 */
    158enum {
    159    RDMA_CONTROL_NONE = 0,
    160    RDMA_CONTROL_ERROR,
    161    RDMA_CONTROL_READY,               /* ready to receive */
    162    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    163    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    164    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    165    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    166    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    167    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    168    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    169    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    170    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
    171};
    172
    173
    174/*
    175 * Memory and MR structures used to represent an IB Send/Recv work request.
    176 * This is *not* used for RDMA writes, only IB Send/Recv.
    177 */
    178typedef struct {
    179    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    180    struct   ibv_mr *control_mr;               /* registration metadata */
    181    size_t   control_len;                      /* length of the message */
    182    uint8_t *control_curr;                     /* start of unconsumed bytes */
    183} RDMAWorkRequestData;
    184
    185/*
    186 * Negotiate RDMA capabilities during connection-setup time.
    187 */
    188typedef struct {
    189    uint32_t version;
    190    uint32_t flags;
    191} RDMACapabilities;
    192
    193static void caps_to_network(RDMACapabilities *cap)
    194{
    195    cap->version = htonl(cap->version);
    196    cap->flags = htonl(cap->flags);
    197}
    198
    199static void network_to_caps(RDMACapabilities *cap)
    200{
    201    cap->version = ntohl(cap->version);
    202    cap->flags = ntohl(cap->flags);
    203}
    204
    205/*
    206 * Representation of a RAMBlock from an RDMA perspective.
    207 * This is not transmitted, only local.
    208 * This and subsequent structures cannot be linked lists
    209 * because we're using a single IB message to transmit
    210 * the information. It's small anyway, so a list is overkill.
    211 */
    212typedef struct RDMALocalBlock {
    213    char          *block_name;
    214    uint8_t       *local_host_addr; /* local virtual address */
    215    uint64_t       remote_host_addr; /* remote virtual address */
    216    uint64_t       offset;
    217    uint64_t       length;
    218    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    219    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    220    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    221    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    222    int            index;           /* which block are we */
    223    unsigned int   src_index;       /* (Only used on dest) */
    224    bool           is_ram_block;
    225    int            nb_chunks;
    226    unsigned long *transit_bitmap;
    227    unsigned long *unregister_bitmap;
    228} RDMALocalBlock;
    229
    230/*
    231 * Also represents a RAMblock, but only on the dest.
    232 * This gets transmitted by the dest during connection-time
    233 * to the source VM and then is used to populate the
    234 * corresponding RDMALocalBlock with
    235 * the information needed to perform the actual RDMA.
    236 */
    237typedef struct QEMU_PACKED RDMADestBlock {
    238    uint64_t remote_host_addr;
    239    uint64_t offset;
    240    uint64_t length;
    241    uint32_t remote_rkey;
    242    uint32_t padding;
    243} RDMADestBlock;
    244
    245static const char *control_desc(unsigned int rdma_control)
    246{
    247    static const char *strs[] = {
    248        [RDMA_CONTROL_NONE] = "NONE",
    249        [RDMA_CONTROL_ERROR] = "ERROR",
    250        [RDMA_CONTROL_READY] = "READY",
    251        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    252        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    253        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    254        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    255        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    256        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    257        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    258        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    259        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
    260    };
    261
    262    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
    263        return "??BAD CONTROL VALUE??";
    264    }
    265
    266    return strs[rdma_control];
    267}
    268
    269static uint64_t htonll(uint64_t v)
    270{
    271    union { uint32_t lv[2]; uint64_t llv; } u;
    272    u.lv[0] = htonl(v >> 32);
    273    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    274    return u.llv;
    275}
    276
    277static uint64_t ntohll(uint64_t v)
    278{
    279    union { uint32_t lv[2]; uint64_t llv; } u;
    280    u.llv = v;
    281    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
    282}
    283
    284static void dest_block_to_network(RDMADestBlock *db)
    285{
    286    db->remote_host_addr = htonll(db->remote_host_addr);
    287    db->offset = htonll(db->offset);
    288    db->length = htonll(db->length);
    289    db->remote_rkey = htonl(db->remote_rkey);
    290}
    291
    292static void network_to_dest_block(RDMADestBlock *db)
    293{
    294    db->remote_host_addr = ntohll(db->remote_host_addr);
    295    db->offset = ntohll(db->offset);
    296    db->length = ntohll(db->length);
    297    db->remote_rkey = ntohl(db->remote_rkey);
    298}
    299
    300/*
    301 * Virtual address of the above structures used for transmitting
    302 * the RAMBlock descriptions at connection-time.
    303 * This structure is *not* transmitted.
    304 */
    305typedef struct RDMALocalBlocks {
    306    int nb_blocks;
    307    bool     init;             /* main memory init complete */
    308    RDMALocalBlock *block;
    309} RDMALocalBlocks;
    310
    311/*
    312 * Main data structure for RDMA state.
    313 * While there is only one copy of this structure being allocated right now,
    314 * this is the place to start if you wanted to consider
    315 * having more than one RDMA connection open at the same time.
    316 */
    317typedef struct RDMAContext {
    318    char *host;
    319    int port;
    320    char *host_port;
    321
    322    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
    323
    324    /*
    325     * This is used by *_exchange_send() to figure out whether or not
    326     * the initial "READY" message has already been received.
    327     * This is because other functions may potentially poll() and detect
    328     * the READY message before send() does, in which case we need to
    329     * know if it completed.
    330     */
    331    int control_ready_expected;
    332
    333    /* number of outstanding writes */
    334    int nb_sent;
    335
    336    /* store info about current buffer so that we can
    337       merge it with future sends */
    338    uint64_t current_addr;
    339    uint64_t current_length;
    340    /* index of ram block the current buffer belongs to */
    341    int current_index;
    342    /* index of the chunk in the current ram block */
    343    int current_chunk;
    344
    345    bool pin_all;
    346
    347    /*
    348     * infiniband-specific variables for opening the device
    349     * and maintaining connection state and so forth.
    350     *
    351     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
    352     * cm_id->verbs, cm_id->channel, and cm_id->qp.
    353     */
    354    struct rdma_cm_id *cm_id;               /* connection manager ID */
    355    struct rdma_cm_id *listen_id;
    356    bool connected;
    357
    358    struct ibv_context          *verbs;
    359    struct rdma_event_channel   *channel;
    360    struct ibv_qp *qp;                      /* queue pair */
    361    struct ibv_comp_channel *comp_channel;  /* completion channel */
    362    struct ibv_pd *pd;                      /* protection domain */
    363    struct ibv_cq *cq;                      /* completion queue */
    364
    365    /*
    366     * If a previous write failed (perhaps because of a failed
    367     * memory registration), then do not attempt any future work
    368     * and remember the error state.
    369     */
    370    int error_state;
    371    int error_reported;
    372    int received_error;
    373
    374    /*
    375     * Description of ram blocks used throughout the code.
    376     */
    377    RDMALocalBlocks local_ram_blocks;
    378    RDMADestBlock  *dest_blocks;
    379
    380    /* Index of the next RAMBlock received during block registration */
    381    unsigned int    next_src_index;
    382
    383    /*
    384     * Migration on the *destination* has started.
    385     * Then we use the coroutine yield function.
    386     * The source runs in a thread, so we don't care.
    387     */
    388    int migration_started_on_destination;
    389
    390    int total_registrations;
    391    int total_writes;
    392
    393    int unregister_current, unregister_next;
    394    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
    395
    396    GHashTable *blockmap;
    397
    398    /* the RDMAContext for return path */
    399    struct RDMAContext *return_path;
    400    bool is_return_path;
    401} RDMAContext;
    402
    403#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
    404OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
    405
    406
    407
    408struct QIOChannelRDMA {
    409    QIOChannel parent;
    410    RDMAContext *rdmain;
    411    RDMAContext *rdmaout;
    412    QEMUFile *file;
    413    bool blocking; /* XXX we don't actually honour this yet */
    414};
    415
    416/*
    417 * Main structure for IB Send/Recv control messages.
    418 * This is prepended to every Send/Recv.
    419 */
    420typedef struct QEMU_PACKED {
    421    uint32_t len;     /* Total length of data portion */
    422    uint32_t type;    /* which control command to perform */
    423    uint32_t repeat;  /* number of commands in data portion of same type */
    424    uint32_t padding;
    425} RDMAControlHeader;
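       /*
        * Sketch of the assumed wire layout for one control message: the
        * 16-byte header above (fields in network byte order, see
        * control_to_network() below) is followed by 'len' bytes of payload,
        * which may hold 'repeat' back-to-back commands of the same 'type'.
        */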
    426
    427static void control_to_network(RDMAControlHeader *control)
    428{
    429    control->type = htonl(control->type);
    430    control->len = htonl(control->len);
    431    control->repeat = htonl(control->repeat);
    432}
    433
    434static void network_to_control(RDMAControlHeader *control)
    435{
    436    control->type = ntohl(control->type);
    437    control->len = ntohl(control->len);
    438    control->repeat = ntohl(control->repeat);
    439}
    440
    441/*
    442 * Register a single Chunk.
    443 * Information sent by the source VM to inform the dest
    444 * to register a single chunk of memory before we can perform
    445 * the actual RDMA operation.
    446 */
    447typedef struct QEMU_PACKED {
    448    union QEMU_PACKED {
    449        uint64_t current_addr;  /* offset into the ram_addr_t space */
    450        uint64_t chunk;         /* chunk to lookup if unregistering */
    451    } key;
    452    uint32_t current_index; /* which ramblock the chunk belongs to */
    453    uint32_t padding;
    454    uint64_t chunks;            /* how many sequential chunks to register */
    455} RDMARegister;
    456
    457static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
    458{
    459    RDMALocalBlock *local_block;
    460    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
    461
    462    if (local_block->is_ram_block) {
    463        /*
    464         * current_addr as passed in is an address in the local ram_addr_t
    465         * space, we need to translate this for the destination
    466         */
    467        reg->key.current_addr -= local_block->offset;
    468        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
    469    }
    470    reg->key.current_addr = htonll(reg->key.current_addr);
    471    reg->current_index = htonl(reg->current_index);
    472    reg->chunks = htonll(reg->chunks);
    473}
    474
    475static void network_to_register(RDMARegister *reg)
    476{
    477    reg->key.current_addr = ntohll(reg->key.current_addr);
    478    reg->current_index = ntohl(reg->current_index);
    479    reg->chunks = ntohll(reg->chunks);
    480}
    481
    482typedef struct QEMU_PACKED {
    483    uint32_t value;     /* if zero, we will madvise() */
    484    uint32_t block_idx; /* which ram block index */
    485    uint64_t offset;    /* Address in remote ram_addr_t space */
    486    uint64_t length;    /* length of the chunk */
    487} RDMACompress;
    488
    489static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
    490{
    491    comp->value = htonl(comp->value);
    492    /*
    493     * comp->offset as passed in is an address in the local ram_addr_t
    494     * space, we need to translate this for the destination
    495     */
    496    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
    497    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
    498    comp->block_idx = htonl(comp->block_idx);
    499    comp->offset = htonll(comp->offset);
    500    comp->length = htonll(comp->length);
    501}
    502
    503static void network_to_compress(RDMACompress *comp)
    504{
    505    comp->value = ntohl(comp->value);
    506    comp->block_idx = ntohl(comp->block_idx);
    507    comp->offset = ntohll(comp->offset);
    508    comp->length = ntohll(comp->length);
    509}
    510
    511/*
    512 * The result of the dest's memory registration produces an "rkey"
    513 * which the source VM must reference in order to perform
    514 * the RDMA operation.
    515 */
    516typedef struct QEMU_PACKED {
    517    uint32_t rkey;
    518    uint32_t padding;
    519    uint64_t host_addr;
    520} RDMARegisterResult;
    521
    522static void result_to_network(RDMARegisterResult *result)
    523{
    524    result->rkey = htonl(result->rkey);
    525    result->host_addr = htonll(result->host_addr);
    526};
    527
    528static void network_to_result(RDMARegisterResult *result)
    529{
    530    result->rkey = ntohl(result->rkey);
    531    result->host_addr = ntohll(result->host_addr);
    532};
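       /*
        * Rough sketch of the dynamic registration exchange: the source sends
        * an RDMA_CONTROL_REGISTER_REQUEST carrying an RDMARegister, the dest
        * pins the chunk and answers with an RDMA_CONTROL_REGISTER_RESULT
        * carrying this RDMARegisterResult, and the source then uses the
        * returned rkey (and host_addr) for the actual RDMA write.
        */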
    533
    534const char *print_wrid(int wrid);
    535static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
    536                                   uint8_t *data, RDMAControlHeader *resp,
    537                                   int *resp_idx,
    538                                   int (*callback)(RDMAContext *rdma));
    539
    540static inline uint64_t ram_chunk_index(const uint8_t *start,
    541                                       const uint8_t *host)
    542{
    543    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
    544}
    545
    546static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
    547                                       uint64_t i)
    548{
    549    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
    550                                  (i << RDMA_REG_CHUNK_SHIFT));
    551}
    552
    553static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
    554                                     uint64_t i)
    555{
    556    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
    557                                         (1UL << RDMA_REG_CHUNK_SHIFT);
    558
    559    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
    560        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
    561    }
    562
    563    return result;
    564}
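       /*
        * For illustration: with RDMA_REG_CHUNK_SHIFT at 20, chunks are 1 MB,
        * so a RAMBlock of, say, 16 MB + 4 KB gets nb_chunks = 17 in
        * rdma_add_block() below, and ram_chunk_end() clamps the final,
        * partial chunk to the end of the block.
        */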
    565
    566static int rdma_add_block(RDMAContext *rdma, const char *block_name,
    567                         void *host_addr,
    568                         ram_addr_t block_offset, uint64_t length)
    569{
    570    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    571    RDMALocalBlock *block;
    572    RDMALocalBlock *old = local->block;
    573
    574    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
    575
    576    if (local->nb_blocks) {
    577        int x;
    578
    579        if (rdma->blockmap) {
    580            for (x = 0; x < local->nb_blocks; x++) {
    581                g_hash_table_remove(rdma->blockmap,
    582                                    (void *)(uintptr_t)old[x].offset);
    583                g_hash_table_insert(rdma->blockmap,
    584                                    (void *)(uintptr_t)old[x].offset,
    585                                    &local->block[x]);
    586            }
    587        }
    588        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
    589        g_free(old);
    590    }
    591
    592    block = &local->block[local->nb_blocks];
    593
    594    block->block_name = g_strdup(block_name);
    595    block->local_host_addr = host_addr;
    596    block->offset = block_offset;
    597    block->length = length;
    598    block->index = local->nb_blocks;
    599    block->src_index = ~0U; /* Filled in by the receipt of the block list */
    600    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
    601    block->transit_bitmap = bitmap_new(block->nb_chunks);
    602    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
    603    block->unregister_bitmap = bitmap_new(block->nb_chunks);
    604    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
    605    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
    606
    607    block->is_ram_block = local->init ? false : true;
    608
    609    if (rdma->blockmap) {
    610        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
    611    }
    612
    613    trace_rdma_add_block(block_name, local->nb_blocks,
    614                         (uintptr_t) block->local_host_addr,
    615                         block->offset, block->length,
    616                         (uintptr_t) (block->local_host_addr + block->length),
    617                         BITS_TO_LONGS(block->nb_chunks) *
    618                             sizeof(unsigned long) * 8,
    619                         block->nb_chunks);
    620
    621    local->nb_blocks++;
    622
    623    return 0;
    624}
    625
    626/*
    627 * Memory regions need to be registered with the device and queue pairs set up
    628 * in advance before the migration starts. This tells us where the RAM blocks
    629 * are so that we can register them individually.
    630 */
    631static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
    632{
    633    const char *block_name = qemu_ram_get_idstr(rb);
    634    void *host_addr = qemu_ram_get_host_addr(rb);
    635    ram_addr_t block_offset = qemu_ram_get_offset(rb);
    636    ram_addr_t length = qemu_ram_get_used_length(rb);
    637    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
    638}
    639
    640/*
    641 * Identify the RAMBlocks and their quantity. They will be used to
    642 * identify chunk boundaries inside each RAMBlock and will also be referenced
    643 * during dynamic page registration.
    644 */
    645static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
    646{
    647    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    648    int ret;
    649
    650    assert(rdma->blockmap == NULL);
    651    memset(local, 0, sizeof *local);
    652    ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
    653    if (ret) {
    654        return ret;
    655    }
    656    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
    657    rdma->dest_blocks = g_new0(RDMADestBlock,
    658                               rdma->local_ram_blocks.nb_blocks);
    659    local->init = true;
    660    return 0;
    661}
    662
    663/*
    664 * Note: If used outside of cleanup, the caller must ensure that the destination
    665 * block structures are also updated
    666 */
    667static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
    668{
    669    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    670    RDMALocalBlock *old = local->block;
    671    int x;
    672
    673    if (rdma->blockmap) {
    674        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
    675    }
    676    if (block->pmr) {
    677        int j;
    678
    679        for (j = 0; j < block->nb_chunks; j++) {
    680            if (!block->pmr[j]) {
    681                continue;
    682            }
    683            ibv_dereg_mr(block->pmr[j]);
    684            rdma->total_registrations--;
    685        }
    686        g_free(block->pmr);
    687        block->pmr = NULL;
    688    }
    689
    690    if (block->mr) {
    691        ibv_dereg_mr(block->mr);
    692        rdma->total_registrations--;
    693        block->mr = NULL;
    694    }
    695
    696    g_free(block->transit_bitmap);
    697    block->transit_bitmap = NULL;
    698
    699    g_free(block->unregister_bitmap);
    700    block->unregister_bitmap = NULL;
    701
    702    g_free(block->remote_keys);
    703    block->remote_keys = NULL;
    704
    705    g_free(block->block_name);
    706    block->block_name = NULL;
    707
    708    if (rdma->blockmap) {
    709        for (x = 0; x < local->nb_blocks; x++) {
    710            g_hash_table_remove(rdma->blockmap,
    711                                (void *)(uintptr_t)old[x].offset);
    712        }
    713    }
    714
    715    if (local->nb_blocks > 1) {
    716
    717        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
    718
    719        if (block->index) {
    720            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
    721        }
    722
    723        if (block->index < (local->nb_blocks - 1)) {
    724            memcpy(local->block + block->index, old + (block->index + 1),
    725                sizeof(RDMALocalBlock) *
    726                    (local->nb_blocks - (block->index + 1)));
    727            for (x = block->index; x < local->nb_blocks - 1; x++) {
    728                local->block[x].index--;
    729            }
    730        }
    731    } else {
    732        assert(block == local->block);
    733        local->block = NULL;
    734    }
    735
    736    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
    737                           block->offset, block->length,
    738                            (uintptr_t)(block->local_host_addr + block->length),
    739                           BITS_TO_LONGS(block->nb_chunks) *
    740                               sizeof(unsigned long) * 8, block->nb_chunks);
    741
    742    g_free(old);
    743
    744    local->nb_blocks--;
    745
    746    if (local->nb_blocks && rdma->blockmap) {
    747        for (x = 0; x < local->nb_blocks; x++) {
    748            g_hash_table_insert(rdma->blockmap,
    749                                (void *)(uintptr_t)local->block[x].offset,
    750                                &local->block[x]);
    751        }
    752    }
    753
    754    return 0;
    755}
    756
    757/*
    758 * Put in the log file which RDMA device was opened and the details
    759 * associated with that device.
    760 */
    761static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
    762{
    763    struct ibv_port_attr port;
    764
    765    if (ibv_query_port(verbs, 1, &port)) {
    766        error_report("Failed to query port information");
    767        return;
    768    }
    769
    770    printf("%s RDMA Device opened: kernel name %s "
    771           "uverbs device name %s, "
    772           "infiniband_verbs class device path %s, "
    773           "infiniband class device path %s, "
    774           "transport: (%d) %s\n",
    775                who,
    776                verbs->device->name,
    777                verbs->device->dev_name,
    778                verbs->device->dev_path,
    779                verbs->device->ibdev_path,
    780                port.link_layer,
    781                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
    782                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
    783                    ? "Ethernet" : "Unknown"));
    784}
    785
    786/*
    787 * Put in the log file the RDMA gid addressing information,
    788 * useful for folks who have trouble understanding the
    789 * RDMA device hierarchy in the kernel.
    790 */
    791static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
    792{
    793    char sgid[33];
    794    char dgid[33];
    795    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    796    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    797    trace_qemu_rdma_dump_gid(who, sgid, dgid);
    798}
    799
    800/*
    801 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
    802 * We will try the next addrinfo struct, and fail if there are
    803 * no other valid addresses to bind against.
    804 *
    805 * If the user is listening on '[::]', then we will not have opened a device
    806 * yet and have no way of verifying if the device is RoCE or not.
    807 *
    808 * In this case, the source VM will throw an error for ALL types of
    809 * connections (both IPv4 and IPv6) if the destination machine does not have
    810 * a regular infiniband network available for use.
    811 *
    812 * The only way to guarantee that an error is thrown for broken kernels is
    813 * for the management software to choose a *specific* interface at bind time
    814 * and validate what type of hardware it is.
    815 *
    816 * Unfortunately, this puts the user in a fix:
    817 *
    818 *  If the source VM connects with an IPv4 address without knowing that the
    819 *  destination has bound to '[::]' the migration will unconditionally fail
    820 *  unless the management software is explicitly listening on the IPv4
    821 *  address while using a RoCE-based device.
    822 *
    823 *  If the source VM connects with an IPv6 address, then we're OK because we can
    824 *  throw an error on the source (and similarly on the destination).
    825 *
    826 *  But in mixed environments, this will be broken for a while until it is fixed
    827 *  inside linux.
    828 *
    829 * We do provide a *tiny* bit of help in this function: We can list all of the
    830 * devices in the system and check to see if all the devices are RoCE or
    831 * Infiniband.
    832 *
    833 * If we detect that we have a *pure* RoCE environment, then we can safely
    834 * throw an error even if the management software has specified '[::]' as the
    835 * bind address.
    836 *
    837 * However, if there are multiple heterogeneous devices, then we cannot make
    838 * this assumption and the user just has to be sure they know what they are
    839 * doing.
    840 *
    841 * Patches are being reviewed on linux-rdma.
    842 */
    843static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
    844{
    845    /* This bug only exists in linux, to our knowledge. */
    846#ifdef CONFIG_LINUX
    847    struct ibv_port_attr port_attr;
    848
    849    /*
    850     * Verbs are only NULL if management has bound to '[::]'.
    851     *
    852     * Let's iterate through all the devices and see if there are any pure IB
    853     * devices (non-ethernet).
    854     *
    855     * If not, then we can safely proceed with the migration.
    856     * Otherwise, there are no guarantees until the bug is fixed in linux.
    857     */
    858    if (!verbs) {
    859        int num_devices, x;
    860        struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
    861        bool roce_found = false;
    862        bool ib_found = false;
    863
    864        for (x = 0; x < num_devices; x++) {
    865            verbs = ibv_open_device(dev_list[x]);
    866            if (!verbs) {
    867                if (errno == EPERM) {
    868                    continue;
    869                } else {
    870                    return -EINVAL;
    871                }
    872            }
    873
    874            if (ibv_query_port(verbs, 1, &port_attr)) {
    875                ibv_close_device(verbs);
    876                ERROR(errp, "Could not query initial IB port");
    877                return -EINVAL;
    878            }
    879
    880            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
    881                ib_found = true;
    882            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
    883                roce_found = true;
    884            }
    885
    886            ibv_close_device(verbs);
    887
    888        }
    889
    890        if (roce_found) {
    891            if (ib_found) {
    892                fprintf(stderr, "WARN: migrations may fail:"
    893                                " IPv6 over RoCE / iWARP in linux"
    894                                " is broken. But since you appear to have a"
    895                                " mixed RoCE / IB environment, be sure to only"
    896                                " migrate over the IB fabric until the kernel "
    897                                " fixes the bug.\n");
    898            } else {
    899                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
    900                            " and your management software has specified '[::]'"
    901                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
    902                return -ENONET;
    903            }
    904        }
    905
    906        return 0;
    907    }
    908
    909    /*
    910     * If we have a verbs context, that means that something other than '[::]'
    911     * was used by the management software for binding, in which case we can
    912     * actually warn the user about a potentially broken kernel.
    913     */
    914
    915    /* IB ports start with 1, not 0 */
    916    if (ibv_query_port(verbs, 1, &port_attr)) {
    917        ERROR(errp, "Could not query initial IB port");
    918        return -EINVAL;
    919    }
    920
    921    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
    922        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
    923                    "(but patches on linux-rdma in progress)");
    924        return -ENONET;
    925    }
    926
    927#endif
    928
    929    return 0;
    930}
    931
    932/*
    933 * Figure out which RDMA device corresponds to the requested IP hostname
    934 * Also create the initial connection manager identifiers for opening
    935 * the connection.
    936 */
    937static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
    938{
    939    int ret;
    940    struct rdma_addrinfo *res;
    941    char port_str[16];
    942    struct rdma_cm_event *cm_event;
    943    char ip[40] = "unknown";
    944    struct rdma_addrinfo *e;
    945
    946    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
    947        ERROR(errp, "RDMA hostname has not been set");
    948        return -EINVAL;
    949    }
    950
    951    /* create CM channel */
    952    rdma->channel = rdma_create_event_channel();
    953    if (!rdma->channel) {
    954        ERROR(errp, "could not create CM channel");
    955        return -EINVAL;
    956    }
    957
    958    /* create CM id */
    959    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
    960    if (ret) {
    961        ERROR(errp, "could not create channel id");
    962        goto err_resolve_create_id;
    963    }
    964
    965    snprintf(port_str, 16, "%d", rdma->port);
    966    port_str[15] = '\0';
    967
    968    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
    969    if (ret < 0) {
    970        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
    971        goto err_resolve_get_addr;
    972    }
    973
    974    for (e = res; e != NULL; e = e->ai_next) {
    975        inet_ntop(e->ai_family,
    976            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
    977        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
    978
    979        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
    980                RDMA_RESOLVE_TIMEOUT_MS);
    981        if (!ret) {
    982            if (e->ai_family == AF_INET6) {
    983                ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
    984                if (ret) {
    985                    continue;
    986                }
    987            }
    988            goto route;
    989        }
    990    }
    991
    992    rdma_freeaddrinfo(res);
    993    ERROR(errp, "could not resolve address %s", rdma->host);
    994    goto err_resolve_get_addr;
    995
    996route:
    997    rdma_freeaddrinfo(res);
    998    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
    999
   1000    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1001    if (ret) {
   1002        ERROR(errp, "could not perform event_addr_resolved");
   1003        goto err_resolve_get_addr;
   1004    }
   1005
   1006    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
   1007        ERROR(errp, "result not equal to event_addr_resolved %s",
   1008                rdma_event_str(cm_event->event));
   1009        error_report("rdma_resolve_addr");
   1010        rdma_ack_cm_event(cm_event);
   1011        ret = -EINVAL;
   1012        goto err_resolve_get_addr;
   1013    }
   1014    rdma_ack_cm_event(cm_event);
   1015
   1016    /* resolve route */
   1017    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
   1018    if (ret) {
   1019        ERROR(errp, "could not resolve rdma route");
   1020        goto err_resolve_get_addr;
   1021    }
   1022
   1023    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1024    if (ret) {
   1025        ERROR(errp, "could not perform event_route_resolved");
   1026        goto err_resolve_get_addr;
   1027    }
   1028    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
   1029        ERROR(errp, "result not equal to event_route_resolved: %s",
   1030                        rdma_event_str(cm_event->event));
   1031        rdma_ack_cm_event(cm_event);
   1032        ret = -EINVAL;
   1033        goto err_resolve_get_addr;
   1034    }
   1035    rdma_ack_cm_event(cm_event);
   1036    rdma->verbs = rdma->cm_id->verbs;
   1037    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
   1038    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
   1039    return 0;
   1040
   1041err_resolve_get_addr:
   1042    rdma_destroy_id(rdma->cm_id);
   1043    rdma->cm_id = NULL;
   1044err_resolve_create_id:
   1045    rdma_destroy_event_channel(rdma->channel);
   1046    rdma->channel = NULL;
   1047    return ret;
   1048}
   1049
   1050/*
   1051 * Create protection domain and completion queues
   1052 */
   1053static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
   1054{
   1055    /* allocate pd */
   1056    rdma->pd = ibv_alloc_pd(rdma->verbs);
   1057    if (!rdma->pd) {
   1058        error_report("failed to allocate protection domain");
   1059        return -1;
   1060    }
   1061
   1062    /* create completion channel */
   1063    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
   1064    if (!rdma->comp_channel) {
   1065        error_report("failed to allocate completion channel");
   1066        goto err_alloc_pd_cq;
   1067    }
   1068
   1069    /*
   1070     * Completion queue can be filled by both read and write work requests,
   1071     * so must reflect the sum of both possible queue sizes.
   1072     */
   1073    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
   1074            NULL, rdma->comp_channel, 0);
   1075    if (!rdma->cq) {
   1076        error_report("failed to allocate completion queue");
   1077        goto err_alloc_pd_cq;
   1078    }
   1079
   1080    return 0;
   1081
   1082err_alloc_pd_cq:
   1083    if (rdma->pd) {
   1084        ibv_dealloc_pd(rdma->pd);
   1085    }
   1086    if (rdma->comp_channel) {
   1087        ibv_destroy_comp_channel(rdma->comp_channel);
   1088    }
   1089    rdma->pd = NULL;
   1090    rdma->comp_channel = NULL;
   1091    return -1;
   1092
   1093}
   1094
   1095/*
   1096 * Create queue pairs.
   1097 */
   1098static int qemu_rdma_alloc_qp(RDMAContext *rdma)
   1099{
   1100    struct ibv_qp_init_attr attr = { 0 };
   1101    int ret;
   1102
   1103    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
   1104    attr.cap.max_recv_wr = 3;
   1105    attr.cap.max_send_sge = 1;
   1106    attr.cap.max_recv_sge = 1;
   1107    attr.send_cq = rdma->cq;
   1108    attr.recv_cq = rdma->cq;
   1109    attr.qp_type = IBV_QPT_RC;
   1110
   1111    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
   1112    if (ret) {
   1113        return -1;
   1114    }
   1115
   1116    rdma->qp = rdma->cm_id->qp;
   1117    return 0;
   1118}
   1119
   1120static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
   1121{
   1122    int i;
   1123    RDMALocalBlocks *local = &rdma->local_ram_blocks;
   1124
   1125    for (i = 0; i < local->nb_blocks; i++) {
   1126        local->block[i].mr =
   1127            ibv_reg_mr(rdma->pd,
   1128                    local->block[i].local_host_addr,
   1129                    local->block[i].length,
   1130                    IBV_ACCESS_LOCAL_WRITE |
   1131                    IBV_ACCESS_REMOTE_WRITE
   1132                    );
   1133        if (!local->block[i].mr) {
   1134            perror("Failed to register local dest ram block!");
   1135            break;
   1136        }
   1137        rdma->total_registrations++;
   1138    }
   1139
   1140    if (i >= local->nb_blocks) {
   1141        return 0;
   1142    }
   1143
   1144    for (i--; i >= 0; i--) {
   1145        ibv_dereg_mr(local->block[i].mr);
   1146        local->block[i].mr = NULL;
   1147        rdma->total_registrations--;
   1148    }
   1149
   1150    return -1;
   1151
   1152}
   1153
   1154/*
   1155 * Find the ram block that corresponds to the page requested to be
   1156 * transmitted by QEMU.
   1157 *
   1158 * Once the block is found, also identify which 'chunk' within that
   1159 * block that the page belongs to.
   1160 *
   1161 * This search cannot fail or the migration will fail.
   1162 */
   1163static int qemu_rdma_search_ram_block(RDMAContext *rdma,
   1164                                      uintptr_t block_offset,
   1165                                      uint64_t offset,
   1166                                      uint64_t length,
   1167                                      uint64_t *block_index,
   1168                                      uint64_t *chunk_index)
   1169{
   1170    uint64_t current_addr = block_offset + offset;
   1171    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
   1172                                                (void *) block_offset);
   1173    assert(block);
   1174    assert(current_addr >= block->offset);
   1175    assert((current_addr + length) <= (block->offset + block->length));
   1176
   1177    *block_index = block->index;
   1178    *chunk_index = ram_chunk_index(block->local_host_addr,
   1179                block->local_host_addr + (current_addr - block->offset));
   1180
   1181    return 0;
   1182}
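       /*
        * For illustration: if the requested page sits 5 MB into its RAMBlock,
        * the blockmap lookup above resolves the block and, with 1 MB chunks,
        * the arithmetic yields chunk_index 5 for that page.
        */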
   1183
   1184/*
   1185 * Register a chunk with IB. If the chunk was already registered
   1186 * previously, then skip.
   1187 *
   1188 * Also return the keys associated with the registration needed
   1189 * to perform the actual RDMA operation.
   1190 */
   1191static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
   1192        RDMALocalBlock *block, uintptr_t host_addr,
   1193        uint32_t *lkey, uint32_t *rkey, int chunk,
   1194        uint8_t *chunk_start, uint8_t *chunk_end)
   1195{
   1196    if (block->mr) {
   1197        if (lkey) {
   1198            *lkey = block->mr->lkey;
   1199        }
   1200        if (rkey) {
   1201            *rkey = block->mr->rkey;
   1202        }
   1203        return 0;
   1204    }
   1205
   1206    /* allocate memory to store chunk MRs */
   1207    if (!block->pmr) {
   1208        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
   1209    }
   1210
   1211    /*
   1212     * If 'rkey', then we're the destination, so grant access to the source.
   1213     *
   1214     * If 'lkey', then we're the source VM, so grant access only to ourselves.
   1215     */
   1216    if (!block->pmr[chunk]) {
   1217        uint64_t len = chunk_end - chunk_start;
   1218
   1219        trace_qemu_rdma_register_and_get_keys(len, chunk_start);
   1220
   1221        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
   1222                chunk_start, len,
   1223                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
   1224                        IBV_ACCESS_REMOTE_WRITE) : 0));
   1225
   1226        if (!block->pmr[chunk]) {
   1227            perror("Failed to register chunk!");
   1228            fprintf(stderr, "Chunk details: block: %d chunk index %d"
   1229                            " start %" PRIuPTR " end %" PRIuPTR
   1230                            " host %" PRIuPTR
   1231                            " local %" PRIuPTR " registrations: %d\n",
   1232                            block->index, chunk, (uintptr_t)chunk_start,
   1233                            (uintptr_t)chunk_end, host_addr,
   1234                            (uintptr_t)block->local_host_addr,
   1235                            rdma->total_registrations);
   1236            return -1;
   1237        }
   1238        rdma->total_registrations++;
   1239    }
   1240
   1241    if (lkey) {
   1242        *lkey = block->pmr[chunk]->lkey;
   1243    }
   1244    if (rkey) {
   1245        *rkey = block->pmr[chunk]->rkey;
   1246    }
   1247    return 0;
   1248}
   1249
   1250/*
   1251 * Register (at connection time) the memory used for control
   1252 * channel messages.
   1253 */
   1254static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
   1255{
   1256    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
   1257            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
   1258            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
   1259    if (rdma->wr_data[idx].control_mr) {
   1260        rdma->total_registrations++;
   1261        return 0;
   1262    }
   1263    error_report("qemu_rdma_reg_control failed");
   1264    return -1;
   1265}
   1266
   1267const char *print_wrid(int wrid)
   1268{
   1269    if (wrid >= RDMA_WRID_RECV_CONTROL) {
   1270        return wrid_desc[RDMA_WRID_RECV_CONTROL];
   1271    }
   1272    return wrid_desc[wrid];
   1273}
   1274
   1275/*
   1276 * RDMA requires memory registration (mlock/pinning), but this is not good for
   1277 * overcommitment.
   1278 *
   1279 * In preparation for the future where LRU information or workload-specific
   1280 * writable working set memory access behavior is available to QEMU
   1281 * it would be nice to have in place the ability to UN-register/UN-pin
   1282 * particular memory regions from the RDMA hardware when it is determined that
   1283 * those regions of memory will likely not be accessed again in the near future.
   1284 *
   1285 * While we do not yet have such information right now, the following
   1286 * compile-time option allows us to perform a non-optimized version of this
   1287 * behavior.
   1288 *
   1289 * By uncommenting this option, you will cause *all* RDMA transfers to be
   1290 * unregistered immediately after the transfer completes on both sides of the
   1291 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
   1292 *
   1293 * This will have a terrible impact on migration performance, so until future
   1294 * workload information or LRU information is available, do not attempt to use
   1295 * this feature except for basic testing.
   1296 */
   1297/* #define RDMA_UNREGISTRATION_EXAMPLE */
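       /*
        * Roughly: with the define above enabled and pin-all off,
        * qemu_rdma_poll() further down queues each completed chunk for
        * unregistration via qemu_rdma_signal_unregister();
        * qemu_rdma_unregister_waiting() later de-registers it and notifies
        * the destination with RDMA_CONTROL_UNREGISTER_REQUEST.
        */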
   1298
   1299/*
   1300 * Perform a non-optimized memory unregistration after every transfer
   1301 * for demonstration purposes, only if pin-all is not requested.
   1302 *
   1303 * Potential optimizations:
   1304 * 1. Start a new thread to run this function continuously
   1305        - for bit clearing
   1306        - and for receipt of unregister messages
   1307 * 2. Use an LRU.
   1308 * 3. Use workload hints.
   1309 */
   1310static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
   1311{
   1312    while (rdma->unregistrations[rdma->unregister_current]) {
   1313        int ret;
   1314        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
   1315        uint64_t chunk =
   1316            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
   1317        uint64_t index =
   1318            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
   1319        RDMALocalBlock *block =
   1320            &(rdma->local_ram_blocks.block[index]);
   1321        RDMARegister reg = { .current_index = index };
   1322        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
   1323                                 };
   1324        RDMAControlHeader head = { .len = sizeof(RDMARegister),
   1325                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
   1326                                   .repeat = 1,
   1327                                 };
   1328
   1329        trace_qemu_rdma_unregister_waiting_proc(chunk,
   1330                                                rdma->unregister_current);
   1331
   1332        rdma->unregistrations[rdma->unregister_current] = 0;
   1333        rdma->unregister_current++;
   1334
   1335        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
   1336            rdma->unregister_current = 0;
   1337        }
   1338
   1339
   1340        /*
   1341         * Unregistration is speculative (because migration is single-threaded
   1342         * and we cannot break the protocol's infiniband message ordering).
   1343         * Thus, if the memory is currently being used for transmission,
   1344         * then abort the attempt to unregister and try again
   1345         * later the next time a completion is received for this memory.
   1346         */
   1347        clear_bit(chunk, block->unregister_bitmap);
   1348
   1349        if (test_bit(chunk, block->transit_bitmap)) {
   1350            trace_qemu_rdma_unregister_waiting_inflight(chunk);
   1351            continue;
   1352        }
   1353
   1354        trace_qemu_rdma_unregister_waiting_send(chunk);
   1355
   1356        ret = ibv_dereg_mr(block->pmr[chunk]);
   1357        block->pmr[chunk] = NULL;
   1358        block->remote_keys[chunk] = 0;
   1359
   1360        if (ret != 0) {
   1361            perror("unregistration chunk failed");
   1362            return -ret;
   1363        }
   1364        rdma->total_registrations--;
   1365
   1366        reg.key.chunk = chunk;
   1367        register_to_network(rdma, &reg);
   1368        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
   1369                                &resp, NULL, NULL);
   1370        if (ret < 0) {
   1371            return ret;
   1372        }
   1373
   1374        trace_qemu_rdma_unregister_waiting_complete(chunk);
   1375    }
   1376
   1377    return 0;
   1378}
   1379
   1380static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
   1381                                         uint64_t chunk)
   1382{
   1383    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
   1384
   1385    result |= (index << RDMA_WRID_BLOCK_SHIFT);
   1386    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
   1387
   1388    return result;
   1389}
   1390
   1391/*
   1392 * Set bit for unregistration in the next iteration.
   1393 * We cannot transmit right here, but will unpin later.
   1394 */
   1395static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
   1396                                        uint64_t chunk, uint64_t wr_id)
   1397{
   1398    if (rdma->unregistrations[rdma->unregister_next] != 0) {
   1399        error_report("rdma migration: queue is full");
   1400    } else {
   1401        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
   1402
   1403        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
   1404            trace_qemu_rdma_signal_unregister_append(chunk,
   1405                                                     rdma->unregister_next);
   1406
   1407            rdma->unregistrations[rdma->unregister_next++] =
   1408                    qemu_rdma_make_wrid(wr_id, index, chunk);
   1409
   1410            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
   1411                rdma->unregister_next = 0;
   1412            }
   1413        } else {
   1414            trace_qemu_rdma_signal_unregister_already(chunk);
   1415        }
   1416    }
   1417}
   1418
   1419/*
   1420 * Poll the completion queue to see if a work request
   1421 * (of any kind) has completed.
   1422 * Return the work request ID that completed.
   1423 */
   1424static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
   1425                               uint32_t *byte_len)
   1426{
   1427    int ret;
   1428    struct ibv_wc wc;
   1429    uint64_t wr_id;
   1430
   1431    ret = ibv_poll_cq(rdma->cq, 1, &wc);
   1432
   1433    if (!ret) {
   1434        *wr_id_out = RDMA_WRID_NONE;
   1435        return 0;
   1436    }
   1437
   1438    if (ret < 0) {
   1439        error_report("ibv_poll_cq return %d", ret);
   1440        return ret;
   1441    }
   1442
   1443    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
   1444
   1445    if (wc.status != IBV_WC_SUCCESS) {
   1446        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
   1447                        wc.status, ibv_wc_status_str(wc.status));
   1448        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
   1449
   1450        return -1;
   1451    }
   1452
   1453    if (rdma->control_ready_expected &&
   1454        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
   1455        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
   1456                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
   1457        rdma->control_ready_expected = 0;
   1458    }
   1459
   1460    if (wr_id == RDMA_WRID_RDMA_WRITE) {
   1461        uint64_t chunk =
   1462            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
   1463        uint64_t index =
   1464            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
   1465        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
   1466
   1467        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
   1468                                   index, chunk, block->local_host_addr,
   1469                                   (void *)(uintptr_t)block->remote_host_addr);
   1470
   1471        clear_bit(chunk, block->transit_bitmap);
   1472
   1473        if (rdma->nb_sent > 0) {
   1474            rdma->nb_sent--;
   1475        }
   1476
   1477        if (!rdma->pin_all) {
   1478            /*
   1479             * FYI: If one wanted to signal a specific chunk to be unregistered
   1480             * using LRU or workload-specific information, this is the function
   1481             * you would call to do so. That chunk would then get asynchronously
   1482             * unregistered later.
   1483             */
   1484#ifdef RDMA_UNREGISTRATION_EXAMPLE
   1485            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
   1486#endif
   1487        }
   1488    } else {
   1489        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
   1490    }
   1491
   1492    *wr_id_out = wc.wr_id;
   1493    if (byte_len) {
   1494        *byte_len = wc.byte_len;
   1495    }
   1496
   1497    return 0;
   1498}
   1499
   1500/* Wait for activity on the completion channel.
   1501 * Returns 0 on success, non-0 on error.
   1502 */
   1503static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
   1504{
   1505    struct rdma_cm_event *cm_event;
   1506    int ret = -1;
   1507
   1508    /*
   1509     * Coroutine doesn't start until migration_fd_process_incoming()
   1510     * so don't yield unless we know we're running inside of a coroutine.
   1511     */
   1512    if (rdma->migration_started_on_destination &&
   1513        migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
   1514        yield_until_fd_readable(rdma->comp_channel->fd);
   1515    } else {
   1516        /* This is the source side (we're in a separate thread), or the
   1517         * destination prior to migration_fd_process_incoming(); after
   1518         * postcopy the destination is also in a separate thread.
   1519         * We can't yield, so we have to poll the fd.
   1520         * But we need to be able to handle 'cancel' or an error
   1521         * without hanging forever.
   1522         */
   1523        while (!rdma->error_state  && !rdma->received_error) {
   1524            GPollFD pfds[2];
   1525            pfds[0].fd = rdma->comp_channel->fd;
   1526            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
   1527            pfds[0].revents = 0;
   1528
   1529            pfds[1].fd = rdma->channel->fd;
   1530            pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
   1531            pfds[1].revents = 0;
   1532
   1533            /* 0.1s timeout, should be fine for a 'cancel' */
   1534            switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
   1535            case 2:
   1536            case 1: /* fd active */
   1537                if (pfds[0].revents) {
   1538                    return 0;
   1539                }
   1540
   1541                if (pfds[1].revents) {
   1542                    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   1543                    if (ret) {
   1544                        error_report("failed to get cm event while waiting "
   1545                                     "on completion channel");
   1546                        return -EPIPE;
   1547                    }
   1548
   1549                    error_report("received cm event while waiting on comp "
   1550                                 "channel, cm event is %d", cm_event->event);
   1551                    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
   1552                        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
   1553                        rdma_ack_cm_event(cm_event);
   1554                        return -EPIPE;
   1555                    }
   1556                    rdma_ack_cm_event(cm_event);
   1557                }
   1558                break;
   1559
   1560            case 0: /* Timeout, go around again */
   1561                break;
   1562
   1563            default: /* Error of some type -
   1564                      * I don't trust errno from qemu_poll_ns
   1565                      */
   1566                error_report("%s: poll failed", __func__);
   1567                return -EPIPE;
   1568            }
   1569
   1570            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
   1571                /* Bail out and let the cancellation happen */
   1572                return -EPIPE;
   1573            }
   1574        }
   1575    }
   1576
   1577    if (rdma->received_error) {
   1578        return -EPIPE;
   1579    }
   1580    return rdma->error_state;
   1581}
   1582
   1583/*
   1584 * Block until the next work request has completed.
   1585 *
   1586 * First poll to see if a work request has already completed,
   1587 * otherwise block.
   1588 *
   1589 * If we encounter completed work requests for IDs other than
   1590 * the one we're interested in, then that's generally an error.
   1591 *
   1592 * The only exception is actual RDMA Write completions. These
   1593 * completions only need to be recorded, but do not actually
   1594 * need further processing.
   1595 */
   1596static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
   1597                                    uint32_t *byte_len)
   1598{
   1599    int num_cq_events = 0, ret = 0;
   1600    struct ibv_cq *cq;
   1601    void *cq_ctx;
   1602    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
   1603
   1604    if (ibv_req_notify_cq(rdma->cq, 0)) {
   1605        return -1;
   1606    }
   1607    /* poll cq first */
   1608    while (wr_id != wrid_requested) {
   1609        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
   1610        if (ret < 0) {
   1611            return ret;
   1612        }
   1613
   1614        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   1615
   1616        if (wr_id == RDMA_WRID_NONE) {
   1617            break;
   1618        }
   1619        if (wr_id != wrid_requested) {
   1620            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
   1621                       wrid_requested, print_wrid(wr_id), wr_id);
   1622        }
   1623    }
   1624
   1625    if (wr_id == wrid_requested) {
   1626        return 0;
   1627    }
   1628
   1629    while (1) {
   1630        ret = qemu_rdma_wait_comp_channel(rdma);
   1631        if (ret) {
   1632            goto err_block_for_wrid;
   1633        }
   1634
   1635        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
   1636        if (ret) {
   1637            perror("ibv_get_cq_event");
   1638            goto err_block_for_wrid;
   1639        }
   1640
   1641        num_cq_events++;
   1642
   1643        ret = -ibv_req_notify_cq(cq, 0);
   1644        if (ret) {
   1645            goto err_block_for_wrid;
   1646        }
   1647
   1648        while (wr_id != wrid_requested) {
   1649            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
   1650            if (ret < 0) {
   1651                goto err_block_for_wrid;
   1652            }
   1653
   1654            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   1655
   1656            if (wr_id == RDMA_WRID_NONE) {
   1657                break;
   1658            }
   1659            if (wr_id != wrid_requested) {
   1660                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
   1661                                   wrid_requested, print_wrid(wr_id), wr_id);
   1662            }
   1663        }
   1664
   1665        if (wr_id == wrid_requested) {
   1666            goto success_block_for_wrid;
   1667        }
   1668    }
   1669
   1670success_block_for_wrid:
   1671    if (num_cq_events) {
   1672        ibv_ack_cq_events(cq, num_cq_events);
   1673    }
   1674    return 0;
   1675
   1676err_block_for_wrid:
   1677    if (num_cq_events) {
   1678        ibv_ack_cq_events(cq, num_cq_events);
   1679    }
   1680
   1681    rdma->error_state = ret;
   1682    return ret;
   1683}
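       /*
        * Descriptive note (added): completions for other wrids seen while
        * waiting are not lost -- qemu_rdma_poll() above fully processes them
        * (e.g. an RDMA WRITE completion clears its chunk in transit_bitmap
        * and decrements nb_sent), so here they only trigger the "miss"
        * tracepoint before we keep waiting for the requested wrid.
        */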
   1684
   1685/*
   1686 * Post a SEND message work request for the control channel
   1687 * containing some data and block until the post completes.
   1688 */
   1689static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
   1690                                       RDMAControlHeader *head)
   1691{
   1692    int ret = 0;
   1693    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
   1694    struct ibv_send_wr *bad_wr;
   1695    struct ibv_sge sge = {
   1696                           .addr = (uintptr_t)(wr->control),
   1697                           .length = head->len + sizeof(RDMAControlHeader),
   1698                           .lkey = wr->control_mr->lkey,
   1699                         };
   1700    struct ibv_send_wr send_wr = {
   1701                                   .wr_id = RDMA_WRID_SEND_CONTROL,
   1702                                   .opcode = IBV_WR_SEND,
   1703                                   .send_flags = IBV_SEND_SIGNALED,
   1704                                   .sg_list = &sge,
   1705                                   .num_sge = 1,
   1706                                };
   1707
   1708    trace_qemu_rdma_post_send_control(control_desc(head->type));
   1709
   1710    /*
   1711     * We don't actually need to do a memcpy() in here if we used
   1712     * the "sge" properly, but since we're only sending control messages
   1713     * (not RAM in a performance-critical path), it's OK for now.
   1714     *
   1715     * The copy makes the RDMAControlHeader simpler to manipulate
   1716     * for the time being.
   1717     */
   1718    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
   1719    memcpy(wr->control, head, sizeof(RDMAControlHeader));
   1720    control_to_network((void *) wr->control);
   1721
   1722    if (buf) {
   1723        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
   1724    }
   1725
   1726
   1727    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
   1728
   1729    if (ret > 0) {
   1730        error_report("Failed to post IB SEND for control");
   1731        return -ret;
   1732    }
   1733
   1734    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
   1735    if (ret < 0) {
   1736        error_report("rdma migration: send polling control error");
   1737    }
   1738
   1739    return ret;
   1740}
   1741
   1742/*
   1743 * Post a RECV work request in anticipation of some future receipt
   1744 * of data on the control channel.
   1745 */
   1746static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
   1747{
   1748    struct ibv_recv_wr *bad_wr;
   1749    struct ibv_sge sge = {
   1750                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
   1751                            .length = RDMA_CONTROL_MAX_BUFFER,
   1752                            .lkey = rdma->wr_data[idx].control_mr->lkey,
   1753                         };
   1754
   1755    struct ibv_recv_wr recv_wr = {
   1756                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
   1757                                    .sg_list = &sge,
   1758                                    .num_sge = 1,
   1759                                 };
   1760
   1761
   1762    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
   1763        return -1;
   1764    }
   1765
   1766    return 0;
   1767}
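       /*
        * Descriptive note (added): the posted buffer is the fixed
        * RDMA_CONTROL_MAX_BUFFER-sized wr_data[idx].control region, and the
        * resulting completion carries wr_id == RDMA_WRID_RECV_CONTROL + idx,
        * which is what qemu_rdma_exchange_get_response() blocks on below.
        */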
   1768
   1769/*
   1770 * Block and wait for a RECV control channel message to arrive.
   1771 */
   1772static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
   1773                RDMAControlHeader *head, int expecting, int idx)
   1774{
   1775    uint32_t byte_len;
   1776    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
   1777                                       &byte_len);
   1778
   1779    if (ret < 0) {
   1780        error_report("rdma migration: recv polling control error!");
   1781        return ret;
   1782    }
   1783
   1784    network_to_control((void *) rdma->wr_data[idx].control);
   1785    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
   1786
   1787    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
   1788
   1789    if (expecting == RDMA_CONTROL_NONE) {
   1790        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
   1791                                             head->type);
   1792    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
   1793        error_report("Was expecting a %s (%d) control message"
   1794                ", but got: %s (%d), length: %d",
   1795                control_desc(expecting), expecting,
   1796                control_desc(head->type), head->type, head->len);
   1797        if (head->type == RDMA_CONTROL_ERROR) {
   1798            rdma->received_error = true;
   1799        }
   1800        return -EIO;
   1801    }
   1802    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
   1803        error_report("length too long: %d", head->len);
   1804        return -EINVAL;
   1805    }
   1806    if (sizeof(*head) + head->len != byte_len) {
   1807        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
   1808        return -EINVAL;
   1809    }
   1810
   1811    return 0;
   1812}
   1813
   1814/*
   1815 * When a RECV work request has completed, the work request's
   1816 * buffer starts with the control header.
   1817 *
   1818 * This advances the pointer past the header to the data portion
   1819 * of the control message in the work request's buffer, which
   1820 * was populated after the work request finished.
   1821 */
   1822static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
   1823                                  RDMAControlHeader *head)
   1824{
   1825    rdma->wr_data[idx].control_len = head->len;
   1826    rdma->wr_data[idx].control_curr =
   1827        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
   1828}
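       /*
        * Illustrative layout (added comment): after a RECV completes and
        * network_to_control() has byte-swapped the header,
        *
        *   wr_data[idx].control:      [ RDMAControlHeader ][ head->len bytes ]
        *   wr_data[idx].control_curr: ----------------------^
        *
        * i.e. control_curr points at the payload that follows the header.
        */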
   1829
   1830/*
   1831 * This is an 'atomic' high-level operation to deliver a single, unified
   1832 * control-channel message.
   1833 *
   1834 * Additionally, if the user is expecting some kind of reply to this message,
   1835 * they can request a 'resp' response message be filled in by posting an
   1836 * additional work request on behalf of the user and waiting for an additional
   1837 * completion.
   1838 *
   1839 * The extra (optional) response is used during registration to save us from
   1840 * having to perform an *additional* exchange of messages just to provide a
   1841 * response, by instead piggy-backing on the acknowledgement.
   1842 */
   1843static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
   1844                                   uint8_t *data, RDMAControlHeader *resp,
   1845                                   int *resp_idx,
   1846                                   int (*callback)(RDMAContext *rdma))
   1847{
   1848    int ret = 0;
   1849
   1850    /*
   1851     * Wait until the dest is ready before attempting to deliver the message
   1852     * by waiting for a READY message.
   1853     */
   1854    if (rdma->control_ready_expected) {
   1855        RDMAControlHeader resp;
   1856        ret = qemu_rdma_exchange_get_response(rdma,
   1857                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
   1858        if (ret < 0) {
   1859            return ret;
   1860        }
   1861    }
   1862
   1863    /*
   1864     * If the user is expecting a response, post a WR in anticipation of it.
   1865     */
   1866    if (resp) {
   1867        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
   1868        if (ret) {
   1869            error_report("rdma migration: error posting"
   1870                    " extra control recv for anticipated result!");
   1871            return ret;
   1872        }
   1873    }
   1874
   1875    /*
   1876     * Post a WR to replace the one we just consumed for the READY message.
   1877     */
   1878    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   1879    if (ret) {
   1880        error_report("rdma migration: error posting first control recv!");
   1881        return ret;
   1882    }
   1883
   1884    /*
   1885     * Deliver the control message that was requested.
   1886     */
   1887    ret = qemu_rdma_post_send_control(rdma, data, head);
   1888
   1889    if (ret < 0) {
   1890        error_report("Failed to send control buffer!");
   1891        return ret;
   1892    }
   1893
   1894    /*
   1895     * If we're expecting a response, block and wait for it.
   1896     */
   1897    if (resp) {
   1898        if (callback) {
   1899            trace_qemu_rdma_exchange_send_issue_callback();
   1900            ret = callback(rdma);
   1901            if (ret < 0) {
   1902                return ret;
   1903            }
   1904        }
   1905
   1906        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
   1907        ret = qemu_rdma_exchange_get_response(rdma, resp,
   1908                                              resp->type, RDMA_WRID_DATA);
   1909
   1910        if (ret < 0) {
   1911            return ret;
   1912        }
   1913
   1914        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
   1915        if (resp_idx) {
   1916            *resp_idx = RDMA_WRID_DATA;
   1917        }
   1918        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
   1919    }
   1920
   1921    rdma->control_ready_expected = 1;
   1922
   1923    return 0;
   1924}
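       /*
        * Illustrative use only (mirrors qemu_rdma_write_one() further down):
        * a caller expecting a piggy-backed reply passes a response header and
        * an index to locate the reply slot:
        *
        *   RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
        *   int reg_result_idx;
        *
        *   ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
        *                                 &resp, &reg_result_idx, NULL);
        *
        * On success, rdma->wr_data[reg_result_idx].control_curr points at the
        * RDMARegisterResult payload.
        */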
   1925
   1926/*
   1927 * This is an 'atomic' high-level operation to receive a single, unified
   1928 * control-channel message.
   1929 */
   1930static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
   1931                                int expecting)
   1932{
   1933    RDMAControlHeader ready = {
   1934                                .len = 0,
   1935                                .type = RDMA_CONTROL_READY,
   1936                                .repeat = 1,
   1937                              };
   1938    int ret;
   1939
   1940    /*
   1941     * Inform the source that we're ready to receive a message.
   1942     */
   1943    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
   1944
   1945    if (ret < 0) {
   1946        error_report("Failed to send control buffer!");
   1947        return ret;
   1948    }
   1949
   1950    /*
   1951     * Block and wait for the message.
   1952     */
   1953    ret = qemu_rdma_exchange_get_response(rdma, head,
   1954                                          expecting, RDMA_WRID_READY);
   1955
   1956    if (ret < 0) {
   1957        return ret;
   1958    }
   1959
   1960    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
   1961
   1962    /*
   1963     * Post a new RECV work request to replace the one we just consumed.
   1964     */
   1965    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   1966    if (ret) {
   1967        error_report("rdma migration: error posting second control recv!");
   1968        return ret;
   1969    }
   1970
   1971    return 0;
   1972}
   1973
   1974/*
   1975 * Write an actual chunk of memory using RDMA.
   1976 *
   1977 * If we're using dynamic registration on the dest-side, we have to
   1978 * send a registration command first.
   1979 */
   1980static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
   1981                               int current_index, uint64_t current_addr,
   1982                               uint64_t length)
   1983{
   1984    struct ibv_sge sge;
   1985    struct ibv_send_wr send_wr = { 0 };
   1986    struct ibv_send_wr *bad_wr;
   1987    int reg_result_idx, ret, count = 0;
   1988    uint64_t chunk, chunks;
   1989    uint8_t *chunk_start, *chunk_end;
   1990    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
   1991    RDMARegister reg;
   1992    RDMARegisterResult *reg_result;
   1993    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
   1994    RDMAControlHeader head = { .len = sizeof(RDMARegister),
   1995                               .type = RDMA_CONTROL_REGISTER_REQUEST,
   1996                               .repeat = 1,
   1997                             };
   1998
   1999retry:
   2000    sge.addr = (uintptr_t)(block->local_host_addr +
   2001                            (current_addr - block->offset));
   2002    sge.length = length;
   2003
   2004    chunk = ram_chunk_index(block->local_host_addr,
   2005                            (uint8_t *)(uintptr_t)sge.addr);
   2006    chunk_start = ram_chunk_start(block, chunk);
   2007
   2008    if (block->is_ram_block) {
   2009        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
   2010
   2011        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
   2012            chunks--;
   2013        }
   2014    } else {
   2015        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
   2016
   2017        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
   2018            chunks--;
   2019        }
   2020    }
   2021
   2022    trace_qemu_rdma_write_one_top(chunks + 1,
   2023                                  (chunks + 1) *
   2024                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
   2025
   2026    chunk_end = ram_chunk_end(block, chunk + chunks);
   2027
   2028    if (!rdma->pin_all) {
   2029#ifdef RDMA_UNREGISTRATION_EXAMPLE
   2030        qemu_rdma_unregister_waiting(rdma);
   2031#endif
   2032    }
   2033
   2034    while (test_bit(chunk, block->transit_bitmap)) {
   2035        (void)count;
   2036        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
   2037                sge.addr, length, rdma->nb_sent, block->nb_chunks);
   2038
   2039        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2040
   2041        if (ret < 0) {
   2042            error_report("Failed to wait for previous write to complete "
   2043                    "block %d chunk %" PRIu64
   2044                    " current %" PRIu64 " len %" PRIu64 " %d",
   2045                    current_index, chunk, sge.addr, length, rdma->nb_sent);
   2046            return ret;
   2047        }
   2048    }
   2049
   2050    if (!rdma->pin_all || !block->is_ram_block) {
   2051        if (!block->remote_keys[chunk]) {
   2052            /*
   2053             * This chunk has not yet been registered, so first check to see
   2054             * if the entire chunk is zero. If so, tell the other side to
   2055             * memset() + madvise() the entire chunk without RDMA.
   2056             */
   2057
   2058            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
   2059                RDMACompress comp = {
   2060                                        .offset = current_addr,
   2061                                        .value = 0,
   2062                                        .block_idx = current_index,
   2063                                        .length = length,
   2064                                    };
   2065
   2066                head.len = sizeof(comp);
   2067                head.type = RDMA_CONTROL_COMPRESS;
   2068
   2069                trace_qemu_rdma_write_one_zero(chunk, sge.length,
   2070                                               current_index, current_addr);
   2071
   2072                compress_to_network(rdma, &comp);
   2073                ret = qemu_rdma_exchange_send(rdma, &head,
   2074                                (uint8_t *) &comp, NULL, NULL, NULL);
   2075
   2076                if (ret < 0) {
   2077                    return -EIO;
   2078                }
   2079
   2080                acct_update_position(f, sge.length, true);
   2081
   2082                return 1;
   2083            }
   2084
   2085            /*
   2086             * Otherwise, tell other side to register.
   2087             */
   2088            reg.current_index = current_index;
   2089            if (block->is_ram_block) {
   2090                reg.key.current_addr = current_addr;
   2091            } else {
   2092                reg.key.chunk = chunk;
   2093            }
   2094            reg.chunks = chunks;
   2095
   2096            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
   2097                                              current_addr);
   2098
   2099            register_to_network(rdma, &reg);
   2100            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
   2101                                    &resp, &reg_result_idx, NULL);
   2102            if (ret < 0) {
   2103                return ret;
   2104            }
   2105
   2106            /* try to overlap this single registration with the one we sent. */
   2107            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2108                                                &sge.lkey, NULL, chunk,
   2109                                                chunk_start, chunk_end)) {
   2110                error_report("cannot get lkey");
   2111                return -EINVAL;
   2112            }
   2113
   2114            reg_result = (RDMARegisterResult *)
   2115                    rdma->wr_data[reg_result_idx].control_curr;
   2116
   2117            network_to_result(reg_result);
   2118
   2119            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
   2120                                                 reg_result->rkey, chunk);
   2121
   2122            block->remote_keys[chunk] = reg_result->rkey;
   2123            block->remote_host_addr = reg_result->host_addr;
   2124        } else {
   2125            /* already registered before */
   2126            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2127                                                &sge.lkey, NULL, chunk,
   2128                                                chunk_start, chunk_end)) {
   2129                error_report("cannot get lkey!");
   2130                return -EINVAL;
   2131            }
   2132        }
   2133
   2134        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
   2135    } else {
   2136        send_wr.wr.rdma.rkey = block->remote_rkey;
   2137
   2138        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
   2139                                                     &sge.lkey, NULL, chunk,
   2140                                                     chunk_start, chunk_end)) {
   2141            error_report("cannot get lkey!");
   2142            return -EINVAL;
   2143        }
   2144    }
   2145
   2146    /*
   2147     * Encode the ram block index and chunk within this wrid.
   2148     * We will use this information at the time of completion
   2149     * to figure out which bitmap to check against and then which
   2150     * chunk in the bitmap to look for.
   2151     */
   2152    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
   2153                                        current_index, chunk);
   2154
   2155    send_wr.opcode = IBV_WR_RDMA_WRITE;
   2156    send_wr.send_flags = IBV_SEND_SIGNALED;
   2157    send_wr.sg_list = &sge;
   2158    send_wr.num_sge = 1;
   2159    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
   2160                                (current_addr - block->offset);
   2161
   2162    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
   2163                                   sge.length);
   2164
   2165    /*
   2166     * ibv_post_send() does not return negative error numbers,
   2167     * per the specification they are positive - no idea why.
   2168     */
   2169    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
   2170
   2171    if (ret == ENOMEM) {
   2172        trace_qemu_rdma_write_one_queue_full();
   2173        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2174        if (ret < 0) {
   2175            error_report("rdma migration: failed to make "
   2176                         "room in full send queue! %d", ret);
   2177            return ret;
   2178        }
   2179
   2180        goto retry;
   2181
   2182    } else if (ret > 0) {
   2183        perror("rdma migration: post rdma write failed");
   2184        return -ret;
   2185    }
   2186
   2187    set_bit(chunk, block->transit_bitmap);
   2188    acct_update_position(f, sge.length, false);
   2189    rdma->total_writes++;
   2190
   2191    return 0;
   2192}
   2193
   2194/*
   2195 * Push out any unwritten RDMA operations.
   2196 *
   2197 * We support sending out multiple chunks at the same time.
   2198 * Not all of them need to get signaled in the completion queue.
   2199 */
   2200static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
   2201{
   2202    int ret;
   2203
   2204    if (!rdma->current_length) {
   2205        return 0;
   2206    }
   2207
   2208    ret = qemu_rdma_write_one(f, rdma,
   2209            rdma->current_index, rdma->current_addr, rdma->current_length);
   2210
   2211    if (ret < 0) {
   2212        return ret;
   2213    }
   2214
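           /*
            * Descriptive note (added): qemu_rdma_write_one() returns 1 for the
            * all-zero "compress" path, which posts no RDMA WRITE; only a
            * return of 0 means a chunk is actually in flight and must be
            * counted in nb_sent.
            */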
   2215    if (ret == 0) {
   2216        rdma->nb_sent++;
   2217        trace_qemu_rdma_write_flush(rdma->nb_sent);
   2218    }
   2219
   2220    rdma->current_length = 0;
   2221    rdma->current_addr = 0;
   2222
   2223    return 0;
   2224}
   2225
   2226static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
   2227                    uint64_t offset, uint64_t len)
   2228{
   2229    RDMALocalBlock *block;
   2230    uint8_t *host_addr;
   2231    uint8_t *chunk_end;
   2232
   2233    if (rdma->current_index < 0) {
   2234        return 0;
   2235    }
   2236
   2237    if (rdma->current_chunk < 0) {
   2238        return 0;
   2239    }
   2240
   2241    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
   2242    host_addr = block->local_host_addr + (offset - block->offset);
   2243    chunk_end = ram_chunk_end(block, rdma->current_chunk);
   2244
   2245    if (rdma->current_length == 0) {
   2246        return 0;
   2247    }
   2248
   2249    /*
   2250     * Only merge into chunk sequentially.
   2251     */
   2252    if (offset != (rdma->current_addr + rdma->current_length)) {
   2253        return 0;
   2254    }
   2255
   2256    if (offset < block->offset) {
   2257        return 0;
   2258    }
   2259
   2260    if ((offset + len) > (block->offset + block->length)) {
   2261        return 0;
   2262    }
   2263
   2264    if ((host_addr + len) > chunk_end) {
   2265        return 0;
   2266    }
   2267
   2268    return 1;
   2269}
   2270
   2271/*
   2272 * We're not actually writing here, but doing three things:
   2273 *
   2274 * 1. Identify the chunk the buffer belongs to.
   2275 * 2. If the chunk is full or the buffer doesn't belong to the current
   2276 *    chunk, then start a new chunk and flush() the old chunk.
   2277 * 3. To keep the hardware busy, we also group chunks into batches
   2278 *    and only require that a batch gets acknowledged in the completion
   2279 *    queue instead of each individual chunk.
   2280 */
   2281static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
   2282                           uint64_t block_offset, uint64_t offset,
   2283                           uint64_t len)
   2284{
   2285    uint64_t current_addr = block_offset + offset;
   2286    uint64_t index = rdma->current_index;
   2287    uint64_t chunk = rdma->current_chunk;
   2288    int ret;
   2289
   2290    /* If we cannot merge it, we flush the current buffer first. */
   2291    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
   2292        ret = qemu_rdma_write_flush(f, rdma);
   2293        if (ret) {
   2294            return ret;
   2295        }
   2296        rdma->current_length = 0;
   2297        rdma->current_addr = current_addr;
   2298
   2299        ret = qemu_rdma_search_ram_block(rdma, block_offset,
   2300                                         offset, len, &index, &chunk);
   2301        if (ret) {
   2302            error_report("ram block search failed");
   2303            return ret;
   2304        }
   2305        rdma->current_index = index;
   2306        rdma->current_chunk = chunk;
   2307    }
   2308
   2309    /* merge it */
   2310    rdma->current_length += len;
   2311
   2312    /* flush it if buffer is too large */
   2313    if (rdma->current_length >= RDMA_MERGE_MAX) {
   2314        return qemu_rdma_write_flush(f, rdma);
   2315    }
   2316
   2317    return 0;
   2318}
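       /*
        * Illustrative behaviour of the merging above (added sketch; offsets
        * are hypothetical, chunk size is 1UL << RDMA_REG_CHUNK_SHIFT):
        *
        *   qemu_rdma_write(f, rdma, block_offset, 0x0000, 0x1000);  starts a buffer
        *   qemu_rdma_write(f, rdma, block_offset, 0x1000, 0x1000);  sequential: merged
        *   qemu_rdma_write(f, rdma, block_offset, 0x9000, 0x1000);  not sequential:
        *                       the old buffer is flushed and a new one starts
        */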
   2319
   2320static void qemu_rdma_cleanup(RDMAContext *rdma)
   2321{
   2322    int idx;
   2323
   2324    if (rdma->cm_id && rdma->connected) {
   2325        if ((rdma->error_state ||
   2326             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
   2327            !rdma->received_error) {
   2328            RDMAControlHeader head = { .len = 0,
   2329                                       .type = RDMA_CONTROL_ERROR,
   2330                                       .repeat = 1,
   2331                                     };
   2332            error_report("Early error. Sending error.");
   2333            qemu_rdma_post_send_control(rdma, NULL, &head);
   2334        }
   2335
   2336        rdma_disconnect(rdma->cm_id);
   2337        trace_qemu_rdma_cleanup_disconnect();
   2338        rdma->connected = false;
   2339    }
   2340
   2341    if (rdma->channel) {
   2342        qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
   2343    }
   2344    g_free(rdma->dest_blocks);
   2345    rdma->dest_blocks = NULL;
   2346
   2347    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2348        if (rdma->wr_data[idx].control_mr) {
   2349            rdma->total_registrations--;
   2350            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
   2351        }
   2352        rdma->wr_data[idx].control_mr = NULL;
   2353    }
   2354
   2355    if (rdma->local_ram_blocks.block) {
   2356        while (rdma->local_ram_blocks.nb_blocks) {
   2357            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
   2358        }
   2359    }
   2360
   2361    if (rdma->qp) {
   2362        rdma_destroy_qp(rdma->cm_id);
   2363        rdma->qp = NULL;
   2364    }
   2365    if (rdma->cq) {
   2366        ibv_destroy_cq(rdma->cq);
   2367        rdma->cq = NULL;
   2368    }
   2369    if (rdma->comp_channel) {
   2370        ibv_destroy_comp_channel(rdma->comp_channel);
   2371        rdma->comp_channel = NULL;
   2372    }
   2373    if (rdma->pd) {
   2374        ibv_dealloc_pd(rdma->pd);
   2375        rdma->pd = NULL;
   2376    }
   2377    if (rdma->cm_id) {
   2378        rdma_destroy_id(rdma->cm_id);
   2379        rdma->cm_id = NULL;
   2380    }
   2381
   2382    /* On the destination side, listen_id and channel are shared */
   2383    if (rdma->listen_id) {
   2384        if (!rdma->is_return_path) {
   2385            rdma_destroy_id(rdma->listen_id);
   2386        }
   2387        rdma->listen_id = NULL;
   2388
   2389        if (rdma->channel) {
   2390            if (!rdma->is_return_path) {
   2391                rdma_destroy_event_channel(rdma->channel);
   2392            }
   2393            rdma->channel = NULL;
   2394        }
   2395    }
   2396
   2397    if (rdma->channel) {
   2398        rdma_destroy_event_channel(rdma->channel);
   2399        rdma->channel = NULL;
   2400    }
   2401    g_free(rdma->host);
   2402    g_free(rdma->host_port);
   2403    rdma->host = NULL;
   2404    rdma->host_port = NULL;
   2405}
   2406
   2407
   2408static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
   2409{
   2410    int ret, idx;
   2411    Error *local_err = NULL, **temp = &local_err;
   2412
   2413    /*
   2414     * Will be validated against destination's actual capabilities
   2415     * after the connect() completes.
   2416     */
   2417    rdma->pin_all = pin_all;
   2418
   2419    ret = qemu_rdma_resolve_host(rdma, temp);
   2420    if (ret) {
   2421        goto err_rdma_source_init;
   2422    }
   2423
   2424    ret = qemu_rdma_alloc_pd_cq(rdma);
   2425    if (ret) {
   2426        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
   2427                    " limits may be too low. Please check $ ulimit -a # and "
   2428                    "search for 'ulimit -l' in the output");
   2429        goto err_rdma_source_init;
   2430    }
   2431
   2432    ret = qemu_rdma_alloc_qp(rdma);
   2433    if (ret) {
   2434        ERROR(temp, "rdma migration: error allocating qp!");
   2435        goto err_rdma_source_init;
   2436    }
   2437
   2438    ret = qemu_rdma_init_ram_blocks(rdma);
   2439    if (ret) {
   2440        ERROR(temp, "rdma migration: error initializing ram blocks!");
   2441        goto err_rdma_source_init;
   2442    }
   2443
   2444    /* Build the hash that maps from offset to RAMBlock */
   2445    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
   2446    for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
   2447        g_hash_table_insert(rdma->blockmap,
   2448                (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
   2449                &rdma->local_ram_blocks.block[idx]);
   2450    }
   2451
   2452    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2453        ret = qemu_rdma_reg_control(rdma, idx);
   2454        if (ret) {
   2455            ERROR(temp, "rdma migration: error registering %d control!",
   2456                                                            idx);
   2457            goto err_rdma_source_init;
   2458        }
   2459    }
   2460
   2461    return 0;
   2462
   2463err_rdma_source_init:
   2464    error_propagate(errp, local_err);
   2465    qemu_rdma_cleanup(rdma);
   2466    return -1;
   2467}
   2468
   2469static int qemu_get_cm_event_timeout(RDMAContext *rdma,
   2470                                     struct rdma_cm_event **cm_event,
   2471                                     long msec, Error **errp)
   2472{
   2473    int ret;
   2474    struct pollfd poll_fd = {
   2475                                .fd = rdma->channel->fd,
   2476                                .events = POLLIN,
   2477                                .revents = 0
   2478                            };
   2479
   2480    do {
   2481        ret = poll(&poll_fd, 1, msec);
   2482    } while (ret < 0 && errno == EINTR);
   2483
   2484    if (ret == 0) {
   2485        ERROR(errp, "poll cm event timeout");
   2486        return -1;
   2487    } else if (ret < 0) {
   2488        ERROR(errp, "failed to poll cm event, errno=%i", errno);
   2489        return -1;
   2490    } else if (poll_fd.revents & POLLIN) {
   2491        return rdma_get_cm_event(rdma->channel, cm_event);
   2492    } else {
   2493        ERROR(errp, "no POLLIN event, revents=%x", poll_fd.revents);
   2494        return -1;
   2495    }
   2496}
   2497
   2498static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
   2499{
   2500    RDMACapabilities cap = {
   2501                                .version = RDMA_CONTROL_VERSION_CURRENT,
   2502                                .flags = 0,
   2503                           };
   2504    struct rdma_conn_param conn_param = { .initiator_depth = 2,
   2505                                          .retry_count = 5,
   2506                                          .private_data = &cap,
   2507                                          .private_data_len = sizeof(cap),
   2508                                        };
   2509    struct rdma_cm_event *cm_event;
   2510    int ret;
   2511
   2512    /*
   2513     * Only negotiate the capability with destination if the user
   2514     * on the source first requested the capability.
   2515     */
   2516    if (rdma->pin_all) {
   2517        trace_qemu_rdma_connect_pin_all_requested();
   2518        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
   2519    }
   2520
   2521    caps_to_network(&cap);
   2522
   2523    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   2524    if (ret) {
   2525        ERROR(errp, "posting second control recv");
   2526        goto err_rdma_source_connect;
   2527    }
   2528
   2529    ret = rdma_connect(rdma->cm_id, &conn_param);
   2530    if (ret) {
   2531        perror("rdma_connect");
   2532        ERROR(errp, "connecting to destination!");
   2533        goto err_rdma_source_connect;
   2534    }
   2535
   2536    if (return_path) {
   2537        ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
   2538    } else {
   2539        ret = rdma_get_cm_event(rdma->channel, &cm_event);
   2540    }
   2541    if (ret) {
   2542        perror("rdma_get_cm_event after rdma_connect");
   2543        ERROR(errp, "connecting to destination!");
   2544        goto err_rdma_source_connect;
   2545    }
   2546
   2547    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
   2548        error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
   2549        ERROR(errp, "connecting to destination!");
   2550        rdma_ack_cm_event(cm_event);
   2551        goto err_rdma_source_connect;
   2552    }
   2553    rdma->connected = true;
   2554
   2555    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
   2556    network_to_caps(&cap);
   2557
   2558    /*
   2559     * Verify that the *requested* capabilities are supported by the destination
   2560     * and disable them otherwise.
   2561     */
   2562    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
   2563        ERROR(errp, "Server cannot support pinning all memory. "
   2564                        "Will register memory dynamically.");
   2565        rdma->pin_all = false;
   2566    }
   2567
   2568    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
   2569
   2570    rdma_ack_cm_event(cm_event);
   2571
   2572    rdma->control_ready_expected = 1;
   2573    rdma->nb_sent = 0;
   2574    return 0;
   2575
   2576err_rdma_source_connect:
   2577    qemu_rdma_cleanup(rdma);
   2578    return -1;
   2579}
   2580
   2581static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
   2582{
   2583    int ret, idx;
   2584    struct rdma_cm_id *listen_id;
   2585    char ip[40] = "unknown";
   2586    struct rdma_addrinfo *res, *e;
   2587    char port_str[16];
   2588
   2589    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2590        rdma->wr_data[idx].control_len = 0;
   2591        rdma->wr_data[idx].control_curr = NULL;
   2592    }
   2593
   2594    if (!rdma->host || !rdma->host[0]) {
   2595        ERROR(errp, "RDMA host is not set!");
   2596        rdma->error_state = -EINVAL;
   2597        return -1;
   2598    }
   2599    /* create CM channel */
   2600    rdma->channel = rdma_create_event_channel();
   2601    if (!rdma->channel) {
   2602        ERROR(errp, "could not create rdma event channel");
   2603        rdma->error_state = -EINVAL;
   2604        return -1;
   2605    }
   2606
   2607    /* create CM id */
   2608    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
   2609    if (ret) {
   2610        ERROR(errp, "could not create cm_id!");
   2611        goto err_dest_init_create_listen_id;
   2612    }
   2613
   2614    snprintf(port_str, 16, "%d", rdma->port);
   2615    port_str[15] = '\0';
   2616
   2617    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
   2618    if (ret < 0) {
   2619        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
   2620        goto err_dest_init_bind_addr;
   2621    }
   2622
   2623    for (e = res; e != NULL; e = e->ai_next) {
   2624        inet_ntop(e->ai_family,
   2625            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
   2626        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
   2627        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
   2628        if (ret) {
   2629            continue;
   2630        }
   2631        if (e->ai_family == AF_INET6) {
   2632            ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
   2633            if (ret) {
   2634                continue;
   2635            }
   2636        }
   2637        break;
   2638    }
   2639
   2640    rdma_freeaddrinfo(res);
   2641    if (!e) {
   2642        ERROR(errp, "could not rdma_bind_addr!");
   2643        goto err_dest_init_bind_addr;
   2644    }
   2645
   2646    rdma->listen_id = listen_id;
   2647    qemu_rdma_dump_gid("dest_init", listen_id);
   2648    return 0;
   2649
   2650err_dest_init_bind_addr:
   2651    rdma_destroy_id(listen_id);
   2652err_dest_init_create_listen_id:
   2653    rdma_destroy_event_channel(rdma->channel);
   2654    rdma->channel = NULL;
   2655    rdma->error_state = ret;
   2656    return ret;
   2657
   2658}
   2659
   2660static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
   2661                                            RDMAContext *rdma)
   2662{
   2663    int idx;
   2664
   2665    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   2666        rdma_return_path->wr_data[idx].control_len = 0;
   2667        rdma_return_path->wr_data[idx].control_curr = NULL;
   2668    }
   2669
   2670    /* The CM channel and CM id are shared */
   2671    rdma_return_path->channel = rdma->channel;
   2672    rdma_return_path->listen_id = rdma->listen_id;
   2673
   2674    rdma->return_path = rdma_return_path;
   2675    rdma_return_path->return_path = rdma;
   2676    rdma_return_path->is_return_path = true;
   2677}
   2678
   2679static void *qemu_rdma_data_init(const char *host_port, Error **errp)
   2680{
   2681    RDMAContext *rdma = NULL;
   2682    InetSocketAddress *addr;
   2683
   2684    if (host_port) {
   2685        rdma = g_new0(RDMAContext, 1);
   2686        rdma->current_index = -1;
   2687        rdma->current_chunk = -1;
   2688
   2689        addr = g_new(InetSocketAddress, 1);
   2690        if (!inet_parse(addr, host_port, NULL)) {
   2691            rdma->port = atoi(addr->port);
   2692            rdma->host = g_strdup(addr->host);
   2693            rdma->host_port = g_strdup(host_port);
   2694        } else {
   2695            ERROR(errp, "bad RDMA migration address '%s'", host_port);
   2696            g_free(rdma);
   2697            rdma = NULL;
   2698        }
   2699
   2700        qapi_free_InetSocketAddress(addr);
   2701    }
   2702
   2703    return rdma;
   2704}
   2705
   2706/*
   2707 * QEMUFile interface to the control channel.
   2708 * SEND messages for control only.
   2709 * VM's ram is handled with regular RDMA messages.
   2710 */
   2711static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
   2712                                       const struct iovec *iov,
   2713                                       size_t niov,
   2714                                       int *fds,
   2715                                       size_t nfds,
   2716                                       Error **errp)
   2717{
   2718    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2719    QEMUFile *f = rioc->file;
   2720    RDMAContext *rdma;
   2721    int ret;
   2722    ssize_t done = 0;
   2723    size_t i;
   2724    size_t len = 0;
   2725
   2726    RCU_READ_LOCK_GUARD();
   2727    rdma = qatomic_rcu_read(&rioc->rdmaout);
   2728
   2729    if (!rdma) {
   2730        return -EIO;
   2731    }
   2732
   2733    CHECK_ERROR_STATE();
   2734
   2735    /*
   2736     * Push out any writes that
   2737     * we've queued up for the VM's RAM.
   2738     */
   2739    ret = qemu_rdma_write_flush(f, rdma);
   2740    if (ret < 0) {
   2741        rdma->error_state = ret;
   2742        return ret;
   2743    }
   2744
   2745    for (i = 0; i < niov; i++) {
   2746        size_t remaining = iov[i].iov_len;
   2747        uint8_t * data = (void *)iov[i].iov_base;
   2748        while (remaining) {
   2749            RDMAControlHeader head;
   2750
   2751            len = MIN(remaining, RDMA_SEND_INCREMENT);
   2752            remaining -= len;
   2753
   2754            head.len = len;
   2755            head.type = RDMA_CONTROL_QEMU_FILE;
   2756
   2757            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
   2758
   2759            if (ret < 0) {
   2760                rdma->error_state = ret;
   2761                return ret;
   2762            }
   2763
   2764            data += len;
   2765            done += len;
   2766        }
   2767    }
   2768
   2769    return done;
   2770}
   2771
   2772static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
   2773                             size_t size, int idx)
   2774{
   2775    size_t len = 0;
   2776
   2777    if (rdma->wr_data[idx].control_len) {
   2778        trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
   2779
   2780        len = MIN(size, rdma->wr_data[idx].control_len);
   2781        memcpy(buf, rdma->wr_data[idx].control_curr, len);
   2782        rdma->wr_data[idx].control_curr += len;
   2783        rdma->wr_data[idx].control_len -= len;
   2784    }
   2785
   2786    return len;
   2787}
   2788
   2789/*
   2790 * QEMUFile interface to the control channel.
   2791 * RDMA links don't use bytestreams, so we have to
   2792 * return bytes to QEMUFile opportunistically.
   2793 */
   2794static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
   2795                                      const struct iovec *iov,
   2796                                      size_t niov,
   2797                                      int **fds,
   2798                                      size_t *nfds,
   2799                                      Error **errp)
   2800{
   2801    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2802    RDMAContext *rdma;
   2803    RDMAControlHeader head;
   2804    int ret = 0;
   2805    ssize_t i;
   2806    size_t done = 0;
   2807
   2808    RCU_READ_LOCK_GUARD();
   2809    rdma = qatomic_rcu_read(&rioc->rdmain);
   2810
   2811    if (!rdma) {
   2812        return -EIO;
   2813    }
   2814
   2815    CHECK_ERROR_STATE();
   2816
   2817    for (i = 0; i < niov; i++) {
   2818        size_t want = iov[i].iov_len;
   2819        uint8_t *data = (void *)iov[i].iov_base;
   2820
   2821        /*
   2822         * First, we hold on to the last SEND message we
   2823         * were given and dish out the bytes until we run
   2824         * out of bytes.
   2825         */
   2826        ret = qemu_rdma_fill(rdma, data, want, 0);
   2827        done += ret;
   2828        want -= ret;
   2829        /* Got what we needed, so go to next iovec */
   2830        if (want == 0) {
   2831            continue;
   2832        }
   2833
   2834        /* If we got any data so far, then don't wait
   2835         * for more, just return what we have */
   2836        if (done > 0) {
   2837            break;
   2838        }
   2839
   2840
   2841        /* We've got nothing at all, so let's wait for
   2842         * more to arrive
   2843         */
   2844        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
   2845
   2846        if (ret < 0) {
   2847            rdma->error_state = ret;
   2848            return ret;
   2849        }
   2850
   2851        /*
   2852         * SEND was received with new bytes, now try again.
   2853         */
   2854        ret = qemu_rdma_fill(rdma, data, want, 0);
   2855        done += ret;
   2856        want -= ret;
   2857
   2858        /* Still didn't get enough, so let's just return */
   2859        if (want) {
   2860            if (done == 0) {
   2861                return QIO_CHANNEL_ERR_BLOCK;
   2862            } else {
   2863                break;
   2864            }
   2865        }
   2866    }
   2867    return done;
   2868}
   2869
   2870/*
   2871 * Block until all the outstanding chunks have been delivered by the hardware.
   2872 */
   2873static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
   2874{
   2875    int ret;
   2876
   2877    if (qemu_rdma_write_flush(f, rdma) < 0) {
   2878        return -EIO;
   2879    }
   2880
   2881    while (rdma->nb_sent) {
   2882        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
   2883        if (ret < 0) {
   2884            error_report("rdma migration: complete polling error!");
   2885            return -EIO;
   2886        }
   2887    }
   2888
   2889    qemu_rdma_unregister_waiting(rdma);
   2890
   2891    return 0;
   2892}
   2893
   2894
   2895static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
   2896                                         bool blocking,
   2897                                         Error **errp)
   2898{
   2899    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   2900    /* XXX we should make readv/writev actually honour this :-) */
   2901    rioc->blocking = blocking;
   2902    return 0;
   2903}
   2904
   2905
   2906typedef struct QIOChannelRDMASource QIOChannelRDMASource;
   2907struct QIOChannelRDMASource {
   2908    GSource parent;
   2909    QIOChannelRDMA *rioc;
   2910    GIOCondition condition;
   2911};
   2912
   2913static gboolean
   2914qio_channel_rdma_source_prepare(GSource *source,
   2915                                gint *timeout)
   2916{
   2917    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   2918    RDMAContext *rdma;
   2919    GIOCondition cond = 0;
   2920    *timeout = -1;
   2921
   2922    RCU_READ_LOCK_GUARD();
   2923    if (rsource->condition == G_IO_IN) {
   2924        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   2925    } else {
   2926        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   2927    }
   2928
   2929    if (!rdma) {
   2930        error_report("RDMAContext is NULL when preparing GSource");
   2931        return FALSE;
   2932    }
   2933
   2934    if (rdma->wr_data[0].control_len) {
   2935        cond |= G_IO_IN;
   2936    }
   2937    cond |= G_IO_OUT;
   2938
   2939    return cond & rsource->condition;
   2940}
   2941
   2942static gboolean
   2943qio_channel_rdma_source_check(GSource *source)
   2944{
   2945    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   2946    RDMAContext *rdma;
   2947    GIOCondition cond = 0;
   2948
   2949    RCU_READ_LOCK_GUARD();
   2950    if (rsource->condition == G_IO_IN) {
   2951        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   2952    } else {
   2953        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   2954    }
   2955
   2956    if (!rdma) {
   2957        error_report("RDMAContext is NULL when checking GSource");
   2958        return FALSE;
   2959    }
   2960
   2961    if (rdma->wr_data[0].control_len) {
   2962        cond |= G_IO_IN;
   2963    }
   2964    cond |= G_IO_OUT;
   2965
   2966    return cond & rsource->condition;
   2967}
   2968
   2969static gboolean
   2970qio_channel_rdma_source_dispatch(GSource *source,
   2971                                 GSourceFunc callback,
   2972                                 gpointer user_data)
   2973{
   2974    QIOChannelFunc func = (QIOChannelFunc)callback;
   2975    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
   2976    RDMAContext *rdma;
   2977    GIOCondition cond = 0;
   2978
   2979    RCU_READ_LOCK_GUARD();
   2980    if (rsource->condition == G_IO_IN) {
   2981        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
   2982    } else {
   2983        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
   2984    }
   2985
   2986    if (!rdma) {
   2987        error_report("RDMAContext is NULL when dispatching GSource");
   2988        return FALSE;
   2989    }
   2990
   2991    if (rdma->wr_data[0].control_len) {
   2992        cond |= G_IO_IN;
   2993    }
   2994    cond |= G_IO_OUT;
   2995
   2996    return (*func)(QIO_CHANNEL(rsource->rioc),
   2997                   (cond & rsource->condition),
   2998                   user_data);
   2999}
   3000
   3001static void
   3002qio_channel_rdma_source_finalize(GSource *source)
   3003{
   3004    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
   3005
   3006    object_unref(OBJECT(ssource->rioc));
   3007}
   3008
   3009GSourceFuncs qio_channel_rdma_source_funcs = {
   3010    qio_channel_rdma_source_prepare,
   3011    qio_channel_rdma_source_check,
   3012    qio_channel_rdma_source_dispatch,
   3013    qio_channel_rdma_source_finalize
   3014};
   3015
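       /*
        * Create a GSource wrapping this channel.  Callers would normally
        * reach this through the generic QIOChannel API rather than using
        * qio_channel_rdma_source_funcs directly; an illustrative sketch
        * (my_ready_cb and opaque are hypothetical names, not part of this
        * file):
        *
        *     GSource *src = qio_channel_create_watch(ioc, G_IO_IN);
        *     g_source_set_callback(src, (GSourceFunc)my_ready_cb, opaque, NULL);
        *     g_source_attach(src, NULL);
        */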
   3016static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
   3017                                              GIOCondition condition)
   3018{
   3019    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3020    QIOChannelRDMASource *ssource;
   3021    GSource *source;
   3022
   3023    source = g_source_new(&qio_channel_rdma_source_funcs,
   3024                          sizeof(QIOChannelRDMASource));
   3025    ssource = (QIOChannelRDMASource *)source;
   3026
   3027    ssource->rioc = rioc;
   3028    object_ref(OBJECT(rioc));
   3029
   3030    ssource->condition = condition;
   3031
   3032    return source;
   3033}
   3034
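       /*
        * Attach the AIO fd handlers to the completion-channel fd: the
        * incoming context (rdmain) when a read handler is given, otherwise
        * the outgoing context (rdmaout).
        */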
   3035static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
   3036                                                  AioContext *ctx,
   3037                                                  IOHandler *io_read,
   3038                                                  IOHandler *io_write,
   3039                                                  void *opaque)
   3040{
   3041    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3042    if (io_read) {
   3043        aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
   3044                           false, io_read, io_write, NULL, opaque);
   3045    } else {
   3046        aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
   3047                           false, io_read, io_write, NULL, opaque);
   3048    }
   3049}
   3050
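       /*
        * Teardown is deferred through call_rcu() so that readers that
        * fetched rdmain/rdmaout under RCU_READ_LOCK_GUARD() cannot see the
        * contexts being freed underneath them.
        */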
   3051struct rdma_close_rcu {
   3052    struct rcu_head rcu;
   3053    RDMAContext *rdmain;
   3054    RDMAContext *rdmaout;
   3055};
   3056
   3057/* callback from qio_channel_rdma_close via call_rcu */
   3058static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
   3059{
   3060    if (rcu->rdmain) {
   3061        qemu_rdma_cleanup(rcu->rdmain);
   3062    }
   3063
   3064    if (rcu->rdmaout) {
   3065        qemu_rdma_cleanup(rcu->rdmaout);
   3066    }
   3067
   3068    g_free(rcu->rdmain);
   3069    g_free(rcu->rdmaout);
   3070    g_free(rcu);
   3071}
   3072
   3073static int qio_channel_rdma_close(QIOChannel *ioc,
   3074                                  Error **errp)
   3075{
   3076    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3077    RDMAContext *rdmain, *rdmaout;
   3078    struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
   3079
   3080    trace_qemu_rdma_close();
   3081
   3082    rdmain = rioc->rdmain;
   3083    if (rdmain) {
   3084        qatomic_rcu_set(&rioc->rdmain, NULL);
   3085    }
   3086
   3087    rdmaout = rioc->rdmaout;
   3088    if (rdmaout) {
   3089        qatomic_rcu_set(&rioc->rdmaout, NULL);
   3090    }
   3091
   3092    rcu->rdmain = rdmain;
   3093    rcu->rdmaout = rdmaout;
   3094    call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
   3095
   3096    return 0;
   3097}
   3098
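       /*
        * Shutdown does not tear anything down; it only flags error_state on
        * the affected context(s) so that in-flight and future operations
        * bail out via CHECK_ERROR_STATE().
        */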
   3099static int
   3100qio_channel_rdma_shutdown(QIOChannel *ioc,
   3101                            QIOChannelShutdown how,
   3102                            Error **errp)
   3103{
   3104    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
   3105    RDMAContext *rdmain, *rdmaout;
   3106
   3107    RCU_READ_LOCK_GUARD();
   3108
   3109    rdmain = qatomic_rcu_read(&rioc->rdmain);
   3110    rdmaout = qatomic_rcu_read(&rioc->rdmaout);
   3111
   3112    switch (how) {
   3113    case QIO_CHANNEL_SHUTDOWN_READ:
   3114        if (rdmain) {
   3115            rdmain->error_state = -1;
   3116        }
   3117        break;
   3118    case QIO_CHANNEL_SHUTDOWN_WRITE:
   3119        if (rdmaout) {
   3120            rdmaout->error_state = -1;
   3121        }
   3122        break;
   3123    case QIO_CHANNEL_SHUTDOWN_BOTH:
   3124    default:
   3125        if (rdmain) {
   3126            rdmain->error_state = -1;
   3127        }
   3128        if (rdmaout) {
   3129            rdmaout->error_state = -1;
   3130        }
   3131        break;
   3132    }
   3133
   3134    return 0;
   3135}
   3136
   3137/*
   3138 * Parameters:
   3139 *    @offset == 0 :
   3140 *        This means that 'block_offset' is a full virtual address that does not
   3141 *        belong to a RAMBlock of the virtual machine and instead
   3142 *        represents a private malloc'd memory area that the caller wishes to
   3143 *        transfer.
   3144 *
   3145 *    @offset != 0 :
   3146 *        Offset is an offset to be added to block_offset and used
   3147 *        to also lookup the corresponding RAMBlock.
   3148 *
   3149 *    @size > 0 :
   3150 *        Initiate a transfer of this size.
   3151 *
   3152 *    @size == 0 :
   3153 *        A 'hint' or 'advice' that means that we wish to speculatively
   3154 *        and asynchronously unregister this memory. In this case, there is no
   3155 *        guarantee that the unregister will actually happen, for example,
   3156 *        if the memory is being actively transmitted. Additionally, the memory
   3157 *        may be re-registered at any future time if a write within the same
   3158 *        chunk was requested again, even if you attempted to unregister it
   3159 *        here.
   3160 *
   3161 *    @size < 0 : TODO, not yet supported
   3162 *        Unregister the memory NOW. This means that the caller does not
   3163 *        expect there to be any future RDMA transfers and we just want to clean
   3164 *        things up. This is used in case the upper layer owns the memory and
   3165 *        cannot wait for qemu_fclose() to occur.
   3166 *
   3167 *    @bytes_sent : User-specified pointer to indicate how many bytes were
   3168 *                  sent. Usually, this will not be more than a few bytes of
   3169 *                  the protocol because most transfers are sent asynchronously.
   3170 */
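       /*
        * Returns RAM_SAVE_CONTROL_DELAYED on success (the write completes
        * asynchronously), RAM_SAVE_CONTROL_NOT_SUPP during postcopy, or a
        * negative error value on failure (note the size_t return type).
        */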
   3171static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
   3172                                  ram_addr_t block_offset, ram_addr_t offset,
   3173                                  size_t size, uint64_t *bytes_sent)
   3174{
   3175    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
   3176    RDMAContext *rdma;
   3177    int ret;
   3178
   3179    RCU_READ_LOCK_GUARD();
   3180    rdma = qatomic_rcu_read(&rioc->rdmaout);
   3181
   3182    if (!rdma) {
   3183        return -EIO;
   3184    }
   3185
   3186    CHECK_ERROR_STATE();
   3187
   3188    if (migration_in_postcopy()) {
   3189        return RAM_SAVE_CONTROL_NOT_SUPP;
   3190    }
   3191
   3192    qemu_fflush(f);
   3193
   3194    if (size > 0) {
   3195        /*
   3196         * Add this page to the current 'chunk'. If the chunk
   3197         * is full, or the page doesn't belong to the current chunk,
   3198         * an actual RDMA write will occur and a new chunk will be formed.
   3199         */
   3200        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
   3201        if (ret < 0) {
   3202            error_report("rdma migration: write error! %d", ret);
   3203            goto err;
   3204        }
   3205
   3206        /*
   3207         * We always return 1 byte because the RDMA
   3208         * protocol is completely asynchronous. We do not yet know
   3209         * whether an identified chunk is zero or not because we're
   3210         * waiting for other pages to potentially be merged with
   3211         * the current chunk. So, we have to call qemu_update_position()
   3212         * later on when the actual write occurs.
   3213         */
   3214        if (bytes_sent) {
   3215            *bytes_sent = 1;
   3216        }
   3217    } else {
   3218        uint64_t index, chunk;
   3219
   3220        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
   3221        if (size < 0) {
   3222            ret = qemu_rdma_drain_cq(f, rdma);
   3223            if (ret < 0) {
   3224                fprintf(stderr, "rdma: failed to synchronously drain"
   3225                                " completion queue before unregistration.\n");
   3226                goto err;
   3227            }
   3228        }
   3229        */
   3230
   3231        ret = qemu_rdma_search_ram_block(rdma, block_offset,
   3232                                         offset, size, &index, &chunk);
   3233
   3234        if (ret) {
   3235            error_report("ram block search failed");
   3236            goto err;
   3237        }
   3238
   3239        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
   3240
   3241        /*
   3242         * TODO: Synchronous, guaranteed unregistration (should not occur during
   3243     * fast-path). Otherwise, unregisters will be processed on the next call to
   3244         * qemu_rdma_drain_cq()
   3245        if (size < 0) {
   3246            qemu_rdma_unregister_waiting(rdma);
   3247        }
   3248        */
   3249    }
   3250
   3251    /*
   3252     * Drain the Completion Queue if possible, but do not block,
   3253     * just poll.
   3254     *
   3255     * If nothing to poll, the end of the iteration will do this
   3256     * again to make sure we don't overflow the request queue.
   3257     */
   3258    while (1) {
   3259        uint64_t wr_id, wr_id_in;
   3260        int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
   3261        if (ret < 0) {
   3262            error_report("rdma migration: polling error! %d", ret);
   3263            goto err;
   3264        }
   3265
   3266        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
   3267
   3268        if (wr_id == RDMA_WRID_NONE) {
   3269            break;
   3270        }
   3271    }
   3272
   3273    return RAM_SAVE_CONTROL_DELAYED;
   3274err:
   3275    rdma->error_state = ret;
   3276    return ret;
   3277}
   3278
   3279static void rdma_accept_incoming_migration(void *opaque);
   3280
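       /*
        * fd handler for the connection manager channel once a connection is
        * established: drains one CM event and, on DISCONNECTED or
        * DEVICE_REMOVAL while migration is still running, flags both
        * directions with -EPIPE and wakes the incoming-migration coroutine.
        */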
   3281static void rdma_cm_poll_handler(void *opaque)
   3282{
   3283    RDMAContext *rdma = opaque;
   3284    int ret;
   3285    struct rdma_cm_event *cm_event;
   3286    MigrationIncomingState *mis = migration_incoming_get_current();
   3287
   3288    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3289    if (ret) {
   3290        error_report("get_cm_event failed %d", errno);
   3291        return;
   3292    }
   3293
   3294    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
   3295        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
   3296        if (!rdma->error_state &&
   3297            migration_incoming_get_current()->state !=
   3298              MIGRATION_STATUS_COMPLETED) {
   3299            error_report("RDMA connection lost (cm event %d)", cm_event->event);
   3300            rdma->error_state = -EPIPE;
   3301            if (rdma->return_path) {
   3302                rdma->return_path->error_state = -EPIPE;
   3303            }
   3304        }
   3305        rdma_ack_cm_event(cm_event);
   3306
   3307        if (mis->migration_incoming_co) {
   3308            qemu_coroutine_enter(mis->migration_incoming_co);
   3309        }
   3310        return;
   3311    }
   3312    rdma_ack_cm_event(cm_event);
   3313}
   3314
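       /*
        * Destination side of the connection setup: wait for the
        * CONNECT_REQUEST, negotiate the capabilities carried in the private
        * data, allocate pd/cq/qp, register the control buffers, accept the
        * connection and post the first control receive.
        */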
   3315static int qemu_rdma_accept(RDMAContext *rdma)
   3316{
   3317    RDMACapabilities cap;
   3318    struct rdma_conn_param conn_param = {
   3319                                            .responder_resources = 2,
   3320                                            .private_data = &cap,
   3321                                            .private_data_len = sizeof(cap),
   3322                                         };
   3323    RDMAContext *rdma_return_path = NULL;
   3324    struct rdma_cm_event *cm_event;
   3325    struct ibv_context *verbs;
   3326    int ret = -EINVAL;
   3327    int idx;
   3328
   3329    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3330    if (ret) {
   3331        goto err_rdma_dest_wait;
   3332    }
   3333
   3334    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
   3335        rdma_ack_cm_event(cm_event);
   3336        goto err_rdma_dest_wait;
   3337    }
   3338
   3339    /*
   3340     * Initialize the RDMAContext for the return path for postcopy once the
   3341     * first connection request has been received.
   3342     */
   3343    if (migrate_postcopy() && !rdma->is_return_path) {
   3344        rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
   3345        if (rdma_return_path == NULL) {
   3346            rdma_ack_cm_event(cm_event);
   3347            goto err_rdma_dest_wait;
   3348        }
   3349
   3350        qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
   3351    }
   3352
   3353    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
   3354
   3355    network_to_caps(&cap);
   3356
   3357    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
   3358        error_report("Unknown source RDMA version: %d, bailing...",
   3359                     cap.version);
   3360        rdma_ack_cm_event(cm_event);
   3361        goto err_rdma_dest_wait;
   3362    }
   3363
   3364    /*
   3365     * Respond with only the capabilities this version of QEMU knows about.
   3366     */
   3367    cap.flags &= known_capabilities;
   3368
   3369    /*
   3370     * Enable the ones that we do know about.
   3371     * Add other checks here as new ones are introduced.
   3372     */
   3373    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
   3374        rdma->pin_all = true;
   3375    }
   3376
   3377    rdma->cm_id = cm_event->id;
   3378    verbs = cm_event->id->verbs;
   3379
   3380    rdma_ack_cm_event(cm_event);
   3381
   3382    trace_qemu_rdma_accept_pin_state(rdma->pin_all);
   3383
   3384    caps_to_network(&cap);
   3385
   3386    trace_qemu_rdma_accept_pin_verbsc(verbs);
   3387
   3388    if (!rdma->verbs) {
   3389        rdma->verbs = verbs;
   3390    } else if (rdma->verbs != verbs) {
   3391        error_report("ibv context not matching %p, %p!", rdma->verbs,
   3392                     verbs);
   3393        goto err_rdma_dest_wait;
   3394    }
   3395
   3396    qemu_rdma_dump_id("dest_init", verbs);
   3397
   3398    ret = qemu_rdma_alloc_pd_cq(rdma);
   3399    if (ret) {
   3400        error_report("rdma migration: error allocating pd and cq!");
   3401        goto err_rdma_dest_wait;
   3402    }
   3403
   3404    ret = qemu_rdma_alloc_qp(rdma);
   3405    if (ret) {
   3406        error_report("rdma migration: error allocating qp!");
   3407        goto err_rdma_dest_wait;
   3408    }
   3409
   3410    ret = qemu_rdma_init_ram_blocks(rdma);
   3411    if (ret) {
   3412        error_report("rdma migration: error initializing ram blocks!");
   3413        goto err_rdma_dest_wait;
   3414    }
   3415
   3416    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
   3417        ret = qemu_rdma_reg_control(rdma, idx);
   3418        if (ret) {
   3419            error_report("rdma: error registering %d control", idx);
   3420            goto err_rdma_dest_wait;
   3421        }
   3422    }
   3423
   3424    /* Accept the second connection request for the return path */
   3425    if (migrate_postcopy() && !rdma->is_return_path) {
   3426        qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
   3427                            NULL,
   3428                            (void *)(intptr_t)rdma->return_path);
   3429    } else {
   3430        qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
   3431                            NULL, rdma);
   3432    }
   3433
   3434    ret = rdma_accept(rdma->cm_id, &conn_param);
   3435    if (ret) {
   3436        error_report("rdma_accept returns %d", ret);
   3437        goto err_rdma_dest_wait;
   3438    }
   3439
   3440    ret = rdma_get_cm_event(rdma->channel, &cm_event);
   3441    if (ret) {
   3442        error_report("rdma_accept get_cm_event failed %d", ret);
   3443        goto err_rdma_dest_wait;
   3444    }
   3445
   3446    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
   3447        error_report("rdma_accept: did not receive RDMA_CM_EVENT_ESTABLISHED");
   3448        rdma_ack_cm_event(cm_event);
   3449        goto err_rdma_dest_wait;
   3450    }
   3451
   3452    rdma_ack_cm_event(cm_event);
   3453    rdma->connected = true;
   3454
   3455    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
   3456    if (ret) {
   3457        error_report("rdma migration: error posting second control recv");
   3458        goto err_rdma_dest_wait;
   3459    }
   3460
   3461    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
   3462
   3463    return 0;
   3464
   3465err_rdma_dest_wait:
   3466    rdma->error_state = ret;
   3467    qemu_rdma_cleanup(rdma);
   3468    g_free(rdma_return_path);
   3469    return ret;
   3470}
   3471
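       /* qsort() comparator: order destination RAMBlocks by source index. */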
   3472static int dest_ram_sort_func(const void *a, const void *b)
   3473{
   3474    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
   3475    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
   3476
   3477    return (a_index < b_index) ? -1 : (a_index != b_index);
   3478}
   3479
   3480/*
   3481 * During each iteration of the migration, we listen for instructions
   3482 * by the source VM to perform dynamic page registrations before they
   3483 * can perform RDMA operations.
   3484 *
   3485 * We respond with the 'rkey'.
   3486 *
   3487 * Keep doing this until the source tells us to stop.
   3488 */
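       /*
        * Control messages handled here: COMPRESS (pages that are a single
        * repeated byte, typically zero), RAM_BLOCKS_REQUEST,
        * REGISTER_REQUEST, UNREGISTER_REQUEST and REGISTER_FINISHED, which
        * ends one round of the loop.
        */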
   3489static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
   3490{
   3491    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
   3492                               .type = RDMA_CONTROL_REGISTER_RESULT,
   3493                               .repeat = 0,
   3494                             };
   3495    RDMAControlHeader unreg_resp = { .len = 0,
   3496                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
   3497                               .repeat = 0,
   3498                             };
   3499    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
   3500                                 .repeat = 1 };
   3501    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
   3502    RDMAContext *rdma;
   3503    RDMALocalBlocks *local;
   3504    RDMAControlHeader head;
   3505    RDMARegister *reg, *registers;
   3506    RDMACompress *comp;
   3507    RDMARegisterResult *reg_result;
   3508    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
   3509    RDMALocalBlock *block;
   3510    void *host_addr;
   3511    int ret = 0;
   3512    int idx = 0;
   3513    int count = 0;
   3514    int i = 0;
   3515
   3516    RCU_READ_LOCK_GUARD();
   3517    rdma = qatomic_rcu_read(&rioc->rdmain);
   3518
   3519    if (!rdma) {
   3520        return -EIO;
   3521    }
   3522
   3523    CHECK_ERROR_STATE();
   3524
   3525    local = &rdma->local_ram_blocks;
   3526    do {
   3527        trace_qemu_rdma_registration_handle_wait();
   3528
   3529        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
   3530
   3531        if (ret < 0) {
   3532            break;
   3533        }
   3534
   3535        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
   3536            error_report("rdma: Too many requests in this message (%d)."
   3537                            " Bailing.", head.repeat);
   3538            ret = -EIO;
   3539            break;
   3540        }
   3541
   3542        switch (head.type) {
   3543        case RDMA_CONTROL_COMPRESS:
   3544            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
   3545            network_to_compress(comp);
   3546
   3547            trace_qemu_rdma_registration_handle_compress(comp->length,
   3548                                                         comp->block_idx,
   3549                                                         comp->offset);
   3550            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
   3551                error_report("rdma: 'compress' bad block index %u (vs %d)",
   3552                             (unsigned int)comp->block_idx,
   3553                             rdma->local_ram_blocks.nb_blocks);
   3554                ret = -EIO;
   3555                goto out;
   3556            }
   3557            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
   3558
   3559            host_addr = block->local_host_addr +
   3560                            (comp->offset - block->offset);
   3561
   3562            ram_handle_compressed(host_addr, comp->value, comp->length);
   3563            break;
   3564
   3565        case RDMA_CONTROL_REGISTER_FINISHED:
   3566            trace_qemu_rdma_registration_handle_finished();
   3567            goto out;
   3568
   3569        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
   3570            trace_qemu_rdma_registration_handle_ram_blocks();
   3571
   3572            /* Sort our local RAM Block list so it matches the source's order;
   3573             * we can do this because we filled in src_index in the list as
   3574             * we received the RAMBlock list earlier.
   3575             */
   3576            qsort(rdma->local_ram_blocks.block,
   3577                  rdma->local_ram_blocks.nb_blocks,
   3578                  sizeof(RDMALocalBlock), dest_ram_sort_func);
   3579            for (i = 0; i < local->nb_blocks; i++) {
   3580                local->block[i].index = i;
   3581            }
   3582
   3583            if (rdma->pin_all) {
   3584                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
   3585                if (ret) {
   3586                    error_report("rdma migration: error dest "
   3587                                    "registering ram blocks");
   3588                    goto out;
   3589                }
   3590            }
   3591
   3592            /*
   3593             * Dest uses this to prepare to transmit the RAMBlock descriptions
   3594             * to the source VM after connection setup.
   3595             * Both sides use the "remote" structure to communicate and update
   3596             * their "local" descriptions with what was sent.
   3597             */
   3598            for (i = 0; i < local->nb_blocks; i++) {
   3599                rdma->dest_blocks[i].remote_host_addr =
   3600                    (uintptr_t)(local->block[i].local_host_addr);
   3601
   3602                if (rdma->pin_all) {
   3603                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
   3604                }
   3605
   3606                rdma->dest_blocks[i].offset = local->block[i].offset;
   3607                rdma->dest_blocks[i].length = local->block[i].length;
   3608
   3609                dest_block_to_network(&rdma->dest_blocks[i]);
   3610                trace_qemu_rdma_registration_handle_ram_blocks_loop(
   3611                    local->block[i].block_name,
   3612                    local->block[i].offset,
   3613                    local->block[i].length,
   3614                    local->block[i].local_host_addr,
   3615                    local->block[i].src_index);
   3616            }
   3617
   3618            blocks.len = rdma->local_ram_blocks.nb_blocks
   3619                                                * sizeof(RDMADestBlock);
   3620
   3621
   3622            ret = qemu_rdma_post_send_control(rdma,
   3623                                        (uint8_t *) rdma->dest_blocks, &blocks);
   3624
   3625            if (ret < 0) {
   3626                error_report("rdma migration: error sending remote info");
   3627                goto out;
   3628            }
   3629
   3630            break;
   3631        case RDMA_CONTROL_REGISTER_REQUEST:
   3632            trace_qemu_rdma_registration_handle_register(head.repeat);
   3633
   3634            reg_resp.repeat = head.repeat;
   3635            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
   3636
   3637            for (count = 0; count < head.repeat; count++) {
   3638                uint64_t chunk;
   3639                uint8_t *chunk_start, *chunk_end;
   3640
   3641                reg = &registers[count];
   3642                network_to_register(reg);
   3643
   3644                reg_result = &results[count];
   3645
   3646                trace_qemu_rdma_registration_handle_register_loop(count,
   3647                         reg->current_index, reg->key.current_addr, reg->chunks);
   3648
   3649                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
   3650                    error_report("rdma: 'register' bad block index %u (vs %d)",
   3651                                 (unsigned int)reg->current_index,
   3652                                 rdma->local_ram_blocks.nb_blocks);
   3653                    ret = -ENOENT;
   3654                    goto out;
   3655                }
   3656                block = &(rdma->local_ram_blocks.block[reg->current_index]);
   3657                if (block->is_ram_block) {
   3658                    if (block->offset > reg->key.current_addr) {
   3659                        error_report("rdma: bad register address for block %s"
   3660                            " offset: %" PRIx64 " current_addr: %" PRIx64,
   3661                            block->block_name, block->offset,
   3662                            reg->key.current_addr);
   3663                        ret = -ERANGE;
   3664                        goto out;
   3665                    }
   3666                    host_addr = (block->local_host_addr +
   3667                                (reg->key.current_addr - block->offset));
   3668                    chunk = ram_chunk_index(block->local_host_addr,
   3669                                            (uint8_t *) host_addr);
   3670                } else {
   3671                    chunk = reg->key.chunk;
   3672                    host_addr = block->local_host_addr +
   3673                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
   3674                    /* Check for particularly bad chunk value */
   3675                    if (host_addr < (void *)block->local_host_addr) {
   3676                        error_report("rdma: bad chunk for block %s"
   3677                            " chunk: %" PRIx64,
   3678                            block->block_name, reg->key.chunk);
   3679                        ret = -ERANGE;
   3680                        goto out;
   3681                    }
   3682                }
   3683                chunk_start = ram_chunk_start(block, chunk);
   3684                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
   3685                /* avoid "-Waddress-of-packed-member" warning */
   3686                uint32_t tmp_rkey = 0;
   3687                if (qemu_rdma_register_and_get_keys(rdma, block,
   3688                            (uintptr_t)host_addr, NULL, &tmp_rkey,
   3689                            chunk, chunk_start, chunk_end)) {
   3690                    error_report("cannot get rkey");
   3691                    ret = -EINVAL;
   3692                    goto out;
   3693                }
   3694                reg_result->rkey = tmp_rkey;
   3695
   3696                reg_result->host_addr = (uintptr_t)block->local_host_addr;
   3697
   3698                trace_qemu_rdma_registration_handle_register_rkey(
   3699                                                           reg_result->rkey);
   3700
   3701                result_to_network(reg_result);
   3702            }
   3703
   3704            ret = qemu_rdma_post_send_control(rdma,
   3705                            (uint8_t *) results, &reg_resp);
   3706
   3707            if (ret < 0) {
   3708                error_report("Failed to send control buffer");
   3709                goto out;
   3710            }
   3711            break;
   3712        case RDMA_CONTROL_UNREGISTER_REQUEST:
   3713            trace_qemu_rdma_registration_handle_unregister(head.repeat);
   3714            unreg_resp.repeat = head.repeat;
   3715            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
   3716
   3717            for (count = 0; count < head.repeat; count++) {
   3718                reg = &registers[count];
   3719                network_to_register(reg);
   3720
   3721                trace_qemu_rdma_registration_handle_unregister_loop(count,
   3722                           reg->current_index, reg->key.chunk);
   3723
   3724                block = &(rdma->local_ram_blocks.block[reg->current_index]);
   3725
   3726                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
   3727                block->pmr[reg->key.chunk] = NULL;
   3728
   3729                if (ret != 0) {
   3730                    perror("rdma unregistration chunk failed");
   3731                    ret = -ret;
   3732                    goto out;
   3733                }
   3734
   3735                rdma->total_registrations--;
   3736
   3737                trace_qemu_rdma_registration_handle_unregister_success(
   3738                                                       reg->key.chunk);
   3739            }
   3740
   3741            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
   3742
   3743            if (ret < 0) {
   3744                error_report("Failed to send control buffer");
   3745                goto out;
   3746            }
   3747            break;
   3748        case RDMA_CONTROL_REGISTER_RESULT:
   3749            error_report("Invalid RESULT message at dest.");
   3750            ret = -EIO;
   3751            goto out;
   3752        default:
   3753            error_report("Unknown control message %s", control_desc(head.type));
   3754            ret = -EIO;
   3755            goto out;
   3756        }
   3757    } while (1);
   3758out:
   3759    if (ret < 0) {
   3760        rdma->error_state = ret;
   3761    }
   3762    return ret;
   3763}
   3764
   3765/* Destination:
   3766 * Called via a ram_control_load_hook during the initial RAM load section which
   3767 * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
   3768 * on the source.
   3769 * We've already built our local RAMBlock list, but not yet sent the list to
   3770 * the source.
   3771 */
   3772static int
   3773rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
   3774{
   3775    RDMAContext *rdma;
   3776    int curr;
   3777    int found = -1;
   3778
   3779    RCU_READ_LOCK_GUARD();
   3780    rdma = qatomic_rcu_read(&rioc->rdmain);
   3781
   3782    if (!rdma) {
   3783        return -EIO;
   3784    }
   3785
   3786    /* Find the matching RAMBlock in our local list */
   3787    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
   3788        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
   3789            found = curr;
   3790            break;
   3791        }
   3792    }
   3793
   3794    if (found == -1) {
   3795        error_report("RAMBlock '%s' not found on destination", name);
   3796        return -ENOENT;
   3797    }
   3798
   3799    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
   3800    trace_rdma_block_notification_handle(name, rdma->next_src_index);
   3801    rdma->next_src_index++;
   3802
   3803    return 0;
   3804}
   3805
   3806static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
   3807{
   3808    switch (flags) {
   3809    case RAM_CONTROL_BLOCK_REG:
   3810        return rdma_block_notification_handle(opaque, data);
   3811
   3812    case RAM_CONTROL_HOOK:
   3813        return qemu_rdma_registration_handle(f, opaque);
   3814
   3815    default:
   3816        /* Shouldn't be called with any other values */
   3817        abort();
   3818    }
   3819}
   3820
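       /*
        * Source side: called before each RAM iteration; emits
        * RAM_SAVE_FLAG_HOOK so the destination enters
        * qemu_rdma_registration_handle() and services registration requests.
        */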
   3821static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
   3822                                        uint64_t flags, void *data)
   3823{
   3824    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
   3825    RDMAContext *rdma;
   3826
   3827    RCU_READ_LOCK_GUARD();
   3828    rdma = qatomic_rcu_read(&rioc->rdmaout);
   3829    if (!rdma) {
   3830        return -EIO;
   3831    }
   3832
   3833    CHECK_ERROR_STATE();
   3834
   3835    if (migration_in_postcopy()) {
   3836        return 0;
   3837    }
   3838
   3839    trace_qemu_rdma_registration_start(flags);
   3840    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
   3841    qemu_fflush(f);
   3842
   3843    return 0;
   3844}
   3845
   3846/*
   3847 * Inform dest that dynamic registrations are done for now.
   3848 * First, flush writes, if any.
   3849 */
   3850static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
   3851                                       uint64_t flags, void *data)
   3852{
   3853    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
   3854    RDMAContext *rdma;
   3855    RDMAControlHeader head = { .len = 0, .repeat = 1 };
   3856    int ret = 0;
   3857
   3858    RCU_READ_LOCK_GUARD();
   3859    rdma = qatomic_rcu_read(&rioc->rdmaout);
   3860    if (!rdma) {
   3861        return -EIO;
   3862    }
   3863
   3864    CHECK_ERROR_STATE();
   3865
   3866    if (migration_in_postcopy()) {
   3867        return 0;
   3868    }
   3869
   3870    qemu_fflush(f);
   3871    ret = qemu_rdma_drain_cq(f, rdma);
   3872
   3873    if (ret < 0) {
   3874        goto err;
   3875    }
   3876
   3877    if (flags == RAM_CONTROL_SETUP) {
   3878        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
   3879        RDMALocalBlocks *local = &rdma->local_ram_blocks;
   3880        int reg_result_idx, i, nb_dest_blocks;
   3881
   3882        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
   3883        trace_qemu_rdma_registration_stop_ram();
   3884
   3885        /*
   3886         * Make sure that we parallelize the pinning on both sides.
   3887         * For very large guests, doing this serially takes a really
   3888         * long time, so we have to 'interleave' the pinning locally
   3889         * with the control messages by performing the pinning on this
   3890         * side before we receive the control response from the other
   3891         * side that the pinning has completed.
   3892         */
   3893        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
   3894                    &reg_result_idx, rdma->pin_all ?
   3895                    qemu_rdma_reg_whole_ram_blocks : NULL);
   3896        if (ret < 0) {
   3897            fprintf(stderr, "rdma migration: error receiving remote info!\n");
   3898            return ret;
   3899        }
   3900
   3901        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
   3902
   3903        /*
   3904         * The protocol uses two different sets of rkeys (mutually exclusive):
   3905         * 1. One key to represent the virtual address of the entire ram block.
   3906         *    (dynamic chunk registration disabled - pin everything with one rkey.)
   3907         * 2. One to represent individual chunks within a ram block.
   3908         *    (dynamic chunk registration enabled - pin individual chunks.)
   3909         *
   3910         * Once the capability is successfully negotiated, the destination transmits
   3911         * the keys to use (or sends them later) including the virtual addresses
   3912         * and then propagates the remote ram block descriptions to its local copy.
   3913         */
   3914
   3915        if (local->nb_blocks != nb_dest_blocks) {
   3916            fprintf(stderr, "ram blocks mismatch (number of blocks %d vs %d). "
   3917                    "Your QEMU command line parameters are probably "
   3918                    "not identical on both the source and destination.\n",
   3919                    local->nb_blocks, nb_dest_blocks);
   3920            rdma->error_state = -EINVAL;
   3921            return -EINVAL;
   3922        }
   3923
   3924        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
   3925        memcpy(rdma->dest_blocks,
   3926            rdma->wr_data[reg_result_idx].control_curr, resp.len);
   3927        for (i = 0; i < nb_dest_blocks; i++) {
   3928            network_to_dest_block(&rdma->dest_blocks[i]);
   3929
   3930            /* We require that the blocks are in the same order */
   3931            if (rdma->dest_blocks[i].length != local->block[i].length) {
   3932                fprintf(stderr, "Block %s/%d has a different length %" PRIu64
   3933                        " vs %" PRIu64, local->block[i].block_name, i,
   3934                        local->block[i].length,
   3935                        rdma->dest_blocks[i].length);
   3936                rdma->error_state = -EINVAL;
   3937                return -EINVAL;
   3938            }
   3939            local->block[i].remote_host_addr =
   3940                    rdma->dest_blocks[i].remote_host_addr;
   3941            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
   3942        }
   3943    }
   3944
   3945    trace_qemu_rdma_registration_stop(flags);
   3946
   3947    head.type = RDMA_CONTROL_REGISTER_FINISHED;
   3948    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
   3949
   3950    if (ret < 0) {
   3951        goto err;
   3952    }
   3953
   3954    return 0;
   3955err:
   3956    rdma->error_state = ret;
   3957    return ret;
   3958}
   3959
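       /*
        * The destination installs the load hook; the source installs the
        * iteration and save_page hooks (see qemu_fopen_rdma() below).
        */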
   3960static const QEMUFileHooks rdma_read_hooks = {
   3961    .hook_ram_load = rdma_load_hook,
   3962};
   3963
   3964static const QEMUFileHooks rdma_write_hooks = {
   3965    .before_ram_iterate = qemu_rdma_registration_start,
   3966    .after_ram_iterate  = qemu_rdma_registration_stop,
   3967    .save_page          = qemu_rdma_save_page,
   3968};
   3969
   3970
   3971static void qio_channel_rdma_finalize(Object *obj)
   3972{
   3973    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
   3974    if (rioc->rdmain) {
   3975        qemu_rdma_cleanup(rioc->rdmain);
   3976        g_free(rioc->rdmain);
   3977        rioc->rdmain = NULL;
   3978    }
   3979    if (rioc->rdmaout) {
   3980        qemu_rdma_cleanup(rioc->rdmaout);
   3981        g_free(rioc->rdmaout);
   3982        rioc->rdmaout = NULL;
   3983    }
   3984}
   3985
   3986static void qio_channel_rdma_class_init(ObjectClass *klass,
   3987                                        void *class_data G_GNUC_UNUSED)
   3988{
   3989    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
   3990
   3991    ioc_klass->io_writev = qio_channel_rdma_writev;
   3992    ioc_klass->io_readv = qio_channel_rdma_readv;
   3993    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
   3994    ioc_klass->io_close = qio_channel_rdma_close;
   3995    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
   3996    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
   3997    ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
   3998}
   3999
   4000static const TypeInfo qio_channel_rdma_info = {
   4001    .parent = TYPE_QIO_CHANNEL,
   4002    .name = TYPE_QIO_CHANNEL_RDMA,
   4003    .instance_size = sizeof(QIOChannelRDMA),
   4004    .instance_finalize = qio_channel_rdma_finalize,
   4005    .class_init = qio_channel_rdma_class_init,
   4006};
   4007
   4008static void qio_channel_rdma_register_types(void)
   4009{
   4010    type_register_static(&qio_channel_rdma_info);
   4011}
   4012
   4013type_init(qio_channel_rdma_register_types);
   4014
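       /*
        * Wrap an RDMAContext in a QEMUFile.  Mode "wb" is the outgoing
        * (source) direction: rdmaout is the primary context and rdmain the
        * return path.  Mode "rb" is the incoming (destination) direction
        * with the roles reversed.
        */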
   4015static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
   4016{
   4017    QIOChannelRDMA *rioc;
   4018
   4019    if (qemu_file_mode_is_not_valid(mode)) {
   4020        return NULL;
   4021    }
   4022
   4023    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
   4024
   4025    if (mode[0] == 'w') {
   4026        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
   4027        rioc->rdmaout = rdma;
   4028        rioc->rdmain = rdma->return_path;
   4029        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
   4030    } else {
   4031        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
   4032        rioc->rdmain = rdma;
   4033        rioc->rdmaout = rdma->return_path;
   4034        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
   4035    }
   4036
   4037    return rioc->file;
   4038}
   4039
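       /*
        * fd handler on the listening CM channel: runs the accept handshake
        * and, for the primary connection, hands the resulting QEMUFile to
        * the generic incoming-migration code.
        */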
   4040static void rdma_accept_incoming_migration(void *opaque)
   4041{
   4042    RDMAContext *rdma = opaque;
   4043    int ret;
   4044    QEMUFile *f;
   4045    Error *local_err = NULL;
   4046
   4047    trace_qemu_rdma_accept_incoming_migration();
   4048    ret = qemu_rdma_accept(rdma);
   4049
   4050    if (ret) {
   4051        fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
   4052        return;
   4053    }
   4054
   4055    trace_qemu_rdma_accept_incoming_migration_accepted();
   4056
   4057    if (rdma->is_return_path) {
   4058        return;
   4059    }
   4060
   4061    f = qemu_fopen_rdma(rdma, "rb");
   4062    if (f == NULL) {
   4063        fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
   4064        qemu_rdma_cleanup(rdma);
   4065        return;
   4066    }
   4067
   4068    rdma->migration_started_on_destination = 1;
   4069    migration_fd_process_incoming(f, &local_err);
   4070    if (local_err) {
   4071        error_reportf_err(local_err, "RDMA ERROR: ");
   4072    }
   4073}
   4074
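       /*
        * Entry point for an incoming RDMA migration: set up and bind the
        * listen id, listen with a small backlog, then wait for connection
        * requests via rdma_accept_incoming_migration().
        */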
   4075void rdma_start_incoming_migration(const char *host_port, Error **errp)
   4076{
   4077    int ret;
   4078    RDMAContext *rdma, *rdma_return_path = NULL;
   4079    Error *local_err = NULL;
   4080
   4081    trace_rdma_start_incoming_migration();
   4082
   4083    /* Avoid ram_block_discard_disable(), cannot change during migration. */
   4084    if (ram_block_discard_is_required()) {
   4085        error_setg(errp, "RDMA: cannot disable RAM discard");
   4086        return;
   4087    }
   4088
   4089    rdma = qemu_rdma_data_init(host_port, &local_err);
   4090    if (rdma == NULL) {
   4091        goto err;
   4092    }
   4093
   4094    ret = qemu_rdma_dest_init(rdma, &local_err);
   4095
   4096    if (ret) {
   4097        goto err;
   4098    }
   4099
   4100    trace_rdma_start_incoming_migration_after_dest_init();
   4101
   4102    ret = rdma_listen(rdma->listen_id, 5);
   4103
   4104    if (ret) {
   4105        ERROR(errp, "failed to listen on socket");
   4106        goto cleanup_rdma;
   4107    }
   4108
   4109    trace_rdma_start_incoming_migration_after_rdma_listen();
   4110
   4111    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
   4112                        NULL, (void *)(intptr_t)rdma);
   4113    return;
   4114
   4115cleanup_rdma:
   4116    qemu_rdma_cleanup(rdma);
   4117err:
   4118    error_propagate(errp, local_err);
   4119    if (rdma) {
   4120        g_free(rdma->host);
   4121        g_free(rdma->host_port);
   4122    }
   4123    g_free(rdma);
   4124    g_free(rdma_return_path);
   4125}
   4126
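       /*
        * Entry point for an outgoing RDMA migration: connect the primary
        * channel, optionally set up a second connection as the postcopy
        * return path, then attach the QEMUFile and start the migration.
        */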
   4127void rdma_start_outgoing_migration(void *opaque,
   4128                            const char *host_port, Error **errp)
   4129{
   4130    MigrationState *s = opaque;
   4131    RDMAContext *rdma_return_path = NULL;
   4132    RDMAContext *rdma;
   4133    int ret = 0;
   4134
   4135    /* Avoid ram_block_discard_disable(), cannot change during migration. */
   4136    if (ram_block_discard_is_required()) {
   4137        error_setg(errp, "RDMA: cannot disable RAM discard");
   4138        return;
   4139    }
   4140
   4141    rdma = qemu_rdma_data_init(host_port, errp);
   4142    if (rdma == NULL) {
   4143        goto err;
   4144    }
   4145
   4146    ret = qemu_rdma_source_init(rdma,
   4147        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
   4148
   4149    if (ret) {
   4150        goto err;
   4151    }
   4152
   4153    trace_rdma_start_outgoing_migration_after_rdma_source_init();
   4154    ret = qemu_rdma_connect(rdma, errp, false);
   4155
   4156    if (ret) {
   4157        goto err;
   4158    }
   4159
   4160    /* RDMA postcopy needs a separate queue pair for the return path */
   4161    if (migrate_postcopy()) {
   4162        rdma_return_path = qemu_rdma_data_init(host_port, errp);
   4163
   4164        if (rdma_return_path == NULL) {
   4165            goto return_path_err;
   4166        }
   4167
   4168        ret = qemu_rdma_source_init(rdma_return_path,
   4169            s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
   4170
   4171        if (ret) {
   4172            goto return_path_err;
   4173        }
   4174
   4175        ret = qemu_rdma_connect(rdma_return_path, errp, true);
   4176
   4177        if (ret) {
   4178            goto return_path_err;
   4179        }
   4180
   4181        rdma->return_path = rdma_return_path;
   4182        rdma_return_path->return_path = rdma;
   4183        rdma_return_path->is_return_path = true;
   4184    }
   4185
   4186    trace_rdma_start_outgoing_migration_after_rdma_connect();
   4187
   4188    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
   4189    migrate_fd_connect(s, NULL);
   4190    return;
   4191return_path_err:
   4192    qemu_rdma_cleanup(rdma);
   4193err:
   4194    g_free(rdma);
   4195    g_free(rdma_return_path);
   4196}