diff --git a/migration-Refactoring-multi-thread-compress-migratio.patch b/migration-Refactoring-multi-thread-compress-migratio.patch new file mode 100644 index 0000000..d3ab4d0 --- /dev/null +++ b/migration-Refactoring-multi-thread-compress-migratio.patch @@ -0,0 +1,305 @@ +From 524d8cee48006918cf181f2817e4ec3ce5a3bb12 Mon Sep 17 00:00:00 2001 +From: Zeyu Jin +Date: Sat, 30 Jan 2021 15:21:17 +0800 +Subject: [PATCH] migration: Refactoring multi-thread compress migration + +Code refactor for the compression procedure which includes: + +1. Move qemu_compress_data and qemu_put_compression_data from qemu-file.c to +ram.c, for the reason that most part of the code logical has nothing to do +with qemu-file. Besides, the decompression code is located at ram.c only. + +2. Simplify the function input arguments for compression and decompression. +Wrap the input into the param structure which already exists. This change also +makes the function much more flexible for other compression methods. + +Signed-off-by: Zeyu Jin +Signed-off-by: Ying Fang +--- + migration/qemu-file.c | 78 ++++++--------------------------------- + migration/qemu-file.h | 4 +- + migration/ram.c | 85 +++++++++++++++++++++++++++++++------------ + 3 files changed, 75 insertions(+), 92 deletions(-) + +diff --git a/migration/qemu-file.c b/migration/qemu-file.c +index be0d6c8ca8..3bba694ed4 100644 +--- a/migration/qemu-file.c ++++ b/migration/qemu-file.c +@@ -695,72 +695,6 @@ uint64_t qemu_get_be64(QEMUFile *f) + return v; + } + +-/* return the size after compression, or negative value on error */ +-static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, +- const uint8_t *source, size_t source_len) +-{ +- int err; +- +- err = deflateReset(stream); +- if (err != Z_OK) { +- return -1; +- } +- +- stream->avail_in = source_len; +- stream->next_in = (uint8_t *)source; +- stream->avail_out = dest_len; +- stream->next_out = dest; +- +- err = deflate(stream, Z_FINISH); +- if (err != Z_STREAM_END) { +- return -1; +- } +- +- return stream->next_out - dest; +-} +- +-/* Compress size bytes of data start at p and store the compressed +- * data to the buffer of f. +- * +- * When f is not writable, return -1 if f has no space to save the +- * compressed data. +- * When f is wirtable and it has no space to save the compressed data, +- * do fflush first, if f still has no space to save the compressed +- * data, return -1. +- */ +-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, +- const uint8_t *p, size_t size) +-{ +- ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); +- +- if (blen < compressBound(size)) { +- if (!qemu_file_is_writable(f)) { +- return -1; +- } +- qemu_fflush(f); +- blen = IO_BUF_SIZE - sizeof(int32_t); +- if (blen < compressBound(size)) { +- return -1; +- } +- } +- +- blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), +- blen, p, size); +- if (blen < 0) { +- return -1; +- } +- +- qemu_put_be32(f, blen); +- if (f->ops->writev_buffer) { +- add_to_iovec(f, f->buf + f->buf_index, blen, false); +- } +- f->buf_index += blen; +- if (f->buf_index == IO_BUF_SIZE) { +- qemu_fflush(f); +- } +- return blen + sizeof(int32_t); +-} +- + /* Put the data in the buffer of f_src to the buffer of f_des, and + * then reset the buf_index of f_src to 0. + */ +@@ -820,3 +754,15 @@ void qemu_file_set_blocking(QEMUFile *f, bool block) + f->ops->set_blocking(f->opaque, block); + } + } ++ ++ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr) ++{ ++ *dest_ptr = f->buf + f->buf_index + sizeof(int32_t); ++ return IO_BUF_SIZE - f->buf_index - sizeof(int32_t); ++} ++ ++void qemu_put_compress_end(QEMUFile *f, unsigned int v) ++{ ++ qemu_put_be32(f, v); ++ add_buf_to_iovec(f, v); ++} +diff --git a/migration/qemu-file.h b/migration/qemu-file.h +index 5de9fa2e96..6570e53e13 100644 +--- a/migration/qemu-file.h ++++ b/migration/qemu-file.h +@@ -134,8 +134,6 @@ bool qemu_file_is_writable(QEMUFile *f); + + size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); + size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); +-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, +- const uint8_t *p, size_t size); + int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); + + /* +@@ -162,6 +160,8 @@ void ram_control_before_iterate(QEMUFile *f, uint64_t flags); + void ram_control_after_iterate(QEMUFile *f, uint64_t flags); + void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data); + ++ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr); ++void qemu_put_compress_end(QEMUFile *f, unsigned int v); + /* Whenever this is found in the data stream, the flags + * will be passed to ram_control_load_hook in the incoming-migration + * side. This lets before_ram_iterate/after_ram_iterate add +diff --git a/migration/ram.c b/migration/ram.c +index 92ce1a53e7..f78a681ca2 100644 +--- a/migration/ram.c ++++ b/migration/ram.c +@@ -449,26 +449,22 @@ static QemuThread *decompress_threads; + static QemuMutex decomp_done_lock; + static QemuCond decomp_done_cond; + +-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, +- ram_addr_t offset, uint8_t *source_buf); ++static bool do_compress_ram_page(CompressParam *param, RAMBlock *block); + + static void *do_data_compress(void *opaque) + { + CompressParam *param = opaque; + RAMBlock *block; +- ram_addr_t offset; + bool zero_page; + + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->block) { + block = param->block; +- offset = param->offset; + param->block = NULL; + qemu_mutex_unlock(¶m->mutex); + +- zero_page = do_compress_ram_page(param->file, ¶m->stream, +- block, offset, param->originbuf); ++ zero_page = do_compress_ram_page(param, block); + + qemu_mutex_lock(&comp_done_lock); + param->done = true; +@@ -2212,28 +2208,73 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, + return 1; + } + +-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, +- ram_addr_t offset, uint8_t *source_buf) ++/* ++ * Compress size bytes of data start at p and store the compressed ++ * data to the buffer of f. ++ * ++ * Since the file is dummy file with empty_ops, return -1 if f has no space to ++ * save the compressed data. ++ */ ++static ssize_t qemu_put_compression_data(CompressParam *param, size_t size) ++{ ++ int err; ++ uint8_t *dest = NULL; ++ z_stream *stream = ¶m->stream; ++ uint8_t *p = param->originbuf; ++ QEMUFile *f = f = param->file; ++ ssize_t blen = qemu_put_compress_start(f, &dest); ++ ++ if (blen < compressBound(size)) { ++ return -1; ++ } ++ ++ err = deflateReset(stream); ++ if (err != Z_OK) { ++ return -1; ++ } ++ ++ stream->avail_in = size; ++ stream->next_in = p; ++ stream->avail_out = blen; ++ stream->next_out = dest; ++ ++ err = deflate(stream, Z_FINISH); ++ if (err != Z_STREAM_END) { ++ return -1; ++ } ++ ++ blen = stream->next_out - dest; ++ if (blen < 0) { ++ return -1; ++ } ++ ++ qemu_put_compress_end(f, blen); ++ return blen + sizeof(int32_t); ++} ++ ++static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) + { + RAMState *rs = ram_state; ++ ram_addr_t offset = param->offset; + uint8_t *p = block->host + (offset & TARGET_PAGE_MASK); + bool zero_page = false; + int ret; + +- if (save_zero_page_to_file(rs, f, block, offset)) { ++ if (save_zero_page_to_file(rs, param->file, block, offset)) { + zero_page = true; + goto exit; + } + +- save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); ++ save_page_header(rs, param->file, block, ++ offset | RAM_SAVE_FLAG_COMPRESS_PAGE); + + /* + * copy it to a internal buffer to avoid it being modified by VM + * so that we can catch up the error during compression and + * decompression + */ +- memcpy(source_buf, p, TARGET_PAGE_SIZE); +- ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); ++ memcpy(param->originbuf, p, TARGET_PAGE_SIZE); ++ ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE); + if (ret < 0) { + qemu_file_set_error(migrate_get_current()->to_dst_file, ret); + error_report("compressed data failed!"); +@@ -3926,19 +3967,20 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) + + /* return the size after decompression, or negative value on error */ + static int +-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, +- const uint8_t *source, size_t source_len) ++qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize) + { + int err; + ++ z_stream *stream = ¶m->stream; ++ + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + +- stream->avail_in = source_len; +- stream->next_in = (uint8_t *)source; +- stream->avail_out = dest_len; ++ stream->avail_in = param->len; ++ stream->next_in = param->compbuf; ++ stream->avail_out = pagesize; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); +@@ -3952,22 +3994,17 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + static void *do_data_decompress(void *opaque) + { + DecompressParam *param = opaque; +- unsigned long pagesize; + uint8_t *des; +- int len, ret; ++ int ret; + + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->des) { + des = param->des; +- len = param->len; + param->des = 0; + qemu_mutex_unlock(¶m->mutex); + +- pagesize = TARGET_PAGE_SIZE; +- +- ret = qemu_uncompress_data(¶m->stream, des, pagesize, +- param->compbuf, len); ++ ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); +-- +2.27.0 +