From fe004fc62e8d47835c96902b26fc3f6e088559e6 Mon Sep 17 00:00:00 2001
From: Dongxu Sun
Date: Tue, 16 Aug 2022 10:46:47 +0800
Subject: [PATCH 3/3] cachembw_qos: Add a job to sync VM pids to resctrl

Delete the unnecessary libvirt event registrations, and instead poll
the cgroup tasks files to sync VM pids to the resctrl tasks file.

Signed-off-by: Dongxu Sun
---
 data_collector/datacollector.py      |   7 --
 data_collector/guestinfo.py          |   7 --
 qos_controller/cachembwcontroller.py |  22 ++--
 skylark.py                           | 129 +++++----------------
 4 files changed, 28 insertions(+), 137 deletions(-)

diff --git a/data_collector/datacollector.py b/data_collector/datacollector.py
index aaddc42..8aa20ab 100644
--- a/data_collector/datacollector.py
+++ b/data_collector/datacollector.py
@@ -31,13 +31,6 @@ class DataCollector:
     def set_static_power_info(self):
         self.host_info.set_host_power_attribute()
 
-    def reset_base_info(self, vir_conn):
-        self.guest_info.clear_guest_info()
-        self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
-
-    def reset_power_info(self):
-        self.host_info.update_host_power_info()
-
     def update_base_info(self, vir_conn):
         self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
 
diff --git a/data_collector/guestinfo.py b/data_collector/guestinfo.py
index 38fa827..97d9f20 100644
--- a/data_collector/guestinfo.py
+++ b/data_collector/guestinfo.py
@@ -125,13 +125,6 @@ class GuestInfo:
         self.domain_online = []
         self.running_domain_in_cpus = []
 
-    def clear_guest_info(self):
-        self.vm_dict.clear()
-        self.low_prio_vm_dict.clear()
-        self.vm_online_dict.clear()
-        self.domain_online.clear()
-        self.running_domain_in_cpus.clear()
-
     def update_guest_info(self, conn, host_topo):
         self.running_domain_in_cpus.clear()
         self.vm_online_dict.clear()
diff --git a/qos_controller/cachembwcontroller.py b/qos_controller/cachembwcontroller.py
index 8be9329..bbfe08f 100644
--- a/qos_controller/cachembwcontroller.py
+++ b/qos_controller/cachembwcontroller.py
@@ -25,7 +25,6 @@ from data_collector.guestinfo import GuestInfo
 from data_collector.hostinfo import ResctrlInfo
 
 LOW_VMS_RESGROUP_PATH = "/sys/fs/resctrl/low_prio_machine"
-LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
 LOW_MBW_INIT_FLOOR = 0.1
 LOW_MBW_INIT_CEIL = 0.2
 LOW_CACHE_INIT_FLOOR = 1
@@ -56,11 +55,6 @@ class CacheMBWController:
         ResgroupFileOperations.create_group_dir(LOW_VMS_RESGROUP_PATH)
         self.__get_low_init_alloc(resctrl_info)
         self.set_low_init_alloc(resctrl_info)
-        with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
-            for entry in it:
-                if entry.is_file():
-                    continue
-                self.__add_vm_pids(entry.name)
 
     def __get_low_init_alloc(self, resctrl_info: ResctrlInfo):
         low_vms_mbw_init = float(os.getenv("MIN_MBW_LOW_VMS"))
@@ -111,25 +105,21 @@ class CacheMBWController:
         util.file_write(os.path.join(
             LOW_VMS_RESGROUP_PATH, "schemata"), schemata_mbw_alloc)
 
-    def domain_updated(self, domain, guest_info: GuestInfo):
-        if domain.ID() in guest_info.low_prio_vm_dict:
-            self.__add_vm_pids(guest_info.vm_dict[domain.ID()].cgroup_name)
-
     @staticmethod
-    def __add_vm_pids(vm_cgrp_name):
-        tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, vm_cgrp_name, "tasks")
+    def add_vm_pids(tasks_path):
         if not os.access(tasks_path, os.R_OK):
             LOGGER.warning(
                 "The path %s is not readable, please check." % tasks_path)
             return
-        LOGGER.info("Add %s pids to %s" %
-                    (vm_cgrp_name, os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")))
+
+        resctrl_tsk_path = os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")
+        LOGGER.debug("Add %s pids to %s" % (tasks_path, resctrl_tsk_path))
         try:
             with open(tasks_path) as tasks:
                 for task in tasks:
-                    util.file_write(os.path.join(LOW_VMS_RESGROUP_PATH, "tasks"), task)
+                    util.file_write(resctrl_tsk_path, task)
         except IOError as e:
-            LOGGER.error("Failed to add VM(%s) pids to resctrl: %s" % (vm_cgrp_name, str(e)))
+            LOGGER.error("Failed to add %s pids to resctrl: %s" % (tasks_path, str(e)))
             # If the VM doesn't stop, raise exception.
             if os.access(tasks_path, os.F_OK):
                 raise
diff --git a/skylark.py b/skylark.py
index 8962ba5..c281a54 100644
--- a/skylark.py
+++ b/skylark.py
@@ -27,7 +27,7 @@ import stat
 import subprocess
 import sys
 
-from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.schedulers.background import BlockingScheduler
 from apscheduler.events import EVENT_JOB_ERROR
 
 import libvirt
@@ -47,12 +47,7 @@ LIBVIRT_DRIVE_TYPE = None
 PID_FILE = None
 MSR_PATH = "/dev/cpu/0/msr"
 PID_FILE_NAME = "/var/run/skylarkd.pid"
-
-STATE_TO_STRING = ['VIR_DOMAIN_EVENT_DEFINED', 'VIR_DOMAIN_EVENT_UNDEFINED',
-                   'VIR_DOMAIN_EVENT_STARTED', 'VIR_DOMAIN_EVENT_SUSPENDED',
-                   'VIR_DOMAIN_EVENT_RESUMED', 'VIR_DOMAIN_EVENT_STOPPED',
-                   'VIR_DOMAIN_EVENT_SHUTDOWN', 'VIR_DOMAIN_EVENT_PMSUSPENDED',
-                   'VIR_DOMAIN_EVENT_CRASHED', 'VIR_DOMAIN_EVENT_LAST']
+LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
 
 
 class QosManager:
@@ -63,17 +58,18 @@ class QosManager:
         self.cpu_controller = CpuController()
         self.net_controller = NetController()
         self.cachembw_controller = CacheMBWController()
-        self.has_job = False
 
     def scheduler_listener(self, event):
         if event.exception:
+            LOGGER.info("The Scheduler detects an exception, send SIGABRT and restart skylark...")
             self.scheduler.remove_all_jobs()
+            os.kill(os.getpid(), signal.SIGABRT)
 
     def init_scheduler(self):
-        self.scheduler = BackgroundScheduler(logger=LOGGER)
+        self.scheduler = BlockingScheduler(logger=LOGGER)
         if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
             self.scheduler.add_job(self.__do_power_manage, trigger='interval', seconds=1, id='do_power_manage')
-            self.has_job = True
+        self.scheduler.add_job(self.__do_resctrl_sync, trigger='interval', seconds=0.5, id='do_resctrl_sync')
         self.scheduler.add_listener(self.scheduler_listener, EVENT_JOB_ERROR)
 
     def init_data_collector(self):
@@ -95,19 +91,19 @@ class QosManager:
     def start_scheduler(self):
         self.scheduler.start()
 
-    def reset_data_collector(self):
-        self.scheduler.shutdown(wait=True)
-        self.data_collector.reset_base_info(self.vir_conn)
-        if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
-            self.data_collector.reset_power_info()
-        self.init_scheduler()
-        self.start_scheduler()
-
     def __do_power_manage(self):
         self.data_collector.update_base_info(self.vir_conn)
         self.data_collector.update_power_info()
         self.power_analyzer.power_manage(self.data_collector, self.cpu_controller)
 
+    def __do_resctrl_sync(self):
+        with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
+            for entry in it:
+                if entry.is_file():
+                    continue
+                tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, entry.name, "tasks")
+                self.cachembw_controller.add_vm_pids(tasks_path)
+
 
 def create_pid_file():
     global PID_FILE
@@ -140,85 +136,27 @@ def remove_pid_file():
     util.remove_file(PID_FILE.name)
 
 
-def register_callback_event(conn, event_id, callback_func, opaque):
-    if conn is not None and event_id >= 0:
-        try:
-            return conn.domainEventRegisterAny(None, event_id, callback_func, opaque)
-        except libvirt.libvirtError as error:
-            LOGGER.warning("Register event exception %s" % str(error))
-    return -1
-
-
-def deregister_callback_event(conn, callback_id):
-    if conn is not None and callback_id >= 0:
-        try:
-            conn.domainEventDeregisterAny(callback_id)
-        except libvirt.libvirtError as error:
-            LOGGER.warning("Deregister event exception %s" % str(error))
-
-
-def event_lifecycle_callback(conn, dom, event, detail, opaque):
-    LOGGER.info("Occur lifecycle event: domain %s(%d) state changed to %s" % (
-        dom.name(), dom.ID(), STATE_TO_STRING[event]))
-    vm_started = (event == libvirt.VIR_DOMAIN_EVENT_STARTED)
-    vm_stopped = (event == libvirt.VIR_DOMAIN_EVENT_STOPPED)
-    if vm_started or vm_stopped:
-        QOS_MANAGER_ENTRY.reset_data_collector()
-    if vm_started:
-        QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
-                                                             QOS_MANAGER_ENTRY.data_collector.guest_info)
-    return 0
-
-
-def event_device_added_callback(conn, dom, dev_alias, opaque):
-    device_name = str(dev_alias[0:4])
-    if device_name == "vcpu":
-        LOGGER.info("Occur device added event: domain %s(%d) add vcpu" % (dom.name(), dom.ID()))
-        QOS_MANAGER_ENTRY.reset_data_collector()
-        QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
-                                                             QOS_MANAGER_ENTRY.data_collector.guest_info)
-
-
-def event_device_removed_callback(conn, dom, dev_alias, opaque):
-    device_name = str(dev_alias[0:4])
-    if device_name == "vcpu":
-        LOGGER.info("Occur device removed event: domain %s(%d) removed vcpu" % (dom.name(), dom.ID()))
-        QOS_MANAGER_ENTRY.reset_data_collector()
-
-
 def sigterm_handler(signo, stack):
     sys.exit(0)
 
 
+def sigabrt_handler(signo, stack):
+    sys.exit(1)
+
 def func_daemon():
     global LIBVIRT_CONN
     global QOS_MANAGER_ENTRY
 
-    event_lifecycle_id = -1
-    event_device_added_id = -1
-    event_device_removed_id = -1
-
     signal.signal(signal.SIGTERM, sigterm_handler)
-    signal.signal(signal.SIGABRT, sigterm_handler)
+    signal.signal(signal.SIGABRT, sigabrt_handler)
 
     @atexit.register
    def daemon_exit_func():
-        deregister_callback_event(LIBVIRT_CONN, event_lifecycle_id)
-        deregister_callback_event(LIBVIRT_CONN, event_device_added_id)
-        deregister_callback_event(LIBVIRT_CONN, event_device_removed_id)
         LIBVIRT_CONN.close()
        remove_pid_file()
 
     create_pid_file()
 
-    try:
-        if libvirt.virEventRegisterDefaultImpl() < 0:
-            LOGGER.error("Failed to register event implementation!")
-            sys.exit(1)
-    except libvirt.libvirtError:
-        LOGGER.error("System internal error!")
-        sys.exit(1)
-
     LOGGER.info("Try to open libvirtd connection")
     try:
         LIBVIRT_CONN = libvirt.open(LIBVIRT_URI)
@@ -227,40 +165,17 @@ def func_daemon():
         LOGGER.error("System internal error, failed to open libvirtd connection!")
         sys.exit(1)
 
-    event_lifecycle_id = register_callback_event(LIBVIRT_CONN,
-                                                 libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
-                                                 event_lifecycle_callback, None)
-    event_device_added_id = register_callback_event(LIBVIRT_CONN,
-                                                    libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED,
-                                                    event_device_added_callback, None)
-    event_device_removed_id = register_callback_event(LIBVIRT_CONN,
-                                                      libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
-                                                      event_device_removed_callback, None)
-    if event_lifecycle_id < 0 or event_device_added_id < 0 or event_device_removed_id < 0:
-        LOGGER.error("Failed to register libvirt event %d, %d, %d"
-                     % (event_lifecycle_id, event_device_added_id, event_device_removed_id))
-        sys.exit(1)
-    LOGGER.info("Libvirtd connected and libvirt event registered.")
+    LOGGER.info("Libvirtd connected.")
 
     QOS_MANAGER_ENTRY = QosManager(LIBVIRT_CONN)
     QOS_MANAGER_ENTRY.init_scheduler()
     QOS_MANAGER_ENTRY.init_data_collector()
     QOS_MANAGER_ENTRY.init_qos_analyzer()
     QOS_MANAGER_ENTRY.init_qos_controller()
+
+    LOGGER.info("QoS management ready to start.")
     QOS_MANAGER_ENTRY.start_scheduler()
-    LOGGER.info("QoS management started.")
-
-    while LIBVIRT_CONN.isAlive():
-        if not QOS_MANAGER_ENTRY.scheduler.get_jobs() and QOS_MANAGER_ENTRY.has_job:
-            LOGGER.error("The Scheduler detects an exception, process will exit!")
-            break
-        try:
-            if libvirt.virEventRunDefaultImpl() < 0:
-                LOGGER.warning("Failed to run event loop")
-                break
-        except libvirt.libvirtError as error:
-            LOGGER.warning("Run libvirt event loop failed: %s" % str(error))
-            break
+    sys.exit(1)
-- 
2.33.0