From 7b986fb7739ddd7ab508c5e4e53e1d51442553f7 Mon Sep 17 00:00:00 2001 From: gongzt Date: Thu, 19 Oct 2023 03:42:45 +0000 Subject: [PATCH 1/1] rm --- aops-apollo.spec | 18 +- hotpatch/baseclass.py | 285 ---------------- hotpatch/hot_updateinfo.py | 613 ----------------------------------- hotpatch/hotpatch.py | 165 ---------- hotpatch/hotupgrade.py | 482 --------------------------- hotpatch/syscare.py | 116 ------- hotpatch/test_hotpatch.py | 45 --- hotpatch/test_hotupgrade.py | 75 ----- hotpatch/test_syscare.py | 152 --------- hotpatch/updateinfo_parse.py | 527 ------------------------------ hotpatch/version.py | 46 --- 11 files changed, 4 insertions(+), 2520 deletions(-) delete mode 100644 hotpatch/baseclass.py delete mode 100644 hotpatch/hot_updateinfo.py delete mode 100644 hotpatch/hotpatch.py delete mode 100644 hotpatch/hotupgrade.py delete mode 100644 hotpatch/syscare.py delete mode 100644 hotpatch/test_hotpatch.py delete mode 100644 hotpatch/test_hotupgrade.py delete mode 100644 hotpatch/test_syscare.py delete mode 100644 hotpatch/updateinfo_parse.py delete mode 100644 hotpatch/version.py diff --git a/aops-apollo.spec b/aops-apollo.spec index 308436f..1047cdd 100644 --- a/aops-apollo.spec +++ b/aops-apollo.spec @@ -1,5 +1,5 @@ Name: aops-apollo -Version: v1.2.1 +Version: v1.3.3 Release: 1 Summary: Cve management service, monitor machine vulnerabilities and provide fix functions. License: MulanPSL2 @@ -18,13 +18,6 @@ Provides: aops-apollo %description Cve management service, monitor machine vulnerabilities and provide fix functions. -%package -n dnf-hotpatch-plugin -Summary: dnf hotpatch plugin -Requires: python3-hawkey python3-dnf syscare >= 1.0.1 - -%description -n dnf-hotpatch-plugin -dnf hotpatch plugin, it's about hotpatch query and fix - %package -n aops-apollo-tool Summary: Small tools for aops-apollo, e.g. updateinfo.xml generater Requires: python3-rpm @@ -54,9 +47,6 @@ pushd aops-apollo-tool %py3_install popd -#install for aops-dnf-plugin -cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ - %files %doc README.* @@ -68,9 +58,6 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ %{python3_sitelib}/apollo/* %attr(0755, root, root) /opt/aops/database/* -%files -n dnf-hotpatch-plugin -%{python3_sitelib}/dnf-plugins/* - %files -n aops-apollo-tool %attr(0644,root,root) %{_sysconfdir}/aops_apollo_tool/updateinfo_config.ini %attr(0755,root,root) %{_bindir}/gen-updateinfo @@ -78,6 +65,9 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/ %{python3_sitelib}/aops_apollo_tool/* %changelog +* Thu Oct 19 2023 gongzhengtang - v1.3.3-1 +- Remove hotpatch + * Tue May 23 2023 zhu-yuncheng - v1.2.1-1 - Better dnf hotpatch plugin for more syscare command - Add updateinfo.xml generation tool diff --git a/hotpatch/baseclass.py b/hotpatch/baseclass.py deleted file mode 100644 index b7e29fd..0000000 --- a/hotpatch/baseclass.py +++ /dev/null @@ -1,285 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -class Hotpatch(object): - __slots__ = [ - '_name', - '_version', - '_release', - '_cves', - '_advisory', - '_arch', - '_filename', - '_state', - '_required_pkgs_info', - '_required_pkgs_str', - '_required_pkgs_name_str', - ] - - def __init__(self, name, version, arch, filename, release): - """ - name: str - version: str - arch: str - filename: str - release: str - """ - self._name = name - self._version = version - self._arch = arch - self._filename = filename - self._cves = [] - self._advisory = None - self._state = '' - self._release = release - self._required_pkgs_info = dict() - self._required_pkgs_str = '' - self._required_pkgs_name_str = '' - - @property - def state(self): - return self._state - - @state.setter - def state(self, value): - self._state = value - - @property - def name(self): - """ - name: patch-src_pkg-ACC or patch-src_pkg-SGL_xxx - """ - return self._name - - @property - def version(self): - return self._version - - @property - def release(self): - return self._release - - @property - def src_pkg(self): - """ - The compiled source package for hotpatch. - - src_pkg: name-version-release - """ - src_pkg = self.name[self.name.index('-') + 1 : self.name.rindex('-')] - return src_pkg - - @property - def required_pkgs_info(self): - """ - The target fixed rpm package of the hotpatch. - """ - return self._required_pkgs_info - - @required_pkgs_info.setter - def required_pkgs_info(self, required_pkgs_info): - """ - The required pkgs info are from the 'dnf repoquery --requires name-version-release.arch'. The - required pkgs info are considered to be truely fixed rpm package. - - e.g. - { - 'redis': '6.2.5-1', - 'redis-cli': '6.2.5-1' - } - """ - - self._required_pkgs_info = required_pkgs_info - required_pkgs_str_list = [] - required_pkgs_name_str_list = [] - # sort the _required_pkgs_info and concatenate the str to get _required_pkgs_str - for required_pkgs_name, required_pkgs_vere in self._required_pkgs_info.items(): - required_pkgs_str_list.append("%s-%s" % (required_pkgs_name, required_pkgs_vere)) - required_pkgs_name_str_list.append(required_pkgs_name) - sorted(required_pkgs_str_list) - sorted(required_pkgs_name_str_list) - self._required_pkgs_str = ",".join(required_pkgs_str_list) - self._required_pkgs_name_str = ",".join(required_pkgs_name_str_list) - - @property - def required_pkgs_str(self): - """ - The truly fixed rpm package mark, which is composed of required_pkgs_info. - - e.g. - 'redis-6.2.5-1,redis-cli-6.2.5-1' - """ - return self._required_pkgs_str - - @property - def required_pkgs_name_str(self): - """ - The truly fixed rpm package name mark, which is composed of keys of required_pkgs_info. - - e.g. - 'redis,redis-cli' - """ - return self._required_pkgs_name_str - - @property - def src_pkg_nevre(self): - """ - Parse the source package to get the source package name, the source package version and the source package release - - Returns: - src_pkg_name, src_pkg_version, src_pkg_release - """ - src_pkg = self.src_pkg - release_pos = src_pkg.rindex('-') - version_pos = src_pkg.rindex('-', 0, release_pos) - src_pkg_name, src_pkg_version, src_pkg_release = ( - src_pkg[0:version_pos], - src_pkg[version_pos + 1 : release_pos], - src_pkg[release_pos + 1 :], - ) - return src_pkg_name, src_pkg_version, src_pkg_release - - @property - def nevra(self): - """ - Format the filename as 'name-versioin-release.arch' for display, which is defined as nevra - - nevra: name-version-release.arch - """ - return self.filename[0 : self.filename.rindex('.')] - - @property - def hotpatch_name(self): - """ - There are two types of hotpatch, ACC hotpatch and SGL hotpatch. The ACC hotpatch can be made - iteratively, and its 'hotpatch_name' is defined as ACC. The SGL hotpatch cannot be made iteratively, - and its 'hotpatch_name' is defined as SGL_xxx. The 'xxx' in the SGL_xxx means the issue it solves. - """ - hotpatch_name = self.name[self.name.rindex('-') + 1 :] - return hotpatch_name - - @property - def syscare_subname(self): - """ - The 'syscare_subname' is used for hotpatch status querying in syscare list, which is composed of - 'src_pkg/hotpatch_name-version-release'. - """ - src_pkg = '%s-%s-%s' % (self.src_pkg_nevre) - - return '%s/%s-%s-%s' % (src_pkg, self.hotpatch_name, self.version, self.release) - - @property - def cves(self): - return self._cves - - @cves.setter - def cves(self, cves): - self._cves = cves - - @property - def advisory(self): - return self._advisory - - @advisory.setter - def advisory(self, advisory): - self._advisory = advisory - - @property - def arch(self): - return self._arch - - @property - def filename(self): - return self._filename - - -class Cve(object): - __slots__ = ['_cve_id', '_hotpatches'] - - def __init__(self, id, **kwargs): - """ - id: str - """ - self._cve_id = id - self._hotpatches = [] - - @property - def hotpatches(self): - return self._hotpatches - - def add_hotpatch(self, hotpatch: Hotpatch): - self._hotpatches.append(hotpatch) - - @property - def cve_id(self): - return self._cve_id - - -class Advisory(object): - __slots__ = ['_id', '_adv_type', '_title', '_severity', '_description', '_updated', '_hotpatches', '_cves'] - - def __init__(self, id, adv_type, title, severity, description, updated="1970-01-01 08:00:00", **kwargs): - """ - id: str - adv_type: str - title: str - severity: str - description: str - updated: str - """ - self._id = id - self._adv_type = adv_type - self._title = title - self._severity = severity - self._description = description - self._updated = updated - self._cves = {} - self._hotpatches = [] - - @property - def id(self): - return self._id - - @property - def adv_type(self): - return self._adv_type - - @property - def title(self): - return self._title - - @property - def severity(self): - return self._severity - - @property - def description(self): - return self._description - - @property - def updated(self): - return self._updated - - @property - def cves(self): - return self._cves - - @cves.setter - def cves(self, advisory_cves): - self._cves = advisory_cves - - @property - def hotpatches(self): - return self._hotpatches - - def add_hotpatch(self, hotpatch: Hotpatch): - self._hotpatches.append(hotpatch) diff --git a/hotpatch/hot_updateinfo.py b/hotpatch/hot_updateinfo.py deleted file mode 100644 index 3e86721..0000000 --- a/hotpatch/hot_updateinfo.py +++ /dev/null @@ -1,613 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import dnf -import hawkey -from dnf.i18n import _ -from dnf.cli.commands.updateinfo import UpdateInfoCommand -from dataclasses import dataclass -from .updateinfo_parse import HotpatchUpdateInfo -from .version import Versions -from .baseclass import Hotpatch - - -@dataclass -class DisplayItem: - """ - Class for storing the formatting parameters and display lines. - - idw(int): the width of 'cve_id' - tiw(int): the width of 'adv_type' - ciw(int): the width of 'coldpatch' - display_lines(set): { - (cve_id, adv_type, coldpatch, hotpatch), - } - """ - - idw: int - tiw: int - ciw: int - display_lines: set - - -@dnf.plugin.register_command -class HotUpdateinfoCommand(dnf.cli.Command): - CVE_ID_INDEX = 0 - ADV_SEVERITY_INDEX = 1 - COLDPATCH_INDEX = 2 - HOTPATCH_INDEX = 3 - - aliases = ['hot-updateinfo'] - summary = _('show hotpatch updateinfo') - - def __init__(self, cli): - """ - Initialize the command - """ - super(HotUpdateinfoCommand, self).__init__(cli) - - @staticmethod - def set_argparser(parser): - spec_action_cmds = ['list'] - parser.add_argument('spec_action', nargs=1, choices=spec_action_cmds, help=_('show updateinfo list')) - - with_cve_cmds = ['cve', 'cves'] - parser.add_argument('with_cve', nargs=1, choices=with_cve_cmds, help=_('show cves')) - - availability = parser.add_mutually_exclusive_group() - availability.add_argument( - "--available", - dest="availability", - const='available', - action='store_const', - help=_("cves about newer versions of installed packages (default)"), - ) - availability.add_argument( - "--installed", - dest="availability", - const='installed', - action='store_const', - help=_("cves about equal and older versions of installed packages"), - ) - - def configure(self): - demands = self.cli.demands - demands.sack_activation = True - demands.available_repos = True - - self.filter_cves = self.opts.cves if self.opts.cves else None - - def run(self): - self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) - - if self.opts.spec_action and self.opts.spec_action[0] == 'list' and self.opts.with_cve: - self.display() - - def get_mapping_nevra_cve(self) -> dict: - """ - Get cve nevra mapping based on the UpdateInfoCommand of 'dnf updateinfo list cves'. - - Returns: - dict: to collect cve and cold patch updateinfo information - { - (name-version-release.arch, advisory.updated): { - cve_id: (advisory.type, advisory.severity) - } - } - """ - - apkg_adv_insts = self.get_apkg_adv_insts() - - mapping_nevra_cve = dict() - for apkg, advisory, _ in apkg_adv_insts: - nevra = (apkg.name, apkg.evr, apkg.arch) - for ref in advisory.references: - if ref.type != hawkey.REFERENCE_CVE: - continue - mapping_nevra_cve.setdefault((nevra, advisory.updated), dict())[ref.id] = ( - advisory.type, - advisory.severity, - ) - return mapping_nevra_cve - - def get_apkg_adv_insts(self): - """ - Configure UpdateInfoCommand with 'dnf updateinfo list cves --available' (default) or - 'dnf updateinfo list cves --installed' according to the args, and get available package, advisory - and package installation information. - - Returns: - generator: generator for (available package, advisory, package installation - information) - """ - updateinfo = UpdateInfoCommand(self.cli) - updateinfo.opts = self.opts - - updateinfo.opts.spec_action = 'list' - updateinfo.opts.with_cve = True - updateinfo.opts.spec = '*' - updateinfo.opts._advisory_types = set() - updateinfo.opts.availability = self.opts.availability - self.updateinfo = updateinfo - - if self.updateinfo.opts.availability == 'installed': - apkg_adv_insts = updateinfo.installed_apkg_adv_insts(updateinfo.opts.spec) - else: - apkg_adv_insts = updateinfo.available_apkg_adv_insts(updateinfo.opts.spec) - - return apkg_adv_insts - - def get_fixed_cve_id_and_hotpatch_require_info(self, fixed_cve_id_and_hotpatch: set): - """ - Get fixed cve id and hotpatch require package information. - - Args: - fixed_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - Returns: - set: - e.g. - { - ('CVE-2023-1111', 'redis-6.2.5-1') - ('CVE-2023-1112', 'redis-6.2.5-1,redis-cli-6.2.5-1') - } - """ - fixed_cve_id_and_hotpatch_require_info = set() - for fixed_cve_id, fixed_hotpatch in fixed_cve_id_and_hotpatch: - fixed_cve_id_and_hotpatch_require_info.add((fixed_cve_id, fixed_hotpatch.required_pkgs_str)) - return fixed_cve_id_and_hotpatch_require_info - - def get_iterated_cve_id_and_hotpatch_require_info(self, iterated_cve_id_and_hotpatch: set): - """ - Get iterated cve id and hotpatch require package name information. - - Args: - iterated_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - - Returns: - set - e.g. - { - ('CVE-2023-1111', 'redis') - ('CVE-2023-1112', 'redis,redis-cli') - } - """ - iterated_cve_id_and_hotpatch_require_info = set() - for iterated_cve_id, iterated_hotpatch in iterated_cve_id_and_hotpatch: - iterated_cve_id_and_hotpatch_require_info.add((iterated_cve_id, iterated_hotpatch.required_pkgs_name_str)) - return iterated_cve_id_and_hotpatch_require_info - - def check_is_in_fixed_cve_id_and_hotpatch_require_info( - self, cve_id: str, hotpatch: Hotpatch, fixed_cve_id_and_hotpatch_require_info: set - ): - """ - Check the (cve_id, hotpatch.required_pkgs_str) whether is in fixed_cve_id_and_hotpatch_require_info. - If the (cve_id, hotpatch.required_pkgs_str) is in fixed_cve_id_and_hotpatch_require_info, the cve corresponding - to the hotpatch should be fixed. - - Args: - cve_id(str) - hotpatch(Hotpatch) - fixed_cve_id_and_hotpatch_require_info(set): - e.g. - { - ('CVE-2023-1111', 'redis-6.2.5-1') - ('CVE-2023-1112', 'redis-6.2.5-1') - } - - Returns: - bool: whether the cve corresponding to the hotpatch is fixed - """ - is_fixed = False - if (cve_id, hotpatch.required_pkgs_str) in fixed_cve_id_and_hotpatch_require_info: - is_fixed = True - return is_fixed - - def check_is_in_iterated_cve_id_and_hotpatch_require_info( - self, cve_id: str, hotpatch: Hotpatch, iterated_cve_id_and_hotpatch_require_info: set - ): - """ - Check the (cve_id, hotpatch.required_pkgs_name_str) whether is in iterated_cve_id_and_hotpatch_require_info. - If the (cve_id, hotpatch.required_pkgs_name_str) is in fixed_cve_id_and_hotpatch_require_info, the cve - corresponding to the hotpatch should be iterated. - - Args: - cve_id(str) - hotpatch(Hotpatch) - iterated_cve_id_and_hotpatch_require_info(set): - e.g. - { - ('CVE-2023-1111', 'redis') - ('CVE-2023-1112', 'redis') - } - - Returns: - bool: whether the cve corresponding to the hotpatch is iterated - """ - is_iterated = False - if ( - hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED) - and (cve_id, hotpatch.required_pkgs_name_str) in iterated_cve_id_and_hotpatch_require_info - ): - is_iterated = True - return is_iterated - - def get_filtered_display_item( - self, - format_lines: set, - fixed_cve_id_and_hotpatch: set, - installable_cve_id_and_hotpatch: set, - iterated_cve_id_and_hotpatch: set, - ): - """ - Get filtered display item. - - Args: - format_lines(set): - { - (cve_id, adv_type, coldpatch, hotpatch) - } - - fixed_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - iterated_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - Returns: - DisplayItem: for display - """ - if self.updateinfo.opts.availability == 'installed': - display_item = self.get_installed_filtered_display_item( - format_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch - ) - return display_item - display_item = self.get_available_filtered_display_item( - format_lines, fixed_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch - ) - return display_item - - def get_installed_filtered_display_item( - self, format_lines: set, fixed_cve_id_and_hotpatch: set, installable_cve_id_and_hotpatch: set - ): - """ - Get filtered display item by removing installable cve id and hotpatch, and removing iterated cve id - and hotpatch. For hotpatch, only show ones which have been installed and been actived/accepted in - syscare. - - Args: - format_lines(set): - { - (cve_id, adv_type, coldpatch, hotpatch) - } - - installable_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - Returns: - DisplayItem: for display - - """ - display_lines = set() - - # calculate the width of each column - idw = tiw = ciw = 0 - for format_line in format_lines: - cve_id, adv_type, coldpatch, hotpatch = ( - format_line[self.CVE_ID_INDEX], - format_line[self.ADV_SEVERITY_INDEX], - format_line[self.COLDPATCH_INDEX], - format_line[self.HOTPATCH_INDEX], - ) - if self.filter_cves is not None and cve_id not in self.filter_cves: - continue - if (cve_id, hotpatch) in installable_cve_id_and_hotpatch: - if coldpatch == '-': - continue - else: - hotpatch = '-' - - if isinstance(hotpatch, Hotpatch): - if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch or hotpatch.state == self.hp_hawkey.INSTALLED: - hotpatch = hotpatch.nevra - elif hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED): - hotpatch = '-' - - if coldpatch == '-' and hotpatch == '-': - continue - - idw = max(idw, len(cve_id)) - tiw = max(tiw, len(adv_type)) - ciw = max(ciw, len(coldpatch)) - display_lines.add((cve_id, adv_type, coldpatch, hotpatch)) - - display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines) - return display_item - - def get_available_filtered_display_item( - self, format_lines: set, fixed_cve_id_and_hotpatch: set, iterated_cve_id_and_hotpatch: set - ): - """ - Get filtered display item by removing fixed cve id and hotpatch, and removing iterated cve id - and hotpatch. - - Args: - format_lines(set): - { - (cve_id, adv_type, coldpatch, hotpatch) - } - - fixed_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - iterated_cve_id_and_hotpatch(set): - e.g. - { - ('CVE-2023-1111', Hotpatch) - } - - Returns: - DisplayItem: for display - """ - display_lines = set() - - fixed_cve_id_and_hotpatch_require_info = self.get_fixed_cve_id_and_hotpatch_require_info( - fixed_cve_id_and_hotpatch - ) - iterated_cve_id_and_hotpatch_require_info = self.get_iterated_cve_id_and_hotpatch_require_info( - iterated_cve_id_and_hotpatch - ) - - # calculate the width of each column - idw = tiw = ciw = 0 - for format_line in format_lines: - cve_id, adv_type, coldpatch, hotpatch = ( - format_line[self.CVE_ID_INDEX], - format_line[self.ADV_SEVERITY_INDEX], - format_line[self.COLDPATCH_INDEX], - format_line[self.HOTPATCH_INDEX], - ) - if self.filter_cves is not None and cve_id not in self.filter_cves: - continue - if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch: - continue - - if isinstance(hotpatch, Hotpatch): - if self.check_is_in_iterated_cve_id_and_hotpatch_require_info( - cve_id, hotpatch, iterated_cve_id_and_hotpatch_require_info - ): - continue - if self.check_is_in_fixed_cve_id_and_hotpatch_require_info( - cve_id, hotpatch, fixed_cve_id_and_hotpatch_require_info - ): - continue - # format hotpatch - if hotpatch.state == self.hp_hawkey.INSTALLABLE: - hotpatch = hotpatch.nevra - elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE: - hotpatch = '-' - elif hotpatch.state == self.hp_hawkey.UNRELATED and coldpatch == '-': - continue - elif hotpatch.state == self.hp_hawkey.UNRELATED: - hotpatch = '-' - - idw = max(idw, len(cve_id)) - tiw = max(tiw, len(adv_type)) - ciw = max(ciw, len(coldpatch)) - display_lines.add((cve_id, adv_type, coldpatch, hotpatch)) - - display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines) - return display_item - - def append_fixed_cve_id_and_hotpatch(self, fixed_cve_id_and_hotpatch: set): - """ - Append fixed cve id and hotpatch in fixed_cve_id_and_hotpatch. The ACC hotpatch that are less or equal - to the highest ACC actived version-release for the same target required package, is considered to be - fixed. - - Args: - fixed_cve_id_and_hotpatch(set) - e.g. - { - ('CVE-2023-2221', Hotpatch) - } - Returns: - set: - e.g. - { - ('CVE-2023-2221', Hotpatch), - ('CVE-2023-1111', Hotpatch) - } - - """ - versions = Versions() - # {hotpatch_required_pkgs_str: version-release} - hotpatch_vere_mapping = dict() - for _, fixed_hotpatch in fixed_cve_id_and_hotpatch: - # get the highest version-release for each target required package - required_pkgs_str = fixed_hotpatch.required_pkgs_str - current_vere = "%s-%s" % (fixed_hotpatch.version, fixed_hotpatch.release) - if fixed_hotpatch.hotpatch_name != "ACC": - continue - if required_pkgs_str not in hotpatch_vere_mapping: - hotpatch_vere_mapping[required_pkgs_str] = current_vere - elif versions.larger_than(current_vere, hotpatch_vere_mapping[required_pkgs_str]): - hotpatch_vere_mapping[required_pkgs_str] = current_vere - - # get all hot hotpatches that are less or equal to the highest version-release, and record the cves - # which they fix - for required_pkgs_str, actived_vere in hotpatch_vere_mapping.items(): - all_hotpatches = self.hp_hawkey._hotpatch_required_pkg_info_str[required_pkgs_str] - for cmped_vere, hotpatch in all_hotpatches: - if hotpatch.hotpatch_name != "ACC": - continue - if not versions.larger_than(actived_vere, cmped_vere): - continue - for cve_id in hotpatch.cves: - fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) - return fixed_cve_id_and_hotpatch - - def get_formatting_parameters_and_display_lines(self): - """ - Append hotpatch information according to the output of 'dnf updateinfo list cves' - - Returns: - DisplayItem: for display - """ - - def type2label(updateinfo, typ, sev): - if typ == hawkey.ADVISORY_SECURITY: - return updateinfo.SECURITY2LABEL.get(sev, _('Unknown/Sec.')) - else: - return updateinfo.TYPE2LABEL.get(typ, _('unknown')) - - mapping_nevra_cve = self.get_mapping_nevra_cve() - echo_lines = set() - fixed_cve_id_and_hotpatch = set() - installable_cve_id_and_hotpatch = set() - uninstallable_cve_id_and_hotpatch = set() - iterated_cve_id_and_hotpatch = set() - - for ((nevra), aupdated), id2type in sorted(mapping_nevra_cve.items(), key=lambda x: x[0]): - pkg_name, pkg_evr, pkg_arch = nevra - coldpatch = '%s-%s.%s' % (pkg_name, pkg_evr, pkg_arch) - for cve_id, atypesev in id2type.items(): - label = type2label(self.updateinfo, *atypesev) - # if there is no hotpatch corresponding to the cve id, mark hotpatch as '-' - if cve_id not in self.hp_hawkey.hotpatch_cves or not self.hp_hawkey.hotpatch_cves[cve_id].hotpatches: - echo_line = (cve_id, label, coldpatch, '-') - echo_lines.add(echo_line) - continue - - for hotpatch in self.hp_hawkey.hotpatch_cves[cve_id].hotpatches: - # if cold patch name does not match with hotpatch required pkg name (target fix pkgs) - if pkg_name not in hotpatch._required_pkgs_info.keys(): - echo_line = (cve_id, label, coldpatch, '-') - echo_lines.add(echo_line) - continue - if hotpatch.state == self.hp_hawkey.INSTALLED: - # record the fixed cve_id and hotpatch, filter the packages that are lower than - # the currently installed package for solving the same cve and target required - # pakcage - fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) - iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) - elif hotpatch.state == self.hp_hawkey.INSTALLABLE: - # record the installable cve_id and hotpatch, filter the packages that are bigger - # than the currently installed package - installable_cve_id_and_hotpatch.add((cve_id, hotpatch)) - iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) - elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE: - uninstallable_cve_id_and_hotpatch.add((cve_id, hotpatch)) - echo_line = (cve_id, label, coldpatch, hotpatch) - echo_lines.add(echo_line) - - self.add_untraversed_hotpatches( - echo_lines, - fixed_cve_id_and_hotpatch, - installable_cve_id_and_hotpatch, - uninstallable_cve_id_and_hotpatch, - iterated_cve_id_and_hotpatch, - ) - # lower version ACC hotpatch of fixed ACC hotpatch, is also considered to be fixed - fixed_cve_id_and_hotpatch = self.append_fixed_cve_id_and_hotpatch(fixed_cve_id_and_hotpatch) - # remove fixed cve and hotpatch from installable_cve_id_and_hotpatch - installable_cve_id_and_hotpatch = installable_cve_id_and_hotpatch.difference(fixed_cve_id_and_hotpatch) - display_item = self.get_filtered_display_item( - echo_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch - ) - - return display_item - - def add_untraversed_hotpatches( - self, - echo_lines: set, - fixed_cve_id_and_hotpatch: set, - installable_cve_id_and_hotpatch: set, - uninstallable_cve_id_and_hotpatch: set, - iterated_cve_id_and_hotpatch: set, - ): - """ - Add the echo lines, which are only with hotpatch but no coldpatch. And append - fixed_cve_id_and_hotpatch and iterated_cve_id_and_hotpatch. - - Args: - echo_lines(set) - fixed_cve_id_and_hotpatch(set) - installable_cve_id_and_hotpatch(set) - uninstallable_cve_id_and_hotpatch(set) - iterated_cve_id_and_hotpatch(set) - """ - for cve_id, cve in self.hp_hawkey.hotpatch_cves.items(): - for hotpatch in cve.hotpatches: - if hotpatch.state == self.hp_hawkey.UNRELATED: - continue - if (cve_id, hotpatch) in iterated_cve_id_and_hotpatch: - continue - if (cve_id, hotpatch) in uninstallable_cve_id_and_hotpatch: - continue - if hotpatch.state == self.hp_hawkey.INSTALLED: - fixed_cve_id_and_hotpatch.add((cve_id, hotpatch)) - iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) - elif hotpatch.state == self.hp_hawkey.INSTALLABLE: - installable_cve_id_and_hotpatch.add((cve_id, hotpatch)) - iterated_cve_id_and_hotpatch.add((cve_id, hotpatch)) - echo_line = (cve_id, hotpatch.advisory.severity + '/Sec.', '-', hotpatch) - echo_lines.add(echo_line) - - def display(self): - """ - Print the display lines according to the formatting parameters. - """ - display_item = self.get_formatting_parameters_and_display_lines() - idw, tiw, ciw, display_lines = display_item.idw, display_item.tiw, display_item.ciw, display_item.display_lines - for display_line in sorted( - display_lines, - key=lambda x: ( - x[self.COLDPATCH_INDEX], - x[self.HOTPATCH_INDEX], - x[self.CVE_ID_INDEX], - x[self.ADV_SEVERITY_INDEX], - ), - ): - print( - '%-*s %-*s %-*s %s' - % ( - idw, - display_line[self.CVE_ID_INDEX], - tiw, - display_line[self.ADV_SEVERITY_INDEX], - ciw, - display_line[self.COLDPATCH_INDEX], - display_line[self.HOTPATCH_INDEX], - ) - ) diff --git a/hotpatch/hotpatch.py b/hotpatch/hotpatch.py deleted file mode 100644 index 0664edf..0000000 --- a/hotpatch/hotpatch.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import dnf -from dnfpluginscore import _, logger -from .syscare import Syscare -from .updateinfo_parse import HotpatchUpdateInfo - - -@dnf.plugin.register_command -class HotpatchCommand(dnf.cli.Command): - CVE_ID_INDEX = 0 - Name_INDEX = 1 - STATUS_INDEX = 2 - - aliases = ['hotpatch'] - summary = _('show hotpatch info') - syscare = Syscare() - - def __init__(self, cli): - """ - Initialize the command - """ - super(HotpatchCommand, self).__init__(cli) - - @staticmethod - def set_argparser(parser): - output_format = parser.add_mutually_exclusive_group() - output_format.add_argument( - '--list', nargs='?', type=str, default='', choices=['cve', 'cves'], help=_('show list of hotpatch') - ) - output_format.add_argument( - '--apply', type=str, default=None, dest='apply_name', nargs=1, help=_('apply hotpatch') - ) - output_format.add_argument( - '--remove', type=str, default=None, dest='remove_name', nargs=1, help=_('remove hotpatch') - ) - output_format.add_argument( - '--active', type=str, default=None, dest='active_name', nargs=1, help=_('active hotpatch') - ) - output_format.add_argument( - '--deactive', type=str, default=None, dest='deactive_name', nargs=1, help=_('deactive hotpatch') - ) - output_format.add_argument( - '--accept', type=str, default=None, dest='accept_name', nargs=1, help=_('accept hotpatch') - ) - - def configure(self): - demands = self.cli.demands - demands.sack_activation = True - demands.available_repos = True - - self.filter_cves = self.opts.cves if self.opts.cves else None - - def run(self): - self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) - if self.opts.list != '': - self.display() - if self.opts.apply_name: - self.operate_hot_patches(self.opts.apply_name, "apply", self.syscare.apply) - if self.opts.remove_name: - self.operate_hot_patches(self.opts.remove_name, "remove", self.syscare.remove) - if self.opts.active_name: - self.operate_hot_patches(self.opts.active_name, "active", self.syscare.active) - if self.opts.deactive_name: - self.operate_hot_patches(self.opts.deactive_name, "deactive", self.syscare.deactive) - if self.opts.accept_name: - self.operate_hot_patches(self.opts.accept_name, "accept", self.syscare.accept) - - def _filter_and_format_list_output(self, echo_lines: list): - """ - Only show specific cve information if cve id is given, and format the output. - """ - format_lines = [] - title = ['CVE-id', 'base-pkg/hotpatch', 'status'] - idw = len(title[0]) - naw = len(title[1]) - for echo_line in echo_lines: - cve_id, name, status = ( - echo_line[self.CVE_ID_INDEX], - echo_line[self.Name_INDEX], - echo_line[self.STATUS_INDEX], - ) - if self.filter_cves is not None and cve_id not in self.filter_cves: - continue - idw = max(idw, len(cve_id)) - naw = max(naw, len(name)) - format_lines.append([cve_id, name, status]) - - if not format_lines: - return - - # print title - if self.opts.list in ['cve', 'cves']: - print( - '%-*s %-*s %s' % (idw, title[self.CVE_ID_INDEX], naw, title[self.Name_INDEX], title[self.STATUS_INDEX]) - ) - else: - print('%-*s %s' % (naw, title[self.Name_INDEX], title[self.STATUS_INDEX])) - - format_lines.sort(key=lambda x: (x[self.Name_INDEX], x[self.CVE_ID_INDEX])) - - if self.opts.list in ['cve', 'cves']: - for cve_id, name, status in format_lines: - print('%-*s %-*s %s' % (idw, cve_id, naw, name, status)) - else: - new_format_lines = [(name, status) for _, name, status in format_lines] - deduplicated_format_lines = list(set(new_format_lines)) - deduplicated_format_lines.sort(key=new_format_lines.index) - for name, status in deduplicated_format_lines: - print('%-*s %s' % (naw, name, status)) - - def display(self): - """ - Display hotpatch information. - - e.g. - For the command of 'dnf hotpatch --list', the echo_lines is [[base-pkg/hotpatch, status], ...] - For the command of 'dnf hotpatch --list cve', the echo_lines is [[cve_id, base-pkg/hotpatch, status], ...] - """ - - hotpatch_cves = self.hp_hawkey.hotpatch_cves - echo_lines = [] - for cve_id in hotpatch_cves.keys(): - for hotpatch in hotpatch_cves[cve_id].hotpatches: - for name, status in self.hp_hawkey._hotpatch_state.items(): - if hotpatch.syscare_subname not in name: - continue - echo_line = [cve_id, name, status] - echo_lines.append(echo_line) - self._filter_and_format_list_output(echo_lines) - - def operate_hot_patches(self, target_patch: list, operate, func) -> None: - """ - operate hotpatch using syscare command - Args: - target_patch: type:list,e.g.:['redis-6.2.5-1/HP2'] - - Returns: - None - """ - if len(target_patch) != 1: - logger.error(_("using dnf hotpatch --%s wrong!"), operate) - return - target_patch = target_patch[0] - logger.info(_("Gonna %s this hot patch: %s"), operate, self.base.output.term.bold(target_patch)) - - output, status = func(target_patch) - if status: - logger.error( - _("%s hot patch '%s' failed, remain original status."), - operate, - self.base.output.term.bold(target_patch), - ) - else: - logger.info(_("%s hot patch '%s' succeed"), operate, self.base.output.term.bold(target_patch)) diff --git a/hotpatch/hotupgrade.py b/hotpatch/hotupgrade.py deleted file mode 100644 index f61e37f..0000000 --- a/hotpatch/hotupgrade.py +++ /dev/null @@ -1,482 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -from __future__ import print_function - -from time import sleep -import dnf.base -import dnf.exceptions -import hawkey -from dnf.cli import commands -from dnf.cli.option_parser import OptionParser - -# from dnf.cli.output import Output -from dnfpluginscore import _, logger - -from .hot_updateinfo import HotUpdateinfoCommand -from .updateinfo_parse import HotpatchUpdateInfo -from .syscare import Syscare -from .version import Versions - -EMPTY_TAG = "-" - - -@dnf.plugin.register_command -class HotupgradeCommand(dnf.cli.Command): - aliases = ("hotupgrade",) - summary = "Hot upgrade package using hot patch." - usage = "" - syscare = Syscare() - hp_list = [] - - @staticmethod - def set_argparser(parser): - parser.add_argument( - 'packages', - nargs='*', - help=_('Package to upgrade'), - action=OptionParser.ParseSpecGroupFileCallback, - metavar=_('PACKAGE'), - ) - parser.add_argument( - "--takeover", default=False, action='store_true', help=_('kernel cold patch takeover operation') - ) - - def configure(self): - """Verify that conditions are met so that this command can run. - These include that there are enabled repositories with gpg - keys, and that this command is being run by the root user. - """ - demands = self.cli.demands - demands.sack_activation = True - demands.available_repos = True - demands.resolving = True - demands.root_user = True - - commands._checkGPGKey(self.base, self.cli) - if not self.opts.filenames: - commands._checkEnabledRepo(self.base) - - def run(self): - if self.opts.pkg_specs: - self.hp_list = self.opts.pkg_specs - elif self.opts.cves or self.opts.advisory: - cve_pkgs = self.get_hotpatch_based_on_cve(self.opts.cves) - advisory_pkgs = self.get_hotpatch_based_on_advisory(self.opts.advisory) - self.hp_list = cve_pkgs + advisory_pkgs - - else: - self.hp_list = self.get_hotpatch_of_all_cve() - if self.hp_list: - logger.info(_("Gonna apply all available hot patches: %s"), self.hp_list) - - available_hp_dict = self._get_available_hotpatches(self.hp_list) - if not available_hp_dict: - logger.info(_('No hot patches marked for install.')) - return - - applied_old_patches = self._get_applied_old_patch(list(available_hp_dict.values())) - if applied_old_patches: - self._remove_hot_patches(applied_old_patches) - else: - self.syscare.save() - success = self._install_rpm_pkg(list(available_hp_dict.keys())) - if not success: - logger.info(_("Error: Install hot patch failed, try to rollback.")) - output, status = self.syscare.restore() - if status: - raise dnf.exceptions.Error(_('Roll back failed.')) - logger.info(_("Roll back succeed.")) - return - - if self.opts.takeover: - self.takeover_operation() - return - - def run_transaction(self) -> None: - """ - apply hot patches - Returns: - None - """ - # syscare need a little bit time to process the installed hot patch - sleep(0.5) - for hp in self.hp_list: - self._apply_hp(hp) - if self.opts.takeover and self.is_need_accept_kernel_hp: - self._accept_kernel_hp(hp) - - def _apply_hp(self, hp_full_name): - pkg_info = self._parse_hp_name(hp_full_name) - hp_subname = self._get_hp_subname_for_syscare(pkg_info) - output, status = self.syscare.apply(hp_subname) - if status: - logger.info(_('Apply hot patch failed: %s.'), hp_subname) - else: - logger.info(_('Apply hot patch succeed: %s.'), hp_subname) - - @staticmethod - def _get_hp_subname_for_syscare(pkg_info: dict) -> str: - """ - get hotpatch's subname for syscare command. e.g. redis-1-1/ACC-1-1 - Args: - pkg_info: out put of _parse_hp_name. - - Returns: - str - """ - hp_subname = ( - "-".join([pkg_info["target_name"], pkg_info["target_version"], pkg_info["target_release"]]) - + '/' - + "-".join([pkg_info["hp_name"], pkg_info["hp_version"], pkg_info["hp_release"]]) - ) - return hp_subname - - def _get_available_hotpatches(self, pkg_specs: list) -> dict: - """ - check two conditions: - 1. the hot patch rpm package exists in repositories - 2. the hot patch's target package with specific version and release already installed - Args: - pkg_specs: full names of hot patches' rpm packages - - Returns: - dict: key is available hot patches' full name, - value is hot patches' operate name. e.g. kernel-5.10.0-60.66.0.91.oe2203/ACC-1-1 - """ - hp_map = {} - installed_packages = self.base.sack.query().installed() - for pkg_spec in set(pkg_specs): - query = self.base.sack.query() - # check the package exist in repo or not - subj = dnf.subject.Subject(pkg_spec) - parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA]) - if len(parsed_nevras) != 1: - logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec)) - continue - - parsed_nevra = parsed_nevras[0] - available_hp = query.available().filter( - name=parsed_nevra.name, - version=parsed_nevra.version, - release=parsed_nevra.release, - arch=parsed_nevra.arch, - ) - if not available_hp: - logger.info(_('No match for argument: %s'), self.base.output.term.bold(pkg_spec)) - continue - - # check the hot patch's target package installed or not - pkg_info = self._parse_hp_name(pkg_spec) - installed_pkg = installed_packages.filter( - name=pkg_info["target_name"], version=pkg_info["target_version"], release=pkg_info["target_release"] - ).run() - if not installed_pkg: - logger.info( - _("The hot patch's target package is not installed: %s"), self.base.output.term.bold(pkg_spec) - ) - continue - - if len(installed_pkg) != 1: - logger.info( - _("The hot patch '%s' has multiple target packages, please check."), - self.base.output.term.bold(pkg_spec), - ) - continue - hp_subname = self._get_hp_subname_for_syscare(pkg_info) - hp_map[pkg_spec] = hp_subname - return hp_map - - @staticmethod - def _get_applied_old_patch(available_hp_list: list) -> list: - """ - get targets' applied accumulative hot patches. - User can install and apply multiple sgl (single) hot patches because the rpm name is different, - but for acc (accumulative) hot patch, user can only install one for a specific target binary rpm. - Args: - available_hp_list: e.g. ['redis-1.0-1/ACC-1-1', 'redis-1.0-1/SGL_CVE_2022_1-1-1'] - - Returns: - list: applied hot patches. e.g. ['redis-1.0-1/ACC-1-1'] - """ - hotpatch_set = set() - hps_info = Syscare.list() - for hp_info in hps_info: - # hp_info[Name] is the middle column of syscare list. format: {target_rpm_name}/{hp_name}/{binary_file} - # a hotpatch is mapped to a target binary rpm, and may affect multiple binary executable binary files - # e.g. for hotpatch patch-redis-1-1-ACC-1-1.x86_64.rpm, it may provide 2 sub hotpatches in syscare list, - # and SGL hotpatches may be installed at the same time - # redis-1-1/ACC-1-1/redis - # redis-1-1/ACC-1-1/redis-cli - # redis-1-1/SGL_CVE_2022_1-1-1/redis - # redis-1-1/SGL_CVE_2022_2-1-1/redis - target, hp_name, binary_file = hp_info["Name"].split('/') - hotpatch = target + '/' + hp_name - # right now, if part of the hotpatch (for different binary file) is applied, - # we consider the hotpatch is applied - if hotpatch in available_hp_list and hp_info["Status"] != "NOT-APPLIED": - logger.info( - _("The hotpatch '%s' already has a '%s' sub hotpatch of binary file '%s'"), - hotpatch, - hp_info["Status"], - binary_file, - ) - if hotpatch not in hotpatch_set: - hotpatch_set.add(hotpatch) - return list(hotpatch_set) - - def _remove_hot_patches(self, applied_old_patches: list) -> None: - # output = Output(self.base, dnf.conf.Conf()) - logger.info(_("Gonna remove these hot patches: %s"), applied_old_patches) - # remove_flag = output.userconfirm() - # if not remove_flag: - # raise dnf.exceptions.Error(_('Operation aborted.')) - - self.syscare.save() - for hp_name in applied_old_patches: - logger.info(_("Remove hot patch %s."), hp_name) - output, status = self.syscare.remove(hp_name) - if status: - logger.info( - _("Remove hot patch '%s' failed, roll back to original status."), - self.base.output.term.bold(hp_name), - ) - output, status = self.syscare.restore() - if status: - raise dnf.exceptions.Error(_('Roll back failed.')) - raise dnf.exceptions.Error(_('Roll back succeed.')) - - @staticmethod - def _parse_hp_name(hp_filename: str) -> dict: - """ - parse hot patch's name, get target rpm's name, version, release and hp's name. - Args: - hp_filename: hot patch's name, in the format of - 'patch-{pkg_name}-{pkg_version}-{pkg_release}-{patchname}-{patch_version}-{patch_release}' - e.g. patch-kernel-5.10.0-60.66.0.91.oe2203-ACC-1-1.x86_64 - patch-kernel-5.10.0-60.66.0.91.oe2203-SGL_CVE_2022_1-1-1.x86_64 - pkg_name may have '-' in it, patch name cannot have '-'. - Returns: - dict: rpm info. {"target_name": "", "target_version": "", "target_release": "", "hp_name": "", - "hp_version": "", "hp_release": ""} - """ - hp_filename_format = ( - "patch-{pkg_name}-{pkg_version}-{pkg_release}-{patch_name}-" "{patch_version}-{patch_release}.{arch}" - ) - - remove_suffix_filename = hp_filename.rsplit(".", 1)[0] - splitted_hp_filename = remove_suffix_filename.split('-') - try: - rpm_info = { - "target_release": splitted_hp_filename[-4], - "target_version": splitted_hp_filename[-5], - "target_name": "-".join(splitted_hp_filename[1:-5]), - "hp_name": splitted_hp_filename[-3], - "hp_version": splitted_hp_filename[-2], - "hp_release": splitted_hp_filename[-1], - } - except IndexError as e: - raise dnf.exceptions.Error( - _( - "Parse hot patch name failed. Please insert correct hot patch name " - "with the format: \n %s" % hp_filename_format - ) - ) - return rpm_info - - def _install_rpm_pkg(self, pkg_specs: list) -> bool: - """ - install rpm package - Args: - pkg_specs: rpm package's full name - - Returns: - bool - """ - success = True - for pkg_spec in pkg_specs: - try: - self.base.install(pkg_spec) - except dnf.exceptions.MarkingError as e: - logger.info(_('No match for argument: %s.'), self.base.output.term.bold(pkg_spec)) - success = False - return success - - def get_hotpatch_based_on_cve(self, cves: list) -> list: - """ - Get the hot patches corresponding to CVEs - Args: - cves: cve id list - - Returns: - list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"] - """ - updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) - hp_list = [] - cve_hp_dict = updateinfo.get_hotpatches_from_cve(cves) - for cve, hp in cve_hp_dict.items(): - if not hp: - logger.info(_("The cve doesn't exist or cannot be fixed by hotpatch: %s"), cve) - continue - hp_list += hp - return list(set(hp_list)) - - def get_hotpatch_based_on_advisory(self, advisories: list) -> list: - """ - Get the hot patches corresponding to advisories - Args: - advisories: advisory id list - - Returns: - list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"] - """ - updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) - hp_list = [] - advisory_hp_dict = updateinfo.get_hotpatches_from_advisories(advisories) - for hp in advisory_hp_dict.values(): - hp_list += hp - return list(set(hp_list)) - - def get_hotpatch_of_all_cve(self) -> list: - """ - upgrade all exist cve using hot patches - 1. find all cves when init HotpatchUpdateInfo - 2. get the recommended hot patch for each cve - 3. deduplication - Returns: - ['patch-redis-6.2.5-1-HP2-1-1.x86_64'] - """ - updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli) - cve_list = self.get_all_cve_which_can_be_fixed_by_hotpatch() - hp_list = [] - cve_hp_dict = updateinfo.get_hotpatches_from_cve(cve_list) - for hp in cve_hp_dict.values(): - if not hp: - continue - hp_list += hp - return list(set(hp_list)) - - def get_all_cve_which_can_be_fixed_by_hotpatch(self) -> list: - """ - get all unfixed cve which can be fixed by hotpatch - use command : dnf hot-updateinfo list cves - Last metadata expiration check: 0:48:26 ago on 2023年06月01日 星期四 20时29分55秒. - CVE-2023-3332 Low/Sec. - - - CVE-2023-3331 Low/Sec. - - - CVE-2023-1111 Important/Sec. - patch-redis-6.2.5-1-ACC-1-1.x86_64 - CVE-2023-1111 Important/Sec. - patch-redis-cli-6.2.5-1-ACC-1-1.x86_64 - - Returns: list of unfixed cve. e.g.['CVE-2023-1111'] - """ - hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) - hot_updateinfo = HotUpdateinfoCommand(self.cli) - hot_updateinfo.opts = self.opts - hot_updateinfo.hp_hawkey = hp_hawkey - hot_updateinfo.filter_cves = None - hot_updateinfo.opts.availability = 'available' - all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines() - cve_set = set() - for display_line in all_cves.display_lines: - if display_line[3] != EMPTY_TAG: - cve_set.add(display_line[0]) - return list(cve_set) - - def takeover_operation(self): - """ - process takeover operation. - """ - kernel_coldpatch = self.get_target_installed_kernel_coldpatch_of_hotpatch() - self.is_need_accept_kernel_hp = False - if kernel_coldpatch: - logger.info(_("Gonna takeover kernel cold patch: ['%s']" % kernel_coldpatch)) - success = self._install_rpm_pkg([kernel_coldpatch]) - if success: - return - logger.info(_("Takeover operation failed.")) - else: - logger.info(_('No kernel cold patch matched for takeover.')) - self.is_need_accept_kernel_hp = True - logger.info( - _( - 'To maintain the effectiveness of kernel hot patch after rebooting, gonna accept available kernel hot patch.' - ) - ) - return - - def get_target_installed_kernel_coldpatch_of_hotpatch(self) -> str: - """ - get the highest kernel cold patch of hot patch in "dnf hot-updateinfo list cves", if the corresponding - kernel cold patch exists. - - Returns: - str: full name of cold patch. e.g. kernel-5.10.0-1.x86_64 - """ - hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli) - hot_updateinfo = HotUpdateinfoCommand(self.cli) - hot_updateinfo.opts = self.opts - hot_updateinfo.hp_hawkey = hp_hawkey - hot_updateinfo.filter_cves = None - hot_updateinfo.opts.availability = 'available' - all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines() - kernel_highest_vere = "" - target_kernel_coldpatch = None - version = Versions() - for display_line in all_cves.display_lines: - if display_line[3] not in self.hp_list: - continue - kernel_pkg_spec = display_line[2] - if kernel_pkg_spec == EMPTY_TAG: - continue - pkg_name, pkg_ver = self._get_pkg_name_and_ver(kernel_pkg_spec) - if pkg_name != "kernel": - continue - if kernel_highest_vere == "": - kernel_highest_vere = pkg_ver - target_kernel_coldpatch = kernel_pkg_spec - continue - if version.larger_than(pkg_ver, kernel_highest_vere): - kernel_highest_vere = pkg_ver - target_kernel_coldpatch = kernel_pkg_spec - - return target_kernel_coldpatch - - def _get_pkg_name_and_ver(self, pkg_spec: str): - """ - get the "name" and "version-release" string of package. - - Args: - str: pkg_specs: full name of cold patch. e.g. kernel-5.10.0-1.x86_64 - """ - subj = dnf.subject.Subject(pkg_spec) - parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA]) - if len(parsed_nevras) != 1: - logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec)) - return "" - parsed_nevra = parsed_nevras[0] - return parsed_nevra.name, "%s-%s" % (parsed_nevra.version, parsed_nevra.release) - - def _accept_kernel_hp(self, hp_full_name: str): - """ - accept kernel hot patch - - Args: - str: hp_full_name: full name of hot patch. e.g. patch-kernel-5.10.0-1-ACC-1-1.x86_64 - """ - pkg_info = self._parse_hp_name(hp_full_name) - if pkg_info['target_name'] != "kernel": - return - hp_subname = self._get_hp_subname_for_syscare(pkg_info) - output, status = self.syscare.accept(hp_subname) - if status: - logger.info(_('Accept kernel hot patch failed: %s.'), hp_subname) - else: - logger.info(_('Accept kernel hot patch succeed: %s.'), hp_subname) diff --git a/hotpatch/syscare.py b/hotpatch/syscare.py deleted file mode 100644 index a858ed4..0000000 --- a/hotpatch/syscare.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import subprocess -from typing import List - -SUCCEED = 0 -FAIL = 255 - - -def cmd_output(cmd): - try: - result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - result.wait() - return result.stdout.read().decode('utf-8'), result.returncode - except Exception as e: - print("error: ", e) - return str(e), FAIL - - -class Syscare: - @classmethod - def list(cls, condition=None) -> List[dict]: - """ - Target Name Status - redis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED - kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version ACTIVED - """ - cmd = ["syscare", "list"] - list_output, return_code = cmd_output(cmd) - if return_code != SUCCEED: - return [] - - content = list_output.split('\n') - if len(content) <= 2: - return [] - - header = content[0].split() - result = [] - for item in content[1:-1]: - tmp = dict(zip(header, item.split())) - if not condition or cls.judge(tmp, condition): - result.append(tmp) - return result - - @staticmethod - def judge(content: dict, condition: dict): - for key, value in condition.items(): - if content.get(key) != value: - return False - return True - - @staticmethod - def status(patch_name: str): - cmd = ["syscare", "status", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def active(patch_name: str): - cmd = ["syscare", "active", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def deactive(patch_name: str): - cmd = ["syscare", "deactive", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def remove(patch_name: str): - cmd = ["syscare", "remove", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def apply(patch_name: str): - cmd = ["syscare", "apply", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def save(): - cmd = ["syscare", "save"] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def restore(): - cmd = ["syscare", "restore"] - output, return_code = cmd_output(cmd) - - return output, return_code - - @staticmethod - def accept(patch_name: str): - cmd = ["syscare", "accept", patch_name] - output, return_code = cmd_output(cmd) - - return output, return_code diff --git a/hotpatch/test_hotpatch.py b/hotpatch/test_hotpatch.py deleted file mode 100644 index e239c86..0000000 --- a/hotpatch/test_hotpatch.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import unittest -from unittest import mock - -from .hotpatch import HotpatchCommand -from .syscare import SUCCEED, FAIL - - -class HotpatchTestCase(unittest.TestCase): - def setUp(self) -> None: - self.cli = mock.MagicMock() - self.cmd = HotpatchCommand(self.cli) - - def test_operate_hot_patches_should_return_none(self): - target_patch = ["patch1"] - operate = mock.MagicMock() - func = mock.MagicMock() - func.return_value = ("", FAIL) - self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) - self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0]) - - func.return_value = ("", SUCCEED) - self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) - self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0]) - - def test_operate_hot_patches_should_return_none_when_target_patch_is_none(self): - target_patch = [] - operate = "apply" - func = mock.MagicMock() - self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func)) - - -if __name__ == '__main__': - unittest.main() diff --git a/hotpatch/test_hotupgrade.py b/hotpatch/test_hotupgrade.py deleted file mode 100644 index 7f003c7..0000000 --- a/hotpatch/test_hotupgrade.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import unittest -from unittest import mock - -from .updateinfo_parse import HotpatchUpdateInfo -from .hotupgrade import HotupgradeCommand - - -class UpgradeTestCase(unittest.TestCase): - def setUp(self) -> None: - cli = mock.MagicMock() - self.cmd = HotupgradeCommand(cli) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve") - def test_get_hotpatch_based_on_cve_should_return_empty_list_when_no_patch_found(self, mock_patch): - mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], - "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]} - res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-3"]) - expected_res = [] - self.assertEqual(res, expected_res) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve") - def test_get_hotpatch_based_on_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): - mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], - "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} - res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-1", "CVE-2022-2"]) - expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] - self.assertEqual(res, expected_res) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories") - def test_get_hotpatch_based_on_advisory_should_return_empty_list_when_no_patch_found(self, mock_patch): - mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], - "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]} - res = self.cmd.get_hotpatch_based_on_advisory(["SA-2022-3"]) - expected_res = [] - self.assertEqual(res, expected_res) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories") - def test_get_hotpatch_based_on_advisory_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): - mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], - "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} - res = self.cmd.get_hotpatch_based_on_cve(["SA-2022-2"]) - expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] - self.assertEqual(res, expected_res) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve") - def test_get_hotpatch_of_all_cve_should_return_empty_list_when_no_patch_found(self, mock_patch): - mock_patch.return_value = {"CVE-2022-1": [], - "CVE-2022-2": []} - res = self.cmd.get_hotpatch_of_all_cve() - expected_res = [] - self.assertEqual(res, expected_res) - - @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve") - def test_get_hotpatch_of_all_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch): - mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"], - "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]} - res = self.cmd.get_hotpatch_of_all_cve() - expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"] - self.assertEqual(res, expected_res) - - -if __name__ == '__main__': - unittest.main() diff --git a/hotpatch/test_syscare.py b/hotpatch/test_syscare.py deleted file mode 100644 index 50903ed..0000000 --- a/hotpatch/test_syscare.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import unittest -from unittest import mock - -from .syscare import SUCCEED, FAIL -from .syscare import Syscare, cmd_output - - -class SyscareTestCase(unittest.TestCase): - @mock.patch("hotpatch.syscare.cmd_output") - def test_list_should_return_empty_when_cmd_output_nothing(self, mock_cmd): - mock_cmd.return_value = "", FAIL - result = Syscare().list() - self.assertEqual(result, []) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_list_should_return_empty_when_cmd_output_only_header(self, mock_cmd): - mock_cmd.return_value = "Target Name Status\n", SUCCEED - result = Syscare().list() - self.assertEqual(result, []) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_list_should_return_correct_result_when_cmd_output_correct(self, mock_cmd): - mock_cmd.return_value = "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n", SUCCEED - result = Syscare().list() - expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}] - self.assertEqual(result, expected_res) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_list_should_return_filtered_result_when_input_with_condition(self, mock_cmd): - mock_cmd.return_value = ( - "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n" - "kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version DEACTIVED\n", - SUCCEED, - ) - result = Syscare().list(condition={"Status": "ACTIVED"}) - expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}] - self.assertEqual(result, expected_res) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_status_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "ACTIVED", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().status(patch_name) - expected_res = "ACTIVED" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_apply_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().apply(patch_name) - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_active_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().active(patch_name) - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_deactive_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().deactive(patch_name) - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_accept_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().accept(patch_name) - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_remove_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - patch_name = mock.MagicMock() - result, status_code = Syscare().remove(patch_name) - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_save_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - result, status_code = Syscare().save() - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch("hotpatch.syscare.cmd_output") - def test_restore_should_return_correct(self, mock_cmd): - mock_cmd.return_value = "", SUCCEED - result, status_code = Syscare().restore() - expected_res = "" - expected_code = SUCCEED - self.assertEqual(result, expected_res) - self.assertEqual(status_code, expected_code) - - @mock.patch('subprocess.Popen') - def test_cmd_output_should_return_correct_when_popen_return_success(self, mock_popen): - expected_output = "Hello" - expected_returncode = SUCCEED - mock_process = mock_popen.return_value - mock_process.stdout.read.return_value = expected_output.encode('utf-8') - mock_process.returncode = expected_returncode - output, returncode = cmd_output(['echo', 'hello']) - self.assertEqual(output, expected_output) - self.assertEqual(returncode, expected_returncode) - - @mock.patch('subprocess.Popen') - def test_cmd_output_should_raise_exception_when_popen_excute_fail(self, mock_popen): - expected_output = "-bash: hello:command not found" - expected_returncode = FAIL - mock_popen.side_effect = Exception('-bash: hello:command not found') - cmd = mock.MagicMock() - output, returncode = cmd_output(cmd) - self.assertEqual(output, expected_output) - self.assertEqual(returncode, expected_returncode) - - -if __name__ == '__main__': - unittest.main() diff --git a/hotpatch/updateinfo_parse.py b/hotpatch/updateinfo_parse.py deleted file mode 100644 index 5fe36da..0000000 --- a/hotpatch/updateinfo_parse.py +++ /dev/null @@ -1,527 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -import os -import re -import gzip -import datetime -import subprocess -import xml.etree.ElementTree as ET -from functools import cmp_to_key -from typing import List -from .baseclass import Hotpatch, Cve, Advisory -from .syscare import Syscare -from .version import Versions - -SUCCEED = 0 -FAIL = 255 - - -def cmd_output(cmd): - try: - result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - result.wait() - return result.stdout.read().decode('utf-8'), result.returncode - except Exception as e: - print("error: ", e) - return str(e), FAIL - - -class HotpatchUpdateInfo(object): - """ - Hotpatch relevant updateinfo processing - """ - - UNINSTALLABLE = 0 - INSTALLED = 1 - INSTALLABLE = 2 - UNRELATED = 3 - - syscare = Syscare() - version = Versions() - - def __init__(self, base, cli): - self.base = base - self.cli = cli - # dict {advisory_id: Advisory} - self._hotpatch_advisories = {} - # dict {cve_id: Cve} - self._hotpatch_cves = {} - # list [{'Uuid': uuid, 'Name':name, 'Status': status}] - self._hotpatch_status = [] - # dict {required_pkg_info_str: [Hotpatch]} - self._hotpatch_required_pkg_info_str = {} - - self.init_hotpatch_info() - - def init_hotpatch_info(self): - """ - Initialize hotpatch information - """ - self._get_installed_pkgs() - self._parse_and_store_hotpatch_info_from_updateinfo() - self._init_hotpatch_status_from_syscare() - self._init_hotpatch_state() - - @property - def hotpatch_cves(self): - return self._hotpatch_cves - - @property - def hotpatch_status(self): - return self._hotpatch_status - - @property - def hotpatch_required_pkg_info_str(self): - return self._hotpatch_required_pkg_info_str - - def _get_installed_pkgs(self): - """ - Get installed packages by setting the hawkey - """ - sack = self.base.sack - # the latest installed packages - q = sack.query().installed().latest(1) - # plus packages of the running kernel - kernel_q = sack.query().filterm(empty=True) - kernel = sack.get_running_kernel() - if kernel: - kernel_q = kernel_q.union(sack.query().filterm(sourcerpm=kernel.sourcerpm)) - q = q.union(kernel_q.installed()) - q = q.apply() - - self._inst_pkgs_query = q - - def _parse_and_store_hotpatch_info_from_updateinfo(self): - """ - Initialize hotpatch information from repos - """ - # get xxx-updateinfo.xml.gz file paths by traversing the system_cachedir(/var/cache/dnf) - system_cachedir = self.cli.base.conf.system_cachedir - all_repos = self.cli.base.repos - map_repo_updateinfoxml = {} - - for file in os.listdir(system_cachedir): - file_path = os.path.join(system_cachedir, file) - if os.path.isdir(file_path): - repodata_path = os.path.join(file_path, "repodata") - if not os.path.isdir(repodata_path): - continue - - for xml_file in os.listdir(repodata_path): - # the hotpatch relevant updateinfo is recorded in xxx-updateinfo.xml.gz - if "updateinfo" in xml_file: - repo_name = file.rsplit("-", 1)[0] - cache_updateinfo_xml_path = os.path.join(repodata_path, xml_file) - map_repo_updateinfoxml[repo_name] = cache_updateinfo_xml_path - - # only hotpatch relevant updateinfo from enabled repos are parsed and stored - for repo in all_repos.iter_enabled(): - repo_id = repo.id - if repo_id in map_repo_updateinfoxml: - updateinfoxml_path = map_repo_updateinfoxml[repo_id] - self._parse_and_store_from_xml(updateinfoxml_path) - - def _parse_pkglist(self, pkglist): - """ - Parse the pkglist information, filter the hotpatches with different arches - """ - hotpatches = [] - hot_patch_collection = pkglist.find('hot_patch_collection') - arches = self.base.sack.list_arches() - if not hot_patch_collection: - return hotpatches - for package in hot_patch_collection.iter('package'): - hotpatch = {key: value for key, value in package.items()} - if hotpatch['arch'] not in arches: - continue - hotpatch['filename'] = package.find('filename').text - hotpatches.append(hotpatch) - return hotpatches - - def _parse_references(self, reference): - """ - Parse the reference information, check whether the 'id' is missing - """ - cves = [] - for ref in reference: - cve = {key: value for key, value in ref.items()} - if 'id' not in cve: - continue - cves.append(cve) - return cves - - def _verify_date_str_lawyer(self, datetime_str: str) -> str: - """ - Check whether the 'datetime' field is legal, if not return default value - """ - if datetime_str.isdigit() and len(datetime_str) == 10: - datetime_str = int(datetime_str) - datetime_str = datetime.datetime.fromtimestamp(datetime_str).strftime("%Y-%m-%d %H:%M:%S") - try: - datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S') - return datetime_str - except ValueError: - return "1970-01-01 08:00:00" - - def _parse_advisory(self, update): - """ - Parse the advisory information: check whether the 'datetime' field is legal, parse the 'references' - field and the 'pkglist' field, save 'type' information - """ - advisory = {} - for node in update: - if node.tag == 'datetime': - advisory[node.tag] = self._verify_date_str_lawyer(update.find(node.tag).text) - elif node.tag == 'references': - advisory[node.tag] = self._parse_references(node) - elif node.tag == 'pkglist': - advisory['hotpatches'] = self._parse_pkglist(node) - else: - advisory[node.tag] = update.find(node.tag).text - advisory['adv_type'] = update.get('type') - return advisory - - def _get_hotpatch_require_pkgs_info(self, hotpatch: Hotpatch): - """ - Get require packages from requires info of hotpatch packages. Specifically, read the require - information of the rpm package, get the target coldpatch rpm package, and record the - name_vere(name-version-release) information of the coldpatch. - - Returns: - require pkgs: dict - e.g. - { - 'redis': '6.2.5-1', - 'redis-cli': '6.2.5-1' - } - """ - require_pkgs = dict() - cmd = ["dnf", "repoquery", "--requires", hotpatch.nevra] - requires_output, return_code = cmd_output(cmd) - if return_code != SUCCEED: - return require_pkgs - requires_output = requires_output.split('\n') - for requires in requires_output: - if " = " not in requires: - continue - require_pkg = requires - pkg_name, pkg_vere = re.split(' = ', require_pkg) - if pkg_name == "syscare": - continue - require_pkgs[pkg_name] = pkg_vere - return require_pkgs - - def _store_advisory_info(self, advisory_kwargs: dict): - """ - Instantiate Cve, Hotpatch and Advisory object according to the advisory kwargs - """ - advisory_references = advisory_kwargs.pop('references') - advisory_hotpatches = advisory_kwargs.pop('hotpatches') - advisory = Advisory(**advisory_kwargs) - advisory_cves = {} - for cve_kwargs in advisory_references: - cve = Cve(**cve_kwargs) - - if cve.cve_id not in self._hotpatch_cves: - self._hotpatch_cves[cve.cve_id] = cve - advisory_cves[cve.cve_id] = self._hotpatch_cves[cve.cve_id] - - advisory.cves = advisory_cves - - for hotpatch_kwargs in advisory_hotpatches: - hotpatch = Hotpatch(**hotpatch_kwargs) - hotpatch.advisory = advisory - hotpatch.cves = advisory_cves.keys() - hotpatch.required_pkgs_info = self._get_hotpatch_require_pkgs_info(hotpatch) - - advisory.add_hotpatch(hotpatch) - - for ref_id in advisory_cves.keys(): - advisory_cves[ref_id].add_hotpatch(hotpatch) - - hotpatch_vere = "%s-%s" % (hotpatch.version, hotpatch.release) - self._hotpatch_required_pkg_info_str.setdefault(hotpatch.required_pkgs_str, []).append( - [hotpatch_vere, hotpatch] - ) - - self._hotpatch_advisories[advisory_kwargs['id']] = advisory - - def _init_hotpatch_state(self): - """ - Initialize the hotpatch state, according to the target require package information and status - information in syscare list. - """ - for advisory in self._hotpatch_advisories.values(): - for hotpatch in advisory.hotpatches: - self._set_hotpatch_state(hotpatch) - - def _set_hotpatch_state(self, hotpatch: Hotpatch): - """ - Set hotpatch state. - - each hotpatch has four states: - 1. UNRELATED: the target required pakcage version is smaller than the version of package - installed on this machine or not installed on this machine - 2. UNINSTALLABLE: can not be installed due to the target required package version is bigger - 3. INSTALLED: has been installed and actived/accepted in syscare - 4. INSTALLABLE: can be installed - - Args: - hotpatch(Hotpatch) - """ - hotpatch.state = self.UNRELATED - is_find_installable_hp = False - for required_pkg_name, required_pkg_vere in hotpatch.required_pkgs_info.items(): - inst_pkgs = self._inst_pkgs_query.filter(name=required_pkg_name) - # check whether the relevant target required package is installed on this machine - if not inst_pkgs: - return - for inst_pkg in inst_pkgs: - inst_pkg_vere = '%s-%s' % (inst_pkg.version, inst_pkg.release) - if not self.version.larger_than(required_pkg_vere, inst_pkg_vere): - hotpatch.state = self.UNRELATED - elif required_pkg_vere != inst_pkg_vere: - hotpatch.state = self.UNINSTALLABLE - else: - is_find_installable_hp = True - - if not is_find_installable_hp: - return - - if self._get_hotpatch_aggregated_status_in_syscare(hotpatch) in ('ACTIVED', "ACCEPTED"): - hotpatch.state = self.INSTALLED - else: - hotpatch.state = self.INSTALLABLE - return - - def _parse_and_store_from_xml(self, updateinfoxml: str): - """ - Parse and store hotpatch update information from xxx-updateinfo.xml.gz - - xxx-updateinfo.xml.gz e.g. - - - - - openEuler-SA-2022-1 - An update for mariadb is now available for openEuler-22.03-LTS - Important - openEuler - - - - - - patch-redis-6.2.5-1-HP001.(CVE-2022-24048) - - - openEuler - - patch-redis-6.2.5-1-HP001-1-1.aarch64.rpm - - - patch-redis-6.2.5-1-HP001-1-1.x86_64.rpm - - - patch-redis-6.2.5-1-HP002-1-1.aarch64.rpm - - - patch-redis-6.2.5-1-HP002-1-1.x86_64.rpm - - - - - ... - - """ - content = gzip.open(updateinfoxml) - tree = ET.parse(content) - root = tree.getroot() - for update in root.iter('update'): - # check whether the hotpatch relevant package information is in each advisory - if not update.find('pkglist/hot_patch_collection'): - continue - advisory = self._parse_advisory(update) - self._store_advisory_info(advisory) - - def _init_hotpatch_status_from_syscare(self): - """ - Initialize hotpatch status from syscare - """ - self._hotpatch_status = self.syscare.list() - self._hotpatch_state = {} - for hotpatch_info in self._hotpatch_status: - self._hotpatch_state[hotpatch_info['Name']] = hotpatch_info['Status'] - - def _get_hotpatch_aggregated_status_in_syscare(self, hotpatch: Hotpatch) -> str: - """ - Get hotpatch aggregated status in syscare. - """ - hotpatch_status = '' - - for name, status in self._hotpatch_state.items(): - if hotpatch.syscare_subname not in name: - continue - elif status in ('ACTIVED', 'ACCEPTED'): - hotpatch_status = status - elif status not in ('ACTIVED', 'ACCEPTED'): - return '' - return hotpatch_status - - def get_hotpatches_from_cve(self, cves: List[str], priority="ACC") -> dict(): - """ - Get hotpatches from specified cve. If there are several hotpatches for the same target required - package for a cve, only return the hotpatch with the highest version according to the priority. - - Args: - cves: [cve_id_1, cve_id_2] - - Returns: - { - cve_id_1: [hotpatch1], - cve_id_2: [] - } - """ - - mapping_cve_hotpatches = dict() - if priority not in ("ACC", "SGL"): - return mapping_cve_hotpatches - - for cve_id in cves: - mapping_cve_hotpatches[cve_id] = [] - if cve_id not in self.hotpatch_cves: - continue - - self.update_mapping_cve_hotpatches(priority, mapping_cve_hotpatches, cve_id) - - return mapping_cve_hotpatches - - def version_cmp(self, hotpatch_a: Hotpatch, hotpatch_b: Hotpatch): - """ - Sort the hotpatch in descending order according to the version-release. - """ - vere_a = "%s-%s" % (hotpatch_a.version, hotpatch_a.release) - vere_b = "%s-%s" % (hotpatch_b.version, hotpatch_b.release) - if self.version.larger_than(vere_a, vere_b): - return -1 - if self.version.larger_than(vere_b, vere_a): - return 1 - return 0 - - def update_mapping_cve_hotpatches(self, priority: str, mapping_cve_hotpatches: dict, cve_id: str): - """ - Update mapping_cve_hotpatches. Specifically, get the hotpatches corresponding to the cve_id, if find - at least one installable hotpatch, append hotpatches in mapping_cve_hotpatches for cve_id, in which - only the hotpatch with the highest version (priority first) for each targeted required package is - returned. Otherwise, do not update mapping_cve_hotpatches. - - Args: - priority(str): ACC or SGL - mapping_cve_hotpatches(dict): cve and corresponding hotpatches - e.g. - { - cve_id: [Hotpatch.nevra] - } - cve_id(str): cve id. e.g. "CVE-2023-1111" - """ - mapping_required_pkg_to_hotpatches = dict() - fixed_required_pkgs_str = [ - hotpatch.required_pkgs_str - for hotpatch in self.hotpatch_cves[cve_id].hotpatches - if hotpatch.state == self.INSTALLED - ] - for hotpatch in self.hotpatch_cves[cve_id].hotpatches: - if hotpatch.state != self.INSTALLABLE: - continue - if hotpatch.required_pkgs_str in fixed_required_pkgs_str: - continue - mapping_required_pkg_to_hotpatches.setdefault(hotpatch.required_pkgs_str, []).append(hotpatch) - if hotpatch.hotpatch_name == "ACC": - self.append_related_hotpatches(mapping_required_pkg_to_hotpatches) - - if not mapping_required_pkg_to_hotpatches: - return - - # find the hotpatch with the highest version for the same target required package - for hotpatches in mapping_required_pkg_to_hotpatches.values(): - prefered_hotpatch = self.get_preferred_hotpatch(hotpatches, priority) - if not prefered_hotpatch: - continue - if prefered_hotpatch.state == self.INSTALLABLE: - mapping_cve_hotpatches[cve_id].append(prefered_hotpatch.nevra) - - def append_related_hotpatches(self, mapping_required_pkg_to_hotpatches: dict): - """ - Append the hotpatches corresponding to the target required package in - mapping_required_pkg_to_hotpatches. - - Args: - mapping_required_pkg_to_hotpatches(dict): target required pkg and corresponding hotpatches - e.g. - { - "redis,redis-cli": [Hotpatch] - } - - """ - for cve in self.hotpatch_cves.values(): - for hotpatch in cve.hotpatches: - if hotpatch.hotpatch_name != "ACC": - continue - if hotpatch.required_pkgs_str not in mapping_required_pkg_to_hotpatches: - continue - if hotpatch in mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str]: - continue - mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str].append(hotpatch) - - def get_preferred_hotpatch(self, hotpatches: list, priority: str): - """ - Get the preferred hotpatch with priority in their hotpatch_name. - - Args: - hotpatches(list): hotpatches - priority(str): ACC or SGL - - Returns: - Hotpatch or None - """ - if not hotpatches: - return None - hotpatches.sort(key=cmp_to_key(self.version_cmp)) - for hotpatch in hotpatches: - if priority in hotpatch.hotpatch_name: - return hotpatch - return hotpatches[0] - - def get_hotpatches_from_advisories(self, advisories: List[str]) -> dict(): - """ - Get hotpatches from specified advisories - - Args: - advisories: [advisory_id_1, advisory_id_2] - - Return: - { - advisory_id_1: [hotpatch1], - advisory_id_2: [] - } - """ - mapping_advisory_hotpatches = dict() - for advisory_id in advisories: - mapping_advisory_hotpatches[advisory_id] = [] - if advisory_id not in self._hotpatch_advisories: - continue - advisory = self._hotpatch_advisories[advisory_id] - for hotpatch in advisory.hotpatches: - if hotpatch.state == self.INSTALLABLE: - mapping_advisory_hotpatches[advisory_id].append(hotpatch.nevra) - return mapping_advisory_hotpatches diff --git a/hotpatch/version.py b/hotpatch/version.py deleted file mode 100644 index 820b8d5..0000000 --- a/hotpatch/version.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/python3 -# ****************************************************************************** -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. -# licensed under the Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -# PURPOSE. -# See the Mulan PSL v2 for more details. -# ******************************************************************************/ -class Versions: - """ - Version number processing - """ - - separator = (".", "-") - _connector = "&" - - def _order(self, version, separator=None): - """ - Version of the cutting - Args: - version: version - separator: separator - - Returns: - - """ - if not separator: - separator = self._connector - return tuple([int(v) for v in version.split(separator) if v.isdigit()]) - - def larger_than(self, version, compare_version): - """ - Returns true if the size of the compared version is greater - than that of the compared version, or false otherwise - - """ - for separator in self.separator: - version = self._connector.join([v for v in version.split(separator)]) - compare_version = self._connector.join([v for v in compare_version.split(separator)]) - version = self._order(version) - compare_version = self._order(compare_version) - return version >= compare_version -- 2.27.0