diff --git a/migration-Add-multi-thread-compress-ops.patch b/migration-Add-multi-thread-compress-ops.patch new file mode 100644 index 0000000..043d9f9 --- /dev/null +++ b/migration-Add-multi-thread-compress-ops.patch @@ -0,0 +1,442 @@ +From 99fddf2ffeefc99ab15b3428dbd2b46476be3e7e Mon Sep 17 00:00:00 2001 +From: Zeyu Jin +Date: Sat, 30 Jan 2021 15:57:31 +0800 +Subject: [PATCH] migration: Add multi-thread compress ops + +Add the MigrationCompressOps and MigrationDecompressOps structures to make +the compression method configurable for multi-thread compression migration. + +Signed-off-by: Zeyu Jin +Signed-off-by: Ying Fang +--- + migration/migration.c | 9 ++ + migration/migration.h | 1 + + migration/ram.c | 269 ++++++++++++++++++++++++++++++------------ + 3 files changed, 201 insertions(+), 78 deletions(-) + +diff --git a/migration/migration.c b/migration/migration.c +index c79bf09269..67425fde7a 100644 +--- a/migration/migration.c ++++ b/migration/migration.c +@@ -2143,6 +2143,15 @@ int migrate_decompress_threads(void) + return s->parameters.decompress_threads; + } + ++CompressMethod migrate_compress_method(void) ++{ ++ MigrationState *s; ++ ++ s = migrate_get_current(); ++ ++ return s->parameters.compress_method; ++} ++ + bool migrate_dirty_bitmaps(void) + { + MigrationState *s; +diff --git a/migration/migration.h b/migration/migration.h +index f2bd4ebe33..4aa72297fc 100644 +--- a/migration/migration.h ++++ b/migration/migration.h +@@ -319,6 +319,7 @@ int migrate_compress_level(void); + int migrate_compress_threads(void); + int migrate_compress_wait_thread(void); + int migrate_decompress_threads(void); ++CompressMethod migrate_compress_method(void); + bool migrate_use_events(void); + bool migrate_postcopy_blocktime(void); + +diff --git a/migration/ram.c b/migration/ram.c +index f78a681ca2..3ed808a4ca 100644 +--- a/migration/ram.c ++++ b/migration/ram.c +@@ -417,6 +417,9 @@ struct CompressParam { + /* internally used fields */ + z_stream stream; + uint8_t *originbuf; ++ ++ /* for zlib compression */ ++ z_stream stream; + }; + typedef struct CompressParam CompressParam; + +@@ -428,12 +431,29 @@ struct DecompressParam { + void *des; + uint8_t *compbuf; + int len; ++ ++ /* for zlib compression */ + z_stream stream; + }; + typedef struct DecompressParam DecompressParam; + ++typedef struct { ++ int (*save_setup)(CompressParam *param); ++ void (*save_cleanup)(CompressParam *param); ++ ssize_t (*compress_data)(CompressParam *param, size_t size); ++} MigrationCompressOps; ++ ++typedef struct { ++ int (*load_setup)(DecompressParam *param); ++ void (*load_cleanup)(DecompressParam *param); ++ int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size); ++ int (*check_len)(int len); ++} MigrationDecompressOps; ++ + static CompressParam *comp_param; + static QemuThread *compress_threads; ++static MigrationCompressOps *compress_ops; ++static MigrationDecompressOps *decompress_ops; + /* comp_done_cond is used to wake up the migration thread when + * one of the compression threads has finished the compression. + * comp_done_lock is used to co-work with comp_done_cond. +@@ -451,6 +471,157 @@ static QemuCond decomp_done_cond; + + static bool do_compress_ram_page(CompressParam *param, RAMBlock *block); + ++static int zlib_save_setup(CompressParam *param) ++{ ++ if (deflateInit(¶m->stream, ++ migrate_compress_level()) != Z_OK) { ++ return -1; ++ } ++ ++ return 0; ++} ++ ++static ssize_t zlib_compress_data(CompressParam *param, size_t size) ++ ++ int err; ++ uint8_t *dest = NULL; ++ z_stream *stream = ¶m->stream; ++ uint8_t *p = param->originbuf; ++ QEMUFile *f = f = param->file; ++ ssize_t blen = qemu_put_compress_start(f, &dest); ++ ++ if (blen < compressBound(size)) { ++ return -1; ++ } ++ ++ err = deflateReset(stream); ++ if (err != Z_OK) { ++ return -1; ++ } ++ ++ stream->avail_in = size; ++ stream->next_in = p; ++ stream->avail_out = blen; ++ stream->next_out = dest; ++ ++ err = deflate(stream, Z_FINISH); ++ if (err != Z_STREAM_END) { ++ return -1; ++ } ++ ++ blen = stream->next_out - dest; ++ if (blen < 0) { ++ return -1; ++ } ++ ++ qemu_put_compress_end(f, blen); ++ return blen + sizeof(int32_t); ++} ++ ++static void zlib_save_cleanup(CompressParam *param) ++{ ++ deflateEnd(¶m->stream); ++} ++ ++static int zlib_load_setup(DecompressParam *param) ++{ ++ if (inflateInit(¶m->stream) != Z_OK) { ++ return -1; ++ } ++ ++ return 0; ++} ++ ++static int ++zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size) ++{ ++ int err; ++ ++ z_stream *stream = ¶m->stream; ++ ++ err = inflateReset(stream); ++ if (err != Z_OK) { ++ return -1; ++ } ++ ++ stream->avail_in = param->len; ++ stream->next_in = param->compbuf; ++ stream->avail_out = size; ++ stream->next_out = dest; ++ ++ err = inflate(stream, Z_NO_FLUSH); ++ if (err != Z_STREAM_END) { ++ return -1; ++ } ++ ++ return stream->total_out; ++} ++ ++static void zlib_load_cleanup(DecompressParam *param) ++{ ++ inflateEnd(¶m->stream); ++} ++ ++static int zlib_check_len(int len) ++{ ++ return len < 0 || len > compressBound(TARGET_PAGE_SIZE); ++} ++ ++static int set_compress_ops(void) ++{ ++ compress_ops = g_new0(MigrationCompressOps, 1); ++ ++ switch (migrate_compress_method()) { ++ case COMPRESS_METHOD_ZLIB: ++ compress_ops->save_setup = zlib_save_setup; ++ compress_ops->save_cleanup = zlib_save_cleanup; ++ compress_ops->compress_data = zlib_compress_data; ++ break; ++ default: ++ return -1; ++ } ++ ++ return 0; ++} ++ ++static int set_decompress_ops(void) ++{ ++ decompress_ops = g_new0(MigrationDecompressOps, 1); ++ ++ switch (migrate_compress_method()) { ++ case COMPRESS_METHOD_ZLIB: ++ decompress_ops->load_setup = zlib_load_setup; ++ decompress_ops->load_cleanup = zlib_load_cleanup; ++ decompress_ops->decompress_data = zlib_decompress_data; ++ decompress_ops->check_len = zlib_check_len; ++ break; ++ default: ++ return -1; ++ } ++ ++ return 0; ++} ++ ++static void clean_compress_ops(void) ++{ ++ compress_ops->save_setup = NULL; ++ compress_ops->save_cleanup = NULL; ++ compress_ops->compress_data = NULL; ++ ++ g_free(compress_ops); ++ compress_ops = NULL; ++} ++ ++static void clean_decompress_ops(void) ++{ ++ decompress_ops->load_setup = NULL; ++ decompress_ops->load_cleanup = NULL; ++ decompress_ops->decompress_data = NULL; ++ ++ g_free(decompress_ops); ++ decompress_ops = NULL; ++} ++ + static void *do_data_compress(void *opaque) + { + CompressParam *param = opaque; +@@ -508,7 +679,7 @@ static void compress_threads_save_cleanup(void) + qemu_thread_join(compress_threads + i); + qemu_mutex_destroy(&comp_param[i].mutex); + qemu_cond_destroy(&comp_param[i].cond); +- deflateEnd(&comp_param[i].stream); ++ compress_ops->save_cleanup(&comp_param[i]); + g_free(comp_param[i].originbuf); + qemu_fclose(comp_param[i].file); + comp_param[i].file = NULL; +@@ -519,6 +690,7 @@ static void compress_threads_save_cleanup(void) + g_free(comp_param); + compress_threads = NULL; + comp_param = NULL; ++ clean_compress_ops(); + } + + static int compress_threads_save_setup(void) +@@ -528,6 +700,12 @@ static int compress_threads_save_setup(void) + if (!migrate_use_compression()) { + return 0; + } ++ ++ if (set_compress_ops() < 0) { ++ clean_compress_ops(); ++ return -1; ++ } ++ + thread_count = migrate_compress_threads(); + compress_threads = g_new0(QemuThread, thread_count); + comp_param = g_new0(CompressParam, thread_count); +@@ -539,8 +717,7 @@ static int compress_threads_save_setup(void) + goto exit; + } + +- if (deflateInit(&comp_param[i].stream, +- migrate_compress_level()) != Z_OK) { ++ if (compress_ops->save_setup(&comp_param[i]) < 0) { + g_free(comp_param[i].originbuf); + goto exit; + } +@@ -2208,50 +2385,6 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, + return 1; + } + +-/* +- * Compress size bytes of data start at p and store the compressed +- * data to the buffer of f. +- * +- * Since the file is dummy file with empty_ops, return -1 if f has no space to +- * save the compressed data. +- */ +-static ssize_t qemu_put_compression_data(CompressParam *param, size_t size) +-{ +- int err; +- uint8_t *dest = NULL; +- z_stream *stream = ¶m->stream; +- uint8_t *p = param->originbuf; +- QEMUFile *f = f = param->file; +- ssize_t blen = qemu_put_compress_start(f, &dest); +- +- if (blen < compressBound(size)) { +- return -1; +- } +- +- err = deflateReset(stream); +- if (err != Z_OK) { +- return -1; +- } +- +- stream->avail_in = size; +- stream->next_in = p; +- stream->avail_out = blen; +- stream->next_out = dest; +- +- err = deflate(stream, Z_FINISH); +- if (err != Z_STREAM_END) { +- return -1; +- } +- +- blen = stream->next_out - dest; +- if (blen < 0) { +- return -1; +- } +- +- qemu_put_compress_end(f, blen); +- return blen + sizeof(int32_t); +-} +- + static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) + { + RAMState *rs = ram_state; +@@ -2274,7 +2407,7 @@ static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) + * decompression + */ + memcpy(param->originbuf, p, TARGET_PAGE_SIZE); +- ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE); ++ ret = compress_ops->compress_data(param, TARGET_PAGE_SIZE); + if (ret < 0) { + qemu_file_set_error(migrate_get_current()->to_dst_file, ret); + error_report("compressed data failed!"); +@@ -3965,32 +4098,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) + } + } + +-/* return the size after decompression, or negative value on error */ +-static int +-qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize) +-{ +- int err; +- +- z_stream *stream = ¶m->stream; +- +- err = inflateReset(stream); +- if (err != Z_OK) { +- return -1; +- } +- +- stream->avail_in = param->len; +- stream->next_in = param->compbuf; +- stream->avail_out = pagesize; +- stream->next_out = dest; +- +- err = inflate(stream, Z_NO_FLUSH); +- if (err != Z_STREAM_END) { +- return -1; +- } +- +- return stream->total_out; +-} +- + static void *do_data_decompress(void *opaque) + { + DecompressParam *param = opaque; +@@ -4004,7 +4111,7 @@ static void *do_data_decompress(void *opaque) + param->des = 0; + qemu_mutex_unlock(¶m->mutex); + +- ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE); ++ ret = decompress_ops->decompress_data(param, des, TARGET_PAGE_SIZE); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); +@@ -4074,7 +4181,7 @@ static void compress_threads_load_cleanup(void) + qemu_thread_join(decompress_threads + i); + qemu_mutex_destroy(&decomp_param[i].mutex); + qemu_cond_destroy(&decomp_param[i].cond); +- inflateEnd(&decomp_param[i].stream); ++ decompress_ops->load_cleanup(&decomp_param[i]); + g_free(decomp_param[i].compbuf); + decomp_param[i].compbuf = NULL; + } +@@ -4083,6 +4190,7 @@ static void compress_threads_load_cleanup(void) + decompress_threads = NULL; + decomp_param = NULL; + decomp_file = NULL; ++ clean_decompress_ops(); + } + + static int compress_threads_load_setup(QEMUFile *f) +@@ -4093,6 +4201,11 @@ static int compress_threads_load_setup(QEMUFile *f) + return 0; + } + ++ if (set_decompress_ops() < 0) { ++ clean_decompress_ops(); ++ return -1; ++ } ++ + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); +@@ -4100,7 +4213,7 @@ static int compress_threads_load_setup(QEMUFile *f) + qemu_cond_init(&decomp_done_cond); + decomp_file = f; + for (i = 0; i < thread_count; i++) { +- if (inflateInit(&decomp_param[i].stream) != Z_OK) { ++ if (decompress_ops->load_setup(&decomp_param[i]) < 0) { + goto exit; + } + +@@ -4642,7 +4755,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) + + case RAM_SAVE_FLAG_COMPRESS_PAGE: + len = qemu_get_be32(f); +- if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { ++ if (decompress_ops->check_len(len)) { + error_report("Invalid compressed data length: %d", len); + ret = -EINVAL; + break; +-- +2.27.0 +