skylark/cachembw_qos-Add-a-job-to-sync-VM-pids-to-resctrl.patch

324 lines
13 KiB
Diff
Raw Permalink Normal View History

From fe004fc62e8d47835c96902b26fc3f6e088559e6 Mon Sep 17 00:00:00 2001
From: Dongxu Sun <sundongxu3@huawei.com>
Date: Tue, 16 Aug 2022 10:46:47 +0800
Subject: [PATCH 3/3] cachembw_qos: Add a job to sync VM pids to resctrl

Delete the unnecessary libvirt event registrations and instead
periodically poll the cgroup tasks files of the low priority VMs to
sync their pids to the resctrl tasks file.

Signed-off-by: Dongxu Sun <sundongxu3@huawei.com>
---
data_collector/datacollector.py | 7 --
data_collector/guestinfo.py | 7 --
qos_controller/cachembwcontroller.py | 22 ++---
skylark.py | 129 +++++----------------------
4 files changed, 28 insertions(+), 137 deletions(-)
diff --git a/data_collector/datacollector.py b/data_collector/datacollector.py
index aaddc42..8aa20ab 100644
--- a/data_collector/datacollector.py
+++ b/data_collector/datacollector.py
@@ -31,13 +31,6 @@ class DataCollector:
def set_static_power_info(self):
self.host_info.set_host_power_attribute()
- def reset_base_info(self, vir_conn):
- self.guest_info.clear_guest_info()
- self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
-
- def reset_power_info(self):
- self.host_info.update_host_power_info()
-
def update_base_info(self, vir_conn):
self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
diff --git a/data_collector/guestinfo.py b/data_collector/guestinfo.py
index 38fa827..97d9f20 100644
--- a/data_collector/guestinfo.py
+++ b/data_collector/guestinfo.py
@@ -125,13 +125,6 @@ class GuestInfo:
self.domain_online = []
self.running_domain_in_cpus = []
- def clear_guest_info(self):
- self.vm_dict.clear()
- self.low_prio_vm_dict.clear()
- self.vm_online_dict.clear()
- self.domain_online.clear()
- self.running_domain_in_cpus.clear()
-
def update_guest_info(self, conn, host_topo):
self.running_domain_in_cpus.clear()
self.vm_online_dict.clear()
diff --git a/qos_controller/cachembwcontroller.py b/qos_controller/cachembwcontroller.py
index 8be9329..bbfe08f 100644
--- a/qos_controller/cachembwcontroller.py
+++ b/qos_controller/cachembwcontroller.py
@@ -25,7 +25,6 @@ from data_collector.guestinfo import GuestInfo
from data_collector.hostinfo import ResctrlInfo
LOW_VMS_RESGROUP_PATH = "/sys/fs/resctrl/low_prio_machine"
-LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
LOW_MBW_INIT_FLOOR = 0.1
LOW_MBW_INIT_CEIL = 0.2
LOW_CACHE_INIT_FLOOR = 1
@@ -56,11 +55,6 @@ class CacheMBWController:
ResgroupFileOperations.create_group_dir(LOW_VMS_RESGROUP_PATH)
self.__get_low_init_alloc(resctrl_info)
self.set_low_init_alloc(resctrl_info)
- with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
- for entry in it:
- if entry.is_file():
- continue
- self.__add_vm_pids(entry.name)
def __get_low_init_alloc(self, resctrl_info: ResctrlInfo):
low_vms_mbw_init = float(os.getenv("MIN_MBW_LOW_VMS"))
@@ -111,25 +105,21 @@ class CacheMBWController:
util.file_write(os.path.join(
LOW_VMS_RESGROUP_PATH, "schemata"), schemata_mbw_alloc)
- def domain_updated(self, domain, guest_info: GuestInfo):
- if domain.ID() in guest_info.low_prio_vm_dict:
- self.__add_vm_pids(guest_info.vm_dict[domain.ID()].cgroup_name)
-
@staticmethod
- def __add_vm_pids(vm_cgrp_name):
- tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, vm_cgrp_name, "tasks")
+ def add_vm_pids(tasks_path):
if not os.access(tasks_path, os.R_OK):
LOGGER.warning(
"The path %s is not readable, please check." % tasks_path)
return
- LOGGER.info("Add %s pids to %s" %
- (vm_cgrp_name, os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")))
+
+ resctrl_tsk_path = os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")
+ LOGGER.debug("Add %s pids to %s" % (tasks_path, resctrl_tsk_path))
try:
with open(tasks_path) as tasks:
for task in tasks:
- util.file_write(os.path.join(LOW_VMS_RESGROUP_PATH, "tasks"), task)
+ util.file_write(resctrl_tsk_path, task)
except IOError as e:
- LOGGER.error("Failed to add VM(%s) pids to resctrl: %s" % (vm_cgrp_name, str(e)))
+ LOGGER.error("Failed to add %s pids to resctrl: %s" % (tasks_path, str(e)))
# If the VM doesn't stop, raise exception.
if os.access(tasks_path, os.F_OK):
raise
diff --git a/skylark.py b/skylark.py
index 8962ba5..c281a54 100644
--- a/skylark.py
+++ b/skylark.py
@@ -27,7 +27,7 @@ import stat
import subprocess
import sys
-from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.events import EVENT_JOB_ERROR
import libvirt
@@ -47,12 +47,7 @@ LIBVIRT_DRIVE_TYPE = None
PID_FILE = None
MSR_PATH = "/dev/cpu/0/msr"
PID_FILE_NAME = "/var/run/skylarkd.pid"
-
-STATE_TO_STRING = ['VIR_DOMAIN_EVENT_DEFINED', 'VIR_DOMAIN_EVENT_UNDEFINED',
- 'VIR_DOMAIN_EVENT_STARTED', 'VIR_DOMAIN_EVENT_SUSPENDED',
- 'VIR_DOMAIN_EVENT_RESUMED', 'VIR_DOMAIN_EVENT_STOPPED',
- 'VIR_DOMAIN_EVENT_SHUTDOWN', 'VIR_DOMAIN_EVENT_PMSUSPENDED',
- 'VIR_DOMAIN_EVENT_CRASHED', 'VIR_DOMAIN_EVENT_LAST']
+LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
class QosManager:
@@ -63,17 +58,18 @@ class QosManager:
self.cpu_controller = CpuController()
self.net_controller = NetController()
self.cachembw_controller = CacheMBWController()
- self.has_job = False
def scheduler_listener(self, event):
if event.exception:
+ LOGGER.info("The Scheduler detects an exception, send SIGABRT and restart skylark...")
self.scheduler.remove_all_jobs()
+ os.kill(os.getpid(), signal.SIGABRT)
def init_scheduler(self):
- self.scheduler = BackgroundScheduler(logger=LOGGER)
+ self.scheduler = BlockingScheduler(logger=LOGGER)
if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
self.scheduler.add_job(self.__do_power_manage, trigger='interval', seconds=1, id='do_power_manage')
- self.has_job = True
+ self.scheduler.add_job(self.__do_resctrl_sync, trigger='interval', seconds=0.5, id='do_resctrl_sync')
self.scheduler.add_listener(self.scheduler_listener, EVENT_JOB_ERROR)
def init_data_collector(self):
@@ -95,19 +91,19 @@ class QosManager:
def start_scheduler(self):
self.scheduler.start()
- def reset_data_collector(self):
- self.scheduler.shutdown(wait=True)
- self.data_collector.reset_base_info(self.vir_conn)
- if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
- self.data_collector.reset_power_info()
- self.init_scheduler()
- self.start_scheduler()
-
def __do_power_manage(self):
self.data_collector.update_base_info(self.vir_conn)
self.data_collector.update_power_info()
self.power_analyzer.power_manage(self.data_collector, self.cpu_controller)
+ def __do_resctrl_sync(self):
+ with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
+ for entry in it:
+ if entry.is_file():
+ continue
+ tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, entry.name, "tasks")
+ self.cachembw_controller.add_vm_pids(tasks_path)
+
def create_pid_file():
global PID_FILE
@@ -140,85 +136,27 @@ def remove_pid_file():
util.remove_file(PID_FILE.name)
-def register_callback_event(conn, event_id, callback_func, opaque):
- if conn is not None and event_id >= 0:
- try:
- return conn.domainEventRegisterAny(None, event_id, callback_func, opaque)
- except libvirt.libvirtError as error:
- LOGGER.warning("Register event exception %s" % str(error))
- return -1
-
-
-def deregister_callback_event(conn, callback_id):
- if conn is not None and callback_id >= 0:
- try:
- conn.domainEventDeregisterAny(callback_id)
- except libvirt.libvirtError as error:
- LOGGER.warning("Deregister event exception %s" % str(error))
-
-
-def event_lifecycle_callback(conn, dom, event, detail, opaque):
- LOGGER.info("Occur lifecycle event: domain %s(%d) state changed to %s" % (
- dom.name(), dom.ID(), STATE_TO_STRING[event]))
- vm_started = (event == libvirt.VIR_DOMAIN_EVENT_STARTED)
- vm_stopped = (event == libvirt.VIR_DOMAIN_EVENT_STOPPED)
- if vm_started or vm_stopped:
- QOS_MANAGER_ENTRY.reset_data_collector()
- if vm_started:
- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
- QOS_MANAGER_ENTRY.data_collector.guest_info)
- return 0
-
-
-def event_device_added_callback(conn, dom, dev_alias, opaque):
- device_name = str(dev_alias[0:4])
- if device_name == "vcpu":
- LOGGER.info("Occur device added event: domain %s(%d) add vcpu" % (dom.name(), dom.ID()))
- QOS_MANAGER_ENTRY.reset_data_collector()
- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
- QOS_MANAGER_ENTRY.data_collector.guest_info)
-
-
-def event_device_removed_callback(conn, dom, dev_alias, opaque):
- device_name = str(dev_alias[0:4])
- if device_name == "vcpu":
- LOGGER.info("Occur device removed event: domain %s(%d) removed vcpu" % (dom.name(), dom.ID()))
- QOS_MANAGER_ENTRY.reset_data_collector()
-
-
def sigterm_handler(signo, stack):
sys.exit(0)
+def sigabrt_handler(signo, stack):
+ sys.exit(1)
+
def func_daemon():
global LIBVIRT_CONN
global QOS_MANAGER_ENTRY
- event_lifecycle_id = -1
- event_device_added_id = -1
- event_device_removed_id = -1
-
signal.signal(signal.SIGTERM, sigterm_handler)
- signal.signal(signal.SIGABRT, sigterm_handler)
+ signal.signal(signal.SIGABRT, sigabrt_handler)
@atexit.register
def daemon_exit_func():
- deregister_callback_event(LIBVIRT_CONN, event_lifecycle_id)
- deregister_callback_event(LIBVIRT_CONN, event_device_added_id)
- deregister_callback_event(LIBVIRT_CONN, event_device_removed_id)
LIBVIRT_CONN.close()
remove_pid_file()
create_pid_file()
- try:
- if libvirt.virEventRegisterDefaultImpl() < 0:
- LOGGER.error("Failed to register event implementation!")
- sys.exit(1)
- except libvirt.libvirtError:
- LOGGER.error("System internal error!")
- sys.exit(1)
-
LOGGER.info("Try to open libvirtd connection")
try:
LIBVIRT_CONN = libvirt.open(LIBVIRT_URI)
@@ -227,40 +165,17 @@ def func_daemon():
LOGGER.error("System internal error, failed to open libvirtd connection!")
sys.exit(1)
- event_lifecycle_id = register_callback_event(LIBVIRT_CONN,
- libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
- event_lifecycle_callback, None)
- event_device_added_id = register_callback_event(LIBVIRT_CONN,
- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED,
- event_device_added_callback, None)
- event_device_removed_id = register_callback_event(LIBVIRT_CONN,
- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
- event_device_removed_callback, None)
- if event_lifecycle_id < 0 or event_device_added_id < 0 or event_device_removed_id < 0:
- LOGGER.error("Failed to register libvirt event %d, %d, %d"
- % (event_lifecycle_id, event_device_added_id, event_device_removed_id))
- sys.exit(1)
- LOGGER.info("Libvirtd connected and libvirt event registered.")
+ LOGGER.info("Libvirtd connected.")
QOS_MANAGER_ENTRY = QosManager(LIBVIRT_CONN)
QOS_MANAGER_ENTRY.init_scheduler()
QOS_MANAGER_ENTRY.init_data_collector()
QOS_MANAGER_ENTRY.init_qos_analyzer()
QOS_MANAGER_ENTRY.init_qos_controller()
+
+ LOGGER.info("QoS management ready to start.")
QOS_MANAGER_ENTRY.start_scheduler()
- LOGGER.info("QoS management started.")
-
- while LIBVIRT_CONN.isAlive():
- if not QOS_MANAGER_ENTRY.scheduler.get_jobs() and QOS_MANAGER_ENTRY.has_job:
- LOGGER.error("The Scheduler detects an exception, process will exit!")
- break
- try:
- if libvirt.virEventRunDefaultImpl() < 0:
- LOGGER.warning("Failed to run event loop")
- break
- except libvirt.libvirtError as error:
- LOGGER.warning("Run libvirt event loop failed: %s" % str(error))
- break
+
sys.exit(1)
--
2.33.0