From 52abc18f1c15c62ec512263ac95750097568bdb4 Mon Sep 17 00:00:00 2001 From: Keqian Zhu Date: Fri, 12 Aug 2022 18:28:14 +0800 Subject: [PATCH] qos: More bugfixes for qos management Take another VM stop reason to account, add aditional setting for cpu QOS and add a job to sync VM pids to resctrl. Signed-off-by: Keqian Zhu --- ...Add-a-job-to-sync-VM-pids-to-resctrl.patch | 323 ++++++++++++++++++ ...os-Add-aditional-setting-for-cpu-QOS.patch | 43 +++ ...ke-another-VM-stop-reason-to-account.patch | 96 ++++++ skylark.spec | 12 +- 4 files changed, 473 insertions(+), 1 deletion(-) create mode 100644 cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch create mode 100644 cpu_qos-Add-aditional-setting-for-cpu-QOS.patch create mode 100644 guestinfo-Take-another-VM-stop-reason-to-account.patch diff --git a/cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch b/cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch new file mode 100644 index 0000000..229ca69 --- /dev/null +++ b/cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch @@ -0,0 +1,323 @@ +From fe004fc62e8d47835c96902b26fc3f6e088559e6 Mon Sep 17 00:00:00 2001 +From: Dongxu Sun +Date: Tue, 16 Aug 2022 10:46:47 +0800 +Subject: [PATCH 3/3] cachembw_qos: Add a job to sync VM pids to resctrl + +Delete unnecessary libvirt event registrations, and poll +the tasks file to sync VM pids to resctrl task file. + +Signed-off-by: Dongxu Sun +--- + data_collector/datacollector.py | 7 -- + data_collector/guestinfo.py | 7 -- + qos_controller/cachembwcontroller.py | 22 ++--- + skylark.py | 129 +++++---------------------- + 4 files changed, 28 insertions(+), 137 deletions(-) + +diff --git a/data_collector/datacollector.py b/data_collector/datacollector.py +index aaddc42..8aa20ab 100644 +--- a/data_collector/datacollector.py ++++ b/data_collector/datacollector.py +@@ -31,13 +31,6 @@ class DataCollector: + def set_static_power_info(self): + self.host_info.set_host_power_attribute() + +- def reset_base_info(self, vir_conn): +- self.guest_info.clear_guest_info() +- self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo) +- +- def reset_power_info(self): +- self.host_info.update_host_power_info() +- + def update_base_info(self, vir_conn): + self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo) + +diff --git a/data_collector/guestinfo.py b/data_collector/guestinfo.py +index 38fa827..97d9f20 100644 +--- a/data_collector/guestinfo.py ++++ b/data_collector/guestinfo.py +@@ -125,13 +125,6 @@ class GuestInfo: + self.domain_online = [] + self.running_domain_in_cpus = [] + +- def clear_guest_info(self): +- self.vm_dict.clear() +- self.low_prio_vm_dict.clear() +- self.vm_online_dict.clear() +- self.domain_online.clear() +- self.running_domain_in_cpus.clear() +- + def update_guest_info(self, conn, host_topo): + self.running_domain_in_cpus.clear() + self.vm_online_dict.clear() +diff --git a/qos_controller/cachembwcontroller.py b/qos_controller/cachembwcontroller.py +index 8be9329..bbfe08f 100644 +--- a/qos_controller/cachembwcontroller.py ++++ b/qos_controller/cachembwcontroller.py +@@ -25,7 +25,6 @@ from data_collector.guestinfo import GuestInfo + from data_collector.hostinfo import ResctrlInfo + + LOW_VMS_RESGROUP_PATH = "/sys/fs/resctrl/low_prio_machine" +-LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice" + LOW_MBW_INIT_FLOOR = 0.1 + LOW_MBW_INIT_CEIL = 0.2 + LOW_CACHE_INIT_FLOOR = 1 +@@ -56,11 +55,6 @@ class CacheMBWController: + ResgroupFileOperations.create_group_dir(LOW_VMS_RESGROUP_PATH) + 
self.__get_low_init_alloc(resctrl_info) + self.set_low_init_alloc(resctrl_info) +- with os.scandir(LOW_VMS_PID_CGRP_PATH) as it: +- for entry in it: +- if entry.is_file(): +- continue +- self.__add_vm_pids(entry.name) + + def __get_low_init_alloc(self, resctrl_info: ResctrlInfo): + low_vms_mbw_init = float(os.getenv("MIN_MBW_LOW_VMS")) +@@ -111,25 +105,21 @@ class CacheMBWController: + util.file_write(os.path.join( + LOW_VMS_RESGROUP_PATH, "schemata"), schemata_mbw_alloc) + +- def domain_updated(self, domain, guest_info: GuestInfo): +- if domain.ID() in guest_info.low_prio_vm_dict: +- self.__add_vm_pids(guest_info.vm_dict[domain.ID()].cgroup_name) +- + @staticmethod +- def __add_vm_pids(vm_cgrp_name): +- tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, vm_cgrp_name, "tasks") ++ def add_vm_pids(tasks_path): + if not os.access(tasks_path, os.R_OK): + LOGGER.warning( + "The path %s is not readable, please check." % tasks_path) + return +- LOGGER.info("Add %s pids to %s" % +- (vm_cgrp_name, os.path.join(LOW_VMS_RESGROUP_PATH, "tasks"))) ++ ++ resctrl_tsk_path = os.path.join(LOW_VMS_RESGROUP_PATH, "tasks") ++ LOGGER.debug("Add %s pids to %s" % (tasks_path, resctrl_tsk_path)) + try: + with open(tasks_path) as tasks: + for task in tasks: +- util.file_write(os.path.join(LOW_VMS_RESGROUP_PATH, "tasks"), task) ++ util.file_write(resctrl_tsk_path, task) + except IOError as e: +- LOGGER.error("Failed to add VM(%s) pids to resctrl: %s" % (vm_cgrp_name, str(e))) ++ LOGGER.error("Failed to add %s pids to resctrl: %s" % (tasks_path, str(e))) + # If the VM doesn't stop, raise exception. + if os.access(tasks_path, os.F_OK): + raise +diff --git a/skylark.py b/skylark.py +index 8962ba5..c281a54 100644 +--- a/skylark.py ++++ b/skylark.py +@@ -27,7 +27,7 @@ import stat + import subprocess + import sys + +-from apscheduler.schedulers.background import BackgroundScheduler ++from apscheduler.schedulers.background import BlockingScheduler + from apscheduler.events import EVENT_JOB_ERROR + import libvirt + +@@ -47,12 +47,7 @@ LIBVIRT_DRIVE_TYPE = None + PID_FILE = None + MSR_PATH = "/dev/cpu/0/msr" + PID_FILE_NAME = "/var/run/skylarkd.pid" +- +-STATE_TO_STRING = ['VIR_DOMAIN_EVENT_DEFINED', 'VIR_DOMAIN_EVENT_UNDEFINED', +- 'VIR_DOMAIN_EVENT_STARTED', 'VIR_DOMAIN_EVENT_SUSPENDED', +- 'VIR_DOMAIN_EVENT_RESUMED', 'VIR_DOMAIN_EVENT_STOPPED', +- 'VIR_DOMAIN_EVENT_SHUTDOWN', 'VIR_DOMAIN_EVENT_PMSUSPENDED', +- 'VIR_DOMAIN_EVENT_CRASHED', 'VIR_DOMAIN_EVENT_LAST'] ++LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice" + + + class QosManager: +@@ -63,17 +58,18 @@ class QosManager: + self.cpu_controller = CpuController() + self.net_controller = NetController() + self.cachembw_controller = CacheMBWController() +- self.has_job = False + + def scheduler_listener(self, event): + if event.exception: ++ LOGGER.info("The Scheduler detects an exception, send SIGABRT and restart skylark...") + self.scheduler.remove_all_jobs() ++ os.kill(os.getpid(), signal.SIGABRT) + + def init_scheduler(self): +- self.scheduler = BackgroundScheduler(logger=LOGGER) ++ self.scheduler = BlockingScheduler(logger=LOGGER) + if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true": + self.scheduler.add_job(self.__do_power_manage, trigger='interval', seconds=1, id='do_power_manage') +- self.has_job = True ++ self.scheduler.add_job(self.__do_resctrl_sync, trigger='interval', seconds=0.5, id='do_resctrl_sync') + self.scheduler.add_listener(self.scheduler_listener, EVENT_JOB_ERROR) + + def init_data_collector(self): +@@ -95,19 +91,19 @@ 
class QosManager: + def start_scheduler(self): + self.scheduler.start() + +- def reset_data_collector(self): +- self.scheduler.shutdown(wait=True) +- self.data_collector.reset_base_info(self.vir_conn) +- if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true": +- self.data_collector.reset_power_info() +- self.init_scheduler() +- self.start_scheduler() +- + def __do_power_manage(self): + self.data_collector.update_base_info(self.vir_conn) + self.data_collector.update_power_info() + self.power_analyzer.power_manage(self.data_collector, self.cpu_controller) + ++ def __do_resctrl_sync(self): ++ with os.scandir(LOW_VMS_PID_CGRP_PATH) as it: ++ for entry in it: ++ if entry.is_file(): ++ continue ++ tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, entry.name, "tasks") ++ self.cachembw_controller.add_vm_pids(tasks_path) ++ + + def create_pid_file(): + global PID_FILE +@@ -140,85 +136,27 @@ def remove_pid_file(): + util.remove_file(PID_FILE.name) + + +-def register_callback_event(conn, event_id, callback_func, opaque): +- if conn is not None and event_id >= 0: +- try: +- return conn.domainEventRegisterAny(None, event_id, callback_func, opaque) +- except libvirt.libvirtError as error: +- LOGGER.warning("Register event exception %s" % str(error)) +- return -1 +- +- +-def deregister_callback_event(conn, callback_id): +- if conn is not None and callback_id >= 0: +- try: +- conn.domainEventDeregisterAny(callback_id) +- except libvirt.libvirtError as error: +- LOGGER.warning("Deregister event exception %s" % str(error)) +- +- +-def event_lifecycle_callback(conn, dom, event, detail, opaque): +- LOGGER.info("Occur lifecycle event: domain %s(%d) state changed to %s" % ( +- dom.name(), dom.ID(), STATE_TO_STRING[event])) +- vm_started = (event == libvirt.VIR_DOMAIN_EVENT_STARTED) +- vm_stopped = (event == libvirt.VIR_DOMAIN_EVENT_STOPPED) +- if vm_started or vm_stopped: +- QOS_MANAGER_ENTRY.reset_data_collector() +- if vm_started: +- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom, +- QOS_MANAGER_ENTRY.data_collector.guest_info) +- return 0 +- +- +-def event_device_added_callback(conn, dom, dev_alias, opaque): +- device_name = str(dev_alias[0:4]) +- if device_name == "vcpu": +- LOGGER.info("Occur device added event: domain %s(%d) add vcpu" % (dom.name(), dom.ID())) +- QOS_MANAGER_ENTRY.reset_data_collector() +- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom, +- QOS_MANAGER_ENTRY.data_collector.guest_info) +- +- +-def event_device_removed_callback(conn, dom, dev_alias, opaque): +- device_name = str(dev_alias[0:4]) +- if device_name == "vcpu": +- LOGGER.info("Occur device removed event: domain %s(%d) removed vcpu" % (dom.name(), dom.ID())) +- QOS_MANAGER_ENTRY.reset_data_collector() +- +- + def sigterm_handler(signo, stack): + sys.exit(0) + ++def sigabrt_handler(signo, stack): ++ sys.exit(1) ++ + + def func_daemon(): + global LIBVIRT_CONN + global QOS_MANAGER_ENTRY + +- event_lifecycle_id = -1 +- event_device_added_id = -1 +- event_device_removed_id = -1 +- + signal.signal(signal.SIGTERM, sigterm_handler) +- signal.signal(signal.SIGABRT, sigterm_handler) ++ signal.signal(signal.SIGABRT, sigabrt_handler) + + @atexit.register + def daemon_exit_func(): +- deregister_callback_event(LIBVIRT_CONN, event_lifecycle_id) +- deregister_callback_event(LIBVIRT_CONN, event_device_added_id) +- deregister_callback_event(LIBVIRT_CONN, event_device_removed_id) + LIBVIRT_CONN.close() + remove_pid_file() + + create_pid_file() + +- try: +- if libvirt.virEventRegisterDefaultImpl() < 0: +- 
LOGGER.error("Failed to register event implementation!") +- sys.exit(1) +- except libvirt.libvirtError: +- LOGGER.error("System internal error!") +- sys.exit(1) +- + LOGGER.info("Try to open libvirtd connection") + try: + LIBVIRT_CONN = libvirt.open(LIBVIRT_URI) +@@ -227,40 +165,17 @@ def func_daemon(): + LOGGER.error("System internal error, failed to open libvirtd connection!") + sys.exit(1) + +- event_lifecycle_id = register_callback_event(LIBVIRT_CONN, +- libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, +- event_lifecycle_callback, None) +- event_device_added_id = register_callback_event(LIBVIRT_CONN, +- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED, +- event_device_added_callback, None) +- event_device_removed_id = register_callback_event(LIBVIRT_CONN, +- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED, +- event_device_removed_callback, None) +- if event_lifecycle_id < 0 or event_device_added_id < 0 or event_device_removed_id < 0: +- LOGGER.error("Failed to register libvirt event %d, %d, %d" +- % (event_lifecycle_id, event_device_added_id, event_device_removed_id)) +- sys.exit(1) +- LOGGER.info("Libvirtd connected and libvirt event registered.") ++ LOGGER.info("Libvirtd connected.") + + QOS_MANAGER_ENTRY = QosManager(LIBVIRT_CONN) + QOS_MANAGER_ENTRY.init_scheduler() + QOS_MANAGER_ENTRY.init_data_collector() + QOS_MANAGER_ENTRY.init_qos_analyzer() + QOS_MANAGER_ENTRY.init_qos_controller() ++ ++ LOGGER.info("QoS management ready to start.") + QOS_MANAGER_ENTRY.start_scheduler() +- LOGGER.info("QoS management started.") +- +- while LIBVIRT_CONN.isAlive(): +- if not QOS_MANAGER_ENTRY.scheduler.get_jobs() and QOS_MANAGER_ENTRY.has_job: +- LOGGER.error("The Scheduler detects an exception, process will exit!") +- break +- try: +- if libvirt.virEventRunDefaultImpl() < 0: +- LOGGER.warning("Failed to run event loop") +- break +- except libvirt.libvirtError as error: +- LOGGER.warning("Run libvirt event loop failed: %s" % str(error)) +- break ++ + sys.exit(1) + + +-- +2.33.0 + diff --git a/cpu_qos-Add-aditional-setting-for-cpu-QOS.patch b/cpu_qos-Add-aditional-setting-for-cpu-QOS.patch new file mode 100644 index 0000000..560ceac --- /dev/null +++ b/cpu_qos-Add-aditional-setting-for-cpu-QOS.patch @@ -0,0 +1,43 @@ +From 14b8a140c5794b28ccbb1c396924a9363767b7ea Mon Sep 17 00:00:00 2001 +From: Keqian Zhu +Date: Thu, 4 Aug 2022 15:34:59 -0400 +Subject: [PATCH 2/3] cpu_qos: Add aditional setting for cpu QOS + +Set overload_detect_period and offline_wait_interval, so as +to prevent low priority vm to be hungry too much time. 
+
+Signed-off-by: Keqian Zhu
+---
+ qos_controller/cpucontroller.py | 8 +++++++-
+ 1 file changed, 7 insertions(+), 1 deletion(-)
+
+diff --git a/qos_controller/cpucontroller.py b/qos_controller/cpucontroller.py
+index f857973..f2a67e0 100644
+--- a/qos_controller/cpucontroller.py
++++ b/qos_controller/cpucontroller.py
+@@ -23,6 +23,10 @@ import util
+ 
+ LOW_PRIORITY_SLICES_PATH = "/sys/fs/cgroup/cpu/low_prio_machine.slice"
+ LOW_PRIORITY_QOS_LEVEL = -1
++OVERLOAD_DETECT_PERIOD_PATH = "/proc/sys/kernel/qos_overload_detect_period_ms"
++OVERLOAD_DETECT_PERIOD_MS = 100
++OFFLINE_WAIT_INTERVAL_PATH = "/proc/sys/kernel/qos_offline_wait_interval_ms"
++OFFLINE_WAIT_INTERVAL_MS = 100
+ MIN_QUOTA_US = 0
+ 
+ 
+@@ -36,8 +40,10 @@ class CpuController:
+         qos_level_path = os.path.join(LOW_PRIORITY_SLICES_PATH, "cpu.qos_level")
+         try:
+             util.file_write(qos_level_path, str(LOW_PRIORITY_QOS_LEVEL))
++            util.file_write(OVERLOAD_DETECT_PERIOD_PATH, str(OVERLOAD_DETECT_PERIOD_MS))
++            util.file_write(OFFLINE_WAIT_INTERVAL_PATH, str(OFFLINE_WAIT_INTERVAL_MS))
+         except IOError as error:
+-            LOGGER.error("Failed to set low priority cpu qos level: %s" % str(error))
++            LOGGER.error("Failed to configure CPU QoS for low priority VMs: %s" % str(error))
+             raise
+ 
+     def limit_domain_bandwidth(self, guest_info, quota_threshold, abnormal_threshold):
+-- 
+2.33.0
+
diff --git a/guestinfo-Take-another-VM-stop-reason-to-account.patch b/guestinfo-Take-another-VM-stop-reason-to-account.patch
new file mode 100644
index 0000000..bbab87d
--- /dev/null
+++ b/guestinfo-Take-another-VM-stop-reason-to-account.patch
@@ -0,0 +1,96 @@
+From bdd805eec082062e042acda6caf38ca17dbaec50 Mon Sep 17 00:00:00 2001
+From: Keqian Zhu
+Date: Thu, 4 Aug 2022 14:46:33 -0400
+Subject: [PATCH 1/3] guestinfo: Take another VM stop reason to account
+
+When a VM is closed by OpenStack, the exception code is not
+VIR_ERR_NO_DOMAIN, but the exception message contains "domain
+is not running".
+
+Also refactor the code to make it more readable.
+ +Signed-off-by: Keqian Zhu +--- + data_collector/guestinfo.py | 48 +++++++++++++++++-------------------- + 1 file changed, 22 insertions(+), 26 deletions(-) + +diff --git a/data_collector/guestinfo.py b/data_collector/guestinfo.py +index 415b3f6..38fa827 100644 +--- a/data_collector/guestinfo.py ++++ b/data_collector/guestinfo.py +@@ -28,6 +28,7 @@ DEFAULT_PRIORITY = "machine" + HIGH_PRIORITY = "high_prio_machine" + LOW_PRIORITY = "low_prio_machine" + PIDS_CGRP_PATH = "/sys/fs/cgroup/pids" ++DOMAIN_STOP_MSG = "domain is not running" + + + class DomainInfo: +@@ -141,43 +142,38 @@ class GuestInfo: + self.running_domain_in_cpus.append([]) + + self.get_all_active_domain(conn) +- + for dom in self.domain_online: + self.vm_online_dict[dom.ID()] = dom ++ # Remove ever see but now stopped domains ++ for vm_id in list(self.vm_dict): ++ if vm_id not in self.vm_online_dict: ++ del self.vm_dict[vm_id] ++ + for vm_id in self.vm_online_dict: +- ret = -1 +- if vm_id in self.vm_dict: +- try: ++ try: ++ if vm_id in self.vm_dict: + ret = self.vm_dict.get(vm_id).update_domain_info(self.vm_online_dict.get(vm_id), host_topo) +- except libvirt.libvirtError as e: +- if e.get_error_code() != libvirt.VIR_ERR_NO_DOMAIN: +- raise +- if ret < 0: +- del self.vm_dict[vm_id] +- continue +- else: +- try: +- vm_info = DomainInfo() +- ret = vm_info.set_domain_attribute(self.vm_online_dict.get(vm_id), host_topo) +- except libvirt.libvirtError as e: +- if e.get_error_code() != libvirt.VIR_ERR_NO_DOMAIN: +- raise +- if ret < 0: +- continue +- self.vm_dict[vm_id] = vm_info ++ else: ++ self.vm_dict[vm_id] = DomainInfo() ++ ret = self.vm_dict.get(vm_id).set_domain_attribute(self.vm_online_dict.get(vm_id), host_topo) ++ except libvirt.libvirtError as e: ++ ret = -1 ++ # If domain doesn't stop, raise exception ++ if e.get_error_code() != libvirt.VIR_ERR_NO_DOMAIN and \ ++ DOMAIN_STOP_MSG not in e.get_error_message(): ++ raise ++ if ret < 0: ++ del self.vm_dict[vm_id] ++ continue + ++ if self.vm_dict.get(vm_id).priority == 1: ++ self.low_prio_vm_dict[vm_id] = self.vm_dict.get(vm_id) + for cpu in range(host_topo.max_cpu_nums): + self.running_domain_in_cpus[cpu].append((self.vm_dict.get(vm_id).cpu_usage[cpu], + self.vm_dict.get(vm_id).domain_id, + self.vm_dict.get(vm_id).domain_name, + self.vm_dict.get(vm_id).priority)) + +- for vm_id in list(self.vm_dict): +- if vm_id not in self.vm_online_dict: +- del self.vm_dict[vm_id] +- elif vm_id not in self.low_prio_vm_dict and self.vm_dict.get(vm_id).priority == 1: +- self.low_prio_vm_dict[vm_id] = self.vm_dict.get(vm_id) +- + def get_all_active_domain(self, conn): + try: + self.domain_online = conn.listAllDomains(flags=libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) +-- +2.33.0 + diff --git a/skylark.spec b/skylark.spec index f3fde4a..9d2b965 100644 --- a/skylark.spec +++ b/skylark.spec @@ -1,12 +1,16 @@ Name: skylark Version: 1.0.0 -Release: 3 +Release: 4 Summary: Skylark is a next-generation QoS-aware scheduler. 
License: Mulan PSL v2 URL: https://gitee.com/openeuler/skylark Source0: %{name}-%{version}.tar.gz +Patch0001: guestinfo-Take-another-VM-stop-reason-to-account.patch +Patch0002: cpu_qos-Add-aditional-setting-for-cpu-QOS.patch +Patch0003: cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch + BuildRequires: python3-devel make gcc coreutils systemd-units Requires: python3 python3-APScheduler python3-libvirt # For resource partition management @@ -27,6 +31,7 @@ Skylark is a next-generation QoS-aware scheduler which provides coordinated reso %prep %setup -q +%autopatch -p1 %build @@ -54,6 +59,11 @@ make install DESTDIR=%{buildroot} %changelog +* Fri Aug 19 2022 Keqian Zhu - 1.0.0-4 +- guestinfo: Take another VM stop reason to account +- cpu_qos: Add aditional setting for cpu QOS +- cachembw_qos: Add a job to sync VM pids to resctrl + * Wed Aug 10 2022 Keqian Zhu - 1.0.0-3 - spec: Add missing dependencies of build and run