324 lines
13 KiB
Diff
324 lines
13 KiB
Diff
|
|
From fe004fc62e8d47835c96902b26fc3f6e088559e6 Mon Sep 17 00:00:00 2001
|
||
|
|
From: Dongxu Sun <sundongxu3@huawei.com>
|
||
|
|
Date: Tue, 16 Aug 2022 10:46:47 +0800
|
||
|
|
Subject: [PATCH 3/3] cachembw_qos: Add a job to sync VM pids to resctrl
|
||
|
|
|
||
|
|
Remove unnecessary libvirt event registrations and periodically poll
|
||
|
|
the cgroup tasks files to sync VM pids to the resctrl tasks file.
|
||
|
|
|
||
|
|
Signed-off-by: Dongxu Sun <sundongxu3@huawei.com>
|
||
|
|
---
|
||
|
|
data_collector/datacollector.py | 7 --
|
||
|
|
data_collector/guestinfo.py | 7 --
|
||
|
|
qos_controller/cachembwcontroller.py | 22 ++---
|
||
|
|
skylark.py | 129 +++++----------------------
|
||
|
|
4 files changed, 28 insertions(+), 137 deletions(-)
|
||
|
|
|
||
|
|
diff --git a/data_collector/datacollector.py b/data_collector/datacollector.py
|
||
|
|
index aaddc42..8aa20ab 100644
|
||
|
|
--- a/data_collector/datacollector.py
|
||
|
|
+++ b/data_collector/datacollector.py
|
||
|
|
@@ -31,13 +31,6 @@ class DataCollector:
|
||
|
|
def set_static_power_info(self):
|
||
|
|
self.host_info.set_host_power_attribute()
|
||
|
|
|
||
|
|
- def reset_base_info(self, vir_conn):
|
||
|
|
- self.guest_info.clear_guest_info()
|
||
|
|
- self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
|
||
|
|
-
|
||
|
|
- def reset_power_info(self):
|
||
|
|
- self.host_info.update_host_power_info()
|
||
|
|
-
|
||
|
|
def update_base_info(self, vir_conn):
|
||
|
|
self.guest_info.update_guest_info(vir_conn, self.host_info.host_topo)
|
||
|
|
|
||
|
|
diff --git a/data_collector/guestinfo.py b/data_collector/guestinfo.py
|
||
|
|
index 38fa827..97d9f20 100644
|
||
|
|
--- a/data_collector/guestinfo.py
|
||
|
|
+++ b/data_collector/guestinfo.py
|
||
|
|
@@ -125,13 +125,6 @@ class GuestInfo:
|
||
|
|
self.domain_online = []
|
||
|
|
self.running_domain_in_cpus = []
|
||
|
|
|
||
|
|
- def clear_guest_info(self):
|
||
|
|
- self.vm_dict.clear()
|
||
|
|
- self.low_prio_vm_dict.clear()
|
||
|
|
- self.vm_online_dict.clear()
|
||
|
|
- self.domain_online.clear()
|
||
|
|
- self.running_domain_in_cpus.clear()
|
||
|
|
-
|
||
|
|
def update_guest_info(self, conn, host_topo):
|
||
|
|
self.running_domain_in_cpus.clear()
|
||
|
|
self.vm_online_dict.clear()
|
||
|
|
diff --git a/qos_controller/cachembwcontroller.py b/qos_controller/cachembwcontroller.py
|
||
|
|
index 8be9329..bbfe08f 100644
|
||
|
|
--- a/qos_controller/cachembwcontroller.py
|
||
|
|
+++ b/qos_controller/cachembwcontroller.py
|
||
|
|
@@ -25,7 +25,6 @@ from data_collector.guestinfo import GuestInfo
|
||
|
|
from data_collector.hostinfo import ResctrlInfo
|
||
|
|
|
||
|
|
LOW_VMS_RESGROUP_PATH = "/sys/fs/resctrl/low_prio_machine"
|
||
|
|
-LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
|
||
|
|
LOW_MBW_INIT_FLOOR = 0.1
|
||
|
|
LOW_MBW_INIT_CEIL = 0.2
|
||
|
|
LOW_CACHE_INIT_FLOOR = 1
|
||
|
|
@@ -56,11 +55,6 @@ class CacheMBWController:
|
||
|
|
ResgroupFileOperations.create_group_dir(LOW_VMS_RESGROUP_PATH)
|
||
|
|
self.__get_low_init_alloc(resctrl_info)
|
||
|
|
self.set_low_init_alloc(resctrl_info)
|
||
|
|
- with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
|
||
|
|
- for entry in it:
|
||
|
|
- if entry.is_file():
|
||
|
|
- continue
|
||
|
|
- self.__add_vm_pids(entry.name)
|
||
|
|
|
||
|
|
def __get_low_init_alloc(self, resctrl_info: ResctrlInfo):
|
||
|
|
low_vms_mbw_init = float(os.getenv("MIN_MBW_LOW_VMS"))
|
||
|
|
@@ -111,25 +105,21 @@ class CacheMBWController:
|
||
|
|
util.file_write(os.path.join(
|
||
|
|
LOW_VMS_RESGROUP_PATH, "schemata"), schemata_mbw_alloc)
|
||
|
|
|
||
|
|
- def domain_updated(self, domain, guest_info: GuestInfo):
|
||
|
|
- if domain.ID() in guest_info.low_prio_vm_dict:
|
||
|
|
- self.__add_vm_pids(guest_info.vm_dict[domain.ID()].cgroup_name)
|
||
|
|
-
|
||
|
|
@staticmethod
|
||
|
|
- def __add_vm_pids(vm_cgrp_name):
|
||
|
|
- tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, vm_cgrp_name, "tasks")
|
||
|
|
+ def add_vm_pids(tasks_path):
|
||
|
|
if not os.access(tasks_path, os.R_OK):
|
||
|
|
LOGGER.warning(
|
||
|
|
"The path %s is not readable, please check." % tasks_path)
|
||
|
|
return
|
||
|
|
- LOGGER.info("Add %s pids to %s" %
|
||
|
|
- (vm_cgrp_name, os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")))
|
||
|
|
+
|
||
|
|
+ resctrl_tsk_path = os.path.join(LOW_VMS_RESGROUP_PATH, "tasks")
|
||
|
|
+ LOGGER.debug("Add %s pids to %s" % (tasks_path, resctrl_tsk_path))
|
||
|
|
try:
|
||
|
|
with open(tasks_path) as tasks:
|
||
|
|
for task in tasks:
|
||
|
|
- util.file_write(os.path.join(LOW_VMS_RESGROUP_PATH, "tasks"), task)
|
||
|
|
+ util.file_write(resctrl_tsk_path, task)
|
||
|
|
except IOError as e:
|
||
|
|
- LOGGER.error("Failed to add VM(%s) pids to resctrl: %s" % (vm_cgrp_name, str(e)))
|
||
|
|
+ LOGGER.error("Failed to add %s pids to resctrl: %s" % (tasks_path, str(e)))
|
||
|
|
# If the VM doesn't stop, raise exception.
|
||
|
|
if os.access(tasks_path, os.F_OK):
|
||
|
|
raise
|
||
|
|
diff --git a/skylark.py b/skylark.py
|
||
|
|
index 8962ba5..c281a54 100644
|
||
|
|
--- a/skylark.py
|
||
|
|
+++ b/skylark.py
|
||
|
|
@@ -27,7 +27,7 @@ import stat
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
|
||
|
|
-from apscheduler.schedulers.background import BackgroundScheduler
|
||
|
|
+from apscheduler.schedulers.background import BlockingScheduler
|
||
|
|
from apscheduler.events import EVENT_JOB_ERROR
|
||
|
|
import libvirt
|
||
|
|
|
||
|
|
@@ -47,12 +47,7 @@ LIBVIRT_DRIVE_TYPE = None
|
||
|
|
PID_FILE = None
|
||
|
|
MSR_PATH = "/dev/cpu/0/msr"
|
||
|
|
PID_FILE_NAME = "/var/run/skylarkd.pid"
|
||
|
|
-
|
||
|
|
-STATE_TO_STRING = ['VIR_DOMAIN_EVENT_DEFINED', 'VIR_DOMAIN_EVENT_UNDEFINED',
|
||
|
|
- 'VIR_DOMAIN_EVENT_STARTED', 'VIR_DOMAIN_EVENT_SUSPENDED',
|
||
|
|
- 'VIR_DOMAIN_EVENT_RESUMED', 'VIR_DOMAIN_EVENT_STOPPED',
|
||
|
|
- 'VIR_DOMAIN_EVENT_SHUTDOWN', 'VIR_DOMAIN_EVENT_PMSUSPENDED',
|
||
|
|
- 'VIR_DOMAIN_EVENT_CRASHED', 'VIR_DOMAIN_EVENT_LAST']
|
||
|
|
+LOW_VMS_PID_CGRP_PATH = "/sys/fs/cgroup/pids/low_prio_machine.slice"
|
||
|
|
|
||
|
|
|
||
|
|
class QosManager:
|
||
|
|
@@ -63,17 +58,18 @@ class QosManager:
|
||
|
|
self.cpu_controller = CpuController()
|
||
|
|
self.net_controller = NetController()
|
||
|
|
self.cachembw_controller = CacheMBWController()
|
||
|
|
- self.has_job = False
|
||
|
|
|
||
|
|
def scheduler_listener(self, event):
|
||
|
|
if event.exception:
|
||
|
|
+ LOGGER.info("The Scheduler detects an exception, send SIGABRT and restart skylark...")
|
||
|
|
self.scheduler.remove_all_jobs()
|
||
|
|
+ os.kill(os.getpid(), signal.SIGABRT)
|
||
|
|
|
||
|
|
def init_scheduler(self):
|
||
|
|
- self.scheduler = BackgroundScheduler(logger=LOGGER)
|
||
|
|
+ self.scheduler = BlockingScheduler(logger=LOGGER)
|
||
|
|
if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
|
||
|
|
self.scheduler.add_job(self.__do_power_manage, trigger='interval', seconds=1, id='do_power_manage')
|
||
|
|
- self.has_job = True
|
||
|
|
+ self.scheduler.add_job(self.__do_resctrl_sync, trigger='interval', seconds=0.5, id='do_resctrl_sync')
|
||
|
|
self.scheduler.add_listener(self.scheduler_listener, EVENT_JOB_ERROR)
|
||
|
|
|
||
|
|
def init_data_collector(self):
|
||
|
|
@@ -95,19 +91,19 @@ class QosManager:
|
||
|
|
def start_scheduler(self):
|
||
|
|
self.scheduler.start()
|
||
|
|
|
||
|
|
- def reset_data_collector(self):
|
||
|
|
- self.scheduler.shutdown(wait=True)
|
||
|
|
- self.data_collector.reset_base_info(self.vir_conn)
|
||
|
|
- if os.getenv("POWER_QOS_MANAGEMENT", "false").lower() == "true":
|
||
|
|
- self.data_collector.reset_power_info()
|
||
|
|
- self.init_scheduler()
|
||
|
|
- self.start_scheduler()
|
||
|
|
-
|
||
|
|
def __do_power_manage(self):
|
||
|
|
self.data_collector.update_base_info(self.vir_conn)
|
||
|
|
self.data_collector.update_power_info()
|
||
|
|
self.power_analyzer.power_manage(self.data_collector, self.cpu_controller)
|
||
|
|
|
||
|
|
+ def __do_resctrl_sync(self):
|
||
|
|
+ with os.scandir(LOW_VMS_PID_CGRP_PATH) as it:
|
||
|
|
+ for entry in it:
|
||
|
|
+ if entry.is_file():
|
||
|
|
+ continue
|
||
|
|
+ tasks_path = os.path.join(LOW_VMS_PID_CGRP_PATH, entry.name, "tasks")
|
||
|
|
+ self.cachembw_controller.add_vm_pids(tasks_path)
|
||
|
|
+
|
||
|
|
|
||
|
|
def create_pid_file():
|
||
|
|
global PID_FILE
|
||
|
|
@@ -140,85 +136,27 @@ def remove_pid_file():
|
||
|
|
util.remove_file(PID_FILE.name)
|
||
|
|
|
||
|
|
|
||
|
|
-def register_callback_event(conn, event_id, callback_func, opaque):
|
||
|
|
- if conn is not None and event_id >= 0:
|
||
|
|
- try:
|
||
|
|
- return conn.domainEventRegisterAny(None, event_id, callback_func, opaque)
|
||
|
|
- except libvirt.libvirtError as error:
|
||
|
|
- LOGGER.warning("Register event exception %s" % str(error))
|
||
|
|
- return -1
|
||
|
|
-
|
||
|
|
-
|
||
|
|
-def deregister_callback_event(conn, callback_id):
|
||
|
|
- if conn is not None and callback_id >= 0:
|
||
|
|
- try:
|
||
|
|
- conn.domainEventDeregisterAny(callback_id)
|
||
|
|
- except libvirt.libvirtError as error:
|
||
|
|
- LOGGER.warning("Deregister event exception %s" % str(error))
|
||
|
|
-
|
||
|
|
-
|
||
|
|
-def event_lifecycle_callback(conn, dom, event, detail, opaque):
|
||
|
|
- LOGGER.info("Occur lifecycle event: domain %s(%d) state changed to %s" % (
|
||
|
|
- dom.name(), dom.ID(), STATE_TO_STRING[event]))
|
||
|
|
- vm_started = (event == libvirt.VIR_DOMAIN_EVENT_STARTED)
|
||
|
|
- vm_stopped = (event == libvirt.VIR_DOMAIN_EVENT_STOPPED)
|
||
|
|
- if vm_started or vm_stopped:
|
||
|
|
- QOS_MANAGER_ENTRY.reset_data_collector()
|
||
|
|
- if vm_started:
|
||
|
|
- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
|
||
|
|
- QOS_MANAGER_ENTRY.data_collector.guest_info)
|
||
|
|
- return 0
|
||
|
|
-
|
||
|
|
-
|
||
|
|
-def event_device_added_callback(conn, dom, dev_alias, opaque):
|
||
|
|
- device_name = str(dev_alias[0:4])
|
||
|
|
- if device_name == "vcpu":
|
||
|
|
- LOGGER.info("Occur device added event: domain %s(%d) add vcpu" % (dom.name(), dom.ID()))
|
||
|
|
- QOS_MANAGER_ENTRY.reset_data_collector()
|
||
|
|
- QOS_MANAGER_ENTRY.cachembw_controller.domain_updated(dom,
|
||
|
|
- QOS_MANAGER_ENTRY.data_collector.guest_info)
|
||
|
|
-
|
||
|
|
-
|
||
|
|
-def event_device_removed_callback(conn, dom, dev_alias, opaque):
|
||
|
|
- device_name = str(dev_alias[0:4])
|
||
|
|
- if device_name == "vcpu":
|
||
|
|
- LOGGER.info("Occur device removed event: domain %s(%d) removed vcpu" % (dom.name(), dom.ID()))
|
||
|
|
- QOS_MANAGER_ENTRY.reset_data_collector()
|
||
|
|
-
|
||
|
|
-
|
||
|
|
def sigterm_handler(signo, stack):
|
||
|
|
sys.exit(0)
|
||
|
|
|
||
|
|
+def sigabrt_handler(signo, stack):
|
||
|
|
+ sys.exit(1)
|
||
|
|
+
|
||
|
|
|
||
|
|
def func_daemon():
|
||
|
|
global LIBVIRT_CONN
|
||
|
|
global QOS_MANAGER_ENTRY
|
||
|
|
|
||
|
|
- event_lifecycle_id = -1
|
||
|
|
- event_device_added_id = -1
|
||
|
|
- event_device_removed_id = -1
|
||
|
|
-
|
||
|
|
signal.signal(signal.SIGTERM, sigterm_handler)
|
||
|
|
- signal.signal(signal.SIGABRT, sigterm_handler)
|
||
|
|
+ signal.signal(signal.SIGABRT, sigabrt_handler)
|
||
|
|
|
||
|
|
@atexit.register
|
||
|
|
def daemon_exit_func():
|
||
|
|
- deregister_callback_event(LIBVIRT_CONN, event_lifecycle_id)
|
||
|
|
- deregister_callback_event(LIBVIRT_CONN, event_device_added_id)
|
||
|
|
- deregister_callback_event(LIBVIRT_CONN, event_device_removed_id)
|
||
|
|
LIBVIRT_CONN.close()
|
||
|
|
remove_pid_file()
|
||
|
|
|
||
|
|
create_pid_file()
|
||
|
|
|
||
|
|
- try:
|
||
|
|
- if libvirt.virEventRegisterDefaultImpl() < 0:
|
||
|
|
- LOGGER.error("Failed to register event implementation!")
|
||
|
|
- sys.exit(1)
|
||
|
|
- except libvirt.libvirtError:
|
||
|
|
- LOGGER.error("System internal error!")
|
||
|
|
- sys.exit(1)
|
||
|
|
-
|
||
|
|
LOGGER.info("Try to open libvirtd connection")
|
||
|
|
try:
|
||
|
|
LIBVIRT_CONN = libvirt.open(LIBVIRT_URI)
|
||
|
|
@@ -227,40 +165,17 @@ def func_daemon():
|
||
|
|
LOGGER.error("System internal error, failed to open libvirtd connection!")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
- event_lifecycle_id = register_callback_event(LIBVIRT_CONN,
|
||
|
|
- libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
|
||
|
|
- event_lifecycle_callback, None)
|
||
|
|
- event_device_added_id = register_callback_event(LIBVIRT_CONN,
|
||
|
|
- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED,
|
||
|
|
- event_device_added_callback, None)
|
||
|
|
- event_device_removed_id = register_callback_event(LIBVIRT_CONN,
|
||
|
|
- libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
|
||
|
|
- event_device_removed_callback, None)
|
||
|
|
- if event_lifecycle_id < 0 or event_device_added_id < 0 or event_device_removed_id < 0:
|
||
|
|
- LOGGER.error("Failed to register libvirt event %d, %d, %d"
|
||
|
|
- % (event_lifecycle_id, event_device_added_id, event_device_removed_id))
|
||
|
|
- sys.exit(1)
|
||
|
|
- LOGGER.info("Libvirtd connected and libvirt event registered.")
|
||
|
|
+ LOGGER.info("Libvirtd connected.")
|
||
|
|
|
||
|
|
QOS_MANAGER_ENTRY = QosManager(LIBVIRT_CONN)
|
||
|
|
QOS_MANAGER_ENTRY.init_scheduler()
|
||
|
|
QOS_MANAGER_ENTRY.init_data_collector()
|
||
|
|
QOS_MANAGER_ENTRY.init_qos_analyzer()
|
||
|
|
QOS_MANAGER_ENTRY.init_qos_controller()
|
||
|
|
+
|
||
|
|
+ LOGGER.info("QoS management ready to start.")
|
||
|
|
QOS_MANAGER_ENTRY.start_scheduler()
|
||
|
|
- LOGGER.info("QoS management started.")
|
||
|
|
-
|
||
|
|
- while LIBVIRT_CONN.isAlive():
|
||
|
|
- if not QOS_MANAGER_ENTRY.scheduler.get_jobs() and QOS_MANAGER_ENTRY.has_job:
|
||
|
|
- LOGGER.error("The Scheduler detects an exception, process will exit!")
|
||
|
|
- break
|
||
|
|
- try:
|
||
|
|
- if libvirt.virEventRunDefaultImpl() < 0:
|
||
|
|
- LOGGER.warning("Failed to run event loop")
|
||
|
|
- break
|
||
|
|
- except libvirt.libvirtError as error:
|
||
|
|
- LOGGER.warning("Run libvirt event loop failed: %s" % str(error))
|
||
|
|
- break
|
||
|
|
+
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
--
|
||
|
|
2.33.0
|
||
|
|
|