cachepc-qemu

Fork of AMDESE/qemu with changes for the cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu

multifd.c (35651B)


      1/*
      2 * Multifd common code
      3 *
      4 * Copyright (c) 2019-2020 Red Hat Inc
      5 *
      6 * Authors:
      7 *  Juan Quintela <quintela@redhat.com>
      8 *
      9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
     10 * See the COPYING file in the top-level directory.
     11 */
     12
     13#include "qemu/osdep.h"
     14#include "qemu/rcu.h"
     15#include "exec/target_page.h"
     16#include "sysemu/sysemu.h"
     17#include "exec/ramblock.h"
     18#include "qemu/error-report.h"
     19#include "qapi/error.h"
     20#include "ram.h"
     21#include "migration.h"
     22#include "socket.h"
     23#include "tls.h"
     24#include "qemu-file.h"
     25#include "trace.h"
     26#include "multifd.h"
     27
     28#include "qemu/yank.h"
     29#include "io/channel-socket.h"
     30#include "yank_functions.h"
     31
     32/* Multiple fd's */
     33
     34#define MULTIFD_MAGIC 0x11223344U
     35#define MULTIFD_VERSION 1
     36
     37typedef struct {
     38    uint32_t magic;
     39    uint32_t version;
     40    unsigned char uuid[16]; /* QemuUUID */
     41    uint8_t id;
     42    uint8_t unused1[7];     /* Reserved for future use */
     43    uint64_t unused2[4];    /* Reserved for future use */
     44} __attribute__((packed)) MultiFDInit_t;
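
/*
 * With the packed attribute this handshake message is a fixed 64-byte
 * structure on the wire:
 *
 *   offset  0: magic    (4 bytes, big endian)
 *   offset  4: version  (4 bytes, big endian)
 *   offset  8: uuid     (16 bytes)
 *   offset 24: id       (1 byte)
 *   offset 25: unused1  (7 bytes)
 *   offset 32: unused2  (32 bytes)
 *
 * It is written once per channel by multifd_send_initial_packet() and
 * validated by multifd_recv_initial_packet() below.
 */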
     45
     46/* Multifd without compression */
     47
     48/**
     49 * nocomp_send_setup: setup send side
     50 *
     51 * For no compression this function does nothing.
     52 *
     53 * Returns 0 for success or -1 for error
     54 *
     55 * @p: Params for the channel that we are using
     56 * @errp: pointer to an error
     57 */
     58static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
     59{
     60    return 0;
     61}
     62
     63/**
     64 * nocomp_send_cleanup: cleanup send side
     65 *
     66 * For no compression this function does nothing.
     67 *
     68 * @p: Params for the channel that we are using
     69 */
     70static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
     71{
     72    return;
     73}
     74
     75/**
      76 * nocomp_send_prepare: prepare data to be able to send
     77 *
     78 * For no compression we just have to calculate the size of the
     79 * packet.
     80 *
     81 * Returns 0 for success or -1 for error
     82 *
     83 * @p: Params for the channel that we are using
     84 * @used: number of pages used
     85 * @errp: pointer to an error
     86 */
     87static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
     88                               Error **errp)
     89{
     90    p->next_packet_size = used * qemu_target_page_size();
     91    p->flags |= MULTIFD_FLAG_NOCOMP;
     92    return 0;
     93}
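
/*
 * For example, with a 4 KiB target page size and used = 128 pages this
 * sets next_packet_size to 512 KiB; the page data itself is written
 * separately by nocomp_send_write() below.
 */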
     94
     95/**
     96 * nocomp_send_write: do the actual write of the data
     97 *
     98 * For no compression we just have to write the data.
     99 *
    100 * Returns 0 for success or -1 for error
    101 *
    102 * @p: Params for the channel that we are using
    103 * @used: number of pages used
    104 * @errp: pointer to an error
    105 */
    106static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
    107{
    108    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
    109}
    110
    111/**
    112 * nocomp_recv_setup: setup receive side
    113 *
    114 * For no compression this function does nothing.
    115 *
    116 * Returns 0 for success or -1 for error
    117 *
    118 * @p: Params for the channel that we are using
    119 * @errp: pointer to an error
    120 */
    121static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
    122{
    123    return 0;
    124}
    125
    126/**
     127 * nocomp_recv_cleanup: cleanup receive side
    128 *
    129 * For no compression this function does nothing.
    130 *
    131 * @p: Params for the channel that we are using
    132 */
    133static void nocomp_recv_cleanup(MultiFDRecvParams *p)
    134{
    135}
    136
    137/**
    138 * nocomp_recv_pages: read the data from the channel into actual pages
    139 *
    140 * For no compression we just need to read things into the correct place.
    141 *
    142 * Returns 0 for success or -1 for error
    143 *
    144 * @p: Params for the channel that we are using
    145 * @used: number of pages used
    146 * @errp: pointer to an error
    147 */
    148static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
    149{
    150    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
    151
    152    if (flags != MULTIFD_FLAG_NOCOMP) {
    153        error_setg(errp, "multifd %d: flags received %x flags expected %x",
    154                   p->id, flags, MULTIFD_FLAG_NOCOMP);
    155        return -1;
    156    }
    157    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
    158}
    159
    160static MultiFDMethods multifd_nocomp_ops = {
    161    .send_setup = nocomp_send_setup,
    162    .send_cleanup = nocomp_send_cleanup,
    163    .send_prepare = nocomp_send_prepare,
    164    .send_write = nocomp_send_write,
    165    .recv_setup = nocomp_recv_setup,
    166    .recv_cleanup = nocomp_recv_cleanup,
    167    .recv_pages = nocomp_recv_pages
    168};
    169
    170static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    171    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
    172};
    173
    174void multifd_register_ops(int method, MultiFDMethods *ops)
    175{
    176    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    177    multifd_ops[method] = ops;
    178}
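
/*
 * A compression backend supplies its own MultiFDMethods table and
 * registers it through multifd_register_ops() at startup.  The sketch
 * below is illustrative only and not compiled: the backend name, the
 * callbacks and MULTIFD_COMPRESSION_EXAMPLE are hypothetical; the real
 * users of this hook are the compression backends elsewhere in this
 * directory.
 */
#if 0
static MultiFDMethods multifd_example_ops = {
    .send_setup = example_send_setup,
    .send_cleanup = example_send_cleanup,
    .send_prepare = example_send_prepare,
    .send_write = example_send_write,
    .recv_setup = example_recv_setup,
    .recv_cleanup = example_recv_cleanup,
    .recv_pages = example_recv_pages
};

static void multifd_example_register(void)
{
    /* Must be a MultiFDCompression value other than NONE (see assert above). */
    multifd_register_ops(MULTIFD_COMPRESSION_EXAMPLE, &multifd_example_ops);
}
#endif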
    179
    180static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
    181{
    182    MultiFDInit_t msg = {};
    183    int ret;
    184
    185    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    186    msg.version = cpu_to_be32(MULTIFD_VERSION);
    187    msg.id = p->id;
    188    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
    189
    190    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    191    if (ret != 0) {
    192        return -1;
    193    }
    194    return 0;
    195}
    196
    197static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
    198{
    199    MultiFDInit_t msg;
    200    int ret;
    201
    202    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    203    if (ret != 0) {
    204        return -1;
    205    }
    206
    207    msg.magic = be32_to_cpu(msg.magic);
    208    msg.version = be32_to_cpu(msg.version);
    209
    210    if (msg.magic != MULTIFD_MAGIC) {
    211        error_setg(errp, "multifd: received packet magic %x "
    212                   "expected %x", msg.magic, MULTIFD_MAGIC);
    213        return -1;
    214    }
    215
    216    if (msg.version != MULTIFD_VERSION) {
    217        error_setg(errp, "multifd: received packet version %d "
    218                   "expected %d", msg.version, MULTIFD_VERSION);
    219        return -1;
    220    }
    221
    222    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
    223        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
    224        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
    225
    226        error_setg(errp, "multifd: received uuid '%s' and expected "
    227                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
    228        g_free(uuid);
    229        g_free(msg_uuid);
    230        return -1;
    231    }
    232
     233    if (msg.id >= migrate_multifd_channels()) {
     234        error_setg(errp, "multifd: received channel id %d "
     235                   "out of range (%d channels)", msg.id, migrate_multifd_channels());
    236        return -1;
    237    }
    238
    239    return msg.id;
    240}
    241
    242static MultiFDPages_t *multifd_pages_init(size_t size)
    243{
    244    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
    245
    246    pages->allocated = size;
    247    pages->iov = g_new0(struct iovec, size);
    248    pages->offset = g_new0(ram_addr_t, size);
    249
    250    return pages;
    251}
    252
    253static void multifd_pages_clear(MultiFDPages_t *pages)
    254{
    255    pages->used = 0;
    256    pages->allocated = 0;
    257    pages->packet_num = 0;
    258    pages->block = NULL;
    259    g_free(pages->iov);
    260    pages->iov = NULL;
    261    g_free(pages->offset);
    262    pages->offset = NULL;
    263    g_free(pages);
    264}
    265
    266static void multifd_send_fill_packet(MultiFDSendParams *p)
    267{
    268    MultiFDPacket_t *packet = p->packet;
    269    int i;
    270
    271    packet->flags = cpu_to_be32(p->flags);
    272    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    273    packet->pages_used = cpu_to_be32(p->pages->used);
    274    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    275    packet->packet_num = cpu_to_be64(p->packet_num);
    276
    277    if (p->pages->block) {
    278        strncpy(packet->ramblock, p->pages->block->idstr, 256);
    279    }
    280
    281    for (i = 0; i < p->pages->used; i++) {
    282        /* there are architectures where ram_addr_t is 32 bit */
    283        uint64_t temp = p->pages->offset[i];
    284
    285        packet->offset[i] = cpu_to_be64(temp);
    286    }
    287}
    288
    289static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
    290{
    291    MultiFDPacket_t *packet = p->packet;
    292    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    293    RAMBlock *block;
    294    int i;
    295
    296    packet->magic = be32_to_cpu(packet->magic);
    297    if (packet->magic != MULTIFD_MAGIC) {
    298        error_setg(errp, "multifd: received packet "
    299                   "magic %x and expected magic %x",
    300                   packet->magic, MULTIFD_MAGIC);
    301        return -1;
    302    }
    303
    304    packet->version = be32_to_cpu(packet->version);
    305    if (packet->version != MULTIFD_VERSION) {
    306        error_setg(errp, "multifd: received packet "
    307                   "version %d and expected version %d",
    308                   packet->version, MULTIFD_VERSION);
    309        return -1;
    310    }
    311
    312    p->flags = be32_to_cpu(packet->flags);
    313
    314    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    315    /*
    316     * If we received a packet that is 100 times bigger than expected
     317     * just stop migration.  The factor of 100 is an arbitrary sanity limit.
    318     */
    319    if (packet->pages_alloc > pages_max * 100) {
    320        error_setg(errp, "multifd: received packet "
    321                   "with size %d and expected a maximum size of %d",
     322                   packet->pages_alloc, pages_max * 100);
    323        return -1;
    324    }
    325    /*
    326     * We received a packet that is bigger than expected but inside
    327     * reasonable limits (see previous comment).  Just reallocate.
    328     */
    329    if (packet->pages_alloc > p->pages->allocated) {
    330        multifd_pages_clear(p->pages);
    331        p->pages = multifd_pages_init(packet->pages_alloc);
    332    }
    333
    334    p->pages->used = be32_to_cpu(packet->pages_used);
    335    if (p->pages->used > packet->pages_alloc) {
    336        error_setg(errp, "multifd: received packet "
    337                   "with %d pages and expected maximum pages are %d",
     338                   p->pages->used, packet->pages_alloc);
    339        return -1;
    340    }
    341
    342    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    343    p->packet_num = be64_to_cpu(packet->packet_num);
    344
    345    if (p->pages->used == 0) {
    346        return 0;
    347    }
    348
    349    /* make sure that ramblock is 0 terminated */
    350    packet->ramblock[255] = 0;
    351    block = qemu_ram_block_by_name(packet->ramblock);
    352    if (!block) {
    353        error_setg(errp, "multifd: unknown ram block %s",
    354                   packet->ramblock);
    355        return -1;
    356    }
    357
    358    for (i = 0; i < p->pages->used; i++) {
    359        uint64_t offset = be64_to_cpu(packet->offset[i]);
    360
    361        if (offset > (block->used_length - qemu_target_page_size())) {
    362            error_setg(errp, "multifd: offset too long %" PRIu64
    363                       " (max " RAM_ADDR_FMT ")",
    364                       offset, block->used_length);
    365            return -1;
    366        }
    367        p->pages->iov[i].iov_base = block->host + offset;
    368        p->pages->iov[i].iov_len = qemu_target_page_size();
    369    }
    370
    371    return 0;
    372}
    373
    374struct {
    375    MultiFDSendParams *params;
     376    /* array of pages to send */
    377    MultiFDPages_t *pages;
    378    /* global number of generated multifd packets */
    379    uint64_t packet_num;
    380    /* send channels ready */
    381    QemuSemaphore channels_ready;
    382    /*
     383     * Have we already run the terminate-threads code?  There is a race
     384     * when an error arrives while we are already exiting.
     385     * Accessed with atomic operations.  Only valid values are 0 and 1.
    386     */
    387    int exiting;
    388    /* multifd ops */
    389    MultiFDMethods *ops;
    390} *multifd_send_state;
    391
    392/*
     393 * How do we use multifd_send_state->pages and channel->pages?
     394 *
     395 * We create one pages struct for each channel, plus a main one.  Each
     396 * time we need to send a batch of pages we exchange the one in
     397 * multifd_send_state with the one in the channel that is sending it.
     398 * There are two reasons for that:
     399 *    - to avoid doing so many mallocs during migration
     400 *    - to make it easier to know what to free at the end of migration
     401 *
     402 * This way we always know who owns each "pages" struct and we don't
     403 * need any locking.  It belongs either to the migration thread or to
     404 * the channel thread.  Switching is safe because the migration thread
     405 * holds the channel mutex while changing it, and the channel thread
     406 * must have finished with its own pages, otherwise pending_job could
     407 * not be false.
    408 */
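
/*
 * Concretely, the exchange described above is the pointer swap performed
 * in multifd_send_pages() below (the assignments happen under p->mutex):
 *
 *     MultiFDPages_t *pages = multifd_send_state->pages;   <- filled batch
 *     multifd_send_state->pages = p->pages;                <- take the empty set
 *     p->pages = pages;                                     <- hand the batch over
 */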
    409
    410static int multifd_send_pages(QEMUFile *f)
    411{
    412    int i;
    413    static int next_channel;
     414    MultiFDSendParams *p = NULL; /* make gcc happy */
    415    MultiFDPages_t *pages = multifd_send_state->pages;
    416    uint64_t transferred;
    417
    418    if (qatomic_read(&multifd_send_state->exiting)) {
    419        return -1;
    420    }
    421
    422    qemu_sem_wait(&multifd_send_state->channels_ready);
    423    /*
    424     * next_channel can remain from a previous migration that was
    425     * using more channels, so ensure it doesn't overflow if the
    426     * limit is lower now.
    427     */
    428    next_channel %= migrate_multifd_channels();
    429    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
    430        p = &multifd_send_state->params[i];
    431
    432        qemu_mutex_lock(&p->mutex);
    433        if (p->quit) {
    434            error_report("%s: channel %d has already quit!", __func__, i);
    435            qemu_mutex_unlock(&p->mutex);
    436            return -1;
    437        }
    438        if (!p->pending_job) {
    439            p->pending_job++;
    440            next_channel = (i + 1) % migrate_multifd_channels();
    441            break;
    442        }
    443        qemu_mutex_unlock(&p->mutex);
    444    }
    445    assert(!p->pages->used);
    446    assert(!p->pages->block);
    447
    448    p->packet_num = multifd_send_state->packet_num++;
    449    multifd_send_state->pages = p->pages;
    450    p->pages = pages;
    451    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
    452                + p->packet_len;
    453    qemu_file_update_transfer(f, transferred);
    454    ram_counters.multifd_bytes += transferred;
    455    ram_counters.transferred += transferred;
    456    qemu_mutex_unlock(&p->mutex);
    457    qemu_sem_post(&p->sem);
    458
    459    return 1;
    460}
    461
    462int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
    463{
    464    MultiFDPages_t *pages = multifd_send_state->pages;
    465
    466    if (!pages->block) {
    467        pages->block = block;
    468    }
    469
    470    if (pages->block == block) {
    471        pages->offset[pages->used] = offset;
    472        pages->iov[pages->used].iov_base = block->host + offset;
    473        pages->iov[pages->used].iov_len = qemu_target_page_size();
    474        pages->used++;
    475
    476        if (pages->used < pages->allocated) {
    477            return 1;
    478        }
    479    }
    480
    481    if (multifd_send_pages(f) < 0) {
    482        return -1;
    483    }
    484
    485    if (pages->block != block) {
    486        return  multifd_queue_page(f, block, offset);
    487    }
    488
    489    return 1;
    490}
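
/*
 * Illustrative sketch, not compiled: how the RAM save path is expected
 * to use multifd_queue_page() above, queueing each dirty page and
 * letting full batches be sent transparently.  The caller name here is
 * hypothetical; the real caller lives in ram.c.
 */
#if 0
static int example_save_multifd_page(QEMUFile *f, RAMBlock *block,
                                     ram_addr_t offset)
{
    if (multifd_queue_page(f, block, offset) < 0) {
        return -1;    /* the error has already been flagged by multifd */
    }
    return 1;
}
#endif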
    491
    492static void multifd_send_terminate_threads(Error *err)
    493{
    494    int i;
    495
    496    trace_multifd_send_terminate_threads(err != NULL);
    497
    498    if (err) {
    499        MigrationState *s = migrate_get_current();
    500        migrate_set_error(s, err);
    501        if (s->state == MIGRATION_STATUS_SETUP ||
    502            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
    503            s->state == MIGRATION_STATUS_DEVICE ||
    504            s->state == MIGRATION_STATUS_ACTIVE) {
    505            migrate_set_state(&s->state, s->state,
    506                              MIGRATION_STATUS_FAILED);
    507        }
    508    }
    509
    510    /*
     511     * We don't want to exit the threads twice.  Depending on where
     512     * we get the error, or if there are two independent errors in two
     513     * threads at the same time, we can end up calling this function
     514     * twice.
    515     */
    516    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
    517        return;
    518    }
    519
    520    for (i = 0; i < migrate_multifd_channels(); i++) {
    521        MultiFDSendParams *p = &multifd_send_state->params[i];
    522
    523        qemu_mutex_lock(&p->mutex);
    524        p->quit = true;
    525        qemu_sem_post(&p->sem);
    526        qemu_mutex_unlock(&p->mutex);
    527    }
    528}
    529
    530void multifd_save_cleanup(void)
    531{
    532    int i;
    533
    534    if (!migrate_use_multifd()) {
    535        return;
    536    }
    537    multifd_send_terminate_threads(NULL);
    538    for (i = 0; i < migrate_multifd_channels(); i++) {
    539        MultiFDSendParams *p = &multifd_send_state->params[i];
    540
    541        if (p->running) {
    542            qemu_thread_join(&p->thread);
    543        }
    544    }
    545    for (i = 0; i < migrate_multifd_channels(); i++) {
    546        MultiFDSendParams *p = &multifd_send_state->params[i];
    547        Error *local_err = NULL;
    548
    549        socket_send_channel_destroy(p->c);
    550        p->c = NULL;
    551        qemu_mutex_destroy(&p->mutex);
    552        qemu_sem_destroy(&p->sem);
    553        qemu_sem_destroy(&p->sem_sync);
    554        g_free(p->name);
    555        p->name = NULL;
    556        g_free(p->tls_hostname);
    557        p->tls_hostname = NULL;
    558        multifd_pages_clear(p->pages);
    559        p->pages = NULL;
    560        p->packet_len = 0;
    561        g_free(p->packet);
    562        p->packet = NULL;
    563        multifd_send_state->ops->send_cleanup(p, &local_err);
    564        if (local_err) {
    565            migrate_set_error(migrate_get_current(), local_err);
    566            error_free(local_err);
    567        }
    568    }
    569    qemu_sem_destroy(&multifd_send_state->channels_ready);
    570    g_free(multifd_send_state->params);
    571    multifd_send_state->params = NULL;
    572    multifd_pages_clear(multifd_send_state->pages);
    573    multifd_send_state->pages = NULL;
    574    g_free(multifd_send_state);
    575    multifd_send_state = NULL;
    576}
    577
    578void multifd_send_sync_main(QEMUFile *f)
    579{
    580    int i;
    581
    582    if (!migrate_use_multifd()) {
    583        return;
    584    }
    585    if (multifd_send_state->pages->used) {
    586        if (multifd_send_pages(f) < 0) {
    587            error_report("%s: multifd_send_pages fail", __func__);
    588            return;
    589        }
    590    }
    591    for (i = 0; i < migrate_multifd_channels(); i++) {
    592        MultiFDSendParams *p = &multifd_send_state->params[i];
    593
    594        trace_multifd_send_sync_main_signal(p->id);
    595
    596        qemu_mutex_lock(&p->mutex);
    597
    598        if (p->quit) {
    599            error_report("%s: channel %d has already quit", __func__, i);
    600            qemu_mutex_unlock(&p->mutex);
    601            return;
    602        }
    603
    604        p->packet_num = multifd_send_state->packet_num++;
    605        p->flags |= MULTIFD_FLAG_SYNC;
    606        p->pending_job++;
    607        qemu_file_update_transfer(f, p->packet_len);
    608        ram_counters.multifd_bytes += p->packet_len;
    609        ram_counters.transferred += p->packet_len;
    610        qemu_mutex_unlock(&p->mutex);
    611        qemu_sem_post(&p->sem);
    612    }
    613    for (i = 0; i < migrate_multifd_channels(); i++) {
    614        MultiFDSendParams *p = &multifd_send_state->params[i];
    615
    616        trace_multifd_send_sync_main_wait(p->id);
    617        qemu_sem_wait(&p->sem_sync);
    618    }
    619    trace_multifd_send_sync_main(multifd_send_state->packet_num);
    620}
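
/*
 * An outline of the synchronisation driven by multifd_send_sync_main()
 * above, reconstructed from the thread code in this file:
 *
 *   1. The migration thread queues one packet per channel with
 *      MULTIFD_FLAG_SYNC set, then waits on each channel's sem_sync.
 *   2. Each multifd_send_thread sends that packet and posts p->sem_sync,
 *      releasing step 1.
 *   3. On the destination, each multifd_recv_thread that sees the flag
 *      posts multifd_recv_state->sem_sync and blocks on its own sem_sync
 *      until multifd_recv_sync_main() releases it.
 */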
    621
    622static void *multifd_send_thread(void *opaque)
    623{
    624    MultiFDSendParams *p = opaque;
    625    Error *local_err = NULL;
    626    int ret = 0;
    627    uint32_t flags = 0;
    628
    629    trace_multifd_send_thread_start(p->id);
    630    rcu_register_thread();
    631
    632    if (multifd_send_initial_packet(p, &local_err) < 0) {
    633        ret = -1;
    634        goto out;
    635    }
    636    /* initial packet */
    637    p->num_packets = 1;
    638
    639    while (true) {
    640        qemu_sem_wait(&p->sem);
    641
    642        if (qatomic_read(&multifd_send_state->exiting)) {
    643            break;
    644        }
    645        qemu_mutex_lock(&p->mutex);
    646
    647        if (p->pending_job) {
    648            uint32_t used = p->pages->used;
    649            uint64_t packet_num = p->packet_num;
    650            flags = p->flags;
    651
    652            if (used) {
    653                ret = multifd_send_state->ops->send_prepare(p, used,
    654                                                            &local_err);
    655                if (ret != 0) {
    656                    qemu_mutex_unlock(&p->mutex);
    657                    break;
    658                }
    659            }
    660            multifd_send_fill_packet(p);
    661            p->flags = 0;
    662            p->num_packets++;
    663            p->num_pages += used;
    664            p->pages->used = 0;
    665            p->pages->block = NULL;
    666            qemu_mutex_unlock(&p->mutex);
    667
    668            trace_multifd_send(p->id, packet_num, used, flags,
    669                               p->next_packet_size);
    670
    671            ret = qio_channel_write_all(p->c, (void *)p->packet,
    672                                        p->packet_len, &local_err);
    673            if (ret != 0) {
    674                break;
    675            }
    676
    677            if (used) {
    678                ret = multifd_send_state->ops->send_write(p, used, &local_err);
    679                if (ret != 0) {
    680                    break;
    681                }
    682            }
    683
    684            qemu_mutex_lock(&p->mutex);
    685            p->pending_job--;
    686            qemu_mutex_unlock(&p->mutex);
    687
    688            if (flags & MULTIFD_FLAG_SYNC) {
    689                qemu_sem_post(&p->sem_sync);
    690            }
    691            qemu_sem_post(&multifd_send_state->channels_ready);
    692        } else if (p->quit) {
    693            qemu_mutex_unlock(&p->mutex);
    694            break;
    695        } else {
    696            qemu_mutex_unlock(&p->mutex);
    697            /* sometimes there are spurious wakeups */
    698        }
    699    }
    700
    701out:
    702    if (local_err) {
    703        trace_multifd_send_error(p->id);
    704        multifd_send_terminate_threads(local_err);
    705        error_free(local_err);
    706    }
    707
    708    /*
     709     * An error happened; we are about to exit, but before leaving we
     710     * must wake up whoever is waiting on us.
    711     */
    712    if (ret != 0) {
    713        qemu_sem_post(&p->sem_sync);
    714        qemu_sem_post(&multifd_send_state->channels_ready);
    715    }
    716
    717    qemu_mutex_lock(&p->mutex);
    718    p->running = false;
    719    qemu_mutex_unlock(&p->mutex);
    720
    721    rcu_unregister_thread();
    722    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
    723
    724    return NULL;
    725}
    726
    727static bool multifd_channel_connect(MultiFDSendParams *p,
    728                                    QIOChannel *ioc,
    729                                    Error *error);
    730
    731static void multifd_tls_outgoing_handshake(QIOTask *task,
    732                                           gpointer opaque)
    733{
    734    MultiFDSendParams *p = opaque;
    735    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    736    Error *err = NULL;
    737
    738    if (qio_task_propagate_error(task, &err)) {
    739        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    740    } else {
    741        trace_multifd_tls_outgoing_handshake_complete(ioc);
    742    }
    743
    744    if (!multifd_channel_connect(p, ioc, err)) {
    745        /*
     746         * An error happened: mark the multifd_send_thread status as 'quit'
     747         * even though it was never created, then wake up whoever is waiting.
    748         */
    749        p->quit = true;
    750        qemu_sem_post(&multifd_send_state->channels_ready);
    751        qemu_sem_post(&p->sem_sync);
    752    }
    753}
    754
    755static void *multifd_tls_handshake_thread(void *opaque)
    756{
    757    MultiFDSendParams *p = opaque;
    758    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
    759
    760    qio_channel_tls_handshake(tioc,
    761                              multifd_tls_outgoing_handshake,
    762                              p,
    763                              NULL,
    764                              NULL);
    765    return NULL;
    766}
    767
    768static void multifd_tls_channel_connect(MultiFDSendParams *p,
    769                                        QIOChannel *ioc,
    770                                        Error **errp)
    771{
    772    MigrationState *s = migrate_get_current();
    773    const char *hostname = p->tls_hostname;
    774    QIOChannelTLS *tioc;
    775
    776    tioc = migration_tls_client_create(s, ioc, hostname, errp);
    777    if (!tioc) {
    778        return;
    779    }
    780
    781    object_unref(OBJECT(ioc));
    782    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    783    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    784    p->c = QIO_CHANNEL(tioc);
    785    qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
    786                       multifd_tls_handshake_thread, p,
    787                       QEMU_THREAD_JOINABLE);
    788}
    789
    790static bool multifd_channel_connect(MultiFDSendParams *p,
    791                                    QIOChannel *ioc,
    792                                    Error *error)
    793{
    794    MigrationState *s = migrate_get_current();
    795
    796    trace_multifd_set_outgoing_channel(
    797        ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);
    798
    799    if (!error) {
    800        if (s->parameters.tls_creds &&
    801            *s->parameters.tls_creds &&
    802            !object_dynamic_cast(OBJECT(ioc),
    803                                 TYPE_QIO_CHANNEL_TLS)) {
    804            multifd_tls_channel_connect(p, ioc, &error);
    805            if (!error) {
    806                /*
     807                 * multifd_tls_channel_connect will call back to this
    808                 * function after the TLS handshake,
    809                 * so we mustn't call multifd_send_thread until then
    810                 */
    811                return true;
    812            } else {
    813                return false;
    814            }
    815        } else {
    816            /* update for tls qio channel */
    817            p->c = ioc;
    818            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
    819                                   QEMU_THREAD_JOINABLE);
     820        }
     821        return true;
    822    }
    823
    824    return false;
    825}
    826
    827static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
    828                                             QIOChannel *ioc, Error *err)
    829{
    830     migrate_set_error(migrate_get_current(), err);
     831     /* An error happened; wake up whoever is waiting on this channel */
    832     qemu_sem_post(&multifd_send_state->channels_ready);
    833     qemu_sem_post(&p->sem_sync);
    834     /*
     835      * Although multifd_send_thread was never created, the main migration
     836      * thread still needs to know whether it is running, so we need to mark
     837      * its status.
    838      */
    839     p->quit = true;
    840     object_unref(OBJECT(ioc));
    841     error_free(err);
    842}
    843
    844static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
    845{
    846    MultiFDSendParams *p = opaque;
    847    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    848    Error *local_err = NULL;
    849
    850    trace_multifd_new_send_channel_async(p->id);
    851    if (qio_task_propagate_error(task, &local_err)) {
    852        goto cleanup;
    853    } else {
    854        p->c = QIO_CHANNEL(sioc);
    855        qio_channel_set_delay(p->c, false);
    856        p->running = true;
    857        if (!multifd_channel_connect(p, sioc, local_err)) {
    858            goto cleanup;
    859        }
    860        return;
    861    }
    862
    863cleanup:
    864    multifd_new_send_channel_cleanup(p, sioc, local_err);
    865}
    866
    867int multifd_save_setup(Error **errp)
    868{
    869    int thread_count;
    870    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    871    uint8_t i;
    872    MigrationState *s;
    873
    874    if (!migrate_use_multifd()) {
    875        return 0;
    876    }
    877    s = migrate_get_current();
    878    thread_count = migrate_multifd_channels();
    879    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    880    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    881    multifd_send_state->pages = multifd_pages_init(page_count);
    882    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    883    qatomic_set(&multifd_send_state->exiting, 0);
    884    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
    885
    886    for (i = 0; i < thread_count; i++) {
    887        MultiFDSendParams *p = &multifd_send_state->params[i];
    888
    889        qemu_mutex_init(&p->mutex);
    890        qemu_sem_init(&p->sem, 0);
    891        qemu_sem_init(&p->sem_sync, 0);
    892        p->quit = false;
    893        p->pending_job = 0;
    894        p->id = i;
    895        p->pages = multifd_pages_init(page_count);
    896        p->packet_len = sizeof(MultiFDPacket_t)
    897                      + sizeof(uint64_t) * page_count;
    898        p->packet = g_malloc0(p->packet_len);
    899        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
    900        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
    901        p->name = g_strdup_printf("multifdsend_%d", i);
    902        p->tls_hostname = g_strdup(s->hostname);
    903        socket_send_channel_create(multifd_new_send_channel_async, p);
    904    }
    905
    906    for (i = 0; i < thread_count; i++) {
    907        MultiFDSendParams *p = &multifd_send_state->params[i];
    908        Error *local_err = NULL;
    909        int ret;
    910
    911        ret = multifd_send_state->ops->send_setup(p, &local_err);
    912        if (ret) {
    913            error_propagate(errp, local_err);
    914            return ret;
    915        }
    916    }
    917    return 0;
    918}
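
/*
 * A worked example of the sizes computed above, assuming a
 * MULTIFD_PACKET_SIZE of 512 KiB and a 4 KiB target page size:
 *
 *     page_count = 512 KiB / 4 KiB = 128 pages per packet
 *     packet_len = sizeof(MultiFDPacket_t) + 128 * sizeof(uint64_t)
 *                = fixed header + 1 KiB of big-endian page offsets
 */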
    919
    920struct {
    921    MultiFDRecvParams *params;
    922    /* number of created threads */
    923    int count;
    924    /* syncs main thread and channels */
    925    QemuSemaphore sem_sync;
    926    /* global number of generated multifd packets */
    927    uint64_t packet_num;
    928    /* multifd ops */
    929    MultiFDMethods *ops;
    930} *multifd_recv_state;
    931
    932static void multifd_recv_terminate_threads(Error *err)
    933{
    934    int i;
    935
    936    trace_multifd_recv_terminate_threads(err != NULL);
    937
    938    if (err) {
    939        MigrationState *s = migrate_get_current();
    940        migrate_set_error(s, err);
    941        if (s->state == MIGRATION_STATUS_SETUP ||
    942            s->state == MIGRATION_STATUS_ACTIVE) {
    943            migrate_set_state(&s->state, s->state,
    944                              MIGRATION_STATUS_FAILED);
    945        }
    946    }
    947
    948    for (i = 0; i < migrate_multifd_channels(); i++) {
    949        MultiFDRecvParams *p = &multifd_recv_state->params[i];
    950
    951        qemu_mutex_lock(&p->mutex);
    952        p->quit = true;
    953        /*
    954         * We could arrive here for two reasons:
    955         *  - normal quit, i.e. everything went fine, just finished
    956         *  - error quit: We close the channels so the channel threads
    957         *    finish the qio_channel_read_all_eof()
    958         */
    959        if (p->c) {
    960            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    961        }
    962        qemu_mutex_unlock(&p->mutex);
    963    }
    964}
    965
    966int multifd_load_cleanup(Error **errp)
    967{
    968    int i;
    969
    970    if (!migrate_use_multifd()) {
    971        return 0;
    972    }
    973    multifd_recv_terminate_threads(NULL);
    974    for (i = 0; i < migrate_multifd_channels(); i++) {
    975        MultiFDRecvParams *p = &multifd_recv_state->params[i];
    976
    977        if (p->running) {
    978            p->quit = true;
    979            /*
     980             * multifd_recv_thread may be blocked in the MULTIFD_FLAG_SYNC
     981             * handling code; waking it up here in the cleanup phase is harmless.
    982             */
    983            qemu_sem_post(&p->sem_sync);
    984            qemu_thread_join(&p->thread);
    985        }
    986    }
    987    for (i = 0; i < migrate_multifd_channels(); i++) {
    988        MultiFDRecvParams *p = &multifd_recv_state->params[i];
    989
    990        if (OBJECT(p->c)->ref == 1) {
    991            migration_ioc_unregister_yank(p->c);
    992        }
    993
    994        object_unref(OBJECT(p->c));
    995        p->c = NULL;
    996        qemu_mutex_destroy(&p->mutex);
    997        qemu_sem_destroy(&p->sem_sync);
    998        g_free(p->name);
    999        p->name = NULL;
   1000        multifd_pages_clear(p->pages);
   1001        p->pages = NULL;
   1002        p->packet_len = 0;
   1003        g_free(p->packet);
   1004        p->packet = NULL;
   1005        multifd_recv_state->ops->recv_cleanup(p);
   1006    }
   1007    qemu_sem_destroy(&multifd_recv_state->sem_sync);
   1008    g_free(multifd_recv_state->params);
   1009    multifd_recv_state->params = NULL;
   1010    g_free(multifd_recv_state);
   1011    multifd_recv_state = NULL;
   1012
   1013    return 0;
   1014}
   1015
   1016void multifd_recv_sync_main(void)
   1017{
   1018    int i;
   1019
   1020    if (!migrate_use_multifd()) {
   1021        return;
   1022    }
   1023    for (i = 0; i < migrate_multifd_channels(); i++) {
   1024        MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1025
   1026        trace_multifd_recv_sync_main_wait(p->id);
   1027        qemu_sem_wait(&multifd_recv_state->sem_sync);
   1028    }
   1029    for (i = 0; i < migrate_multifd_channels(); i++) {
   1030        MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1031
   1032        WITH_QEMU_LOCK_GUARD(&p->mutex) {
   1033            if (multifd_recv_state->packet_num < p->packet_num) {
   1034                multifd_recv_state->packet_num = p->packet_num;
   1035            }
   1036        }
   1037        trace_multifd_recv_sync_main_signal(p->id);
   1038        qemu_sem_post(&p->sem_sync);
   1039    }
   1040    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
   1041}
   1042
   1043static void *multifd_recv_thread(void *opaque)
   1044{
   1045    MultiFDRecvParams *p = opaque;
   1046    Error *local_err = NULL;
   1047    int ret;
   1048
   1049    trace_multifd_recv_thread_start(p->id);
   1050    rcu_register_thread();
   1051
   1052    while (true) {
   1053        uint32_t used;
   1054        uint32_t flags;
   1055
   1056        if (p->quit) {
   1057            break;
   1058        }
   1059
   1060        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
   1061                                       p->packet_len, &local_err);
   1062        if (ret == 0) {   /* EOF */
   1063            break;
   1064        }
   1065        if (ret == -1) {   /* Error */
   1066            break;
   1067        }
   1068
   1069        qemu_mutex_lock(&p->mutex);
   1070        ret = multifd_recv_unfill_packet(p, &local_err);
   1071        if (ret) {
   1072            qemu_mutex_unlock(&p->mutex);
   1073            break;
   1074        }
   1075
   1076        used = p->pages->used;
   1077        flags = p->flags;
   1078        /* recv methods don't know how to handle the SYNC flag */
   1079        p->flags &= ~MULTIFD_FLAG_SYNC;
   1080        trace_multifd_recv(p->id, p->packet_num, used, flags,
   1081                           p->next_packet_size);
   1082        p->num_packets++;
   1083        p->num_pages += used;
   1084        qemu_mutex_unlock(&p->mutex);
   1085
   1086        if (used) {
   1087            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
   1088            if (ret != 0) {
   1089                break;
   1090            }
   1091        }
   1092
   1093        if (flags & MULTIFD_FLAG_SYNC) {
   1094            qemu_sem_post(&multifd_recv_state->sem_sync);
   1095            qemu_sem_wait(&p->sem_sync);
   1096        }
   1097    }
   1098
   1099    if (local_err) {
   1100        multifd_recv_terminate_threads(local_err);
   1101        error_free(local_err);
   1102    }
   1103    qemu_mutex_lock(&p->mutex);
   1104    p->running = false;
   1105    qemu_mutex_unlock(&p->mutex);
   1106
   1107    rcu_unregister_thread();
   1108    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
   1109
   1110    return NULL;
   1111}
   1112
   1113int multifd_load_setup(Error **errp)
   1114{
   1115    int thread_count;
   1116    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
   1117    uint8_t i;
   1118
   1119    if (!migrate_use_multifd()) {
   1120        return 0;
   1121    }
   1122    thread_count = migrate_multifd_channels();
   1123    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
   1124    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
   1125    qatomic_set(&multifd_recv_state->count, 0);
   1126    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
   1127    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
   1128
   1129    for (i = 0; i < thread_count; i++) {
   1130        MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1131
   1132        qemu_mutex_init(&p->mutex);
   1133        qemu_sem_init(&p->sem_sync, 0);
   1134        p->quit = false;
   1135        p->id = i;
   1136        p->pages = multifd_pages_init(page_count);
   1137        p->packet_len = sizeof(MultiFDPacket_t)
   1138                      + sizeof(uint64_t) * page_count;
   1139        p->packet = g_malloc0(p->packet_len);
   1140        p->name = g_strdup_printf("multifdrecv_%d", i);
   1141    }
   1142
   1143    for (i = 0; i < thread_count; i++) {
   1144        MultiFDRecvParams *p = &multifd_recv_state->params[i];
   1145        Error *local_err = NULL;
   1146        int ret;
   1147
   1148        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
   1149        if (ret) {
   1150            error_propagate(errp, local_err);
   1151            return ret;
   1152        }
   1153    }
   1154    return 0;
   1155}
   1156
   1157bool multifd_recv_all_channels_created(void)
   1158{
   1159    int thread_count = migrate_multifd_channels();
   1160
   1161    if (!migrate_use_multifd()) {
   1162        return true;
   1163    }
   1164
   1165    if (!multifd_recv_state) {
   1166        /* Called before any connections created */
   1167        return false;
   1168    }
   1169
   1170    return thread_count == qatomic_read(&multifd_recv_state->count);
   1171}
   1172
   1173/*
   1174 * Try to receive all multifd channels to get ready for the migration.
   1175 * - Return true and do not set @errp when correctly receiving all channels;
   1176 * - Return false and do not set @errp when correctly receiving the current one;
   1177 * - Return false and set @errp when failing to receive the current channel.
   1178 */
   1179bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
   1180{
   1181    MultiFDRecvParams *p;
   1182    Error *local_err = NULL;
   1183    int id;
   1184
   1185    id = multifd_recv_initial_packet(ioc, &local_err);
   1186    if (id < 0) {
   1187        multifd_recv_terminate_threads(local_err);
   1188        error_propagate_prepend(errp, local_err,
   1189                                "failed to receive packet"
   1190                                " via multifd channel %d: ",
   1191                                qatomic_read(&multifd_recv_state->count));
   1192        return false;
   1193    }
   1194    trace_multifd_recv_new_channel(id);
   1195
   1196    p = &multifd_recv_state->params[id];
   1197    if (p->c != NULL) {
    1198        error_setg(&local_err, "multifd: received id '%d' already setup",
   1199                   id);
   1200        multifd_recv_terminate_threads(local_err);
   1201        error_propagate(errp, local_err);
   1202        return false;
   1203    }
   1204    p->c = ioc;
   1205    object_ref(OBJECT(ioc));
   1206    /* initial packet */
   1207    p->num_packets = 1;
   1208
   1209    p->running = true;
   1210    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
   1211                       QEMU_THREAD_JOINABLE);
   1212    qatomic_inc(&multifd_recv_state->count);
   1213    return qatomic_read(&multifd_recv_state->count) ==
   1214           migrate_multifd_channels();
   1215}