aops-apollo/0003-remove-hotpatch.patch

2657 lines
102 KiB
Diff
Raw Normal View History

From dfdc428a9f9685d5186fcb8ca2aba612e7426907 Mon Sep 17 00:00:00 2001
From: gongzt <gong_zhengtang@163.com>
Date: Thu, 19 Oct 2023 09:26:26 +0800
Subject: remove hotpatch
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
aops-apollo.spec | 18 +-
hotpatch/baseclass.py | 285 ----------------
hotpatch/hot_updateinfo.py | 613 -----------------------------------
hotpatch/hotpatch.py | 165 ----------
hotpatch/hotupgrade.py | 482 ---------------------------
hotpatch/syscare.py | 116 -------
hotpatch/test_hotpatch.py | 45 ---
hotpatch/test_hotupgrade.py | 75 -----
hotpatch/test_syscare.py | 152 ---------
hotpatch/updateinfo_parse.py | 527 ------------------------------
hotpatch/version.py | 46 ---
11 files changed, 4 insertions(+), 2520 deletions(-)
delete mode 100644 hotpatch/baseclass.py
delete mode 100644 hotpatch/hot_updateinfo.py
delete mode 100644 hotpatch/hotpatch.py
delete mode 100644 hotpatch/hotupgrade.py
delete mode 100644 hotpatch/syscare.py
delete mode 100644 hotpatch/test_hotpatch.py
delete mode 100644 hotpatch/test_hotupgrade.py
delete mode 100644 hotpatch/test_syscare.py
delete mode 100644 hotpatch/updateinfo_parse.py
delete mode 100644 hotpatch/version.py
diff --git a/aops-apollo.spec b/aops-apollo.spec
index 308436f..1047cdd 100644
--- a/aops-apollo.spec
+++ b/aops-apollo.spec
@@ -1,5 +1,5 @@
Name: aops-apollo
-Version: v1.2.1
+Version: v1.3.3
Release: 1
Summary: Cve management service, monitor machine vulnerabilities and provide fix functions.
License: MulanPSL2
@@ -18,13 +18,6 @@ Provides: aops-apollo
%description
Cve management service, monitor machine vulnerabilities and provide fix functions.
-%package -n dnf-hotpatch-plugin
-Summary: dnf hotpatch plugin
-Requires: python3-hawkey python3-dnf syscare >= 1.0.1
-
-%description -n dnf-hotpatch-plugin
-dnf hotpatch plugin, it's about hotpatch query and fix
-
%package -n aops-apollo-tool
Summary: Small tools for aops-apollo, e.g. updateinfo.xml generater
Requires: python3-rpm
@@ -54,9 +47,6 @@ pushd aops-apollo-tool
%py3_install
popd
-#install for aops-dnf-plugin
-cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/
-
%files
%doc README.*
@@ -68,9 +58,6 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/
%{python3_sitelib}/apollo/*
%attr(0755, root, root) /opt/aops/database/*
-%files -n dnf-hotpatch-plugin
-%{python3_sitelib}/dnf-plugins/*
-
%files -n aops-apollo-tool
%attr(0644,root,root) %{_sysconfdir}/aops_apollo_tool/updateinfo_config.ini
%attr(0755,root,root) %{_bindir}/gen-updateinfo
@@ -78,6 +65,9 @@ cp -r hotpatch %{buildroot}/%{python3_sitelib}/dnf-plugins/
%{python3_sitelib}/aops_apollo_tool/*
%changelog
+* Thu Oct 19 2023 gongzhengtang<gong_zhengtang@163.com> - v1.3.3-1
+- Remove hotpatch
+
* Tue May 23 2023 zhu-yuncheng<zhuyuncheng@huawei.com> - v1.2.1-1
- Better dnf hotpatch plugin for more syscare command
- Add updateinfo.xml generation tool
diff --git a/hotpatch/baseclass.py b/hotpatch/baseclass.py
deleted file mode 100644
index b7e29fd..0000000
--- a/hotpatch/baseclass.py
+++ /dev/null
@@ -1,285 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-class Hotpatch(object):
- __slots__ = [
- '_name',
- '_version',
- '_release',
- '_cves',
- '_advisory',
- '_arch',
- '_filename',
- '_state',
- '_required_pkgs_info',
- '_required_pkgs_str',
- '_required_pkgs_name_str',
- ]
-
- def __init__(self, name, version, arch, filename, release):
- """
- name: str
- version: str
- arch: str
- filename: str
- release: str
- """
- self._name = name
- self._version = version
- self._arch = arch
- self._filename = filename
- self._cves = []
- self._advisory = None
- self._state = ''
- self._release = release
- self._required_pkgs_info = dict()
- self._required_pkgs_str = ''
- self._required_pkgs_name_str = ''
-
- @property
- def state(self):
- return self._state
-
- @state.setter
- def state(self, value):
- self._state = value
-
- @property
- def name(self):
- """
- name: patch-src_pkg-ACC or patch-src_pkg-SGL_xxx
- """
- return self._name
-
- @property
- def version(self):
- return self._version
-
- @property
- def release(self):
- return self._release
-
- @property
- def src_pkg(self):
- """
- The compiled source package for hotpatch.
-
- src_pkg: name-version-release
- """
- src_pkg = self.name[self.name.index('-') + 1 : self.name.rindex('-')]
- return src_pkg
-
- @property
- def required_pkgs_info(self):
- """
- The target fixed rpm package of the hotpatch.
- """
- return self._required_pkgs_info
-
- @required_pkgs_info.setter
- def required_pkgs_info(self, required_pkgs_info):
- """
- The required pkgs info are from the 'dnf repoquery --requires name-version-release.arch'. The
- required pkgs info are considered to be truely fixed rpm package.
-
- e.g.
- {
- 'redis': '6.2.5-1',
- 'redis-cli': '6.2.5-1'
- }
- """
-
- self._required_pkgs_info = required_pkgs_info
- required_pkgs_str_list = []
- required_pkgs_name_str_list = []
- # sort the _required_pkgs_info and concatenate the str to get _required_pkgs_str
- for required_pkgs_name, required_pkgs_vere in self._required_pkgs_info.items():
- required_pkgs_str_list.append("%s-%s" % (required_pkgs_name, required_pkgs_vere))
- required_pkgs_name_str_list.append(required_pkgs_name)
- sorted(required_pkgs_str_list)
- sorted(required_pkgs_name_str_list)
- self._required_pkgs_str = ",".join(required_pkgs_str_list)
- self._required_pkgs_name_str = ",".join(required_pkgs_name_str_list)
-
- @property
- def required_pkgs_str(self):
- """
- The truly fixed rpm package mark, which is composed of required_pkgs_info.
-
- e.g.
- 'redis-6.2.5-1,redis-cli-6.2.5-1'
- """
- return self._required_pkgs_str
-
- @property
- def required_pkgs_name_str(self):
- """
- The truly fixed rpm package name mark, which is composed of keys of required_pkgs_info.
-
- e.g.
- 'redis,redis-cli'
- """
- return self._required_pkgs_name_str
-
- @property
- def src_pkg_nevre(self):
- """
- Parse the source package to get the source package name, the source package version and the source package release
-
- Returns:
- src_pkg_name, src_pkg_version, src_pkg_release
- """
- src_pkg = self.src_pkg
- release_pos = src_pkg.rindex('-')
- version_pos = src_pkg.rindex('-', 0, release_pos)
- src_pkg_name, src_pkg_version, src_pkg_release = (
- src_pkg[0:version_pos],
- src_pkg[version_pos + 1 : release_pos],
- src_pkg[release_pos + 1 :],
- )
- return src_pkg_name, src_pkg_version, src_pkg_release
-
- @property
- def nevra(self):
- """
- Format the filename as 'name-versioin-release.arch' for display, which is defined as nevra
-
- nevra: name-version-release.arch
- """
- return self.filename[0 : self.filename.rindex('.')]
-
- @property
- def hotpatch_name(self):
- """
- There are two types of hotpatch, ACC hotpatch and SGL hotpatch. The ACC hotpatch can be made
- iteratively, and its 'hotpatch_name' is defined as ACC. The SGL hotpatch cannot be made iteratively,
- and its 'hotpatch_name' is defined as SGL_xxx. The 'xxx' in the SGL_xxx means the issue it solves.
- """
- hotpatch_name = self.name[self.name.rindex('-') + 1 :]
- return hotpatch_name
-
- @property
- def syscare_subname(self):
- """
- The 'syscare_subname' is used for hotpatch status querying in syscare list, which is composed of
- 'src_pkg/hotpatch_name-version-release'.
- """
- src_pkg = '%s-%s-%s' % (self.src_pkg_nevre)
-
- return '%s/%s-%s-%s' % (src_pkg, self.hotpatch_name, self.version, self.release)
-
- @property
- def cves(self):
- return self._cves
-
- @cves.setter
- def cves(self, cves):
- self._cves = cves
-
- @property
- def advisory(self):
- return self._advisory
-
- @advisory.setter
- def advisory(self, advisory):
- self._advisory = advisory
-
- @property
- def arch(self):
- return self._arch
-
- @property
- def filename(self):
- return self._filename
-
-
-class Cve(object):
- __slots__ = ['_cve_id', '_hotpatches']
-
- def __init__(self, id, **kwargs):
- """
- id: str
- """
- self._cve_id = id
- self._hotpatches = []
-
- @property
- def hotpatches(self):
- return self._hotpatches
-
- def add_hotpatch(self, hotpatch: Hotpatch):
- self._hotpatches.append(hotpatch)
-
- @property
- def cve_id(self):
- return self._cve_id
-
-
-class Advisory(object):
- __slots__ = ['_id', '_adv_type', '_title', '_severity', '_description', '_updated', '_hotpatches', '_cves']
-
- def __init__(self, id, adv_type, title, severity, description, updated="1970-01-01 08:00:00", **kwargs):
- """
- id: str
- adv_type: str
- title: str
- severity: str
- description: str
- updated: str
- """
- self._id = id
- self._adv_type = adv_type
- self._title = title
- self._severity = severity
- self._description = description
- self._updated = updated
- self._cves = {}
- self._hotpatches = []
-
- @property
- def id(self):
- return self._id
-
- @property
- def adv_type(self):
- return self._adv_type
-
- @property
- def title(self):
- return self._title
-
- @property
- def severity(self):
- return self._severity
-
- @property
- def description(self):
- return self._description
-
- @property
- def updated(self):
- return self._updated
-
- @property
- def cves(self):
- return self._cves
-
- @cves.setter
- def cves(self, advisory_cves):
- self._cves = advisory_cves
-
- @property
- def hotpatches(self):
- return self._hotpatches
-
- def add_hotpatch(self, hotpatch: Hotpatch):
- self._hotpatches.append(hotpatch)
diff --git a/hotpatch/hot_updateinfo.py b/hotpatch/hot_updateinfo.py
deleted file mode 100644
index 3e86721..0000000
--- a/hotpatch/hot_updateinfo.py
+++ /dev/null
@@ -1,613 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import dnf
-import hawkey
-from dnf.i18n import _
-from dnf.cli.commands.updateinfo import UpdateInfoCommand
-from dataclasses import dataclass
-from .updateinfo_parse import HotpatchUpdateInfo
-from .version import Versions
-from .baseclass import Hotpatch
-
-
-@dataclass
-class DisplayItem:
- """
- Class for storing the formatting parameters and display lines.
-
- idw(int): the width of 'cve_id'
- tiw(int): the width of 'adv_type'
- ciw(int): the width of 'coldpatch'
- display_lines(set): {
- (cve_id, adv_type, coldpatch, hotpatch),
- }
- """
-
- idw: int
- tiw: int
- ciw: int
- display_lines: set
-
-
-@dnf.plugin.register_command
-class HotUpdateinfoCommand(dnf.cli.Command):
- CVE_ID_INDEX = 0
- ADV_SEVERITY_INDEX = 1
- COLDPATCH_INDEX = 2
- HOTPATCH_INDEX = 3
-
- aliases = ['hot-updateinfo']
- summary = _('show hotpatch updateinfo')
-
- def __init__(self, cli):
- """
- Initialize the command
- """
- super(HotUpdateinfoCommand, self).__init__(cli)
-
- @staticmethod
- def set_argparser(parser):
- spec_action_cmds = ['list']
- parser.add_argument('spec_action', nargs=1, choices=spec_action_cmds, help=_('show updateinfo list'))
-
- with_cve_cmds = ['cve', 'cves']
- parser.add_argument('with_cve', nargs=1, choices=with_cve_cmds, help=_('show cves'))
-
- availability = parser.add_mutually_exclusive_group()
- availability.add_argument(
- "--available",
- dest="availability",
- const='available',
- action='store_const',
- help=_("cves about newer versions of installed packages (default)"),
- )
- availability.add_argument(
- "--installed",
- dest="availability",
- const='installed',
- action='store_const',
- help=_("cves about equal and older versions of installed packages"),
- )
-
- def configure(self):
- demands = self.cli.demands
- demands.sack_activation = True
- demands.available_repos = True
-
- self.filter_cves = self.opts.cves if self.opts.cves else None
-
- def run(self):
- self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)
-
- if self.opts.spec_action and self.opts.spec_action[0] == 'list' and self.opts.with_cve:
- self.display()
-
- def get_mapping_nevra_cve(self) -> dict:
- """
- Get cve nevra mapping based on the UpdateInfoCommand of 'dnf updateinfo list cves'.
-
- Returns:
- dict: to collect cve and cold patch updateinfo information
- {
- (name-version-release.arch, advisory.updated): {
- cve_id: (advisory.type, advisory.severity)
- }
- }
- """
-
- apkg_adv_insts = self.get_apkg_adv_insts()
-
- mapping_nevra_cve = dict()
- for apkg, advisory, _ in apkg_adv_insts:
- nevra = (apkg.name, apkg.evr, apkg.arch)
- for ref in advisory.references:
- if ref.type != hawkey.REFERENCE_CVE:
- continue
- mapping_nevra_cve.setdefault((nevra, advisory.updated), dict())[ref.id] = (
- advisory.type,
- advisory.severity,
- )
- return mapping_nevra_cve
-
- def get_apkg_adv_insts(self):
- """
- Configure UpdateInfoCommand with 'dnf updateinfo list cves --available' (default) or
- 'dnf updateinfo list cves --installed' according to the args, and get available package, advisory
- and package installation information.
-
- Returns:
- generator: generator for (available package, advisory, package installation
- information)
- """
- updateinfo = UpdateInfoCommand(self.cli)
- updateinfo.opts = self.opts
-
- updateinfo.opts.spec_action = 'list'
- updateinfo.opts.with_cve = True
- updateinfo.opts.spec = '*'
- updateinfo.opts._advisory_types = set()
- updateinfo.opts.availability = self.opts.availability
- self.updateinfo = updateinfo
-
- if self.updateinfo.opts.availability == 'installed':
- apkg_adv_insts = updateinfo.installed_apkg_adv_insts(updateinfo.opts.spec)
- else:
- apkg_adv_insts = updateinfo.available_apkg_adv_insts(updateinfo.opts.spec)
-
- return apkg_adv_insts
-
- def get_fixed_cve_id_and_hotpatch_require_info(self, fixed_cve_id_and_hotpatch: set):
- """
- Get fixed cve id and hotpatch require package information.
-
- Args:
- fixed_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- Returns:
- set:
- e.g.
- {
- ('CVE-2023-1111', 'redis-6.2.5-1')
- ('CVE-2023-1112', 'redis-6.2.5-1,redis-cli-6.2.5-1')
- }
- """
- fixed_cve_id_and_hotpatch_require_info = set()
- for fixed_cve_id, fixed_hotpatch in fixed_cve_id_and_hotpatch:
- fixed_cve_id_and_hotpatch_require_info.add((fixed_cve_id, fixed_hotpatch.required_pkgs_str))
- return fixed_cve_id_and_hotpatch_require_info
-
- def get_iterated_cve_id_and_hotpatch_require_info(self, iterated_cve_id_and_hotpatch: set):
- """
- Get iterated cve id and hotpatch require package name information.
-
- Args:
- iterated_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
-
- Returns:
- set
- e.g.
- {
- ('CVE-2023-1111', 'redis')
- ('CVE-2023-1112', 'redis,redis-cli')
- }
- """
- iterated_cve_id_and_hotpatch_require_info = set()
- for iterated_cve_id, iterated_hotpatch in iterated_cve_id_and_hotpatch:
- iterated_cve_id_and_hotpatch_require_info.add((iterated_cve_id, iterated_hotpatch.required_pkgs_name_str))
- return iterated_cve_id_and_hotpatch_require_info
-
- def check_is_in_fixed_cve_id_and_hotpatch_require_info(
- self, cve_id: str, hotpatch: Hotpatch, fixed_cve_id_and_hotpatch_require_info: set
- ):
- """
- Check the (cve_id, hotpatch.required_pkgs_str) whether is in fixed_cve_id_and_hotpatch_require_info.
- If the (cve_id, hotpatch.required_pkgs_str) is in fixed_cve_id_and_hotpatch_require_info, the cve corresponding
- to the hotpatch should be fixed.
-
- Args:
- cve_id(str)
- hotpatch(Hotpatch)
- fixed_cve_id_and_hotpatch_require_info(set):
- e.g.
- {
- ('CVE-2023-1111', 'redis-6.2.5-1')
- ('CVE-2023-1112', 'redis-6.2.5-1')
- }
-
- Returns:
- bool: whether the cve corresponding to the hotpatch is fixed
- """
- is_fixed = False
- if (cve_id, hotpatch.required_pkgs_str) in fixed_cve_id_and_hotpatch_require_info:
- is_fixed = True
- return is_fixed
-
- def check_is_in_iterated_cve_id_and_hotpatch_require_info(
- self, cve_id: str, hotpatch: Hotpatch, iterated_cve_id_and_hotpatch_require_info: set
- ):
- """
- Check the (cve_id, hotpatch.required_pkgs_name_str) whether is in iterated_cve_id_and_hotpatch_require_info.
- If the (cve_id, hotpatch.required_pkgs_name_str) is in fixed_cve_id_and_hotpatch_require_info, the cve
- corresponding to the hotpatch should be iterated.
-
- Args:
- cve_id(str)
- hotpatch(Hotpatch)
- iterated_cve_id_and_hotpatch_require_info(set):
- e.g.
- {
- ('CVE-2023-1111', 'redis')
- ('CVE-2023-1112', 'redis')
- }
-
- Returns:
- bool: whether the cve corresponding to the hotpatch is iterated
- """
- is_iterated = False
- if (
- hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED)
- and (cve_id, hotpatch.required_pkgs_name_str) in iterated_cve_id_and_hotpatch_require_info
- ):
- is_iterated = True
- return is_iterated
-
- def get_filtered_display_item(
- self,
- format_lines: set,
- fixed_cve_id_and_hotpatch: set,
- installable_cve_id_and_hotpatch: set,
- iterated_cve_id_and_hotpatch: set,
- ):
- """
- Get filtered display item.
-
- Args:
- format_lines(set):
- {
- (cve_id, adv_type, coldpatch, hotpatch)
- }
-
- fixed_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- iterated_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- Returns:
- DisplayItem: for display
- """
- if self.updateinfo.opts.availability == 'installed':
- display_item = self.get_installed_filtered_display_item(
- format_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch
- )
- return display_item
- display_item = self.get_available_filtered_display_item(
- format_lines, fixed_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch
- )
- return display_item
-
- def get_installed_filtered_display_item(
- self, format_lines: set, fixed_cve_id_and_hotpatch: set, installable_cve_id_and_hotpatch: set
- ):
- """
- Get filtered display item by removing installable cve id and hotpatch, and removing iterated cve id
- and hotpatch. For hotpatch, only show ones which have been installed and been actived/accepted in
- syscare.
-
- Args:
- format_lines(set):
- {
- (cve_id, adv_type, coldpatch, hotpatch)
- }
-
- installable_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- Returns:
- DisplayItem: for display
-
- """
- display_lines = set()
-
- # calculate the width of each column
- idw = tiw = ciw = 0
- for format_line in format_lines:
- cve_id, adv_type, coldpatch, hotpatch = (
- format_line[self.CVE_ID_INDEX],
- format_line[self.ADV_SEVERITY_INDEX],
- format_line[self.COLDPATCH_INDEX],
- format_line[self.HOTPATCH_INDEX],
- )
- if self.filter_cves is not None and cve_id not in self.filter_cves:
- continue
- if (cve_id, hotpatch) in installable_cve_id_and_hotpatch:
- if coldpatch == '-':
- continue
- else:
- hotpatch = '-'
-
- if isinstance(hotpatch, Hotpatch):
- if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch or hotpatch.state == self.hp_hawkey.INSTALLED:
- hotpatch = hotpatch.nevra
- elif hotpatch.state in (self.hp_hawkey.UNINSTALLABLE, self.hp_hawkey.UNRELATED):
- hotpatch = '-'
-
- if coldpatch == '-' and hotpatch == '-':
- continue
-
- idw = max(idw, len(cve_id))
- tiw = max(tiw, len(adv_type))
- ciw = max(ciw, len(coldpatch))
- display_lines.add((cve_id, adv_type, coldpatch, hotpatch))
-
- display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines)
- return display_item
-
- def get_available_filtered_display_item(
- self, format_lines: set, fixed_cve_id_and_hotpatch: set, iterated_cve_id_and_hotpatch: set
- ):
- """
- Get filtered display item by removing fixed cve id and hotpatch, and removing iterated cve id
- and hotpatch.
-
- Args:
- format_lines(set):
- {
- (cve_id, adv_type, coldpatch, hotpatch)
- }
-
- fixed_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- iterated_cve_id_and_hotpatch(set):
- e.g.
- {
- ('CVE-2023-1111', Hotpatch)
- }
-
- Returns:
- DisplayItem: for display
- """
- display_lines = set()
-
- fixed_cve_id_and_hotpatch_require_info = self.get_fixed_cve_id_and_hotpatch_require_info(
- fixed_cve_id_and_hotpatch
- )
- iterated_cve_id_and_hotpatch_require_info = self.get_iterated_cve_id_and_hotpatch_require_info(
- iterated_cve_id_and_hotpatch
- )
-
- # calculate the width of each column
- idw = tiw = ciw = 0
- for format_line in format_lines:
- cve_id, adv_type, coldpatch, hotpatch = (
- format_line[self.CVE_ID_INDEX],
- format_line[self.ADV_SEVERITY_INDEX],
- format_line[self.COLDPATCH_INDEX],
- format_line[self.HOTPATCH_INDEX],
- )
- if self.filter_cves is not None and cve_id not in self.filter_cves:
- continue
- if (cve_id, hotpatch) in fixed_cve_id_and_hotpatch:
- continue
-
- if isinstance(hotpatch, Hotpatch):
- if self.check_is_in_iterated_cve_id_and_hotpatch_require_info(
- cve_id, hotpatch, iterated_cve_id_and_hotpatch_require_info
- ):
- continue
- if self.check_is_in_fixed_cve_id_and_hotpatch_require_info(
- cve_id, hotpatch, fixed_cve_id_and_hotpatch_require_info
- ):
- continue
- # format hotpatch
- if hotpatch.state == self.hp_hawkey.INSTALLABLE:
- hotpatch = hotpatch.nevra
- elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE:
- hotpatch = '-'
- elif hotpatch.state == self.hp_hawkey.UNRELATED and coldpatch == '-':
- continue
- elif hotpatch.state == self.hp_hawkey.UNRELATED:
- hotpatch = '-'
-
- idw = max(idw, len(cve_id))
- tiw = max(tiw, len(adv_type))
- ciw = max(ciw, len(coldpatch))
- display_lines.add((cve_id, adv_type, coldpatch, hotpatch))
-
- display_item = DisplayItem(idw=idw, tiw=tiw, ciw=ciw, display_lines=display_lines)
- return display_item
-
- def append_fixed_cve_id_and_hotpatch(self, fixed_cve_id_and_hotpatch: set):
- """
- Append fixed cve id and hotpatch in fixed_cve_id_and_hotpatch. The ACC hotpatch that are less or equal
- to the highest ACC actived version-release for the same target required package, is considered to be
- fixed.
-
- Args:
- fixed_cve_id_and_hotpatch(set)
- e.g.
- {
- ('CVE-2023-2221', Hotpatch)
- }
- Returns:
- set:
- e.g.
- {
- ('CVE-2023-2221', Hotpatch),
- ('CVE-2023-1111', Hotpatch)
- }
-
- """
- versions = Versions()
- # {hotpatch_required_pkgs_str: version-release}
- hotpatch_vere_mapping = dict()
- for _, fixed_hotpatch in fixed_cve_id_and_hotpatch:
- # get the highest version-release for each target required package
- required_pkgs_str = fixed_hotpatch.required_pkgs_str
- current_vere = "%s-%s" % (fixed_hotpatch.version, fixed_hotpatch.release)
- if fixed_hotpatch.hotpatch_name != "ACC":
- continue
- if required_pkgs_str not in hotpatch_vere_mapping:
- hotpatch_vere_mapping[required_pkgs_str] = current_vere
- elif versions.larger_than(current_vere, hotpatch_vere_mapping[required_pkgs_str]):
- hotpatch_vere_mapping[required_pkgs_str] = current_vere
-
- # get all hot hotpatches that are less or equal to the highest version-release, and record the cves
- # which they fix
- for required_pkgs_str, actived_vere in hotpatch_vere_mapping.items():
- all_hotpatches = self.hp_hawkey._hotpatch_required_pkg_info_str[required_pkgs_str]
- for cmped_vere, hotpatch in all_hotpatches:
- if hotpatch.hotpatch_name != "ACC":
- continue
- if not versions.larger_than(actived_vere, cmped_vere):
- continue
- for cve_id in hotpatch.cves:
- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch))
- return fixed_cve_id_and_hotpatch
-
- def get_formatting_parameters_and_display_lines(self):
- """
- Append hotpatch information according to the output of 'dnf updateinfo list cves'
-
- Returns:
- DisplayItem: for display
- """
-
- def type2label(updateinfo, typ, sev):
- if typ == hawkey.ADVISORY_SECURITY:
- return updateinfo.SECURITY2LABEL.get(sev, _('Unknown/Sec.'))
- else:
- return updateinfo.TYPE2LABEL.get(typ, _('unknown'))
-
- mapping_nevra_cve = self.get_mapping_nevra_cve()
- echo_lines = set()
- fixed_cve_id_and_hotpatch = set()
- installable_cve_id_and_hotpatch = set()
- uninstallable_cve_id_and_hotpatch = set()
- iterated_cve_id_and_hotpatch = set()
-
- for ((nevra), aupdated), id2type in sorted(mapping_nevra_cve.items(), key=lambda x: x[0]):
- pkg_name, pkg_evr, pkg_arch = nevra
- coldpatch = '%s-%s.%s' % (pkg_name, pkg_evr, pkg_arch)
- for cve_id, atypesev in id2type.items():
- label = type2label(self.updateinfo, *atypesev)
- # if there is no hotpatch corresponding to the cve id, mark hotpatch as '-'
- if cve_id not in self.hp_hawkey.hotpatch_cves or not self.hp_hawkey.hotpatch_cves[cve_id].hotpatches:
- echo_line = (cve_id, label, coldpatch, '-')
- echo_lines.add(echo_line)
- continue
-
- for hotpatch in self.hp_hawkey.hotpatch_cves[cve_id].hotpatches:
- # if cold patch name does not match with hotpatch required pkg name (target fix pkgs)
- if pkg_name not in hotpatch._required_pkgs_info.keys():
- echo_line = (cve_id, label, coldpatch, '-')
- echo_lines.add(echo_line)
- continue
- if hotpatch.state == self.hp_hawkey.INSTALLED:
- # record the fixed cve_id and hotpatch, filter the packages that are lower than
- # the currently installed package for solving the same cve and target required
- # pakcage
- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch))
- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch))
- elif hotpatch.state == self.hp_hawkey.INSTALLABLE:
- # record the installable cve_id and hotpatch, filter the packages that are bigger
- # than the currently installed package
- installable_cve_id_and_hotpatch.add((cve_id, hotpatch))
- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch))
- elif hotpatch.state == self.hp_hawkey.UNINSTALLABLE:
- uninstallable_cve_id_and_hotpatch.add((cve_id, hotpatch))
- echo_line = (cve_id, label, coldpatch, hotpatch)
- echo_lines.add(echo_line)
-
- self.add_untraversed_hotpatches(
- echo_lines,
- fixed_cve_id_and_hotpatch,
- installable_cve_id_and_hotpatch,
- uninstallable_cve_id_and_hotpatch,
- iterated_cve_id_and_hotpatch,
- )
- # lower version ACC hotpatch of fixed ACC hotpatch, is also considered to be fixed
- fixed_cve_id_and_hotpatch = self.append_fixed_cve_id_and_hotpatch(fixed_cve_id_and_hotpatch)
- # remove fixed cve and hotpatch from installable_cve_id_and_hotpatch
- installable_cve_id_and_hotpatch = installable_cve_id_and_hotpatch.difference(fixed_cve_id_and_hotpatch)
- display_item = self.get_filtered_display_item(
- echo_lines, fixed_cve_id_and_hotpatch, installable_cve_id_and_hotpatch, iterated_cve_id_and_hotpatch
- )
-
- return display_item
-
- def add_untraversed_hotpatches(
- self,
- echo_lines: set,
- fixed_cve_id_and_hotpatch: set,
- installable_cve_id_and_hotpatch: set,
- uninstallable_cve_id_and_hotpatch: set,
- iterated_cve_id_and_hotpatch: set,
- ):
- """
- Add the echo lines, which are only with hotpatch but no coldpatch. And append
- fixed_cve_id_and_hotpatch and iterated_cve_id_and_hotpatch.
-
- Args:
- echo_lines(set)
- fixed_cve_id_and_hotpatch(set)
- installable_cve_id_and_hotpatch(set)
- uninstallable_cve_id_and_hotpatch(set)
- iterated_cve_id_and_hotpatch(set)
- """
- for cve_id, cve in self.hp_hawkey.hotpatch_cves.items():
- for hotpatch in cve.hotpatches:
- if hotpatch.state == self.hp_hawkey.UNRELATED:
- continue
- if (cve_id, hotpatch) in iterated_cve_id_and_hotpatch:
- continue
- if (cve_id, hotpatch) in uninstallable_cve_id_and_hotpatch:
- continue
- if hotpatch.state == self.hp_hawkey.INSTALLED:
- fixed_cve_id_and_hotpatch.add((cve_id, hotpatch))
- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch))
- elif hotpatch.state == self.hp_hawkey.INSTALLABLE:
- installable_cve_id_and_hotpatch.add((cve_id, hotpatch))
- iterated_cve_id_and_hotpatch.add((cve_id, hotpatch))
- echo_line = (cve_id, hotpatch.advisory.severity + '/Sec.', '-', hotpatch)
- echo_lines.add(echo_line)
-
- def display(self):
- """
- Print the display lines according to the formatting parameters.
- """
- display_item = self.get_formatting_parameters_and_display_lines()
- idw, tiw, ciw, display_lines = display_item.idw, display_item.tiw, display_item.ciw, display_item.display_lines
- for display_line in sorted(
- display_lines,
- key=lambda x: (
- x[self.COLDPATCH_INDEX],
- x[self.HOTPATCH_INDEX],
- x[self.CVE_ID_INDEX],
- x[self.ADV_SEVERITY_INDEX],
- ),
- ):
- print(
- '%-*s %-*s %-*s %s'
- % (
- idw,
- display_line[self.CVE_ID_INDEX],
- tiw,
- display_line[self.ADV_SEVERITY_INDEX],
- ciw,
- display_line[self.COLDPATCH_INDEX],
- display_line[self.HOTPATCH_INDEX],
- )
- )
diff --git a/hotpatch/hotpatch.py b/hotpatch/hotpatch.py
deleted file mode 100644
index 0664edf..0000000
--- a/hotpatch/hotpatch.py
+++ /dev/null
@@ -1,165 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import dnf
-from dnfpluginscore import _, logger
-from .syscare import Syscare
-from .updateinfo_parse import HotpatchUpdateInfo
-
-
-@dnf.plugin.register_command
-class HotpatchCommand(dnf.cli.Command):
- CVE_ID_INDEX = 0
- Name_INDEX = 1
- STATUS_INDEX = 2
-
- aliases = ['hotpatch']
- summary = _('show hotpatch info')
- syscare = Syscare()
-
- def __init__(self, cli):
- """
- Initialize the command
- """
- super(HotpatchCommand, self).__init__(cli)
-
- @staticmethod
- def set_argparser(parser):
- output_format = parser.add_mutually_exclusive_group()
- output_format.add_argument(
- '--list', nargs='?', type=str, default='', choices=['cve', 'cves'], help=_('show list of hotpatch')
- )
- output_format.add_argument(
- '--apply', type=str, default=None, dest='apply_name', nargs=1, help=_('apply hotpatch')
- )
- output_format.add_argument(
- '--remove', type=str, default=None, dest='remove_name', nargs=1, help=_('remove hotpatch')
- )
- output_format.add_argument(
- '--active', type=str, default=None, dest='active_name', nargs=1, help=_('active hotpatch')
- )
- output_format.add_argument(
- '--deactive', type=str, default=None, dest='deactive_name', nargs=1, help=_('deactive hotpatch')
- )
- output_format.add_argument(
- '--accept', type=str, default=None, dest='accept_name', nargs=1, help=_('accept hotpatch')
- )
-
- def configure(self):
- demands = self.cli.demands
- demands.sack_activation = True
- demands.available_repos = True
-
- self.filter_cves = self.opts.cves if self.opts.cves else None
-
- def run(self):
- self.hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)
- if self.opts.list != '':
- self.display()
- if self.opts.apply_name:
- self.operate_hot_patches(self.opts.apply_name, "apply", self.syscare.apply)
- if self.opts.remove_name:
- self.operate_hot_patches(self.opts.remove_name, "remove", self.syscare.remove)
- if self.opts.active_name:
- self.operate_hot_patches(self.opts.active_name, "active", self.syscare.active)
- if self.opts.deactive_name:
- self.operate_hot_patches(self.opts.deactive_name, "deactive", self.syscare.deactive)
- if self.opts.accept_name:
- self.operate_hot_patches(self.opts.accept_name, "accept", self.syscare.accept)
-
- def _filter_and_format_list_output(self, echo_lines: list):
- """
- Only show specific cve information if cve id is given, and format the output.
- """
- format_lines = []
- title = ['CVE-id', 'base-pkg/hotpatch', 'status']
- idw = len(title[0])
- naw = len(title[1])
- for echo_line in echo_lines:
- cve_id, name, status = (
- echo_line[self.CVE_ID_INDEX],
- echo_line[self.Name_INDEX],
- echo_line[self.STATUS_INDEX],
- )
- if self.filter_cves is not None and cve_id not in self.filter_cves:
- continue
- idw = max(idw, len(cve_id))
- naw = max(naw, len(name))
- format_lines.append([cve_id, name, status])
-
- if not format_lines:
- return
-
- # print title
- if self.opts.list in ['cve', 'cves']:
- print(
- '%-*s %-*s %s' % (idw, title[self.CVE_ID_INDEX], naw, title[self.Name_INDEX], title[self.STATUS_INDEX])
- )
- else:
- print('%-*s %s' % (naw, title[self.Name_INDEX], title[self.STATUS_INDEX]))
-
- format_lines.sort(key=lambda x: (x[self.Name_INDEX], x[self.CVE_ID_INDEX]))
-
- if self.opts.list in ['cve', 'cves']:
- for cve_id, name, status in format_lines:
- print('%-*s %-*s %s' % (idw, cve_id, naw, name, status))
- else:
- new_format_lines = [(name, status) for _, name, status in format_lines]
- deduplicated_format_lines = list(set(new_format_lines))
- deduplicated_format_lines.sort(key=new_format_lines.index)
- for name, status in deduplicated_format_lines:
- print('%-*s %s' % (naw, name, status))
-
- def display(self):
- """
- Display hotpatch information.
-
- e.g.
- For the command of 'dnf hotpatch --list', the echo_lines is [[base-pkg/hotpatch, status], ...]
- For the command of 'dnf hotpatch --list cve', the echo_lines is [[cve_id, base-pkg/hotpatch, status], ...]
- """
-
- hotpatch_cves = self.hp_hawkey.hotpatch_cves
- echo_lines = []
- for cve_id in hotpatch_cves.keys():
- for hotpatch in hotpatch_cves[cve_id].hotpatches:
- for name, status in self.hp_hawkey._hotpatch_state.items():
- if hotpatch.syscare_subname not in name:
- continue
- echo_line = [cve_id, name, status]
- echo_lines.append(echo_line)
- self._filter_and_format_list_output(echo_lines)
-
- def operate_hot_patches(self, target_patch: list, operate, func) -> None:
- """
- operate hotpatch using syscare command
- Args:
- target_patch: type:list,e.g.:['redis-6.2.5-1/HP2']
-
- Returns:
- None
- """
- if len(target_patch) != 1:
- logger.error(_("using dnf hotpatch --%s wrong!"), operate)
- return
- target_patch = target_patch[0]
- logger.info(_("Gonna %s this hot patch: %s"), operate, self.base.output.term.bold(target_patch))
-
- output, status = func(target_patch)
- if status:
- logger.error(
- _("%s hot patch '%s' failed, remain original status."),
- operate,
- self.base.output.term.bold(target_patch),
- )
- else:
- logger.info(_("%s hot patch '%s' succeed"), operate, self.base.output.term.bold(target_patch))
diff --git a/hotpatch/hotupgrade.py b/hotpatch/hotupgrade.py
deleted file mode 100644
index f61e37f..0000000
--- a/hotpatch/hotupgrade.py
+++ /dev/null
@@ -1,482 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-from __future__ import print_function
-
-from time import sleep
-import dnf.base
-import dnf.exceptions
-import hawkey
-from dnf.cli import commands
-from dnf.cli.option_parser import OptionParser
-
-# from dnf.cli.output import Output
-from dnfpluginscore import _, logger
-
-from .hot_updateinfo import HotUpdateinfoCommand
-from .updateinfo_parse import HotpatchUpdateInfo
-from .syscare import Syscare
-from .version import Versions
-
-EMPTY_TAG = "-"
-
-
-@dnf.plugin.register_command
-class HotupgradeCommand(dnf.cli.Command):
- aliases = ("hotupgrade",)
- summary = "Hot upgrade package using hot patch."
- usage = ""
- syscare = Syscare()
- hp_list = []
-
- @staticmethod
- def set_argparser(parser):
- parser.add_argument(
- 'packages',
- nargs='*',
- help=_('Package to upgrade'),
- action=OptionParser.ParseSpecGroupFileCallback,
- metavar=_('PACKAGE'),
- )
- parser.add_argument(
- "--takeover", default=False, action='store_true', help=_('kernel cold patch takeover operation')
- )
-
- def configure(self):
- """Verify that conditions are met so that this command can run.
- These include that there are enabled repositories with gpg
- keys, and that this command is being run by the root user.
- """
- demands = self.cli.demands
- demands.sack_activation = True
- demands.available_repos = True
- demands.resolving = True
- demands.root_user = True
-
- commands._checkGPGKey(self.base, self.cli)
- if not self.opts.filenames:
- commands._checkEnabledRepo(self.base)
-
- def run(self):
- if self.opts.pkg_specs:
- self.hp_list = self.opts.pkg_specs
- elif self.opts.cves or self.opts.advisory:
- cve_pkgs = self.get_hotpatch_based_on_cve(self.opts.cves)
- advisory_pkgs = self.get_hotpatch_based_on_advisory(self.opts.advisory)
- self.hp_list = cve_pkgs + advisory_pkgs
-
- else:
- self.hp_list = self.get_hotpatch_of_all_cve()
- if self.hp_list:
- logger.info(_("Gonna apply all available hot patches: %s"), self.hp_list)
-
- available_hp_dict = self._get_available_hotpatches(self.hp_list)
- if not available_hp_dict:
- logger.info(_('No hot patches marked for install.'))
- return
-
- applied_old_patches = self._get_applied_old_patch(list(available_hp_dict.values()))
- if applied_old_patches:
- self._remove_hot_patches(applied_old_patches)
- else:
- self.syscare.save()
- success = self._install_rpm_pkg(list(available_hp_dict.keys()))
- if not success:
- logger.info(_("Error: Install hot patch failed, try to rollback."))
- output, status = self.syscare.restore()
- if status:
- raise dnf.exceptions.Error(_('Roll back failed.'))
- logger.info(_("Roll back succeed."))
- return
-
- if self.opts.takeover:
- self.takeover_operation()
- return
-
- def run_transaction(self) -> None:
- """
- apply hot patches
- Returns:
- None
- """
- # syscare need a little bit time to process the installed hot patch
- sleep(0.5)
- for hp in self.hp_list:
- self._apply_hp(hp)
- if self.opts.takeover and self.is_need_accept_kernel_hp:
- self._accept_kernel_hp(hp)
-
- def _apply_hp(self, hp_full_name):
- pkg_info = self._parse_hp_name(hp_full_name)
- hp_subname = self._get_hp_subname_for_syscare(pkg_info)
- output, status = self.syscare.apply(hp_subname)
- if status:
- logger.info(_('Apply hot patch failed: %s.'), hp_subname)
- else:
- logger.info(_('Apply hot patch succeed: %s.'), hp_subname)
-
- @staticmethod
- def _get_hp_subname_for_syscare(pkg_info: dict) -> str:
- """
- get hotpatch's subname for syscare command. e.g. redis-1-1/ACC-1-1
- Args:
- pkg_info: out put of _parse_hp_name.
-
- Returns:
- str
- """
- hp_subname = (
- "-".join([pkg_info["target_name"], pkg_info["target_version"], pkg_info["target_release"]])
- + '/'
- + "-".join([pkg_info["hp_name"], pkg_info["hp_version"], pkg_info["hp_release"]])
- )
- return hp_subname
-
- def _get_available_hotpatches(self, pkg_specs: list) -> dict:
- """
- check two conditions:
- 1. the hot patch rpm package exists in repositories
- 2. the hot patch's target package with specific version and release already installed
- Args:
- pkg_specs: full names of hot patches' rpm packages
-
- Returns:
- dict: key is available hot patches' full name,
- value is hot patches' operate name. e.g. kernel-5.10.0-60.66.0.91.oe2203/ACC-1-1
- """
- hp_map = {}
- installed_packages = self.base.sack.query().installed()
- for pkg_spec in set(pkg_specs):
- query = self.base.sack.query()
- # check the package exist in repo or not
- subj = dnf.subject.Subject(pkg_spec)
- parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA])
- if len(parsed_nevras) != 1:
- logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec))
- continue
-
- parsed_nevra = parsed_nevras[0]
- available_hp = query.available().filter(
- name=parsed_nevra.name,
- version=parsed_nevra.version,
- release=parsed_nevra.release,
- arch=parsed_nevra.arch,
- )
- if not available_hp:
- logger.info(_('No match for argument: %s'), self.base.output.term.bold(pkg_spec))
- continue
-
- # check the hot patch's target package installed or not
- pkg_info = self._parse_hp_name(pkg_spec)
- installed_pkg = installed_packages.filter(
- name=pkg_info["target_name"], version=pkg_info["target_version"], release=pkg_info["target_release"]
- ).run()
- if not installed_pkg:
- logger.info(
- _("The hot patch's target package is not installed: %s"), self.base.output.term.bold(pkg_spec)
- )
- continue
-
- if len(installed_pkg) != 1:
- logger.info(
- _("The hot patch '%s' has multiple target packages, please check."),
- self.base.output.term.bold(pkg_spec),
- )
- continue
- hp_subname = self._get_hp_subname_for_syscare(pkg_info)
- hp_map[pkg_spec] = hp_subname
- return hp_map
-
- @staticmethod
- def _get_applied_old_patch(available_hp_list: list) -> list:
- """
- get targets' applied accumulative hot patches.
- User can install and apply multiple sgl (single) hot patches because the rpm name is different,
- but for acc (accumulative) hot patch, user can only install one for a specific target binary rpm.
- Args:
- available_hp_list: e.g. ['redis-1.0-1/ACC-1-1', 'redis-1.0-1/SGL_CVE_2022_1-1-1']
-
- Returns:
- list: applied hot patches. e.g. ['redis-1.0-1/ACC-1-1']
- """
- hotpatch_set = set()
- hps_info = Syscare.list()
- for hp_info in hps_info:
- # hp_info[Name] is the middle column of syscare list. format: {target_rpm_name}/{hp_name}/{binary_file}
- # a hotpatch is mapped to a target binary rpm, and may affect multiple binary executable binary files
- # e.g. for hotpatch patch-redis-1-1-ACC-1-1.x86_64.rpm, it may provide 2 sub hotpatches in syscare list,
- # and SGL hotpatches may be installed at the same time
- # redis-1-1/ACC-1-1/redis
- # redis-1-1/ACC-1-1/redis-cli
- # redis-1-1/SGL_CVE_2022_1-1-1/redis
- # redis-1-1/SGL_CVE_2022_2-1-1/redis
- target, hp_name, binary_file = hp_info["Name"].split('/')
- hotpatch = target + '/' + hp_name
- # right now, if part of the hotpatch (for different binary file) is applied,
- # we consider the hotpatch is applied
- if hotpatch in available_hp_list and hp_info["Status"] != "NOT-APPLIED":
- logger.info(
- _("The hotpatch '%s' already has a '%s' sub hotpatch of binary file '%s'"),
- hotpatch,
- hp_info["Status"],
- binary_file,
- )
- if hotpatch not in hotpatch_set:
- hotpatch_set.add(hotpatch)
- return list(hotpatch_set)
-
- def _remove_hot_patches(self, applied_old_patches: list) -> None:
- # output = Output(self.base, dnf.conf.Conf())
- logger.info(_("Gonna remove these hot patches: %s"), applied_old_patches)
- # remove_flag = output.userconfirm()
- # if not remove_flag:
- # raise dnf.exceptions.Error(_('Operation aborted.'))
-
- self.syscare.save()
- for hp_name in applied_old_patches:
- logger.info(_("Remove hot patch %s."), hp_name)
- output, status = self.syscare.remove(hp_name)
- if status:
- logger.info(
- _("Remove hot patch '%s' failed, roll back to original status."),
- self.base.output.term.bold(hp_name),
- )
- output, status = self.syscare.restore()
- if status:
- raise dnf.exceptions.Error(_('Roll back failed.'))
- raise dnf.exceptions.Error(_('Roll back succeed.'))
-
- @staticmethod
- def _parse_hp_name(hp_filename: str) -> dict:
- """
- parse hot patch's name, get target rpm's name, version, release and hp's name.
- Args:
- hp_filename: hot patch's name, in the format of
- 'patch-{pkg_name}-{pkg_version}-{pkg_release}-{patchname}-{patch_version}-{patch_release}'
- e.g. patch-kernel-5.10.0-60.66.0.91.oe2203-ACC-1-1.x86_64
- patch-kernel-5.10.0-60.66.0.91.oe2203-SGL_CVE_2022_1-1-1.x86_64
- pkg_name may have '-' in it, patch name cannot have '-'.
- Returns:
- dict: rpm info. {"target_name": "", "target_version": "", "target_release": "", "hp_name": "",
- "hp_version": "", "hp_release": ""}
- """
- hp_filename_format = (
- "patch-{pkg_name}-{pkg_version}-{pkg_release}-{patch_name}-" "{patch_version}-{patch_release}.{arch}"
- )
-
- remove_suffix_filename = hp_filename.rsplit(".", 1)[0]
- splitted_hp_filename = remove_suffix_filename.split('-')
- try:
- rpm_info = {
- "target_release": splitted_hp_filename[-4],
- "target_version": splitted_hp_filename[-5],
- "target_name": "-".join(splitted_hp_filename[1:-5]),
- "hp_name": splitted_hp_filename[-3],
- "hp_version": splitted_hp_filename[-2],
- "hp_release": splitted_hp_filename[-1],
- }
- except IndexError as e:
- raise dnf.exceptions.Error(
- _(
- "Parse hot patch name failed. Please insert correct hot patch name "
- "with the format: \n %s" % hp_filename_format
- )
- )
- return rpm_info
-
- def _install_rpm_pkg(self, pkg_specs: list) -> bool:
- """
- install rpm package
- Args:
- pkg_specs: rpm package's full name
-
- Returns:
- bool
- """
- success = True
- for pkg_spec in pkg_specs:
- try:
- self.base.install(pkg_spec)
- except dnf.exceptions.MarkingError as e:
- logger.info(_('No match for argument: %s.'), self.base.output.term.bold(pkg_spec))
- success = False
- return success
-
- def get_hotpatch_based_on_cve(self, cves: list) -> list:
- """
- Get the hot patches corresponding to CVEs
- Args:
- cves: cve id list
-
- Returns:
- list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"]
- """
- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli)
- hp_list = []
- cve_hp_dict = updateinfo.get_hotpatches_from_cve(cves)
- for cve, hp in cve_hp_dict.items():
- if not hp:
- logger.info(_("The cve doesn't exist or cannot be fixed by hotpatch: %s"), cve)
- continue
- hp_list += hp
- return list(set(hp_list))
-
- def get_hotpatch_based_on_advisory(self, advisories: list) -> list:
- """
- Get the hot patches corresponding to advisories
- Args:
- advisories: advisory id list
-
- Returns:
- list: list of hot patches full name. e.g.["tmp2-tss-3.1.0-3.oe2203sp1"]
- """
- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli)
- hp_list = []
- advisory_hp_dict = updateinfo.get_hotpatches_from_advisories(advisories)
- for hp in advisory_hp_dict.values():
- hp_list += hp
- return list(set(hp_list))
-
- def get_hotpatch_of_all_cve(self) -> list:
- """
- upgrade all exist cve using hot patches
- 1. find all cves when init HotpatchUpdateInfo
- 2. get the recommended hot patch for each cve
- 3. deduplication
- Returns:
- ['patch-redis-6.2.5-1-HP2-1-1.x86_64']
- """
- updateinfo = HotpatchUpdateInfo(self.cli.base, self.cli)
- cve_list = self.get_all_cve_which_can_be_fixed_by_hotpatch()
- hp_list = []
- cve_hp_dict = updateinfo.get_hotpatches_from_cve(cve_list)
- for hp in cve_hp_dict.values():
- if not hp:
- continue
- hp_list += hp
- return list(set(hp_list))
-
- def get_all_cve_which_can_be_fixed_by_hotpatch(self) -> list:
- """
- get all unfixed cve which can be fixed by hotpatch
- use command : dnf hot-updateinfo list cves
- Last metadata expiration check: 0:48:26 ago on 2023年06月01日 星期四 20时29分55秒.
- CVE-2023-3332 Low/Sec. - -
- CVE-2023-3331 Low/Sec. - -
- CVE-2023-1111 Important/Sec. - patch-redis-6.2.5-1-ACC-1-1.x86_64
- CVE-2023-1111 Important/Sec. - patch-redis-cli-6.2.5-1-ACC-1-1.x86_64
-
- Returns: list of unfixed cve. e.g.['CVE-2023-1111']
- """
- hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)
- hot_updateinfo = HotUpdateinfoCommand(self.cli)
- hot_updateinfo.opts = self.opts
- hot_updateinfo.hp_hawkey = hp_hawkey
- hot_updateinfo.filter_cves = None
- hot_updateinfo.opts.availability = 'available'
- all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines()
- cve_set = set()
- for display_line in all_cves.display_lines:
- if display_line[3] != EMPTY_TAG:
- cve_set.add(display_line[0])
- return list(cve_set)
-
- def takeover_operation(self):
- """
- process takeover operation.
- """
- kernel_coldpatch = self.get_target_installed_kernel_coldpatch_of_hotpatch()
- self.is_need_accept_kernel_hp = False
- if kernel_coldpatch:
- logger.info(_("Gonna takeover kernel cold patch: ['%s']" % kernel_coldpatch))
- success = self._install_rpm_pkg([kernel_coldpatch])
- if success:
- return
- logger.info(_("Takeover operation failed."))
- else:
- logger.info(_('No kernel cold patch matched for takeover.'))
- self.is_need_accept_kernel_hp = True
- logger.info(
- _(
- 'To maintain the effectiveness of kernel hot patch after rebooting, gonna accept available kernel hot patch.'
- )
- )
- return
-
- def get_target_installed_kernel_coldpatch_of_hotpatch(self) -> str:
- """
- get the highest kernel cold patch of hot patch in "dnf hot-updateinfo list cves", if the corresponding
- kernel cold patch exists.
-
- Returns:
- str: full name of cold patch. e.g. kernel-5.10.0-1.x86_64
- """
- hp_hawkey = HotpatchUpdateInfo(self.cli.base, self.cli)
- hot_updateinfo = HotUpdateinfoCommand(self.cli)
- hot_updateinfo.opts = self.opts
- hot_updateinfo.hp_hawkey = hp_hawkey
- hot_updateinfo.filter_cves = None
- hot_updateinfo.opts.availability = 'available'
- all_cves = hot_updateinfo.get_formatting_parameters_and_display_lines()
- kernel_highest_vere = ""
- target_kernel_coldpatch = None
- version = Versions()
- for display_line in all_cves.display_lines:
- if display_line[3] not in self.hp_list:
- continue
- kernel_pkg_spec = display_line[2]
- if kernel_pkg_spec == EMPTY_TAG:
- continue
- pkg_name, pkg_ver = self._get_pkg_name_and_ver(kernel_pkg_spec)
- if pkg_name != "kernel":
- continue
- if kernel_highest_vere == "":
- kernel_highest_vere = pkg_ver
- target_kernel_coldpatch = kernel_pkg_spec
- continue
- if version.larger_than(pkg_ver, kernel_highest_vere):
- kernel_highest_vere = pkg_ver
- target_kernel_coldpatch = kernel_pkg_spec
-
- return target_kernel_coldpatch
-
- def _get_pkg_name_and_ver(self, pkg_spec: str):
- """
- get the "name" and "version-release" string of package.
-
- Args:
- str: pkg_specs: full name of cold patch. e.g. kernel-5.10.0-1.x86_64
- """
- subj = dnf.subject.Subject(pkg_spec)
- parsed_nevras = subj.get_nevra_possibilities(forms=[hawkey.FORM_NEVRA])
- if len(parsed_nevras) != 1:
- logger.info(_('Cannot parse NEVRA for package "{nevra}"').format(nevra=pkg_spec))
- return ""
- parsed_nevra = parsed_nevras[0]
- return parsed_nevra.name, "%s-%s" % (parsed_nevra.version, parsed_nevra.release)
-
- def _accept_kernel_hp(self, hp_full_name: str):
- """
- accept kernel hot patch
-
- Args:
- str: hp_full_name: full name of hot patch. e.g. patch-kernel-5.10.0-1-ACC-1-1.x86_64
- """
- pkg_info = self._parse_hp_name(hp_full_name)
- if pkg_info['target_name'] != "kernel":
- return
- hp_subname = self._get_hp_subname_for_syscare(pkg_info)
- output, status = self.syscare.accept(hp_subname)
- if status:
- logger.info(_('Accept kernel hot patch failed: %s.'), hp_subname)
- else:
- logger.info(_('Accept kernel hot patch succeed: %s.'), hp_subname)
diff --git a/hotpatch/syscare.py b/hotpatch/syscare.py
deleted file mode 100644
index a858ed4..0000000
--- a/hotpatch/syscare.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import subprocess
-from typing import List
-
-SUCCEED = 0
-FAIL = 255
-
-
-def cmd_output(cmd):
- try:
- result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- result.wait()
- return result.stdout.read().decode('utf-8'), result.returncode
- except Exception as e:
- print("error: ", e)
- return str(e), FAIL
-
-
-class Syscare:
- @classmethod
- def list(cls, condition=None) -> List[dict]:
- """
- Target Name Status
- redis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED
- kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version ACTIVED
- """
- cmd = ["syscare", "list"]
- list_output, return_code = cmd_output(cmd)
- if return_code != SUCCEED:
- return []
-
- content = list_output.split('\n')
- if len(content) <= 2:
- return []
-
- header = content[0].split()
- result = []
- for item in content[1:-1]:
- tmp = dict(zip(header, item.split()))
- if not condition or cls.judge(tmp, condition):
- result.append(tmp)
- return result
-
- @staticmethod
- def judge(content: dict, condition: dict):
- for key, value in condition.items():
- if content.get(key) != value:
- return False
- return True
-
- @staticmethod
- def status(patch_name: str):
- cmd = ["syscare", "status", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def active(patch_name: str):
- cmd = ["syscare", "active", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def deactive(patch_name: str):
- cmd = ["syscare", "deactive", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def remove(patch_name: str):
- cmd = ["syscare", "remove", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def apply(patch_name: str):
- cmd = ["syscare", "apply", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def save():
- cmd = ["syscare", "save"]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def restore():
- cmd = ["syscare", "restore"]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
-
- @staticmethod
- def accept(patch_name: str):
- cmd = ["syscare", "accept", patch_name]
- output, return_code = cmd_output(cmd)
-
- return output, return_code
diff --git a/hotpatch/test_hotpatch.py b/hotpatch/test_hotpatch.py
deleted file mode 100644
index e239c86..0000000
--- a/hotpatch/test_hotpatch.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import unittest
-from unittest import mock
-
-from .hotpatch import HotpatchCommand
-from .syscare import SUCCEED, FAIL
-
-
-class HotpatchTestCase(unittest.TestCase):
- def setUp(self) -> None:
- self.cli = mock.MagicMock()
- self.cmd = HotpatchCommand(self.cli)
-
- def test_operate_hot_patches_should_return_none(self):
- target_patch = ["patch1"]
- operate = mock.MagicMock()
- func = mock.MagicMock()
- func.return_value = ("", FAIL)
- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func))
- self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0])
-
- func.return_value = ("", SUCCEED)
- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func))
- self.assertEqual(self.cmd.base.output.term.bold.call_args_list[1][0][0], target_patch[0])
-
- def test_operate_hot_patches_should_return_none_when_target_patch_is_none(self):
- target_patch = []
- operate = "apply"
- func = mock.MagicMock()
- self.assertIsNone(self.cmd.operate_hot_patches(target_patch, operate, func))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/hotpatch/test_hotupgrade.py b/hotpatch/test_hotupgrade.py
deleted file mode 100644
index 7f003c7..0000000
--- a/hotpatch/test_hotupgrade.py
+++ /dev/null
@@ -1,75 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import unittest
-from unittest import mock
-
-from .updateinfo_parse import HotpatchUpdateInfo
-from .hotupgrade import HotupgradeCommand
-
-
-class UpgradeTestCase(unittest.TestCase):
- def setUp(self) -> None:
- cli = mock.MagicMock()
- self.cmd = HotupgradeCommand(cli)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve")
- def test_get_hotpatch_based_on_cve_should_return_empty_list_when_no_patch_found(self, mock_patch):
- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"],
- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]}
- res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-3"])
- expected_res = []
- self.assertEqual(res, expected_res)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_cve")
- def test_get_hotpatch_based_on_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch):
- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"],
- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]}
- res = self.cmd.get_hotpatch_based_on_cve(["CVE-2022-1", "CVE-2022-2"])
- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]
- self.assertEqual(res, expected_res)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories")
- def test_get_hotpatch_based_on_advisory_should_return_empty_list_when_no_patch_found(self, mock_patch):
- mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"],
- "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1"]}
- res = self.cmd.get_hotpatch_based_on_advisory(["SA-2022-3"])
- expected_res = []
- self.assertEqual(res, expected_res)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatches_from_advisories")
- def test_get_hotpatch_based_on_advisory_should_return_correct_when_two_cve_have_same_patch(self, mock_patch):
- mock_patch.return_value = {"SA-2022-1": ["patch-kernel-4.19-1-ACC-1-1"],
- "SA-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]}
- res = self.cmd.get_hotpatch_based_on_cve(["SA-2022-2"])
- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]
- self.assertEqual(res, expected_res)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve")
- def test_get_hotpatch_of_all_cve_should_return_empty_list_when_no_patch_found(self, mock_patch):
- mock_patch.return_value = {"CVE-2022-1": [],
- "CVE-2022-2": []}
- res = self.cmd.get_hotpatch_of_all_cve()
- expected_res = []
- self.assertEqual(res, expected_res)
-
- @mock.patch.object(HotpatchUpdateInfo, "get_hotpatch_of_all_cve")
- def test_get_hotpatch_of_all_cve_should_return_correct_when_two_cve_have_same_patch(self, mock_patch):
- mock_patch.return_value = {"CVE-2022-1": ["patch-kernel-4.19-1-ACC-1-1"],
- "CVE-2022-2": ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]}
- res = self.cmd.get_hotpatch_of_all_cve()
- expected_res = ["patch-kernel-4.19-1-ACC-1-1", "patch-kernel-tools-4.19-1-ACc-1-1"]
- self.assertEqual(res, expected_res)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/hotpatch/test_syscare.py b/hotpatch/test_syscare.py
deleted file mode 100644
index 50903ed..0000000
--- a/hotpatch/test_syscare.py
+++ /dev/null
@@ -1,152 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import unittest
-from unittest import mock
-
-from .syscare import SUCCEED, FAIL
-from .syscare import Syscare, cmd_output
-
-
-class SyscareTestCase(unittest.TestCase):
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_list_should_return_empty_when_cmd_output_nothing(self, mock_cmd):
- mock_cmd.return_value = "", FAIL
- result = Syscare().list()
- self.assertEqual(result, [])
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_list_should_return_empty_when_cmd_output_only_header(self, mock_cmd):
- mock_cmd.return_value = "Target Name Status\n", SUCCEED
- result = Syscare().list()
- self.assertEqual(result, [])
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_list_should_return_correct_result_when_cmd_output_correct(self, mock_cmd):
- mock_cmd.return_value = "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n", SUCCEED
- result = Syscare().list()
- expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}]
- self.assertEqual(result, expected_res)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_list_should_return_filtered_result_when_input_with_condition(self, mock_cmd):
- mock_cmd.return_value = (
- "Target Name Status\nredis-6.2.5-1.oe2203 CVE-2021-23675 ACTIVED\n"
- "kernel-5.10.0-60.80.0.104.oe2203 modify-proc-version DEACTIVED\n",
- SUCCEED,
- )
- result = Syscare().list(condition={"Status": "ACTIVED"})
- expected_res = [{'Target': 'redis-6.2.5-1.oe2203', 'Name': 'CVE-2021-23675', 'Status': 'ACTIVED'}]
- self.assertEqual(result, expected_res)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_status_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "ACTIVED", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().status(patch_name)
- expected_res = "ACTIVED"
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_apply_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().apply(patch_name)
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_active_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().active(patch_name)
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_deactive_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().deactive(patch_name)
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_accept_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().accept(patch_name)
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_remove_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- patch_name = mock.MagicMock()
- result, status_code = Syscare().remove(patch_name)
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_save_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- result, status_code = Syscare().save()
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch("hotpatch.syscare.cmd_output")
- def test_restore_should_return_correct(self, mock_cmd):
- mock_cmd.return_value = "", SUCCEED
- result, status_code = Syscare().restore()
- expected_res = ""
- expected_code = SUCCEED
- self.assertEqual(result, expected_res)
- self.assertEqual(status_code, expected_code)
-
- @mock.patch('subprocess.Popen')
- def test_cmd_output_should_return_correct_when_popen_return_success(self, mock_popen):
- expected_output = "Hello"
- expected_returncode = SUCCEED
- mock_process = mock_popen.return_value
- mock_process.stdout.read.return_value = expected_output.encode('utf-8')
- mock_process.returncode = expected_returncode
- output, returncode = cmd_output(['echo', 'hello'])
- self.assertEqual(output, expected_output)
- self.assertEqual(returncode, expected_returncode)
-
- @mock.patch('subprocess.Popen')
- def test_cmd_output_should_raise_exception_when_popen_excute_fail(self, mock_popen):
- expected_output = "-bash: hellocommand not found"
- expected_returncode = FAIL
- mock_popen.side_effect = Exception('-bash: hellocommand not found')
- cmd = mock.MagicMock()
- output, returncode = cmd_output(cmd)
- self.assertEqual(output, expected_output)
- self.assertEqual(returncode, expected_returncode)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/hotpatch/updateinfo_parse.py b/hotpatch/updateinfo_parse.py
deleted file mode 100644
index 5fe36da..0000000
--- a/hotpatch/updateinfo_parse.py
+++ /dev/null
@@ -1,527 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-import os
-import re
-import gzip
-import datetime
-import subprocess
-import xml.etree.ElementTree as ET
-from functools import cmp_to_key
-from typing import List
-from .baseclass import Hotpatch, Cve, Advisory
-from .syscare import Syscare
-from .version import Versions
-
-SUCCEED = 0
-FAIL = 255
-
-
-def cmd_output(cmd):
- try:
- result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- result.wait()
- return result.stdout.read().decode('utf-8'), result.returncode
- except Exception as e:
- print("error: ", e)
- return str(e), FAIL
-
-
-class HotpatchUpdateInfo(object):
- """
- Hotpatch relevant updateinfo processing
- """
-
- UNINSTALLABLE = 0
- INSTALLED = 1
- INSTALLABLE = 2
- UNRELATED = 3
-
- syscare = Syscare()
- version = Versions()
-
- def __init__(self, base, cli):
- self.base = base
- self.cli = cli
- # dict {advisory_id: Advisory}
- self._hotpatch_advisories = {}
- # dict {cve_id: Cve}
- self._hotpatch_cves = {}
- # list [{'Uuid': uuid, 'Name':name, 'Status': status}]
- self._hotpatch_status = []
- # dict {required_pkg_info_str: [Hotpatch]}
- self._hotpatch_required_pkg_info_str = {}
-
- self.init_hotpatch_info()
-
- def init_hotpatch_info(self):
- """
- Initialize hotpatch information
- """
- self._get_installed_pkgs()
- self._parse_and_store_hotpatch_info_from_updateinfo()
- self._init_hotpatch_status_from_syscare()
- self._init_hotpatch_state()
-
- @property
- def hotpatch_cves(self):
- return self._hotpatch_cves
-
- @property
- def hotpatch_status(self):
- return self._hotpatch_status
-
- @property
- def hotpatch_required_pkg_info_str(self):
- return self._hotpatch_required_pkg_info_str
-
- def _get_installed_pkgs(self):
- """
- Get installed packages by setting the hawkey
- """
- sack = self.base.sack
- # the latest installed packages
- q = sack.query().installed().latest(1)
- # plus packages of the running kernel
- kernel_q = sack.query().filterm(empty=True)
- kernel = sack.get_running_kernel()
- if kernel:
- kernel_q = kernel_q.union(sack.query().filterm(sourcerpm=kernel.sourcerpm))
- q = q.union(kernel_q.installed())
- q = q.apply()
-
- self._inst_pkgs_query = q
-
- def _parse_and_store_hotpatch_info_from_updateinfo(self):
- """
- Initialize hotpatch information from repos
- """
- # get xxx-updateinfo.xml.gz file paths by traversing the system_cachedir(/var/cache/dnf)
- system_cachedir = self.cli.base.conf.system_cachedir
- all_repos = self.cli.base.repos
- map_repo_updateinfoxml = {}
-
- for file in os.listdir(system_cachedir):
- file_path = os.path.join(system_cachedir, file)
- if os.path.isdir(file_path):
- repodata_path = os.path.join(file_path, "repodata")
- if not os.path.isdir(repodata_path):
- continue
-
- for xml_file in os.listdir(repodata_path):
- # the hotpatch relevant updateinfo is recorded in xxx-updateinfo.xml.gz
- if "updateinfo" in xml_file:
- repo_name = file.rsplit("-", 1)[0]
- cache_updateinfo_xml_path = os.path.join(repodata_path, xml_file)
- map_repo_updateinfoxml[repo_name] = cache_updateinfo_xml_path
-
- # only hotpatch relevant updateinfo from enabled repos are parsed and stored
- for repo in all_repos.iter_enabled():
- repo_id = repo.id
- if repo_id in map_repo_updateinfoxml:
- updateinfoxml_path = map_repo_updateinfoxml[repo_id]
- self._parse_and_store_from_xml(updateinfoxml_path)
-
- def _parse_pkglist(self, pkglist):
- """
- Parse the pkglist information, filter the hotpatches with different arches
- """
- hotpatches = []
- hot_patch_collection = pkglist.find('hot_patch_collection')
- arches = self.base.sack.list_arches()
- if not hot_patch_collection:
- return hotpatches
- for package in hot_patch_collection.iter('package'):
- hotpatch = {key: value for key, value in package.items()}
- if hotpatch['arch'] not in arches:
- continue
- hotpatch['filename'] = package.find('filename').text
- hotpatches.append(hotpatch)
- return hotpatches
-
- def _parse_references(self, reference):
- """
- Parse the reference information, check whether the 'id' is missing
- """
- cves = []
- for ref in reference:
- cve = {key: value for key, value in ref.items()}
- if 'id' not in cve:
- continue
- cves.append(cve)
- return cves
-
- def _verify_date_str_lawyer(self, datetime_str: str) -> str:
- """
- Check whether the 'datetime' field is legal, if not return default value
- """
- if datetime_str.isdigit() and len(datetime_str) == 10:
- datetime_str = int(datetime_str)
- datetime_str = datetime.datetime.fromtimestamp(datetime_str).strftime("%Y-%m-%d %H:%M:%S")
- try:
- datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
- return datetime_str
- except ValueError:
- return "1970-01-01 08:00:00"
-
- def _parse_advisory(self, update):
- """
- Parse the advisory information: check whether the 'datetime' field is legal, parse the 'references'
- field and the 'pkglist' field, save 'type' information
- """
- advisory = {}
- for node in update:
- if node.tag == 'datetime':
- advisory[node.tag] = self._verify_date_str_lawyer(update.find(node.tag).text)
- elif node.tag == 'references':
- advisory[node.tag] = self._parse_references(node)
- elif node.tag == 'pkglist':
- advisory['hotpatches'] = self._parse_pkglist(node)
- else:
- advisory[node.tag] = update.find(node.tag).text
- advisory['adv_type'] = update.get('type')
- return advisory
-
- def _get_hotpatch_require_pkgs_info(self, hotpatch: Hotpatch):
- """
- Get require packages from requires info of hotpatch packages. Specifically, read the require
- information of the rpm package, get the target coldpatch rpm package, and record the
- name_vere(name-version-release) information of the coldpatch.
-
- Returns:
- require pkgs: dict
- e.g.
- {
- 'redis': '6.2.5-1',
- 'redis-cli': '6.2.5-1'
- }
- """
- require_pkgs = dict()
- cmd = ["dnf", "repoquery", "--requires", hotpatch.nevra]
- requires_output, return_code = cmd_output(cmd)
- if return_code != SUCCEED:
- return require_pkgs
- requires_output = requires_output.split('\n')
- for requires in requires_output:
- if " = " not in requires:
- continue
- require_pkg = requires
- pkg_name, pkg_vere = re.split(' = ', require_pkg)
- if pkg_name == "syscare":
- continue
- require_pkgs[pkg_name] = pkg_vere
- return require_pkgs
-
- def _store_advisory_info(self, advisory_kwargs: dict):
- """
- Instantiate Cve, Hotpatch and Advisory object according to the advisory kwargs
- """
- advisory_references = advisory_kwargs.pop('references')
- advisory_hotpatches = advisory_kwargs.pop('hotpatches')
- advisory = Advisory(**advisory_kwargs)
- advisory_cves = {}
- for cve_kwargs in advisory_references:
- cve = Cve(**cve_kwargs)
-
- if cve.cve_id not in self._hotpatch_cves:
- self._hotpatch_cves[cve.cve_id] = cve
- advisory_cves[cve.cve_id] = self._hotpatch_cves[cve.cve_id]
-
- advisory.cves = advisory_cves
-
- for hotpatch_kwargs in advisory_hotpatches:
- hotpatch = Hotpatch(**hotpatch_kwargs)
- hotpatch.advisory = advisory
- hotpatch.cves = advisory_cves.keys()
- hotpatch.required_pkgs_info = self._get_hotpatch_require_pkgs_info(hotpatch)
-
- advisory.add_hotpatch(hotpatch)
-
- for ref_id in advisory_cves.keys():
- advisory_cves[ref_id].add_hotpatch(hotpatch)
-
- hotpatch_vere = "%s-%s" % (hotpatch.version, hotpatch.release)
- self._hotpatch_required_pkg_info_str.setdefault(hotpatch.required_pkgs_str, []).append(
- [hotpatch_vere, hotpatch]
- )
-
- self._hotpatch_advisories[advisory_kwargs['id']] = advisory
-
- def _init_hotpatch_state(self):
- """
- Initialize the hotpatch state, according to the target require package information and status
- information in syscare list.
- """
- for advisory in self._hotpatch_advisories.values():
- for hotpatch in advisory.hotpatches:
- self._set_hotpatch_state(hotpatch)
-
- def _set_hotpatch_state(self, hotpatch: Hotpatch):
- """
- Set hotpatch state.
-
- each hotpatch has four states:
- 1. UNRELATED: the target required pakcage version is smaller than the version of package
- installed on this machine or not installed on this machine
- 2. UNINSTALLABLE: can not be installed due to the target required package version is bigger
- 3. INSTALLED: has been installed and actived/accepted in syscare
- 4. INSTALLABLE: can be installed
-
- Args:
- hotpatch(Hotpatch)
- """
- hotpatch.state = self.UNRELATED
- is_find_installable_hp = False
- for required_pkg_name, required_pkg_vere in hotpatch.required_pkgs_info.items():
- inst_pkgs = self._inst_pkgs_query.filter(name=required_pkg_name)
- # check whether the relevant target required package is installed on this machine
- if not inst_pkgs:
- return
- for inst_pkg in inst_pkgs:
- inst_pkg_vere = '%s-%s' % (inst_pkg.version, inst_pkg.release)
- if not self.version.larger_than(required_pkg_vere, inst_pkg_vere):
- hotpatch.state = self.UNRELATED
- elif required_pkg_vere != inst_pkg_vere:
- hotpatch.state = self.UNINSTALLABLE
- else:
- is_find_installable_hp = True
-
- if not is_find_installable_hp:
- return
-
- if self._get_hotpatch_aggregated_status_in_syscare(hotpatch) in ('ACTIVED', "ACCEPTED"):
- hotpatch.state = self.INSTALLED
- else:
- hotpatch.state = self.INSTALLABLE
- return
-
- def _parse_and_store_from_xml(self, updateinfoxml: str):
- """
- Parse and store hotpatch update information from xxx-updateinfo.xml.gz
-
- xxx-updateinfo.xml.gz e.g.
-
- <?xml version="1.0" encoding="UTF-8"?>
- <updates>
- <update from="openeuler.org" type="security" status="stable">
- <id>openEuler-SA-2022-1</id>
- <title>An update for mariadb is now available for openEuler-22.03-LTS</title>
- <severity>Important</severity>
- <release>openEuler</release>
- <issued date="2022-04-16"></issued>
- <references>
- <reference href="https://nvd.nist.gov/vuln/detail/CVE-2021-1111" id="CVE-2021-1111" title="CVE-2021-1111" type="cve"></reference>
- <reference href="https://nvd.nist.gov/vuln/detail/CVE-2021-1112" id="CVE-2021-1112" title="CVE-2021-1112" type="cve"></reference>
- </references>
- <description>patch-redis-6.2.5-1-HP001.(CVE-2022-24048)</description>
- <pkglist>
- <hot_patch_collection>
- <name>openEuler</name>
- <package arch="aarch64" name="patch-redis-6.2.5-1-HP001" release="1" version="1">
- <filename>patch-redis-6.2.5-1-HP001-1-1.aarch64.rpm</filename>
- </package>
- <package arch="x86_64" name="patch-redis-6.2.5-1-HP001" release="1" version="1">
- <filename>patch-redis-6.2.5-1-HP001-1-1.x86_64.rpm</filename>
- </package>
- <package arch="aarch64" name="patch-redis-6.2.5-1-HP002" release="1" version="1">
- <filename>patch-redis-6.2.5-1-HP002-1-1.aarch64.rpm</filename>
- </package>
- <package arch="x86_64" name="patch-redis-6.2.5-1-HP002" release="1" version="1">
- <filename>patch-redis-6.2.5-1-HP002-1-1.x86_64.rpm</filename>
- </package>
- </hot_patch_collection>
- </pkglist>
- </update>
- ...
- </updates>
- """
- content = gzip.open(updateinfoxml)
- tree = ET.parse(content)
- root = tree.getroot()
- for update in root.iter('update'):
- # check whether the hotpatch relevant package information is in each advisory
- if not update.find('pkglist/hot_patch_collection'):
- continue
- advisory = self._parse_advisory(update)
- self._store_advisory_info(advisory)
-
- def _init_hotpatch_status_from_syscare(self):
- """
- Initialize hotpatch status from syscare
- """
- self._hotpatch_status = self.syscare.list()
- self._hotpatch_state = {}
- for hotpatch_info in self._hotpatch_status:
- self._hotpatch_state[hotpatch_info['Name']] = hotpatch_info['Status']
-
- def _get_hotpatch_aggregated_status_in_syscare(self, hotpatch: Hotpatch) -> str:
- """
- Get hotpatch aggregated status in syscare.
- """
- hotpatch_status = ''
-
- for name, status in self._hotpatch_state.items():
- if hotpatch.syscare_subname not in name:
- continue
- elif status in ('ACTIVED', 'ACCEPTED'):
- hotpatch_status = status
- elif status not in ('ACTIVED', 'ACCEPTED'):
- return ''
- return hotpatch_status
-
- def get_hotpatches_from_cve(self, cves: List[str], priority="ACC") -> dict():
- """
- Get hotpatches from specified cve. If there are several hotpatches for the same target required
- package for a cve, only return the hotpatch with the highest version according to the priority.
-
- Args:
- cves: [cve_id_1, cve_id_2]
-
- Returns:
- {
- cve_id_1: [hotpatch1],
- cve_id_2: []
- }
- """
-
- mapping_cve_hotpatches = dict()
- if priority not in ("ACC", "SGL"):
- return mapping_cve_hotpatches
-
- for cve_id in cves:
- mapping_cve_hotpatches[cve_id] = []
- if cve_id not in self.hotpatch_cves:
- continue
-
- self.update_mapping_cve_hotpatches(priority, mapping_cve_hotpatches, cve_id)
-
- return mapping_cve_hotpatches
-
- def version_cmp(self, hotpatch_a: Hotpatch, hotpatch_b: Hotpatch):
- """
- Sort the hotpatch in descending order according to the version-release.
- """
- vere_a = "%s-%s" % (hotpatch_a.version, hotpatch_a.release)
- vere_b = "%s-%s" % (hotpatch_b.version, hotpatch_b.release)
- if self.version.larger_than(vere_a, vere_b):
- return -1
- if self.version.larger_than(vere_b, vere_a):
- return 1
- return 0
-
- def update_mapping_cve_hotpatches(self, priority: str, mapping_cve_hotpatches: dict, cve_id: str):
- """
- Update mapping_cve_hotpatches. Specifically, get the hotpatches corresponding to the cve_id, if find
- at least one installable hotpatch, append hotpatches in mapping_cve_hotpatches for cve_id, in which
- only the hotpatch with the highest version (priority first) for each targeted required package is
- returned. Otherwise, do not update mapping_cve_hotpatches.
-
- Args:
- priority(str): ACC or SGL
- mapping_cve_hotpatches(dict): cve and corresponding hotpatches
- e.g.
- {
- cve_id: [Hotpatch.nevra]
- }
- cve_id(str): cve id. e.g. "CVE-2023-1111"
- """
- mapping_required_pkg_to_hotpatches = dict()
- fixed_required_pkgs_str = [
- hotpatch.required_pkgs_str
- for hotpatch in self.hotpatch_cves[cve_id].hotpatches
- if hotpatch.state == self.INSTALLED
- ]
- for hotpatch in self.hotpatch_cves[cve_id].hotpatches:
- if hotpatch.state != self.INSTALLABLE:
- continue
- if hotpatch.required_pkgs_str in fixed_required_pkgs_str:
- continue
- mapping_required_pkg_to_hotpatches.setdefault(hotpatch.required_pkgs_str, []).append(hotpatch)
- if hotpatch.hotpatch_name == "ACC":
- self.append_related_hotpatches(mapping_required_pkg_to_hotpatches)
-
- if not mapping_required_pkg_to_hotpatches:
- return
-
- # find the hotpatch with the highest version for the same target required package
- for hotpatches in mapping_required_pkg_to_hotpatches.values():
- prefered_hotpatch = self.get_preferred_hotpatch(hotpatches, priority)
- if not prefered_hotpatch:
- continue
- if prefered_hotpatch.state == self.INSTALLABLE:
- mapping_cve_hotpatches[cve_id].append(prefered_hotpatch.nevra)
-
- def append_related_hotpatches(self, mapping_required_pkg_to_hotpatches: dict):
- """
- Append the hotpatches corresponding to the target required package in
- mapping_required_pkg_to_hotpatches.
-
- Args:
- mapping_required_pkg_to_hotpatches(dict): target required pkg and corresponding hotpatches
- e.g.
- {
- "redis,redis-cli": [Hotpatch]
- }
-
- """
- for cve in self.hotpatch_cves.values():
- for hotpatch in cve.hotpatches:
- if hotpatch.hotpatch_name != "ACC":
- continue
- if hotpatch.required_pkgs_str not in mapping_required_pkg_to_hotpatches:
- continue
- if hotpatch in mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str]:
- continue
- mapping_required_pkg_to_hotpatches[hotpatch.required_pkgs_str].append(hotpatch)
-
- def get_preferred_hotpatch(self, hotpatches: list, priority: str):
- """
- Get the preferred hotpatch with priority in their hotpatch_name.
-
- Args:
- hotpatches(list): hotpatches
- priority(str): ACC or SGL
-
- Returns:
- Hotpatch or None
- """
- if not hotpatches:
- return None
- hotpatches.sort(key=cmp_to_key(self.version_cmp))
- for hotpatch in hotpatches:
- if priority in hotpatch.hotpatch_name:
- return hotpatch
- return hotpatches[0]
-
- def get_hotpatches_from_advisories(self, advisories: List[str]) -> dict():
- """
- Get hotpatches from specified advisories
-
- Args:
- advisories: [advisory_id_1, advisory_id_2]
-
- Return:
- {
- advisory_id_1: [hotpatch1],
- advisory_id_2: []
- }
- """
- mapping_advisory_hotpatches = dict()
- for advisory_id in advisories:
- mapping_advisory_hotpatches[advisory_id] = []
- if advisory_id not in self._hotpatch_advisories:
- continue
- advisory = self._hotpatch_advisories[advisory_id]
- for hotpatch in advisory.hotpatches:
- if hotpatch.state == self.INSTALLABLE:
- mapping_advisory_hotpatches[advisory_id].append(hotpatch.nevra)
- return mapping_advisory_hotpatches
diff --git a/hotpatch/version.py b/hotpatch/version.py
deleted file mode 100644
index 820b8d5..0000000
--- a/hotpatch/version.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/python3
-# ******************************************************************************
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
-# licensed under the Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-# PURPOSE.
-# See the Mulan PSL v2 for more details.
-# ******************************************************************************/
-class Versions:
- """
- Version number processing
- """
-
- separator = (".", "-")
- _connector = "&"
-
- def _order(self, version, separator=None):
- """
- Version of the cutting
- Args:
- version: version
- separator: separator
-
- Returns:
-
- """
- if not separator:
- separator = self._connector
- return tuple([int(v) for v in version.split(separator) if v.isdigit()])
-
- def larger_than(self, version, compare_version):
- """
- Returns true if the size of the compared version is greater
- than that of the compared version, or false otherwise
-
- """
- for separator in self.separator:
- version = self._connector.join([v for v in version.split(separator)])
- compare_version = self._connector.join([v for v in compare_version.split(separator)])
- version = self._order(version)
- compare_version = self._order(compare_version)
- return version >= compare_version
--
Gitee