rdma.c (129923B)
1/* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 17#include "qemu/osdep.h" 18#include "qapi/error.h" 19#include "qemu/cutils.h" 20#include "rdma.h" 21#include "migration.h" 22#include "qemu-file.h" 23#include "ram.h" 24#include "qemu-file-channel.h" 25#include "qemu/error-report.h" 26#include "qemu/main-loop.h" 27#include "qemu/module.h" 28#include "qemu/rcu.h" 29#include "qemu/sockets.h" 30#include "qemu/bitmap.h" 31#include "qemu/coroutine.h" 32#include "exec/memory.h" 33#include <sys/socket.h> 34#include <netdb.h> 35#include <arpa/inet.h> 36#include <rdma/rdma_cma.h> 37#include "trace.h" 38#include "qom/object.h" 39#include <poll.h> 40 41/* 42 * Print and error on both the Monitor and the Log file. 43 */ 44#define ERROR(errp, fmt, ...) \ 45 do { \ 46 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 47 if (errp && (*(errp) == NULL)) { \ 48 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 49 } \ 50 } while (0) 51 52#define RDMA_RESOLVE_TIMEOUT_MS 10000 53 54/* Do not merge data if larger than this. */ 55#define RDMA_MERGE_MAX (2 * 1024 * 1024) 56#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 57 58#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 59 60/* 61 * This is only for non-live state being migrated. 62 * Instead of RDMA_WRITE messages, we use RDMA_SEND 63 * messages for that state, which requires a different 64 * delivery design than main memory. 65 */ 66#define RDMA_SEND_INCREMENT 32768 67 68/* 69 * Maximum size infiniband SEND message 70 */ 71#define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 72#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 73 74#define RDMA_CONTROL_VERSION_CURRENT 1 75/* 76 * Capabilities for negotiation. 77 */ 78#define RDMA_CAPABILITY_PIN_ALL 0x01 79 80/* 81 * Add the other flags above to this list of known capabilities 82 * as they are introduced. 83 */ 84static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 85 86#define CHECK_ERROR_STATE() \ 87 do { \ 88 if (rdma->error_state) { \ 89 if (!rdma->error_reported) { \ 90 error_report("RDMA is in an error state waiting migration" \ 91 " to abort!"); \ 92 rdma->error_reported = 1; \ 93 } \ 94 return rdma->error_state; \ 95 } \ 96 } while (0) 97 98/* 99 * A work request ID is 64-bits and we split up these bits 100 * into 3 parts: 101 * 102 * bits 0-15 : type of control message, 2^16 103 * bits 16-29: ram block index, 2^14 104 * bits 30-63: ram block chunk number, 2^34 105 * 106 * The last two bit ranges are only used for RDMA writes, 107 * in order to track their completion and potentially 108 * also track unregistration status of the message. 109 */ 110#define RDMA_WRID_TYPE_SHIFT 0UL 111#define RDMA_WRID_BLOCK_SHIFT 16UL 112#define RDMA_WRID_CHUNK_SHIFT 30UL 113 114#define RDMA_WRID_TYPE_MASK \ 115 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 116 117#define RDMA_WRID_BLOCK_MASK \ 118 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 119 120#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 121 122/* 123 * RDMA migration protocol: 124 * 1. RDMA Writes (data messages, i.e. RAM) 125 * 2. IB Send/Recv (control channel messages) 126 */ 127enum { 128 RDMA_WRID_NONE = 0, 129 RDMA_WRID_RDMA_WRITE = 1, 130 RDMA_WRID_SEND_CONTROL = 2000, 131 RDMA_WRID_RECV_CONTROL = 4000, 132}; 133 134static const char *wrid_desc[] = { 135 [RDMA_WRID_NONE] = "NONE", 136 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 137 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 138 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 139}; 140 141/* 142 * Work request IDs for IB SEND messages only (not RDMA writes). 143 * This is used by the migration protocol to transmit 144 * control messages (such as device state and registration commands) 145 * 146 * We could use more WRs, but we have enough for now. 147 */ 148enum { 149 RDMA_WRID_READY = 0, 150 RDMA_WRID_DATA, 151 RDMA_WRID_CONTROL, 152 RDMA_WRID_MAX, 153}; 154 155/* 156 * SEND/RECV IB Control Messages. 157 */ 158enum { 159 RDMA_CONTROL_NONE = 0, 160 RDMA_CONTROL_ERROR, 161 RDMA_CONTROL_READY, /* ready to receive */ 162 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 163 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 164 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 165 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 166 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 167 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 168 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 169 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 170 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 171}; 172 173 174/* 175 * Memory and MR structures used to represent an IB Send/Recv work request. 176 * This is *not* used for RDMA writes, only IB Send/Recv. 177 */ 178typedef struct { 179 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 180 struct ibv_mr *control_mr; /* registration metadata */ 181 size_t control_len; /* length of the message */ 182 uint8_t *control_curr; /* start of unconsumed bytes */ 183} RDMAWorkRequestData; 184 185/* 186 * Negotiate RDMA capabilities during connection-setup time. 187 */ 188typedef struct { 189 uint32_t version; 190 uint32_t flags; 191} RDMACapabilities; 192 193static void caps_to_network(RDMACapabilities *cap) 194{ 195 cap->version = htonl(cap->version); 196 cap->flags = htonl(cap->flags); 197} 198 199static void network_to_caps(RDMACapabilities *cap) 200{ 201 cap->version = ntohl(cap->version); 202 cap->flags = ntohl(cap->flags); 203} 204 205/* 206 * Representation of a RAMBlock from an RDMA perspective. 207 * This is not transmitted, only local. 208 * This and subsequent structures cannot be linked lists 209 * because we're using a single IB message to transmit 210 * the information. It's small anyway, so a list is overkill. 211 */ 212typedef struct RDMALocalBlock { 213 char *block_name; 214 uint8_t *local_host_addr; /* local virtual address */ 215 uint64_t remote_host_addr; /* remote virtual address */ 216 uint64_t offset; 217 uint64_t length; 218 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 219 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 220 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 221 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 222 int index; /* which block are we */ 223 unsigned int src_index; /* (Only used on dest) */ 224 bool is_ram_block; 225 int nb_chunks; 226 unsigned long *transit_bitmap; 227 unsigned long *unregister_bitmap; 228} RDMALocalBlock; 229 230/* 231 * Also represents a RAMblock, but only on the dest. 232 * This gets transmitted by the dest during connection-time 233 * to the source VM and then is used to populate the 234 * corresponding RDMALocalBlock with 235 * the information needed to perform the actual RDMA. 236 */ 237typedef struct QEMU_PACKED RDMADestBlock { 238 uint64_t remote_host_addr; 239 uint64_t offset; 240 uint64_t length; 241 uint32_t remote_rkey; 242 uint32_t padding; 243} RDMADestBlock; 244 245static const char *control_desc(unsigned int rdma_control) 246{ 247 static const char *strs[] = { 248 [RDMA_CONTROL_NONE] = "NONE", 249 [RDMA_CONTROL_ERROR] = "ERROR", 250 [RDMA_CONTROL_READY] = "READY", 251 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 252 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 253 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 254 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 255 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 256 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 257 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 258 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 259 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 260 }; 261 262 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 263 return "??BAD CONTROL VALUE??"; 264 } 265 266 return strs[rdma_control]; 267} 268 269static uint64_t htonll(uint64_t v) 270{ 271 union { uint32_t lv[2]; uint64_t llv; } u; 272 u.lv[0] = htonl(v >> 32); 273 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 274 return u.llv; 275} 276 277static uint64_t ntohll(uint64_t v) 278{ 279 union { uint32_t lv[2]; uint64_t llv; } u; 280 u.llv = v; 281 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 282} 283 284static void dest_block_to_network(RDMADestBlock *db) 285{ 286 db->remote_host_addr = htonll(db->remote_host_addr); 287 db->offset = htonll(db->offset); 288 db->length = htonll(db->length); 289 db->remote_rkey = htonl(db->remote_rkey); 290} 291 292static void network_to_dest_block(RDMADestBlock *db) 293{ 294 db->remote_host_addr = ntohll(db->remote_host_addr); 295 db->offset = ntohll(db->offset); 296 db->length = ntohll(db->length); 297 db->remote_rkey = ntohl(db->remote_rkey); 298} 299 300/* 301 * Virtual address of the above structures used for transmitting 302 * the RAMBlock descriptions at connection-time. 303 * This structure is *not* transmitted. 304 */ 305typedef struct RDMALocalBlocks { 306 int nb_blocks; 307 bool init; /* main memory init complete */ 308 RDMALocalBlock *block; 309} RDMALocalBlocks; 310 311/* 312 * Main data structure for RDMA state. 313 * While there is only one copy of this structure being allocated right now, 314 * this is the place where one would start if you wanted to consider 315 * having more than one RDMA connection open at the same time. 316 */ 317typedef struct RDMAContext { 318 char *host; 319 int port; 320 char *host_port; 321 322 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 323 324 /* 325 * This is used by *_exchange_send() to figure out whether or not 326 * the initial "READY" message has already been received or not. 327 * This is because other functions may potentially poll() and detect 328 * the READY message before send() does, in which case we need to 329 * know if it completed. 330 */ 331 int control_ready_expected; 332 333 /* number of outstanding writes */ 334 int nb_sent; 335 336 /* store info about current buffer so that we can 337 merge it with future sends */ 338 uint64_t current_addr; 339 uint64_t current_length; 340 /* index of ram block the current buffer belongs to */ 341 int current_index; 342 /* index of the chunk in the current ram block */ 343 int current_chunk; 344 345 bool pin_all; 346 347 /* 348 * infiniband-specific variables for opening the device 349 * and maintaining connection state and so forth. 350 * 351 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 352 * cm_id->verbs, cm_id->channel, and cm_id->qp. 353 */ 354 struct rdma_cm_id *cm_id; /* connection manager ID */ 355 struct rdma_cm_id *listen_id; 356 bool connected; 357 358 struct ibv_context *verbs; 359 struct rdma_event_channel *channel; 360 struct ibv_qp *qp; /* queue pair */ 361 struct ibv_comp_channel *comp_channel; /* completion channel */ 362 struct ibv_pd *pd; /* protection domain */ 363 struct ibv_cq *cq; /* completion queue */ 364 365 /* 366 * If a previous write failed (perhaps because of a failed 367 * memory registration, then do not attempt any future work 368 * and remember the error state. 369 */ 370 int error_state; 371 int error_reported; 372 int received_error; 373 374 /* 375 * Description of ram blocks used throughout the code. 376 */ 377 RDMALocalBlocks local_ram_blocks; 378 RDMADestBlock *dest_blocks; 379 380 /* Index of the next RAMBlock received during block registration */ 381 unsigned int next_src_index; 382 383 /* 384 * Migration on *destination* started. 385 * Then use coroutine yield function. 386 * Source runs in a thread, so we don't care. 387 */ 388 int migration_started_on_destination; 389 390 int total_registrations; 391 int total_writes; 392 393 int unregister_current, unregister_next; 394 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 395 396 GHashTable *blockmap; 397 398 /* the RDMAContext for return path */ 399 struct RDMAContext *return_path; 400 bool is_return_path; 401} RDMAContext; 402 403#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 404OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 405 406 407 408struct QIOChannelRDMA { 409 QIOChannel parent; 410 RDMAContext *rdmain; 411 RDMAContext *rdmaout; 412 QEMUFile *file; 413 bool blocking; /* XXX we don't actually honour this yet */ 414}; 415 416/* 417 * Main structure for IB Send/Recv control messages. 418 * This gets prepended at the beginning of every Send/Recv. 419 */ 420typedef struct QEMU_PACKED { 421 uint32_t len; /* Total length of data portion */ 422 uint32_t type; /* which control command to perform */ 423 uint32_t repeat; /* number of commands in data portion of same type */ 424 uint32_t padding; 425} RDMAControlHeader; 426 427static void control_to_network(RDMAControlHeader *control) 428{ 429 control->type = htonl(control->type); 430 control->len = htonl(control->len); 431 control->repeat = htonl(control->repeat); 432} 433 434static void network_to_control(RDMAControlHeader *control) 435{ 436 control->type = ntohl(control->type); 437 control->len = ntohl(control->len); 438 control->repeat = ntohl(control->repeat); 439} 440 441/* 442 * Register a single Chunk. 443 * Information sent by the source VM to inform the dest 444 * to register an single chunk of memory before we can perform 445 * the actual RDMA operation. 446 */ 447typedef struct QEMU_PACKED { 448 union QEMU_PACKED { 449 uint64_t current_addr; /* offset into the ram_addr_t space */ 450 uint64_t chunk; /* chunk to lookup if unregistering */ 451 } key; 452 uint32_t current_index; /* which ramblock the chunk belongs to */ 453 uint32_t padding; 454 uint64_t chunks; /* how many sequential chunks to register */ 455} RDMARegister; 456 457static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 458{ 459 RDMALocalBlock *local_block; 460 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 461 462 if (local_block->is_ram_block) { 463 /* 464 * current_addr as passed in is an address in the local ram_addr_t 465 * space, we need to translate this for the destination 466 */ 467 reg->key.current_addr -= local_block->offset; 468 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 469 } 470 reg->key.current_addr = htonll(reg->key.current_addr); 471 reg->current_index = htonl(reg->current_index); 472 reg->chunks = htonll(reg->chunks); 473} 474 475static void network_to_register(RDMARegister *reg) 476{ 477 reg->key.current_addr = ntohll(reg->key.current_addr); 478 reg->current_index = ntohl(reg->current_index); 479 reg->chunks = ntohll(reg->chunks); 480} 481 482typedef struct QEMU_PACKED { 483 uint32_t value; /* if zero, we will madvise() */ 484 uint32_t block_idx; /* which ram block index */ 485 uint64_t offset; /* Address in remote ram_addr_t space */ 486 uint64_t length; /* length of the chunk */ 487} RDMACompress; 488 489static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 490{ 491 comp->value = htonl(comp->value); 492 /* 493 * comp->offset as passed in is an address in the local ram_addr_t 494 * space, we need to translate this for the destination 495 */ 496 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 497 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 498 comp->block_idx = htonl(comp->block_idx); 499 comp->offset = htonll(comp->offset); 500 comp->length = htonll(comp->length); 501} 502 503static void network_to_compress(RDMACompress *comp) 504{ 505 comp->value = ntohl(comp->value); 506 comp->block_idx = ntohl(comp->block_idx); 507 comp->offset = ntohll(comp->offset); 508 comp->length = ntohll(comp->length); 509} 510 511/* 512 * The result of the dest's memory registration produces an "rkey" 513 * which the source VM must reference in order to perform 514 * the RDMA operation. 515 */ 516typedef struct QEMU_PACKED { 517 uint32_t rkey; 518 uint32_t padding; 519 uint64_t host_addr; 520} RDMARegisterResult; 521 522static void result_to_network(RDMARegisterResult *result) 523{ 524 result->rkey = htonl(result->rkey); 525 result->host_addr = htonll(result->host_addr); 526}; 527 528static void network_to_result(RDMARegisterResult *result) 529{ 530 result->rkey = ntohl(result->rkey); 531 result->host_addr = ntohll(result->host_addr); 532}; 533 534const char *print_wrid(int wrid); 535static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 536 uint8_t *data, RDMAControlHeader *resp, 537 int *resp_idx, 538 int (*callback)(RDMAContext *rdma)); 539 540static inline uint64_t ram_chunk_index(const uint8_t *start, 541 const uint8_t *host) 542{ 543 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 544} 545 546static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 547 uint64_t i) 548{ 549 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 550 (i << RDMA_REG_CHUNK_SHIFT)); 551} 552 553static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 554 uint64_t i) 555{ 556 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 557 (1UL << RDMA_REG_CHUNK_SHIFT); 558 559 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 560 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 561 } 562 563 return result; 564} 565 566static int rdma_add_block(RDMAContext *rdma, const char *block_name, 567 void *host_addr, 568 ram_addr_t block_offset, uint64_t length) 569{ 570 RDMALocalBlocks *local = &rdma->local_ram_blocks; 571 RDMALocalBlock *block; 572 RDMALocalBlock *old = local->block; 573 574 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 575 576 if (local->nb_blocks) { 577 int x; 578 579 if (rdma->blockmap) { 580 for (x = 0; x < local->nb_blocks; x++) { 581 g_hash_table_remove(rdma->blockmap, 582 (void *)(uintptr_t)old[x].offset); 583 g_hash_table_insert(rdma->blockmap, 584 (void *)(uintptr_t)old[x].offset, 585 &local->block[x]); 586 } 587 } 588 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 589 g_free(old); 590 } 591 592 block = &local->block[local->nb_blocks]; 593 594 block->block_name = g_strdup(block_name); 595 block->local_host_addr = host_addr; 596 block->offset = block_offset; 597 block->length = length; 598 block->index = local->nb_blocks; 599 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 600 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 601 block->transit_bitmap = bitmap_new(block->nb_chunks); 602 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 603 block->unregister_bitmap = bitmap_new(block->nb_chunks); 604 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 605 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 606 607 block->is_ram_block = local->init ? false : true; 608 609 if (rdma->blockmap) { 610 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 611 } 612 613 trace_rdma_add_block(block_name, local->nb_blocks, 614 (uintptr_t) block->local_host_addr, 615 block->offset, block->length, 616 (uintptr_t) (block->local_host_addr + block->length), 617 BITS_TO_LONGS(block->nb_chunks) * 618 sizeof(unsigned long) * 8, 619 block->nb_chunks); 620 621 local->nb_blocks++; 622 623 return 0; 624} 625 626/* 627 * Memory regions need to be registered with the device and queue pairs setup 628 * in advanced before the migration starts. This tells us where the RAM blocks 629 * are so that we can register them individually. 630 */ 631static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 632{ 633 const char *block_name = qemu_ram_get_idstr(rb); 634 void *host_addr = qemu_ram_get_host_addr(rb); 635 ram_addr_t block_offset = qemu_ram_get_offset(rb); 636 ram_addr_t length = qemu_ram_get_used_length(rb); 637 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 638} 639 640/* 641 * Identify the RAMBlocks and their quantity. They will be references to 642 * identify chunk boundaries inside each RAMBlock and also be referenced 643 * during dynamic page registration. 644 */ 645static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 646{ 647 RDMALocalBlocks *local = &rdma->local_ram_blocks; 648 int ret; 649 650 assert(rdma->blockmap == NULL); 651 memset(local, 0, sizeof *local); 652 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 653 if (ret) { 654 return ret; 655 } 656 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 657 rdma->dest_blocks = g_new0(RDMADestBlock, 658 rdma->local_ram_blocks.nb_blocks); 659 local->init = true; 660 return 0; 661} 662 663/* 664 * Note: If used outside of cleanup, the caller must ensure that the destination 665 * block structures are also updated 666 */ 667static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 668{ 669 RDMALocalBlocks *local = &rdma->local_ram_blocks; 670 RDMALocalBlock *old = local->block; 671 int x; 672 673 if (rdma->blockmap) { 674 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 675 } 676 if (block->pmr) { 677 int j; 678 679 for (j = 0; j < block->nb_chunks; j++) { 680 if (!block->pmr[j]) { 681 continue; 682 } 683 ibv_dereg_mr(block->pmr[j]); 684 rdma->total_registrations--; 685 } 686 g_free(block->pmr); 687 block->pmr = NULL; 688 } 689 690 if (block->mr) { 691 ibv_dereg_mr(block->mr); 692 rdma->total_registrations--; 693 block->mr = NULL; 694 } 695 696 g_free(block->transit_bitmap); 697 block->transit_bitmap = NULL; 698 699 g_free(block->unregister_bitmap); 700 block->unregister_bitmap = NULL; 701 702 g_free(block->remote_keys); 703 block->remote_keys = NULL; 704 705 g_free(block->block_name); 706 block->block_name = NULL; 707 708 if (rdma->blockmap) { 709 for (x = 0; x < local->nb_blocks; x++) { 710 g_hash_table_remove(rdma->blockmap, 711 (void *)(uintptr_t)old[x].offset); 712 } 713 } 714 715 if (local->nb_blocks > 1) { 716 717 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 718 719 if (block->index) { 720 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 721 } 722 723 if (block->index < (local->nb_blocks - 1)) { 724 memcpy(local->block + block->index, old + (block->index + 1), 725 sizeof(RDMALocalBlock) * 726 (local->nb_blocks - (block->index + 1))); 727 for (x = block->index; x < local->nb_blocks - 1; x++) { 728 local->block[x].index--; 729 } 730 } 731 } else { 732 assert(block == local->block); 733 local->block = NULL; 734 } 735 736 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 737 block->offset, block->length, 738 (uintptr_t)(block->local_host_addr + block->length), 739 BITS_TO_LONGS(block->nb_chunks) * 740 sizeof(unsigned long) * 8, block->nb_chunks); 741 742 g_free(old); 743 744 local->nb_blocks--; 745 746 if (local->nb_blocks && rdma->blockmap) { 747 for (x = 0; x < local->nb_blocks; x++) { 748 g_hash_table_insert(rdma->blockmap, 749 (void *)(uintptr_t)local->block[x].offset, 750 &local->block[x]); 751 } 752 } 753 754 return 0; 755} 756 757/* 758 * Put in the log file which RDMA device was opened and the details 759 * associated with that device. 760 */ 761static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 762{ 763 struct ibv_port_attr port; 764 765 if (ibv_query_port(verbs, 1, &port)) { 766 error_report("Failed to query port information"); 767 return; 768 } 769 770 printf("%s RDMA Device opened: kernel name %s " 771 "uverbs device name %s, " 772 "infiniband_verbs class device path %s, " 773 "infiniband class device path %s, " 774 "transport: (%d) %s\n", 775 who, 776 verbs->device->name, 777 verbs->device->dev_name, 778 verbs->device->dev_path, 779 verbs->device->ibdev_path, 780 port.link_layer, 781 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 782 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 783 ? "Ethernet" : "Unknown")); 784} 785 786/* 787 * Put in the log file the RDMA gid addressing information, 788 * useful for folks who have trouble understanding the 789 * RDMA device hierarchy in the kernel. 790 */ 791static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 792{ 793 char sgid[33]; 794 char dgid[33]; 795 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 796 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 797 trace_qemu_rdma_dump_gid(who, sgid, dgid); 798} 799 800/* 801 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 802 * We will try the next addrinfo struct, and fail if there are 803 * no other valid addresses to bind against. 804 * 805 * If user is listening on '[::]', then we will not have a opened a device 806 * yet and have no way of verifying if the device is RoCE or not. 807 * 808 * In this case, the source VM will throw an error for ALL types of 809 * connections (both IPv4 and IPv6) if the destination machine does not have 810 * a regular infiniband network available for use. 811 * 812 * The only way to guarantee that an error is thrown for broken kernels is 813 * for the management software to choose a *specific* interface at bind time 814 * and validate what time of hardware it is. 815 * 816 * Unfortunately, this puts the user in a fix: 817 * 818 * If the source VM connects with an IPv4 address without knowing that the 819 * destination has bound to '[::]' the migration will unconditionally fail 820 * unless the management software is explicitly listening on the IPv4 821 * address while using a RoCE-based device. 822 * 823 * If the source VM connects with an IPv6 address, then we're OK because we can 824 * throw an error on the source (and similarly on the destination). 825 * 826 * But in mixed environments, this will be broken for a while until it is fixed 827 * inside linux. 828 * 829 * We do provide a *tiny* bit of help in this function: We can list all of the 830 * devices in the system and check to see if all the devices are RoCE or 831 * Infiniband. 832 * 833 * If we detect that we have a *pure* RoCE environment, then we can safely 834 * thrown an error even if the management software has specified '[::]' as the 835 * bind address. 836 * 837 * However, if there is are multiple hetergeneous devices, then we cannot make 838 * this assumption and the user just has to be sure they know what they are 839 * doing. 840 * 841 * Patches are being reviewed on linux-rdma. 842 */ 843static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 844{ 845 /* This bug only exists in linux, to our knowledge. */ 846#ifdef CONFIG_LINUX 847 struct ibv_port_attr port_attr; 848 849 /* 850 * Verbs are only NULL if management has bound to '[::]'. 851 * 852 * Let's iterate through all the devices and see if there any pure IB 853 * devices (non-ethernet). 854 * 855 * If not, then we can safely proceed with the migration. 856 * Otherwise, there are no guarantees until the bug is fixed in linux. 857 */ 858 if (!verbs) { 859 int num_devices, x; 860 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 861 bool roce_found = false; 862 bool ib_found = false; 863 864 for (x = 0; x < num_devices; x++) { 865 verbs = ibv_open_device(dev_list[x]); 866 if (!verbs) { 867 if (errno == EPERM) { 868 continue; 869 } else { 870 return -EINVAL; 871 } 872 } 873 874 if (ibv_query_port(verbs, 1, &port_attr)) { 875 ibv_close_device(verbs); 876 ERROR(errp, "Could not query initial IB port"); 877 return -EINVAL; 878 } 879 880 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 881 ib_found = true; 882 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 883 roce_found = true; 884 } 885 886 ibv_close_device(verbs); 887 888 } 889 890 if (roce_found) { 891 if (ib_found) { 892 fprintf(stderr, "WARN: migrations may fail:" 893 " IPv6 over RoCE / iWARP in linux" 894 " is broken. But since you appear to have a" 895 " mixed RoCE / IB environment, be sure to only" 896 " migrate over the IB fabric until the kernel " 897 " fixes the bug.\n"); 898 } else { 899 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 900 " and your management software has specified '[::]'" 901 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 902 return -ENONET; 903 } 904 } 905 906 return 0; 907 } 908 909 /* 910 * If we have a verbs context, that means that some other than '[::]' was 911 * used by the management software for binding. In which case we can 912 * actually warn the user about a potentially broken kernel. 913 */ 914 915 /* IB ports start with 1, not 0 */ 916 if (ibv_query_port(verbs, 1, &port_attr)) { 917 ERROR(errp, "Could not query initial IB port"); 918 return -EINVAL; 919 } 920 921 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 922 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 923 "(but patches on linux-rdma in progress)"); 924 return -ENONET; 925 } 926 927#endif 928 929 return 0; 930} 931 932/* 933 * Figure out which RDMA device corresponds to the requested IP hostname 934 * Also create the initial connection manager identifiers for opening 935 * the connection. 936 */ 937static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 938{ 939 int ret; 940 struct rdma_addrinfo *res; 941 char port_str[16]; 942 struct rdma_cm_event *cm_event; 943 char ip[40] = "unknown"; 944 struct rdma_addrinfo *e; 945 946 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 947 ERROR(errp, "RDMA hostname has not been set"); 948 return -EINVAL; 949 } 950 951 /* create CM channel */ 952 rdma->channel = rdma_create_event_channel(); 953 if (!rdma->channel) { 954 ERROR(errp, "could not create CM channel"); 955 return -EINVAL; 956 } 957 958 /* create CM id */ 959 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 960 if (ret) { 961 ERROR(errp, "could not create channel id"); 962 goto err_resolve_create_id; 963 } 964 965 snprintf(port_str, 16, "%d", rdma->port); 966 port_str[15] = '\0'; 967 968 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 969 if (ret < 0) { 970 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 971 goto err_resolve_get_addr; 972 } 973 974 for (e = res; e != NULL; e = e->ai_next) { 975 inet_ntop(e->ai_family, 976 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 977 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 978 979 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 980 RDMA_RESOLVE_TIMEOUT_MS); 981 if (!ret) { 982 if (e->ai_family == AF_INET6) { 983 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 984 if (ret) { 985 continue; 986 } 987 } 988 goto route; 989 } 990 } 991 992 rdma_freeaddrinfo(res); 993 ERROR(errp, "could not resolve address %s", rdma->host); 994 goto err_resolve_get_addr; 995 996route: 997 rdma_freeaddrinfo(res); 998 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 999 1000 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1001 if (ret) { 1002 ERROR(errp, "could not perform event_addr_resolved"); 1003 goto err_resolve_get_addr; 1004 } 1005 1006 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 1007 ERROR(errp, "result not equal to event_addr_resolved %s", 1008 rdma_event_str(cm_event->event)); 1009 error_report("rdma_resolve_addr"); 1010 rdma_ack_cm_event(cm_event); 1011 ret = -EINVAL; 1012 goto err_resolve_get_addr; 1013 } 1014 rdma_ack_cm_event(cm_event); 1015 1016 /* resolve route */ 1017 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1018 if (ret) { 1019 ERROR(errp, "could not resolve rdma route"); 1020 goto err_resolve_get_addr; 1021 } 1022 1023 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1024 if (ret) { 1025 ERROR(errp, "could not perform event_route_resolved"); 1026 goto err_resolve_get_addr; 1027 } 1028 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1029 ERROR(errp, "result not equal to event_route_resolved: %s", 1030 rdma_event_str(cm_event->event)); 1031 rdma_ack_cm_event(cm_event); 1032 ret = -EINVAL; 1033 goto err_resolve_get_addr; 1034 } 1035 rdma_ack_cm_event(cm_event); 1036 rdma->verbs = rdma->cm_id->verbs; 1037 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1038 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1039 return 0; 1040 1041err_resolve_get_addr: 1042 rdma_destroy_id(rdma->cm_id); 1043 rdma->cm_id = NULL; 1044err_resolve_create_id: 1045 rdma_destroy_event_channel(rdma->channel); 1046 rdma->channel = NULL; 1047 return ret; 1048} 1049 1050/* 1051 * Create protection domain and completion queues 1052 */ 1053static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1054{ 1055 /* allocate pd */ 1056 rdma->pd = ibv_alloc_pd(rdma->verbs); 1057 if (!rdma->pd) { 1058 error_report("failed to allocate protection domain"); 1059 return -1; 1060 } 1061 1062 /* create completion channel */ 1063 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 1064 if (!rdma->comp_channel) { 1065 error_report("failed to allocate completion channel"); 1066 goto err_alloc_pd_cq; 1067 } 1068 1069 /* 1070 * Completion queue can be filled by both read and write work requests, 1071 * so must reflect the sum of both possible queue sizes. 1072 */ 1073 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1074 NULL, rdma->comp_channel, 0); 1075 if (!rdma->cq) { 1076 error_report("failed to allocate completion queue"); 1077 goto err_alloc_pd_cq; 1078 } 1079 1080 return 0; 1081 1082err_alloc_pd_cq: 1083 if (rdma->pd) { 1084 ibv_dealloc_pd(rdma->pd); 1085 } 1086 if (rdma->comp_channel) { 1087 ibv_destroy_comp_channel(rdma->comp_channel); 1088 } 1089 rdma->pd = NULL; 1090 rdma->comp_channel = NULL; 1091 return -1; 1092 1093} 1094 1095/* 1096 * Create queue pairs. 1097 */ 1098static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1099{ 1100 struct ibv_qp_init_attr attr = { 0 }; 1101 int ret; 1102 1103 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1104 attr.cap.max_recv_wr = 3; 1105 attr.cap.max_send_sge = 1; 1106 attr.cap.max_recv_sge = 1; 1107 attr.send_cq = rdma->cq; 1108 attr.recv_cq = rdma->cq; 1109 attr.qp_type = IBV_QPT_RC; 1110 1111 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1112 if (ret) { 1113 return -1; 1114 } 1115 1116 rdma->qp = rdma->cm_id->qp; 1117 return 0; 1118} 1119 1120static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1121{ 1122 int i; 1123 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1124 1125 for (i = 0; i < local->nb_blocks; i++) { 1126 local->block[i].mr = 1127 ibv_reg_mr(rdma->pd, 1128 local->block[i].local_host_addr, 1129 local->block[i].length, 1130 IBV_ACCESS_LOCAL_WRITE | 1131 IBV_ACCESS_REMOTE_WRITE 1132 ); 1133 if (!local->block[i].mr) { 1134 perror("Failed to register local dest ram block!"); 1135 break; 1136 } 1137 rdma->total_registrations++; 1138 } 1139 1140 if (i >= local->nb_blocks) { 1141 return 0; 1142 } 1143 1144 for (i--; i >= 0; i--) { 1145 ibv_dereg_mr(local->block[i].mr); 1146 local->block[i].mr = NULL; 1147 rdma->total_registrations--; 1148 } 1149 1150 return -1; 1151 1152} 1153 1154/* 1155 * Find the ram block that corresponds to the page requested to be 1156 * transmitted by QEMU. 1157 * 1158 * Once the block is found, also identify which 'chunk' within that 1159 * block that the page belongs to. 1160 * 1161 * This search cannot fail or the migration will fail. 1162 */ 1163static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1164 uintptr_t block_offset, 1165 uint64_t offset, 1166 uint64_t length, 1167 uint64_t *block_index, 1168 uint64_t *chunk_index) 1169{ 1170 uint64_t current_addr = block_offset + offset; 1171 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1172 (void *) block_offset); 1173 assert(block); 1174 assert(current_addr >= block->offset); 1175 assert((current_addr + length) <= (block->offset + block->length)); 1176 1177 *block_index = block->index; 1178 *chunk_index = ram_chunk_index(block->local_host_addr, 1179 block->local_host_addr + (current_addr - block->offset)); 1180 1181 return 0; 1182} 1183 1184/* 1185 * Register a chunk with IB. If the chunk was already registered 1186 * previously, then skip. 1187 * 1188 * Also return the keys associated with the registration needed 1189 * to perform the actual RDMA operation. 1190 */ 1191static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1192 RDMALocalBlock *block, uintptr_t host_addr, 1193 uint32_t *lkey, uint32_t *rkey, int chunk, 1194 uint8_t *chunk_start, uint8_t *chunk_end) 1195{ 1196 if (block->mr) { 1197 if (lkey) { 1198 *lkey = block->mr->lkey; 1199 } 1200 if (rkey) { 1201 *rkey = block->mr->rkey; 1202 } 1203 return 0; 1204 } 1205 1206 /* allocate memory to store chunk MRs */ 1207 if (!block->pmr) { 1208 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1209 } 1210 1211 /* 1212 * If 'rkey', then we're the destination, so grant access to the source. 1213 * 1214 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1215 */ 1216 if (!block->pmr[chunk]) { 1217 uint64_t len = chunk_end - chunk_start; 1218 1219 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1220 1221 block->pmr[chunk] = ibv_reg_mr(rdma->pd, 1222 chunk_start, len, 1223 (rkey ? (IBV_ACCESS_LOCAL_WRITE | 1224 IBV_ACCESS_REMOTE_WRITE) : 0)); 1225 1226 if (!block->pmr[chunk]) { 1227 perror("Failed to register chunk!"); 1228 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1229 " start %" PRIuPTR " end %" PRIuPTR 1230 " host %" PRIuPTR 1231 " local %" PRIuPTR " registrations: %d\n", 1232 block->index, chunk, (uintptr_t)chunk_start, 1233 (uintptr_t)chunk_end, host_addr, 1234 (uintptr_t)block->local_host_addr, 1235 rdma->total_registrations); 1236 return -1; 1237 } 1238 rdma->total_registrations++; 1239 } 1240 1241 if (lkey) { 1242 *lkey = block->pmr[chunk]->lkey; 1243 } 1244 if (rkey) { 1245 *rkey = block->pmr[chunk]->rkey; 1246 } 1247 return 0; 1248} 1249 1250/* 1251 * Register (at connection time) the memory used for control 1252 * channel messages. 1253 */ 1254static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1255{ 1256 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1257 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1258 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1259 if (rdma->wr_data[idx].control_mr) { 1260 rdma->total_registrations++; 1261 return 0; 1262 } 1263 error_report("qemu_rdma_reg_control failed"); 1264 return -1; 1265} 1266 1267const char *print_wrid(int wrid) 1268{ 1269 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1270 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1271 } 1272 return wrid_desc[wrid]; 1273} 1274 1275/* 1276 * RDMA requires memory registration (mlock/pinning), but this is not good for 1277 * overcommitment. 1278 * 1279 * In preparation for the future where LRU information or workload-specific 1280 * writable writable working set memory access behavior is available to QEMU 1281 * it would be nice to have in place the ability to UN-register/UN-pin 1282 * particular memory regions from the RDMA hardware when it is determine that 1283 * those regions of memory will likely not be accessed again in the near future. 1284 * 1285 * While we do not yet have such information right now, the following 1286 * compile-time option allows us to perform a non-optimized version of this 1287 * behavior. 1288 * 1289 * By uncommenting this option, you will cause *all* RDMA transfers to be 1290 * unregistered immediately after the transfer completes on both sides of the 1291 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode. 1292 * 1293 * This will have a terrible impact on migration performance, so until future 1294 * workload information or LRU information is available, do not attempt to use 1295 * this feature except for basic testing. 1296 */ 1297/* #define RDMA_UNREGISTRATION_EXAMPLE */ 1298 1299/* 1300 * Perform a non-optimized memory unregistration after every transfer 1301 * for demonstration purposes, only if pin-all is not requested. 1302 * 1303 * Potential optimizations: 1304 * 1. Start a new thread to run this function continuously 1305 - for bit clearing 1306 - and for receipt of unregister messages 1307 * 2. Use an LRU. 1308 * 3. Use workload hints. 1309 */ 1310static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1311{ 1312 while (rdma->unregistrations[rdma->unregister_current]) { 1313 int ret; 1314 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1315 uint64_t chunk = 1316 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1317 uint64_t index = 1318 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1319 RDMALocalBlock *block = 1320 &(rdma->local_ram_blocks.block[index]); 1321 RDMARegister reg = { .current_index = index }; 1322 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1323 }; 1324 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1325 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1326 .repeat = 1, 1327 }; 1328 1329 trace_qemu_rdma_unregister_waiting_proc(chunk, 1330 rdma->unregister_current); 1331 1332 rdma->unregistrations[rdma->unregister_current] = 0; 1333 rdma->unregister_current++; 1334 1335 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1336 rdma->unregister_current = 0; 1337 } 1338 1339 1340 /* 1341 * Unregistration is speculative (because migration is single-threaded 1342 * and we cannot break the protocol's inifinband message ordering). 1343 * Thus, if the memory is currently being used for transmission, 1344 * then abort the attempt to unregister and try again 1345 * later the next time a completion is received for this memory. 1346 */ 1347 clear_bit(chunk, block->unregister_bitmap); 1348 1349 if (test_bit(chunk, block->transit_bitmap)) { 1350 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1351 continue; 1352 } 1353 1354 trace_qemu_rdma_unregister_waiting_send(chunk); 1355 1356 ret = ibv_dereg_mr(block->pmr[chunk]); 1357 block->pmr[chunk] = NULL; 1358 block->remote_keys[chunk] = 0; 1359 1360 if (ret != 0) { 1361 perror("unregistration chunk failed"); 1362 return -ret; 1363 } 1364 rdma->total_registrations--; 1365 1366 reg.key.chunk = chunk; 1367 register_to_network(rdma, ®); 1368 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1369 &resp, NULL, NULL); 1370 if (ret < 0) { 1371 return ret; 1372 } 1373 1374 trace_qemu_rdma_unregister_waiting_complete(chunk); 1375 } 1376 1377 return 0; 1378} 1379 1380static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1381 uint64_t chunk) 1382{ 1383 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1384 1385 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1386 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1387 1388 return result; 1389} 1390 1391/* 1392 * Set bit for unregistration in the next iteration. 1393 * We cannot transmit right here, but will unpin later. 1394 */ 1395static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1396 uint64_t chunk, uint64_t wr_id) 1397{ 1398 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1399 error_report("rdma migration: queue is full"); 1400 } else { 1401 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1402 1403 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1404 trace_qemu_rdma_signal_unregister_append(chunk, 1405 rdma->unregister_next); 1406 1407 rdma->unregistrations[rdma->unregister_next++] = 1408 qemu_rdma_make_wrid(wr_id, index, chunk); 1409 1410 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1411 rdma->unregister_next = 0; 1412 } 1413 } else { 1414 trace_qemu_rdma_signal_unregister_already(chunk); 1415 } 1416 } 1417} 1418 1419/* 1420 * Consult the connection manager to see a work request 1421 * (of any kind) has completed. 1422 * Return the work request ID that completed. 1423 */ 1424static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1425 uint32_t *byte_len) 1426{ 1427 int ret; 1428 struct ibv_wc wc; 1429 uint64_t wr_id; 1430 1431 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1432 1433 if (!ret) { 1434 *wr_id_out = RDMA_WRID_NONE; 1435 return 0; 1436 } 1437 1438 if (ret < 0) { 1439 error_report("ibv_poll_cq return %d", ret); 1440 return ret; 1441 } 1442 1443 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1444 1445 if (wc.status != IBV_WC_SUCCESS) { 1446 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1447 wc.status, ibv_wc_status_str(wc.status)); 1448 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1449 1450 return -1; 1451 } 1452 1453 if (rdma->control_ready_expected && 1454 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1455 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1456 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1457 rdma->control_ready_expected = 0; 1458 } 1459 1460 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1461 uint64_t chunk = 1462 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1463 uint64_t index = 1464 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1465 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1466 1467 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1468 index, chunk, block->local_host_addr, 1469 (void *)(uintptr_t)block->remote_host_addr); 1470 1471 clear_bit(chunk, block->transit_bitmap); 1472 1473 if (rdma->nb_sent > 0) { 1474 rdma->nb_sent--; 1475 } 1476 1477 if (!rdma->pin_all) { 1478 /* 1479 * FYI: If one wanted to signal a specific chunk to be unregistered 1480 * using LRU or workload-specific information, this is the function 1481 * you would call to do so. That chunk would then get asynchronously 1482 * unregistered later. 1483 */ 1484#ifdef RDMA_UNREGISTRATION_EXAMPLE 1485 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1486#endif 1487 } 1488 } else { 1489 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1490 } 1491 1492 *wr_id_out = wc.wr_id; 1493 if (byte_len) { 1494 *byte_len = wc.byte_len; 1495 } 1496 1497 return 0; 1498} 1499 1500/* Wait for activity on the completion channel. 1501 * Returns 0 on success, none-0 on error. 1502 */ 1503static int qemu_rdma_wait_comp_channel(RDMAContext *rdma) 1504{ 1505 struct rdma_cm_event *cm_event; 1506 int ret = -1; 1507 1508 /* 1509 * Coroutine doesn't start until migration_fd_process_incoming() 1510 * so don't yield unless we know we're running inside of a coroutine. 1511 */ 1512 if (rdma->migration_started_on_destination && 1513 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1514 yield_until_fd_readable(rdma->comp_channel->fd); 1515 } else { 1516 /* This is the source side, we're in a separate thread 1517 * or destination prior to migration_fd_process_incoming() 1518 * after postcopy, the destination also in a separate thread. 1519 * we can't yield; so we have to poll the fd. 1520 * But we need to be able to handle 'cancel' or an error 1521 * without hanging forever. 1522 */ 1523 while (!rdma->error_state && !rdma->received_error) { 1524 GPollFD pfds[2]; 1525 pfds[0].fd = rdma->comp_channel->fd; 1526 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1527 pfds[0].revents = 0; 1528 1529 pfds[1].fd = rdma->channel->fd; 1530 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1531 pfds[1].revents = 0; 1532 1533 /* 0.1s timeout, should be fine for a 'cancel' */ 1534 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1535 case 2: 1536 case 1: /* fd active */ 1537 if (pfds[0].revents) { 1538 return 0; 1539 } 1540 1541 if (pfds[1].revents) { 1542 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1543 if (ret) { 1544 error_report("failed to get cm event while wait " 1545 "completion channel"); 1546 return -EPIPE; 1547 } 1548 1549 error_report("receive cm event while wait comp channel," 1550 "cm event is %d", cm_event->event); 1551 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1552 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1553 rdma_ack_cm_event(cm_event); 1554 return -EPIPE; 1555 } 1556 rdma_ack_cm_event(cm_event); 1557 } 1558 break; 1559 1560 case 0: /* Timeout, go around again */ 1561 break; 1562 1563 default: /* Error of some type - 1564 * I don't trust errno from qemu_poll_ns 1565 */ 1566 error_report("%s: poll failed", __func__); 1567 return -EPIPE; 1568 } 1569 1570 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1571 /* Bail out and let the cancellation happen */ 1572 return -EPIPE; 1573 } 1574 } 1575 } 1576 1577 if (rdma->received_error) { 1578 return -EPIPE; 1579 } 1580 return rdma->error_state; 1581} 1582 1583/* 1584 * Block until the next work request has completed. 1585 * 1586 * First poll to see if a work request has already completed, 1587 * otherwise block. 1588 * 1589 * If we encounter completed work requests for IDs other than 1590 * the one we're interested in, then that's generally an error. 1591 * 1592 * The only exception is actual RDMA Write completions. These 1593 * completions only need to be recorded, but do not actually 1594 * need further processing. 1595 */ 1596static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1597 uint32_t *byte_len) 1598{ 1599 int num_cq_events = 0, ret = 0; 1600 struct ibv_cq *cq; 1601 void *cq_ctx; 1602 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1603 1604 if (ibv_req_notify_cq(rdma->cq, 0)) { 1605 return -1; 1606 } 1607 /* poll cq first */ 1608 while (wr_id != wrid_requested) { 1609 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1610 if (ret < 0) { 1611 return ret; 1612 } 1613 1614 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1615 1616 if (wr_id == RDMA_WRID_NONE) { 1617 break; 1618 } 1619 if (wr_id != wrid_requested) { 1620 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1621 wrid_requested, print_wrid(wr_id), wr_id); 1622 } 1623 } 1624 1625 if (wr_id == wrid_requested) { 1626 return 0; 1627 } 1628 1629 while (1) { 1630 ret = qemu_rdma_wait_comp_channel(rdma); 1631 if (ret) { 1632 goto err_block_for_wrid; 1633 } 1634 1635 ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx); 1636 if (ret) { 1637 perror("ibv_get_cq_event"); 1638 goto err_block_for_wrid; 1639 } 1640 1641 num_cq_events++; 1642 1643 ret = -ibv_req_notify_cq(cq, 0); 1644 if (ret) { 1645 goto err_block_for_wrid; 1646 } 1647 1648 while (wr_id != wrid_requested) { 1649 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1650 if (ret < 0) { 1651 goto err_block_for_wrid; 1652 } 1653 1654 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1655 1656 if (wr_id == RDMA_WRID_NONE) { 1657 break; 1658 } 1659 if (wr_id != wrid_requested) { 1660 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1661 wrid_requested, print_wrid(wr_id), wr_id); 1662 } 1663 } 1664 1665 if (wr_id == wrid_requested) { 1666 goto success_block_for_wrid; 1667 } 1668 } 1669 1670success_block_for_wrid: 1671 if (num_cq_events) { 1672 ibv_ack_cq_events(cq, num_cq_events); 1673 } 1674 return 0; 1675 1676err_block_for_wrid: 1677 if (num_cq_events) { 1678 ibv_ack_cq_events(cq, num_cq_events); 1679 } 1680 1681 rdma->error_state = ret; 1682 return ret; 1683} 1684 1685/* 1686 * Post a SEND message work request for the control channel 1687 * containing some data and block until the post completes. 1688 */ 1689static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1690 RDMAControlHeader *head) 1691{ 1692 int ret = 0; 1693 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1694 struct ibv_send_wr *bad_wr; 1695 struct ibv_sge sge = { 1696 .addr = (uintptr_t)(wr->control), 1697 .length = head->len + sizeof(RDMAControlHeader), 1698 .lkey = wr->control_mr->lkey, 1699 }; 1700 struct ibv_send_wr send_wr = { 1701 .wr_id = RDMA_WRID_SEND_CONTROL, 1702 .opcode = IBV_WR_SEND, 1703 .send_flags = IBV_SEND_SIGNALED, 1704 .sg_list = &sge, 1705 .num_sge = 1, 1706 }; 1707 1708 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1709 1710 /* 1711 * We don't actually need to do a memcpy() in here if we used 1712 * the "sge" properly, but since we're only sending control messages 1713 * (not RAM in a performance-critical path), then its OK for now. 1714 * 1715 * The copy makes the RDMAControlHeader simpler to manipulate 1716 * for the time being. 1717 */ 1718 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1719 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1720 control_to_network((void *) wr->control); 1721 1722 if (buf) { 1723 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1724 } 1725 1726 1727 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1728 1729 if (ret > 0) { 1730 error_report("Failed to use post IB SEND for control"); 1731 return -ret; 1732 } 1733 1734 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1735 if (ret < 0) { 1736 error_report("rdma migration: send polling control error"); 1737 } 1738 1739 return ret; 1740} 1741 1742/* 1743 * Post a RECV work request in anticipation of some future receipt 1744 * of data on the control channel. 1745 */ 1746static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1747{ 1748 struct ibv_recv_wr *bad_wr; 1749 struct ibv_sge sge = { 1750 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1751 .length = RDMA_CONTROL_MAX_BUFFER, 1752 .lkey = rdma->wr_data[idx].control_mr->lkey, 1753 }; 1754 1755 struct ibv_recv_wr recv_wr = { 1756 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1757 .sg_list = &sge, 1758 .num_sge = 1, 1759 }; 1760 1761 1762 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1763 return -1; 1764 } 1765 1766 return 0; 1767} 1768 1769/* 1770 * Block and wait for a RECV control channel message to arrive. 1771 */ 1772static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1773 RDMAControlHeader *head, int expecting, int idx) 1774{ 1775 uint32_t byte_len; 1776 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1777 &byte_len); 1778 1779 if (ret < 0) { 1780 error_report("rdma migration: recv polling control error!"); 1781 return ret; 1782 } 1783 1784 network_to_control((void *) rdma->wr_data[idx].control); 1785 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1786 1787 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1788 1789 if (expecting == RDMA_CONTROL_NONE) { 1790 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1791 head->type); 1792 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1793 error_report("Was expecting a %s (%d) control message" 1794 ", but got: %s (%d), length: %d", 1795 control_desc(expecting), expecting, 1796 control_desc(head->type), head->type, head->len); 1797 if (head->type == RDMA_CONTROL_ERROR) { 1798 rdma->received_error = true; 1799 } 1800 return -EIO; 1801 } 1802 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1803 error_report("too long length: %d", head->len); 1804 return -EINVAL; 1805 } 1806 if (sizeof(*head) + head->len != byte_len) { 1807 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1808 return -EINVAL; 1809 } 1810 1811 return 0; 1812} 1813 1814/* 1815 * When a RECV work request has completed, the work request's 1816 * buffer is pointed at the header. 1817 * 1818 * This will advance the pointer to the data portion 1819 * of the control message of the work request's buffer that 1820 * was populated after the work request finished. 1821 */ 1822static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1823 RDMAControlHeader *head) 1824{ 1825 rdma->wr_data[idx].control_len = head->len; 1826 rdma->wr_data[idx].control_curr = 1827 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1828} 1829 1830/* 1831 * This is an 'atomic' high-level operation to deliver a single, unified 1832 * control-channel message. 1833 * 1834 * Additionally, if the user is expecting some kind of reply to this message, 1835 * they can request a 'resp' response message be filled in by posting an 1836 * additional work request on behalf of the user and waiting for an additional 1837 * completion. 1838 * 1839 * The extra (optional) response is used during registration to us from having 1840 * to perform an *additional* exchange of message just to provide a response by 1841 * instead piggy-backing on the acknowledgement. 1842 */ 1843static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1844 uint8_t *data, RDMAControlHeader *resp, 1845 int *resp_idx, 1846 int (*callback)(RDMAContext *rdma)) 1847{ 1848 int ret = 0; 1849 1850 /* 1851 * Wait until the dest is ready before attempting to deliver the message 1852 * by waiting for a READY message. 1853 */ 1854 if (rdma->control_ready_expected) { 1855 RDMAControlHeader resp; 1856 ret = qemu_rdma_exchange_get_response(rdma, 1857 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1858 if (ret < 0) { 1859 return ret; 1860 } 1861 } 1862 1863 /* 1864 * If the user is expecting a response, post a WR in anticipation of it. 1865 */ 1866 if (resp) { 1867 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1868 if (ret) { 1869 error_report("rdma migration: error posting" 1870 " extra control recv for anticipated result!"); 1871 return ret; 1872 } 1873 } 1874 1875 /* 1876 * Post a WR to replace the one we just consumed for the READY message. 1877 */ 1878 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1879 if (ret) { 1880 error_report("rdma migration: error posting first control recv!"); 1881 return ret; 1882 } 1883 1884 /* 1885 * Deliver the control message that was requested. 1886 */ 1887 ret = qemu_rdma_post_send_control(rdma, data, head); 1888 1889 if (ret < 0) { 1890 error_report("Failed to send control buffer!"); 1891 return ret; 1892 } 1893 1894 /* 1895 * If we're expecting a response, block and wait for it. 1896 */ 1897 if (resp) { 1898 if (callback) { 1899 trace_qemu_rdma_exchange_send_issue_callback(); 1900 ret = callback(rdma); 1901 if (ret < 0) { 1902 return ret; 1903 } 1904 } 1905 1906 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1907 ret = qemu_rdma_exchange_get_response(rdma, resp, 1908 resp->type, RDMA_WRID_DATA); 1909 1910 if (ret < 0) { 1911 return ret; 1912 } 1913 1914 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1915 if (resp_idx) { 1916 *resp_idx = RDMA_WRID_DATA; 1917 } 1918 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1919 } 1920 1921 rdma->control_ready_expected = 1; 1922 1923 return 0; 1924} 1925 1926/* 1927 * This is an 'atomic' high-level operation to receive a single, unified 1928 * control-channel message. 1929 */ 1930static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1931 int expecting) 1932{ 1933 RDMAControlHeader ready = { 1934 .len = 0, 1935 .type = RDMA_CONTROL_READY, 1936 .repeat = 1, 1937 }; 1938 int ret; 1939 1940 /* 1941 * Inform the source that we're ready to receive a message. 1942 */ 1943 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1944 1945 if (ret < 0) { 1946 error_report("Failed to send control buffer!"); 1947 return ret; 1948 } 1949 1950 /* 1951 * Block and wait for the message. 1952 */ 1953 ret = qemu_rdma_exchange_get_response(rdma, head, 1954 expecting, RDMA_WRID_READY); 1955 1956 if (ret < 0) { 1957 return ret; 1958 } 1959 1960 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1961 1962 /* 1963 * Post a new RECV work request to replace the one we just consumed. 1964 */ 1965 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1966 if (ret) { 1967 error_report("rdma migration: error posting second control recv!"); 1968 return ret; 1969 } 1970 1971 return 0; 1972} 1973 1974/* 1975 * Write an actual chunk of memory using RDMA. 1976 * 1977 * If we're using dynamic registration on the dest-side, we have to 1978 * send a registration command first. 1979 */ 1980static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 1981 int current_index, uint64_t current_addr, 1982 uint64_t length) 1983{ 1984 struct ibv_sge sge; 1985 struct ibv_send_wr send_wr = { 0 }; 1986 struct ibv_send_wr *bad_wr; 1987 int reg_result_idx, ret, count = 0; 1988 uint64_t chunk, chunks; 1989 uint8_t *chunk_start, *chunk_end; 1990 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1991 RDMARegister reg; 1992 RDMARegisterResult *reg_result; 1993 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1994 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1995 .type = RDMA_CONTROL_REGISTER_REQUEST, 1996 .repeat = 1, 1997 }; 1998 1999retry: 2000 sge.addr = (uintptr_t)(block->local_host_addr + 2001 (current_addr - block->offset)); 2002 sge.length = length; 2003 2004 chunk = ram_chunk_index(block->local_host_addr, 2005 (uint8_t *)(uintptr_t)sge.addr); 2006 chunk_start = ram_chunk_start(block, chunk); 2007 2008 if (block->is_ram_block) { 2009 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2010 2011 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2012 chunks--; 2013 } 2014 } else { 2015 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2016 2017 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2018 chunks--; 2019 } 2020 } 2021 2022 trace_qemu_rdma_write_one_top(chunks + 1, 2023 (chunks + 1) * 2024 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2025 2026 chunk_end = ram_chunk_end(block, chunk + chunks); 2027 2028 if (!rdma->pin_all) { 2029#ifdef RDMA_UNREGISTRATION_EXAMPLE 2030 qemu_rdma_unregister_waiting(rdma); 2031#endif 2032 } 2033 2034 while (test_bit(chunk, block->transit_bitmap)) { 2035 (void)count; 2036 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2037 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2038 2039 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2040 2041 if (ret < 0) { 2042 error_report("Failed to Wait for previous write to complete " 2043 "block %d chunk %" PRIu64 2044 " current %" PRIu64 " len %" PRIu64 " %d", 2045 current_index, chunk, sge.addr, length, rdma->nb_sent); 2046 return ret; 2047 } 2048 } 2049 2050 if (!rdma->pin_all || !block->is_ram_block) { 2051 if (!block->remote_keys[chunk]) { 2052 /* 2053 * This chunk has not yet been registered, so first check to see 2054 * if the entire chunk is zero. If so, tell the other size to 2055 * memset() + madvise() the entire chunk without RDMA. 2056 */ 2057 2058 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 2059 RDMACompress comp = { 2060 .offset = current_addr, 2061 .value = 0, 2062 .block_idx = current_index, 2063 .length = length, 2064 }; 2065 2066 head.len = sizeof(comp); 2067 head.type = RDMA_CONTROL_COMPRESS; 2068 2069 trace_qemu_rdma_write_one_zero(chunk, sge.length, 2070 current_index, current_addr); 2071 2072 compress_to_network(rdma, &comp); 2073 ret = qemu_rdma_exchange_send(rdma, &head, 2074 (uint8_t *) &comp, NULL, NULL, NULL); 2075 2076 if (ret < 0) { 2077 return -EIO; 2078 } 2079 2080 acct_update_position(f, sge.length, true); 2081 2082 return 1; 2083 } 2084 2085 /* 2086 * Otherwise, tell other side to register. 2087 */ 2088 reg.current_index = current_index; 2089 if (block->is_ram_block) { 2090 reg.key.current_addr = current_addr; 2091 } else { 2092 reg.key.chunk = chunk; 2093 } 2094 reg.chunks = chunks; 2095 2096 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 2097 current_addr); 2098 2099 register_to_network(rdma, ®); 2100 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 2101 &resp, ®_result_idx, NULL); 2102 if (ret < 0) { 2103 return ret; 2104 } 2105 2106 /* try to overlap this single registration with the one we sent. */ 2107 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2108 &sge.lkey, NULL, chunk, 2109 chunk_start, chunk_end)) { 2110 error_report("cannot get lkey"); 2111 return -EINVAL; 2112 } 2113 2114 reg_result = (RDMARegisterResult *) 2115 rdma->wr_data[reg_result_idx].control_curr; 2116 2117 network_to_result(reg_result); 2118 2119 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2120 reg_result->rkey, chunk); 2121 2122 block->remote_keys[chunk] = reg_result->rkey; 2123 block->remote_host_addr = reg_result->host_addr; 2124 } else { 2125 /* already registered before */ 2126 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2127 &sge.lkey, NULL, chunk, 2128 chunk_start, chunk_end)) { 2129 error_report("cannot get lkey!"); 2130 return -EINVAL; 2131 } 2132 } 2133 2134 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2135 } else { 2136 send_wr.wr.rdma.rkey = block->remote_rkey; 2137 2138 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2139 &sge.lkey, NULL, chunk, 2140 chunk_start, chunk_end)) { 2141 error_report("cannot get lkey!"); 2142 return -EINVAL; 2143 } 2144 } 2145 2146 /* 2147 * Encode the ram block index and chunk within this wrid. 2148 * We will use this information at the time of completion 2149 * to figure out which bitmap to check against and then which 2150 * chunk in the bitmap to look for. 2151 */ 2152 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2153 current_index, chunk); 2154 2155 send_wr.opcode = IBV_WR_RDMA_WRITE; 2156 send_wr.send_flags = IBV_SEND_SIGNALED; 2157 send_wr.sg_list = &sge; 2158 send_wr.num_sge = 1; 2159 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2160 (current_addr - block->offset); 2161 2162 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2163 sge.length); 2164 2165 /* 2166 * ibv_post_send() does not return negative error numbers, 2167 * per the specification they are positive - no idea why. 2168 */ 2169 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2170 2171 if (ret == ENOMEM) { 2172 trace_qemu_rdma_write_one_queue_full(); 2173 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2174 if (ret < 0) { 2175 error_report("rdma migration: failed to make " 2176 "room in full send queue! %d", ret); 2177 return ret; 2178 } 2179 2180 goto retry; 2181 2182 } else if (ret > 0) { 2183 perror("rdma migration: post rdma write failed"); 2184 return -ret; 2185 } 2186 2187 set_bit(chunk, block->transit_bitmap); 2188 acct_update_position(f, sge.length, false); 2189 rdma->total_writes++; 2190 2191 return 0; 2192} 2193 2194/* 2195 * Push out any unwritten RDMA operations. 2196 * 2197 * We support sending out multiple chunks at the same time. 2198 * Not all of them need to get signaled in the completion queue. 2199 */ 2200static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2201{ 2202 int ret; 2203 2204 if (!rdma->current_length) { 2205 return 0; 2206 } 2207 2208 ret = qemu_rdma_write_one(f, rdma, 2209 rdma->current_index, rdma->current_addr, rdma->current_length); 2210 2211 if (ret < 0) { 2212 return ret; 2213 } 2214 2215 if (ret == 0) { 2216 rdma->nb_sent++; 2217 trace_qemu_rdma_write_flush(rdma->nb_sent); 2218 } 2219 2220 rdma->current_length = 0; 2221 rdma->current_addr = 0; 2222 2223 return 0; 2224} 2225 2226static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2227 uint64_t offset, uint64_t len) 2228{ 2229 RDMALocalBlock *block; 2230 uint8_t *host_addr; 2231 uint8_t *chunk_end; 2232 2233 if (rdma->current_index < 0) { 2234 return 0; 2235 } 2236 2237 if (rdma->current_chunk < 0) { 2238 return 0; 2239 } 2240 2241 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2242 host_addr = block->local_host_addr + (offset - block->offset); 2243 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2244 2245 if (rdma->current_length == 0) { 2246 return 0; 2247 } 2248 2249 /* 2250 * Only merge into chunk sequentially. 2251 */ 2252 if (offset != (rdma->current_addr + rdma->current_length)) { 2253 return 0; 2254 } 2255 2256 if (offset < block->offset) { 2257 return 0; 2258 } 2259 2260 if ((offset + len) > (block->offset + block->length)) { 2261 return 0; 2262 } 2263 2264 if ((host_addr + len) > chunk_end) { 2265 return 0; 2266 } 2267 2268 return 1; 2269} 2270 2271/* 2272 * We're not actually writing here, but doing three things: 2273 * 2274 * 1. Identify the chunk the buffer belongs to. 2275 * 2. If the chunk is full or the buffer doesn't belong to the current 2276 * chunk, then start a new chunk and flush() the old chunk. 2277 * 3. To keep the hardware busy, we also group chunks into batches 2278 * and only require that a batch gets acknowledged in the completion 2279 * queue instead of each individual chunk. 2280 */ 2281static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2282 uint64_t block_offset, uint64_t offset, 2283 uint64_t len) 2284{ 2285 uint64_t current_addr = block_offset + offset; 2286 uint64_t index = rdma->current_index; 2287 uint64_t chunk = rdma->current_chunk; 2288 int ret; 2289 2290 /* If we cannot merge it, we flush the current buffer first. */ 2291 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2292 ret = qemu_rdma_write_flush(f, rdma); 2293 if (ret) { 2294 return ret; 2295 } 2296 rdma->current_length = 0; 2297 rdma->current_addr = current_addr; 2298 2299 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2300 offset, len, &index, &chunk); 2301 if (ret) { 2302 error_report("ram block search failed"); 2303 return ret; 2304 } 2305 rdma->current_index = index; 2306 rdma->current_chunk = chunk; 2307 } 2308 2309 /* merge it */ 2310 rdma->current_length += len; 2311 2312 /* flush it if buffer is too large */ 2313 if (rdma->current_length >= RDMA_MERGE_MAX) { 2314 return qemu_rdma_write_flush(f, rdma); 2315 } 2316 2317 return 0; 2318} 2319 2320static void qemu_rdma_cleanup(RDMAContext *rdma) 2321{ 2322 int idx; 2323 2324 if (rdma->cm_id && rdma->connected) { 2325 if ((rdma->error_state || 2326 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2327 !rdma->received_error) { 2328 RDMAControlHeader head = { .len = 0, 2329 .type = RDMA_CONTROL_ERROR, 2330 .repeat = 1, 2331 }; 2332 error_report("Early error. Sending error."); 2333 qemu_rdma_post_send_control(rdma, NULL, &head); 2334 } 2335 2336 rdma_disconnect(rdma->cm_id); 2337 trace_qemu_rdma_cleanup_disconnect(); 2338 rdma->connected = false; 2339 } 2340 2341 if (rdma->channel) { 2342 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2343 } 2344 g_free(rdma->dest_blocks); 2345 rdma->dest_blocks = NULL; 2346 2347 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2348 if (rdma->wr_data[idx].control_mr) { 2349 rdma->total_registrations--; 2350 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2351 } 2352 rdma->wr_data[idx].control_mr = NULL; 2353 } 2354 2355 if (rdma->local_ram_blocks.block) { 2356 while (rdma->local_ram_blocks.nb_blocks) { 2357 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2358 } 2359 } 2360 2361 if (rdma->qp) { 2362 rdma_destroy_qp(rdma->cm_id); 2363 rdma->qp = NULL; 2364 } 2365 if (rdma->cq) { 2366 ibv_destroy_cq(rdma->cq); 2367 rdma->cq = NULL; 2368 } 2369 if (rdma->comp_channel) { 2370 ibv_destroy_comp_channel(rdma->comp_channel); 2371 rdma->comp_channel = NULL; 2372 } 2373 if (rdma->pd) { 2374 ibv_dealloc_pd(rdma->pd); 2375 rdma->pd = NULL; 2376 } 2377 if (rdma->cm_id) { 2378 rdma_destroy_id(rdma->cm_id); 2379 rdma->cm_id = NULL; 2380 } 2381 2382 /* the destination side, listen_id and channel is shared */ 2383 if (rdma->listen_id) { 2384 if (!rdma->is_return_path) { 2385 rdma_destroy_id(rdma->listen_id); 2386 } 2387 rdma->listen_id = NULL; 2388 2389 if (rdma->channel) { 2390 if (!rdma->is_return_path) { 2391 rdma_destroy_event_channel(rdma->channel); 2392 } 2393 rdma->channel = NULL; 2394 } 2395 } 2396 2397 if (rdma->channel) { 2398 rdma_destroy_event_channel(rdma->channel); 2399 rdma->channel = NULL; 2400 } 2401 g_free(rdma->host); 2402 g_free(rdma->host_port); 2403 rdma->host = NULL; 2404 rdma->host_port = NULL; 2405} 2406 2407 2408static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2409{ 2410 int ret, idx; 2411 Error *local_err = NULL, **temp = &local_err; 2412 2413 /* 2414 * Will be validated against destination's actual capabilities 2415 * after the connect() completes. 2416 */ 2417 rdma->pin_all = pin_all; 2418 2419 ret = qemu_rdma_resolve_host(rdma, temp); 2420 if (ret) { 2421 goto err_rdma_source_init; 2422 } 2423 2424 ret = qemu_rdma_alloc_pd_cq(rdma); 2425 if (ret) { 2426 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2427 " limits may be too low. Please check $ ulimit -a # and " 2428 "search for 'ulimit -l' in the output"); 2429 goto err_rdma_source_init; 2430 } 2431 2432 ret = qemu_rdma_alloc_qp(rdma); 2433 if (ret) { 2434 ERROR(temp, "rdma migration: error allocating qp!"); 2435 goto err_rdma_source_init; 2436 } 2437 2438 ret = qemu_rdma_init_ram_blocks(rdma); 2439 if (ret) { 2440 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2441 goto err_rdma_source_init; 2442 } 2443 2444 /* Build the hash that maps from offset to RAMBlock */ 2445 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2446 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2447 g_hash_table_insert(rdma->blockmap, 2448 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2449 &rdma->local_ram_blocks.block[idx]); 2450 } 2451 2452 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2453 ret = qemu_rdma_reg_control(rdma, idx); 2454 if (ret) { 2455 ERROR(temp, "rdma migration: error registering %d control!", 2456 idx); 2457 goto err_rdma_source_init; 2458 } 2459 } 2460 2461 return 0; 2462 2463err_rdma_source_init: 2464 error_propagate(errp, local_err); 2465 qemu_rdma_cleanup(rdma); 2466 return -1; 2467} 2468 2469static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2470 struct rdma_cm_event **cm_event, 2471 long msec, Error **errp) 2472{ 2473 int ret; 2474 struct pollfd poll_fd = { 2475 .fd = rdma->channel->fd, 2476 .events = POLLIN, 2477 .revents = 0 2478 }; 2479 2480 do { 2481 ret = poll(&poll_fd, 1, msec); 2482 } while (ret < 0 && errno == EINTR); 2483 2484 if (ret == 0) { 2485 ERROR(errp, "poll cm event timeout"); 2486 return -1; 2487 } else if (ret < 0) { 2488 ERROR(errp, "failed to poll cm event, errno=%i", errno); 2489 return -1; 2490 } else if (poll_fd.revents & POLLIN) { 2491 return rdma_get_cm_event(rdma->channel, cm_event); 2492 } else { 2493 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents); 2494 return -1; 2495 } 2496} 2497 2498static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path) 2499{ 2500 RDMACapabilities cap = { 2501 .version = RDMA_CONTROL_VERSION_CURRENT, 2502 .flags = 0, 2503 }; 2504 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2505 .retry_count = 5, 2506 .private_data = &cap, 2507 .private_data_len = sizeof(cap), 2508 }; 2509 struct rdma_cm_event *cm_event; 2510 int ret; 2511 2512 /* 2513 * Only negotiate the capability with destination if the user 2514 * on the source first requested the capability. 2515 */ 2516 if (rdma->pin_all) { 2517 trace_qemu_rdma_connect_pin_all_requested(); 2518 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2519 } 2520 2521 caps_to_network(&cap); 2522 2523 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2524 if (ret) { 2525 ERROR(errp, "posting second control recv"); 2526 goto err_rdma_source_connect; 2527 } 2528 2529 ret = rdma_connect(rdma->cm_id, &conn_param); 2530 if (ret) { 2531 perror("rdma_connect"); 2532 ERROR(errp, "connecting to destination!"); 2533 goto err_rdma_source_connect; 2534 } 2535 2536 if (return_path) { 2537 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2538 } else { 2539 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2540 } 2541 if (ret) { 2542 perror("rdma_get_cm_event after rdma_connect"); 2543 ERROR(errp, "connecting to destination!"); 2544 goto err_rdma_source_connect; 2545 } 2546 2547 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2548 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2549 ERROR(errp, "connecting to destination!"); 2550 rdma_ack_cm_event(cm_event); 2551 goto err_rdma_source_connect; 2552 } 2553 rdma->connected = true; 2554 2555 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2556 network_to_caps(&cap); 2557 2558 /* 2559 * Verify that the *requested* capabilities are supported by the destination 2560 * and disable them otherwise. 2561 */ 2562 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2563 ERROR(errp, "Server cannot support pinning all memory. " 2564 "Will register memory dynamically."); 2565 rdma->pin_all = false; 2566 } 2567 2568 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2569 2570 rdma_ack_cm_event(cm_event); 2571 2572 rdma->control_ready_expected = 1; 2573 rdma->nb_sent = 0; 2574 return 0; 2575 2576err_rdma_source_connect: 2577 qemu_rdma_cleanup(rdma); 2578 return -1; 2579} 2580 2581static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2582{ 2583 int ret, idx; 2584 struct rdma_cm_id *listen_id; 2585 char ip[40] = "unknown"; 2586 struct rdma_addrinfo *res, *e; 2587 char port_str[16]; 2588 2589 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2590 rdma->wr_data[idx].control_len = 0; 2591 rdma->wr_data[idx].control_curr = NULL; 2592 } 2593 2594 if (!rdma->host || !rdma->host[0]) { 2595 ERROR(errp, "RDMA host is not set!"); 2596 rdma->error_state = -EINVAL; 2597 return -1; 2598 } 2599 /* create CM channel */ 2600 rdma->channel = rdma_create_event_channel(); 2601 if (!rdma->channel) { 2602 ERROR(errp, "could not create rdma event channel"); 2603 rdma->error_state = -EINVAL; 2604 return -1; 2605 } 2606 2607 /* create CM id */ 2608 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2609 if (ret) { 2610 ERROR(errp, "could not create cm_id!"); 2611 goto err_dest_init_create_listen_id; 2612 } 2613 2614 snprintf(port_str, 16, "%d", rdma->port); 2615 port_str[15] = '\0'; 2616 2617 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2618 if (ret < 0) { 2619 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2620 goto err_dest_init_bind_addr; 2621 } 2622 2623 for (e = res; e != NULL; e = e->ai_next) { 2624 inet_ntop(e->ai_family, 2625 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2626 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2627 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2628 if (ret) { 2629 continue; 2630 } 2631 if (e->ai_family == AF_INET6) { 2632 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2633 if (ret) { 2634 continue; 2635 } 2636 } 2637 break; 2638 } 2639 2640 rdma_freeaddrinfo(res); 2641 if (!e) { 2642 ERROR(errp, "Error: could not rdma_bind_addr!"); 2643 goto err_dest_init_bind_addr; 2644 } 2645 2646 rdma->listen_id = listen_id; 2647 qemu_rdma_dump_gid("dest_init", listen_id); 2648 return 0; 2649 2650err_dest_init_bind_addr: 2651 rdma_destroy_id(listen_id); 2652err_dest_init_create_listen_id: 2653 rdma_destroy_event_channel(rdma->channel); 2654 rdma->channel = NULL; 2655 rdma->error_state = ret; 2656 return ret; 2657 2658} 2659 2660static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2661 RDMAContext *rdma) 2662{ 2663 int idx; 2664 2665 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2666 rdma_return_path->wr_data[idx].control_len = 0; 2667 rdma_return_path->wr_data[idx].control_curr = NULL; 2668 } 2669 2670 /*the CM channel and CM id is shared*/ 2671 rdma_return_path->channel = rdma->channel; 2672 rdma_return_path->listen_id = rdma->listen_id; 2673 2674 rdma->return_path = rdma_return_path; 2675 rdma_return_path->return_path = rdma; 2676 rdma_return_path->is_return_path = true; 2677} 2678 2679static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2680{ 2681 RDMAContext *rdma = NULL; 2682 InetSocketAddress *addr; 2683 2684 if (host_port) { 2685 rdma = g_new0(RDMAContext, 1); 2686 rdma->current_index = -1; 2687 rdma->current_chunk = -1; 2688 2689 addr = g_new(InetSocketAddress, 1); 2690 if (!inet_parse(addr, host_port, NULL)) { 2691 rdma->port = atoi(addr->port); 2692 rdma->host = g_strdup(addr->host); 2693 rdma->host_port = g_strdup(host_port); 2694 } else { 2695 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2696 g_free(rdma); 2697 rdma = NULL; 2698 } 2699 2700 qapi_free_InetSocketAddress(addr); 2701 } 2702 2703 return rdma; 2704} 2705 2706/* 2707 * QEMUFile interface to the control channel. 2708 * SEND messages for control only. 2709 * VM's ram is handled with regular RDMA messages. 2710 */ 2711static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2712 const struct iovec *iov, 2713 size_t niov, 2714 int *fds, 2715 size_t nfds, 2716 Error **errp) 2717{ 2718 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2719 QEMUFile *f = rioc->file; 2720 RDMAContext *rdma; 2721 int ret; 2722 ssize_t done = 0; 2723 size_t i; 2724 size_t len = 0; 2725 2726 RCU_READ_LOCK_GUARD(); 2727 rdma = qatomic_rcu_read(&rioc->rdmaout); 2728 2729 if (!rdma) { 2730 return -EIO; 2731 } 2732 2733 CHECK_ERROR_STATE(); 2734 2735 /* 2736 * Push out any writes that 2737 * we're queued up for VM's ram. 2738 */ 2739 ret = qemu_rdma_write_flush(f, rdma); 2740 if (ret < 0) { 2741 rdma->error_state = ret; 2742 return ret; 2743 } 2744 2745 for (i = 0; i < niov; i++) { 2746 size_t remaining = iov[i].iov_len; 2747 uint8_t * data = (void *)iov[i].iov_base; 2748 while (remaining) { 2749 RDMAControlHeader head; 2750 2751 len = MIN(remaining, RDMA_SEND_INCREMENT); 2752 remaining -= len; 2753 2754 head.len = len; 2755 head.type = RDMA_CONTROL_QEMU_FILE; 2756 2757 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2758 2759 if (ret < 0) { 2760 rdma->error_state = ret; 2761 return ret; 2762 } 2763 2764 data += len; 2765 done += len; 2766 } 2767 } 2768 2769 return done; 2770} 2771 2772static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2773 size_t size, int idx) 2774{ 2775 size_t len = 0; 2776 2777 if (rdma->wr_data[idx].control_len) { 2778 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2779 2780 len = MIN(size, rdma->wr_data[idx].control_len); 2781 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2782 rdma->wr_data[idx].control_curr += len; 2783 rdma->wr_data[idx].control_len -= len; 2784 } 2785 2786 return len; 2787} 2788 2789/* 2790 * QEMUFile interface to the control channel. 2791 * RDMA links don't use bytestreams, so we have to 2792 * return bytes to QEMUFile opportunistically. 2793 */ 2794static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2795 const struct iovec *iov, 2796 size_t niov, 2797 int **fds, 2798 size_t *nfds, 2799 Error **errp) 2800{ 2801 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2802 RDMAContext *rdma; 2803 RDMAControlHeader head; 2804 int ret = 0; 2805 ssize_t i; 2806 size_t done = 0; 2807 2808 RCU_READ_LOCK_GUARD(); 2809 rdma = qatomic_rcu_read(&rioc->rdmain); 2810 2811 if (!rdma) { 2812 return -EIO; 2813 } 2814 2815 CHECK_ERROR_STATE(); 2816 2817 for (i = 0; i < niov; i++) { 2818 size_t want = iov[i].iov_len; 2819 uint8_t *data = (void *)iov[i].iov_base; 2820 2821 /* 2822 * First, we hold on to the last SEND message we 2823 * were given and dish out the bytes until we run 2824 * out of bytes. 2825 */ 2826 ret = qemu_rdma_fill(rdma, data, want, 0); 2827 done += ret; 2828 want -= ret; 2829 /* Got what we needed, so go to next iovec */ 2830 if (want == 0) { 2831 continue; 2832 } 2833 2834 /* If we got any data so far, then don't wait 2835 * for more, just return what we have */ 2836 if (done > 0) { 2837 break; 2838 } 2839 2840 2841 /* We've got nothing at all, so lets wait for 2842 * more to arrive 2843 */ 2844 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2845 2846 if (ret < 0) { 2847 rdma->error_state = ret; 2848 return ret; 2849 } 2850 2851 /* 2852 * SEND was received with new bytes, now try again. 2853 */ 2854 ret = qemu_rdma_fill(rdma, data, want, 0); 2855 done += ret; 2856 want -= ret; 2857 2858 /* Still didn't get enough, so lets just return */ 2859 if (want) { 2860 if (done == 0) { 2861 return QIO_CHANNEL_ERR_BLOCK; 2862 } else { 2863 break; 2864 } 2865 } 2866 } 2867 return done; 2868} 2869 2870/* 2871 * Block until all the outstanding chunks have been delivered by the hardware. 2872 */ 2873static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2874{ 2875 int ret; 2876 2877 if (qemu_rdma_write_flush(f, rdma) < 0) { 2878 return -EIO; 2879 } 2880 2881 while (rdma->nb_sent) { 2882 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2883 if (ret < 0) { 2884 error_report("rdma migration: complete polling error!"); 2885 return -EIO; 2886 } 2887 } 2888 2889 qemu_rdma_unregister_waiting(rdma); 2890 2891 return 0; 2892} 2893 2894 2895static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2896 bool blocking, 2897 Error **errp) 2898{ 2899 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2900 /* XXX we should make readv/writev actually honour this :-) */ 2901 rioc->blocking = blocking; 2902 return 0; 2903} 2904 2905 2906typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2907struct QIOChannelRDMASource { 2908 GSource parent; 2909 QIOChannelRDMA *rioc; 2910 GIOCondition condition; 2911}; 2912 2913static gboolean 2914qio_channel_rdma_source_prepare(GSource *source, 2915 gint *timeout) 2916{ 2917 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2918 RDMAContext *rdma; 2919 GIOCondition cond = 0; 2920 *timeout = -1; 2921 2922 RCU_READ_LOCK_GUARD(); 2923 if (rsource->condition == G_IO_IN) { 2924 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2925 } else { 2926 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2927 } 2928 2929 if (!rdma) { 2930 error_report("RDMAContext is NULL when prepare Gsource"); 2931 return FALSE; 2932 } 2933 2934 if (rdma->wr_data[0].control_len) { 2935 cond |= G_IO_IN; 2936 } 2937 cond |= G_IO_OUT; 2938 2939 return cond & rsource->condition; 2940} 2941 2942static gboolean 2943qio_channel_rdma_source_check(GSource *source) 2944{ 2945 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2946 RDMAContext *rdma; 2947 GIOCondition cond = 0; 2948 2949 RCU_READ_LOCK_GUARD(); 2950 if (rsource->condition == G_IO_IN) { 2951 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2952 } else { 2953 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2954 } 2955 2956 if (!rdma) { 2957 error_report("RDMAContext is NULL when check Gsource"); 2958 return FALSE; 2959 } 2960 2961 if (rdma->wr_data[0].control_len) { 2962 cond |= G_IO_IN; 2963 } 2964 cond |= G_IO_OUT; 2965 2966 return cond & rsource->condition; 2967} 2968 2969static gboolean 2970qio_channel_rdma_source_dispatch(GSource *source, 2971 GSourceFunc callback, 2972 gpointer user_data) 2973{ 2974 QIOChannelFunc func = (QIOChannelFunc)callback; 2975 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2976 RDMAContext *rdma; 2977 GIOCondition cond = 0; 2978 2979 RCU_READ_LOCK_GUARD(); 2980 if (rsource->condition == G_IO_IN) { 2981 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2982 } else { 2983 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2984 } 2985 2986 if (!rdma) { 2987 error_report("RDMAContext is NULL when dispatch Gsource"); 2988 return FALSE; 2989 } 2990 2991 if (rdma->wr_data[0].control_len) { 2992 cond |= G_IO_IN; 2993 } 2994 cond |= G_IO_OUT; 2995 2996 return (*func)(QIO_CHANNEL(rsource->rioc), 2997 (cond & rsource->condition), 2998 user_data); 2999} 3000 3001static void 3002qio_channel_rdma_source_finalize(GSource *source) 3003{ 3004 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3005 3006 object_unref(OBJECT(ssource->rioc)); 3007} 3008 3009GSourceFuncs qio_channel_rdma_source_funcs = { 3010 qio_channel_rdma_source_prepare, 3011 qio_channel_rdma_source_check, 3012 qio_channel_rdma_source_dispatch, 3013 qio_channel_rdma_source_finalize 3014}; 3015 3016static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 3017 GIOCondition condition) 3018{ 3019 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3020 QIOChannelRDMASource *ssource; 3021 GSource *source; 3022 3023 source = g_source_new(&qio_channel_rdma_source_funcs, 3024 sizeof(QIOChannelRDMASource)); 3025 ssource = (QIOChannelRDMASource *)source; 3026 3027 ssource->rioc = rioc; 3028 object_ref(OBJECT(rioc)); 3029 3030 ssource->condition = condition; 3031 3032 return source; 3033} 3034 3035static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 3036 AioContext *ctx, 3037 IOHandler *io_read, 3038 IOHandler *io_write, 3039 void *opaque) 3040{ 3041 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3042 if (io_read) { 3043 aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd, 3044 false, io_read, io_write, NULL, opaque); 3045 } else { 3046 aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd, 3047 false, io_read, io_write, NULL, opaque); 3048 } 3049} 3050 3051struct rdma_close_rcu { 3052 struct rcu_head rcu; 3053 RDMAContext *rdmain; 3054 RDMAContext *rdmaout; 3055}; 3056 3057/* callback from qio_channel_rdma_close via call_rcu */ 3058static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 3059{ 3060 if (rcu->rdmain) { 3061 qemu_rdma_cleanup(rcu->rdmain); 3062 } 3063 3064 if (rcu->rdmaout) { 3065 qemu_rdma_cleanup(rcu->rdmaout); 3066 } 3067 3068 g_free(rcu->rdmain); 3069 g_free(rcu->rdmaout); 3070 g_free(rcu); 3071} 3072 3073static int qio_channel_rdma_close(QIOChannel *ioc, 3074 Error **errp) 3075{ 3076 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3077 RDMAContext *rdmain, *rdmaout; 3078 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 3079 3080 trace_qemu_rdma_close(); 3081 3082 rdmain = rioc->rdmain; 3083 if (rdmain) { 3084 qatomic_rcu_set(&rioc->rdmain, NULL); 3085 } 3086 3087 rdmaout = rioc->rdmaout; 3088 if (rdmaout) { 3089 qatomic_rcu_set(&rioc->rdmaout, NULL); 3090 } 3091 3092 rcu->rdmain = rdmain; 3093 rcu->rdmaout = rdmaout; 3094 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 3095 3096 return 0; 3097} 3098 3099static int 3100qio_channel_rdma_shutdown(QIOChannel *ioc, 3101 QIOChannelShutdown how, 3102 Error **errp) 3103{ 3104 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3105 RDMAContext *rdmain, *rdmaout; 3106 3107 RCU_READ_LOCK_GUARD(); 3108 3109 rdmain = qatomic_rcu_read(&rioc->rdmain); 3110 rdmaout = qatomic_rcu_read(&rioc->rdmain); 3111 3112 switch (how) { 3113 case QIO_CHANNEL_SHUTDOWN_READ: 3114 if (rdmain) { 3115 rdmain->error_state = -1; 3116 } 3117 break; 3118 case QIO_CHANNEL_SHUTDOWN_WRITE: 3119 if (rdmaout) { 3120 rdmaout->error_state = -1; 3121 } 3122 break; 3123 case QIO_CHANNEL_SHUTDOWN_BOTH: 3124 default: 3125 if (rdmain) { 3126 rdmain->error_state = -1; 3127 } 3128 if (rdmaout) { 3129 rdmaout->error_state = -1; 3130 } 3131 break; 3132 } 3133 3134 return 0; 3135} 3136 3137/* 3138 * Parameters: 3139 * @offset == 0 : 3140 * This means that 'block_offset' is a full virtual address that does not 3141 * belong to a RAMBlock of the virtual machine and instead 3142 * represents a private malloc'd memory area that the caller wishes to 3143 * transfer. 3144 * 3145 * @offset != 0 : 3146 * Offset is an offset to be added to block_offset and used 3147 * to also lookup the corresponding RAMBlock. 3148 * 3149 * @size > 0 : 3150 * Initiate an transfer this size. 3151 * 3152 * @size == 0 : 3153 * A 'hint' or 'advice' that means that we wish to speculatively 3154 * and asynchronously unregister this memory. In this case, there is no 3155 * guarantee that the unregister will actually happen, for example, 3156 * if the memory is being actively transmitted. Additionally, the memory 3157 * may be re-registered at any future time if a write within the same 3158 * chunk was requested again, even if you attempted to unregister it 3159 * here. 3160 * 3161 * @size < 0 : TODO, not yet supported 3162 * Unregister the memory NOW. This means that the caller does not 3163 * expect there to be any future RDMA transfers and we just want to clean 3164 * things up. This is used in case the upper layer owns the memory and 3165 * cannot wait for qemu_fclose() to occur. 3166 * 3167 * @bytes_sent : User-specificed pointer to indicate how many bytes were 3168 * sent. Usually, this will not be more than a few bytes of 3169 * the protocol because most transfers are sent asynchronously. 3170 */ 3171static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 3172 ram_addr_t block_offset, ram_addr_t offset, 3173 size_t size, uint64_t *bytes_sent) 3174{ 3175 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3176 RDMAContext *rdma; 3177 int ret; 3178 3179 RCU_READ_LOCK_GUARD(); 3180 rdma = qatomic_rcu_read(&rioc->rdmaout); 3181 3182 if (!rdma) { 3183 return -EIO; 3184 } 3185 3186 CHECK_ERROR_STATE(); 3187 3188 if (migration_in_postcopy()) { 3189 return RAM_SAVE_CONTROL_NOT_SUPP; 3190 } 3191 3192 qemu_fflush(f); 3193 3194 if (size > 0) { 3195 /* 3196 * Add this page to the current 'chunk'. If the chunk 3197 * is full, or the page doesn't belong to the current chunk, 3198 * an actual RDMA write will occur and a new chunk will be formed. 3199 */ 3200 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 3201 if (ret < 0) { 3202 error_report("rdma migration: write error! %d", ret); 3203 goto err; 3204 } 3205 3206 /* 3207 * We always return 1 bytes because the RDMA 3208 * protocol is completely asynchronous. We do not yet know 3209 * whether an identified chunk is zero or not because we're 3210 * waiting for other pages to potentially be merged with 3211 * the current chunk. So, we have to call qemu_update_position() 3212 * later on when the actual write occurs. 3213 */ 3214 if (bytes_sent) { 3215 *bytes_sent = 1; 3216 } 3217 } else { 3218 uint64_t index, chunk; 3219 3220 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 3221 if (size < 0) { 3222 ret = qemu_rdma_drain_cq(f, rdma); 3223 if (ret < 0) { 3224 fprintf(stderr, "rdma: failed to synchronously drain" 3225 " completion queue before unregistration.\n"); 3226 goto err; 3227 } 3228 } 3229 */ 3230 3231 ret = qemu_rdma_search_ram_block(rdma, block_offset, 3232 offset, size, &index, &chunk); 3233 3234 if (ret) { 3235 error_report("ram block search failed"); 3236 goto err; 3237 } 3238 3239 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 3240 3241 /* 3242 * TODO: Synchronous, guaranteed unregistration (should not occur during 3243 * fast-path). Otherwise, unregisters will process on the next call to 3244 * qemu_rdma_drain_cq() 3245 if (size < 0) { 3246 qemu_rdma_unregister_waiting(rdma); 3247 } 3248 */ 3249 } 3250 3251 /* 3252 * Drain the Completion Queue if possible, but do not block, 3253 * just poll. 3254 * 3255 * If nothing to poll, the end of the iteration will do this 3256 * again to make sure we don't overflow the request queue. 3257 */ 3258 while (1) { 3259 uint64_t wr_id, wr_id_in; 3260 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 3261 if (ret < 0) { 3262 error_report("rdma migration: polling error! %d", ret); 3263 goto err; 3264 } 3265 3266 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3267 3268 if (wr_id == RDMA_WRID_NONE) { 3269 break; 3270 } 3271 } 3272 3273 return RAM_SAVE_CONTROL_DELAYED; 3274err: 3275 rdma->error_state = ret; 3276 return ret; 3277} 3278 3279static void rdma_accept_incoming_migration(void *opaque); 3280 3281static void rdma_cm_poll_handler(void *opaque) 3282{ 3283 RDMAContext *rdma = opaque; 3284 int ret; 3285 struct rdma_cm_event *cm_event; 3286 MigrationIncomingState *mis = migration_incoming_get_current(); 3287 3288 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3289 if (ret) { 3290 error_report("get_cm_event failed %d", errno); 3291 return; 3292 } 3293 3294 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3295 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3296 if (!rdma->error_state && 3297 migration_incoming_get_current()->state != 3298 MIGRATION_STATUS_COMPLETED) { 3299 error_report("receive cm event, cm event is %d", cm_event->event); 3300 rdma->error_state = -EPIPE; 3301 if (rdma->return_path) { 3302 rdma->return_path->error_state = -EPIPE; 3303 } 3304 } 3305 rdma_ack_cm_event(cm_event); 3306 3307 if (mis->migration_incoming_co) { 3308 qemu_coroutine_enter(mis->migration_incoming_co); 3309 } 3310 return; 3311 } 3312 rdma_ack_cm_event(cm_event); 3313} 3314 3315static int qemu_rdma_accept(RDMAContext *rdma) 3316{ 3317 RDMACapabilities cap; 3318 struct rdma_conn_param conn_param = { 3319 .responder_resources = 2, 3320 .private_data = &cap, 3321 .private_data_len = sizeof(cap), 3322 }; 3323 RDMAContext *rdma_return_path = NULL; 3324 struct rdma_cm_event *cm_event; 3325 struct ibv_context *verbs; 3326 int ret = -EINVAL; 3327 int idx; 3328 3329 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3330 if (ret) { 3331 goto err_rdma_dest_wait; 3332 } 3333 3334 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3335 rdma_ack_cm_event(cm_event); 3336 goto err_rdma_dest_wait; 3337 } 3338 3339 /* 3340 * initialize the RDMAContext for return path for postcopy after first 3341 * connection request reached. 3342 */ 3343 if (migrate_postcopy() && !rdma->is_return_path) { 3344 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL); 3345 if (rdma_return_path == NULL) { 3346 rdma_ack_cm_event(cm_event); 3347 goto err_rdma_dest_wait; 3348 } 3349 3350 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3351 } 3352 3353 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3354 3355 network_to_caps(&cap); 3356 3357 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3358 error_report("Unknown source RDMA version: %d, bailing...", 3359 cap.version); 3360 rdma_ack_cm_event(cm_event); 3361 goto err_rdma_dest_wait; 3362 } 3363 3364 /* 3365 * Respond with only the capabilities this version of QEMU knows about. 3366 */ 3367 cap.flags &= known_capabilities; 3368 3369 /* 3370 * Enable the ones that we do know about. 3371 * Add other checks here as new ones are introduced. 3372 */ 3373 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3374 rdma->pin_all = true; 3375 } 3376 3377 rdma->cm_id = cm_event->id; 3378 verbs = cm_event->id->verbs; 3379 3380 rdma_ack_cm_event(cm_event); 3381 3382 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3383 3384 caps_to_network(&cap); 3385 3386 trace_qemu_rdma_accept_pin_verbsc(verbs); 3387 3388 if (!rdma->verbs) { 3389 rdma->verbs = verbs; 3390 } else if (rdma->verbs != verbs) { 3391 error_report("ibv context not matching %p, %p!", rdma->verbs, 3392 verbs); 3393 goto err_rdma_dest_wait; 3394 } 3395 3396 qemu_rdma_dump_id("dest_init", verbs); 3397 3398 ret = qemu_rdma_alloc_pd_cq(rdma); 3399 if (ret) { 3400 error_report("rdma migration: error allocating pd and cq!"); 3401 goto err_rdma_dest_wait; 3402 } 3403 3404 ret = qemu_rdma_alloc_qp(rdma); 3405 if (ret) { 3406 error_report("rdma migration: error allocating qp!"); 3407 goto err_rdma_dest_wait; 3408 } 3409 3410 ret = qemu_rdma_init_ram_blocks(rdma); 3411 if (ret) { 3412 error_report("rdma migration: error initializing ram blocks!"); 3413 goto err_rdma_dest_wait; 3414 } 3415 3416 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3417 ret = qemu_rdma_reg_control(rdma, idx); 3418 if (ret) { 3419 error_report("rdma: error registering %d control", idx); 3420 goto err_rdma_dest_wait; 3421 } 3422 } 3423 3424 /* Accept the second connection request for return path */ 3425 if (migrate_postcopy() && !rdma->is_return_path) { 3426 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3427 NULL, 3428 (void *)(intptr_t)rdma->return_path); 3429 } else { 3430 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3431 NULL, rdma); 3432 } 3433 3434 ret = rdma_accept(rdma->cm_id, &conn_param); 3435 if (ret) { 3436 error_report("rdma_accept returns %d", ret); 3437 goto err_rdma_dest_wait; 3438 } 3439 3440 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3441 if (ret) { 3442 error_report("rdma_accept get_cm_event failed %d", ret); 3443 goto err_rdma_dest_wait; 3444 } 3445 3446 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3447 error_report("rdma_accept not event established"); 3448 rdma_ack_cm_event(cm_event); 3449 goto err_rdma_dest_wait; 3450 } 3451 3452 rdma_ack_cm_event(cm_event); 3453 rdma->connected = true; 3454 3455 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3456 if (ret) { 3457 error_report("rdma migration: error posting second control recv"); 3458 goto err_rdma_dest_wait; 3459 } 3460 3461 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3462 3463 return 0; 3464 3465err_rdma_dest_wait: 3466 rdma->error_state = ret; 3467 qemu_rdma_cleanup(rdma); 3468 g_free(rdma_return_path); 3469 return ret; 3470} 3471 3472static int dest_ram_sort_func(const void *a, const void *b) 3473{ 3474 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3475 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3476 3477 return (a_index < b_index) ? -1 : (a_index != b_index); 3478} 3479 3480/* 3481 * During each iteration of the migration, we listen for instructions 3482 * by the source VM to perform dynamic page registrations before they 3483 * can perform RDMA operations. 3484 * 3485 * We respond with the 'rkey'. 3486 * 3487 * Keep doing this until the source tells us to stop. 3488 */ 3489static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3490{ 3491 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3492 .type = RDMA_CONTROL_REGISTER_RESULT, 3493 .repeat = 0, 3494 }; 3495 RDMAControlHeader unreg_resp = { .len = 0, 3496 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3497 .repeat = 0, 3498 }; 3499 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3500 .repeat = 1 }; 3501 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3502 RDMAContext *rdma; 3503 RDMALocalBlocks *local; 3504 RDMAControlHeader head; 3505 RDMARegister *reg, *registers; 3506 RDMACompress *comp; 3507 RDMARegisterResult *reg_result; 3508 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3509 RDMALocalBlock *block; 3510 void *host_addr; 3511 int ret = 0; 3512 int idx = 0; 3513 int count = 0; 3514 int i = 0; 3515 3516 RCU_READ_LOCK_GUARD(); 3517 rdma = qatomic_rcu_read(&rioc->rdmain); 3518 3519 if (!rdma) { 3520 return -EIO; 3521 } 3522 3523 CHECK_ERROR_STATE(); 3524 3525 local = &rdma->local_ram_blocks; 3526 do { 3527 trace_qemu_rdma_registration_handle_wait(); 3528 3529 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3530 3531 if (ret < 0) { 3532 break; 3533 } 3534 3535 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3536 error_report("rdma: Too many requests in this message (%d)." 3537 "Bailing.", head.repeat); 3538 ret = -EIO; 3539 break; 3540 } 3541 3542 switch (head.type) { 3543 case RDMA_CONTROL_COMPRESS: 3544 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3545 network_to_compress(comp); 3546 3547 trace_qemu_rdma_registration_handle_compress(comp->length, 3548 comp->block_idx, 3549 comp->offset); 3550 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3551 error_report("rdma: 'compress' bad block index %u (vs %d)", 3552 (unsigned int)comp->block_idx, 3553 rdma->local_ram_blocks.nb_blocks); 3554 ret = -EIO; 3555 goto out; 3556 } 3557 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3558 3559 host_addr = block->local_host_addr + 3560 (comp->offset - block->offset); 3561 3562 ram_handle_compressed(host_addr, comp->value, comp->length); 3563 break; 3564 3565 case RDMA_CONTROL_REGISTER_FINISHED: 3566 trace_qemu_rdma_registration_handle_finished(); 3567 goto out; 3568 3569 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3570 trace_qemu_rdma_registration_handle_ram_blocks(); 3571 3572 /* Sort our local RAM Block list so it's the same as the source, 3573 * we can do this since we've filled in a src_index in the list 3574 * as we received the RAMBlock list earlier. 3575 */ 3576 qsort(rdma->local_ram_blocks.block, 3577 rdma->local_ram_blocks.nb_blocks, 3578 sizeof(RDMALocalBlock), dest_ram_sort_func); 3579 for (i = 0; i < local->nb_blocks; i++) { 3580 local->block[i].index = i; 3581 } 3582 3583 if (rdma->pin_all) { 3584 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3585 if (ret) { 3586 error_report("rdma migration: error dest " 3587 "registering ram blocks"); 3588 goto out; 3589 } 3590 } 3591 3592 /* 3593 * Dest uses this to prepare to transmit the RAMBlock descriptions 3594 * to the source VM after connection setup. 3595 * Both sides use the "remote" structure to communicate and update 3596 * their "local" descriptions with what was sent. 3597 */ 3598 for (i = 0; i < local->nb_blocks; i++) { 3599 rdma->dest_blocks[i].remote_host_addr = 3600 (uintptr_t)(local->block[i].local_host_addr); 3601 3602 if (rdma->pin_all) { 3603 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3604 } 3605 3606 rdma->dest_blocks[i].offset = local->block[i].offset; 3607 rdma->dest_blocks[i].length = local->block[i].length; 3608 3609 dest_block_to_network(&rdma->dest_blocks[i]); 3610 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3611 local->block[i].block_name, 3612 local->block[i].offset, 3613 local->block[i].length, 3614 local->block[i].local_host_addr, 3615 local->block[i].src_index); 3616 } 3617 3618 blocks.len = rdma->local_ram_blocks.nb_blocks 3619 * sizeof(RDMADestBlock); 3620 3621 3622 ret = qemu_rdma_post_send_control(rdma, 3623 (uint8_t *) rdma->dest_blocks, &blocks); 3624 3625 if (ret < 0) { 3626 error_report("rdma migration: error sending remote info"); 3627 goto out; 3628 } 3629 3630 break; 3631 case RDMA_CONTROL_REGISTER_REQUEST: 3632 trace_qemu_rdma_registration_handle_register(head.repeat); 3633 3634 reg_resp.repeat = head.repeat; 3635 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3636 3637 for (count = 0; count < head.repeat; count++) { 3638 uint64_t chunk; 3639 uint8_t *chunk_start, *chunk_end; 3640 3641 reg = ®isters[count]; 3642 network_to_register(reg); 3643 3644 reg_result = &results[count]; 3645 3646 trace_qemu_rdma_registration_handle_register_loop(count, 3647 reg->current_index, reg->key.current_addr, reg->chunks); 3648 3649 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3650 error_report("rdma: 'register' bad block index %u (vs %d)", 3651 (unsigned int)reg->current_index, 3652 rdma->local_ram_blocks.nb_blocks); 3653 ret = -ENOENT; 3654 goto out; 3655 } 3656 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3657 if (block->is_ram_block) { 3658 if (block->offset > reg->key.current_addr) { 3659 error_report("rdma: bad register address for block %s" 3660 " offset: %" PRIx64 " current_addr: %" PRIx64, 3661 block->block_name, block->offset, 3662 reg->key.current_addr); 3663 ret = -ERANGE; 3664 goto out; 3665 } 3666 host_addr = (block->local_host_addr + 3667 (reg->key.current_addr - block->offset)); 3668 chunk = ram_chunk_index(block->local_host_addr, 3669 (uint8_t *) host_addr); 3670 } else { 3671 chunk = reg->key.chunk; 3672 host_addr = block->local_host_addr + 3673 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3674 /* Check for particularly bad chunk value */ 3675 if (host_addr < (void *)block->local_host_addr) { 3676 error_report("rdma: bad chunk for block %s" 3677 " chunk: %" PRIx64, 3678 block->block_name, reg->key.chunk); 3679 ret = -ERANGE; 3680 goto out; 3681 } 3682 } 3683 chunk_start = ram_chunk_start(block, chunk); 3684 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3685 /* avoid "-Waddress-of-packed-member" warning */ 3686 uint32_t tmp_rkey = 0; 3687 if (qemu_rdma_register_and_get_keys(rdma, block, 3688 (uintptr_t)host_addr, NULL, &tmp_rkey, 3689 chunk, chunk_start, chunk_end)) { 3690 error_report("cannot get rkey"); 3691 ret = -EINVAL; 3692 goto out; 3693 } 3694 reg_result->rkey = tmp_rkey; 3695 3696 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3697 3698 trace_qemu_rdma_registration_handle_register_rkey( 3699 reg_result->rkey); 3700 3701 result_to_network(reg_result); 3702 } 3703 3704 ret = qemu_rdma_post_send_control(rdma, 3705 (uint8_t *) results, ®_resp); 3706 3707 if (ret < 0) { 3708 error_report("Failed to send control buffer"); 3709 goto out; 3710 } 3711 break; 3712 case RDMA_CONTROL_UNREGISTER_REQUEST: 3713 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3714 unreg_resp.repeat = head.repeat; 3715 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3716 3717 for (count = 0; count < head.repeat; count++) { 3718 reg = ®isters[count]; 3719 network_to_register(reg); 3720 3721 trace_qemu_rdma_registration_handle_unregister_loop(count, 3722 reg->current_index, reg->key.chunk); 3723 3724 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3725 3726 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3727 block->pmr[reg->key.chunk] = NULL; 3728 3729 if (ret != 0) { 3730 perror("rdma unregistration chunk failed"); 3731 ret = -ret; 3732 goto out; 3733 } 3734 3735 rdma->total_registrations--; 3736 3737 trace_qemu_rdma_registration_handle_unregister_success( 3738 reg->key.chunk); 3739 } 3740 3741 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3742 3743 if (ret < 0) { 3744 error_report("Failed to send control buffer"); 3745 goto out; 3746 } 3747 break; 3748 case RDMA_CONTROL_REGISTER_RESULT: 3749 error_report("Invalid RESULT message at dest."); 3750 ret = -EIO; 3751 goto out; 3752 default: 3753 error_report("Unknown control message %s", control_desc(head.type)); 3754 ret = -EIO; 3755 goto out; 3756 } 3757 } while (1); 3758out: 3759 if (ret < 0) { 3760 rdma->error_state = ret; 3761 } 3762 return ret; 3763} 3764 3765/* Destination: 3766 * Called via a ram_control_load_hook during the initial RAM load section which 3767 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3768 * on the source. 3769 * We've already built our local RAMBlock list, but not yet sent the list to 3770 * the source. 3771 */ 3772static int 3773rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) 3774{ 3775 RDMAContext *rdma; 3776 int curr; 3777 int found = -1; 3778 3779 RCU_READ_LOCK_GUARD(); 3780 rdma = qatomic_rcu_read(&rioc->rdmain); 3781 3782 if (!rdma) { 3783 return -EIO; 3784 } 3785 3786 /* Find the matching RAMBlock in our local list */ 3787 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3788 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3789 found = curr; 3790 break; 3791 } 3792 } 3793 3794 if (found == -1) { 3795 error_report("RAMBlock '%s' not found on destination", name); 3796 return -ENOENT; 3797 } 3798 3799 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3800 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3801 rdma->next_src_index++; 3802 3803 return 0; 3804} 3805 3806static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) 3807{ 3808 switch (flags) { 3809 case RAM_CONTROL_BLOCK_REG: 3810 return rdma_block_notification_handle(opaque, data); 3811 3812 case RAM_CONTROL_HOOK: 3813 return qemu_rdma_registration_handle(f, opaque); 3814 3815 default: 3816 /* Shouldn't be called with any other values */ 3817 abort(); 3818 } 3819} 3820 3821static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3822 uint64_t flags, void *data) 3823{ 3824 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3825 RDMAContext *rdma; 3826 3827 RCU_READ_LOCK_GUARD(); 3828 rdma = qatomic_rcu_read(&rioc->rdmaout); 3829 if (!rdma) { 3830 return -EIO; 3831 } 3832 3833 CHECK_ERROR_STATE(); 3834 3835 if (migration_in_postcopy()) { 3836 return 0; 3837 } 3838 3839 trace_qemu_rdma_registration_start(flags); 3840 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3841 qemu_fflush(f); 3842 3843 return 0; 3844} 3845 3846/* 3847 * Inform dest that dynamic registrations are done for now. 3848 * First, flush writes, if any. 3849 */ 3850static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3851 uint64_t flags, void *data) 3852{ 3853 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3854 RDMAContext *rdma; 3855 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3856 int ret = 0; 3857 3858 RCU_READ_LOCK_GUARD(); 3859 rdma = qatomic_rcu_read(&rioc->rdmaout); 3860 if (!rdma) { 3861 return -EIO; 3862 } 3863 3864 CHECK_ERROR_STATE(); 3865 3866 if (migration_in_postcopy()) { 3867 return 0; 3868 } 3869 3870 qemu_fflush(f); 3871 ret = qemu_rdma_drain_cq(f, rdma); 3872 3873 if (ret < 0) { 3874 goto err; 3875 } 3876 3877 if (flags == RAM_CONTROL_SETUP) { 3878 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3879 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3880 int reg_result_idx, i, nb_dest_blocks; 3881 3882 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3883 trace_qemu_rdma_registration_stop_ram(); 3884 3885 /* 3886 * Make sure that we parallelize the pinning on both sides. 3887 * For very large guests, doing this serially takes a really 3888 * long time, so we have to 'interleave' the pinning locally 3889 * with the control messages by performing the pinning on this 3890 * side before we receive the control response from the other 3891 * side that the pinning has completed. 3892 */ 3893 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3894 ®_result_idx, rdma->pin_all ? 3895 qemu_rdma_reg_whole_ram_blocks : NULL); 3896 if (ret < 0) { 3897 fprintf(stderr, "receiving remote info!"); 3898 return ret; 3899 } 3900 3901 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3902 3903 /* 3904 * The protocol uses two different sets of rkeys (mutually exclusive): 3905 * 1. One key to represent the virtual address of the entire ram block. 3906 * (dynamic chunk registration disabled - pin everything with one rkey.) 3907 * 2. One to represent individual chunks within a ram block. 3908 * (dynamic chunk registration enabled - pin individual chunks.) 3909 * 3910 * Once the capability is successfully negotiated, the destination transmits 3911 * the keys to use (or sends them later) including the virtual addresses 3912 * and then propagates the remote ram block descriptions to his local copy. 3913 */ 3914 3915 if (local->nb_blocks != nb_dest_blocks) { 3916 fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) " 3917 "Your QEMU command line parameters are probably " 3918 "not identical on both the source and destination.", 3919 local->nb_blocks, nb_dest_blocks); 3920 rdma->error_state = -EINVAL; 3921 return -EINVAL; 3922 } 3923 3924 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3925 memcpy(rdma->dest_blocks, 3926 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3927 for (i = 0; i < nb_dest_blocks; i++) { 3928 network_to_dest_block(&rdma->dest_blocks[i]); 3929 3930 /* We require that the blocks are in the same order */ 3931 if (rdma->dest_blocks[i].length != local->block[i].length) { 3932 fprintf(stderr, "Block %s/%d has a different length %" PRIu64 3933 "vs %" PRIu64, local->block[i].block_name, i, 3934 local->block[i].length, 3935 rdma->dest_blocks[i].length); 3936 rdma->error_state = -EINVAL; 3937 return -EINVAL; 3938 } 3939 local->block[i].remote_host_addr = 3940 rdma->dest_blocks[i].remote_host_addr; 3941 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3942 } 3943 } 3944 3945 trace_qemu_rdma_registration_stop(flags); 3946 3947 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3948 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3949 3950 if (ret < 0) { 3951 goto err; 3952 } 3953 3954 return 0; 3955err: 3956 rdma->error_state = ret; 3957 return ret; 3958} 3959 3960static const QEMUFileHooks rdma_read_hooks = { 3961 .hook_ram_load = rdma_load_hook, 3962}; 3963 3964static const QEMUFileHooks rdma_write_hooks = { 3965 .before_ram_iterate = qemu_rdma_registration_start, 3966 .after_ram_iterate = qemu_rdma_registration_stop, 3967 .save_page = qemu_rdma_save_page, 3968}; 3969 3970 3971static void qio_channel_rdma_finalize(Object *obj) 3972{ 3973 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3974 if (rioc->rdmain) { 3975 qemu_rdma_cleanup(rioc->rdmain); 3976 g_free(rioc->rdmain); 3977 rioc->rdmain = NULL; 3978 } 3979 if (rioc->rdmaout) { 3980 qemu_rdma_cleanup(rioc->rdmaout); 3981 g_free(rioc->rdmaout); 3982 rioc->rdmaout = NULL; 3983 } 3984} 3985 3986static void qio_channel_rdma_class_init(ObjectClass *klass, 3987 void *class_data G_GNUC_UNUSED) 3988{ 3989 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3990 3991 ioc_klass->io_writev = qio_channel_rdma_writev; 3992 ioc_klass->io_readv = qio_channel_rdma_readv; 3993 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3994 ioc_klass->io_close = qio_channel_rdma_close; 3995 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3996 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 3997 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 3998} 3999 4000static const TypeInfo qio_channel_rdma_info = { 4001 .parent = TYPE_QIO_CHANNEL, 4002 .name = TYPE_QIO_CHANNEL_RDMA, 4003 .instance_size = sizeof(QIOChannelRDMA), 4004 .instance_finalize = qio_channel_rdma_finalize, 4005 .class_init = qio_channel_rdma_class_init, 4006}; 4007 4008static void qio_channel_rdma_register_types(void) 4009{ 4010 type_register_static(&qio_channel_rdma_info); 4011} 4012 4013type_init(qio_channel_rdma_register_types); 4014 4015static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 4016{ 4017 QIOChannelRDMA *rioc; 4018 4019 if (qemu_file_mode_is_not_valid(mode)) { 4020 return NULL; 4021 } 4022 4023 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4024 4025 if (mode[0] == 'w') { 4026 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); 4027 rioc->rdmaout = rdma; 4028 rioc->rdmain = rdma->return_path; 4029 qemu_file_set_hooks(rioc->file, &rdma_write_hooks); 4030 } else { 4031 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); 4032 rioc->rdmain = rdma; 4033 rioc->rdmaout = rdma->return_path; 4034 qemu_file_set_hooks(rioc->file, &rdma_read_hooks); 4035 } 4036 4037 return rioc->file; 4038} 4039 4040static void rdma_accept_incoming_migration(void *opaque) 4041{ 4042 RDMAContext *rdma = opaque; 4043 int ret; 4044 QEMUFile *f; 4045 Error *local_err = NULL; 4046 4047 trace_qemu_rdma_accept_incoming_migration(); 4048 ret = qemu_rdma_accept(rdma); 4049 4050 if (ret) { 4051 fprintf(stderr, "RDMA ERROR: Migration initialization failed\n"); 4052 return; 4053 } 4054 4055 trace_qemu_rdma_accept_incoming_migration_accepted(); 4056 4057 if (rdma->is_return_path) { 4058 return; 4059 } 4060 4061 f = qemu_fopen_rdma(rdma, "rb"); 4062 if (f == NULL) { 4063 fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n"); 4064 qemu_rdma_cleanup(rdma); 4065 return; 4066 } 4067 4068 rdma->migration_started_on_destination = 1; 4069 migration_fd_process_incoming(f, &local_err); 4070 if (local_err) { 4071 error_reportf_err(local_err, "RDMA ERROR:"); 4072 } 4073} 4074 4075void rdma_start_incoming_migration(const char *host_port, Error **errp) 4076{ 4077 int ret; 4078 RDMAContext *rdma, *rdma_return_path = NULL; 4079 Error *local_err = NULL; 4080 4081 trace_rdma_start_incoming_migration(); 4082 4083 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4084 if (ram_block_discard_is_required()) { 4085 error_setg(errp, "RDMA: cannot disable RAM discard"); 4086 return; 4087 } 4088 4089 rdma = qemu_rdma_data_init(host_port, &local_err); 4090 if (rdma == NULL) { 4091 goto err; 4092 } 4093 4094 ret = qemu_rdma_dest_init(rdma, &local_err); 4095 4096 if (ret) { 4097 goto err; 4098 } 4099 4100 trace_rdma_start_incoming_migration_after_dest_init(); 4101 4102 ret = rdma_listen(rdma->listen_id, 5); 4103 4104 if (ret) { 4105 ERROR(errp, "listening on socket!"); 4106 goto cleanup_rdma; 4107 } 4108 4109 trace_rdma_start_incoming_migration_after_rdma_listen(); 4110 4111 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 4112 NULL, (void *)(intptr_t)rdma); 4113 return; 4114 4115cleanup_rdma: 4116 qemu_rdma_cleanup(rdma); 4117err: 4118 error_propagate(errp, local_err); 4119 if (rdma) { 4120 g_free(rdma->host); 4121 g_free(rdma->host_port); 4122 } 4123 g_free(rdma); 4124 g_free(rdma_return_path); 4125} 4126 4127void rdma_start_outgoing_migration(void *opaque, 4128 const char *host_port, Error **errp) 4129{ 4130 MigrationState *s = opaque; 4131 RDMAContext *rdma_return_path = NULL; 4132 RDMAContext *rdma; 4133 int ret = 0; 4134 4135 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4136 if (ram_block_discard_is_required()) { 4137 error_setg(errp, "RDMA: cannot disable RAM discard"); 4138 return; 4139 } 4140 4141 rdma = qemu_rdma_data_init(host_port, errp); 4142 if (rdma == NULL) { 4143 goto err; 4144 } 4145 4146 ret = qemu_rdma_source_init(rdma, 4147 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); 4148 4149 if (ret) { 4150 goto err; 4151 } 4152 4153 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 4154 ret = qemu_rdma_connect(rdma, errp, false); 4155 4156 if (ret) { 4157 goto err; 4158 } 4159 4160 /* RDMA postcopy need a separate queue pair for return path */ 4161 if (migrate_postcopy()) { 4162 rdma_return_path = qemu_rdma_data_init(host_port, errp); 4163 4164 if (rdma_return_path == NULL) { 4165 goto return_path_err; 4166 } 4167 4168 ret = qemu_rdma_source_init(rdma_return_path, 4169 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); 4170 4171 if (ret) { 4172 goto return_path_err; 4173 } 4174 4175 ret = qemu_rdma_connect(rdma_return_path, errp, true); 4176 4177 if (ret) { 4178 goto return_path_err; 4179 } 4180 4181 rdma->return_path = rdma_return_path; 4182 rdma_return_path->return_path = rdma; 4183 rdma_return_path->is_return_path = true; 4184 } 4185 4186 trace_rdma_start_outgoing_migration_after_rdma_connect(); 4187 4188 s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); 4189 migrate_fd_connect(s, NULL); 4190 return; 4191return_path_err: 4192 qemu_rdma_cleanup(rdma); 4193err: 4194 g_free(rdma); 4195 g_free(rdma_return_path); 4196}