From 5896dedf32c7e4417bd7f3e889ca41a34b06f5db Mon Sep 17 00:00:00 2001 From: Chuan Zheng Date: Sat, 30 Jan 2021 15:57:31 +0800 Subject: [PATCH] migration: Add multi-thread compress ops Add the MigrationCompressOps and MigrationDecompressOps structures to make the compression method configurable for multi-thread compression migration. Signed-off-by: Chuan Zheng Signed-off-by: Zeyu Jin Signed-off-by: Ying Fang --- migration/options.c | 9 ++ migration/options.h | 1 + migration/ram-compress.c | 261 ++++++++++++++++++++++++++------------- migration/ram-compress.h | 31 ++++- migration/ram.c | 4 +- 5 files changed, 215 insertions(+), 91 deletions(-) diff --git a/migration/options.c b/migration/options.c index af7ea7b346..6aaee702dc 100644 --- a/migration/options.c +++ b/migration/options.c @@ -799,6 +799,15 @@ int migrate_decompress_threads(void) return s->parameters.decompress_threads; } +CompressMethod migrate_compress_method(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters.compress_method; +} + uint64_t migrate_downtime_limit(void) { MigrationState *s = migrate_get_current(); diff --git a/migration/options.h b/migration/options.h index 246c160aee..9aca5e41ad 100644 --- a/migration/options.h +++ b/migration/options.h @@ -78,6 +78,7 @@ uint8_t migrate_cpu_throttle_increment(void); uint8_t migrate_cpu_throttle_initial(void); bool migrate_cpu_throttle_tailslow(void); int migrate_decompress_threads(void); +CompressMethod migrate_compress_method(void); uint64_t migrate_downtime_limit(void); uint8_t migrate_max_cpu_throttle(void); uint64_t migrate_max_bandwidth(void); diff --git a/migration/ram-compress.c b/migration/ram-compress.c index 2be344acbc..6e37b22492 100644 --- a/migration/ram-compress.c +++ b/migration/ram-compress.c @@ -65,26 +65,167 @@ static QemuThread *compress_threads; static QemuMutex comp_done_lock; static QemuCond comp_done_cond; -struct DecompressParam { - bool done; - bool quit; - QemuMutex mutex; - QemuCond cond; - void *des; - uint8_t *compbuf; - int len; - z_stream stream; -}; -typedef struct DecompressParam DecompressParam; - static QEMUFile *decomp_file; static DecompressParam *decomp_param; static QemuThread *decompress_threads; +MigrationCompressOps *compress_ops; +MigrationDecompressOps *decompress_ops; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block); +static int zlib_save_setup(CompressParam *param) +{ + if (deflateInit(¶m->stream, + migrate_compress_level()) != Z_OK) { + return -1; + } + + return 0; +} + +static ssize_t zlib_compress_data(CompressParam *param, size_t size) +{ + int err; + uint8_t *dest = NULL; + z_stream *stream = ¶m->stream; + uint8_t *p = param->originbuf; + QEMUFile *f = f = param->file; + ssize_t blen = qemu_put_compress_start(f, &dest); + + if (blen < compressBound(size)) { + return -1; + } + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = size; + stream->next_in = p; + stream->avail_out = blen; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + blen = stream->next_out - dest; + if (blen < 0) { + return -1; + } + + qemu_put_compress_end(f, blen); + return blen + sizeof(int32_t); +} + +static void zlib_save_cleanup(CompressParam *param) +{ + deflateEnd(¶m->stream); +} + +static int zlib_load_setup(DecompressParam *param) +{ + if (inflateInit(¶m->stream) != Z_OK) { + return -1; + } + + return 0; +} + +static int +zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size) +{ + int err; + + z_stream *stream = ¶m->stream; + + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = param->len; + stream->next_in = param->compbuf; + stream->avail_out = size; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->total_out; +} + +static void zlib_load_cleanup(DecompressParam *param) +{ + inflateEnd(¶m->stream); +} + +static int zlib_check_len(int len) +{ + return len < 0 || len > compressBound(TARGET_PAGE_SIZE); +} + +static int set_compress_ops(void) +{ + compress_ops = g_new0(MigrationCompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + compress_ops->save_setup = zlib_save_setup; + compress_ops->save_cleanup = zlib_save_cleanup; + compress_ops->compress_data = zlib_compress_data; + break; + default: + return -1; + } + + return 0; +} + +static int set_decompress_ops(void) +{ + decompress_ops = g_new0(MigrationDecompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + decompress_ops->load_setup = zlib_load_setup; + decompress_ops->load_cleanup = zlib_load_cleanup; + decompress_ops->decompress_data = zlib_decompress_data; + decompress_ops->check_len = zlib_check_len; + break; + default: + return -1; + } + + return 0; +} + +static void clean_compress_ops(void) +{ + compress_ops->save_setup = NULL; + compress_ops->save_cleanup = NULL; + compress_ops->compress_data = NULL; + + g_free(compress_ops); + compress_ops = NULL; +} + +static void clean_decompress_ops(void) +{ + decompress_ops->load_setup = NULL; + decompress_ops->load_cleanup = NULL; + decompress_ops->decompress_data = NULL; + + g_free(decompress_ops); + decompress_ops = NULL; +} + static void *do_data_compress(void *opaque) { CompressParam *param = opaque; @@ -141,7 +282,7 @@ void compress_threads_save_cleanup(void) qemu_thread_join(compress_threads + i); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); + compress_ops->save_cleanup(&comp_param[i]); g_free(comp_param[i].originbuf); qemu_fclose(comp_param[i].file); comp_param[i].file = NULL; @@ -152,6 +293,7 @@ void compress_threads_save_cleanup(void) g_free(comp_param); compress_threads = NULL; comp_param = NULL; + clean_compress_ops(); } int compress_threads_save_setup(void) @@ -161,6 +303,12 @@ int compress_threads_save_setup(void) if (!migrate_compress()) { return 0; } + + if (set_compress_ops() < 0) { + clean_compress_ops(); + return -1; + } + thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); @@ -172,8 +320,7 @@ int compress_threads_save_setup(void) goto exit; } - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { + if (compress_ops->save_setup(&comp_param[i]) < 0) { g_free(comp_param[i].originbuf); goto exit; } @@ -198,50 +345,6 @@ exit: return -1; } -/* - * Compress size bytes of data start at p and store the compressed - * data to the buffer of f. - * - * Since the file is dummy file with empty_ops, return -1 if f has no space to - * save the compressed data. - */ -static ssize_t qemu_put_compression_data(CompressParam *param, size_t size) -{ - int err; - uint8_t *dest = NULL; - z_stream *stream = ¶m->stream; - uint8_t *p = param->originbuf; - QEMUFile *f = f = param->file; - ssize_t blen = qemu_put_compress_start(f, &dest); - - if (blen < compressBound(size)) { - return -1; - } - - err = deflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = size; - stream->next_in = p; - stream->avail_out = blen; - stream->next_out = dest; - - err = deflate(stream, Z_FINISH); - if (err != Z_STREAM_END) { - return -1; - } - - blen = stream->next_out - dest; - if (blen < 0) { - return -1; - } - - qemu_put_compress_end(f, blen); - return blen + sizeof(int32_t); -} - static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block) { uint8_t *p = block->host + (param->offset & TARGET_PAGE_MASK); @@ -260,7 +363,7 @@ static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block * decompression */ memcpy(param->originbuf, p, page_size); - ret = qemu_put_compression_data(param, page_size); + ret = compress_ops->compress_data(param, page_size); if (ret < 0) { qemu_file_set_error(migrate_get_current()->to_dst_file, ret); error_report("compressed data failed!"); @@ -356,32 +459,6 @@ bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, } } -/* return the size after decompression, or negative value on error */ -static int -qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize) -{ - int err; - - z_stream *stream = ¶m->stream; - - err = inflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = param->len; - stream->next_in = param->compbuf; - stream->avail_out = pagesize; - stream->next_out = dest; - - err = inflate(stream, Z_NO_FLUSH); - if (err != Z_STREAM_END) { - return -1; - } - - return stream->total_out; -} - static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; @@ -398,7 +475,7 @@ static void *do_data_decompress(void *opaque) pagesize = qemu_target_page_size(); - ret = qemu_uncompress_data(param, des, pagesize); + ret = decompress_ops->decompress_data(param, des, pagesize); if (ret < 0 && migrate_get_current()->decompress_error_check) { error_report("decompress data failed"); qemu_file_set_error(decomp_file, ret); @@ -466,7 +543,7 @@ void compress_threads_load_cleanup(void) qemu_thread_join(decompress_threads + i); qemu_mutex_destroy(&decomp_param[i].mutex); qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); + decompress_ops->load_cleanup(&decomp_param[i]); g_free(decomp_param[i].compbuf); decomp_param[i].compbuf = NULL; } @@ -475,6 +552,7 @@ void compress_threads_load_cleanup(void) decompress_threads = NULL; decomp_param = NULL; decomp_file = NULL; + clean_decompress_ops(); } int compress_threads_load_setup(QEMUFile *f) @@ -485,6 +563,11 @@ int compress_threads_load_setup(QEMUFile *f) return 0; } + if (set_decompress_ops() < 0) { + clean_decompress_ops(); + return -1; + } + /* * set compression_counters memory to zero for a new migration */ @@ -497,7 +580,7 @@ int compress_threads_load_setup(QEMUFile *f) qemu_cond_init(&decomp_done_cond); decomp_file = f; for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { + if (decompress_ops->load_setup(&decomp_param[i]) < 0) { goto exit; } diff --git a/migration/ram-compress.h b/migration/ram-compress.h index 0d89a2f55e..daf241987f 100644 --- a/migration/ram-compress.h +++ b/migration/ram-compress.h @@ -39,6 +39,20 @@ enum CompressResult { }; typedef enum CompressResult CompressResult; +struct DecompressParam { + bool done; + bool quit; + QemuMutex mutex; + QemuCond cond; + void *des; + uint8_t *compbuf; + int len; + + /* for zlib compression */ + z_stream stream; +}; +typedef struct DecompressParam DecompressParam; + struct CompressParam { bool done; bool quit; @@ -51,11 +65,26 @@ struct CompressParam { ram_addr_t offset; /* internally used fields */ - z_stream stream; uint8_t *originbuf; + + /* for zlib compression */ + z_stream stream; }; typedef struct CompressParam CompressParam; +typedef struct { + int (*save_setup)(CompressParam *param); + void (*save_cleanup)(CompressParam *param); + ssize_t (*compress_data)(CompressParam *param, size_t size); +} MigrationCompressOps; + +typedef struct { + int (*load_setup)(DecompressParam *param); + void (*load_cleanup)(DecompressParam *param); + int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size); + int (*check_len)(int len); +} MigrationDecompressOps; + void compress_threads_save_cleanup(void); int compress_threads_save_setup(void); diff --git a/migration/ram.c b/migration/ram.c index 8c7886ab79..f9b2b9b985 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -96,6 +96,8 @@ XBZRLECacheStats xbzrle_counters; +extern MigrationDecompressOps *decompress_ops; + /* used by the search for pages to send */ struct PageSearchStatus { /* The migration channel used for a specific host page */ @@ -3979,7 +3981,7 @@ static int ram_load_precopy(QEMUFile *f) case RAM_SAVE_FLAG_COMPRESS_PAGE: len = qemu_get_be32(f); - if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + if (decompress_ops->check_len(len)) { error_report("Invalid compressed data length: %d", len); ret = -EINVAL; break; -- 2.27.0