diff --git a/0001-optimize-cve-query-performance.patch b/0001-optimize-cve-query-performance.patch deleted file mode 100644 index 9fbc4a3..0000000 --- a/0001-optimize-cve-query-performance.patch +++ /dev/null @@ -1,686 +0,0 @@ -From f5dd41212f89f4392b0d33b87bd5fda8648c2bd3 Mon Sep 17 00:00:00 2001 -From: gongzt -Date: Tue, 10 Oct 2023 16:02:17 +0800 -Subject: Optimize cve query performance -MIME-Version: 1.0 -Content-Type: text/plain; charset=UTF-8 -Content-Transfer-Encoding: 8bit - ---- - apollo/database/proxy/cve.py | 260 +++++++++------------------------- - apollo/database/proxy/host.py | 70 +++++---- - apollo/database/table.py | 1 + - database/apollo.sql | 104 +++++++++++++- - 4 files changed, 215 insertions(+), 220 deletions(-) - -diff --git a/apollo/database/proxy/cve.py b/apollo/database/proxy/cve.py -index 3a6ec01..4125894 100644 ---- a/apollo/database/proxy/cve.py -+++ b/apollo/database/proxy/cve.py -@@ -17,11 +17,10 @@ Description: Host table operation - """ - import math - import copy --from typing import List, Tuple - from collections import defaultdict - - from elasticsearch import ElasticsearchException --from sqlalchemy import func, tuple_, case, distinct, or_ -+from sqlalchemy import func, tuple_, case - from sqlalchemy.exc import SQLAlchemyError - from vulcanus.database.helper import sort_and_page, judge_return_code - from vulcanus.database.proxy import MysqlProxy, ElasticsearchProxy -@@ -83,40 +82,15 @@ class CveMysqlProxy(MysqlProxy): - """ - result = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0} - username = data["username"] -- cve_overview_query = self._query_cve_overview(username).all() -+ cve_overview = self.session.execute("CALL GET_CVE_OVERVIEW_PRO(:username)", {"username": username}).fetchall() - -- for severity, count in cve_overview_query: -+ for severity, count in cve_overview: - if severity not in result: - LOGGER.debug("Unknown cve severity '%s' when getting overview." % severity) - continue - result[severity] = count - return {"result": result} - -- def _query_cve_overview(self, username): -- """ -- query cve overview -- Args: -- username (str): user name of the request -- -- Returns: -- sqlalchemy.orm.query.Query -- """ -- cve_id_with_severity = ( -- self.session.query( -- distinct(CveHostAssociation.cve_id), -- case([(Cve.severity == None, "Unknown")], else_=Cve.severity).label("severity"), -- ) -- .select_from(CveHostAssociation) -- .outerjoin(Cve, CveHostAssociation.cve_id == Cve.cve_id) -- .outerjoin(Host, CveHostAssociation.host_id == Host.host_id) -- .filter(CveHostAssociation.affected == 1, CveHostAssociation.fixed == 0, Host.user == username) -- .subquery() -- ) -- cve_overview_query = self.session.query( -- cve_id_with_severity.c.severity, func.count(cve_id_with_severity.c.severity) -- ).group_by(cve_id_with_severity.c.severity) -- return cve_overview_query -- - def get_cve_host(self, data): - """ - Get hosts info of a cve -@@ -250,7 +224,7 @@ class CveMysqlProxy(MysqlProxy): - CveHostAssociation.fixed, - ) - .join(CveHostAssociation, Host.host_id == CveHostAssociation.host_id) -- .filter(Host.user == username, CveHostAssociation.cve_id == cve_id) -+ .filter(CveHostAssociation.cve_id == cve_id, Host.user == username) - .filter(*filters) - .group_by(Host.host_id) - ) -@@ -501,7 +475,7 @@ class CveMysqlProxy(MysqlProxy): - if host_list: - filters.add(Host.host_id.in_(host_list)) - -- cve_query = ( -+ cve_host_list = ( - self.session.query( - CveHostAssociation.cve_id, - Host.host_id, -@@ -514,8 +488,9 @@ class CveMysqlProxy(MysqlProxy): - .join(CveHostAssociation, Host.host_id == CveHostAssociation.host_id) - .filter(CveHostAssociation.cve_id.in_(cve_list)) - .filter(*filters) -+ .all() - ) -- return cve_query -+ return cve_host_list - - def _get_cve_source_pkg(self, cve_list: list) -> dict: - """ -@@ -675,178 +650,70 @@ class CveProxy(CveMysqlProxy, CveEsProxy): - EsOperationError - """ - result = {"total_count": 0, "total_page": 0, "result": []} -+ cve_list, total = self._query_cve_list(data) - -- filters = self._get_cve_list_filters(data.get("filter", {}), data["username"]) -- cve_query = self._query_cve_list(filters) -- -- total_count = len(cve_query.all()) -- if not total_count: -- return result -- -- cve_info_list, cve_pkg_dict = self._preprocess_cve_list_query(cve_query) -- processed_cve_list, total_page = self._sort_and_page_cve_list(cve_info_list, data) -- description_dict = self._get_cve_description([cve_info["cve_id"] for cve_info in processed_cve_list]) -+ description_dict = self._get_cve_description([cve_info["cve_id"] for cve_info in cve_list]) - -- result['result'] = self._add_additional_info_to_cve_list(processed_cve_list, description_dict, cve_pkg_dict) -- result['total_page'] = total_page -- result['total_count'] = total_count -+ result['result'] = self._add_description_to_cve(cve_list, description_dict) -+ result['total_page'] = math.ceil(total / data["per_page"]) -+ result['total_count'] = total - - return result - - @staticmethod -- def _get_cve_list_filters(filter_dict, username): -- """ -- Generate filters -+ def _sort_and_page_cve_list(data) -> dict: -+ start_limt = int(data["per_page"]) * (int(data["page"]) - 1) -+ end_limt = int(data["per_page"]) * int(data["page"]) - -- Args: -- filter_dict(dict): filter dict to filter cve list, e.g. -- { -- "cve_id": "2021", -- "severity": ["high"], -- "affected": True, -- "fixed": True, -- "package": "kernel" -- } -- username(str): admin -- -- Returns: -- set -- """ -- filters = {Host.user == username} -- if not filter_dict: -- return filters -+ # sort by host num by default -+ order_by_filed = data.get('sort', "cve_host_user_count.host_num") -+ if order_by_filed == "host_num": -+ order_by_filed = "cve_host_user_count.host_num" -+ order_by = "dsc" if data.get("direction") == "desc" else "asc" - -- if filter_dict.get("search_key"): -- filters.add( -- or_( -- CveHostAssociation.cve_id.like("%" + filter_dict["search_key"] + "%"), -- CveAffectedPkgs.package.like("%" + filter_dict["search_key"] + "%"), -- ) -- ) -- if filter_dict.get("severity"): -- filters.add(Cve.severity.in_(filter_dict["severity"])) -- if "fixed" in filter_dict: -- filters.add(CveHostAssociation.fixed == filter_dict["fixed"]) -- if "affected" in filter_dict: -- filters.add(CveHostAssociation.affected == filter_dict["affected"]) -- return filters -+ return {"start_limt": start_limt, "end_limt": end_limt, "order_by_filed": order_by_filed, "order_by": order_by} - -- def _query_cve_list(self, filters): -+ def _query_cve_list(self, data): - """ - query needed cve info - Args: -- filters (set): filter given by user -+ data (set): filter given by user - - Returns: - sqlalchemy.orm.query.Query: attention, two rows may have same cve id with different source package. - """ -- cve_query = ( -- self.session.query( -- CveHostAssociation.cve_id, -- case([(Cve.publish_time == None, "")], else_=Cve.publish_time).label("publish_time"), -- case([(CveAffectedPkgs.package == None, "")], else_=CveAffectedPkgs.package).label("package"), -- case([(Cve.severity == None, "")], else_=Cve.severity).label("severity"), -- case([(Cve.cvss_score == None, "")], else_=Cve.cvss_score).label("cvss_score"), -- func.count(distinct(CveHostAssociation.host_id)).label("host_num"), -- ) -- .outerjoin(Cve, CveHostAssociation.cve_id == Cve.cve_id) -- .outerjoin(Host, Host.host_id == CveHostAssociation.host_id) -- .outerjoin(CveAffectedPkgs, CveAffectedPkgs.cve_id == CveHostAssociation.cve_id) -- .filter(*filters) -- .group_by(CveHostAssociation.cve_id, CveAffectedPkgs.package) -- ) -- return cve_query -+ filters = {"username": data["username"], "search_key": None, "severity": None, "affected": True} -+ filters.update(data.get("filter", {})) -+ filters.update(self._sort_and_page_cve_list(data)) -+ if filters["severity"]: -+ filters["severity"] = ",".join(["'" + serverity + "'" for serverity in filters["severity"]]) - -- @staticmethod -- def _preprocess_cve_list_query(cve_list_query) -> Tuple[List[dict], dict]: -- """ -- get each cve's source package set and deduplication rows by cve id -- Args: -- cve_list_query(sqlalchemy.orm.query.Query): rows of cve list info (two rows may have same cve id -- with different source package) -- Returns: -- list: list of cve info without package and description. -- dict: key is cve id, value is cve affected source package joined with ',', e.g. 'kernel,vim' -- """ -- cve_pkgs_dict = defaultdict(set) -- cve_info_list = [] -- for row in cve_list_query: -- cve_id = row.cve_id -- if cve_id not in cve_pkgs_dict: -- cve_info = { -- "cve_id": cve_id, -- "publish_time": row.publish_time, -- "severity": row.severity, -- "cvss_score": row.cvss_score, -- "host_num": row.host_num, -- } -- cve_info_list.append(cve_info) -- cve_pkgs_dict[cve_id].add(row.package) -- -- final_cve_pkgs_dict = {} -- for cve_id, pkg_set in cve_pkgs_dict.items(): -- final_cve_pkgs_dict[cve_id] = ",".join(list(pkg_set)) -- return cve_info_list, final_cve_pkgs_dict -- -- @staticmethod -- def _sort_and_page_cve_list(cve_info_list, data) -> Tuple[list, int]: -- """ -- sort and page cve info -- Args: -- cve_info_list (list): cve info list. not empty. -- data (dict): parameter, e.g. -- { -- "sort": "cve_id", -- "direction": "asc", -- "page": 1, -- "per_page": 10, -- "username": "admin", -- "filter": { -- "cve_id": "cve-2021", -- "severity": "medium", -- "affected": True, -- "fixed": True, -- "package": "kernel" -- } -- } -- -- Returns: -- list: sorted cve info list -- int: total page -- """ -- page = data.get('page') -- per_page = data.get('per_page') -- # sort by host num by default -- sort_column = data.get('sort', "host_num") -- reverse = True if data.get("direction") == "desc" else False -- -- total_page = 1 -- total_count = len(cve_info_list) -- -- cve_info_list.sort(key=lambda cve_info: cve_info[sort_column], reverse=reverse) -- -- if page and per_page: -- total_page = math.ceil(total_count / per_page) -- return cve_info_list[per_page * (page - 1) : per_page * page], total_page -- -- return cve_info_list, total_page -+ # Call stored procedure: GET_CVE_LIST_PRO -+ pro_result_set = self.session.execute( -+ "CALL GET_CVE_LIST_PRO(:username,:search_key,:severity,:fixed,:affected,:order_by_filed,:order_by,:start_limt,:end_limt)", -+ filters, -+ ) -+ cursor = pro_result_set.cursor -+ columns = [column[0] for column in cursor.description] -+ cve_list = [dict(zip(columns, cve)) for cve in cursor.fetchall()] -+ cursor.nextset() -+ total = cursor.fetchone()[0] -+ return cve_list, total - - @staticmethod -- def _add_additional_info_to_cve_list(cve_info_list, description_dict, cve_package_dict): -+ def _add_description_to_cve(cve_info_list, description_dict): - """ -- add description and affected source packages for each cve -+ add description for each cve - Args: - cve_info_list: list of cve info without description and package - description_dict (dict): key is cve's id, value is cve's description -- cve_package_dict (dict): key is cve's id, value is cve's packages joined with ',' - - Returns: - list - """ - for cve_info in cve_info_list: - cve_id = cve_info["cve_id"] -- cve_info["description"] = description_dict[cve_id] if description_dict.get(cve_id) else "" -- cve_info["package"] = cve_package_dict[cve_id] if cve_package_dict.get(cve_id) else "" -+ cve_info["description"] = description_dict[cve_id] if description_dict.get(cve_id) else None - return cve_info_list - - def get_cve_info(self, data): -@@ -906,8 +773,7 @@ class CveProxy(CveMysqlProxy, CveEsProxy): - cve_id = data["cve_id"] - username = data["username"] - -- cve_info_query = self._query_cve_info(username, cve_id) -- cve_info_data = cve_info_query.first() -+ cve_info_data = self._query_cve_info(cve_id) - if cve_info_data: - # raise exception when multiple record found - -@@ -928,24 +794,27 @@ class CveProxy(CveMysqlProxy, CveEsProxy): - } - return SUCCEED, {"result": info_dict} - -- def _query_cve_info(self, username, cve_id): -+ def _query_cve_info(self, cve_id): - """ - query needed cve info - Args: -- username (str): user name of the request - cve_id (str): cve id - - Returns: - sqlalchemy.orm.query.Query - """ -- cve_info_query = self.session.query( -- case([(Cve.cve_id == None, "")], else_=Cve.cve_id).label("cve_id"), -- case([(Cve.publish_time == None, "")], else_=Cve.publish_time).label("publish_time"), -- case([(Cve.severity == None, "")], else_=Cve.severity).label("severity"), -- case([(Cve.cvss_score == None, "")], else_=Cve.cvss_score).label("cvss_score"), -- ).filter(Cve.cve_id == cve_id) -+ cve_info = ( -+ self.session.query( -+ case([(Cve.cve_id == None, "")], else_=Cve.cve_id).label("cve_id"), -+ case([(Cve.publish_time == None, "")], else_=Cve.publish_time).label("publish_time"), -+ case([(Cve.severity == None, "")], else_=Cve.severity).label("severity"), -+ case([(Cve.cvss_score == None, "")], else_=Cve.cvss_score).label("cvss_score"), -+ ) -+ .filter(Cve.cve_id == cve_id) -+ .first() -+ ) - -- return cve_info_query -+ return cve_info - - def _get_affected_pkgs(self, cve_id): - """ -@@ -979,20 +848,25 @@ class CveProxy(CveMysqlProxy, CveEsProxy): - return [] - pkg_list = [pkg["package"] for pkg in pkg_list] - -- exist_cve_query = ( -+ exist_cve = ( - self.session.query(CveHostAssociation.cve_id) -- .join(Host, Host.host_id == CveHostAssociation.host_id) -- .filter(Host.user == username, CveHostAssociation.affected == 1, CveHostAssociation.fixed == 0) -+ .filter( -+ CveHostAssociation.host_user == username, -+ CveHostAssociation.fixed == 0, -+ CveHostAssociation.affected == 1, -+ ) - .distinct() -+ .all() - ) -- -- related_cve_query = ( -+ cve_ids = [cve.cve_id for cve in exist_cve] -+ related_cve = ( - self.session.query(CveAffectedPkgs.cve_id) -- .filter(CveAffectedPkgs.package.in_(pkg_list), CveAffectedPkgs.cve_id.in_(exist_cve_query.subquery())) -+ .filter(CveAffectedPkgs.package.in_(pkg_list), CveAffectedPkgs.cve_id.in_(cve_ids)) - .distinct() -+ .all() - ) - -- related_cve = [row[0] for row in related_cve_query.all() if row[0] != cve_id] -+ related_cve = [cve.cve_id for cve in related_cve if cve.cve_id != cve_id] - - return related_cve - -diff --git a/apollo/database/proxy/host.py b/apollo/database/proxy/host.py -index c46394f..4b75f1c 100644 ---- a/apollo/database/proxy/host.py -+++ b/apollo/database/proxy/host.py -@@ -103,20 +103,44 @@ class HostMysqlProxy(MysqlProxy): - if not total_count: - return result - -- sort_column = self._get_host_list_sort_column(data.get('sort'), host_query) -+ sort_column = self._get_host_list_sort_column(data.get('sort')) - direction, page, per_page = data.get('direction'), data.get('page'), data.get('per_page') - - processed_query, total_page = sort_and_page(host_query, sort_column, direction, per_page, page) - - host_rows = processed_query.all() -- result['result'] = self._host_list_row2dict(host_rows) -+ host_ids = [host.host_id for host in host_rows] -+ host_cve_fixed_info = self._get_host_cve_fixed_info(host_ids) -+ result['result'] = self._host_list_row2dict(host_rows, host_cve_fixed_info) - result['total_page'] = total_page - result['total_count'] = total_count - - return result - -+ def _get_host_cve_fixed_info(self, host_ids): -+ """ -+ Get host cve fixed info -+ Args: -+ host_ids(list): host id list -+ -+ Returns: -+ dict -+ """ -+ -+ host_cve_fixed_list = ( -+ self.session.query( -+ CveHostAssociation.host_id, -+ func.COUNT(func.IF(CveHostAssociation.fixed == True, 1, None)).label("fixed_cve_num"), -+ func.COUNT(func.IF(CveHostAssociation.fixed == False, 1, None)).label("unfixed_cve_num"), -+ ) -+ .filter(CveHostAssociation.host_id.in_(host_ids)) -+ .group_by(CveHostAssociation.host_id) -+ .all() -+ ) -+ return host_cve_fixed_list -+ - @staticmethod -- def _get_host_list_sort_column(column_name, query=None): -+ def _get_host_list_sort_column(column_name): - """ - get column or aggregation column of table by name - Args: -@@ -127,8 +151,6 @@ class HostMysqlProxy(MysqlProxy): - """ - if not column_name: - return None -- if column_name == "cve_num" and query is not None: -- return query.c.cve_num - return getattr(Host, column_name) - - def _query_host_list(self, filters): -@@ -140,7 +162,6 @@ class HostMysqlProxy(MysqlProxy): - Returns: - sqlalchemy.orm.query.Query - """ -- subquery = self._query_host_cve_info().distinct().subquery() - query = ( - self.session.query( - Host.host_id, -@@ -149,39 +170,30 @@ class HostMysqlProxy(MysqlProxy): - Host.host_group_name, - Host.repo_name, - Host.last_scan, -- func.COUNT(func.IF(subquery.c.fixed == True, 1, None)).label("fixed_cve_num"), -- func.COUNT(func.IF(subquery.c.fixed == False, 1, None)).label("unfixed_cve_num"), - ) -- .outerjoin(subquery, Host.host_id == subquery.c.host_id) - .group_by(Host.host_id) - .filter(*filters) - ) - - return query - -- def _query_host_cve_info(self): -- """ -- query host with its cve -- -- Returns: -- sqlalchemy.orm.query.Query -- """ -- return self.session.query( -- CveHostAssociation.host_id, CveHostAssociation.cve_id, CveHostAssociation.fixed, CveHostAssociation.affected -- ) -- - @staticmethod -- def _host_list_row2dict(rows): -+ def _host_list_row2dict(rows, host_cve_fixed_info): - result = [] -+ host_cve_fixed_info_dict = {host_cve.host_id: host_cve for host_cve in host_cve_fixed_info} - for row in rows: -+ unfixed_cve_num, fixed_cve_num = 0, 0 -+ if row.host_id in host_cve_fixed_info_dict: -+ unfixed_cve_num = host_cve_fixed_info_dict[row.host_id].unfixed_cve_num -+ fixed_cve_num = host_cve_fixed_info_dict[row.host_id].fixed_cve_num - host_info = { - "host_id": row.host_id, - "host_name": row.host_name, - "host_ip": row.host_ip, - "host_group": row.host_group_name, - "repo": row.repo_name, -- "unfixed_cve_num": row.unfixed_cve_num, -- "fixed_cve_num": row.fixed_cve_num, -+ "unfixed_cve_num": unfixed_cve_num, -+ "fixed_cve_num": fixed_cve_num, - "last_scan": row.last_scan, - } - result.append(host_info) -@@ -373,7 +385,17 @@ class HostMysqlProxy(MysqlProxy): - Returns: - sqlalchemy.orm.query.Query - """ -- subquery = self._query_host_cve_info().distinct().subquery() -+ subquery = ( -+ self.session.query( -+ CveHostAssociation.host_id, -+ CveHostAssociation.fixed, -+ CveHostAssociation.affected, -+ CveHostAssociation.cve_id, -+ ) -+ .filter(CveHostAssociation.host_id == host_id) -+ .distinct() -+ .subquery() -+ ) - query = ( - self.session.query( - Host.host_id, -diff --git a/apollo/database/table.py b/apollo/database/table.py -index 2e607d3..04b1e4b 100644 ---- a/apollo/database/table.py -+++ b/apollo/database/table.py -@@ -126,6 +126,7 @@ class CveHostAssociation(Base, MyBase): - hp_status = Column(String(20)) - installed_rpm = Column(String(100)) - available_rpm = Column(String(100)) -+ host_user = Column(String(100)) - - - class CveAffectedPkgs(Base, MyBase): -diff --git a/database/apollo.sql b/database/apollo.sql -index d90376f..4e87727 100644 ---- a/database/apollo.sql -+++ b/database/apollo.sql -@@ -48,8 +48,11 @@ CREATE TABLE IF NOT EXISTS `cve_host_match` ( - `hp_status` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, - `installed_rpm` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, - `available_rpm` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, -+ `host_user` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, - PRIMARY KEY (`id`) USING BTREE, - INDEX `ix_cve_host_match_host_id`(`host_id`) USING BTREE, -+ INDEX `ix_cve_host_match_cve_id`(`cve_id`) USING BTREE, -+ INDEX `ix_cve_hsot_match_user`(`host_user`) USING BTREE, - CONSTRAINT `cve_host_match_ibfk_1` FOREIGN KEY (`host_id`) REFERENCES `host` (`host_id`) ON DELETE CASCADE ON UPDATE RESTRICT - ) ENGINE = InnoDB AUTO_INCREMENT = 2621 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; - -@@ -77,7 +80,7 @@ CREATE TABLE IF NOT EXISTS `task_cve_host` ( - `task_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `cve_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `host_id` int(11) NOT NULL, -- `host_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -+ `host_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `host_ip` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `status` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, - `hotpatch` tinyint(4) NULL DEFAULT NULL, -@@ -88,7 +91,7 @@ CREATE TABLE IF NOT EXISTS `task_cve_host` ( - CREATE TABLE IF NOT EXISTS `task_host_repo` ( - `task_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `host_id` int(11) NOT NULL, -- `host_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -+ `host_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `host_ip` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `repo_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, - `status` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, -@@ -119,4 +122,99 @@ CREATE TABLE IF NOT EXISTS `task_rollback`( - `dnf_event_start` int(11) NULL DEFAULT NULL, - `dnf_event_end` int(11) NULL DEFAULT NULL, - PRIMARY KEY (`id`) USING BTREE --) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; -\ No newline at end of file -+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; -+ -+CREATE PROCEDURE GET_CVE_LIST_PRO(IN username VARCHAR(20), IN search_key VARCHAR(100), IN severity VARCHAR(20), IN fixed TINYINT, IN affected TINYINT,IN order_by_filed VARCHAR(50),IN order_by VARCHAR(20),IN start_limt INT,IN end_limt INT) -+BEGIN -+ -+ DROP TABLE IF EXISTS cve_host_user_count; -+ SET @tmp_cve_host_count_sql = 'CREATE TEMPORARY TABLE cve_host_user_count SELECT -+ cve_id, -+ COUNT(host_id) AS host_num -+ FROM -+ cve_host_match FORCE INDEX (ix_cve_host_match_host_id) -+ WHERE 1=1 '; -+ -+ IF search_key is not null and search_key !='' THEN -+ SET @tmp_cve_host_count_sql = CONCAT(@tmp_cve_host_count_sql, ' AND LOCATE("', search_key, '", cve_id) > 0 '); -+ END IF; -+ IF fixed is not null THEN -+ SET @tmp_cve_host_count_sql = CONCAT(@tmp_cve_host_count_sql, ' AND fixed = ', fixed, ' '); -+ END IF; -+ IF affected is not null THEN -+ SET @tmp_cve_host_count_sql = CONCAT(@tmp_cve_host_count_sql, ' AND affected = ', affected, ' '); -+ END IF; -+ -+ SET @tmp_cve_host_count_sql = CONCAT(@tmp_cve_host_count_sql, ' AND host_user = "', username, '" GROUP BY cve_id'); -+ -+ prepare stmt from @tmp_cve_host_count_sql; -+ EXECUTE stmt; -+ DEALLOCATE PREPARE stmt; -+ -+ SET @cve_list_sql = 'SELECT -+ cve_host_user_count.cve_id, -+ cve.publish_time, -+ cve_pkg.package, -+ cve.severity, -+ cve.cvss_score, -+ cve_host_user_count.host_num -+ FROM -+ cve_host_user_count -+ LEFT JOIN cve ON cve.cve_id = cve_host_user_count.cve_id -+ LEFT JOIN (select DISTINCT cve_id, GROUP_CONCAT(DISTINCT package SEPARATOR ",") AS package from cve_affected_pkgs group by cve_id) as cve_pkg ON cve_host_user_count.cve_id = cve_pkg.cve_id where 1=1 '; -+ -+ set @cve_list_page_count_sql='SELECT -+ count(1) as total -+ FROM -+ cve_host_user_count -+ LEFT JOIN cve ON cve.cve_id = cve_host_user_count.cve_id -+ LEFT JOIN (select cve_id,package from cve_affected_pkgs GROUP BY cve_id,package) as cve_pkg ON cve_host_user_count.cve_id = cve_pkg.cve_id where 1=1 '; -+ -+ IF search_key IS NOT NULL and search_key !='' THEN -+ SET @cve_list_sql = CONCAT(@cve_list_sql, 'AND ( LOCATE("', search_key, '", cve_pkg.package) > 0 ',' OR LOCATE("',search_key, '", cve_host_user_count.cve_id) > 0 ) '); -+ SET @cve_list_page_count_sql = CONCAT(@cve_list_page_count_sql, 'AND ( LOCATE("', search_key, '", cve_pkg.package) > 0 ',' OR LOCATE("',search_key, '", cve_host_user_count.cve_id) > 0 ) '); -+ END IF; -+ IF severity IS NOT NULL and severity !='' THEN -+ SET @cve_list_sql = CONCAT(@cve_list_sql, 'AND cve.severity IN (', severity, ') '); -+ SET @cve_list_page_count_sql = CONCAT(@cve_list_page_count_sql, 'AND cve.severity IN (', severity, ') '); -+ END IF; -+ -+ IF order_by_filed IS NULL or order_by_filed ='' THEN -+ SET @order_by_filed = 'cve_host_user_count.host_num'; -+ END IF; -+ -+ SET @cve_list_sql = CONCAT(@cve_list_sql, ' ORDER BY ', order_by_filed ,' ', order_by,' limit ',start_limt ,' ,', end_limt); -+ -+ prepare stmt from @cve_list_sql; -+ EXECUTE stmt; -+ DEALLOCATE PREPARE stmt; -+ -+ prepare stmt from @cve_list_page_count_sql; -+ EXECUTE stmt; -+ DEALLOCATE PREPARE stmt; -+ -+END; -+ -+CREATE PROCEDURE GET_CVE_OVERVIEW_PRO(IN username VARCHAR(20)) -+BEGIN -+ -+ DROP TABLE IF EXISTS tmp_cve_overview; -+ SET @tmp_cve_overview_sql = 'CREATE TEMPORARY TABLE tmp_cve_overview SELECT cve_id from cve_host_match where '; -+ -+ SET @tmp_cve_overview_sql = CONCAT(@tmp_cve_overview_sql, ' host_user = "', username, '" and affected=1 and fixed=0 GROUP BY cve_id '); -+ -+ prepare stmt from @tmp_cve_overview_sql; -+ EXECUTE stmt; -+ DEALLOCATE PREPARE stmt; -+ -+ select CASE WHEN cve.severity is null THEN 'Unknown' ELSE cve.severity END as severity,count( CASE WHEN cve.severity is null THEN 'Unknown' ELSE cve.severity END ) as severity_count from tmp_cve_overview left join cve on cve.cve_id=tmp_cve_overview.cve_id GROUP BY cve.severity; -+ -+END; -+ -+CREATE TRIGGER tri_cvehost_match_user BEFORE INSERT ON cve_host_match -+FOR EACH ROW -+begin -+ DECLARE host_user varchar(100); -+ SELECT user into @host_user from host where host_id=new.host_id; -+ set new.host_user=@host_user; -+end; -\ No newline at end of file --- -Gitee - diff --git a/0002-suitable-for-2003-sp3.patch b/0002-suitable-for-2003-sp3.patch deleted file mode 100644 index 539d716..0000000 --- a/0002-suitable-for-2003-sp3.patch +++ /dev/null @@ -1,247 +0,0 @@ -From ad8982cafbe849707a2a07fa3f329f377b5aea33 Mon Sep 17 00:00:00 2001 -From: gongzt -Date: Tue, 17 Oct 2023 09:50:16 +0800 -Subject: Fixed errors in 20.03-sp3, such as task progress, cve repair task, and host cve query -MIME-Version: 1.0 -Content-Type: text/plain; charset=UTF-8 -Content-Transfer-Encoding: 8bit - ---- - apollo/database/proxy/cve.py | 2 +- - apollo/database/proxy/host.py | 136 +++++++++++++--------------------- - apollo/database/proxy/task.py | 2 +- - 3 files changed, 54 insertions(+), 86 deletions(-) - -diff --git a/apollo/database/proxy/cve.py b/apollo/database/proxy/cve.py -index 4125894..257083a 100644 ---- a/apollo/database/proxy/cve.py -+++ b/apollo/database/proxy/cve.py -@@ -485,7 +485,7 @@ class CveMysqlProxy(MysqlProxy): - CveHostAssociation.installed_rpm, - CveHostAssociation.available_rpm, - ) -- .join(CveHostAssociation, Host.host_id == CveHostAssociation.host_id) -+ .join(Host, Host.host_id == CveHostAssociation.host_id) - .filter(CveHostAssociation.cve_id.in_(cve_list)) - .filter(*filters) - .all() -diff --git a/apollo/database/proxy/host.py b/apollo/database/proxy/host.py -index 4b75f1c..b3cabb6 100644 ---- a/apollo/database/proxy/host.py -+++ b/apollo/database/proxy/host.py -@@ -511,31 +511,27 @@ class HostProxy(HostMysqlProxy, CveEsProxy): - result = {"total_count": 0, "total_page": 1, "result": []} - - host_id = data["host_id"] -- filters = self._get_host_cve_filters(data.get("filter", {})) -- host_cve_query = self._query_host_cve(data["username"], host_id, filters).all() -+ host_cve_query = self._query_host_cve(data["username"], host_id, data.get("filter", {})) - -- total_count = len(host_cve_query) -+ total_count = host_cve_query.count() - if not total_count: - return SUCCEED, result - -- cve_info_list, cve_packages_dict = self._preprocess_cve_list_query(host_cve_query) - sort_column = data['sort'] if "sort" in data else "cve_id" - direction, page, per_page = data.get('direction'), data.get('page'), data.get('per_page') - -- processed_cve_list, total_page = self._sort_and_page_host_cve_info( -- cve_info_list, page, per_page, sort_column, direction -- ) -- description_dict = self._get_cve_description(list(cve_packages_dict.keys())) -- result['result'] = self._add_additional_info_to_cve_list( -- processed_cve_list, description_dict, cve_packages_dict -- ) -+ host_cve_list, total_page = sort_and_page(host_cve_query, sort_column, direction, per_page, page) -+ -+ cve_id_list = [cve.cve_id for cve in host_cve_list] -+ description_dict = self._get_cve_description(cve_id_list) -+ result['result'] = self._add_additional_info_to_cve_list(host_cve_list, description_dict) - result['total_page'] = total_page - result['total_count'] = total_count - - return SUCCEED, result - - @staticmethod -- def _get_host_cve_filters(filter_dict): -+ def _get_host_cve_filters(filter_dict, cve_affected_pkg_subquery): - """ - Generate filters to filter host's CVEs - -@@ -563,7 +559,7 @@ class HostProxy(HostMysqlProxy, CveEsProxy): - filters.add( - or_( - CveHostAssociation.cve_id.like("%" + filter_dict["search_key"] + "%"), -- CveAffectedPkgs.package.like("%" + filter_dict["search_key"] + "%"), -+ cve_affected_pkg_subquery.c.package.like("%" + filter_dict["search_key"] + "%"), - ) - ) - if filter_dict.get("severity"): -@@ -573,107 +569,79 @@ class HostProxy(HostMysqlProxy, CveEsProxy): - filters.add(CveHostAssociation.affected == filter_dict["affected"]) - return filters - -- def _query_host_cve(self, username: str, host_id: int, filters: set): -+ def _query_host_cve(self, username: str, host_id: int, filters_dict: dict): - """ - query needed host CVEs info - Args: - username (str): user name of the request - host_id (int): host id -- filters (set): filter given by user -+ filter_dict(dict): filter dict to filter host's CVEs, e.g. -+ { -+ "cve_id": "", -+ "severity": ["high", "unknown"], -+ "affected": True, -+ "package": "vim", -+ "fixed": False // The default is false if the field is null. -+ } - Returns: - sqlalchemy.orm.query.Query - """ -+ cve_affected_pkg_subquery = ( -+ self.session.query( -+ CveAffectedPkgs.cve_id, -+ func.group_concat(func.distinct(CveAffectedPkgs.package), SEPARATOR=",").label("package"), -+ ) -+ .group_by(CveAffectedPkgs.cve_id) -+ .distinct() -+ .subquery() -+ ) -+ -+ filters = self._get_host_cve_filters(filters_dict, cve_affected_pkg_subquery) -+ - host_cve_query = ( - self.session.query( - CveHostAssociation.cve_id, - case([(Cve.publish_time == None, "")], else_=Cve.publish_time).label("publish_time"), - case([(Cve.severity == None, "")], else_=Cve.severity).label("severity"), - case([(Cve.cvss_score == None, "")], else_=Cve.cvss_score).label("cvss_score"), -- case([(CveAffectedPkgs.package == None, "")], else_=CveAffectedPkgs.package).label("package"), -+ case( -+ [(cve_affected_pkg_subquery.c.package == None, "")], else_=cve_affected_pkg_subquery.c.package -+ ).label("package"), - ) - .select_from(CveHostAssociation) - .outerjoin(Cve, CveHostAssociation.cve_id == Cve.cve_id) -- .outerjoin(CveAffectedPkgs, CveAffectedPkgs.cve_id == CveHostAssociation.cve_id) -+ .outerjoin(cve_affected_pkg_subquery, cve_affected_pkg_subquery.c.cve_id == CveHostAssociation.cve_id) - .outerjoin(Host, Host.host_id == CveHostAssociation.host_id) - .filter(CveHostAssociation.host_id == host_id, Host.user == username) - .filter(*filters) -- ).group_by(CveHostAssociation.cve_id, CveAffectedPkgs.package) -+ ).group_by(CveHostAssociation.cve_id, cve_affected_pkg_subquery.c.package) - - return host_cve_query - - @staticmethod -- def _preprocess_cve_list_query(cve_list_query: List[dict]) -> Tuple[List[dict], dict]: -- """ -- get each cve's source package set and deduplication cve info list -- -- Args: -- cve_list_query(list): cve info list(two rows may have same cve id -- with different source package) -- Returns: -- list: list of cve info without package and description. -- dict: key is cve id, value is cve affected source package set. e.g -- { "cve-xxxx-xxxx": {"vim"}, "cve-xxxx-xxxx":{"kernel"} } -- """ -- cve_packages_dict, cve_info_list = defaultdict(set), [] -- -- for row in cve_list_query: -- if row.cve_id not in cve_packages_dict: -- cve_info_list.append( -- { -- "cve_id": row.cve_id, -- "publish_time": row.publish_time, -- "severity": row.severity, -- "cvss_score": row.cvss_score, -- } -- ) -- cve_packages_dict[row.cve_id].add(row.package) -- -- return cve_info_list, cve_packages_dict -- -- @staticmethod -- def _sort_and_page_host_cve_info(cve_info_list, page, per_page, sort_column=None, direction="desc"): -- """ -- sort and page cve info list -- -- Args: -- cve_info_list (list): cve info list. not empty. -- page(int) -- per_page(int): number of record per page -- sort_column(str): the column that sort based on -- direction(str): desc or asc -- -- Returns: -- query result, total number of pages -- """ -- total_page = 1 -- total_count = len(cve_info_list) -- -- if sort_column: -- cve_info_list.sort( -- key=lambda cve_info: cve_info[sort_column], reverse=True if direction == "desc" else False -- ) -- -- if page and per_page: -- total_page = math.ceil(total_count / per_page) -- return cve_info_list[per_page * (page - 1) : per_page * page], total_page -- -- return cve_info_list, total_page -- -- @staticmethod -- def _add_additional_info_to_cve_list(cve_info_list: list, description_dict: dict, cve_packages_dict: list) -> list: -+ def _add_additional_info_to_cve_list(host_cve_list: list, description_dict: dict) -> list: - """ - add description and affected source packages for each cve - - Args: -- cve_info_list: -+ host_cve_list: - description_dict (dict): key is cve's id, value is cve's description -- cve_pkgs_dict (dict): key is cve's id, value is cve's affected packages set - - Returns: - list - """ -- for cve_info in cve_info_list: -- cve_id = cve_info["cve_id"] -- cve_info["description"] = description_dict[cve_id] if description_dict.get(cve_id) else "" -- cve_info["package"] = ",".join(cve_packages_dict[cve_id]) -- return cve_info_list -+ host_cve_info_list = [] -+ for host_cve in host_cve_list: -+ description = description_dict[host_cve.cve_id] if description_dict.get(host_cve.cve_id) else "" -+ host_cve_info_list.append( -+ { -+ "cve_id": host_cve.cve_id, -+ "publish_time": host_cve.publish_time, -+ "severity": host_cve.severity, -+ "cvss_score": host_cve.cvss_score, -+ "package": host_cve.package, -+ "description": description, -+ } -+ ) -+ -+ return host_cve_info_list -diff --git a/apollo/database/proxy/task.py b/apollo/database/proxy/task.py -index e7c3b4e..de151b2 100644 ---- a/apollo/database/proxy/task.py -+++ b/apollo/database/proxy/task.py -@@ -1291,7 +1291,7 @@ class TaskMysqlProxy(MysqlProxy): - status = TaskStatus.UNKNOWN - else: - status = TaskStatus.SUCCEED -- result[cve_id] = {"progress": row.total - row.running - row.none, "status": status} -+ result[cve_id] = {"progress": int(row.total - row.running - row.none), "status": status} - - succeed_list = list(result.keys()) - fail_list = list(set(cve_list) - set(succeed_list)) --- -Gitee - diff --git a/0003-rm-hotpatch.patch b/0003-rm-hotpatch.patch deleted file mode 100644 index 2a21c09..0000000 --- a/0003-rm-hotpatch.patch +++ /dev/null @@ -1,2653 +0,0 @@ -From 7b986fb7739ddd7ab508c5e4e53e1d51442553f7 Mon Sep 17 00:00:00 2001 -From: gongzt -Date: Thu, 19 Oct 2023 03:42:45 +0000 -Subject: [PATCH 1/1] rm - ---- - aops-apollo.spec | 18 +- - hotpatch/baseclass.py | 285 ---------------- - hotpatch/hot_updateinfo.py | 613 ----------------------------------- - hotpatch/hotpatch.py | 165 ---------- - hotpatch/hotupgrade.py | 482 --------------------------- - hotpatch/syscare.py | 116 ------- - hotpatch/test_hotpatch.py | 45 --- - hotpatch/test_hotupgrade.py | 75 ----- - hotpatch/test_syscare.py | 152 --------- - hotpatch/updateinfo_parse.py | 527 ------------------------------ - hotpatch/version.py | 46 --- - 11 files changed, 4 insertions(+), 2520 deletions(-) - delete mode 100644 hotpatch/baseclass.py - delete mode 100644 hotpatch/hot_updateinfo.py - delete mode 100644 hotpatch/hotpatch.py - delete mode 100644 hotpatch/hotupgrade.py - delete mode 100644 hotpatch/syscare.py - delete mode 100644 hotpatch/test_hotpatch.py - delete mode 100644 hotpatch/test_hotupgrade.py - delete mode 100644 hotpatch/test_syscare.py - delete mode 100644 hotpatch/updateinfo_parse.py - delete mode 100644 hotpatch/version.py - -diff --git a/aops-apollo.spec b/aops-apollo.spec -index 308436f..1047cdd 100644 ---- a/aops-apollo.spec -+++ b/aops-apollo.spec -@@ -1,5 +1,5 @@ - Name: aops-apollo --Version: v1.2.1 -+Version: v1.3.3 - Release: 1 - Summary: Cve management service, monitor machine vulnerabilities and provide fix functions. - License: MulanPSL2 -@@ -18,13 +18,6 @@ Provides: aops-apollo - %description - Cve management service, monitor machine vulnerabilities and provide fix functions. - --%package -n dnf-hotpatch-plugin --Summary: dnf hotpatch plugin --Requires: python3-hawkey python3-dnf syscare >= 1.0.1 -- --%description -n dnf-hotpatch-plugin --dnf hotpatch plugin, it's about hotpatch query and fix -- - %package -n aops-apollo-tool - Summary: Small tools for aops-apollo, e.g. updateinfo.xml generater - Requires: python3-rpm -@@ -54,9 +47,6 @@ pushd aops-apollo-tool - %py3_install - popd - --#install for aops-dnf-plugin --cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ -- - - %files - %doc README.* -@@ -68,9 +58,6 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ - %{python3_sitelib}/apollo/* - %attr(0755, root, root) /opt/aops/database/* - --%files -n dnf-hotpatch-plugin --%{python3_sitelib}/dnf-plugins/* -- - %files -n aops-apollo-tool - %attr(0644,root,root) %{_sysconfdir}/aops_apollo_tool/updateinfo_config.ini - %attr(0755,root,root) %{_bindir}/gen-updateinfo -@@ -78,6 +65,9 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ - %{python3_sitelib}/aops_apollo_tool/* - - %changelog -+* Thu Oct 19 2023 gongzhengtang - v1.3.3-1 -+- Remove hotpatch -+ - * Tue May 23 2023 zhu-yuncheng - v1.2.1-1 - - Better dnf hotpatch plugin for more syscare command - - Add updateinfo.xml generation tool -diff --git a/hotpatch/baseclass.py b/hotpatch/baseclass.py -deleted file mode 100644 -index b7e29fd..0000000 ---- a/hotpatch/baseclass.py -+++ /dev/null -@@ -1,285 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --class Hotpatch(object): -- __slots__ = [ -- '_name', -- '_version', -- '_release', -- '_cves', -- '_advisory', -- '_arch', -- '_filename', -- '_state', -- '_required_pkgs_info', -- '_required_pkgs_str', -- '_required_pkgs_name_str', -- ] -- -- def __init__(self, name, version, arch, filename, release): -- """ -- name: str -- version: str -- arch: str -- filename: str -- release: str -- """ -- self._name = name -- self._version = version -- self._arch = arch -- self._filename = filename -- self._cves = [] -- self._advisory = None -- self._state = '' -- self._release = release -- self._required_pkgs_info = dict() -- self._required_pkgs_str = '' -- self._required_pkgs_name_str = '' -- -- @property -- def state(self): -- return self._state -- -- @state.setter -- def state(self, value): -- self._state = value -- -- @property -- def name(self): -- """ -- name: patch-src_pkg-ACC or patch-src_pkg-SGL_xxx -- """ -- return self._name -- -- @property -- def version(self): -- return self._version -- -- @property -- def release(self): -- return self._release -- -- @property -- def src_pkg(self): -- """ -- The compiled source package for hotpatch. -- -- src_pkg: name-version-release -- """ -- src_pkg = self.name[self.name.index('-') + 1 : self.name.rindex('-')] -- return src_pkg -- -- @property -- def required_pkgs_info(self): -- """ -- The target fixed rpm package of the hotpatch. -- """ -- return self._required_pkgs_info -- -- @required_pkgs_info.setter -- def required_pkgs_info(self, required_pkgs_info): -- """ -- The required pkgs info are from the 'dnf repoquery --requires name-version-release.arch'. The -- required pkgs info are considered to be truely fixed rpm package. -- -- e.g. -- { -- 'redis': '6.2.5-1', -- 'redis-cli': '6.2.5-1' -- } -- """ -- -- self._required_pkgs_info = required_pkgs_info -- required_pkgs_str_list = [] -- required_pkgs_name_str_list = [] -- # sort the _required_pkgs_info and concatenate the str to get _required_pkgs_str -- for required_pkgs_name, required_pkgs_vere in self._required_pkgs_info.items(): -- required_pkgs_str_list.append("%s-%s" % (required_pkgs_name, required_pkgs_vere)) -- required_pkgs_name_str_list.append(required_pkgs_name) -- sorted(required_pkgs_str_list) -- sorted(required_pkgs_name_str_list) -- self._required_pkgs_str = ",".join(required_pkgs_str_list) -- self._required_pkgs_name_str = ",".join(required_pkgs_name_str_list) -- -- @property -- def required_pkgs_str(self): -- """ -- The truly fixed rpm package mark, which is composed of required_pkgs_info. -- -- e.g. -- 'redis-6.2.5-1,redis-cli-6.2.5-1' -- """ -- return self._required_pkgs_str -- -- @property -- def required_pkgs_name_str(self): -- """ -- The truly fixed rpm package name mark, which is composed of keys of required_pkgs_info. -- -- e.g. -- 'redis,redis-cli' -- """ -- return self._required_pkgs_name_str -- -- @property -- def src_pkg_nevre(self): -- """ -- Parse the source package to get the source package name, the source package version and the source package release -- -- Returns: -- src_pkg_name, src_pkg_version, src_pkg_release -- """ -- src_pkg = self.src_pkg -- release_pos = src_pkg.rindex('-') -- version_pos = src_pkg.rindex('-', 0, release_pos) -- src_pkg_name, src_pkg_version, src_pkg_release = ( -- src_pkg[0:version_pos], -- src_pkg[version_pos + 1 : release_pos], -- src_pkg[release_pos + 1 :], -- ) -- return src_pkg_name, src_pkg_version, src_pkg_release -- -- @property -- def nevra(self): -- """ -- Format the filename as 'name-versioin-release.arch' for display, which is defined as nevra -- -- nevra: name-version-release.arch -- """ -- return self.filename[0 : self.filename.rindex('.')] -- -- @property -- def hotpatch_name(self): -- """ -- There are two types of hotpatch, ACC hotpatch and SGL hotpatch. The ACC hotpatch can be made -- iteratively, and its 'hotpatch_name' is defined as ACC. The SGL hotpatch cannot be made iteratively, -- and its 'hotpatch_name' is defined as SGL_xxx. The 'xxx' in the SGL_xxx means the issue it solves. -- """ -- hotpatch_name = self.name[self.name.rindex('-') + 1 :] -- return hotpatch_name -- -- @property -- def syscare_subname(self): -- """ -- The 'syscare_subname' is used for hotpatch status querying in syscare list, which is composed of -- 'src_pkg/hotpatch_name-version-release'. -- """ -- src_pkg = '%s-%s-%s' % (self.src_pkg_nevre) -- -- return '%s/%s-%s-%s' % (src_pkg, self.hotpatch_name, self.version, self.release) -- -- @property -- def cves(self): -- return self._cves -- -- @cves.setter -- def cves(self, cves): -- self._cves = cves -- -- @property -- def advisory(self): -- return self._advisory -- -- @advisory.setter -- def advisory(self, advisory): -- self._advisory = advisory -- -- @property -- def arch(self): -- return self._arch -- -- @property -- def filename(self): -- return self._filename -- -- --class Cve(object): -- __slots__ = ['_cve_id', '_hotpatches'] -- -- def __init__(self, id, **kwargs): -- """ -- id: str -- """ -- self._cve_id = id -- self._hotpatches = [] -- -- @property -- def hotpatches(self): -- return self._hotpatches -- -- def add_hotpatch(self, hotpatch: Hotpatch): -- self._hotpatches.append(hotpatch) -- -- @property -- def cve_id(self): -- return self._cve_id -- -- --class Advisory(object): -- __slots__ = ['_id', '_adv_type', '_title', '_severity', '_description', '_updated', '_hotpatches', '_cves'] -- -- def __init__(self, id, adv_type, title, severity, description, updated="1970-01-01 08:00:00", **kwargs): -- """ -- id: str -- adv_type: str -- title: str -- severity: str -- description: str -- updated: str -- """ -- self._id = id -- self._adv_type = adv_type -- self._title = title -- self._severity = severity -- self._description = description -- self._updated = updated -- self._cves = {} -- self._hotpatches = [] -- -- @property -- def id(self): -- return self._id -- -- @property -- def adv_type(self): -- return self._adv_type -- -- @property -- def title(self): -- return self._title -- -- @property -- def severity(self): -- return self._severity -- -- @property -- def description(self): -- return self._description -- -- @property -- def updated(self): -- return self._updated -- -- @property -- def cves(self): -- return self._cves -- -- @cves.setter -- def cves(self, advisory_cves): -- self._cves = advisory_cves -- -- @property -- def hotpatches(self): -- return self._hotpatches -- -- def add_hotpatch(self, hotpatch: Hotpatch): -- self._hotpatches.append(hotpatch) -diff --git a/hotpatch/hot_updateinfo.py b/hotpatch/hot_updateinfo.py -deleted file mode 100644 -index 3e86721..0000000 ---- a/hotpatch/hot_updateinfo.py -+++ /dev/null -@@ -1,613 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import dnf --import hawkey --from dnf.i18n import _ --from dnf.cli.commands.updateinfo import UpdateInfoCommand --from dataclasses import dataclass --from .updateinfo_parse import HotpatchUpdateInfo --from .version import Versions --from .baseclass import Hotpatch -- -- --@dataclass --class DisplayItem: -- """ -- Class for storing the formatting parameters and display lines. -- -- idw(int): the width of 'cve_id' -- tiw(int): the width of 'adv_type' -- ciw(int): the width of 'coldpatch' -- display_lines(set): { -- (cve_id, adv_type, coldpatch, hotpatch), -- } -- """ -- -- idw: int -- tiw: int -- ciw: int -- display_lines: set -- -- --@dnf.plugin.register_command --class HotUpdateinfoCommand(dnf.cli.Command): -- CVE_ID_INDEX = 0 -- ADV_SEVERITY_INDEX = 1 -- COLDPATCH_INDEX = 2 -- HOTPATCH_INDEX = 3 -- -- aliases = ['hot-updateinfo'] -- summary = _('show hotpatch updateinfo') -- -- def __init__(self, cli): -- """ -- Initialize the command -- """ -- super(HotUpdateinfoCommand, self).__init__(cli) -- -- @staticmethod -- def set_argparser(parser): -- spec_action_cmds = ['list'] -- parser.add_argument('spec_action', nargs=1, choices=spec_action_cmds, help=_('show updateinfo list')) -- -- with_cve_cmds = ['cve', 'cves'] -- parser.add_argument('with_cve', nargs=1, choices=with_cve_cmds, help=_('show cves')) -- -- availability = parser.add_mutually_exclusive_group() -- availability.add_argument( -- "--available", -- dest="availability", -- const='available', -- action='store_const', -- help=_("cves about newer versions of installed packages (default)"), -- ) -- availability.add_argument( -- "--installed", -- dest="availability", -- const='installed', -- action='store_const', -- help=_("cves about equal and older versions of installed packages"), -- ) -- -- def configure(self): -- demands = self.cli.demands -- demands.sack_activation = True -- demands.available_repos = True -- -- self.filter_cves = self.opts.cves if self.opts.cves else None -- -- def run(self): -- self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) -- -- if self.opts.spec_action and self.opts.spec_action[0] == 'list' and self.opts.with_cve: -- self.display() -- -- def get_mapping_nevra_cve(self) -> dict: -- """ -- Get cve nevra mapping based on the UpdateInfoCommand of 'dnf updateinfo list cves'. -- -- Returns: -- dict: to collect cve and cold patch updateinfo information -- { -- (name-version-release.arch, advisory.updated): { -- cve_id: (advisory.type, advisory.severity) -- } -- } -- """ -- -- apkg_adv_insts = self.get_apkg_adv_insts() -- -- mapping_nevra_cve = dict() -- for apkg, advisory, _ in apkg_adv_insts: -- nevra = (apkg.name, apkg.evr, apkg.arch) -- for ref in advisory.references: -- if ref.type != hawkey.REFERENCE_CVE: -- continue -- mapping_nevra_cve.setdefault((nevra, advisory.updated), dict())[ref.id] = ( -- advisory.type, -- advisory.severity, -- ) -- return mapping_nevra_cve -- -- def get_apkg_adv_insts(self): -- """ -- Configure UpdateInfoCommand with 'dnf updateinfo list cves --available' (default) or -- 'dnf updateinfo list cves --installed' according to the args, and get available package, advisory -- and package installation information. -- -- Returns: -- generator: generator for (available package, advisory, package installation -- information) -- """ -- updateinfo = UpdateInfoCommand(self.cli) -- updateinfo.opts = self.opts -- -- updateinfo.opts.spec_action = 'list' -- updateinfo.opts.with_cve = True -- updateinfo.opts.spec = '*' -- updateinfo.opts._advisory_types = set() -- updateinfo.opts.availability = self.opts.availability -- self.updateinfo = updateinfo -- -- if self.updateinfo.opts.availability == 'installed': -- apkg_adv_insts = updateinfo.installed_apkg_adv_insts(updateinfo.opts.spec) -- else: -- apkg_adv_insts = updateinfo.available_apkg_adv_insts(updateinfo.opts.spec) -- -- return apkg_adv_insts -- -- def get_fixed_cve_id_and_hotpatch_require_info(self, fixed_cve_id_and_hotpatch: set): -- """ -- Get fixed cve id and hotpatch require package information. -- -- Args: -- fixed_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- Returns: -- set: -- e.g. -- { -- ('CVE-2023-1111', 'redis-6.2.5-1') -- ('CVE-2023-1112', 'redis-6.2.5-1,redis-cli-6.2.5-1') -- } -- """ -- fixed_cve_id_and_hotpatch_require_info = set() -- for fixed_cve_id, fixed_hotpatch in fixed_cve_id_and_hotpatch: -- fixed_cve_id_and_hotpatch_require_info.add((fixed_cve_id, fixed_hotpatch.required_pkgs_str)) -- return fixed_cve_id_and_hotpatch_require_info -- -- def get_iterated_cve_id_and_hotpatch_require_info(self, iterated_cve_id_and_hotpatch: set): -- """ -- Get iterated cve id and hotpatch require package name information. -- -- Args: -- iterated_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- -- Returns: -- set -- e.g. -- { -- ('CVE-2023-1111', 'redis') -- ('CVE-2023-1112', 'redis,redis-cli') -- } -- """ -- iterated_cve_id_and_hotpatch_require_info = set() -- for iterated_cve_id, iterated_hotpatch in iterated_cve_id_and_hotpatch: -- iterated_cve_id_and_hotpatch_require_info.add((iterated_cve_id, iterated_hotpatch.required_pkgs_name_str)) -- return iterated_cve_id_and_hotpatch_require_info -- -- def check_is_in_fixed_cve_id_and_hotpatch_require_info( -- self, cve_id: str, hotpatch: Hotpatch, fixed_cve_id_and_hotpatch_require_info: set -- ): -- """ -- Check the (cve_id, hotpatch.required_pkgs_str) whether is in fixed_cve_id_and_hotpatch_require_info. -- If the (cve_id, hotpatch.required_pkgs_str) is in fixed_cve_id_and_hotpatch_require_info, the cve corresponding -- to the hotpatch should be fixed. -- -- Args: -- cve_id(str) -- hotpatch(Hotpatch) -- fixed_cve_id_and_hotpatch_require_info(set): -- e.g. -- { -- ('CVE-2023-1111', 'redis-6.2.5-1') -- ('CVE-2023-1112', 'redis-6.2.5-1') -- } -- -- Returns: -- bool: whether the cve corresponding to the hotpatch is fixed -- """ -- is_fixed = False -- if (cve_id, hotpatch.required_pkgs_str) in fixed_cve_id_and_hotpatch_require_info: -- is_fixed = True -- return is_fixed -- -- def check_is_in_iterated_cve_id_and_hotpatch_require_info( -- self, cve_id: str, hotpatch: Hotpatch, iterated_cve_id_and_hotpatch_require_info: set -- ): -- """ -- Check the (cve_id, hotpatch.required_pkgs_name_str) whether is in iterated_cve_id_and_hotpatch_require_info. -- If the (cve_id, hotpatch.required_pkgs_name_str) is in fixed_cve_id_and_hotpatch_require_info, the cve -- corresponding to the hotpatch should be iterated. -- -- Args: -- cve_id(str) -- hotpatch(Hotpatch) -- iterated_cve_id_and_hotpatch_require_info(set): -- e.g. -- { -- ('CVE-2023-1111', 'redis') -- ('CVE-2023-1112', 'redis') -- } -- -- Returns: -- bool: whether the cve corresponding to the hotpatch is iterated -- """ -- is_iterated = False -- if ( -- hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED) -- and (cve_id, hotpatch.required_pkgs_name_str) in iterated_cve_id_and_hotpatch_require_info -- ): -- is_iterated = True -- return is_iterated -- -- def get_filtered_display_item( -- self, -- format_lines: set, -- fixed_cve_id_and_hotpatch: set, -- installable_cve_id_and_hotpatch: set, -- iterated_cve_id_and_hotpatch: set, -- ): -- """ -- Get filtered display item. -- -- Args: -- format_lines(set): -- { -- (cve_id, adv_type, coldpatch, hotpatch) -- } -- -- fixed_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- iterated_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- Returns: -- DisplayItem: for display -- """ -- if self.updateinfo.opts.availability == 'installed': -- display_item = self.get_installed_filtered_display_item( -- format_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch -- ) -- return display_item -- display_item = self.get_available_filtered_display_item( -- format_lines, fixed_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch -- ) -- return display_item -- -- def get_installed_filtered_display_item( -- self, format_lines: set, fixed_cve_id_and_hotpatch: set, installable_cve_id_and_hotpatch: set -- ): -- """ -- Get filtered display item by removing installable cve id and hotpatch, and removing iterated cve id -- and hotpatch. For hotpatch, only show ones which have been installed and been actived/accepted in -- syscare. -- -- Args: -- format_lines(set): -- { -- (cve_id, adv_type, coldpatch, hotpatch) -- } -- -- installable_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- Returns: -- DisplayItem: for display -- -- """ -- display_lines = set() -- -- # calculate the width of each column -- idw = tiw = ciw = 0 -- for format_line in format_lines: -- cve_id, adv_type, coldpatch, hotpatch = ( -- format_line[self.CVE_ID_INDEX], -- format_line[self.ADV_SEVERITY_INDEX], -- format_line[self.COLDPATCH_INDEX], -- format_line[self.HOTPATCH_INDEX], -- ) -- if self.filter_cves is not None and cve_id not in self.filter_cves: -- continue -- if (cve_id, hotpatch) in installable_cve_id_and_hotpatch: -- if coldpatch == '-': -- continue -- else: -- hotpatch = '-' -- -- if isinstance(hotpatch, Hotpatch): -- if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch or hotpatch.state == self.hp_hawkey.INSTALLED: -- hotpatch = hotpatch.nevra -- elif hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED): -- hotpatch = '-' -- -- if coldpatch == '-' and hotpatch == '-': -- continue -- -- idw = max(idw, len(cve_id)) -- tiw = max(tiw, len(adv_type)) -- ciw = max(ciw, len(coldpatch)) -- display_lines.add((cve_id, adv_type, coldpatch, hotpatch)) -- -- display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines) -- return display_item -- -- def get_available_filtered_display_item( -- self, format_lines: set, fixed_cve_id_and_hotpatch: set, iterated_cve_id_and_hotpatch: set -- ): -- """ -- Get filtered display item by removing fixed cve id and hotpatch, and removing iterated cve id -- and hotpatch. -- -- Args: -- format_lines(set): -- { -- (cve_id, adv_type, coldpatch, hotpatch) -- } -- -- fixed_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- iterated_cve_id_and_hotpatch(set): -- e.g. -- { -- ('CVE-2023-1111', Hotpatch) -- } -- -- Returns: -- DisplayItem: for display -- """ -- display_lines = set() -- -- fixed_cve_id_and_hotpatch_require_info = self.get_fixed_cve_id_and_hotpatch_require_info( -- fixed_cve_id_and_hotpatch -- ) -- iterated_cve_id_and_hotpatch_require_info = self.get_iterated_cve_id_and_hotpatch_require_info( -- iterated_cve_id_and_hotpatch -- ) -- -- # calculate the width of each column -- idw = tiw = ciw = 0 -- for format_line in format_lines: -- cve_id, adv_type, coldpatch, hotpatch = ( -- format_line[self.CVE_ID_INDEX], -- format_line[self.ADV_SEVERITY_INDEX], -- format_line[self.COLDPATCH_INDEX], -- format_line[self.HOTPATCH_INDEX], -- ) -- if self.filter_cves is not None and cve_id not in self.filter_cves: -- continue -- if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch: -- continue -- -- if isinstance(hotpatch, Hotpatch): -- if self.check_is_in_iterated_cve_id_and_hotpatch_require_info( -- cve_id, hotpatch, iterated_cve_id_and_hotpatch_require_info -- ): -- continue -- if self.check_is_in_fixed_cve_id_and_hotpatch_require_info( -- cve_id, hotpatch, fixed_cve_id_and_hotpatch_require_info -- ): -- continue -- # format hotpatch -- if hotpatch.state == self.hp_hawkey.INSTALLABLE: -- hotpatch = hotpatch.nevra -- elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE: -- hotpatch = '-' -- elif hotpatch.state == self.hp_hawkey.UNRELATED and coldpatch == '-': -- continue -- elif hotpatch.state == self.hp_hawkey.UNRELATED: -- hotpatch = '-' -- -- idw = max(idw, len(cve_id)) -- tiw = max(tiw, len(adv_type)) -- ciw = max(ciw, len(coldpatch)) -- display_lines.add((cve_id, adv_type, coldpatch, hotpatch)) -- -- display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines) -- return display_item -- -- def append_fixed_cve_id_and_hotpatch(self, fixed_cve_id_and_hotpatch: set): -- """ -- Append fixed cve id and hotpatch in fixed_cve_id_and_hotpatch. The ACC hotpatch that are less or equal -- to the highest ACC actived version-release for the same target required package, is considered to be -- fixed. -- -- Args: -- fixed_cve_id_and_hotpatch(set) -- e.g. -- { -- ('CVE-2023-2221', Hotpatch) -- } -- Returns: -- set: -- e.g. -- { -- ('CVE-2023-2221', Hotpatch), -- ('CVE-2023-1111', Hotpatch) -- } -- -- """ -- versions = Versions() -- # {hotpatch_required_pkgs_str: version-release} -- hotpatch_vere_mapping = dict() -- for _, fixed_hotpatch in fixed_cve_id_and_hotpatch: -- # get the highest version-release for each target required package -- required_pkgs_str = fixed_hotpatch.required_pkgs_str -- current_vere = "%s-%s" % (fixed_hotpatch.version, fixed_hotpatch.release) -- if fixed_hotpatch.hotpatch_name != "ACC": -- continue -- if required_pkgs_str not in hotpatch_vere_mapping: -- hotpatch_vere_mapping[required_pkgs_str] = current_vere -- elif versions.larger_than(current_vere, hotpatch_vere_mapping[required_pkgs_str]): -- hotpatch_vere_mapping[required_pkgs_str] = current_vere -- -- # get all hot hotpatches that are less or equal to the highest version-release, and record the cves -- # which they fix -- for required_pkgs_str, actived_vere in hotpatch_vere_mapping.items(): -- all_hotpatches = self.hp_hawkey._hotpatch_required_pkg_info_str[required_pkgs_str] -- for cmped_vere, hotpatch in all_hotpatches: -- if hotpatch.hotpatch_name != "ACC": -- continue -- if not versions.larger_than(actived_vere, cmped_vere): -- continue -- for cve_id in hotpatch.cves: -- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- return fixed_cve_id_and_hotpatch -- -- def get_formatting_parameters_and_display_lines(self): -- """ -- Append hotpatch information according to the output of 'dnf updateinfo list cves' -- -- Returns: -- DisplayItem: for display -- """ -- -- def type2label(updateinfo, typ, sev): -- if typ == hawkey.ADVISORY_SECURITY: -- return updateinfo.SECURITY2LABEL.get(sev, _('Unknown/Sec.')) -- else: -- return updateinfo.TYPE2LABEL.get(typ, _('unknown')) -- -- mapping_nevra_cve = self.get_mapping_nevra_cve() -- echo_lines = set() -- fixed_cve_id_and_hotpatch = set() -- installable_cve_id_and_hotpatch = set() -- uninstallable_cve_id_and_hotpatch = set() -- iterated_cve_id_and_hotpatch = set() -- -- for ((nevra), aupdated), id2type in sorted(mapping_nevra_cve.items(), key=lambda x: x[0]): -- pkg_name, pkg_evr, pkg_arch = nevra -- coldpatch = '%s-%s.%s' % (pkg_name, pkg_evr, pkg_arch) -- for cve_id, atypesev in id2type.items(): -- label = type2label(self.updateinfo, *atypesev) -- # if there is no hotpatch corresponding to the cve id, mark hotpatch as '-' -- if cve_id not in self.hp_hawkey.hotpatch_cves or not self.hp_hawkey.hotpatch_cves[cve_id].hotpatches: -- echo_line = (cve_id, label, coldpatch, '-') -- echo_lines.add(echo_line) -- continue -- -- for hotpatch in self.hp_hawkey.hotpatch_cves[cve_id].hotpatches: -- # if cold patch name does not match with hotpatch required pkg name (target fix pkgs) -- if pkg_name not in hotpatch._required_pkgs_info.keys(): -- echo_line = (cve_id, label, coldpatch, '-') -- echo_lines.add(echo_line) -- continue -- if hotpatch.state == self.hp_hawkey.INSTALLED: -- # record the fixed cve_id and hotpatch, filter the packages that are lower than -- # the currently installed package for solving the same cve and target required -- # pakcage -- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- elif hotpatch.state == self.hp_hawkey.INSTALLABLE: -- # record the installable cve_id and hotpatch, filter the packages that are bigger -- # than the currently installed package -- installable_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE: -- uninstallable_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- echo_line = (cve_id, label, coldpatch, hotpatch) -- echo_lines.add(echo_line) -- -- self.add_untraversed_hotpatches( -- echo_lines, -- fixed_cve_id_and_hotpatch, -- installable_cve_id_and_hotpatch, -- uninstallable_cve_id_and_hotpatch, -- iterated_cve_id_and_hotpatch, -- ) -- # lower version ACC hotpatch of fixed ACC hotpatch, is also considered to be fixed -- fixed_cve_id_and_hotpatch = self.append_fixed_cve_id_and_hotpatch(fixed_cve_id_and_hotpatch) -- # remove fixed cve and hotpatch from installable_cve_id_and_hotpatch -- installable_cve_id_and_hotpatch = installable_cve_id_and_hotpatch.difference(fixed_cve_id_and_hotpatch) -- display_item = self.get_filtered_display_item( -- echo_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch -- ) -- -- return display_item -- -- def add_untraversed_hotpatches( -- self, -- echo_lines: set, -- fixed_cve_id_and_hotpatch: set, -- installable_cve_id_and_hotpatch: set, -- uninstallable_cve_id_and_hotpatch: set, -- iterated_cve_id_and_hotpatch: set, -- ): -- """ -- Add the echo lines, which are only with hotpatch but no coldpatch. And append -- fixed_cve_id_and_hotpatch and iterated_cve_id_and_hotpatch. -- -- Args: -- echo_lines(set) -- fixed_cve_id_and_hotpatch(set) -- installable_cve_id_and_hotpatch(set) -- uninstallable_cve_id_and_hotpatch(set) -- iterated_cve_id_and_hotpatch(set) -- """ -- for cve_id, cve in self.hp_hawkey.hotpatch_cves.items(): -- for hotpatch in cve.hotpatches: -- if hotpatch.state == self.hp_hawkey.UNRELATED: -- continue -- if (cve_id, hotpatch) in iterated_cve_id_and_hotpatch: -- continue -- if (cve_id, hotpatch) in uninstallable_cve_id_and_hotpatch: -- continue -- if hotpatch.state == self.hp_hawkey.INSTALLED: -- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- elif hotpatch.state == self.hp_hawkey.INSTALLABLE: -- installable_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) -- echo_line = (cve_id, hotpatch.advisory.severity + '/Sec.', '-', hotpatch) -- echo_lines.add(echo_line) -- -- def display(self): -- """ -- Print the display lines according to the formatting parameters. -- """ -- display_item = self.get_formatting_parameters_and_display_lines() -- idw, tiw, ciw, display_lines = display_item.idw, display_item.tiw, display_item.ciw, display_item.display_lines -- for display_line in sorted( -- display_lines, -- key=lambda x: ( -- x[self.COLDPATCH_INDEX], -- x[self.HOTPATCH_INDEX], -- x[self.CVE_ID_INDEX], -- x[self.ADV_SEVERITY_INDEX], -- ), -- ): -- print( -- '%-*s %-*s %-*s %s' -- % ( -- idw, -- display_line[self.CVE_ID_INDEX], -- tiw, -- display_line[self.ADV_SEVERITY_INDEX], -- ciw, -- display_line[self.COLDPATCH_INDEX], -- display_line[self.HOTPATCH_INDEX], -- ) -- ) -diff --git a/hotpatch/hotpatch.py b/hotpatch/hotpatch.py -deleted file mode 100644 -index 0664edf..0000000 ---- a/hotpatch/hotpatch.py -+++ /dev/null -@@ -1,165 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import dnf --from dnfpluginscore import _, logger --from .syscare import Syscare --from .updateinfo_parse import HotpatchUpdateInfo -- -- --@dnf.plugin.register_command --class HotpatchCommand(dnf.cli.Command): -- CVE_ID_INDEX = 0 -- Name_INDEX = 1 -- STATUS_INDEX = 2 -- -- aliases = ['hotpatch'] -- summary = _('show hotpatch info') -- syscare = Syscare() -- -- def __init__(self, cli): -- """ -- Initialize the command -- """ -- super(HotpatchCommand, self).__init__(cli) -- -- @staticmethod -- def set_argparser(parser): -- output_format = parser.add_mutually_exclusive_group() -- output_format.add_argument( -- '--list', nargs='?', type=str, default='', choices=['cve', 'cves'], help=_('show list of hotpatch') -- ) -- output_format.add_argument( -- '--apply', type=str, default=None, dest='apply_name', nargs=1, help=_('apply hotpatch') -- ) -- output_format.add_argument( -- '--remove', type=str, default=None, dest='remove_name', nargs=1, help=_('remove hotpatch') -- ) -- output_format.add_argument( -- '--active', type=str, default=None, dest='active_name', nargs=1, help=_('active hotpatch') -- ) -- output_format.add_argument( -- '--deactive', type=str, default=None, dest='deactive_name', nargs=1, help=_('deactive hotpatch') -- ) -- output_format.add_argument( -- '--accept', type=str, default=None, dest='accept_name', nargs=1, help=_('accept hotpatch') -- ) -- -- def configure(self): -- demands = self.cli.demands -- demands.sack_activation = True -- demands.available_repos = True -- -- self.filter_cves = self.opts.cves if self.opts.cves else None -- -- def run(self): -- self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) -- if self.opts.list != '': -- self.display() -- if self.opts.apply_name: -- self.operate_hot_patches(self.opts.apply_name, "apply", self.syscare.apply) -- if self.opts.remove_name: -- self.operate_hot_patches(self.opts.remove_name, "remove", self.syscare.remove) -- if self.opts.active_name: -- self.operate_hot_patches(self.opts.active_name, "active", self.syscare.active) -- if self.opts.deactive_name: -- self.operate_hot_patches(self.opts.deactive_name, "deactive", self.syscare.deactive) -- if self.opts.accept_name: -- self.operate_hot_patches(self.opts.accept_name, "accept", self.syscare.accept) -- -- def _filter_and_format_list_output(self, echo_lines: list): -- """ -- Only show specific cve information if cve id is given, and format the output. -- """ -- format_lines = [] -- title = ['CVE-id', 'base-pkg/hotpatch', 'status'] -- idw = len(title[0]) -- naw = len(title[1]) -- for echo_line in echo_lines: -- cve_id, name, status = ( -- echo_line[self.CVE_ID_INDEX], -- echo_line[self.Name_INDEX], -- echo_line[self.STATUS_INDEX], -- ) -- if self.filter_cves is not None and cve_id not in self.filter_cves: -- continue -- idw = max(idw, len(cve_id)) -- naw = max(naw, len(name)) -- format_lines.append([cve_id, name, status]) -- -- if not format_lines: -- return -- -- # print title -- if self.opts.list in ['cve', 'cves']: -- print( -- '%-*s %-*s %s' % (idw, title[self.CVE_ID_INDEX], naw, title[self.Name_INDEX], title[self.STATUS_INDEX]) -- ) -- else: -- print('%-*s %s' % (naw, title[self.Name_INDEX], title[self.STATUS_INDEX])) -- -- format_lines.sort(key=lambda x: (x[self.Name_INDEX], x[self.CVE_ID_INDEX])) -- -- if self.opts.list in ['cve', 'cves']: -- for cve_id, name, status in format_lines: -- print('%-*s %-*s %s' % (idw, cve_id, naw, name, status)) -- else: -- new_format_lines = [(name, status) for _, name, status in format_lines] -- deduplicated_format_lines = list(set(new_format_lines)) -- deduplicated_format_lines.sort(key=new_format_lines.index) -- for name, status in deduplicated_format_lines: -- print('%-*s %s' % (naw, name, status)) -- -- def display(self): -- """ -- Display hotpatch information. -- -- e.g. -- For the command of 'dnf hotpatch --list', the echo_lines is [[base-pkg/hotpatch, status], ...] -- For the command of 'dnf hotpatch --list cve', the echo_lines is [[cve_id, base-pkg/hotpatch, status], ...] -- """ -- -- hotpatch_cves = self.hp_hawkey.hotpatch_cves -- echo_lines = [] -- for cve_id in hotpatch_cves.keys(): -- for hotpatch in hotpatch_cves[cve_id].hotpatches: -- for name, status in self.hp_hawkey._hotpatch_state.items(): -- if hotpatch.syscare_subname not in name: -- continue -- echo_line = [cve_id, name, status] -- echo_lines.append(echo_line) -- self._filter_and_format_list_output(echo_lines) -- -- def operate_hot_patches(self, target_patch: list, operate, func) -> None: -- """ -- operate hotpatch using syscare command -- Args: -- target_patch: type:list,e.g.:['redis-6.2.5-1/HP2'] -- -- Returns: -- None -- """ -- if len(target_patch) != 1: -- logger.error(_("using dnf hotpatch --%s wrong!"), operate) -- return -- target_patch = target_patch[0] -- logger.info(_("Gonna %s this hot patch: %s"), operate, self.base.output.term.bold(target_patch)) -- -- output, status = func(target_patch) -- if status: -- logger.error( -- _("%s hot patch '%s' failed, remain original status."), -- operate, -- self.base.output.term.bold(target_patch), -- ) -- else: -- logger.info(_("%s hot patch '%s' succeed"), operate, self.base.output.term.bold(target_patch)) -diff --git a/hotpatch/hotupgrade.py b/hotpatch/hotupgrade.py -deleted file mode 100644 -index f61e37f..0000000 ---- a/hotpatch/hotupgrade.py -+++ /dev/null -@@ -1,482 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --from __future__ import print_function -- --from time import sleep --import dnf.base --import dnf.exceptions --import hawkey --from dnf.cli import commands --from dnf.cli.option_parser import OptionParser -- --# from dnf.cli.output import Output --from dnfpluginscore import _, logger -- --from .hot_updateinfo import HotUpdateinfoCommand --from .updateinfo_parse import HotpatchUpdateInfo --from .syscare import Syscare --from .version import Versions -- --EMPTY_TAG = "-" -- -- --@dnf.plugin.register_command --class HotupgradeCommand(dnf.cli.Command): -- aliases = ("hotupgrade",) -- summary = "Hot upgrade package using hot patch." -- usage = "" -- syscare = Syscare() -- hp_list = [] -- -- @staticmethod -- def set_argparser(parser): -- parser.add_argument( -- 'packages', -- nargs='*', -- help=_('Package to upgrade'), -- action=OptionParser.ParseSpecGroupFileCallback, -- metavar=_('PACKAGE'), -- ) -- parser.add_argument( -- "--takeover", default=False, action='store_true', help=_('kernel cold patch takeover operation') -- ) -- -- def configure(self): -- """Verify that conditions are met so that this command can run. -- These include that there are enabled repositories with gpg -- keys, and that this command is being run by the root user. -- """ -- demands = self.cli.demands -- demands.sack_activation = True -- demands.available_repos = True -- demands.resolving = True -- demands.root_user = True -- -- commands._checkGPGKey(self.base, self.cli) -- if not self.opts.filenames: -- commands._checkEnabledRepo(self.base) -- -- def run(self): -- if self.opts.pkg_specs: -- self.hp_list = self.opts.pkg_specs -- elif self.opts.cves or self.opts.advisory: -- cve_pkgs = self.get_hotpatch_based_on_cve(self.opts.cves) -- advisory_pkgs = self.get_hotpatch_based_on_advisory(self.opts.advisory) -- self.hp_list = cve_pkgs + advisory_pkgs -- -- else: -- self.hp_list = self.get_hotpatch_of_all_cve() -- if self.hp_list: -- logger.info(_("Gonna apply all available hot patches: %s"), self.hp_list) -- -- available_hp_dict = self._get_available_hotpatches(self.hp_list) -- if not available_hp_dict: -- logger.info(_('No hot patches marked for install.')) -- return -- -- applied_old_patches = self._get_applied_old_patch(list(available_hp_dict.values())) -- if applied_old_patches: -- self._remove_hot_patches(applied_old_patches) -- else: -- self.syscare.save() -- success = self._install_rpm_pkg(list(available_hp_dict.keys())) -- if not success: -- logger.info(_("Error: Install hot patch failed, try to rollback.")) -- output, status = self.syscare.restore() -- if status: -- raise dnf.exceptions.Error(_('Roll back failed.')) -- logger.info(_("Roll back succeed.")) -- return -- -- if self.opts.takeover: -- self.takeover_operation() -- return -- -- def run_transaction(self) -> None: -- """ -- apply hot patches -- Returns: -- None -- """ -- # syscare need a little bit time to process the installed hot patch -- sleep(0.5) -- for hp in self.hp_list: -- self._apply_hp(hp) -- if self.opts.takeover and self.is_need_accept_kernel_hp: -- self._accept_kernel_hp(hp) -- -- def _apply_hp(self, hp_full_name): -- pkg_info = self._parse_hp_name(hp_full_name) -- hp_subname = self._get_hp_subname_for_syscare(pkg_info) -- output, status = self.syscare.apply(hp_subname) -- if status: -- logger.info(_('Apply hot patch failed: %s.'), hp_subname) -- else: -- logger.info(_('Apply hot patch succeed: %s.'), hp_subname) -- -- @staticmethod -- def _get_hp_subname_for_syscare(pkg_info: dict) -> str: -- """ -- get hotpatch's subname for syscare command. e.g. redis-1-1/ACC-1-1 -- Args: -- pkg_info: out put of _parse_hp_name. -- -- Returns: -- str -- """ -- hp_subname = ( -- "-".join([pkg_info["target_name"], pkg_info["target_version"], pkg_info["target_release"]]) -- + '/' -- + "-".join([pkg_info["hp_name"], pkg_info["hp_version"], pkg_info["hp_release"]]) -- ) -- return hp_subname -- -- def _get_available_hotpatches(self, pkg_specs: list) -> dict: -- """ -- check two conditions: -- 1. the hot patch rpm package exists in repositories -- 2. the hot patch's target package with specific version and release already installed -- Args: -- pkg_specs: full names of hot patches' rpm packages -- -- Returns: -- dict: key is available hot patches' full name, -- value is hot patches' operate name. e.g. kernel-5.10.0-60.66.0.91.oe2203/ACC-1-1 -- """ -- hp_map = {} -- installed_packages = self.base.sack.query().installed() -- for pkg_spec in set(pkg_specs): -- query = self.base.sack.query() -- # check the package exist in repo or not -- subj = dnf.subject.Subject(pkg_spec) -- parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA]) -- if len(parsed_nevras) != 1: -- logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec)) -- continue -- -- parsed_nevra = parsed_nevras[0] -- available_hp = query.available().filter( -- name=parsed_nevra.name, -- version=parsed_nevra.version, -- release=parsed_nevra.release, -- arch=parsed_nevra.arch, -- ) -- if not available_hp: -- logger.info(_('No match for argument: %s'), self.base.output.term.bold(pkg_spec)) -- continue -- -- # check the hot patch's target package installed or not -- pkg_info = self._parse_hp_name(pkg_spec) -- installed_pkg = installed_packages.filter( -- name=pkg_info["target_name"], version=pkg_info["target_version"], release=pkg_info["target_release"] -- ).run() -- if not installed_pkg: -- logger.info( -- _("The hot patch's target package is not installed: %s"), self.base.output.term.bold(pkg_spec) -- ) -- continue -- -- if len(installed_pkg) != 1: -- logger.info( -- _("The hot patch '%s' has multiple target packages, please check."), -- self.base.output.term.bold(pkg_spec), -- ) -- continue -- hp_subname = self._get_hp_subname_for_syscare(pkg_info) -- hp_map[pkg_spec] = hp_subname -- return hp_map -- -- @staticmethod -- def _get_applied_old_patch(available_hp_list: list) -> list: -- """ -- get targets' applied accumulative hot patches. -- User can install and apply multiple sgl (single) hot patches because the rpm name is different, -- but for acc (accumulative) hot patch, user can only install one for a specific target binary rpm. -- Args: -- available_hp_list: e.g. ['redis-1.0-1/ACC-1-1', 'redis-1.0-1/SGL_CVE_2022_1-1-1'] -- -- Returns: -- list: applied hot patches. e.g. ['redis-1.0-1/ACC-1-1'] -- """ -- hotpatch_set = set() -- hps_info = Syscare.list() -- for hp_info in hps_info: -- # hp_info[Name] is the middle column of syscare list. format: {target_rpm_name}/{hp_name}/{binary_file} -- # a hotpatch is mapped to a target binary rpm, and may affect multiple binary executable binary files -- # e.g. for hotpatch patch-redis-1-1-ACC-1-1.x86_64.rpm, it may provide 2 sub hotpatches in syscare list, -- # and SGL hotpatches may be installed at the same time -- # redis-1-1/ACC-1-1/redis -- # redis-1-1/ACC-1-1/redis-cli -- # redis-1-1/SGL_CVE_2022_1-1-1/redis -- # redis-1-1/SGL_CVE_2022_2-1-1/redis -- target, hp_name, binary_file = hp_info["Name"].split('/') -- hotpatch = target + '/' + hp_name -- # right now, if part of the hotpatch (for different binary file) is applied, -- # we consider the hotpatch is applied -- if hotpatch in available_hp_list and hp_info["Status"] != "NOT-APPLIED": -- logger.info( -- _("The hotpatch '%s' already has a '%s' sub hotpatch of binary file '%s'"), -- hotpatch, -- hp_info["Status"], -- binary_file, -- ) -- if hotpatch not in hotpatch_set: -- hotpatch_set.add(hotpatch) -- return list(hotpatch_set) -- -- def _remove_hot_patches(self, applied_old_patches: list) -> None: -- # output = Output(self.base, dnf.conf.Conf()) -- logger.info(_("Gonna remove these hot patches: %s"), applied_old_patches) -- # remove_flag = output.userconfirm() -- # if not remove_flag: -- # raise dnf.exceptions.Error(_('Operation aborted.')) -- -- self.syscare.save() -- for hp_name in applied_old_patches: -- logger.info(_("Remove hot patch %s."), hp_name) -- output, status = self.syscare.remove(hp_name) -- if status: -- logger.info( -- _("Remove hot patch '%s' failed, roll back to original status."), -- self.base.output.term.bold(hp_name), -- ) -- output, status = self.syscare.restore() -- if status: -- raise dnf.exceptions.Error(_('Roll back failed.')) -- raise dnf.exceptions.Error(_('Roll back succeed.')) -- -- @staticmethod -- def _parse_hp_name(hp_filename: str) -> dict: -- """ -- parse hot patch's name, get target rpm's name, version, release and hp's name. -- Args: -- hp_filename: hot patch's name, in the format of -- 'patch-{pkg_name}-{pkg_version}-{pkg_release}-{patchname}-{patch_version}-{patch_release}' -- e.g. patch-kernel-5.10.0-60.66.0.91.oe2203-ACC-1-1.x86_64 -- patch-kernel-5.10.0-60.66.0.91.oe2203-SGL_CVE_2022_1-1-1.x86_64 -- pkg_name may have '-' in it, patch name cannot have '-'. -- Returns: -- dict: rpm info. {"target_name": "", "target_version": "", "target_release": "", "hp_name": "", -- "hp_version": "", "hp_release": ""} -- """ -- hp_filename_format = ( -- "patch-{pkg_name}-{pkg_version}-{pkg_release}-{patch_name}-" "{patch_version}-{patch_release}.{arch}" -- ) -- -- remove_suffix_filename = hp_filename.rsplit(".", 1)[0] -- splitted_hp_filename = remove_suffix_filename.split('-') -- try: -- rpm_info = { -- "target_release": splitted_hp_filename[-4], -- "target_version": splitted_hp_filename[-5], -- "target_name": "-".join(splitted_hp_filename[1:-5]), -- "hp_name": splitted_hp_filename[-3], -- "hp_version": splitted_hp_filename[-2], -- "hp_release": splitted_hp_filename[-1], -- } -- except IndexError as e: -- raise dnf.exceptions.Error( -- _( -- "Parse hot patch name failed. Please insert correct hot patch name " -- "with the format: \n %s" % hp_filename_format -- ) -- ) -- return rpm_info -- -- def _install_rpm_pkg(self, pkg_specs: list) -> bool: -- """ -- install rpm package -- Args: -- pkg_specs: rpm package's full name -- -- Returns: -- bool -- """ -- success = True -- for pkg_spec in pkg_specs: -- try: -- self.base.install(pkg_spec) -- except dnf.exceptions.MarkingError as e: -- logger.info(_('No match for argument: %s.'), self.base.output.term.bold(pkg_spec)) -- success = False -- return success -- -- def get_hotpatch_based_on_cve(self, cves: list) -> list: -- """ -- Get the hot patches corresponding to CVEs -- Args: -- cves: cve id list -- -- Returns: -- list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"] -- """ -- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) -- hp_list = [] -- cve_hp_dict = updateinfo.get_hotpatches_from_cve(cves) -- for cve, hp in cve_hp_dict.items(): -- if not hp: -- logger.info(_("The cve doesn't exist or cannot be fixed by hotpatch: %s"), cve) -- continue -- hp_list += hp -- return list(set(hp_list)) -- -- def get_hotpatch_based_on_advisory(self, advisories: list) -> list: -- """ -- Get the hot patches corresponding to advisories -- Args: -- advisories: advisory id list -- -- Returns: -- list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"] -- """ -- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) -- hp_list = [] -- advisory_hp_dict = updateinfo.get_hotpatches_from_advisories(advisories) -- for hp in advisory_hp_dict.values(): -- hp_list += hp -- return list(set(hp_list)) -- -- def get_hotpatch_of_all_cve(self) -> list: -- """ -- upgrade all exist cve using hot patches -- 1. find all cves when init HotpatchUpdateInfo -- 2. get the recommended hot patch for each cve -- 3. deduplication -- Returns: -- ['patch-redis-6.2.5-1-HP2-1-1.x86_64'] -- """ -- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) -- cve_list = self.get_all_cve_which_can_be_fixed_by_hotpatch() -- hp_list = [] -- cve_hp_dict = updateinfo.get_hotpatches_from_cve(cve_list) -- for hp in cve_hp_dict.values(): -- if not hp: -- continue -- hp_list += hp -- return list(set(hp_list)) -- -- def get_all_cve_which_can_be_fixed_by_hotpatch(self) -> list: -- """ -- get all unfixed cve which can be fixed by hotpatch -- use command : dnf hot-updateinfo list cves -- Last metadata expiration check: 0:48:26 ago on 2023年06月01日 星期四 20时29分55秒. -- CVE-2023-3332 Low/Sec. - - -- CVE-2023-3331 Low/Sec. - - -- CVE-2023-1111 Important/Sec. - patch-redis-6.2.5-1-ACC-1-1.x86_64 -- CVE-2023-1111 Important/Sec. - patch-redis-cli-6.2.5-1-ACC-1-1.x86_64 -- -- Returns: list of unfixed cve. e.g.['CVE-2023-1111'] -- """ -- hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) -- hot_updateinfo = HotUpdateinfoCommand(self.cli) -- hot_updateinfo.opts = self.opts -- hot_updateinfo.hp_hawkey = hp_hawkey -- hot_updateinfo.filter_cves = None -- hot_updateinfo.opts.availability = 'available' -- all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines() -- cve_set = set() -- for display_line in all_cves.display_lines: -- if display_line[3] != EMPTY_TAG: -- cve_set.add(display_line[0]) -- return list(cve_set) -- -- def takeover_operation(self): -- """ -- process takeover operation. -- """ -- kernel_coldpatch = self.get_target_installed_kernel_coldpatch_of_hotpatch() -- self.is_need_accept_kernel_hp = False -- if kernel_coldpatch: -- logger.info(_("Gonna takeover kernel cold patch: ['%s']" % kernel_coldpatch)) -- success = self._install_rpm_pkg([kernel_coldpatch]) -- if success: -- return -- logger.info(_("Takeover operation failed.")) -- else: -- logger.info(_('No kernel cold patch matched for takeover.')) -- self.is_need_accept_kernel_hp = True -- logger.info( -- _( -- 'To maintain the effectiveness of kernel hot patch after rebooting, gonna accept available kernel hot patch.' -- ) -- ) -- return -- -- def get_target_installed_kernel_coldpatch_of_hotpatch(self) -> str: -- """ -- get the highest kernel cold patch of hot patch in "dnf hot-updateinfo list cves", if the corresponding -- kernel cold patch exists. -- -- Returns: -- str: full name of cold patch. e.g. kernel-5.10.0-1.x86_64 -- """ -- hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) -- hot_updateinfo = HotUpdateinfoCommand(self.cli) -- hot_updateinfo.opts = self.opts -- hot_updateinfo.hp_hawkey = hp_hawkey -- hot_updateinfo.filter_cves = None -- hot_updateinfo.opts.availability = 'available' -- all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines() -- kernel_highest_vere = "" -- target_kernel_coldpatch = None -- version = Versions() -- for display_line in all_cves.display_lines: -- if display_line[3] not in self.hp_list: -- continue -- kernel_pkg_spec = display_line[2] -- if kernel_pkg_spec == EMPTY_TAG: -- continue -- pkg_name, pkg_ver = self._get_pkg_name_and_ver(kernel_pkg_spec) -- if pkg_name != "kernel": -- continue -- if kernel_highest_vere == "": -- kernel_highest_vere = pkg_ver -- target_kernel_coldpatch = kernel_pkg_spec -- continue -- if version.larger_than(pkg_ver, kernel_highest_vere): -- kernel_highest_vere = pkg_ver -- target_kernel_coldpatch = kernel_pkg_spec -- -- return target_kernel_coldpatch -- -- def _get_pkg_name_and_ver(self, pkg_spec: str): -- """ -- get the "name" and "version-release" string of package. -- -- Args: -- str: pkg_specs: full name of cold patch. e.g. kernel-5.10.0-1.x86_64 -- """ -- subj = dnf.subject.Subject(pkg_spec) -- parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA]) -- if len(parsed_nevras) != 1: -- logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec)) -- return "" -- parsed_nevra = parsed_nevras[0] -- return parsed_nevra.name, "%s-%s" % (parsed_nevra.version, parsed_nevra.release) -- -- def _accept_kernel_hp(self, hp_full_name: str): -- """ -- accept kernel hot patch -- -- Args: -- str: hp_full_name: full name of hot patch. e.g. patch-kernel-5.10.0-1-ACC-1-1.x86_64 -- """ -- pkg_info = self._parse_hp_name(hp_full_name) -- if pkg_info['target_name'] != "kernel": -- return -- hp_subname = self._get_hp_subname_for_syscare(pkg_info) -- output, status = self.syscare.accept(hp_subname) -- if status: -- logger.info(_('Accept kernel hot patch failed: %s.'), hp_subname) -- else: -- logger.info(_('Accept kernel hot patch succeed: %s.'), hp_subname) -diff --git a/hotpatch/syscare.py b/hotpatch/syscare.py -deleted file mode 100644 -index a858ed4..0000000 ---- a/hotpatch/syscare.py -+++ /dev/null -@@ -1,116 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import subprocess --from typing import List -- --SUCCEED = 0 --FAIL = 255 -- -- --def cmd_output(cmd): -- try: -- result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) -- result.wait() -- return result.stdout.read().decode('utf-8'), result.returncode -- except Exception as e: -- print("error: ", e) -- return str(e), FAIL -- -- --class Syscare: -- @classmethod -- def list(cls, condition=None) -> List[dict]: -- """ -- Target Name Status -- redis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED -- kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version ACTIVED -- """ -- cmd = ["syscare", "list"] -- list_output, return_code = cmd_output(cmd) -- if return_code != SUCCEED: -- return [] -- -- content = list_output.split('\n') -- if len(content) <= 2: -- return [] -- -- header = content[0].split() -- result = [] -- for item in content[1:-1]: -- tmp = dict(zip(header, item.split())) -- if not condition or cls.judge(tmp, condition): -- result.append(tmp) -- return result -- -- @staticmethod -- def judge(content: dict, condition: dict): -- for key, value in condition.items(): -- if content.get(key) != value: -- return False -- return True -- -- @staticmethod -- def status(patch_name: str): -- cmd = ["syscare", "status", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def active(patch_name: str): -- cmd = ["syscare", "active", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def deactive(patch_name: str): -- cmd = ["syscare", "deactive", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def remove(patch_name: str): -- cmd = ["syscare", "remove", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def apply(patch_name: str): -- cmd = ["syscare", "apply", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def save(): -- cmd = ["syscare", "save"] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def restore(): -- cmd = ["syscare", "restore"] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -- -- @staticmethod -- def accept(patch_name: str): -- cmd = ["syscare", "accept", patch_name] -- output, return_code = cmd_output(cmd) -- -- return output, return_code -diff --git a/hotpatch/test_hotpatch.py b/hotpatch/test_hotpatch.py -deleted file mode 100644 -index e239c86..0000000 ---- a/hotpatch/test_hotpatch.py -+++ /dev/null -@@ -1,45 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import unittest --from unittest import mock -- --from .hotpatch import HotpatchCommand --from .syscare import SUCCEED, FAIL -- -- --class HotpatchTestCase(unittest.TestCase): -- def setUp(self) -> None: -- self.cli = mock.MagicMock() -- self.cmd = HotpatchCommand(self.cli) -- -- def test_operate_hot_patches_should_return_none(self): -- target_patch = ["patch1"] -- operate = mock.MagicMock() -- func = mock.MagicMock() -- func.return_value = ("", FAIL) -- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) -- self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0]) -- -- func.return_value = ("", SUCCEED) -- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) -- self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0]) -- -- def test_operate_hot_patches_should_return_none_when_target_patch_is_none(self): -- target_patch = [] -- operate = "apply" -- func = mock.MagicMock() -- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) -- -- --if __name__ == '__main__': -- unittest.main() -diff --git a/hotpatch/test_hotupgrade.py b/hotpatch/test_hotupgrade.py -deleted file mode 100644 -index 7f003c7..0000000 ---- a/hotpatch/test_hotupgrade.py -+++ /dev/null -@@ -1,75 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import unittest --from unittest import mock -- --from .updateinfo_parse import HotpatchUpdateInfo --from .hotupgrade import HotupgradeCommand -- -- --class UpgradeTestCase(unittest.TestCase): -- def setUp(self) -> None: -- cli = mock.MagicMock() -- self.cmd = HotupgradeCommand(cli) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve") -- def test_get_hotpatch_based_on_cve_should_return_empty_list_when_no_patch_found(self, mock_patch): -- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], -- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]} -- res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-3"]) -- expected_res = [] -- self.assertEqual(res, expected_res) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve") -- def test_get_hotpatch_based_on_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): -- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], -- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} -- res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-1", "CVE-2022-2"]) -- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] -- self.assertEqual(res, expected_res) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories") -- def test_get_hotpatch_based_on_advisory_should_return_empty_list_when_no_patch_found(self, mock_patch): -- mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], -- "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]} -- res = self.cmd.get_hotpatch_based_on_advisory(["SA-2022-3"]) -- expected_res = [] -- self.assertEqual(res, expected_res) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories") -- def test_get_hotpatch_based_on_advisory_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): -- mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], -- "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} -- res = self.cmd.get_hotpatch_based_on_cve(["SA-2022-2"]) -- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] -- self.assertEqual(res, expected_res) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve") -- def test_get_hotpatch_of_all_cve_should_return_empty_list_when_no_patch_found(self, mock_patch): -- mock_patch.return_value = {"CVE-2022-1": [], -- "CVE-2022-2": []} -- res = self.cmd.get_hotpatch_of_all_cve() -- expected_res = [] -- self.assertEqual(res, expected_res) -- -- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve") -- def test_get_hotpatch_of_all_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): -- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], -- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} -- res = self.cmd.get_hotpatch_of_all_cve() -- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] -- self.assertEqual(res, expected_res) -- -- --if __name__ == '__main__': -- unittest.main() -diff --git a/hotpatch/test_syscare.py b/hotpatch/test_syscare.py -deleted file mode 100644 -index 50903ed..0000000 ---- a/hotpatch/test_syscare.py -+++ /dev/null -@@ -1,152 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import unittest --from unittest import mock -- --from .syscare import SUCCEED, FAIL --from .syscare import Syscare, cmd_output -- -- --class SyscareTestCase(unittest.TestCase): -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_list_should_return_empty_when_cmd_output_nothing(self, mock_cmd): -- mock_cmd.return_value = "", FAIL -- result = Syscare().list() -- self.assertEqual(result, []) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_list_should_return_empty_when_cmd_output_only_header(self, mock_cmd): -- mock_cmd.return_value = "Target Name Status\n", SUCCEED -- result = Syscare().list() -- self.assertEqual(result, []) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_list_should_return_correct_result_when_cmd_output_correct(self, mock_cmd): -- mock_cmd.return_value = "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n", SUCCEED -- result = Syscare().list() -- expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}] -- self.assertEqual(result, expected_res) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_list_should_return_filtered_result_when_input_with_condition(self, mock_cmd): -- mock_cmd.return_value = ( -- "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n" -- "kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version DEACTIVED\n", -- SUCCEED, -- ) -- result = Syscare().list(condition={"Status": "ACTIVED"}) -- expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}] -- self.assertEqual(result, expected_res) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_status_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "ACTIVED", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().status(patch_name) -- expected_res = "ACTIVED" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_apply_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().apply(patch_name) -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_active_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().active(patch_name) -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_deactive_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().deactive(patch_name) -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_accept_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().accept(patch_name) -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_remove_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- patch_name = mock.MagicMock() -- result, status_code = Syscare().remove(patch_name) -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_save_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- result, status_code = Syscare().save() -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch("hotpatch.syscare.cmd_output") -- def test_restore_should_return_correct(self, mock_cmd): -- mock_cmd.return_value = "", SUCCEED -- result, status_code = Syscare().restore() -- expected_res = "" -- expected_code = SUCCEED -- self.assertEqual(result, expected_res) -- self.assertEqual(status_code, expected_code) -- -- @mock.patch('subprocess.Popen') -- def test_cmd_output_should_return_correct_when_popen_return_success(self, mock_popen): -- expected_output = "Hello" -- expected_returncode = SUCCEED -- mock_process = mock_popen.return_value -- mock_process.stdout.read.return_value = expected_output.encode('utf-8') -- mock_process.returncode = expected_returncode -- output, returncode = cmd_output(['echo', 'hello']) -- self.assertEqual(output, expected_output) -- self.assertEqual(returncode, expected_returncode) -- -- @mock.patch('subprocess.Popen') -- def test_cmd_output_should_raise_exception_when_popen_excute_fail(self, mock_popen): -- expected_output = "-bash: hello:command not found" -- expected_returncode = FAIL -- mock_popen.side_effect = Exception('-bash: hello:command not found') -- cmd = mock.MagicMock() -- output, returncode = cmd_output(cmd) -- self.assertEqual(output, expected_output) -- self.assertEqual(returncode, expected_returncode) -- -- --if __name__ == '__main__': -- unittest.main() -diff --git a/hotpatch/updateinfo_parse.py b/hotpatch/updateinfo_parse.py -deleted file mode 100644 -index 5fe36da..0000000 ---- a/hotpatch/updateinfo_parse.py -+++ /dev/null -@@ -1,527 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --import os --import re --import gzip --import datetime --import subprocess --import xml.etree.ElementTree as ET --from functools import cmp_to_key --from typing import List --from .baseclass import Hotpatch, Cve, Advisory --from .syscare import Syscare --from .version import Versions -- --SUCCEED = 0 --FAIL = 255 -- -- --def cmd_output(cmd): -- try: -- result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) -- result.wait() -- return result.stdout.read().decode('utf-8'), result.returncode -- except Exception as e: -- print("error: ", e) -- return str(e), FAIL -- -- --class HotpatchUpdateInfo(object): -- """ -- Hotpatch relevant updateinfo processing -- """ -- -- UNINSTALLABLE = 0 -- INSTALLED = 1 -- INSTALLABLE = 2 -- UNRELATED = 3 -- -- syscare = Syscare() -- version = Versions() -- -- def __init__(self, base, cli): -- self.base = base -- self.cli = cli -- # dict {advisory_id: Advisory} -- self._hotpatch_advisories = {} -- # dict {cve_id: Cve} -- self._hotpatch_cves = {} -- # list [{'Uuid': uuid, 'Name':name, 'Status': status}] -- self._hotpatch_status = [] -- # dict {required_pkg_info_str: [Hotpatch]} -- self._hotpatch_required_pkg_info_str = {} -- -- self.init_hotpatch_info() -- -- def init_hotpatch_info(self): -- """ -- Initialize hotpatch information -- """ -- self._get_installed_pkgs() -- self._parse_and_store_hotpatch_info_from_updateinfo() -- self._init_hotpatch_status_from_syscare() -- self._init_hotpatch_state() -- -- @property -- def hotpatch_cves(self): -- return self._hotpatch_cves -- -- @property -- def hotpatch_status(self): -- return self._hotpatch_status -- -- @property -- def hotpatch_required_pkg_info_str(self): -- return self._hotpatch_required_pkg_info_str -- -- def _get_installed_pkgs(self): -- """ -- Get installed packages by setting the hawkey -- """ -- sack = self.base.sack -- # the latest installed packages -- q = sack.query().installed().latest(1) -- # plus packages of the running kernel -- kernel_q = sack.query().filterm(empty=True) -- kernel = sack.get_running_kernel() -- if kernel: -- kernel_q = kernel_q.union(sack.query().filterm(sourcerpm=kernel.sourcerpm)) -- q = q.union(kernel_q.installed()) -- q = q.apply() -- -- self._inst_pkgs_query = q -- -- def _parse_and_store_hotpatch_info_from_updateinfo(self): -- """ -- Initialize hotpatch information from repos -- """ -- # get xxx-updateinfo.xml.gz file paths by traversing the system_cachedir(/var/cache/dnf) -- system_cachedir = self.cli.base.conf.system_cachedir -- all_repos = self.cli.base.repos -- map_repo_updateinfoxml = {} -- -- for file in os.listdir(system_cachedir): -- file_path = os.path.join(system_cachedir, file) -- if os.path.isdir(file_path): -- repodata_path = os.path.join(file_path, "repodata") -- if not os.path.isdir(repodata_path): -- continue -- -- for xml_file in os.listdir(repodata_path): -- # the hotpatch relevant updateinfo is recorded in xxx-updateinfo.xml.gz -- if "updateinfo" in xml_file: -- repo_name = file.rsplit("-", 1)[0] -- cache_updateinfo_xml_path = os.path.join(repodata_path, xml_file) -- map_repo_updateinfoxml[repo_name] = cache_updateinfo_xml_path -- -- # only hotpatch relevant updateinfo from enabled repos are parsed and stored -- for repo in all_repos.iter_enabled(): -- repo_id = repo.id -- if repo_id in map_repo_updateinfoxml: -- updateinfoxml_path = map_repo_updateinfoxml[repo_id] -- self._parse_and_store_from_xml(updateinfoxml_path) -- -- def _parse_pkglist(self, pkglist): -- """ -- Parse the pkglist information, filter the hotpatches with different arches -- """ -- hotpatches = [] -- hot_patch_collection = pkglist.find('hot_patch_collection') -- arches = self.base.sack.list_arches() -- if not hot_patch_collection: -- return hotpatches -- for package in hot_patch_collection.iter('package'): -- hotpatch = {key: value for key, value in package.items()} -- if hotpatch['arch'] not in arches: -- continue -- hotpatch['filename'] = package.find('filename').text -- hotpatches.append(hotpatch) -- return hotpatches -- -- def _parse_references(self, reference): -- """ -- Parse the reference information, check whether the 'id' is missing -- """ -- cves = [] -- for ref in reference: -- cve = {key: value for key, value in ref.items()} -- if 'id' not in cve: -- continue -- cves.append(cve) -- return cves -- -- def _verify_date_str_lawyer(self, datetime_str: str) -> str: -- """ -- Check whether the 'datetime' field is legal, if not return default value -- """ -- if datetime_str.isdigit() and len(datetime_str) == 10: -- datetime_str = int(datetime_str) -- datetime_str = datetime.datetime.fromtimestamp(datetime_str).strftime("%Y-%m-%d %H:%M:%S") -- try: -- datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S') -- return datetime_str -- except ValueError: -- return "1970-01-01 08:00:00" -- -- def _parse_advisory(self, update): -- """ -- Parse the advisory information: check whether the 'datetime' field is legal, parse the 'references' -- field and the 'pkglist' field, save 'type' information -- """ -- advisory = {} -- for node in update: -- if node.tag == 'datetime': -- advisory[node.tag] = self._verify_date_str_lawyer(update.find(node.tag).text) -- elif node.tag == 'references': -- advisory[node.tag] = self._parse_references(node) -- elif node.tag == 'pkglist': -- advisory['hotpatches'] = self._parse_pkglist(node) -- else: -- advisory[node.tag] = update.find(node.tag).text -- advisory['adv_type'] = update.get('type') -- return advisory -- -- def _get_hotpatch_require_pkgs_info(self, hotpatch: Hotpatch): -- """ -- Get require packages from requires info of hotpatch packages. Specifically, read the require -- information of the rpm package, get the target coldpatch rpm package, and record the -- name_vere(name-version-release) information of the coldpatch. -- -- Returns: -- require pkgs: dict -- e.g. -- { -- 'redis': '6.2.5-1', -- 'redis-cli': '6.2.5-1' -- } -- """ -- require_pkgs = dict() -- cmd = ["dnf", "repoquery", "--requires", hotpatch.nevra] -- requires_output, return_code = cmd_output(cmd) -- if return_code != SUCCEED: -- return require_pkgs -- requires_output = requires_output.split('\n') -- for requires in requires_output: -- if " = " not in requires: -- continue -- require_pkg = requires -- pkg_name, pkg_vere = re.split(' = ', require_pkg) -- if pkg_name == "syscare": -- continue -- require_pkgs[pkg_name] = pkg_vere -- return require_pkgs -- -- def _store_advisory_info(self, advisory_kwargs: dict): -- """ -- Instantiate Cve, Hotpatch and Advisory object according to the advisory kwargs -- """ -- advisory_references = advisory_kwargs.pop('references') -- advisory_hotpatches = advisory_kwargs.pop('hotpatches') -- advisory = Advisory(**advisory_kwargs) -- advisory_cves = {} -- for cve_kwargs in advisory_references: -- cve = Cve(**cve_kwargs) -- -- if cve.cve_id not in self._hotpatch_cves: -- self._hotpatch_cves[cve.cve_id] = cve -- advisory_cves[cve.cve_id] = self._hotpatch_cves[cve.cve_id] -- -- advisory.cves = advisory_cves -- -- for hotpatch_kwargs in advisory_hotpatches: -- hotpatch = Hotpatch(**hotpatch_kwargs) -- hotpatch.advisory = advisory -- hotpatch.cves = advisory_cves.keys() -- hotpatch.required_pkgs_info = self._get_hotpatch_require_pkgs_info(hotpatch) -- -- advisory.add_hotpatch(hotpatch) -- -- for ref_id in advisory_cves.keys(): -- advisory_cves[ref_id].add_hotpatch(hotpatch) -- -- hotpatch_vere = "%s-%s" % (hotpatch.version, hotpatch.release) -- self._hotpatch_required_pkg_info_str.setdefault(hotpatch.required_pkgs_str, []).append( -- [hotpatch_vere, hotpatch] -- ) -- -- self._hotpatch_advisories[advisory_kwargs['id']] = advisory -- -- def _init_hotpatch_state(self): -- """ -- Initialize the hotpatch state, according to the target require package information and status -- information in syscare list. -- """ -- for advisory in self._hotpatch_advisories.values(): -- for hotpatch in advisory.hotpatches: -- self._set_hotpatch_state(hotpatch) -- -- def _set_hotpatch_state(self, hotpatch: Hotpatch): -- """ -- Set hotpatch state. -- -- each hotpatch has four states: -- 1. UNRELATED: the target required pakcage version is smaller than the version of package -- installed on this machine or not installed on this machine -- 2. UNINSTALLABLE: can not be installed due to the target required package version is bigger -- 3. INSTALLED: has been installed and actived/accepted in syscare -- 4. INSTALLABLE: can be installed -- -- Args: -- hotpatch(Hotpatch) -- """ -- hotpatch.state = self.UNRELATED -- is_find_installable_hp = False -- for required_pkg_name, required_pkg_vere in hotpatch.required_pkgs_info.items(): -- inst_pkgs = self._inst_pkgs_query.filter(name=required_pkg_name) -- # check whether the relevant target required package is installed on this machine -- if not inst_pkgs: -- return -- for inst_pkg in inst_pkgs: -- inst_pkg_vere = '%s-%s' % (inst_pkg.version, inst_pkg.release) -- if not self.version.larger_than(required_pkg_vere, inst_pkg_vere): -- hotpatch.state = self.UNRELATED -- elif required_pkg_vere != inst_pkg_vere: -- hotpatch.state = self.UNINSTALLABLE -- else: -- is_find_installable_hp = True -- -- if not is_find_installable_hp: -- return -- -- if self._get_hotpatch_aggregated_status_in_syscare(hotpatch) in ('ACTIVED', "ACCEPTED"): -- hotpatch.state = self.INSTALLED -- else: -- hotpatch.state = self.INSTALLABLE -- return -- -- def _parse_and_store_from_xml(self, updateinfoxml: str): -- """ -- Parse and store hotpatch update information from xxx-updateinfo.xml.gz -- -- xxx-updateinfo.xml.gz e.g. -- -- -- -- -- openEuler-SA-2022-1 -- An update for mariadb is now available for openEuler-22.03-LTS -- Important -- openEuler -- -- -- -- -- -- patch-redis-6.2.5-1-HP001.(CVE-2022-24048) -- -- -- openEuler -- -- patch-redis-6.2.5-1-HP001-1-1.aarch64.rpm -- -- -- patch-redis-6.2.5-1-HP001-1-1.x86_64.rpm -- -- -- patch-redis-6.2.5-1-HP002-1-1.aarch64.rpm -- -- -- patch-redis-6.2.5-1-HP002-1-1.x86_64.rpm -- -- -- -- -- ... -- -- """ -- content = gzip.open(updateinfoxml) -- tree = ET.parse(content) -- root = tree.getroot() -- for update in root.iter('update'): -- # check whether the hotpatch relevant package information is in each advisory -- if not update.find('pkglist/hot_patch_collection'): -- continue -- advisory = self._parse_advisory(update) -- self._store_advisory_info(advisory) -- -- def _init_hotpatch_status_from_syscare(self): -- """ -- Initialize hotpatch status from syscare -- """ -- self._hotpatch_status = self.syscare.list() -- self._hotpatch_state = {} -- for hotpatch_info in self._hotpatch_status: -- self._hotpatch_state[hotpatch_info['Name']] = hotpatch_info['Status'] -- -- def _get_hotpatch_aggregated_status_in_syscare(self, hotpatch: Hotpatch) -> str: -- """ -- Get hotpatch aggregated status in syscare. -- """ -- hotpatch_status = '' -- -- for name, status in self._hotpatch_state.items(): -- if hotpatch.syscare_subname not in name: -- continue -- elif status in ('ACTIVED', 'ACCEPTED'): -- hotpatch_status = status -- elif status not in ('ACTIVED', 'ACCEPTED'): -- return '' -- return hotpatch_status -- -- def get_hotpatches_from_cve(self, cves: List[str], priority="ACC") -> dict(): -- """ -- Get hotpatches from specified cve. If there are several hotpatches for the same target required -- package for a cve, only return the hotpatch with the highest version according to the priority. -- -- Args: -- cves: [cve_id_1, cve_id_2] -- -- Returns: -- { -- cve_id_1: [hotpatch1], -- cve_id_2: [] -- } -- """ -- -- mapping_cve_hotpatches = dict() -- if priority not in ("ACC", "SGL"): -- return mapping_cve_hotpatches -- -- for cve_id in cves: -- mapping_cve_hotpatches[cve_id] = [] -- if cve_id not in self.hotpatch_cves: -- continue -- -- self.update_mapping_cve_hotpatches(priority, mapping_cve_hotpatches, cve_id) -- -- return mapping_cve_hotpatches -- -- def version_cmp(self, hotpatch_a: Hotpatch, hotpatch_b: Hotpatch): -- """ -- Sort the hotpatch in descending order according to the version-release. -- """ -- vere_a = "%s-%s" % (hotpatch_a.version, hotpatch_a.release) -- vere_b = "%s-%s" % (hotpatch_b.version, hotpatch_b.release) -- if self.version.larger_than(vere_a, vere_b): -- return -1 -- if self.version.larger_than(vere_b, vere_a): -- return 1 -- return 0 -- -- def update_mapping_cve_hotpatches(self, priority: str, mapping_cve_hotpatches: dict, cve_id: str): -- """ -- Update mapping_cve_hotpatches. Specifically, get the hotpatches corresponding to the cve_id, if find -- at least one installable hotpatch, append hotpatches in mapping_cve_hotpatches for cve_id, in which -- only the hotpatch with the highest version (priority first) for each targeted required package is -- returned. Otherwise, do not update mapping_cve_hotpatches. -- -- Args: -- priority(str): ACC or SGL -- mapping_cve_hotpatches(dict): cve and corresponding hotpatches -- e.g. -- { -- cve_id: [Hotpatch.nevra] -- } -- cve_id(str): cve id. e.g. "CVE-2023-1111" -- """ -- mapping_required_pkg_to_hotpatches = dict() -- fixed_required_pkgs_str = [ -- hotpatch.required_pkgs_str -- for hotpatch in self.hotpatch_cves[cve_id].hotpatches -- if hotpatch.state == self.INSTALLED -- ] -- for hotpatch in self.hotpatch_cves[cve_id].hotpatches: -- if hotpatch.state != self.INSTALLABLE: -- continue -- if hotpatch.required_pkgs_str in fixed_required_pkgs_str: -- continue -- mapping_required_pkg_to_hotpatches.setdefault(hotpatch.required_pkgs_str, []).append(hotpatch) -- if hotpatch.hotpatch_name == "ACC": -- self.append_related_hotpatches(mapping_required_pkg_to_hotpatches) -- -- if not mapping_required_pkg_to_hotpatches: -- return -- -- # find the hotpatch with the highest version for the same target required package -- for hotpatches in mapping_required_pkg_to_hotpatches.values(): -- prefered_hotpatch = self.get_preferred_hotpatch(hotpatches, priority) -- if not prefered_hotpatch: -- continue -- if prefered_hotpatch.state == self.INSTALLABLE: -- mapping_cve_hotpatches[cve_id].append(prefered_hotpatch.nevra) -- -- def append_related_hotpatches(self, mapping_required_pkg_to_hotpatches: dict): -- """ -- Append the hotpatches corresponding to the target required package in -- mapping_required_pkg_to_hotpatches. -- -- Args: -- mapping_required_pkg_to_hotpatches(dict): target required pkg and corresponding hotpatches -- e.g. -- { -- "redis,redis-cli": [Hotpatch] -- } -- -- """ -- for cve in self.hotpatch_cves.values(): -- for hotpatch in cve.hotpatches: -- if hotpatch.hotpatch_name != "ACC": -- continue -- if hotpatch.required_pkgs_str not in mapping_required_pkg_to_hotpatches: -- continue -- if hotpatch in mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str]: -- continue -- mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str].append(hotpatch) -- -- def get_preferred_hotpatch(self, hotpatches: list, priority: str): -- """ -- Get the preferred hotpatch with priority in their hotpatch_name. -- -- Args: -- hotpatches(list): hotpatches -- priority(str): ACC or SGL -- -- Returns: -- Hotpatch or None -- """ -- if not hotpatches: -- return None -- hotpatches.sort(key=cmp_to_key(self.version_cmp)) -- for hotpatch in hotpatches: -- if priority in hotpatch.hotpatch_name: -- return hotpatch -- return hotpatches[0] -- -- def get_hotpatches_from_advisories(self, advisories: List[str]) -> dict(): -- """ -- Get hotpatches from specified advisories -- -- Args: -- advisories: [advisory_id_1, advisory_id_2] -- -- Return: -- { -- advisory_id_1: [hotpatch1], -- advisory_id_2: [] -- } -- """ -- mapping_advisory_hotpatches = dict() -- for advisory_id in advisories: -- mapping_advisory_hotpatches[advisory_id] = [] -- if advisory_id not in self._hotpatch_advisories: -- continue -- advisory = self._hotpatch_advisories[advisory_id] -- for hotpatch in advisory.hotpatches: -- if hotpatch.state == self.INSTALLABLE: -- mapping_advisory_hotpatches[advisory_id].append(hotpatch.nevra) -- return mapping_advisory_hotpatches -diff --git a/hotpatch/version.py b/hotpatch/version.py -deleted file mode 100644 -index 820b8d5..0000000 ---- a/hotpatch/version.py -+++ /dev/null -@@ -1,46 +0,0 @@ --#!/usr/bin/python3 --# ****************************************************************************** --# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. --# licensed under the Mulan PSL v2. --# You can use this software according to the terms and conditions of the Mulan PSL v2. --# You may obtain a copy of Mulan PSL v2 at: --# http://license.coscl.org.cn/MulanPSL2 --# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR --# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR --# PURPOSE. --# See the Mulan PSL v2 for more details. --# ******************************************************************************/ --class Versions: -- """ -- Version number processing -- """ -- -- separator = (".", "-") -- _connector = "&" -- -- def _order(self, version, separator=None): -- """ -- Version of the cutting -- Args: -- version: version -- separator: separator -- -- Returns: -- -- """ -- if not separator: -- separator = self._connector -- return tuple([int(v) for v in version.split(separator) if v.isdigit()]) -- -- def larger_than(self, version, compare_version): -- """ -- Returns true if the size of the compared version is greater -- than that of the compared version, or false otherwise -- -- """ -- for separator in self.separator: -- version = self._connector.join([v for v in version.split(separator)]) -- compare_version = self._connector.join([v for v in compare_version.split(separator)]) -- version = self._order(version) -- compare_version = self._order(compare_version) -- return version >= compare_version --- -2.27.0 - diff --git a/aops-apollo-v1.3.3.tar.gz b/aops-apollo-v1.3.3.tar.gz deleted file mode 100644 index a4b55ad..0000000 Binary files a/aops-apollo-v1.3.3.tar.gz and /dev/null differ diff --git a/aops-apollo-v1.3.4.tar.gz b/aops-apollo-v1.3.4.tar.gz new file mode 100644 index 0000000..534e301 Binary files /dev/null and b/aops-apollo-v1.3.4.tar.gz differ diff --git a/aops-apollo.spec b/aops-apollo.spec index 3d6a6c1..c12ae7e 100644 --- a/aops-apollo.spec +++ b/aops-apollo.spec @@ -1,13 +1,10 @@ Name: aops-apollo -Version: v1.3.3 -Release: 3 +Version: v1.3.4 +Release: 1 Summary: Cve management service, monitor machine vulnerabilities and provide fix functions. License: MulanPSL2 URL: https://gitee.com/openeuler/%{name} Source0: %{name}-%{version}.tar.gz -Patch0001: 0001-optimize-cve-query-performance.patch -Patch0002: 0002-suitable-for-2003-sp3.patch -Patch0003: 0003-rm-hotpatch.patch BuildRequires: python3-setuptools Requires: aops-vulcanus >= v1.3.0 @@ -30,7 +27,7 @@ Requires: python3-rpm smalltools for aops-apollo, e.g.updateinfo.xml generater %prep -%autosetup -n %{name}-%{version} -p1 +%autosetup -n %{name}-%{version} # build for aops-apollo @@ -70,7 +67,7 @@ popd %{python3_sitelib}/aops_apollo_tool/* %changelog -* Thu Oct 19 2023 gongzhengtang - v1.3.3-3 +* Thu Oct 19 2023 gongzhengtang - v1.3.4-1 - Remove hotpatch * Wed Oct 18 2023 gongzhengtang - v1.3.3-2