cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

colo.c (25929B)


      1/*
      2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
      3 * (a.k.a. Fault Tolerance or Continuous Replication)
      4 *
      5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
      6 * Copyright (c) 2016 FUJITSU LIMITED
      7 * Copyright (c) 2016 Intel Corporation
      8 *
      9 * This work is licensed under the terms of the GNU GPL, version 2 or
     10 * later.  See the COPYING file in the top-level directory.
     11 */
     12
     13#include "qemu/osdep.h"
     14#include "sysemu/sysemu.h"
     15#include "qapi/error.h"
     16#include "qapi/qapi-commands-migration.h"
     17#include "qemu-file-channel.h"
     18#include "migration.h"
     19#include "qemu-file.h"
     20#include "savevm.h"
     21#include "migration/colo.h"
     22#include "block.h"
     23#include "io/channel-buffer.h"
     24#include "trace.h"
     25#include "qemu/error-report.h"
     26#include "qemu/main-loop.h"
     27#include "qemu/rcu.h"
     28#include "migration/failover.h"
     29#include "migration/ram.h"
     30#ifdef CONFIG_REPLICATION
     31#include "block/replication.h"
     32#endif
     33#include "net/colo-compare.h"
     34#include "net/colo.h"
     35#include "block/block.h"
     36#include "qapi/qapi-events-migration.h"
     37#include "qapi/qmp/qerror.h"
     38#include "sysemu/cpus.h"
     39#include "sysemu/runstate.h"
     40#include "net/filter.h"
     41
/*
 * True while colo_incoming_process_checkpoint() is loading device state on
 * the Secondary.  secondary_vm_do_failover() checks it and, when set, only
 * moves the failover state to RELAUNCH instead of failing over right away.
 */
static bool vmstate_loading;
/* Notifier handed to colo_compare_register_notifier() on the Primary. */
static Notifier packets_compare_notifier;

/* Users need to know the COLO mode after a COLO failover */
static COLOMode last_colo_mode;

/* Size passed to qio_channel_buffer_new() for the device-state buffer. */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
     49
     50bool migration_in_colo_state(void)
     51{
     52    MigrationState *s = migrate_get_current();
     53
     54    return (s->state == MIGRATION_STATUS_COLO);
     55}
     56
     57bool migration_incoming_in_colo_state(void)
     58{
     59    MigrationIncomingState *mis = migration_incoming_get_current();
     60
     61    return mis && (mis->state == MIGRATION_STATUS_COLO);
     62}
     63
     64static bool colo_runstate_is_stopped(void)
     65{
     66    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
     67}
     68
/*
 * Failover handling for the Secondary side.
 *
 * If device state is being loaded (vmstate_loading), the failover is not
 * performed now: the failover state is switched from ACTIVE to RELAUNCH
 * and the function returns; colo_process_incoming_thread() re-requests the
 * failover when it sees RELAUNCH.
 *
 * Otherwise the incoming migration state goes from COLO to COMPLETED,
 * block replication is stopped, the net filters are told about the
 * failover, both migration channels are shut down so the incoming thread
 * cannot stay blocked in I/O, the failover state is set to COMPLETED, and
 * finally the incoming thread is released through colo_incoming_sem and
 * the incoming coroutine (if any) is entered.
 *
 * Built without CONFIG_REPLICATION this function aborts.
 */
static void secondary_vm_do_failover(void)
{
/* COLO needs block replication to be enabled */
#ifdef CONFIG_REPLICATION
    int old_state;
    MigrationIncomingState *mis = migration_incoming_get_current();
    Error *local_err = NULL;

    /*
     * Failover cannot be done while the VM is loading its VMstate, or it
     * would break the secondary VM.  Only record the request (RELAUNCH).
     */
    if (vmstate_loading) {
        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                        FAILOVER_STATUS_RELAUNCH);
        if (old_state != FAILOVER_STATUS_ACTIVE) {
            error_report("Unknown error while do failover for secondary VM,"
                         "old_state: %s", FailoverStatus_str(old_state));
        }
        return;
    }

    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
                      MIGRATION_STATUS_COMPLETED);

    /* Errors from stopping replication are reported but not fatal here. */
    replication_stop_all(true, &local_err);
    if (local_err) {
        error_report_err(local_err);
        /* Reset so that local_err can be reused below. */
        local_err = NULL;
    }

    /* Notify all filters of all NICs about the failover */
    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
    if (local_err) {
        error_report_err(local_err);
    }

    if (!autostart) {
        error_report("\"-S\" qemu option will be ignored in secondary side");
        /* recover runstate to normal migration finish state */
        autostart = true;
    }
    /*
     * Make sure the COLO incoming thread is not blocked in recv or send.
     * If mis->from_src_file and mis->to_src_file use the same fd,
     * the second shutdown() will return -1; we ignore this value,
     * it is harmless.
     */
    if (mis->from_src_file) {
        qemu_file_shutdown(mis->from_src_file);
    }
    if (mis->to_src_file) {
        qemu_file_shutdown(mis->to_src_file);
    }

    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                                   FAILOVER_STATUS_COMPLETED);
    if (old_state != FAILOVER_STATUS_ACTIVE) {
        error_report("Incorrect state (%s) while doing failover for "
                     "secondary VM", FailoverStatus_str(old_state));
        /* Note: colo_incoming_sem is not posted on this path. */
        return;
    }
    /* Notify COLO incoming thread that failover work is finished */
    qemu_sem_post(&mis->colo_incoming_sem);

    /* For Secondary VM, jump to incoming co */
    if (mis->migration_incoming_co) {
        qemu_coroutine_enter(mis->migration_incoming_co);
    }
#else
    abort();
#endif
}
    141
/*
 * Failover handling for the Primary side.
 *
 * Moves the outgoing migration state from COLO to COMPLETED, wakes the
 * COLO thread (both its checkpoint wait and any blocking channel I/O),
 * marks the failover as COMPLETED, stops block replication and finally
 * posts colo_exit_sem, which colo_process_checkpoint() waits on before
 * tearing down.
 *
 * Built without CONFIG_REPLICATION this function aborts.
 */
static void primary_vm_do_failover(void)
{
#ifdef CONFIG_REPLICATION
    MigrationState *s = migrate_get_current();
    int old_state;
    Error *local_err = NULL;

    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
                      MIGRATION_STATUS_COMPLETED);
    /*
     * Kick the COLO thread, which might be waiting in
     * qemu_event_wait(&s->colo_checkpoint_event).
     */
    colo_checkpoint_notify(migrate_get_current());

    /*
     * Wake up the COLO thread, which may be blocked in recv() or send().
     * s->rp_state.from_dst_file and s->to_dst_file may use the same fd,
     * but shutting the fd down twice is harmless.
     */
    if (s->to_dst_file) {
        qemu_file_shutdown(s->to_dst_file);
    }
    if (s->rp_state.from_dst_file) {
        qemu_file_shutdown(s->rp_state.from_dst_file);
    }

    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                                   FAILOVER_STATUS_COMPLETED);
    if (old_state != FAILOVER_STATUS_ACTIVE) {
        error_report("Incorrect state (%s) while doing failover for Primary VM",
                     FailoverStatus_str(old_state));
        /* Note: colo_exit_sem is not posted on this path. */
        return;
    }

    /* A failure to stop replication is reported but does not abort failover. */
    replication_stop_all(true, &local_err);
    if (local_err) {
        error_report_err(local_err);
        local_err = NULL;
    }

    /* Notify COLO thread that failover work is finished */
    qemu_sem_post(&s->colo_exit_sem);
#else
    abort();
#endif
}
    189
    190COLOMode get_colo_mode(void)
    191{
    192    if (migration_in_colo_state()) {
    193        return COLO_MODE_PRIMARY;
    194    } else if (migration_incoming_in_colo_state()) {
    195        return COLO_MODE_SECONDARY;
    196    } else {
    197        return COLO_MODE_NONE;
    198    }
    199}
    200
    201void colo_do_failover(void)
    202{
    203    /* Make sure VM stopped while failover happened. */
    204    if (!colo_runstate_is_stopped()) {
    205        vm_stop_force_state(RUN_STATE_COLO);
    206    }
    207
    208    switch (get_colo_mode()) {
    209    case COLO_MODE_PRIMARY:
    210        primary_vm_do_failover();
    211        break;
    212    case COLO_MODE_SECONDARY:
    213        secondary_vm_do_failover();
    214        break;
    215    default:
    216        error_report("colo_do_failover failed because the colo mode"
    217                     " could not be obtained");
    218    }
    219}
    220
    221#ifdef CONFIG_REPLICATION
    222void qmp_xen_set_replication(bool enable, bool primary,
    223                             bool has_failover, bool failover,
    224                             Error **errp)
    225{
    226    ReplicationMode mode = primary ?
    227                           REPLICATION_MODE_PRIMARY :
    228                           REPLICATION_MODE_SECONDARY;
    229
    230    if (has_failover && enable) {
    231        error_setg(errp, "Parameter 'failover' is only for"
    232                   " stopping replication");
    233        return;
    234    }
    235
    236    if (enable) {
    237        replication_start_all(mode, errp);
    238    } else {
    239        if (!has_failover) {
    240            failover = NULL;
    241        }
    242        replication_stop_all(failover, failover ? NULL : errp);
    243    }
    244}
    245
    246ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
    247{
    248    Error *err = NULL;
    249    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
    250
    251    replication_get_error_all(&err);
    252    if (err) {
    253        s->error = true;
    254        s->has_desc = true;
    255        s->desc = g_strdup(error_get_pretty(err));
    256    } else {
    257        s->error = false;
    258    }
    259
    260    error_free(err);
    261    return s;
    262}
    263
    264void qmp_xen_colo_do_checkpoint(Error **errp)
    265{
    266    Error *err = NULL;
    267
    268    replication_do_checkpoint_all(&err);
    269    if (err) {
    270        error_propagate(errp, err);
    271        return;
    272    }
    273    /* Notify all filters of all NIC to do checkpoint */
    274    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
    275}
    276#endif
    277
    278COLOStatus *qmp_query_colo_status(Error **errp)
    279{
    280    COLOStatus *s = g_new0(COLOStatus, 1);
    281
    282    s->mode = get_colo_mode();
    283    s->last_mode = last_colo_mode;
    284
    285    switch (failover_get_state()) {
    286    case FAILOVER_STATUS_NONE:
    287        s->reason = COLO_EXIT_REASON_NONE;
    288        break;
    289    case FAILOVER_STATUS_COMPLETED:
    290        s->reason = COLO_EXIT_REASON_REQUEST;
    291        break;
    292    default:
    293        if (migration_in_colo_state()) {
    294            s->reason = COLO_EXIT_REASON_PROCESSING;
    295        } else {
    296            s->reason = COLO_EXIT_REASON_ERROR;
    297        }
    298    }
    299
    300    return s;
    301}
    302
    303static void colo_send_message(QEMUFile *f, COLOMessage msg,
    304                              Error **errp)
    305{
    306    int ret;
    307
    308    if (msg >= COLO_MESSAGE__MAX) {
    309        error_setg(errp, "%s: Invalid message", __func__);
    310        return;
    311    }
    312    qemu_put_be32(f, msg);
    313    qemu_fflush(f);
    314
    315    ret = qemu_file_get_error(f);
    316    if (ret < 0) {
    317        error_setg_errno(errp, -ret, "Can't send COLO message");
    318    }
    319    trace_colo_send_message(COLOMessage_str(msg));
    320}
    321
    322static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
    323                                    uint64_t value, Error **errp)
    324{
    325    Error *local_err = NULL;
    326    int ret;
    327
    328    colo_send_message(f, msg, &local_err);
    329    if (local_err) {
    330        error_propagate(errp, local_err);
    331        return;
    332    }
    333    qemu_put_be64(f, value);
    334    qemu_fflush(f);
    335
    336    ret = qemu_file_get_error(f);
    337    if (ret < 0) {
    338        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
    339                         COLOMessage_str(msg));
    340    }
    341}
    342
    343static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
    344{
    345    COLOMessage msg;
    346    int ret;
    347
    348    msg = qemu_get_be32(f);
    349    ret = qemu_file_get_error(f);
    350    if (ret < 0) {
    351        error_setg_errno(errp, -ret, "Can't receive COLO message");
    352        return msg;
    353    }
    354    if (msg >= COLO_MESSAGE__MAX) {
    355        error_setg(errp, "%s: Invalid message", __func__);
    356        return msg;
    357    }
    358    trace_colo_receive_message(COLOMessage_str(msg));
    359    return msg;
    360}
    361
    362static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
    363                                       Error **errp)
    364{
    365    COLOMessage msg;
    366    Error *local_err = NULL;
    367
    368    msg = colo_receive_message(f, &local_err);
    369    if (local_err) {
    370        error_propagate(errp, local_err);
    371        return;
    372    }
    373    if (msg != expect_msg) {
    374        error_setg(errp, "Unexpected COLO message %d, expected %d",
    375                          msg, expect_msg);
    376    }
    377}
    378
    379static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
    380                                           Error **errp)
    381{
    382    Error *local_err = NULL;
    383    uint64_t value;
    384    int ret;
    385
    386    colo_receive_check_message(f, expect_msg, &local_err);
    387    if (local_err) {
    388        error_propagate(errp, local_err);
    389        return 0;
    390    }
    391
    392    value = qemu_get_be64(f);
    393    ret = qemu_file_get_error(f);
    394    if (ret < 0) {
    395        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
    396                         COLOMessage_str(expect_msg));
    397    }
    398    return value;
    399}
    400
    401static int colo_do_checkpoint_transaction(MigrationState *s,
    402                                          QIOChannelBuffer *bioc,
    403                                          QEMUFile *fb)
    404{
    405    Error *local_err = NULL;
    406    int ret = -1;
    407
    408    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
    409                      &local_err);
    410    if (local_err) {
    411        goto out;
    412    }
    413
    414    colo_receive_check_message(s->rp_state.from_dst_file,
    415                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
    416    if (local_err) {
    417        goto out;
    418    }
    419    /* Reset channel-buffer directly */
    420    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
    421    bioc->usage = 0;
    422
    423    qemu_mutex_lock_iothread();
    424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
    425        qemu_mutex_unlock_iothread();
    426        goto out;
    427    }
    428    vm_stop_force_state(RUN_STATE_COLO);
    429    qemu_mutex_unlock_iothread();
    430    trace_colo_vm_state_change("run", "stop");
    431    /*
    432     * Failover request bh could be called after vm_stop_force_state(),
    433     * So we need check failover_request_is_active() again.
    434     */
    435    if (failover_get_state() != FAILOVER_STATUS_NONE) {
    436        goto out;
    437    }
    438    qemu_mutex_lock_iothread();
    439
    440#ifdef CONFIG_REPLICATION
    441    replication_do_checkpoint_all(&local_err);
    442    if (local_err) {
    443        qemu_mutex_unlock_iothread();
    444        goto out;
    445    }
    446#else
    447        abort();
    448#endif
    449
    450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
    451    if (local_err) {
    452        qemu_mutex_unlock_iothread();
    453        goto out;
    454    }
    455    /* Note: device state is saved into buffer */
    456    ret = qemu_save_device_state(fb);
    457
    458    qemu_mutex_unlock_iothread();
    459    if (ret < 0) {
    460        goto out;
    461    }
    462    /*
    463     * Only save VM's live state, which not including device state.
    464     * TODO: We may need a timeout mechanism to prevent COLO process
    465     * to be blocked here.
    466     */
    467    qemu_savevm_live_state(s->to_dst_file);
    468
    469    qemu_fflush(fb);
    470
    471    /*
    472     * We need the size of the VMstate data in Secondary side,
    473     * With which we can decide how much data should be read.
    474     */
    475    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
    476                            bioc->usage, &local_err);
    477    if (local_err) {
    478        goto out;
    479    }
    480
    481    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
    482    qemu_fflush(s->to_dst_file);
    483    ret = qemu_file_get_error(s->to_dst_file);
    484    if (ret < 0) {
    485        goto out;
    486    }
    487
    488    colo_receive_check_message(s->rp_state.from_dst_file,
    489                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
    490    if (local_err) {
    491        goto out;
    492    }
    493
    494    qemu_event_reset(&s->colo_checkpoint_event);
    495    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
    496    if (local_err) {
    497        goto out;
    498    }
    499
    500    colo_receive_check_message(s->rp_state.from_dst_file,
    501                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
    502    if (local_err) {
    503        goto out;
    504    }
    505
    506    ret = 0;
    507
    508    qemu_mutex_lock_iothread();
    509    vm_start();
    510    qemu_mutex_unlock_iothread();
    511    trace_colo_vm_state_change("stop", "run");
    512
    513out:
    514    if (local_err) {
    515        error_report_err(local_err);
    516    }
    517    return ret;
    518}
    519
    520static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
    521{
    522    colo_checkpoint_notify(data);
    523}
    524
/*
 * Main loop of the COLO thread on the Primary side.
 *
 * Opens the return path, registers the colo-compare notifier, waits for
 * the Secondary to report CHECKPOINT_READY, starts block replication and
 * the VM, then runs a checkpoint transaction every time
 * colo_checkpoint_event is set, for as long as the migration state stays
 * COLO and no failover is requested.
 *
 * On exit it emits a COLO_EXIT event, waits on colo_exit_sem (posted by
 * primary_vm_do_failover()) and only then releases the notifier, timer,
 * event and return-path file.
 */
static void colo_process_checkpoint(MigrationState *s)
{
    QIOChannelBuffer *bioc;
    QEMUFile *fb = NULL;
    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
    Error *local_err = NULL;
    int ret;

    last_colo_mode = get_colo_mode();
    if (last_colo_mode != COLO_MODE_PRIMARY) {
        error_report("COLO mode must be COLO_MODE_PRIMARY");
        /*
         * NOTE(review): this early return skips the teardown at "out", so
         * the timer/event/semaphore set up by migrate_start_colo_process()
         * are not released here - confirm whether that is intended.
         */
        return;
    }

    failover_init_state();

    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
    if (!s->rp_state.from_dst_file) {
        error_report("Open QEMUFile from_dst_file failed");
        /*
         * NOTE(review): the notifier has not been registered yet, but the
         * "out" path unregisters it unconditionally - verify that
         * colo_compare_unregister_notifier() tolerates this.
         */
        goto out;
    }

    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
    colo_compare_register_notifier(&packets_compare_notifier);

    /*
     * Wait for Secondary finish loading VM states and enter COLO
     * restore.
     */
    colo_receive_check_message(s->rp_state.from_dst_file,
                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
    if (local_err) {
        goto out;
    }
    /* Buffer (and a QEMUFile writing into it) for the device state. */
    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
    /* Drop our reference; bioc is still used below through this pointer. */
    object_unref(OBJECT(bioc));

    qemu_mutex_lock_iothread();
#ifdef CONFIG_REPLICATION
    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
    if (local_err) {
        qemu_mutex_unlock_iothread();
        goto out;
    }
#else
        abort();
#endif

    vm_start();
    qemu_mutex_unlock_iothread();
    trace_colo_vm_state_change("stop", "run");

    /*
     * Arm the periodic checkpoint timer.  current_time was sampled at
     * function entry, before the blocking waits above.
     */
    timer_mod(s->colo_delay_timer,
            current_time + s->parameters.x_checkpoint_delay);

    while (s->state == MIGRATION_STATUS_COLO) {
        if (failover_get_state() != FAILOVER_STATUS_NONE) {
            error_report("failover request");
            goto out;
        }

        /* Set by colo_checkpoint_notify() (timer, compare, failover). */
        qemu_event_wait(&s->colo_checkpoint_event);

        /* The state may have changed while we were waiting. */
        if (s->state != MIGRATION_STATUS_COLO) {
            goto out;
        }
        ret = colo_do_checkpoint_transaction(s, bioc, fb);
        if (ret < 0) {
            goto out;
        }
    }

out:
    /* Throw the unreported error message after exited from loop */
    if (local_err) {
        error_report_err(local_err);
    }

    if (fb) {
        qemu_fclose(fb);
    }

    /*
     * There are only two reasons we can get here, some error happened
     * or the user triggered failover.
     */
    switch (failover_get_state()) {
    case FAILOVER_STATUS_COMPLETED:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_REQUEST);
        break;
    default:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_ERROR);
    }

    /* Hopefully the wait here is not too long */
    qemu_sem_wait(&s->colo_exit_sem);
    qemu_sem_destroy(&s->colo_exit_sem);

    /*
     * It is safe to unregister the notifier after failover has finished.
     * Besides, colo_delay_timer and colo_checkpoint_event can't be
     * released before the notifier is unregistered, or there will be a
     * use-after-free error.
     */
    colo_compare_unregister_notifier(&packets_compare_notifier);
    timer_free(s->colo_delay_timer);
    qemu_event_destroy(&s->colo_checkpoint_event);

    /*
     * Must be called after failover BH is completed,
     * Or the failover BH may shutdown the wrong fd that
     * re-used by other threads after we release here.
     */
    if (s->rp_state.from_dst_file) {
        qemu_fclose(s->rp_state.from_dst_file);
    }
}
    645
    646void colo_checkpoint_notify(void *opaque)
    647{
    648    MigrationState *s = opaque;
    649    int64_t next_notify_time;
    650
    651    qemu_event_set(&s->colo_checkpoint_event);
    652    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
    653    next_notify_time = s->colo_checkpoint_time +
    654                    s->parameters.x_checkpoint_delay;
    655    timer_mod(s->colo_delay_timer, next_notify_time);
    656}
    657
    658void migrate_start_colo_process(MigrationState *s)
    659{
    660    qemu_mutex_unlock_iothread();
    661    qemu_event_init(&s->colo_checkpoint_event, false);
    662    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
    663                                colo_checkpoint_notify, s);
    664
    665    qemu_sem_init(&s->colo_exit_sem, 0);
    666    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
    667                      MIGRATION_STATUS_COLO);
    668    colo_process_checkpoint(s);
    669    qemu_mutex_lock_iothread();
    670}
    671
    672static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
    673                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
    674{
    675    uint64_t total_size;
    676    uint64_t value;
    677    Error *local_err = NULL;
    678    int ret;
    679
    680    qemu_mutex_lock_iothread();
    681    vm_stop_force_state(RUN_STATE_COLO);
    682    trace_colo_vm_state_change("run", "stop");
    683    qemu_mutex_unlock_iothread();
    684
    685    /* FIXME: This is unnecessary for periodic checkpoint mode */
    686    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
    687                 &local_err);
    688    if (local_err) {
    689        error_propagate(errp, local_err);
    690        return;
    691    }
    692
    693    colo_receive_check_message(mis->from_src_file,
    694                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
    695    if (local_err) {
    696        error_propagate(errp, local_err);
    697        return;
    698    }
    699
    700    qemu_mutex_lock_iothread();
    701    cpu_synchronize_all_states();
    702    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
    703    qemu_mutex_unlock_iothread();
    704
    705    if (ret < 0) {
    706        error_setg(errp, "Load VM's live state (ram) error");
    707        return;
    708    }
    709
    710    value = colo_receive_message_value(mis->from_src_file,
    711                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
    712    if (local_err) {
    713        error_propagate(errp, local_err);
    714        return;
    715    }
    716
    717    /*
    718     * Read VM device state data into channel buffer,
    719     * It's better to re-use the memory allocated.
    720     * Here we need to handle the channel buffer directly.
    721     */
    722    if (value > bioc->capacity) {
    723        bioc->capacity = value;
    724        bioc->data = g_realloc(bioc->data, bioc->capacity);
    725    }
    726    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
    727    if (total_size != value) {
    728        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
    729                    " %" PRIu64, total_size, value);
    730        return;
    731    }
    732    bioc->usage = total_size;
    733    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
    734
    735    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
    736                 &local_err);
    737    if (local_err) {
    738        error_propagate(errp, local_err);
    739        return;
    740    }
    741
    742    qemu_mutex_lock_iothread();
    743    vmstate_loading = true;
    744    colo_flush_ram_cache();
    745    ret = qemu_load_device_state(fb);
    746    if (ret < 0) {
    747        error_setg(errp, "COLO: load device state failed");
    748        vmstate_loading = false;
    749        qemu_mutex_unlock_iothread();
    750        return;
    751    }
    752
    753#ifdef CONFIG_REPLICATION
    754    replication_get_error_all(&local_err);
    755    if (local_err) {
    756        error_propagate(errp, local_err);
    757        vmstate_loading = false;
    758        qemu_mutex_unlock_iothread();
    759        return;
    760    }
    761
    762    /* discard colo disk buffer */
    763    replication_do_checkpoint_all(&local_err);
    764    if (local_err) {
    765        error_propagate(errp, local_err);
    766        vmstate_loading = false;
    767        qemu_mutex_unlock_iothread();
    768        return;
    769    }
    770#else
    771    abort();
    772#endif
    773    /* Notify all filters of all NIC to do checkpoint */
    774    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
    775
    776    if (local_err) {
    777        error_propagate(errp, local_err);
    778        vmstate_loading = false;
    779        qemu_mutex_unlock_iothread();
    780        return;
    781    }
    782
    783    vmstate_loading = false;
    784    vm_start();
    785    trace_colo_vm_state_change("stop", "run");
    786    qemu_mutex_unlock_iothread();
    787
    788    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
    789        return;
    790    }
    791
    792    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
    793                 &local_err);
    794    error_propagate(errp, local_err);
    795}
    796
    797static void colo_wait_handle_message(MigrationIncomingState *mis,
    798                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
    799{
    800    COLOMessage msg;
    801    Error *local_err = NULL;
    802
    803    msg = colo_receive_message(mis->from_src_file, &local_err);
    804    if (local_err) {
    805        error_propagate(errp, local_err);
    806        return;
    807    }
    808
    809    switch (msg) {
    810    case COLO_MESSAGE_CHECKPOINT_REQUEST:
    811        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
    812        break;
    813    default:
    814        error_setg(errp, "Got unknown COLO message: %d", msg);
    815        break;
    816    }
    817}
    818
    819void *colo_process_incoming_thread(void *opaque)
    820{
    821    MigrationIncomingState *mis = opaque;
    822    QEMUFile *fb = NULL;
    823    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
    824    Error *local_err = NULL;
    825
    826    rcu_register_thread();
    827    qemu_sem_init(&mis->colo_incoming_sem, 0);
    828
    829    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
    830                      MIGRATION_STATUS_COLO);
    831
    832    last_colo_mode = get_colo_mode();
    833    if (last_colo_mode != COLO_MODE_SECONDARY) {
    834        error_report("COLO mode must be COLO_MODE_SECONDARY");
    835        return NULL;
    836    }
    837
    838    failover_init_state();
    839
    840    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
    841    if (!mis->to_src_file) {
    842        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
    843        goto out;
    844    }
    845    /*
    846     * Note: the communication between Primary side and Secondary side
    847     * should be sequential, we set the fd to unblocked in migration incoming
    848     * coroutine, and here we are in the COLO incoming thread, so it is ok to
    849     * set the fd back to blocked.
    850     */
    851    qemu_file_set_blocking(mis->from_src_file, true);
    852
    853    colo_incoming_start_dirty_log();
    854
    855    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
    856    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
    857    object_unref(OBJECT(bioc));
    858
    859    qemu_mutex_lock_iothread();
    860#ifdef CONFIG_REPLICATION
    861    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
    862    if (local_err) {
    863        qemu_mutex_unlock_iothread();
    864        goto out;
    865    }
    866#else
    867        abort();
    868#endif
    869    vm_start();
    870    trace_colo_vm_state_change("stop", "run");
    871    qemu_mutex_unlock_iothread();
    872
    873    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
    874                      &local_err);
    875    if (local_err) {
    876        goto out;
    877    }
    878
    879    while (mis->state == MIGRATION_STATUS_COLO) {
    880        colo_wait_handle_message(mis, fb, bioc, &local_err);
    881        if (local_err) {
    882            error_report_err(local_err);
    883            break;
    884        }
    885
    886        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
    887            failover_set_state(FAILOVER_STATUS_RELAUNCH,
    888                            FAILOVER_STATUS_NONE);
    889            failover_request_active(NULL);
    890            break;
    891        }
    892
    893        if (failover_get_state() != FAILOVER_STATUS_NONE) {
    894            error_report("failover request");
    895            break;
    896        }
    897    }
    898
    899out:
    900    /*
    901     * There are only two reasons we can get here, some error happened
    902     * or the user triggered failover.
    903     */
    904    switch (failover_get_state()) {
    905    case FAILOVER_STATUS_COMPLETED:
    906        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
    907                                  COLO_EXIT_REASON_REQUEST);
    908        break;
    909    default:
    910        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
    911                                  COLO_EXIT_REASON_ERROR);
    912    }
    913
    914    if (fb) {
    915        qemu_fclose(fb);
    916    }
    917
    918    /* Hope this not to be too long to loop here */
    919    qemu_sem_wait(&mis->colo_incoming_sem);
    920    qemu_sem_destroy(&mis->colo_incoming_sem);
    921    /* Must be called after failover BH is completed */
    922    if (mis->to_src_file) {
    923        qemu_fclose(mis->to_src_file);
    924        mis->to_src_file = NULL;
    925    }
    926
    927    rcu_unregister_thread();
    928    return NULL;
    929}