multifd-zstd.c (9070B)
1/* 2 * Multifd zlib compression implementation 3 * 4 * Copyright (c) 2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13#include "qemu/osdep.h" 14#include <zstd.h> 15#include "qemu/rcu.h" 16#include "exec/target_page.h" 17#include "qapi/error.h" 18#include "migration.h" 19#include "trace.h" 20#include "multifd.h" 21 22struct zstd_data { 23 /* stream for compression */ 24 ZSTD_CStream *zcs; 25 /* stream for decompression */ 26 ZSTD_DStream *zds; 27 /* buffers */ 28 ZSTD_inBuffer in; 29 ZSTD_outBuffer out; 30 /* compressed buffer */ 31 uint8_t *zbuff; 32 /* size of compressed buffer */ 33 uint32_t zbuff_len; 34}; 35 36/* Multifd zstd compression */ 37 38/** 39 * zstd_send_setup: setup send side 40 * 41 * Setup each channel with zstd compression. 42 * 43 * Returns 0 for success or -1 for error 44 * 45 * @p: Params for the channel that we are using 46 * @errp: pointer to an error 47 */ 48static int zstd_send_setup(MultiFDSendParams *p, Error **errp) 49{ 50 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 51 struct zstd_data *z = g_new0(struct zstd_data, 1); 52 int res; 53 54 p->data = z; 55 z->zcs = ZSTD_createCStream(); 56 if (!z->zcs) { 57 g_free(z); 58 error_setg(errp, "multifd %d: zstd createCStream failed", p->id); 59 return -1; 60 } 61 62 res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level()); 63 if (ZSTD_isError(res)) { 64 ZSTD_freeCStream(z->zcs); 65 g_free(z); 66 error_setg(errp, "multifd %d: initCStream failed with error %s", 67 p->id, ZSTD_getErrorName(res)); 68 return -1; 69 } 70 /* We will never have more than page_count pages */ 71 z->zbuff_len = page_count * qemu_target_page_size(); 72 z->zbuff_len *= 2; 73 z->zbuff = g_try_malloc(z->zbuff_len); 74 if (!z->zbuff) { 75 ZSTD_freeCStream(z->zcs); 76 g_free(z); 77 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 78 return -1; 79 } 80 return 0; 81} 82 83/** 84 * zstd_send_cleanup: cleanup send side 85 * 86 * Close the channel and return memory. 87 * 88 * @p: Params for the channel that we are using 89 */ 90static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) 91{ 92 struct zstd_data *z = p->data; 93 94 ZSTD_freeCStream(z->zcs); 95 z->zcs = NULL; 96 g_free(z->zbuff); 97 z->zbuff = NULL; 98 g_free(p->data); 99 p->data = NULL; 100} 101 102/** 103 * zstd_send_prepare: prepare date to be able to send 104 * 105 * Create a compressed buffer with all the pages that we are going to 106 * send. 107 * 108 * Returns 0 for success or -1 for error 109 * 110 * @p: Params for the channel that we are using 111 * @used: number of pages used 112 */ 113static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp) 114{ 115 struct iovec *iov = p->pages->iov; 116 struct zstd_data *z = p->data; 117 int ret; 118 uint32_t i; 119 120 z->out.dst = z->zbuff; 121 z->out.size = z->zbuff_len; 122 z->out.pos = 0; 123 124 for (i = 0; i < used; i++) { 125 ZSTD_EndDirective flush = ZSTD_e_continue; 126 127 if (i == used - 1) { 128 flush = ZSTD_e_flush; 129 } 130 z->in.src = iov[i].iov_base; 131 z->in.size = iov[i].iov_len; 132 z->in.pos = 0; 133 134 /* 135 * Welcome to compressStream2 semantics 136 * 137 * We need to loop while: 138 * - return is > 0 139 * - there is input available 140 * - there is output space free 141 */ 142 do { 143 ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush); 144 } while (ret > 0 && (z->in.size - z->in.pos > 0) 145 && (z->out.size - z->out.pos > 0)); 146 if (ret > 0 && (z->in.size - z->in.pos > 0)) { 147 error_setg(errp, "multifd %d: compressStream buffer too small", 148 p->id); 149 return -1; 150 } 151 if (ZSTD_isError(ret)) { 152 error_setg(errp, "multifd %d: compressStream error %s", 153 p->id, ZSTD_getErrorName(ret)); 154 return -1; 155 } 156 } 157 p->next_packet_size = z->out.pos; 158 p->flags |= MULTIFD_FLAG_ZSTD; 159 160 return 0; 161} 162 163/** 164 * zstd_send_write: do the actual write of the data 165 * 166 * Do the actual write of the comprresed buffer. 167 * 168 * Returns 0 for success or -1 for error 169 * 170 * @p: Params for the channel that we are using 171 * @used: number of pages used 172 * @errp: pointer to an error 173 */ 174static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) 175{ 176 struct zstd_data *z = p->data; 177 178 return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size, 179 errp); 180} 181 182/** 183 * zstd_recv_setup: setup receive side 184 * 185 * Create the compressed channel and buffer. 186 * 187 * Returns 0 for success or -1 for error 188 * 189 * @p: Params for the channel that we are using 190 * @errp: pointer to an error 191 */ 192static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) 193{ 194 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 195 struct zstd_data *z = g_new0(struct zstd_data, 1); 196 int ret; 197 198 p->data = z; 199 z->zds = ZSTD_createDStream(); 200 if (!z->zds) { 201 g_free(z); 202 error_setg(errp, "multifd %d: zstd createDStream failed", p->id); 203 return -1; 204 } 205 206 ret = ZSTD_initDStream(z->zds); 207 if (ZSTD_isError(ret)) { 208 ZSTD_freeDStream(z->zds); 209 g_free(z); 210 error_setg(errp, "multifd %d: initDStream failed with error %s", 211 p->id, ZSTD_getErrorName(ret)); 212 return -1; 213 } 214 215 /* We will never have more than page_count pages */ 216 z->zbuff_len = page_count * qemu_target_page_size(); 217 /* We know compression "could" use more space */ 218 z->zbuff_len *= 2; 219 z->zbuff = g_try_malloc(z->zbuff_len); 220 if (!z->zbuff) { 221 ZSTD_freeDStream(z->zds); 222 g_free(z); 223 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 224 return -1; 225 } 226 return 0; 227} 228 229/** 230 * zstd_recv_cleanup: setup receive side 231 * 232 * For no compression this function does nothing. 233 * 234 * @p: Params for the channel that we are using 235 */ 236static void zstd_recv_cleanup(MultiFDRecvParams *p) 237{ 238 struct zstd_data *z = p->data; 239 240 ZSTD_freeDStream(z->zds); 241 z->zds = NULL; 242 g_free(z->zbuff); 243 z->zbuff = NULL; 244 g_free(p->data); 245 p->data = NULL; 246} 247 248/** 249 * zstd_recv_pages: read the data from the channel into actual pages 250 * 251 * Read the compressed buffer, and uncompress it into the actual 252 * pages. 253 * 254 * Returns 0 for success or -1 for error 255 * 256 * @p: Params for the channel that we are using 257 * @used: number of pages used 258 * @errp: pointer to an error 259 */ 260static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) 261{ 262 uint32_t in_size = p->next_packet_size; 263 uint32_t out_size = 0; 264 uint32_t expected_size = used * qemu_target_page_size(); 265 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 266 struct zstd_data *z = p->data; 267 int ret; 268 int i; 269 270 if (flags != MULTIFD_FLAG_ZSTD) { 271 error_setg(errp, "multifd %d: flags received %x flags expected %x", 272 p->id, flags, MULTIFD_FLAG_ZSTD); 273 return -1; 274 } 275 ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); 276 277 if (ret != 0) { 278 return ret; 279 } 280 281 z->in.src = z->zbuff; 282 z->in.size = in_size; 283 z->in.pos = 0; 284 285 for (i = 0; i < used; i++) { 286 struct iovec *iov = &p->pages->iov[i]; 287 288 z->out.dst = iov->iov_base; 289 z->out.size = iov->iov_len; 290 z->out.pos = 0; 291 292 /* 293 * Welcome to decompressStream semantics 294 * 295 * We need to loop while: 296 * - return is > 0 297 * - there is input available 298 * - we haven't put out a full page 299 */ 300 do { 301 ret = ZSTD_decompressStream(z->zds, &z->out, &z->in); 302 } while (ret > 0 && (z->in.size - z->in.pos > 0) 303 && (z->out.pos < iov->iov_len)); 304 if (ret > 0 && (z->out.pos < iov->iov_len)) { 305 error_setg(errp, "multifd %d: decompressStream buffer too small", 306 p->id); 307 return -1; 308 } 309 if (ZSTD_isError(ret)) { 310 error_setg(errp, "multifd %d: decompressStream returned %s", 311 p->id, ZSTD_getErrorName(ret)); 312 return ret; 313 } 314 out_size += z->out.pos; 315 } 316 if (out_size != expected_size) { 317 error_setg(errp, "multifd %d: packet size received %d size expected %d", 318 p->id, out_size, expected_size); 319 return -1; 320 } 321 return 0; 322} 323 324static MultiFDMethods multifd_zstd_ops = { 325 .send_setup = zstd_send_setup, 326 .send_cleanup = zstd_send_cleanup, 327 .send_prepare = zstd_send_prepare, 328 .send_write = zstd_send_write, 329 .recv_setup = zstd_recv_setup, 330 .recv_cleanup = zstd_recv_cleanup, 331 .recv_pages = zstd_recv_pages 332}; 333 334static void multifd_zstd_register(void) 335{ 336 multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops); 337} 338 339migration_init(multifd_zstd_register);