oec-hardware/oec-hardware-1.0.0-optimization.patch

3076 lines
106 KiB
Diff
Raw Normal View History

Binary files oec-hardware/docs/test-flow.png and oec-hardware_new/docs/test-flow.png differ
diff -urN oec-hardware/hwcompatible/client.py oec-hardware_new/hwcompatible/client.py
--- oec-hardware/hwcompatible/client.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/client.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,15 +12,16 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""upload file"""
+
import os
import base64
try:
from urllib.parse import urlencode
from urllib.request import urlopen, Request
- from urllib.error import HTTPError
except ImportError:
from urllib import urlencode
- from urllib2 import urlopen, Request, HTTPError
+ from urllib2 import urlopen, Request
class Client:
@@ -29,7 +30,7 @@
"""
def __init__(self, host, oec_id):
self.host = host
- self.id = oec_id
+ self.oec_id = oec_id
self.form = {}
def upload(self, files, server='localhost'):
@@ -42,17 +43,17 @@
filename = os.path.basename(files)
try:
job = filename.split('.')[0]
- with open(files, 'rb') as f:
- filetext = base64.b64encode(f.read())
- except Exception as e:
- print(e)
+ with open(files, 'rb') as file:
+ filetext = base64.b64encode(file.read())
+ except Exception as excp:
+ print(excp)
return False
- if not self.host or not self.id:
- print("Missing host({0}) or id({1})".format(self.host, self.id))
+ if not self.host or not self.oec_id:
+ print("Missing host({0}) or id({1})".format(self.host, self.oec_id))
return False
self.form['host'] = self.host
- self.form['id'] = self.id
+ self.form['id'] = self.oec_id
self.form['job'] = job
self.form['filetext'] = filetext
@@ -70,8 +71,8 @@
print("Error: upload failed, %s" % res.msg)
return False
return True
- except Exception as e:
- print(e)
+ except Exception as excp:
+ print(excp)
return False
diff -urN oec-hardware/hwcompatible/command.py oec-hardware_new/hwcompatible/command.py
--- oec-hardware/hwcompatible/command.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/command.py 2021-09-13 11:42:03.381474028 +0800
@@ -45,7 +45,7 @@
encoding='utf8')
(output, errors) = self.pipe.communicate()
if output:
- #Strip new line character/s if any from the end of output string
+ # Strip new line character/s if any from the end of output string
output = output.rstrip('\n')
self.origin_output = output
self.output = output.splitlines()
@@ -54,6 +54,7 @@
self.returncode = self.pipe.returncode
def start(self):
+ """start command"""
if sys.version_info.major < 3:
self.pipe = subprocess.Popen(self.command, shell=True,
stdin=subprocess.PIPE,
@@ -82,14 +83,15 @@
# raise CertCommandError(self, "has output on stderr")
def run_quiet(self):
+ """Run the command and raise CertCommandError on a non-zero return code"""
self._run()
if self.returncode != 0:
raise CertCommandError(self, "returned %d" % self.returncode)
def echo(self, ignore_errors=False):
+ """Print information to terminal"""
self.run(ignore_errors)
self.print_output()
- return
def print_output(self):
"""
@@ -141,49 +143,39 @@
return self.pipe.stdout.read().decode('utf-8', 'ignore').rstrip()
def poll(self):
+ """Return the result of polling the command's pipe, if a pipe exists"""
if self.pipe:
return self.pipe.poll()
- def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False):
- self.regex = regex
- self.single_line = single_line
- self.regex_group = regex_group
-
- self._run()
-
- if self.single_line:
- if self.output and len(self.output) > 1:
- raise CertCommandError(self, "Found %u lines of output, expected 1" % len(self.output))
+ def _get_str_single_line(self):
+ if self.output and len(self.output) > 1:
+ raise CertCommandError(self, "Found %u lines of output, "
+ "expected 1" % len(self.output))
- if self.output:
- line = self.output[0].strip()
- if not self.regex:
- return line
- # otherwise, try the regex
- pattern = re.compile(self.regex)
- match = pattern.match(line)
+ if self.output:
+ line = self.output[0].strip()
+ if not self.regex:
+ return line
+ # otherwise, try the regex
+ pattern = re.compile(self.regex)
+ match = pattern.match(line)
+ if match:
+ if self.regex_group:
+ return match.group(self.regex_group)
+ # otherwise, no group, return the whole line
+ return line
+
+ # no regex match try a grep-style match
+ if not self.regex_group:
+ match = pattern.search(line)
if match:
- if self.regex_group:
- return match.group(self.regex_group)
- # otherwise, no group, return the whole line
- return line
-
- # no regex match try a grep-style match
- if not self.regex_group:
- match = pattern.search(line)
- if match:
- return match.group()
+ return match.group()
- # otherwise
- raise CertCommandError(self, "no match for regular expression %s" % self.regex)
+ # otherwise
+ raise CertCommandError(self, "no match for regular "
+ "expression %s" % self.regex)
- #otherwise, multi-line or single-line regex
- if not self.regex:
- raise CertCommandError(self, "no regular expression set for multi-line command")
- pattern = re.compile(self.regex)
- result = None
- if return_list:
- result = list()
+ def _get_str_multi_line(self, result, pattern, return_list):
if self.output:
for line in self.output:
if self.regex_group:
@@ -202,12 +194,32 @@
result.append(match.group())
else:
return match.group()
- if result:
- return result
+ return result
+
+ def _get_str(self, regex=None, regex_group=None,
+ single_line=True, return_list=False):
+ self.regex = regex
+ self.single_line = single_line
+ self.regex_group = regex_group
+
+ self._run()
+
+ if self.single_line:
+ return self._get_str_single_line()
- raise CertCommandError(self, "no match for regular expression %s" % self.regex)
+ # otherwise, multi-line or single-line regex
+ if not self.regex:
+ raise CertCommandError(self, "no regular expression "
+ "set for multi-line command")
+ pattern = re.compile(self.regex)
+ result = None
+ if return_list:
+ result = list()
+ return self._get_str_multi_line(result, pattern, return_list)
- def get_str(self, regex=None, regex_group=None, single_line=True, return_list=False, ignore_errors=False):
+ def get_str(self, regex=None, regex_group=None, single_line=True,
+ return_list=False, ignore_errors=False):
+ """Get the matching value from the command execution result"""
result = self._get_str(regex, regex_group, single_line, return_list)
if not ignore_errors:
if self.returncode != 0:
@@ -226,7 +238,7 @@
Cert command error handling
"""
def __init__(self, command, message):
- super(CertCommandError, self).__init__()
+ Exception.__init__(self)
self.message = message
self.command = command
self.__message = None
@@ -234,7 +246,9 @@
def __str__(self):
return "\"%s\" %s" % (self.command.command, self.message)
- def _get_message(self): return self.__message
- def _set_message(self, value): self.__message = value
- message = property(_get_message, _set_message)
+ def _get_message(self):
+ return self.__message
+ def _set_message(self, value):
+ self.__message = value
+ message = property(_get_message, _set_message)
diff -urN oec-hardware/hwcompatible/commandUI.py oec-hardware_new/hwcompatible/commandUI.py
--- oec-hardware/hwcompatible/commandUI.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/commandUI.py 2021-09-13 11:42:03.381474028 +0800
@@ -23,7 +23,7 @@
def __init__(self, echoResponses=False):
self.echo = echoResponses
- def printPipe(self, pipe):
+ def print_pipe(self, pipe):
"""
print pipe data
:param pipe:
@@ -123,6 +123,7 @@
reply = input(label).strip()
if not choices or reply in choices:
return reply
- print("Please enter one of the following: %s" % " | ".join(choices))
+ print("Please enter one of the "
+ "following: %s" % " | ".join(choices))
finally:
readline.set_startup_hook()
diff -urN oec-hardware/hwcompatible/compatibility.py oec-hardware_new/hwcompatible/compatibility.py
--- oec-hardware/hwcompatible/compatibility.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/compatibility.py 2021-09-13 11:42:03.381474028 +0800
@@ -60,7 +60,8 @@
self.devices = DeviceDocument(CertEnv.devicefile, oec_devices)
self.devices.save()
- # test_factory format example: [{"name":"nvme", "device":device, "run":True, "status":"PASS", "reboot":False}]
+ # test_factory format example: [{"name":"nvme", "device":device,
+ # "run":True, "status":"PASS", "reboot":False}]
test_factory = self.get_tests(oec_devices)
self.update_factory(test_factory)
if not self.choose_tests():
@@ -95,7 +96,8 @@
clean all compatibility test file
:return:
"""
- if self.ui.prompt_confirm("Are you sure to clean all compatibility test data?"):
+ if self.ui.prompt_confirm("Are you sure to clean all "
+ "compatibility test data?"):
try:
Command("rm -rf %s" % CertEnv.certificationfile).run()
Command("rm -rf %s" % CertEnv.factoryfile).run()
@@ -149,7 +151,8 @@
cwd = os.getcwd()
os.chdir(os.path.dirname(doc_dir))
- dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + "-" + job.job_id
+ dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")\
+ + "-" + job.job_id
pack_name = dir_name + ".tar"
cmd = Command("tar -cf %s %s" % (pack_name, dir_name))
try:
@@ -213,11 +216,8 @@
:param devices:
:return:
"""
- nodevice = ["cpufreq", "memory", "clock", "profiler", "system", "stress", "kdump", "perf", "acpi", "watchdog"]
- ethernet = ["ethernet"]
- infiniband = ["infiniband"]
- storage = ["nvme", "disk", "nvdimm"]
- cdrom = ["cdrom"]
+ nodevice = ["cpufreq", "memory", "clock", "profiler", "system",
+ "stress", "kdump", "perf", "acpi", "watchdog"]
sort_devices = self.sort_tests(devices)
empty_device = Device()
test_factory = list()
@@ -225,7 +225,8 @@
for (dirpath, dirs, filenames) in os.walk(CertEnv.testdirectoy):
dirs.sort()
for filename in filenames:
- if filename.endswith(".py") and not filename.startswith("__init__"):
+ if filename.endswith(".py") and \
+ not filename.startswith("__init__"):
casenames.append(filename.split(".")[0])
for testname in casenames:
@@ -258,15 +259,18 @@
empty_device = Device()
for device in devices:
if device.get_property("SUBSYSTEM") == "usb" and \
- device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \
+ device.get_property("ID_VENDOR_FROM_DATABASE") == \
+ "Linux Foundation" and \
("2." in device.get_property("ID_MODEL_FROM_DATABASE") or
"3." in device.get_property("ID_MODEL_FROM_DATABASE")):
sort_devices["usb"] = [empty_device]
continue
- if device.get_property("PCI_CLASS") == "30000" or device.get_property("PCI_CLASS") == "38000":
+ if device.get_property("PCI_CLASS") == "30000" or \
+ device.get_property("PCI_CLASS") == "38000":
sort_devices["video"] = [device]
continue
- if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \
+ if (device.get_property("DEVTYPE") == "disk" and
+ not device.get_property("ID_TYPE")) or \
device.get_property("ID_TYPE") == "disk":
if "nvme" in device.get_property("DEVPATH"):
sort_devices["disk"] = [empty_device]
@@ -274,11 +278,10 @@
sort_devices["nvme"].extend([device])
except KeyError:
sort_devices["nvme"] = [device]
- continue
elif "/host" in device.get_property("DEVPATH"):
sort_devices["disk"] = [empty_device]
- continue
- if device.get_property("SUBSYSTEM") == "net" and device.get_property("INTERFACE"):
+ if device.get_property("SUBSYSTEM") == "net" and \
+ device.get_property("INTERFACE"):
interface = device.get_property("INTERFACE")
nmcli = Command("nmcli device")
nmcli.start()
@@ -304,7 +307,7 @@
break
continue
if device.get_property("ID_CDROM") == "1":
- types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", \
+ types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD",
"BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"]
for dev_type in types:
if device.get_property("ID_CDROM_" + dev_type) == "1":
@@ -316,7 +319,8 @@
if device.get_property("SUBSYSTEM") == "ipmi":
sort_devices["ipmi"] = [empty_device]
try:
- Command("dmidecode").get_str("IPMI Device Information", single_line=False)
+ Command("dmidecode").get_str("IPMI Device Information",
+ single_line=False)
sort_devices["ipmi"] = [empty_device]
except:
pass
@@ -353,21 +357,24 @@
test["run"] = True
continue
- try:
- num = int(reply)
- except:
- continue
+ num_lst = reply.split(" ")
+ for num in num_lst:
+ try:
+ num = int(num)
+ except ValueError:
+ continue
- if num > 0 and num <= len(self.test_factory):
- self.test_factory[num - 1]["run"] = not self.test_factory[num - 1]["run"]
- continue
+ if 0 < num <= len(self.test_factory):
+ self.test_factory[num - 1]["run"] = not \
+ self.test_factory[num - 1]["run"]
+ continue
def show_tests(self):
"""
show test items
:return:
"""
- print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \
+ print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10)
+ "Status".ljust(8) + "Class".ljust(14) + "Device\033[0m")
num = 0
for test in self.test_factory:
@@ -385,16 +392,16 @@
num = num + 1
if status == "PASS":
- print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" \
+ print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m"
+ name.ljust(14) + "%s" % device)
elif status == "FAIL":
- print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \
+ print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m"
+ name.ljust(14) + "%s" % device)
elif status == "Force":
- print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" \
+ print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m"
+ name.ljust(14) + "%s" % device)
else:
- print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \
+ print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m"
+ name.ljust(14) + "%s" % device)
def choose_tests(self):
@@ -408,19 +415,20 @@
else:
test["run"] = True
os.system("clear")
- print("These tests are recommended to complete the compatibility test:")
+ print("These tests are recommended to "
+ "complete the compatibility test:")
self.show_tests()
- action = self.ui.prompt("Ready to begin testing?", ["run", "edit", "quit"])
+ action = self.ui.prompt("Ready to begin testing?",
+ ["run", "edit", "quit"])
action = action.lower()
if action in ["r", "run"]:
return True
- elif action in ["q", "quit"]:
+ if action in ["q", "quit"]:
return False
- elif action in ["e", "edit"]:
+ if action in ["e", "edit"]:
return self.edit_tests()
- else:
- print("Invalid choice!")
- return self.choose_tests()
+ print("Invalid choice!")
+ return self.choose_tests()
def check_result(self):
"""
@@ -443,15 +451,16 @@
if not self.test_factory:
self.test_factory = test_factory
else:
- factory_changed = False
for test in self.test_factory:
if not self.search_factory(test, test_factory):
self.test_factory.remove(test)
- print("delete %s test %s" % (test["name"], test["device"].get_name()))
+ print("delete %s test %s" % (test["name"],
+ test["device"].get_name()))
for test in test_factory:
if not self.search_factory(test, self.test_factory):
self.test_factory.append(test)
- print("add %s test %s" % (test["name"], test["device"].get_name()))
+ print("add %s test %s" % (test["name"],
+ test["device"].get_name()))
self.test_factory.sort(key=lambda k: k["name"])
FactoryDocument(CertEnv.factoryfile, self.test_factory).save()
@@ -463,6 +472,7 @@
:return:
"""
for test in test_factory:
- if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path:
+ if test["name"] == obj_test["name"] and \
+ test["device"].path == obj_test["device"].path:
return True
return False
diff -urN oec-hardware/hwcompatible/device.py oec-hardware_new/hwcompatible/device.py
--- oec-hardware/hwcompatible/device.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/device.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,7 +12,7 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
-from .command import Command, CertCommandError
+from .command import Command
def filter_char(string):
@@ -25,8 +25,8 @@
filtered = u''
start = 0
for i in range(len(string)):
- c = string[i]
- if c in ascii_blacklist or (type(string) != unicode and ord(c) >= 128):
+ char_filter = string[i]
+ if char_filter in ascii_blacklist or (type(string) != unicode and ord(char_filter) >= 128):
if start < i:
filtered += string[start:i]
start = i + 1
@@ -110,10 +110,8 @@
"""
if "INTERFACE" in self.properties.keys():
return self.properties["INTERFACE"]
- elif "DEVNAME" in self.properties.keys():
+ if "DEVNAME" in self.properties.keys():
return self.properties["DEVNAME"].split("/")[-1]
- elif self.path:
+ if self.path:
return self.path.split("/")[-1]
- else:
- return ""
-
+ return ""
diff -urN oec-hardware/hwcompatible/document.py oec-hardware_new/hwcompatible/document.py
--- oec-hardware/hwcompatible/document.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/document.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,8 +12,9 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
-import json
+"""Document processing"""
+import json
from .commandUI import CommandUI
from .command import Command
from .device import Device
@@ -33,23 +34,25 @@
print("doc new")
def save(self):
+ """save file"""
try:
with open(self.filename, "w+") as save_f:
json.dump(self.document, save_f, indent=4)
save_f.close()
- except Exception as e:
+ except Exception as concrete_error:
print("Error: doc save fail.")
- print(e)
+ print(concrete_error)
return False
return True
def load(self):
+ """load file"""
try:
with open(self.filename, "r") as load_f:
self.document = json.load(load_f)
load_f.close()
return True
- except:
+ except Exception:
return False
@@ -86,33 +89,53 @@
self.document[key] = value
else:
break
- except Exception as e:
+ except Exception as concrete_error:
print("Error: get hardware info fail.")
- print(e)
+ print(concrete_error)
sysinfo = SysInfo(CertEnv.releasefile)
self.document["OS"] = sysinfo.product + " " + sysinfo.get_version()
self.document["kernel"] = sysinfo.kernel
self.document["ID"] = CommandUI().prompt("Please provide your Compatibility Test ID:")
self.document["Product URL"] = CommandUI().prompt("Please provide your Product URL:")
- self.document["server"] = CommandUI().prompt("Please provide the Compatibility Test Server (Hostname or Ipaddr):")
+ self.document["server"] = CommandUI().prompt("Please provide the Compatibility Test "
+ "Server (Hostname or Ipaddr):")
def get_hardware(self):
- return self.document["Manufacturer"] + " " + self.document["Product Name"] + " " + self.document["Version"]
+ """
+ Get hardware information
+ """
+ return self.document["Manufacturer"] + " " + self.document["Product Name"] + " " \
+ + self.document["Version"]
def get_os(self):
+ """
+ Get os information
+ """
return self.document["OS"]
def get_server(self):
+ """
+ Get server information
+ """
return self.document["server"]
def get_url(self):
+ """
+ Get url
+ """
return self.document["Product URL"]
def get_certify(self):
+ """
+ Get certify
+ """
return self.document["ID"]
def get_kernel(self):
+ """
+ Get kernel information
+ """
return self.document["kernel"]
@@ -179,17 +202,23 @@
self.load()
def load(self):
- fp = open(self.filename)
- self.config = fp.readlines()
+ """
+ Load config file
+ """
+ fp_info = open(self.filename)
+ self.config = fp_info.readlines()
for line in self.config:
if line.strip() and line.strip()[0] == "#":
continue
words = line.strip().split(" ")
if words[0]:
self.parameters[words[0]] = " ".join(words[1:])
- fp.close()
+ fp_info.close()
def get_parameter(self, name):
+ """
+ Get parameter
+ """
if self.parameters:
try:
return self.parameters[name]
@@ -198,6 +227,9 @@
return None
def dump(self):
+ """
+ Dump
+ """
for line in self.config:
string = line.strip()
if not string or string[0] == "#":
@@ -205,6 +237,9 @@
print(string)
def add_parameter(self, name, value):
+ """
+ add parameter
+ """
if not self.getParameter(name):
self.parameters[name] = value
self.config.append("%s %s\n" % (name, value))
@@ -238,7 +273,7 @@
Save the config property value to a file
:return:
"""
- fp = open(self.filename, "w")
+ fp_info = open(self.filename, "w")
for line in self.config:
- fp.write(line)
- fp.close()
+ fp_info.write(line)
+ fp_info.close()
diff -urN oec-hardware/hwcompatible/env.py oec-hardware_new/hwcompatible/env.py
--- oec-hardware/hwcompatible/env.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/env.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Certification file path"""
+
class CertEnv:
"""
@@ -28,5 +30,3 @@
logdirectoy = "/usr/share/oech/logs"
resultdirectoy = "/usr/share/oech/lib/server/results"
kernelinfo = "/usr/share/oech/kernelrelease.json"
-
-
diff -urN oec-hardware/hwcompatible/job.py oec-hardware_new/hwcompatible/job.py
--- oec-hardware/hwcompatible/job.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/job.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Test task management"""
+
import os
import sys
import string
@@ -26,7 +28,7 @@
from .reboot import Reboot
-class Job(object):
+class Job():
"""
Test task management
"""
@@ -42,7 +44,7 @@
self.test_factory = getattr(args, "test_factory", [])
self.test_suite = []
self.job_id = ''.join(random.sample(string.ascii_letters + string.digits, 10))
- self.ui = CommandUI()
+ self.com_ui = CommandUI()
self.subtests_filter = getattr(args, "subtests_filter", None)
self.test_parameters = None
@@ -74,9 +76,9 @@
sys.path.insert(0, dirpath)
try:
module = __import__(testname, globals(), locals())
- except Exception as e:
+ except Exception as concrete_error:
print("Error: module import failed for %s" % testname)
- print(e)
+ print(concrete_error)
return None
for thing in dir(module):
@@ -144,8 +146,8 @@
try:
cmd = Command("yum install -y " + " ".join(required_rpms))
cmd.echo()
- except CertCommandError as e:
- print(e)
+ except CertCommandError as concrete_error:
+ print(concrete_error)
print("Fail to install required packages.")
return False
@@ -182,8 +184,8 @@
return_code = test.test()
else:
return_code = test.test()
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return_code = False
if reboot:
@@ -251,5 +253,6 @@
"""
for test in self.test_factory:
for testcase in self.test_suite:
- if test["name"] == testcase["name"] and test["device"].path == testcase["device"].path:
+ if test["name"] == testcase["name"] and test["device"].path == \
+ testcase["device"].path:
test["status"] = testcase["status"]
diff -urN oec-hardware/hwcompatible/log.py oec-hardware_new/hwcompatible/log.py
--- oec-hardware/hwcompatible/log.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/log.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Log management"""
+
import os
import sys
import datetime
diff -urN oec-hardware/hwcompatible/reboot.py oec-hardware_new/hwcompatible/reboot.py
--- oec-hardware/hwcompatible/reboot.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/reboot.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,7 +12,10 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Special for restart tasks, so that the test can be continued after the machine is restarted"""
+
import datetime
+import os
from .document import Document, FactoryDocument
from .env import CertEnv
@@ -73,7 +76,7 @@
try:
Command("systemctl daemon-reload").run_quiet()
Command("systemctl enable oech").run_quiet()
- except:
+ except Exception:
print("Error: enable oech.service fail.")
return False
@@ -85,17 +88,20 @@
:return:
"""
doc = Document(CertEnv.rebootfile)
- if not doc.load():
- print("Error: reboot file load fail.")
+ if os.path.exists(CertEnv.rebootfile):
+ if not doc.load():
+ print("Error: reboot111 file load fail.")
+ return False
+ else:
return False
-
+
try:
self.testname = doc.document["test"]
self.reboot = doc.document
self.job.job_id = self.reboot["job_id"]
self.job.subtests_filter = self.reboot["rebootup"]
time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S")
- except:
+ except Exception:
print("Error: reboot file format not as expect.")
return False
@@ -108,4 +114,3 @@
return False
return True
-
diff -urN oec-hardware/hwcompatible/sysinfo.py oec-hardware_new/hwcompatible/sysinfo.py
--- oec-hardware/hwcompatible/sysinfo.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/sysinfo.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Get system information"""
+
import os
import re
@@ -40,10 +42,10 @@
:return:
"""
try:
- f = open(filename)
- text = f.read()
- f.close()
- except:
+ file_content = open(filename)
+ text = file_content.read()
+ file_content.close()
+ except Exception:
print("Release file not found.")
return
@@ -56,12 +58,12 @@
results = pattern.findall(text)
self.version = results[0].strip() if results else ""
- with os.popen('uname -m') as p:
- self.arch = p.readline().strip()
+ with os.popen('uname -m') as pop:
+ self.arch = pop.readline().strip()
self.debug_kernel = "debug" in self.arch
- with os.popen('uname -r') as p:
- self.kernel = p.readline().strip()
+ with os.popen('uname -r') as pop:
+ self.kernel = pop.readline().strip()
self.kernel_rpm = "kernel-{}".format(self.kernel)
self.kerneldevel_rpm = "kernel-devel-{}".format(self.kernel)
self.kernel_version = self.kernel.split('-')[0]
@@ -72,4 +74,3 @@
:return:
"""
return self.version
-
diff -urN oec-hardware/hwcompatible/test.py oec-hardware_new/hwcompatible/test.py
--- oec-hardware/hwcompatible/test.py 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/hwcompatible/test.py 2021-09-13 11:42:03.381474028 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Test set template"""
+
class Test:
"""
@@ -24,7 +26,13 @@
self.rebootup = None
def setup(self, args=None):
+ """
+ setup
+ """
pass
def teardown(self):
+ """
+ teardown
+ """
pass
diff -urN oec-hardware/hwcompatible/version.py oec-hardware_new/hwcompatible/version.py
--- oec-hardware/hwcompatible/version.py 1970-01-01 08:00:00.000000000 +0800
+++ oec-hardware_new/hwcompatible/version.py 2021-09-13 11:42:03.381474028 +0800
@@ -0,0 +1,3 @@
+# hwcompatible/version.py is automatically-generated
+version = ''
+name = 'oec-hardware'
diff -urN oec-hardware/README.md oec-hardware_new/README.md
--- oec-hardware/README.md 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/README.md 2021-09-13 11:43:06.821961002 +0800
@@ -31,15 +31,15 @@
## 背景介绍
-OS 厂商为了扩大自己产品的兼容性范围常常寻求与硬件厂商的合作进行兼容性测试。OS 厂商制定一个测试标准并提供测试用例硬件厂商进行实际的测试测试通过后OS 厂商和硬件厂商将共同对结果负责。这是一个双赢的合作,双方都可以藉此推销自己的产品。
+OS 厂商为了扩大自己产品的兼容性范围,常常寻求与硬件厂商的合作,进行兼容性测试。OS 厂商制定一个测试标准并提供测试用例,硬件厂商进行实际的测试。测试通过后,OS 厂商和硬件厂商将共同在对应的官网发布兼容性信息。
-验证目的就是保证 OS 与硬件平台的兼容性,认证仅限于基本功能验证,不包括性能测试等其它测试。
+验证目的就是保证 OS 与硬件平台的兼容性,验证仅限于基本功能验证,不包括性能测试等其它测试。
openEuler硬件兼容性验证测试框架有如下特点
1. 为满足可信要求必须使用openEuler操作系统不能随意重编/插入内核模块。
2. 通过扫描机制自适应发现硬件列表,来确定要运行的测试用例集合。
-3. 面向对象抽象各种硬件类型以及测试用例类,用于扩展开发。
+3. 面向对象抽象各种硬件类型以及测试用例类,用于扩展开发。
## 原理简介
@@ -100,33 +100,19 @@
## 获取安装包
-* 安装包从 openEuler 官方网站下载(暂未开放)。
-
-* 校验安装包的完整性。
-
- 1. 获取校验文件中的校验值:
-
- ```
- cat oec-hardware-*.rpm.sha256sum
- ```
-
- 2. 计算文件的 sha256 校验值:
-
- ```
- sha256sum oec-hardware-*.rpm
- ```
-
- 命令执行完成后,输出校验值。
-
- 3. 对比步骤1和步骤2计算的校验值是否一致。
-
- 如果校验值一致说明安装文件完整性没有破坏,如果校验值不一致则可以确认文件完整性已被破坏,需要重新获取。
+https://gitee.com/src-openeuler/oec-hardware/releases
## 安装过程
### 客户端
-1. 配置 [openEuler 官方 repo](https://repo.openeuler.org/) 中对应版本的 everything 源,使用 `dnf` 安装客户端 oec-hardware。
+1. 安装 memtester 依赖工具。
+
+ ```
+ dnf install memtester-4.3.0-18.fc33.aarch64(x86_64).rpm
+ ```
+
+2. 配置 [openEuler 官方 repo](https://repo.openeuler.org/) 中对应版本的 everything 源,使用 `dnf` 安装客户端 oec-hardware。
```
dnf install oec-hardware-XXX.rpm
@@ -277,7 +263,6 @@
1. **system**
- 注意在安装openEuler系统的时候非最小安装会引入kmod-kvdo模块此模块会修改内核导致system测试项"检测内核是否被修改"不过。若非有意引入,可卸载该模块。
- 检查本工具是否被修改。
- 检查 OS 版本和 kernel 版本是否匹配。
- 检查内核是否被修改/感染。
diff -urN oec-hardware/scripts/oech oec-hardware_new/scripts/oech
--- oec-hardware/scripts/oech 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/scripts/oech 2021-09-13 11:42:03.409474243 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Operation of client """
+
import os
import sys
import fcntl
@@ -25,22 +27,31 @@
class CertLock:
+ """
+ certlock
+ """
def __init__(self, filename):
self.filename = filename
- self.fd = open(filename, 'w')
+ self.fd_obj = open(filename, 'w')
def acquire(self):
+ """
+ acquire
+ """
try:
- fcntl.flock(self.fd, fcntl.LOCK_EX|fcntl.LOCK_NB)
+ fcntl.flock(self.fd_obj, fcntl.LOCK_EX|fcntl.LOCK_NB)
return True
except IOError:
return False
def release(self):
- fcntl.flock(self.fd, fcntl.LOCK_UN)
+ """
+ release
+ """
+ fcntl.flock(self.fd_obj, fcntl.LOCK_UN)
def __del__(self):
- self.fd.close()
+ self.fd_obj.close()
if __name__ == '__main__':
@@ -80,4 +91,3 @@
lock.release()
sys.exit(0)
-
diff -urN oec-hardware/scripts/oech-server.service oec-hardware_new/scripts/oech-server.service
--- oec-hardware/scripts/oech-server.service 2021-09-13 11:43:59.390364522 +0800
+++ oec-hardware_new/scripts/oech-server.service 2021-09-13 11:42:03.409474243 +0800
@@ -6,6 +6,8 @@
Type=notify
ExecStartPre=/usr/share/oech/lib/server/oech-server-pre.sh
ExecStart=/usr/local/bin/uwsgi --ini /usr/share/oech/lib/server/uwsgi.ini
+ExecStop=/usr/local/bin/uwsgi --stop /usr/share/oech/lib/server/uwsgi.pid
+
[Install]
WantedBy=multi-user.target
diff -urN oec-hardware/server/server.py oec-hardware_new/server/server.py
--- oec-hardware/server/server.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/server/server.py 2021-09-13 11:42:03.409474243 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""server"""
+
import os
import json
import time
@@ -40,27 +42,42 @@
@app.errorhandler(400)
-def bad_request(e):
+def bad_request():
+ """
+ bad request
+ """
return render_template('error.html', error='400 - Bad Request'), 400
@app.errorhandler(404)
def page_not_found(e):
+ """
+ page not found
+ """
return render_template('error.html', error='404 - Page Not Found'), 404
@app.errorhandler(500)
def internal_server_error(e):
+ """
+ internal server error
+ """
return render_template('error.html', error='500 - Internal Server Error'), 500
@app.route('/')
def index():
+ """
+ index
+ """
return render_template('index.html')
@app.route('/results')
def get_results():
+ """
+ get results
+ """
results = {}
for host in next(os.walk(dir_results))[1]:
dir_host = os.path.join(dir_results, host)
@@ -85,11 +102,11 @@
json_info = os.path.join(dir_job, 'compatibility.json')
json_results = os.path.join(dir_job, 'factory.json')
try:
- with open(json_info, 'r') as f:
- info = json.load(f)
- with open(json_results, 'r') as f:
- results = json.load(f)
- except Exception as e:
+ with open(json_info, 'r') as file_content:
+ info = json.load(file_content)
+ with open(json_results, 'r') as file_content:
+ results = json.load(file_content)
+ except Exception:
abort(404)
return render_template('job.html', host=host, id=oec_id, job=job, info=info, results=results)
@@ -107,9 +124,9 @@
dir_job = os.path.join(dir_results, host, oec_id, job)
json_results = os.path.join(dir_job, 'factory.json')
try:
- with open(json_results, 'r') as f:
- results = json.load(f)
- except Exception as e:
+ with open(json_results, 'r') as file_content:
+ results = json.load(file_content)
+ except Exception:
abort(404)
for testcase in results:
device = testcase.get('device')
@@ -131,9 +148,9 @@
dir_job = os.path.join(dir_results, host, oec_id, job)
json_devices = os.path.join(dir_job, 'device.json')
try:
- with open(json_devices, 'r') as f:
- devices = json.load(f)
- except Exception as e:
+ with open(json_devices, 'r') as file_content:
+ devices = json.load(file_content)
+ except Exception:
abort(404)
return render_template('devices.html', devices=devices)
@@ -169,9 +186,9 @@
if not os.path.exists(logpath):
logpath = os.path.join(dir_job, 'job.log')
try:
- with open(logpath, 'r') as f:
- log = f.read().split('\n')
- except Exception as e:
+ with open(logpath, 'r') as file_content:
+ log = file_content.read().split('\n')
+ except Exception:
abort(404)
return render_template('log.html', name=name, log=log)
@@ -189,12 +206,12 @@
tar_job = dir_job + '.tar.gz'
json_cert = os.path.join(dir_job, 'compatibility.json')
try:
- with open(json_cert, 'r') as f:
- cert = json.load(f)
- with open(tar_job, 'rb') as f:
- attachment = base64.b64encode(f.read())
- except Exception as e:
- print(e)
+ with open(json_cert, 'r') as file_content:
+ cert = json.load(file_content)
+ with open(tar_job, 'rb') as file_content:
+ attachment = base64.b64encode(file_content.read())
+ except Exception as concrete_error:
+ print(concrete_error)
abort(500)
form = {}
@@ -211,9 +228,9 @@
try:
req = Request(url, data=data, headers=headers)
res = urlopen(req)
- except HTTPError as e:
- print(e)
- res = e
+ except HTTPError as concrete_error:
+ print(concrete_error)
+ res = concrete_error
if res.code == 200:
flash('Submit Successful', 'success')
@@ -242,11 +259,11 @@
if not os.path.exists(dir_job):
os.makedirs(dir_job)
try:
- with open(tar_job, 'wb') as f:
- f.write(base64.b64decode(filetext))
+ with open(tar_job, 'wb') as file_content:
+ file_content.write(base64.b64decode(filetext))
os.system("tar xf '%s' -C '%s'" % (tar_job, os.path.dirname(dir_job)))
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
abort(400)
return render_template('upload.html', host=host, id=oec_id, job=job,
filetext=filetext, ret='Successful')
@@ -254,17 +271,26 @@
@app.route('/files')
def get_files():
+ """
+ get files
+ """
files = os.listdir(dir_files)
return render_template('files.html', files=files)
@app.route('/files/<path:path>')
def download_file(path):
+ """
+ download file
+ """
return send_from_directory('files', path, as_attachment=True)
@app.route('/api/file/upload', methods=['GET', 'POST'])
def upload_file():
+ """
+ decode the base64 'filetext' payload and write it to disk as 'filename'
+ """
filename = request.values.get('filename', '')
filetext = request.values.get('filetext', '')
if not(all([filename, filetext])):
@@ -275,10 +301,10 @@
if not os.path.exists(dir_files):
os.makedirs(dir_files)
try:
- with open(filepath, 'wb') as f:
- f.write(base64.b64decode(filetext))
- except Exception as e:
- print(e)
+ with open(filepath, 'wb') as file_content:
+ file_content.write(base64.b64decode(filetext))
+ except Exception as concrete_error:
+ print(concrete_error)
abort(400)
return render_template('upload.html', filename=filename, filetext=filetext,
ret='Successful')
@@ -286,6 +312,9 @@
@app.route('/api/<act>', methods=['GET', 'POST'])
def test_server(act):
+ """
+ start or stop one of the valid test commands (act is 'start' or 'stop')
+ """
valid_commands = ['rping', 'rcopy', 'ib_read_bw', 'ib_write_bw', 'ib_send_bw',
'qperf']
cmd = request.values.get('cmd', '')
@@ -295,7 +324,7 @@
abort(400)
if act == 'start':
- if 'rping' == cmd[0]:
+ if cmd[0] == 'rping':
cmd = ['rping', '-s']
if 'ib_' in cmd[0]:
@@ -309,7 +338,7 @@
abort(400)
cmd.extend(['-d', ibdev, '-i', ibport])
elif act == 'stop':
- if 'all' == cmd[0]:
+ if cmd[0] == 'all':
cmd = ['killall', '-9'] + valid_commands
else:
cmd = ['killall', '-9', cmd[0]]
@@ -351,11 +380,10 @@
ibport = str(ibport)
return ibdev, ibport
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return None, None
if __name__ == '__main__':
app.run(host='0.0.0.0', port=80)
-
diff -urN oec-hardware/server/uwsgi.ini oec-hardware_new/server/uwsgi.ini
--- oec-hardware/server/uwsgi.ini 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/server/uwsgi.ini 2021-09-13 11:42:03.409474243 +0800
@@ -1,6 +1,11 @@
[uwsgi]
socket = 127.0.0.1:8080
chdir = /usr/share/oech/lib/server
-wsgi-file = server.py
+wsgi-file = /usr/share/oech/lib/server/server.py
callable = app
processes = 4
+pidfile = /usr/share/oech/lib/server/uwsgi.pid
+master = true
+buffer-size = 65536
+
+
diff -urN oec-hardware/tests/acpi/acpi.py oec-hardware_new/tests/acpi/acpi.py
--- oec-hardware/tests/acpi/acpi.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/acpi/acpi.py 2021-09-13 11:42:03.409474243 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""acpi test"""
+
from hwcompatible.test import Test
from hwcompatible.command import Command
@@ -25,9 +27,12 @@
self.requirements = ["acpica-tools"]
def test(self):
+ """
+ start test
+ """
try:
Command("acpidump").echo()
return True
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
diff -urN oec-hardware/tests/cdrom/cdrom.py oec-hardware_new/tests/cdrom/cdrom.py
--- oec-hardware/tests/cdrom/cdrom.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/cdrom/cdrom.py 2021-09-13 12:54:30.082712105 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""cdrom test"""
+
import os
import sys
import time
@@ -34,7 +36,7 @@
self.device = None
self.type = None
self.args = None
- self.ui = CommandUI()
+ self.com_ui = CommandUI()
self.test_dir = "/usr/share/doc"
def setup(self, args=None):
@@ -62,12 +64,13 @@
devname = self.device.get_property("DEVNAME")
Command("eject %s" % devname).run(ignore_errors=True)
while True:
- print("Please insert %s disc into %s, then close the tray manually." % (self.type.lower(), devname))
+ print("Please insert %s disc into %s, then close the tray manually."\
+ % (self.type.lower(), devname))
if self.method == "write_test":
print(" tips:disc should be new.")
elif self.method == "read_test":
print(" tips:disc should not be blank.")
- if self.ui.prompt_confirm("Done well?"):
+ if self.com_ui.prompt_confirm("Done well?"):
break
Command("eject -t %s" % devname).run(ignore_errors=True)
print("Waiting media..).")
@@ -86,7 +89,7 @@
if not device:
return None
- bd_types = ["BD_RE", "BD_R", "BD"]
+ bd_types = ["BD_RE", "BD_R", "BD"]
dvd_types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD"]
cd_types = ["CD_RW", "CD_R", "CD"]
for bd_type in bd_types:
@@ -99,7 +102,7 @@
if device.get_property("ID_CDROM_" + cd_type) == "1":
return cd_type
- print("Can not find proper test-type for %s." % device.get_name())
+ print("Can not find proper test-type for %s." % device.get_name())
return None
def get_mode(self, device_type):
@@ -148,7 +151,7 @@
self.reload_disc(devname)
sys.stdout.flush()
return self.write_test()
- except CertCommandError as e:
+ except CertCommandError:
return False
def write_test(self):
@@ -168,18 +171,20 @@
write_opts = "-sao"
try:
command = Command("cdrecord dev=%s -checkdrive" % devname)
- modes = command.get_str(regex="^Supported modes[^:]*:(?P<modes>.*$)", regex_group="modes",
+ modes = command.get_str(regex="^Supported modes[^:]*:(?P<modes>.*$)", \
+ regex_group="modes",
single_line=False, ignore_errors=True)
if "TAO" in modes:
write_opts = "-tao"
if "SAO" in modes:
write_opts = "-sao"
- flags = command.get_str(regex="^Driver flags[^:]*:(?P<flags>.*$)", regex_group="flags",
+ flags = command.get_str(regex="^Driver flags[^:]*:(?P<flags>.*$)", \
+ regex_group="flags",
single_line=False, ignore_errors=True)
if "BURNFREE" in flags:
write_opts += " driveropts=burnfree"
- except CertCommandError as e:
- print(e)
+ except CertCommandError as concrete_error:
+ print(concrete_error)
size = Command("mkisofs -quiet -R -print-size %s " % self.test_dir).get_str()
blocks = int(size)
@@ -189,7 +194,7 @@
self.reload_disc(devname)
sys.stdout.flush()
return True
- except CertCommandError as e:
+ except CertCommandError as concrete_error:
return False
def read_test(self):
@@ -229,8 +234,8 @@
Command("umount ./mnt_cdrom").run(ignore_errors=True)
Command("rm -rf ./mnt_cdrom ./device_dir").run(ignore_errors=True)
return return_code
- except CertCommandError as e:
- print(e)
+ except CertCommandError as concrete_error:
+ print(concrete_error)
return False
def cmp_tree(self, dir1, dir2):
@@ -246,7 +251,7 @@
try:
Command("diff -r %s %s" % (dir1, dir2)).run()
return True
- except CertCommandError as e:
+ except CertCommandError:
print("Error: file comparison failed.")
return False
@@ -265,16 +270,16 @@
Command("eject %s" % device).run()
print("tray ejected.")
sys.stdout.flush()
- except:
+ except Exception:
pass
try:
Command("eject -t %s" % device).run()
print("tray auto-closed.\n")
sys.stdout.flush()
- except:
+ except Exception:
print("Could not auto-close the tray, please close the tray manually.")
- self.ui.prompt_confirm("Done well?")
+ self.com_ui.prompt_confirm("Done well?")
time.sleep(20)
return True
diff -urN oec-hardware/tests/clock/clock.py oec-hardware_new/tests/clock/clock.py
--- oec-hardware/tests/clock/clock.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/clock/clock.py 2021-09-13 11:42:03.409474243 +0800
@@ -12,8 +12,11 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""clock test"""
+
import os
+from hwcompatible.command import Command, CertCommandError
from hwcompatible.test import Test
clock_dir = os.path.dirname(os.path.realpath(__file__))
@@ -28,7 +31,12 @@
Clock test case
:return:
"""
- return 0 == os.system("cd %s; ./clock" % clock_dir)
+ try:
+ Command("cd %s; ./clock" % clock_dir).echo()
+ return True
+ except CertCommandError as concrete_error:
+ print(concrete_error)
+ return False
if __name__ == '__main__':
diff -urN oec-hardware/tests/cpufreq/cal.py oec-hardware_new/tests/cpufreq/cal.py
--- oec-hardware/tests/cpufreq/cal.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/cpufreq/cal.py 2021-09-13 11:42:03.409474243 +0800
@@ -12,11 +12,14 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""call test_case"""
+
import decimal
import time
def cal():
+ """call test_case"""
decimal.getcontext().prec = 1000
one = decimal.Decimal(1)
for i in range(1000):
diff -urN oec-hardware/tests/cpufreq/cpufreq.py oec-hardware_new/tests/cpufreq/cpufreq.py
--- oec-hardware/tests/cpufreq/cpufreq.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/cpufreq/cpufreq.py 2021-09-13 12:53:08.458080148 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""cpufreq test"""
+
from random import randint
from time import sleep
@@ -21,6 +23,9 @@
class CPU:
+ """
+ CPU information and cpufreq frequency/governor helpers
+ """
def __init__(self):
self.cpu = None
self.nums = None
@@ -39,8 +44,8 @@
cmd = Command("lscpu")
try:
nums = cmd.get_str(r'^CPU\S*:\s+(?P<cpus>\d+)$', 'cpus', False)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
self.nums = int(nums)
self.list = range(self.nums)
@@ -48,16 +53,16 @@
cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq")
try:
max_freq = cmd.get_str()
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
self.max_freq = int(max_freq)
cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq")
try:
min_freq = cmd.get_str()
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
self.min_freq = int(min_freq)
@@ -74,8 +79,8 @@
try:
cmd.run()
return cmd.returncode
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def get_freq(self, cpu):
@@ -87,8 +92,8 @@
cmd = Command("cpupower -c %s frequency-info -w" % cpu)
try:
return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def set_governor(self, governor, cpu='all'):
@@ -102,8 +107,8 @@
try:
cmd.run()
return cmd.returncode
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def get_governor(self, cpu):
@@ -115,8 +120,8 @@
cmd = Command("cpupower -c %s frequency-info -p" % cpu)
try:
return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def find_path(self, parent_dir, target_name):
@@ -130,8 +135,8 @@
try:
cmd.run()
return cmd.returncode
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
@@ -141,7 +146,8 @@
"""
def __init__(self, cpu):
self.cpu = cpu
- self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".format(self.cpu, CertEnv.testdirectoy))
+ self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".\
+ format(self.cpu, CertEnv.testdirectoy))
self.returncode = None
def run(self):
diff -urN oec-hardware/tests/cpufreq/cpufreq.py.orig oec-hardware_new/tests/cpufreq/cpufreq.py.orig
--- oec-hardware/tests/cpufreq/cpufreq.py.orig 1970-01-01 08:00:00.000000000 +0800
+++ oec-hardware_new/tests/cpufreq/cpufreq.py.orig 2021-09-13 11:42:03.409474243 +0800
@@ -0,0 +1,466 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+# Copyright (c) 2020 Huawei Technologies Co., Ltd.
+# oec-hardware is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# Create: 2020-04-01
+
+"""cpufreq test"""
+
+from random import randint
+from time import sleep
+
+from hwcompatible.env import CertEnv
+from hwcompatible.test import Test
+from hwcompatible.command import Command
+
+
+class CPU:
+ """
+ cpufreq test
+ """
+ def __init__(self):
+ self.cpu = None
+ self.nums = None
+ self.list = None
+ self.numa_nodes = None
+ self.governors = None
+ self.original_governor = None
+ self.max_freq = None
+ self.min_freq = None
+
+ def get_info(self):
+ """
+ Get CPU info
+ :return:
+ """
+ cmd = Command("lscpu")
+ try:
+ nums = cmd.get_str(r'^CPU\S*:\s+(?P<cpus>\d+)$', 'cpus', False)
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+ self.nums = int(nums)
+ self.list = range(self.nums)
+
+ cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq")
+ try:
+ max_freq = cmd.get_str()
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+ self.max_freq = int(max_freq)
+
+ cmd = Command("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_min_freq")
+ try:
+ min_freq = cmd.get_str()
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+ self.min_freq = int(min_freq)
+
+ return True
+
+ def set_freq(self, freq, cpu='all'):
+ """
+ Set CPU frequency
+ :param freq:
+ :param cpu:
+ :return:
+ """
+ cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq))
+ try:
+ cmd.run()
+ return cmd.returncode
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+
+ def get_freq(self, cpu):
+ """
+ Get CPU frequency
+ :param cpu:
+ :return:
+ """
+ cmd = Command("cpupower -c %s frequency-info -w" % cpu)
+ try:
+ return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+
+ def set_governor(self, governor, cpu='all'):
+ """
+ Set the frequency governor mode of CPU
+ :param governor:
+ :param cpu:
+ :return:
+ """
+ cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor))
+ try:
+ cmd.run()
+ return cmd.returncode
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+
+ def get_governor(self, cpu):
+ """
+ Get cpu governor
+ :param cpu:
+ :return:
+ """
+ cmd = Command("cpupower -c %s frequency-info -p" % cpu)
+ try:
+ return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+
+ def find_path(self, parent_dir, target_name):
+ """
+ Find the target path from the specified directory
+ :param parent_dir:
+ :param target_name:
+ :return:
+ """
+ cmd = Command("find %s -name %s" % (parent_dir, target_name))
+ try:
+ cmd.run()
+ return cmd.returncode
+ except Exception as concrete_error:
+ print(concrete_error)
+ return False
+
+
+class Load:
+ """
+ Let a program run on a specific CPU
+ """
+ def __init__(self, cpu):
+ self.cpu = cpu
+ self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".\
+ format(self.cpu, CertEnv.testdirectoy))
+ self.returncode = None
+
+ def run(self):
+ """
+ Process started
+ :return:
+ """
+ self.process.start() # background
+
+ def get_runtime(self):
+ """
+ Get the running time of the process
+ :return:
+ """
+ if not self.process:
+ return None
+
+ while self.returncode is None:
+ self.returncode = self.process.poll()
+ if self.returncode == 0:
+ line = self.process.readline()
+ return float(line)
+ else:
+ return False
+
+
+class CPUFreqTest(Test):
+ """
+ CPU frequency test
+ """
+ def __init__(self):
+ Test.__init__(self)
+ self.requirements = ['util-linux', 'kernel-tools']
+ self.cpu = CPU()
+ self.original_governor = self.cpu.get_governor(0)
+
+ def test_userspace(self):
+ """
+ userspace mode of testing CPU frequency
+ :return:
+ """
+ target_cpu = randint(0, self.cpu.nums-1)
+ target_freq = randint(self.cpu.min_freq, self.cpu.max_freq)
+ if self.cpu.set_freq(target_freq, cpu=target_cpu) != 0:
+ print("[X] Set CPU%s to freq %d failed." % (target_cpu, target_freq))
+ return False
+ print("[.] Set CPU%s to freq %d." % (target_cpu, target_freq))
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ target_cpu_governor = self.cpu.get_governor(target_cpu)
+ if target_cpu_governor != 'userspace':
+ print("[X] The governor of CPU%s(%s) is not userspace." %
+ (target_cpu, target_cpu_governor))
+ return False
+ print("[.] The governor of CPU%s is %s." %
+ (target_cpu, target_cpu_governor))
+
+ # min_freq -> max_runtime
+ self.cpu.set_freq(self.cpu.min_freq)
+ load_list = []
+ runtime_list = []
+ for cpu in self.cpu.list:
+ load_test = Load(cpu)
+ load_test.run()
+ load_list.append(load_test)
+ for cpu in self.cpu.list:
+ runtime = load_list[cpu].get_runtime()
+ runtime_list.append(runtime)
+ max_average_runtime = 1.0 * sum(runtime_list) / len(runtime_list)
+ if max_average_runtime == 0:
+ print("[X] Max average time is 0.")
+ return False
+ print("[.] Max average time of all CPUs userspace load test: %.2f" %
+ max_average_runtime)
+
+ # max_freq -> min_runtime
+ self.cpu.set_freq(self.cpu.max_freq)
+ load_list = []
+ runtime_list = []
+ for cpu in self.cpu.list:
+ load_test = Load(cpu)
+ load_test.run()
+ load_list.append(load_test)
+ for cpu in self.cpu.list:
+ runtime = load_list[cpu].get_runtime()
+ runtime_list.append(runtime)
+ min_average_runtime = 1.0 * sum(runtime_list) / len(runtime_list)
+ if min_average_runtime == 0:
+ print("[X] Min average time is 0.")
+ return False
+ print("[.] Min average time of all CPUs userspace load test: %.2f" %
+ min_average_runtime)
+
+ measured_speedup = 1.0 * max_average_runtime / min_average_runtime
+ expected_speedup = 1.0 * self.cpu.max_freq / self.cpu.min_freq
+ tolerance = 1.0
+ min_speedup = expected_speedup - (expected_speedup - 1.0) * tolerance
+ max_speedup = expected_speedup + (expected_speedup - 1.0) * tolerance
+ if not min_speedup < measured_speedup < max_speedup:
+ print("[X] The speedup(%.2f) is not between %.2f and %.2f" %
+ (measured_speedup, min_speedup, max_speedup))
+ return False
+ print("[.] The speedup(%.2f) is between %.2f and %.2f" %
+ (measured_speedup, min_speedup, max_speedup))
+
+ return True
+
+ def test_ondemand(self):
+ """
+ ondemand mode of testing CPU frequency
+ :return:
+ """
+ if self.cpu.set_governor('powersave') != 0:
+ print("[X] Set governor of all CPUs to powersave failed.")
+ return False
+ print("[.] Set governor of all CPUs to powersave.")
+
+ if self.cpu.set_governor('ondemand') != 0:
+ print("[X] Set governor of all CPUs to ondemand failed.")
+ return False
+ print("[.] Set governor of all CPUs to ondemand.")
+
+ target_cpu = randint(0, self.cpu.nums)
+ target_cpu_governor = self.cpu.get_governor(target_cpu)
+ if target_cpu_governor != 'ondemand':
+ print("[X] The governor of CPU%s(%s) is not ondemand." %
+ (target_cpu, target_cpu_governor))
+ return False
+ print("[.] The governor of CPU%s is %s." %
+ (target_cpu, target_cpu_governor))
+
+ load_test = Load(target_cpu)
+ load_test.run()
+ sleep(1)
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ if target_cpu_freq != self.cpu.max_freq:
+ print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
+ (target_cpu, target_cpu_freq, self.cpu.max_freq))
+ return False
+ print("[.] The freq of CPU%s is scaling_max_freq(%d)." %
+ (target_cpu, target_cpu_freq))
+
+ load_test_time = load_test.get_runtime()
+ print("[.] Time of CPU%s ondemand load test: %.2f" %
+ (target_cpu, load_test_time))
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ if not target_cpu_freq <= self.cpu.max_freq:
+ print("[X] The freq of CPU%s(%d) is not less equal than %d." %
+ (target_cpu, target_cpu_freq, self.cpu.max_freq))
+ return False
+ print("[.] The freq of CPU%s(%d) is less equal than %d." %
+ (target_cpu, target_cpu_freq, self.cpu.max_freq))
+
+ return True
+
+ def test_conservative(self):
+ """
+ conservative mode of testing CPU frequency
+ :return:
+ """
+ if self.cpu.set_governor('powersave') != 0:
+ print("[X] Set governor of all CPUs to powersave failed.")
+ return False
+ print("[.] Set governor of all CPUs to powersave.")
+
+ if self.cpu.set_governor('conservative') != 0:
+ print("[X] Set governor of all CPUs to conservative failed.")
+ return False
+ print("[.] Set governor of all CPUs to conservative.")
+
+ target_cpu = randint(0, self.cpu.nums)
+ target_cpu_governor = self.cpu.get_governor(target_cpu)
+ if target_cpu_governor != 'conservative':
+ print("[X] The governor of CPU%s(%s) is not conservative." %
+ (target_cpu, target_cpu_governor))
+ return False
+ print("[.] The governor of CPU%s is %s." %
+ (target_cpu, target_cpu_governor))
+
+ load_test = Load(target_cpu)
+ load_test.run()
+ sleep(1)
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ if not self.cpu.min_freq < target_cpu_freq < self.cpu.max_freq:
+ print("[X] The freq of CPU%s(%d) is not between %d~%d." %
+ (target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
+ return False
+ print("[.] The freq of CPU%s(%d) is between %d~%d." %
+ (target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
+
+ load_test_time = load_test.get_runtime()
+ print("[.] Time of CPU%s conservative load test: %.2f" %
+ (target_cpu, load_test_time))
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ return True
+
+ def test_powersave(self):
+ """
+ powersave mode of testing CPU frequency
+ :return:
+ """
+ if self.cpu.set_governor('powersave') != 0:
+ print("[X] Set governor of all CPUs to powersave failed.")
+ return False
+ print("[.] Set governor of all CPUs to powersave.")
+
+ target_cpu = randint(0, self.cpu.nums)
+ target_cpu_governor = self.cpu.get_governor(target_cpu)
+ if target_cpu_governor != 'powersave':
+ print("[X] The governor of CPU%s(%s) is not powersave." %
+ (target_cpu, target_cpu_governor))
+ return False
+ print("[.] The governor of CPU%s is %s." %
+ (target_cpu, target_cpu_governor))
+
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ if target_cpu_freq != self.cpu.min_freq:
+ print("[X] The freq of CPU%s(%d) is not scaling_min_freq(%d)." %
+ (target_cpu, target_cpu_freq, self.cpu.min_freq))
+ return False
+ print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ load_test = Load(target_cpu)
+ load_test.run()
+ load_test_time = load_test.get_runtime()
+ print("[.] Time of CPU%s powersave load test: %.2f" %
+ (target_cpu, load_test_time))
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ return True
+
+ def test_performance(self):
+ """
+ Performance mode of testing CPU frequency
+ :return:
+ """
+ if self.cpu.set_governor('performance') != 0:
+ print("[X] Set governor of all CPUs to performance failed.")
+ return False
+ print("[.] Set governor of all CPUs to performance.")
+
+ target_cpu = randint(0, self.cpu.nums)
+ target_cpu_governor = self.cpu.get_governor(target_cpu)
+ if target_cpu_governor != 'performance':
+ print("[X] The governor of CPU%s(%s) is not performance." %
+ (target_cpu, target_cpu_governor))
+ return False
+ print("[.] The governor of CPU%s is %s." %
+ (target_cpu, target_cpu_governor))
+
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ if target_cpu_freq != self.cpu.max_freq:
+ print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
+ (target_cpu, target_cpu_freq, self.cpu.max_freq))
+ return False
+ print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ load_test = Load(target_cpu)
+ load_test.run()
+ load_test_time = load_test.get_runtime()
+ print("[.] Time of CPU%s performance load test: %.2f" %
+ (target_cpu, load_test_time))
+ target_cpu_freq = self.cpu.get_freq(target_cpu)
+ print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
+
+ return True
+
+ def test(self):
+ """
+ Test case
+ :return:
+ """
+ if not self.cpu.get_info():
+ print("[X] Fail to get CPU info."
+ " Please check if the CPU supports cpufreq.")
+ return False
+
+ ret = True
+ print("")
+ print("[.] Test userspace")
+ if not self.test_userspace():
+ print("[X] Test userspace FAILED")
+ ret = False
+ print("")
+ print("[.] Test ondemand")
+ if not self.test_ondemand():
+ print("[X] Test ondemand FAILED")
+ ret = False
+ print("")
+ print("[.] Test conservative")
+ if not self.test_conservative():
+ print("[X] Test conservative FAILED")
+ ret = False
+ print("")
+ print("[.] Test powersave")
+ if not self.test_powersave():
+ print("[X] Test powersave FAILED")
+ ret = False
+ print("")
+ print("[.] Test performance")
+ if not self.test_performance():
+ print("[X] Test performance FAILED")
+ ret = False
+
+ self.cpu.set_governor(self.original_governor)
+ return ret
+
+
+if __name__ == "__main__":
+ t = CPUFreqTest()
+ t.setup()
+ t.test()
diff -urN oec-hardware/tests/disk/disk.py oec-hardware_new/tests/disk/disk.py
--- oec-hardware/tests/disk/disk.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/disk/disk.py 2021-09-13 11:42:03.409474243 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""disk test"""
+
import os
import sys
import shutil
@@ -23,12 +25,14 @@
class DiskTest(Test):
-
+ """
+ disk test
+ """
def __init__(self):
Test.__init__(self)
self.disks = list()
self.filesystems = ["ext4"]
- self.ui = CommandUI()
+ self.com_ui = CommandUI()
def setup(self, args=None):
"""
@@ -52,18 +56,22 @@
Command("cat /proc/mdstat").echo(ignore_errors=True)
sys.stdout.flush()
print("\n")
- except Exception as e:
+ except Exception as concrete_error:
print("Warning: could not get disk info")
- print(e)
+ print(concrete_error)
def test(self):
+ """
+ start test
+ """
self.get_disk()
if len(self.disks) == 0:
print("No suite disk found to test.")
return False
self.disks.append("all")
- disk = self.ui.prompt_edit("Which disk would you like to test: ", self.disks[0], self.disks)
+ disk = self.com_ui.prompt_edit("Which disk would you like to test: ",\
+ self.disks[0], self.disks)
return_code = True
if disk == "all":
for disk in self.disks[:-1]:
@@ -79,12 +87,16 @@
return return_code
def get_disk(self):
+ """
+ get disk info
+ """
self.disks = list()
disks = list()
devices = CertDevice().get_devices()
for device in devices:
- if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \
- device.get_property("ID_TYPE") == "disk":
+ if (device.get_property("DEVTYPE") == "disk" and not \
+ device.get_property("ID_TYPE")) or device.\
+ get_property("ID_TYPE") == "disk":
if "/host" in device.get_property("DEVPATH"):
disks.append(device.get_name())
@@ -123,6 +135,9 @@
print("These disks %s are in use now, skip them." % "|".join(un_suitable))
def raw_test(self, disk):
+ """
+ raw test
+ """
print("\n#############")
print("%s raw IO test" % disk)
device = "/dev/" + disk
@@ -147,7 +162,8 @@
return False
print("\nStarting rand raw IO test...")
- opts = "-direct=1 -iodepth 4 -rw=randrw -rwmixread=50 -group_reporting -name=file -runtime=300"
+ opts = "-direct=1 -iodepth 4 -rw=randrw -rwmixread=50 " \
+ "-group_reporting -name=file -runtime=300"
if not self.do_fio(device, size, opts):
print("%s rand raw IO test fail." % device)
print("#############")
@@ -157,6 +173,9 @@
return True
def vfs_test(self, disk):
+ """
+ vfs test
+ """
print("\n#############")
print("%s vfs test" % disk)
device = "/dev/" + disk
@@ -179,12 +198,12 @@
path = os.path.join(os.getcwd(), "vfs_test")
return_code = True
- for fs in self.filesystems:
+ for file_sys in self.filesystems:
try:
- print("\nFormatting %s to %s ..." % (device, fs))
+ print("\nFormatting %s to %s ..." % (device, file_sys))
Command("umount %s" % device).echo(ignore_errors=True)
- Command("mkfs -t %s -F %s 2>/dev/null" % (fs, device)).echo()
- Command("mount -t %s %s %s" % (fs, device, "vfs_test")).echo()
+ Command("mkfs -t %s -F %s 2>/dev/null" % (file_sys, device)).echo()
+ Command("mount -t %s %s %s" % (file_sys, device, "vfs_test")).echo()
print("\nStarting sequential vfs IO test...")
opts = "-direct=1 -iodepth 4 -rw=rw -rwmixread=50 -name=directoy -runtime=300"
@@ -197,8 +216,8 @@
if not self.do_fio(path, size, opts):
return_code = False
break
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return_code = False
break
@@ -208,17 +227,20 @@
return return_code
def do_fio(self, filepath, size, option):
+ """
+ run fio on filepath, doubling the block size from 4K up to 64K
+ """
if os.path.isdir(filepath):
file_opt = "-directory=%s" % filepath
else:
file_opt = "-filename=%s" % filepath
max_bs = 64
- bs = 4
- while bs <= max_bs:
- if os.system("fio %s -size=%dK -bs=%dK %s" % (file_opt, size, bs, option)) != 0:
+ a_bs = 4
+ while a_bs <= max_bs:
+ if os.system("fio %s -size=%dK -bs=%dK %s" % (file_opt, size, a_bs, option)) != 0:
print("Error: %s fio failed." % filepath)
return False
print("\n")
sys.stdout.flush()
- bs = bs * 2
+ a_bs = a_bs * 2
return True
diff -urN oec-hardware/tests/ipmi/ipmi.py oec-hardware_new/tests/ipmi/ipmi.py
--- oec-hardware/tests/ipmi/ipmi.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/ipmi/ipmi.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""ipmi test"""
+
from hwcompatible.test import Test
from hwcompatible.command import Command
@@ -31,8 +33,9 @@
"""
try:
Command("systemctl start ipmi").run()
- Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False)
- except:
+ Command("systemctl status ipmi.service").get_str(regex="Active: active", \
+ single_line=False)
+ except Exception:
print("ipmi service cant't be started")
return False
return True
@@ -46,7 +49,7 @@
for cmd in cmd_list:
try:
Command(cmd).echo()
- except:
+ except Exception:
print("%s return error." % cmd)
return False
return True
diff -urN oec-hardware/tests/kdump/kdump.py oec-hardware_new/tests/kdump/kdump.py
--- oec-hardware/tests/kdump/kdump.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/kdump/kdump.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""kdump test"""
+
import os
import sys
import time
@@ -26,6 +28,7 @@
"""
Kdump Test
"""
+
def __init__(self):
Test.__init__(self)
self.pri = 9
@@ -42,7 +45,7 @@
"""
try:
Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*")
- except:
+ except Exception:
print("Error: no crashkernel found.")
return False
@@ -58,8 +61,9 @@
try:
Command("systemctl restart kdump").run()
- Command("systemctl status kdump").get_str(regex="Active: active", single_line=False)
- except:
+ Command("systemctl status kdump").get_str(regex="Active: active",
+ single_line=False)
+ except Exception:
print("Error: kdump service not working.")
return False
@@ -68,8 +72,8 @@
config.dump()
print("#############")
- ui = CommandUI()
- if ui.prompt_confirm("System will reboot, are you ready?"):
+ com_ui = CommandUI()
+ if com_ui.prompt_confirm("System will reboot, are you ready?"):
print("\ntrigger crash...")
sys.stdout.flush()
os.system("sync")
@@ -89,8 +93,9 @@
if config.get_parameter("path"):
self.vmcore_path = config.get_parameter("path")
- dir_pattern = re.compile(r'(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+[-.][0-9]+[-.][0-9]+)-'
- r'(?P<time>[0-9]+:[0-9]+:[0-9]+)')
+ dir_pattern = re.compile(
+ r'(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+[-.][0-9]+[-.][0-9]+)-'
+ r'(?P<time>[0-9]+:[0-9]+:[0-9]+)')
vmcore_dirs = list()
for (root, dirs, files) in os.walk(self.vmcore_path):
@@ -101,10 +106,12 @@
vmcore_file = os.path.join(self.vmcore_path, vmcore_dirs[-1], "vmcore")
try:
- Command("echo \"sys\nq\" | crash -s %s /usr/lib/debug/lib/modules/`uname -r`/vmlinux" % vmcore_file).echo()
+ Command(
+ "echo \"sys\nq\" | crash -s %s /usr/lib/debug/lib/modules/`uname -r`/vmlinux" % \
+ vmcore_file).echo()
print("kdump image %s verified" % vmcore_file)
return True
- except CertCommandError as e:
+ except CertCommandError as concrete_error:
print("Error: could not verify kdump image %s" % vmcore_file)
- print(e)
+ print(concrete_error)
return False
diff -urN oec-hardware/tests/memory/memory.py oec-hardware_new/tests/memory/memory.py
--- oec-hardware/tests/memory/memory.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/memory/memory.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""memory test"""
+
import os
import time
import re
@@ -79,18 +81,18 @@
if line:
tokens = line.split()
if len(tokens) == 3:
- if "MemTotal:" == tokens[0].strip():
+ if tokens[0].strip() == "MemTotal:":
self.system_memory = int(tokens[1].strip())/1024
elif tokens[0].strip() in ["MemFree:", "Cached:", "Buffers:"]:
self.free_memory += int(tokens[1].strip())
- elif "SwapTotal:" == tokens[0].strip():
+ elif tokens[0].strip() == "SwapTotal:":
self.swap_memory = int(tokens[1].strip())/1024
- elif "Hugepagesize:" == tokens[0].strip():
+ elif tokens[0].strip() == "Hugepagesize:":
self.hugepage_size = int(tokens[1].strip())/1024
elif len(tokens) == 2:
- if "HugePages_Total:" == tokens[0].strip():
+ if tokens[0].strip() == "HugePages_Total:":
self.hugepage_total = int(tokens[1].strip())
- elif "HugePages_Free:" == tokens[0].strip():
+ elif tokens[0].strip() == "HugePages_Free:":
self.hugepage_free = int(tokens[1].strip())
else:
break
@@ -180,8 +182,8 @@
try:
Command("cd %s; ./hugetlb_test" % self.test_dir).echo()
print("Hugetlb test succ.\n")
- except CertCommandError as e:
- print(e)
+ except CertCommandError as concrete_error:
+ print(concrete_error)
print("Error: hugepages test fail.\n")
return False
return True
@@ -241,7 +243,7 @@
Command("echo 1 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("online")
return True
- except:
+ except Exception:
print("Error: fail to online %s." % memory_path)
return False
@@ -255,7 +257,7 @@
Command("echo 0 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("offline")
return True
- except:
+ except Exception:
print("Error: fail to online %s." % memory_path)
return False
diff -urN oec-hardware/tests/network/ethernet.py oec-hardware_new/tests/network/ethernet.py
--- oec-hardware/tests/network/ethernet.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/network/ethernet.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""ethernet test"""
+
import os
import argparse
@@ -43,7 +45,7 @@
path_pci = path_netdev.split('net')[0]
cmd = "ls %s | grep -q infiniband" % path_pci
print(cmd)
- return 0 == os.system(cmd)
+ return os.system(cmd) == 0
def setup(self, args=None):
"""
diff -urN oec-hardware/tests/network/infiniband.py oec-hardware_new/tests/network/infiniband.py
--- oec-hardware/tests/network/infiniband.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/network/infiniband.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""InfiniBand Test"""
+
from rdma import RDMATest
@@ -38,12 +40,12 @@
if 'ACTIVE' not in self.state:
print("[X] Link is not ACTIVE.")
- if 0x0 == self.base_lid:
+ if self.base_lid == 0x0:
print("[X] Fail to get base lid of %s." % self.interface)
return False
print("[.] The base lid is %s" % self.base_lid)
- if 0x0 == self.sm_lid:
+ if self.sm_lid == 0x0:
print("[X] Fail to get subnet manager lid of %s." % self.interface)
return False
print("[.] The subnet manager lid is %s" % self.sm_lid)
diff -urN oec-hardware/tests/network/network.py oec-hardware_new/tests/network/network.py
--- oec-hardware/tests/network/network.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/network/network.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Network Test"""
+
import os
import time
import argparse
@@ -58,7 +60,7 @@
"""
os.system("ip link set down %s" % interface)
for _ in range(5):
- if 0 == os.system("ip link show %s | grep 'state DOWN'" % interface):
+ if os.system("ip link show %s | grep 'state DOWN'" % interface) == 0:
return True
time.sleep(1)
return False
@@ -72,7 +74,7 @@
os.system("ip link set up %s" % interface)
for _ in range(5):
time.sleep(1)
- if 0 == os.system("ip link show %s | grep 'state UP'" % interface):
+ if os.system("ip link show %s | grep 'state UP'" % interface) == 0:
return True
return False
@@ -83,8 +85,8 @@
"""
ignore_interfaces = ['^lo', '^v', 'docker', 'br', 'bond']
cmd = "ip route show default | awk '/default/ {print $5}'"
- c = Command(cmd)
- management_interface = c.read()
+ com = Command(cmd)
+ management_interface = com.read()
if management_interface:
ignore_interfaces.append(management_interface)
# print(cmd)
@@ -94,12 +96,12 @@
# print(ignore_pattern)
cmd = "ls /sys/class/net/ | grep -vE '%s'" % ignore_pattern
# print(cmd)
- c = Command(cmd)
+ com = Command(cmd)
try:
- c.run()
- return c.output
- except Exception as e:
- print(e)
+ com.run()
+ return com.output
+ except Exception as concrete_error:
+ print(concrete_error)
return []
def set_other_interfaces_down(self):
@@ -128,12 +130,12 @@
Get speed on the interface
:return:
"""
- c = Command("ethtool %s" % self.interface)
+ com = Command("ethtool %s" % self.interface)
pattern = r".*Speed:\s+(?P<speed>\d+)Mb/s"
try:
- speed = c.get_str(pattern, 'speed', False)
+ speed = com.get_str(pattern, 'speed', False)
return int(speed)
- except Exception as e:
+ except Exception:
print("[X] No speed found on the interface.")
return None
@@ -142,12 +144,12 @@
Get interface ip
:return:
"""
- c = Command("ip addr show %s" % self.interface)
+ com = Command("ip addr show %s" % self.interface)
pattern = r".*inet.? (?P<ip>.+)/.*"
try:
- ip = c.get_str(pattern, 'ip', False)
- return ip
- except Exception as e:
+ ip_addr = com.get_str(pattern, 'ip', False)
+ return ip_addr
+ except Exception:
print("[X] No available ip on the interface.")
return None
@@ -157,18 +159,18 @@
:return:
"""
count = 500
- c = Command("ping -q -c %d -i 0 %s" % (count, self.server_ip))
+ com = Command("ping -q -c %d -i 0 %s" % (count, self.server_ip))
pattern = r".*, (?P<loss>\d+\.{0,1}\d*)% packet loss.*"
for _ in range(self.retries):
try:
- print(c.command)
- loss = c.get_str(pattern, 'loss', False)
- c.print_output()
+ print(com.command)
+ loss = com.get_str(pattern, 'loss', False)
+ com.print_output()
if float(loss) == 0:
return True
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def call_remote_server(self, cmd, act='start', ib_server_ip=''):
@@ -191,8 +193,8 @@
try:
request = Request(url, data=data, headers=headers)
response = urlopen(request)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
print("Status: %u %s" % (response.code, response.msg))
return int(response.code) == 200
@@ -205,15 +207,18 @@
cmd = "qperf %s udp_lat" % self.server_ip
print(cmd)
for _ in range(self.retries):
- if 0 == os.system(cmd):
+ if os.system(cmd) == 0:
return True
return False
def test_tcp_latency(self):
+ """
+ tcp test
+ """
cmd = "qperf %s tcp_lat" % self.server_ip
print(cmd)
for _ in range(self.retries):
- if 0 == os.system(cmd):
+ if os.system(cmd) == 0:
return True
return False
@@ -224,24 +229,24 @@
"""
cmd = "qperf %s tcp_bw" % self.server_ip
print(cmd)
- c = Command(cmd)
+ com = Command(cmd)
pattern = r"\s+bw\s+=\s+(?P<bandwidth>[\.0-9]+ [MG]B/sec)"
for _ in range(self.retries):
try:
- bandwidth = c.get_str(pattern, 'bandwidth', False)
- bw = bandwidth.split()
- if 'GB' in bw[1]:
- bandwidth = float(bw[0]) * 8 * 1024
+ bandwidth = com.get_str(pattern, 'bandwidth', False)
+ band_width = bandwidth.split()
+ if 'GB' in band_width[1]:
+ bandwidth = float(band_width[0]) * 8 * 1024
else:
- bandwidth = float(bw[0]) * 8
+ bandwidth = float(band_width[0]) * 8
target_bandwidth = self.target_bandwidth_percent * self.speed
print("Current bandwidth is %.2fMb/s, target is %.2fMb/s" %
(bandwidth, target_bandwidth))
if bandwidth > target_bandwidth:
return True
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
def create_testfile(self):
@@ -249,10 +254,10 @@
Create testfile
:return:
"""
- bs = 128
+ b_s = 128
count = self.speed/8
- cmd = "dd if=/dev/urandom of=%s bs=%uk count=%u" % (self.testfile, bs, count)
- return 0 == os.system(cmd)
+ cmd = "dd if=/dev/urandom of=%s bs=%uk count=%u" % (self.testfile, b_s, count)
+ return os.system(cmd) == 0
def test_http_upload(self):
"""
@@ -264,10 +269,10 @@
filename = os.path.basename(self.testfile)
try:
- with open(self.testfile, 'rb') as f:
- filetext = base64.b64encode(f.read())
- except Exception as e:
- print(e)
+ with open(self.testfile, 'rb') as file_info:
+ filetext = base64.b64encode(file_info.read())
+ except Exception as concrete_error:
+ print(concrete_error)
return False
form['filename'] = filename
@@ -283,8 +288,8 @@
try:
request = Request(url, data=data, headers=headers)
response = urlopen(request)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
time_stop = time.time()
time_upload = time_stop - time_start
@@ -306,8 +311,8 @@
time_start = time.time()
try:
response = urlopen(url)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
time_stop = time.time()
time_download = time_stop - time_start
@@ -316,10 +321,10 @@
print(response.headers)
filetext = response.read()
try:
- with open(self.testfile, 'wb') as f:
- f.write(filetext)
- except Exception as e:
- print(e)
+ with open(self.testfile, 'wb') as file_info:
+ file_info.write(filetext)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
size = os.path.getsize(self.testfile)
diff -urN oec-hardware/tests/network/rdma.py oec-hardware_new/tests/network/rdma.py
--- oec-hardware/tests/network/rdma.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/network/rdma.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""RDMA Test"""
+
import os
import re
import argparse
@@ -58,20 +60,20 @@
path_ibdev = 'infiniband_verbs/uverb*/ibdev'
path_ibdev = ''.join([path_pci, path_ibdev])
cmd = "cat %s" % path_ibdev
- c = Command(cmd)
+ com = Command(cmd)
try:
- self.ib_device = c.read()
- except Exception as e:
- print(e)
+ self.ib_device = com.read()
+ except Exception as concrete_error:
+ print(concrete_error)
return False
path_ibport = '/sys/class/net/%s/dev_id' % self.interface
cmd = "cat %s" % path_ibport
- c = Command(cmd)
+ com = Command(cmd)
try:
- self.ib_port = int(c.read(), 16) + 1
- except Exception as e:
- print(e)
+ self.ib_port = int(com.read(), 16) + 1
+ except Exception as concrete_error:
+ print(concrete_error)
return False
ib_str = "Infiniband device '%s' port %d" % (self.ib_device, self.ib_port)
@@ -79,9 +81,9 @@
cmd = "ibstatus"
print(cmd)
- c = Command(cmd)
+ com = Command(cmd)
try:
- output = c.read()
+ output = com.read()
for info in output.split('\n\n'):
if ib_str not in info:
continue
@@ -93,8 +95,8 @@
self.phys_state = re.search(r"phys state:\s+(.*)", info).group(1)
self.link_layer = re.search(r"link_layer:\s+(.*)", info).group(1)
self.speed = int(re.search(r"rate:\s+(\d*)", info).group(1)) * 1024
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
return False
return True
@@ -110,7 +112,7 @@
cmd = "rping -c -a %s -C 50 -v" % self.server_ip
print(cmd)
- if 0 == os.system(cmd):
+ if os.system(cmd) == 0:
return True
else:
self.call_remote_server('rping', 'stop')
@@ -129,7 +131,7 @@
print(cmd)
ret = os.system(cmd)
self.call_remote_server('rcopy', 'stop')
- return 0 == ret
+ return ret == 0
def test_bw(self, cmd):
"""
@@ -146,18 +148,18 @@
cmd = "%s %s -d %s -i %s" % (cmd, self.server_ip, self.ib_device, self.ib_port)
print(cmd)
- c = Command(cmd)
+ com = Command(cmd)
pattern = r"\s+(\d+)\s+(\d+)\s+([\.\d]+)\s+(?P<avg_bw>[\.\d]+)\s+([\.\d]+)"
try:
- avg_bw = c.get_str(pattern, 'avg_bw', False) # MB/sec
+ avg_bw = com.get_str(pattern, 'avg_bw', False) # MB/sec
avg_bw = float(avg_bw) * 8
tgt_bw = self.target_bandwidth_percent * self.speed
print("Current bandwidth is %.2fMb/s, target is %.2fMb/s" %
(avg_bw, tgt_bw))
return avg_bw > tgt_bw
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
self.call_remote_server(cmd, 'stop')
return False
@@ -203,11 +205,11 @@
Test ibstatus
:return:
"""
- if 0 != os.system("systemctl start opensm"):
+ if os.system("systemctl start opensm") != 0:
print("[X] start opensm failed.")
return False
- if 0 != os.system("modprobe ib_umad"):
+ if os.system("modprobe ib_umad") != 0:
print("[X] modprobe ib_umad failed.")
return False
diff -urN oec-hardware/tests/nvme/nvme.py oec-hardware_new/tests/nvme/nvme.py
--- oec-hardware/tests/nvme/nvme.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/nvme/nvme.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Test Non-Volatile Memory express"""
+
import os
import sys
import argparse
@@ -63,7 +65,8 @@
sys.stdout.flush()
print("\nWritting...")
- Command("nvme write -z %d -s 0 -d /dev/urandom /dev/%s 2> /dev/null" % (size, disk)).echo()
+ Command("nvme write -z %d -s 0 -d /dev/urandom /dev/%s 2> /dev/null" \
+ % (size, disk)).echo()
sys.stdout.flush()
print("\nReading...")
@@ -81,9 +84,9 @@
Command("nvme list").echo(ignore_errors=True)
return True
- except Exception as e:
+ except Exception as concrete_error:
print("Error: nvme cmd fail.")
- print(e)
+ print(concrete_error)
return False
def in_use(self, disk):
diff -urN oec-hardware/tests/perf/perf.py oec-hardware_new/tests/perf/perf.py
--- oec-hardware/tests/perf/perf.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/perf/perf.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Perf Test"""
+
import re
from hwcompatible.test import Test
from hwcompatible.command import Command
diff -urN oec-hardware/tests/system/system.py oec-hardware_new/tests/system/system.py
--- oec-hardware/tests/system/system.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/system/system.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""System Test"""
+
import os
import sys
import re
@@ -82,7 +84,7 @@
sys.stdout.flush()
if rpm_verify.output and len(rpm_verify.output) > 0:
return_code = False
- except:
+ except Exception:
print("Error: files in %s have been tampered." % cert_package)
return_code = False
return return_code
@@ -112,7 +114,7 @@
if kernel_dict.document[os_version] != self.sysinfo.kernel_version:
print("Error: kernel %s check GA status fail." % self.sysinfo.kernel_version)
return_code = False
- except:
+ except Exception:
print("Error: %s is not supported." % os_version)
return_code = False
@@ -147,12 +149,15 @@
print("")
tainted_file.close()
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
print("Error: could not determine if kernel is tainted.")
return_code = False
- if not os.system("rpm -V --nomtime --nomode --nocontexts %s" % kernel_rpm) is 0:
+ except_list = ["/modules.dep$", "/modules.symbols$", "/modules.dep.bin$", \
+ "/modules.symbols.bin$"]
+ if os.system("rpm -V --nomtime --nomode --nocontexts %s | grep -Ev '%s'" % \
+ (kernel_rpm, "|".join(except_list))) == 0:
print("Error: files from %s were modified." % kernel_rpm)
print("")
return_code = False
@@ -160,8 +165,8 @@
try:
params = Command("cat /proc/cmdline").get_str()
print("Boot Parameters: %s" % params)
- except Exception as e:
- print(e)
+ except Exception as concrete_error:
+ print(concrete_error)
print("Error: could not determine boot parameters.")
return_code = False
@@ -228,7 +233,8 @@
black_symbols = extra_symbols
if black_symbols:
- print("Error: The following symbols are used by %s are not on the ABI whitelist." % module)
+ print("Error: The following symbols are used by %s are not on the ABI "
+ "whitelist." % module)
for symbol in black_symbols:
print(symbol)
return False
@@ -280,17 +286,17 @@
return None
if module_file[-2:] == "ko":
- nm = os.popen('modprobe --dump-modversions ' + module_file)
+ n_m = os.popen('modprobe --dump-modversions ' + module_file)
else:
- nm = open(module_file, "r")
+ n_m = open(module_file, "r")
while True:
- line = nm.readline()
+ line = n_m.readline()
if line == "":
break
symbols.append(line)
- nm.close()
+ n_m.close()
return self.readSymbols(symbols)
def get_modulefile(self, module):
@@ -304,7 +310,7 @@
if os.path.islink(modulefile):
modulefile = os.readlink(modulefile)
return modulefile
- except:
+ except Exception:
print("Error: could no find module file for %s:" % module)
return None
diff -urN oec-hardware/tests/usb/usb.py oec-hardware_new/tests/usb/usb.py
--- oec-hardware/tests/usb/usb.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/usb/usb.py 2021-09-13 11:42:03.413474274 +0800
@@ -12,6 +12,8 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
+"""Usb test"""
+
import sys
import time
@@ -28,7 +30,7 @@
def __init__(self):
Test.__init__(self)
self.requirements = ["usbutils"]
- self.ui = CommandUI()
+ self.com_ui = CommandUI()
def test(self):
"""
@@ -46,7 +48,7 @@
print("#############")
while True:
print("Please plug in a USB device.")
- if self.ui.prompt_confirm("Done well?"):
+ if self.com_ui.prompt_confirm("Done well?"):
break
time.sleep(1)
@@ -73,7 +75,7 @@
plugged_device = new_plugged
while True:
print("Please unplug the USB device you plugged in just now.")
- if self.ui.prompt_confirm("Done well?"):
+ if self.com_ui.prompt_confirm("Done well?"):
break
time.sleep(1)
@@ -94,7 +96,7 @@
sys.stdout.flush()
plugged_device = new_plugged
- if self.ui.prompt_confirm("All usb sockets have been tested?"):
+ if self.com_ui.prompt_confirm("All usb sockets have been tested?"):
return True
def get_usb(self):
diff -urN oec-hardware/tests/watchdog/watchdog.py oec-hardware_new/tests/watchdog/watchdog.py
--- oec-hardware/tests/watchdog/watchdog.py 2021-09-13 11:43:59.394364553 +0800
+++ oec-hardware_new/tests/watchdog/watchdog.py 2021-09-13 11:42:03.413474274 +0800
@@ -43,8 +43,9 @@
os.chdir(self.test_dir)
try:
- timeout = Command("./watchdog -g").get_str(regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",
- regex_group="timeout")
+ timeout = Command("./watchdog -g").get_str(
+ regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",
+ regex_group="timeout")
timeout = int(timeout)
if timeout > self.max_timeout:
Command("./watchdog -s %d" % self.max_timeout).echo()