cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

colo-compare.c (45472B)


      1/*
      2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
      3 * (a.k.a. Fault Tolerance or Continuous Replication)
      4 *
      5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
      6 * Copyright (c) 2016 FUJITSU LIMITED
      7 * Copyright (c) 2016 Intel Corporation
      8 *
      9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
     10 *
     11 * This work is licensed under the terms of the GNU GPL, version 2 or
     12 * later.  See the COPYING file in the top-level directory.
     13 */
     14
     15#include "qemu/osdep.h"
     16#include "qemu-common.h"
     17#include "qemu/error-report.h"
     18#include "trace.h"
     19#include "qapi/error.h"
     20#include "net/net.h"
     21#include "net/eth.h"
     22#include "qom/object_interfaces.h"
     23#include "qemu/iov.h"
     24#include "qom/object.h"
     25#include "net/queue.h"
     26#include "chardev/char-fe.h"
     27#include "qemu/sockets.h"
     28#include "colo.h"
     29#include "sysemu/iothread.h"
     30#include "net/colo-compare.h"
     31#include "migration/colo.h"
     32#include "migration/migration.h"
     33#include "util.h"
     34
     35#include "block/aio-wait.h"
     36#include "qemu/coroutine.h"
     37
     /* QOM type name and the COLO_COMPARE() checked cast for CompareState. */
#define TYPE_COLO_COMPARE "colo-compare"
typedef struct CompareState CompareState;
DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
                         TYPE_COLO_COMPARE)

/* List of CompareState instances, linked through CompareState::next. */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Notifiers fired by colo_compare_inconsistency_notify() when the object
 * has no notify_dev; managed with colo_compare_(un)register_notifier().
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

/* Byte count advertised by compare_chr_can_read(). */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* NOTE(review): presumably the default for max_queue_size; set outside this view. */
#define MAX_QUEUE_SIZE 1024

/* Bits stored in colo_mark_tcp_pkt()'s *mark: which packet(s) may be released. */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/*
 * NOTE(review): presumably the defaults for the old-packet scan period and
 * compare_timeout (both ms); their uses are outside this view.
 */
#define REGULAR_PACKET_CHECK_MS 1000
#define DEFAULT_TIME_OUT_MS 3000

/* Define to hexdump mismatching packet pairs to stderr. */
/* #define DEBUG_COLO_PACKETS */

static QemuMutex colo_compare_mutex;
static bool colo_compare_active;
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
/* Per-queue packet limit enforced by colo_insert_packet(). */
static uint32_t max_queue_size;
     66
     67/*
     68 *  + CompareState ++
     69 *  |               |
     70 *  +---------------+   +---------------+         +---------------+
     71 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
     72 *  +---------------+   +---------------+         +---------------+
     73 *  |               |     |           |             |          |
     74 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
     75 *                    |primary |  |secondary    |primary | |secondary
     76 *                    |packet  |  |packet  +    |packet  | |packet  +
     77 *                    +--------+  +--------+    +--------+ +--------+
     78 *                        |           |             |          |
     79 *                    +---v----+  +---v----+    +---v----+ +---v----+
     80 *                    |primary |  |secondary    |primary | |secondary
     81 *                    |packet  |  |packet  +    |packet  | |packet  +
     82 *                    +--------+  +--------+    +--------+ +--------+
     83 *                        |           |             |          |
     84 *                    +---v----+  +---v----+    +---v----+ +---v----+
     85 *                    |primary |  |secondary    |primary | |secondary
     86 *                    |packet  |  |packet  +    |packet  | |packet  +
     87 *                    +--------+  +--------+    +--------+ +--------+
     88 */
     89
     /*
 * Asynchronous sender for one chardev: compare_chr_send() queues
 * SendEntry items and the _compare_chr_send() coroutine writes them out.
 */
typedef struct SendCo {
    Coroutine *co;              /* sender coroutine; NULL once it finished */
    struct CompareState *s;
    CharBackend *chr;           /* destination of qemu_chr_fe_write_all() */
    GQueue send_list;           /* SendEntry: pushed at head, popped at tail (FIFO) */
    bool notify_remote_frame;   /* notify channel: never sends vnet_hdr_len */
    bool done;                  /* true when no coroutine is running */
    int ret;                    /* last coroutine result: 0 or negative errno */
} SendCo;
     99
    /* One message queued on a SendCo. */
typedef struct SendEntry {
    uint32_t size;          /* length of buf; sent first, in network byte order */
    uint32_t vnet_hdr_len;  /* sent after size only on the output channel with vnet_hdr */
    uint8_t *buf;           /* payload; g_free()d by the sender coroutine */
} SendEntry;
    105
    /* State of one colo-compare object. */
struct CompareState {
    Object parent;

    /* NOTE(review): presumably chardev ids from object properties; set outside this view. */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    char *notify_dev;           /* non-NULL: report inconsistencies via notify_sendco */
    CharBackend chr_pri_in;     /* packets arriving from the primary */
    CharBackend chr_sec_in;     /* packets arriving from the secondary */
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    SocketReadState pri_rs;     /* frame reassembly for chr_pri_in */
    SocketReadState sec_rs;     /* frame reassembly for chr_sec_in */
    SocketReadState notify_rs;  /* frame reassembly for chr_notify_dev */
    SendCo out_sendco;          /* sender for released primary packets */
    SendCo notify_sendco;       /* sender for "DO_CHECKPOINT" notifications */
    bool vnet_hdr;              /* also send vnet_hdr_len on the output channel */
    uint64_t compare_timeout;   /* ms a queued packet may wait before forcing a checkpoint */
    uint32_t expired_scan_cycle; /* NOTE(review): presumably the old-packet scan period in ms */

    /*
     * Connections that have packets being processed, queued by
     * packet_enqueue().
     * Element type: Connection
     */
    GQueue conn_list;
    /* Lookup table from ConnectionKey to Connection (one entry per connection) */
    GHashTable *connection_track_table;

    IOThread *iothread;
    GMainContext *worker_context;
    QEMUTimer *packet_check_timer;

    QEMUBH *event_bh;
    enum colo_event event;

    QTAILQ_ENTRY(CompareState) next;   /* link in net_compares */
};
    143
    /* QOM class for colo-compare; adds nothing to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Which guest a packet came from (index into colo_mode[]). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

/* Printable names for PRIMARY_IN / SECONDARY_IN, used in trace output. */
static const char *colo_mode[] = {
    [PRIMARY_IN] = "primary",
    [SECONDARY_IN] = "secondary",
};

/*
 * Queue @size bytes of @buf on the output channel, or on the notify
 * channel when @notify_remote_frame; with @zero_copy ownership of @buf is
 * transferred. Defined below.
 */
static int compare_chr_send(CompareState *s,
                            uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame,
                            bool zero_copy);
    164
    /*
 * Report whether the @packet_len bytes at @buf spell exactly the
 * characters of the NUL-terminated string @str (terminator excluded).
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t want = strlen(str);

    return packet_len == want && memcmp(str, buf, want) == 0;
}
    175
    176static void notify_remote_frame(CompareState *s)
    177{
    178    char msg[] = "DO_CHECKPOINT";
    179    int ret = 0;
    180
    181    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
    182    if (ret < 0) {
    183        error_report("Notify Xen COLO-frame failed");
    184    }
    185}
    186
    187static void colo_compare_inconsistency_notify(CompareState *s)
    188{
    189    if (s->notify_dev) {
    190        notify_remote_frame(s);
    191    } else {
    192        notifier_list_notify(&colo_compare_notifiers,
    193                             migrate_get_current());
    194    }
    195}
    196
    197/* Use restricted to colo_insert_packet() */
    198static gint seq_sorter(Packet *a, Packet *b, gpointer data)
    199{
    200    return a->tcp_seq - b->tcp_seq;
    201}
    202
    203static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
    204{
    205    Packet *pkt = data;
    206    struct tcp_hdr *tcphd;
    207
    208    tcphd = (struct tcp_hdr *)pkt->transport_header;
    209
    210    pkt->tcp_seq = ntohl(tcphd->th_seq);
    211    pkt->tcp_ack = ntohl(tcphd->th_ack);
    212    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
    213    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
    214                       + (tcphd->th_off << 2);
    215    pkt->payload_size = pkt->size - pkt->header_size;
    216    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
    217    pkt->flags = tcphd->th_flags;
    218}
    219
    220/*
    221 * Return 1 on success, if return 0 means the
    222 * packet will be dropped
    223 */
    224static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
    225{
    226    if (g_queue_get_length(queue) <= max_queue_size) {
    227        if (pkt->ip->ip_p == IPPROTO_TCP) {
    228            fill_pkt_tcp_info(pkt, max_ack);
    229            g_queue_insert_sorted(queue,
    230                                  pkt,
    231                                  (GCompareDataFunc)seq_sorter,
    232                                  NULL);
    233        } else {
    234            g_queue_push_tail(queue, pkt);
    235        }
    236        return 1;
    237    }
    238    return 0;
    239}
    240
    241/*
    242 * Return 0 on success, if return -1 means the pkt
    243 * is unsupported(arp and ipv6) and will be sent later
    244 */
    245static int packet_enqueue(CompareState *s, int mode, Connection **con)
    246{
    247    ConnectionKey key;
    248    Packet *pkt = NULL;
    249    Connection *conn;
    250    int ret;
    251
    252    if (mode == PRIMARY_IN) {
    253        pkt = packet_new(s->pri_rs.buf,
    254                         s->pri_rs.packet_len,
    255                         s->pri_rs.vnet_hdr_len);
    256    } else {
    257        pkt = packet_new(s->sec_rs.buf,
    258                         s->sec_rs.packet_len,
    259                         s->sec_rs.vnet_hdr_len);
    260    }
    261
    262    if (parse_packet_early(pkt)) {
    263        packet_destroy(pkt, NULL);
    264        pkt = NULL;
    265        return -1;
    266    }
    267    fill_connection_key(pkt, &key);
    268
    269    conn = connection_get(s->connection_track_table,
    270                          &key,
    271                          &s->conn_list);
    272
    273    if (!conn->processing) {
    274        g_queue_push_tail(&s->conn_list, conn);
    275        conn->processing = true;
    276    }
    277
    278    if (mode == PRIMARY_IN) {
    279        ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
    280    } else {
    281        ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
    282    }
    283
    284    if (!ret) {
    285        trace_colo_compare_drop_packet(colo_mode[mode],
    286            "queue size too big, drop packet");
    287        packet_destroy(pkt, NULL);
    288        pkt = NULL;
    289    }
    290
    291    *con = conn;
    292
    293    return 0;
    294}
    295
    /*
 * True when sequence number @seq1 lies strictly after @seq2 in 32-bit
 * modular (wraparound) arithmetic, i.e. the signed distance is positive.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
    300
    301static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
    302{
    303    int ret;
    304    ret = compare_chr_send(s,
    305                           pkt->data,
    306                           pkt->size,
    307                           pkt->vnet_hdr_len,
    308                           false,
    309                           true);
    310    if (ret < 0) {
    311        error_report("colo send primary packet failed");
    312    }
    313    trace_colo_compare_main("packet same and release packet");
    314    packet_destroy_partial(pkt, NULL);
    315}
    316
    317/*
    318 * The IP packets sent by primary and secondary
    319 * will be compared in here
    320 * TODO support ip fragment, Out-Of-Order
    321 * return:    0  means packet same
    322 *            > 0 || < 0 means packet different
    323 */
    324static int colo_compare_packet_payload(Packet *ppkt,
    325                                       Packet *spkt,
    326                                       uint16_t poffset,
    327                                       uint16_t soffset,
    328                                       uint16_t len)
    329
    330{
    331    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
    332        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
    333
    334        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
    335        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
    336        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
    337        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
    338
    339        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
    340                                   pri_ip_dst, spkt->size,
    341                                   sec_ip_src, sec_ip_dst);
    342    }
    343
    344    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
    345}
    346
    /*
 * Compare the TCP payload of @ppkt (primary) with @spkt (secondary). Each
 * packet's @offset counts the payload bytes already compared.
 *
 * On return *@mark says which packet(s) the caller may release:
 *   FREE_PRIMARY | FREE_SECONDARY - both cover the same sequence range
 *                                   with identical payload;
 *   FREE_PRIMARY   - the primary's remaining payload matched, and
 *                    spkt->offset was advanced past it;
 *   FREE_SECONDARY - the secondary's remaining payload matched, and
 *                    ppkt->offset was advanced past it;
 *   0              - nothing may be released.
 *
 * Returns true when the compared bytes are consistent, so the caller keeps
 * comparing. Returns false on a mismatch. It also returns false when the
 * primary's ACK is after @max_ack, i.e. the secondary has not acknowledged
 * that data yet. colo_compare_tcp() then requests a checkpoint.
 */
static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
                              int8_t *mark, uint32_t max_ack)
{
    *mark = 0;

    /* Fast path: same sequence range and identical payload. */
    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size, spkt->header_size,
                                        ppkt->payload_size)) {
            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
            return true;
        }
    }

    /*
     * Primary ends at or before the secondary: compare the rest of the
     * primary payload. Part of the secondary payload may still need to be
     * compared afterwards.
     */
    if (!after(ppkt->seq_end, spkt->seq_end)) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        ppkt->payload_size - ppkt->offset)) {
            if (!after(ppkt->tcp_ack, max_ack)) {
                *mark = COLO_COMPARE_FREE_PRIMARY;
                spkt->offset += ppkt->payload_size - ppkt->offset;
                return true;
            } else {
                /* secondary guest hasn't ack the data, don't send
                 * out this packet
                 */
                return false;
            }
        }
    } else {
        /* primary packet is longer than secondary packet, compare
         * the same part and mark the primary packet offset
         */
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        spkt->payload_size - spkt->offset)) {
            *mark = COLO_COMPARE_FREE_SECONDARY;
            ppkt->offset += spkt->payload_size - spkt->offset;
            return true;
        }
    }

    /* Payload mismatch. */
    return false;
}
    399
    /*
 * Walk the primary and secondary TCP queues of @conn in sequence order.
 * Primary packets whose payload the secondary produced identically are
 * released to the output channel. Matched or stale secondary packets are
 * freed.
 * It stops when either queue runs dry; unprocessed packets stay queued
 * for the next call. On a mismatch both packets are pushed back and a
 * checkpoint is requested via colo_compare_inconsistency_notify().
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     */
    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

    /* pri: take the next primary packet; nothing to do without one. */
pri:
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_head(&conn->primary_list);
    /* sec: pair ppkt with the next secondary packet, or wait for one. */
sec:
    if (g_queue_is_empty(&conn->secondary_list)) {
        g_queue_push_head(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_head(&conn->secondary_list);

    /* Primary packet with no payload (seq_end == tcp_seq): release it. */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    /* Primary data ending at or before compare_seq was already verified. */
    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* Secondary packet with no payload: nothing to compare, drop it. */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        /* Secondary data already covered by compare_seq: drop it. */
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        /* spkt still needs comparing but ppkt was released: fetch a new ppkt. */
        if (!ppkt) {
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        /*
         * Release what colo_mark_tcp_pkt() marked, record how far the
         * stream has been verified, and continue with the side that needs
         * a new packet. A partially compared packet goes back on its queue.
         */
        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        } else if (mark == COLO_COMPARE_FREE_SECONDARY) {
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* Inconsistent (or not yet acknowledged): keep both, ask for a checkpoint. */
        g_queue_push_head(&conn->primary_list, ppkt);
        g_queue_push_head(&conn->secondary_list, spkt);

#ifdef DEBUG_COLO_PACKETS
        qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
        qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
#endif

        colo_compare_inconsistency_notify(s);
    }
}
    502
    503
    504/*
    505 * Called from the compare thread on the primary
    506 * for compare udp packet
    507 */
    508static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
    509{
    510    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
    511    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
    512
    513    trace_colo_compare_main("compare udp");
    514
    515    /*
    516     * Because of ppkt and spkt are both in the same connection,
    517     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
    518     * same with spkt. In addition, IP header's Identification is a random
    519     * field, we can handle it in IP fragmentation function later.
    520     * COLO just concern the response net packet payload from primary guest
    521     * and secondary guest are same or not, So we ignored all IP header include
    522     * other field like TOS,TTL,IP Checksum. we only need to compare
    523     * the ip payload here.
    524     */
    525    if (ppkt->size != spkt->size) {
    526        trace_colo_compare_main("UDP: payload size of packets are different");
    527        return -1;
    528    }
    529    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
    530                                    ppkt->size - offset)) {
    531        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
    532        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
    533#ifdef DEBUG_COLO_PACKETS
    534        qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
    535        qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
    536#endif
    537        return -1;
    538    } else {
    539        return 0;
    540    }
    541}
    542
    543/*
    544 * Called from the compare thread on the primary
    545 * for compare icmp packet
    546 */
    547static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
    548{
    549    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
    550    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
    551
    552    trace_colo_compare_main("compare icmp");
    553
    554    /*
    555     * Because of ppkt and spkt are both in the same connection,
    556     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
    557     * same with spkt. In addition, IP header's Identification is a random
    558     * field, we can handle it in IP fragmentation function later.
    559     * COLO just concern the response net packet payload from primary guest
    560     * and secondary guest are same or not, So we ignored all IP header include
    561     * other field like TOS,TTL,IP Checksum. we only need to compare
    562     * the ip payload here.
    563     */
    564    if (ppkt->size != spkt->size) {
    565        trace_colo_compare_main("ICMP: payload size of packets are different");
    566        return -1;
    567    }
    568    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
    569                                    ppkt->size - offset)) {
    570        trace_colo_compare_icmp_miscompare("primary pkt size",
    571                                           ppkt->size);
    572        trace_colo_compare_icmp_miscompare("Secondary pkt size",
    573                                           spkt->size);
    574#ifdef DEBUG_COLO_PACKETS
    575        qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
    576        qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
    577#endif
    578        return -1;
    579    } else {
    580        return 0;
    581    }
    582}
    583
    584/*
    585 * Called from the compare thread on the primary
    586 * for compare other packet
    587 */
    588static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
    589{
    590    uint16_t offset = ppkt->vnet_hdr_len;
    591
    592    trace_colo_compare_main("compare other");
    593    if (ppkt->size != spkt->size) {
    594        trace_colo_compare_main("Other: payload size of packets are different");
    595        return -1;
    596    }
    597    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
    598                                       ppkt->size - offset);
    599}
    600
    601static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
    602{
    603    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
    604
    605    if ((now - pkt->creation_ms) > (*check_time)) {
    606        trace_colo_old_packet_check_found(pkt->creation_ms);
    607        return 0;
    608    } else {
    609        return 1;
    610    }
    611}
    612
    613void colo_compare_register_notifier(Notifier *notify)
    614{
    615    notifier_list_add(&colo_compare_notifiers, notify);
    616}
    617
    618void colo_compare_unregister_notifier(Notifier *notify)
    619{
    620    notifier_remove(notify);
    621}
    622
    623static int colo_old_packet_check_one_conn(Connection *conn,
    624                                          CompareState *s)
    625{
    626    if (!g_queue_is_empty(&conn->primary_list)) {
    627        if (g_queue_find_custom(&conn->primary_list,
    628                                &s->compare_timeout,
    629                                (GCompareFunc)colo_old_packet_check_one))
    630            goto out;
    631    }
    632
    633    if (!g_queue_is_empty(&conn->secondary_list)) {
    634        if (g_queue_find_custom(&conn->secondary_list,
    635                                &s->compare_timeout,
    636                                (GCompareFunc)colo_old_packet_check_one))
    637            goto out;
    638    }
    639
    640    return 1;
    641
    642out:
    643    /* Do checkpoint will flush old packet */
    644    colo_compare_inconsistency_notify(s);
    645    return 0;
    646}
    647
    648/*
    649 * Look for old packets that the secondary hasn't matched,
    650 * if we have some then we have to checkpoint to wake
    651 * the secondary up.
    652 */
    653static void colo_old_packet_check(void *opaque)
    654{
    655    CompareState *s = opaque;
    656
    657    /*
    658     * If we find one old packet, stop finding job and notify
    659     * COLO frame do checkpoint.
    660     */
    661    g_queue_find_custom(&s->conn_list, s,
    662                        (GCompareFunc)colo_old_packet_check_one_conn);
    663}
    664
    665static void colo_compare_packet(CompareState *s, Connection *conn,
    666                                int (*HandlePacket)(Packet *spkt,
    667                                Packet *ppkt))
    668{
    669    Packet *pkt = NULL;
    670    GList *result = NULL;
    671
    672    while (!g_queue_is_empty(&conn->primary_list) &&
    673           !g_queue_is_empty(&conn->secondary_list)) {
    674        pkt = g_queue_pop_head(&conn->primary_list);
    675        result = g_queue_find_custom(&conn->secondary_list,
    676                 pkt, (GCompareFunc)HandlePacket);
    677
    678        if (result) {
    679            colo_release_primary_pkt(s, pkt);
    680            packet_destroy(result->data, NULL);
    681            g_queue_delete_link(&conn->secondary_list, result);
    682        } else {
    683            /*
    684             * If one packet arrive late, the secondary_list or
    685             * primary_list will be empty, so we can't compare it
    686             * until next comparison. If the packets in the list are
    687             * timeout, it will trigger a checkpoint request.
    688             */
    689            trace_colo_compare_main("packet different");
    690            g_queue_push_head(&conn->primary_list, pkt);
    691
    692            colo_compare_inconsistency_notify(s);
    693            break;
    694        }
    695    }
    696}
    697
    698/*
    699 * Called from the compare thread on the primary
    700 * for compare packet with secondary list of the
    701 * specified connection when a new packet was
    702 * queued to it.
    703 */
    704static void colo_compare_connection(void *opaque, void *user_data)
    705{
    706    CompareState *s = user_data;
    707    Connection *conn = opaque;
    708
    709    switch (conn->ip_proto) {
    710    case IPPROTO_TCP:
    711        colo_compare_tcp(s, conn);
    712        break;
    713    case IPPROTO_UDP:
    714        colo_compare_packet(s, conn, colo_packet_compare_udp);
    715        break;
    716    case IPPROTO_ICMP:
    717        colo_compare_packet(s, conn, colo_packet_compare_icmp);
    718        break;
    719    default:
    720        colo_compare_packet(s, conn, colo_packet_compare_other);
    721        break;
    722    }
    723}
    724
    725static void coroutine_fn _compare_chr_send(void *opaque)
    726{
    727    SendCo *sendco = opaque;
    728    CompareState *s = sendco->s;
    729    int ret = 0;
    730
    731    while (!g_queue_is_empty(&sendco->send_list)) {
    732        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
    733        uint32_t len = htonl(entry->size);
    734
    735        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
    736
    737        if (ret != sizeof(len)) {
    738            g_free(entry->buf);
    739            g_slice_free(SendEntry, entry);
    740            goto err;
    741        }
    742
    743        if (!sendco->notify_remote_frame && s->vnet_hdr) {
    744            /*
    745             * We send vnet header len make other module(like filter-redirector)
    746             * know how to parse net packet correctly.
    747             */
    748            len = htonl(entry->vnet_hdr_len);
    749
    750            ret = qemu_chr_fe_write_all(sendco->chr,
    751                                        (uint8_t *)&len,
    752                                        sizeof(len));
    753
    754            if (ret != sizeof(len)) {
    755                g_free(entry->buf);
    756                g_slice_free(SendEntry, entry);
    757                goto err;
    758            }
    759        }
    760
    761        ret = qemu_chr_fe_write_all(sendco->chr,
    762                                    (uint8_t *)entry->buf,
    763                                    entry->size);
    764
    765        if (ret != entry->size) {
    766            g_free(entry->buf);
    767            g_slice_free(SendEntry, entry);
    768            goto err;
    769        }
    770
    771        g_free(entry->buf);
    772        g_slice_free(SendEntry, entry);
    773    }
    774
    775    sendco->ret = 0;
    776    goto out;
    777
    778err:
    779    while (!g_queue_is_empty(&sendco->send_list)) {
    780        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
    781        g_free(entry->buf);
    782        g_slice_free(SendEntry, entry);
    783    }
    784    sendco->ret = ret < 0 ? ret : -EIO;
    785out:
    786    sendco->co = NULL;
    787    sendco->done = true;
    788    aio_wait_kick();
    789}
    790
    791static int compare_chr_send(CompareState *s,
    792                            uint8_t *buf,
    793                            uint32_t size,
    794                            uint32_t vnet_hdr_len,
    795                            bool notify_remote_frame,
    796                            bool zero_copy)
    797{
    798    SendCo *sendco;
    799    SendEntry *entry;
    800
    801    if (notify_remote_frame) {
    802        sendco = &s->notify_sendco;
    803    } else {
    804        sendco = &s->out_sendco;
    805    }
    806
    807    if (!size) {
    808        return 0;
    809    }
    810
    811    entry = g_slice_new(SendEntry);
    812    entry->size = size;
    813    entry->vnet_hdr_len = vnet_hdr_len;
    814    if (zero_copy) {
    815        entry->buf = buf;
    816    } else {
    817        entry->buf = g_malloc(size);
    818        memcpy(entry->buf, buf, size);
    819    }
    820    g_queue_push_head(&sendco->send_list, entry);
    821
    822    if (sendco->done) {
    823        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
    824        sendco->done = false;
    825        qemu_coroutine_enter(sendco->co);
    826        if (sendco->done) {
    827            /* report early errors */
    828            return sendco->ret;
    829        }
    830    }
    831
    832    /* assume success */
    833    return 0;
    834}
    835
    /*
 * Chardev can-read callback: always accept up to COMPARE_READ_LEN_MAX
 * bytes. @opaque is unused.
 */
static int compare_chr_can_read(void *opaque)
{
    return COMPARE_READ_LEN_MAX;
}
    840
    841/*
    842 * Called from the main thread on the primary for packets
    843 * arriving over the socket from the primary.
    844 */
    845static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
    846{
    847    CompareState *s = COLO_COMPARE(opaque);
    848    int ret;
    849
    850    ret = net_fill_rstate(&s->pri_rs, buf, size);
    851    if (ret == -1) {
    852        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
    853                                 NULL, NULL, true);
    854        error_report("colo-compare primary_in error");
    855    }
    856}
    857
    858/*
    859 * Called from the main thread on the primary for packets
    860 * arriving over the socket from the secondary.
    861 */
    862static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
    863{
    864    CompareState *s = COLO_COMPARE(opaque);
    865    int ret;
    866
    867    ret = net_fill_rstate(&s->sec_rs, buf, size);
    868    if (ret == -1) {
    869        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
    870                                 NULL, NULL, true);
    871        error_report("colo-compare secondary_in error");
    872    }
    873}
    874
    875static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
    876{
    877    CompareState *s = COLO_COMPARE(opaque);
    878    int ret;
    879
    880    ret = net_fill_rstate(&s->notify_rs, buf, size);
    881    if (ret == -1) {
    882        qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
    883                                 NULL, NULL, true);
    884        error_report("colo-compare notify_dev error");
    885    }
    886}
    887
    888/*
    889 * Check old packet regularly so it can watch for any packets
    890 * that the secondary hasn't produced equivalents of.
    891 */
    892static void check_old_packet_regular(void *opaque)
    893{
    894    CompareState *s = opaque;
    895
    896    /* if have old packet we will notify checkpoint */
    897    colo_old_packet_check(s);
    898    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
    899              s->expired_scan_cycle);
    900}
    901
    902/* Public API, Used for COLO frame to notify compare event */
    903void colo_notify_compares_event(void *opaque, int event, Error **errp)
    904{
    905    CompareState *s;
    906    qemu_mutex_lock(&colo_compare_mutex);
    907
    908    if (!colo_compare_active) {
    909        qemu_mutex_unlock(&colo_compare_mutex);
    910        return;
    911    }
    912
    913    qemu_mutex_lock(&event_mtx);
    914    QTAILQ_FOREACH(s, &net_compares, next) {
    915        s->event = event;
    916        qemu_bh_schedule(s->event_bh);
    917        event_unhandled_count++;
    918    }
    919    /* Wait all compare threads to finish handling this event */
    920    while (event_unhandled_count > 0) {
    921        qemu_cond_wait(&event_complete_cond, &event_mtx);
    922    }
    923
    924    qemu_mutex_unlock(&event_mtx);
    925    qemu_mutex_unlock(&colo_compare_mutex);
    926}
    927
    928static void colo_compare_timer_init(CompareState *s)
    929{
    930    AioContext *ctx = iothread_get_aio_context(s->iothread);
    931
    932    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST,
    933                                SCALE_MS, check_old_packet_regular,
    934                                s);
    935    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
    936              s->expired_scan_cycle);
    937}
    938
    939static void colo_compare_timer_del(CompareState *s)
    940{
    941    if (s->packet_check_timer) {
    942        timer_free(s->packet_check_timer);
    943        s->packet_check_timer = NULL;
    944    }
    945 }
    946
    947static void colo_flush_packets(void *opaque, void *user_data);
    948
    949static void colo_compare_handle_event(void *opaque)
    950{
    951    CompareState *s = opaque;
    952
    953    switch (s->event) {
    954    case COLO_EVENT_CHECKPOINT:
    955        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
    956        break;
    957    case COLO_EVENT_FAILOVER:
    958        break;
    959    default:
    960        break;
    961    }
    962
    963    qemu_mutex_lock(&event_mtx);
    964    assert(event_unhandled_count > 0);
    965    event_unhandled_count--;
    966    qemu_cond_broadcast(&event_complete_cond);
    967    qemu_mutex_unlock(&event_mtx);
    968}
    969
    970static void colo_compare_iothread(CompareState *s)
    971{
    972    AioContext *ctx = iothread_get_aio_context(s->iothread);
    973    object_ref(OBJECT(s->iothread));
    974    s->worker_context = iothread_get_g_main_context(s->iothread);
    975
    976    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
    977                             compare_pri_chr_in, NULL, NULL,
    978                             s, s->worker_context, true);
    979    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
    980                             compare_sec_chr_in, NULL, NULL,
    981                             s, s->worker_context, true);
    982    if (s->notify_dev) {
    983        qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
    984                                 compare_notify_chr, NULL, NULL,
    985                                 s, s->worker_context, true);
    986    }
    987
    988    colo_compare_timer_init(s);
    989    s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
    990}
    991
    992static char *compare_get_pri_indev(Object *obj, Error **errp)
    993{
    994    CompareState *s = COLO_COMPARE(obj);
    995
    996    return g_strdup(s->pri_indev);
    997}
    998
    999static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
   1000{
   1001    CompareState *s = COLO_COMPARE(obj);
   1002
   1003    g_free(s->pri_indev);
   1004    s->pri_indev = g_strdup(value);
   1005}
   1006
   1007static char *compare_get_sec_indev(Object *obj, Error **errp)
   1008{
   1009    CompareState *s = COLO_COMPARE(obj);
   1010
   1011    return g_strdup(s->sec_indev);
   1012}
   1013
   1014static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
   1015{
   1016    CompareState *s = COLO_COMPARE(obj);
   1017
   1018    g_free(s->sec_indev);
   1019    s->sec_indev = g_strdup(value);
   1020}
   1021
   1022static char *compare_get_outdev(Object *obj, Error **errp)
   1023{
   1024    CompareState *s = COLO_COMPARE(obj);
   1025
   1026    return g_strdup(s->outdev);
   1027}
   1028
   1029static void compare_set_outdev(Object *obj, const char *value, Error **errp)
   1030{
   1031    CompareState *s = COLO_COMPARE(obj);
   1032
   1033    g_free(s->outdev);
   1034    s->outdev = g_strdup(value);
   1035}
   1036
   1037static bool compare_get_vnet_hdr(Object *obj, Error **errp)
   1038{
   1039    CompareState *s = COLO_COMPARE(obj);
   1040
   1041    return s->vnet_hdr;
   1042}
   1043
   1044static void compare_set_vnet_hdr(Object *obj,
   1045                                 bool value,
   1046                                 Error **errp)
   1047{
   1048    CompareState *s = COLO_COMPARE(obj);
   1049
   1050    s->vnet_hdr = value;
   1051}
   1052
   1053static char *compare_get_notify_dev(Object *obj, Error **errp)
   1054{
   1055    CompareState *s = COLO_COMPARE(obj);
   1056
   1057    return g_strdup(s->notify_dev);
   1058}
   1059
   1060static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
   1061{
   1062    CompareState *s = COLO_COMPARE(obj);
   1063
   1064    g_free(s->notify_dev);
   1065    s->notify_dev = g_strdup(value);
   1066}
   1067
   1068static void compare_get_timeout(Object *obj, Visitor *v,
   1069                                const char *name, void *opaque,
   1070                                Error **errp)
   1071{
   1072    CompareState *s = COLO_COMPARE(obj);
   1073    uint64_t value = s->compare_timeout;
   1074
   1075    visit_type_uint64(v, name, &value, errp);
   1076}
   1077
   1078static void compare_set_timeout(Object *obj, Visitor *v,
   1079                                const char *name, void *opaque,
   1080                                Error **errp)
   1081{
   1082    CompareState *s = COLO_COMPARE(obj);
   1083    uint32_t value;
   1084
   1085    if (!visit_type_uint32(v, name, &value, errp)) {
   1086        return;
   1087    }
   1088    if (!value) {
   1089        error_setg(errp, "Property '%s.%s' requires a positive value",
   1090                   object_get_typename(obj), name);
   1091        return;
   1092    }
   1093    s->compare_timeout = value;
   1094}
   1095
   1096static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
   1097                                           const char *name, void *opaque,
   1098                                           Error **errp)
   1099{
   1100    CompareState *s = COLO_COMPARE(obj);
   1101    uint32_t value = s->expired_scan_cycle;
   1102
   1103    visit_type_uint32(v, name, &value, errp);
   1104}
   1105
   1106static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
   1107                                           const char *name, void *opaque,
   1108                                           Error **errp)
   1109{
   1110    CompareState *s = COLO_COMPARE(obj);
   1111    uint32_t value;
   1112
   1113    if (!visit_type_uint32(v, name, &value, errp)) {
   1114        return;
   1115    }
   1116    if (!value) {
   1117        error_setg(errp, "Property '%s.%s' requires a positive value",
   1118                   object_get_typename(obj), name);
   1119        return;
   1120    }
   1121    s->expired_scan_cycle = value;
   1122}
   1123
   1124static void get_max_queue_size(Object *obj, Visitor *v,
   1125                               const char *name, void *opaque,
   1126                               Error **errp)
   1127{
   1128    uint32_t value = max_queue_size;
   1129
   1130    visit_type_uint32(v, name, &value, errp);
   1131}
   1132
   1133static void set_max_queue_size(Object *obj, Visitor *v,
   1134                               const char *name, void *opaque,
   1135                               Error **errp)
   1136{
   1137    Error *local_err = NULL;
   1138    uint64_t value;
   1139
   1140    visit_type_uint64(v, name, &value, &local_err);
   1141    if (local_err) {
   1142        goto out;
   1143    }
   1144    if (!value) {
   1145        error_setg(&local_err, "Property '%s.%s' requires a positive value",
   1146                   object_get_typename(obj), name);
   1147        goto out;
   1148    }
   1149    max_queue_size = value;
   1150
   1151out:
   1152    error_propagate(errp, local_err);
   1153}
   1154
   1155static void compare_pri_rs_finalize(SocketReadState *pri_rs)
   1156{
   1157    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
   1158    Connection *conn = NULL;
   1159
   1160    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
   1161        trace_colo_compare_main("primary: unsupported packet in");
   1162        compare_chr_send(s,
   1163                         pri_rs->buf,
   1164                         pri_rs->packet_len,
   1165                         pri_rs->vnet_hdr_len,
   1166                         false,
   1167                         false);
   1168    } else {
   1169        /* compare packet in the specified connection */
   1170        colo_compare_connection(conn, s);
   1171    }
   1172}
   1173
   1174static void compare_sec_rs_finalize(SocketReadState *sec_rs)
   1175{
   1176    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
   1177    Connection *conn = NULL;
   1178
   1179    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
   1180        trace_colo_compare_main("secondary: unsupported packet in");
   1181    } else {
   1182        /* compare packet in the specified connection */
   1183        colo_compare_connection(conn, s);
   1184    }
   1185}
   1186
   1187static void compare_notify_rs_finalize(SocketReadState *notify_rs)
   1188{
   1189    CompareState *s = container_of(notify_rs, CompareState, notify_rs);
   1190
   1191    const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
   1192    int ret;
   1193
   1194    if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
   1195                           notify_rs->buf,
   1196                           notify_rs->packet_len)) {
   1197        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
   1198        if (ret < 0) {
   1199            error_report("Notify Xen COLO-frame INIT failed");
   1200        }
   1201    } else if (packet_matches_str("COLO_CHECKPOINT",
   1202                                  notify_rs->buf,
   1203                                  notify_rs->packet_len)) {
   1204        /* colo-compare do checkpoint, flush pri packet and remove sec packet */
   1205        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
   1206    } else {
   1207        error_report("COLO compare got unsupported instruction");
   1208    }
   1209}
   1210
   1211/*
   1212 * Return 0 is success.
   1213 * Return 1 is failed.
   1214 */
   1215static int find_and_check_chardev(Chardev **chr,
   1216                                  char *chr_name,
   1217                                  Error **errp)
   1218{
   1219    *chr = qemu_chr_find(chr_name);
   1220    if (*chr == NULL) {
   1221        error_setg(errp, "Device '%s' not found",
   1222                   chr_name);
   1223        return 1;
   1224    }
   1225
   1226    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
   1227        error_setg(errp, "chardev \"%s\" is not reconnectable",
   1228                   chr_name);
   1229        return 1;
   1230    }
   1231
   1232    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
   1233        error_setg(errp, "chardev \"%s\" cannot switch context",
   1234                   chr_name);
   1235        return 1;
   1236    }
   1237
   1238    return 0;
   1239}
   1240
   1241/*
   1242 * Called from the main thread on the primary
   1243 * to setup colo-compare.
   1244 */
   1245static void colo_compare_complete(UserCreatable *uc, Error **errp)
   1246{
   1247    CompareState *s = COLO_COMPARE(uc);
   1248    Chardev *chr;
   1249
   1250    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
   1251        error_setg(errp, "colo compare needs 'primary_in' ,"
   1252                   "'secondary_in','outdev','iothread' property set");
   1253        return;
   1254    } else if (!strcmp(s->pri_indev, s->outdev) ||
   1255               !strcmp(s->sec_indev, s->outdev) ||
   1256               !strcmp(s->pri_indev, s->sec_indev)) {
   1257        error_setg(errp, "'indev' and 'outdev' could not be same "
   1258                   "for compare module");
   1259        return;
   1260    }
   1261
   1262    if (!s->compare_timeout) {
   1263        /* Set default value to 3000 MS */
   1264        s->compare_timeout = DEFAULT_TIME_OUT_MS;
   1265    }
   1266
   1267    if (!s->expired_scan_cycle) {
   1268        /* Set default value to 3000 MS */
   1269        s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
   1270    }
   1271
   1272    if (!max_queue_size) {
   1273        /* Set default queue size to 1024 */
   1274        max_queue_size = MAX_QUEUE_SIZE;
   1275    }
   1276
   1277    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
   1278        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
   1279        return;
   1280    }
   1281
   1282    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
   1283        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
   1284        return;
   1285    }
   1286
   1287    if (find_and_check_chardev(&chr, s->outdev, errp) ||
   1288        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
   1289        return;
   1290    }
   1291
   1292    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
   1293    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
   1294
   1295    /* Try to enable remote notify chardev, currently just for Xen COLO */
   1296    if (s->notify_dev) {
   1297        if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
   1298            !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
   1299            return;
   1300        }
   1301
   1302        net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
   1303                           s->vnet_hdr);
   1304    }
   1305
   1306    s->out_sendco.s = s;
   1307    s->out_sendco.chr = &s->chr_out;
   1308    s->out_sendco.notify_remote_frame = false;
   1309    s->out_sendco.done = true;
   1310    g_queue_init(&s->out_sendco.send_list);
   1311
   1312    if (s->notify_dev) {
   1313        s->notify_sendco.s = s;
   1314        s->notify_sendco.chr = &s->chr_notify_dev;
   1315        s->notify_sendco.notify_remote_frame = true;
   1316        s->notify_sendco.done = true;
   1317        g_queue_init(&s->notify_sendco.send_list);
   1318    }
   1319
   1320    g_queue_init(&s->conn_list);
   1321
   1322    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
   1323                                                      connection_key_equal,
   1324                                                      g_free,
   1325                                                      connection_destroy);
   1326
   1327    colo_compare_iothread(s);
   1328
   1329    qemu_mutex_lock(&colo_compare_mutex);
   1330    if (!colo_compare_active) {
   1331        qemu_mutex_init(&event_mtx);
   1332        qemu_cond_init(&event_complete_cond);
   1333        colo_compare_active = true;
   1334    }
   1335    QTAILQ_INSERT_TAIL(&net_compares, s, next);
   1336    qemu_mutex_unlock(&colo_compare_mutex);
   1337
   1338    return;
   1339}
   1340
   1341static void colo_flush_packets(void *opaque, void *user_data)
   1342{
   1343    CompareState *s = user_data;
   1344    Connection *conn = opaque;
   1345    Packet *pkt = NULL;
   1346
   1347    while (!g_queue_is_empty(&conn->primary_list)) {
   1348        pkt = g_queue_pop_head(&conn->primary_list);
   1349        compare_chr_send(s,
   1350                         pkt->data,
   1351                         pkt->size,
   1352                         pkt->vnet_hdr_len,
   1353                         false,
   1354                         true);
   1355        packet_destroy_partial(pkt, NULL);
   1356    }
   1357    while (!g_queue_is_empty(&conn->secondary_list)) {
   1358        pkt = g_queue_pop_head(&conn->secondary_list);
   1359        packet_destroy(pkt, NULL);
   1360    }
   1361}
   1362
   1363static void colo_compare_class_init(ObjectClass *oc, void *data)
   1364{
   1365    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
   1366
   1367    ucc->complete = colo_compare_complete;
   1368}
   1369
   1370static void colo_compare_init(Object *obj)
   1371{
   1372    CompareState *s = COLO_COMPARE(obj);
   1373
   1374    object_property_add_str(obj, "primary_in",
   1375                            compare_get_pri_indev, compare_set_pri_indev);
   1376    object_property_add_str(obj, "secondary_in",
   1377                            compare_get_sec_indev, compare_set_sec_indev);
   1378    object_property_add_str(obj, "outdev",
   1379                            compare_get_outdev, compare_set_outdev);
   1380    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
   1381                            (Object **)&s->iothread,
   1382                            object_property_allow_set_link,
   1383                            OBJ_PROP_LINK_STRONG);
   1384    /* This parameter just for Xen COLO */
   1385    object_property_add_str(obj, "notify_dev",
   1386                            compare_get_notify_dev, compare_set_notify_dev);
   1387
   1388    object_property_add(obj, "compare_timeout", "uint64",
   1389                        compare_get_timeout,
   1390                        compare_set_timeout, NULL, NULL);
   1391
   1392    object_property_add(obj, "expired_scan_cycle", "uint32",
   1393                        compare_get_expired_scan_cycle,
   1394                        compare_set_expired_scan_cycle, NULL, NULL);
   1395
   1396    object_property_add(obj, "max_queue_size", "uint32",
   1397                        get_max_queue_size,
   1398                        set_max_queue_size, NULL, NULL);
   1399
   1400    s->vnet_hdr = false;
   1401    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
   1402                             compare_set_vnet_hdr);
   1403}
   1404
   1405void colo_compare_cleanup(void)
   1406{
   1407    CompareState *tmp = NULL;
   1408    CompareState *n = NULL;
   1409
   1410    QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) {
   1411        object_unparent(OBJECT(tmp));
   1412    }
   1413}
   1414
   1415static void colo_compare_finalize(Object *obj)
   1416{
   1417    CompareState *s = COLO_COMPARE(obj);
   1418    CompareState *tmp = NULL;
   1419
   1420    qemu_mutex_lock(&colo_compare_mutex);
   1421    QTAILQ_FOREACH(tmp, &net_compares, next) {
   1422        if (tmp == s) {
   1423            QTAILQ_REMOVE(&net_compares, s, next);
   1424            break;
   1425        }
   1426    }
   1427    if (QTAILQ_EMPTY(&net_compares)) {
   1428        colo_compare_active = false;
   1429        qemu_mutex_destroy(&event_mtx);
   1430        qemu_cond_destroy(&event_complete_cond);
   1431    }
   1432    qemu_mutex_unlock(&colo_compare_mutex);
   1433
   1434    qemu_chr_fe_deinit(&s->chr_pri_in, false);
   1435    qemu_chr_fe_deinit(&s->chr_sec_in, false);
   1436    qemu_chr_fe_deinit(&s->chr_out, false);
   1437    if (s->notify_dev) {
   1438        qemu_chr_fe_deinit(&s->chr_notify_dev, false);
   1439    }
   1440
   1441    colo_compare_timer_del(s);
   1442
   1443    qemu_bh_delete(s->event_bh);
   1444
   1445    AioContext *ctx = iothread_get_aio_context(s->iothread);
   1446    aio_context_acquire(ctx);
   1447    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
   1448    if (s->notify_dev) {
   1449        AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
   1450    }
   1451    aio_context_release(ctx);
   1452
   1453    /* Release all unhandled packets after compare thead exited */
   1454    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
   1455    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
   1456
   1457    g_queue_clear(&s->conn_list);
   1458    g_queue_clear(&s->out_sendco.send_list);
   1459    if (s->notify_dev) {
   1460        g_queue_clear(&s->notify_sendco.send_list);
   1461    }
   1462
   1463    if (s->connection_track_table) {
   1464        g_hash_table_destroy(s->connection_track_table);
   1465    }
   1466
   1467    object_unref(OBJECT(s->iothread));
   1468
   1469    g_free(s->pri_indev);
   1470    g_free(s->sec_indev);
   1471    g_free(s->outdev);
   1472    g_free(s->notify_dev);
   1473}
   1474
   1475static void __attribute__((__constructor__)) colo_compare_init_globals(void)
   1476{
   1477    colo_compare_active = false;
   1478    qemu_mutex_init(&colo_compare_mutex);
   1479}
   1480
   1481static const TypeInfo colo_compare_info = {
   1482    .name = TYPE_COLO_COMPARE,
   1483    .parent = TYPE_OBJECT,
   1484    .instance_size = sizeof(CompareState),
   1485    .instance_init = colo_compare_init,
   1486    .instance_finalize = colo_compare_finalize,
   1487    .class_size = sizeof(CompareClass),
   1488    .class_init = colo_compare_class_init,
   1489    .interfaces = (InterfaceInfo[]) {
   1490        { TYPE_USER_CREATABLE },
   1491        { }
   1492    }
   1493};
   1494
   1495static void register_types(void)
   1496{
   1497    type_register_static(&colo_compare_info);
   1498}
   1499
   1500type_init(register_types);