forked from Qortal/Brooklyn
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
380 lines
13 KiB
380 lines
13 KiB
# SPDX-License-Identifier: GPL-2.0 |
|
# |
|
# Runs UML kernel, collects output, and handles errors. |
|
# |
|
# Copyright (C) 2019, Google LLC. |
|
# Author: Felix Guo <[email protected]> |
|
# Author: Brendan Higgins <[email protected]> |
|
|
|
import importlib.abc |
|
import importlib.util |
|
import logging |
|
import subprocess |
|
import os |
|
import shutil |
|
import signal |
|
import threading |
|
from typing import Iterator, List, Optional, Tuple |
|
|
|
import kunit_config |
|
import kunit_parser |
|
import qemu_config |
|
|
|
KCONFIG_PATH = '.config' |
|
KUNITCONFIG_PATH = '.kunitconfig' |
|
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig' |
|
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config' |
|
BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config' |
|
OUTFILE_PATH = 'test.log' |
|
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__)) |
|
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs') |
|
|
|
def get_file_path(build_dir, default): |
|
if build_dir: |
|
default = os.path.join(build_dir, default) |
|
return default |
|
|
|
class ConfigError(Exception): |
|
"""Represents an error trying to configure the Linux kernel.""" |
|
|
|
|
|
class BuildError(Exception): |
|
"""Represents an error trying to build the Linux kernel.""" |
|
|
|
|
|
class LinuxSourceTreeOperations(object): |
|
"""An abstraction over command line operations performed on a source tree.""" |
|
|
|
def __init__(self, linux_arch: str, cross_compile: Optional[str]): |
|
self._linux_arch = linux_arch |
|
self._cross_compile = cross_compile |
|
|
|
def make_mrproper(self) -> None: |
|
try: |
|
subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT) |
|
except OSError as e: |
|
raise ConfigError('Could not call make command: ' + str(e)) |
|
except subprocess.CalledProcessError as e: |
|
raise ConfigError(e.output.decode()) |
|
|
|
def make_arch_qemuconfig(self, kconfig: kunit_config.Kconfig) -> None: |
|
pass |
|
|
|
def make_allyesconfig(self, build_dir, make_options) -> None: |
|
raise ConfigError('Only the "um" arch is supported for alltests') |
|
|
|
def make_olddefconfig(self, build_dir, make_options) -> None: |
|
command = ['make', 'ARCH=' + self._linux_arch, 'olddefconfig'] |
|
if self._cross_compile: |
|
command += ['CROSS_COMPILE=' + self._cross_compile] |
|
if make_options: |
|
command.extend(make_options) |
|
if build_dir: |
|
command += ['O=' + build_dir] |
|
print('Populating config with:\n$', ' '.join(command)) |
|
try: |
|
subprocess.check_output(command, stderr=subprocess.STDOUT) |
|
except OSError as e: |
|
raise ConfigError('Could not call make command: ' + str(e)) |
|
except subprocess.CalledProcessError as e: |
|
raise ConfigError(e.output.decode()) |
|
|
|
def make(self, jobs, build_dir, make_options) -> None: |
|
command = ['make', 'ARCH=' + self._linux_arch, '--jobs=' + str(jobs)] |
|
if make_options: |
|
command.extend(make_options) |
|
if self._cross_compile: |
|
command += ['CROSS_COMPILE=' + self._cross_compile] |
|
if build_dir: |
|
command += ['O=' + build_dir] |
|
print('Building with:\n$', ' '.join(command)) |
|
try: |
|
proc = subprocess.Popen(command, |
|
stderr=subprocess.PIPE, |
|
stdout=subprocess.DEVNULL) |
|
except OSError as e: |
|
raise BuildError('Could not call execute make: ' + str(e)) |
|
except subprocess.CalledProcessError as e: |
|
raise BuildError(e.output) |
|
_, stderr = proc.communicate() |
|
if proc.returncode != 0: |
|
raise BuildError(stderr.decode()) |
|
if stderr: # likely only due to build warnings |
|
print(stderr.decode()) |
|
|
|
def start(self, params: List[str], build_dir: str) -> subprocess.Popen: |
|
raise RuntimeError('not implemented!') |
|
|
|
|
|
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations): |
|
|
|
def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]): |
|
super().__init__(linux_arch=qemu_arch_params.linux_arch, |
|
cross_compile=cross_compile) |
|
self._kconfig = qemu_arch_params.kconfig |
|
self._qemu_arch = qemu_arch_params.qemu_arch |
|
self._kernel_path = qemu_arch_params.kernel_path |
|
self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot' |
|
self._extra_qemu_params = qemu_arch_params.extra_qemu_params |
|
|
|
def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None: |
|
kconfig = kunit_config.parse_from_string(self._kconfig) |
|
base_kunitconfig.merge_in_entries(kconfig) |
|
|
|
def start(self, params: List[str], build_dir: str) -> subprocess.Popen: |
|
kernel_path = os.path.join(build_dir, self._kernel_path) |
|
qemu_command = ['qemu-system-' + self._qemu_arch, |
|
'-nodefaults', |
|
'-m', '1024', |
|
'-kernel', kernel_path, |
|
'-append', '\'' + ' '.join(params + [self._kernel_command_line]) + '\'', |
|
'-no-reboot', |
|
'-nographic', |
|
'-serial stdio'] + self._extra_qemu_params |
|
print('Running tests with:\n$', ' '.join(qemu_command)) |
|
return subprocess.Popen(' '.join(qemu_command), |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT, |
|
text=True, shell=True, errors='backslashreplace') |
|
|
|
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations): |
|
"""An abstraction over command line operations performed on a source tree.""" |
|
|
|
def __init__(self, cross_compile=None): |
|
super().__init__(linux_arch='um', cross_compile=cross_compile) |
|
|
|
def make_allyesconfig(self, build_dir, make_options) -> None: |
|
kunit_parser.print_with_timestamp( |
|
'Enabling all CONFIGs for UML...') |
|
command = ['make', 'ARCH=um', 'allyesconfig'] |
|
if make_options: |
|
command.extend(make_options) |
|
if build_dir: |
|
command += ['O=' + build_dir] |
|
process = subprocess.Popen( |
|
command, |
|
stdout=subprocess.DEVNULL, |
|
stderr=subprocess.STDOUT) |
|
process.wait() |
|
kunit_parser.print_with_timestamp( |
|
'Disabling broken configs to run KUnit tests...') |
|
|
|
with open(get_kconfig_path(build_dir), 'a') as config: |
|
with open(BROKEN_ALLCONFIG_PATH, 'r') as disable: |
|
config.write(disable.read()) |
|
kunit_parser.print_with_timestamp( |
|
'Starting Kernel with all configs takes a few minutes...') |
|
|
|
def start(self, params: List[str], build_dir: str) -> subprocess.Popen: |
|
"""Runs the Linux UML binary. Must be named 'linux'.""" |
|
linux_bin = get_file_path(build_dir, 'linux') |
|
return subprocess.Popen([linux_bin] + params, |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT, |
|
text=True, errors='backslashreplace') |
|
|
|
def get_kconfig_path(build_dir) -> str: |
|
return get_file_path(build_dir, KCONFIG_PATH) |
|
|
|
def get_kunitconfig_path(build_dir) -> str: |
|
return get_file_path(build_dir, KUNITCONFIG_PATH) |
|
|
|
def get_old_kunitconfig_path(build_dir) -> str: |
|
return get_file_path(build_dir, OLD_KUNITCONFIG_PATH) |
|
|
|
def get_outfile_path(build_dir) -> str: |
|
return get_file_path(build_dir, OUTFILE_PATH) |
|
|
|
def get_source_tree_ops(arch: str, cross_compile: Optional[str]) -> LinuxSourceTreeOperations: |
|
config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py') |
|
if arch == 'um': |
|
return LinuxSourceTreeOperationsUml(cross_compile=cross_compile) |
|
elif os.path.isfile(config_path): |
|
return get_source_tree_ops_from_qemu_config(config_path, cross_compile)[1] |
|
|
|
options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')] |
|
raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options))) |
|
|
|
def get_source_tree_ops_from_qemu_config(config_path: str, |
|
cross_compile: Optional[str]) -> Tuple[ |
|
str, LinuxSourceTreeOperations]: |
|
# The module name/path has very little to do with where the actual file |
|
# exists (I learned this through experimentation and could not find it |
|
# anywhere in the Python documentation). |
|
# |
|
# Bascially, we completely ignore the actual file location of the config |
|
# we are loading and just tell Python that the module lives in the |
|
# QEMU_CONFIGS_DIR for import purposes regardless of where it actually |
|
# exists as a file. |
|
module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path)) |
|
spec = importlib.util.spec_from_file_location(module_path, config_path) |
|
assert spec is not None |
|
config = importlib.util.module_from_spec(spec) |
|
# See https://github.com/python/typeshed/pull/2626 for context. |
|
assert isinstance(spec.loader, importlib.abc.Loader) |
|
spec.loader.exec_module(config) |
|
|
|
if not hasattr(config, 'QEMU_ARCH'): |
|
raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path) |
|
params: qemu_config.QemuArchParams = config.QEMU_ARCH # type: ignore |
|
return params.linux_arch, LinuxSourceTreeOperationsQemu( |
|
params, cross_compile=cross_compile) |
|
|
|
class LinuxSourceTree(object): |
|
"""Represents a Linux kernel source tree with KUnit tests.""" |
|
|
|
def __init__( |
|
self, |
|
build_dir: str, |
|
load_config=True, |
|
kunitconfig_path='', |
|
kconfig_add: Optional[List[str]]=None, |
|
arch=None, |
|
cross_compile=None, |
|
qemu_config_path=None) -> None: |
|
signal.signal(signal.SIGINT, self.signal_handler) |
|
if qemu_config_path: |
|
self._arch, self._ops = get_source_tree_ops_from_qemu_config( |
|
qemu_config_path, cross_compile) |
|
else: |
|
self._arch = 'um' if arch is None else arch |
|
self._ops = get_source_tree_ops(self._arch, cross_compile) |
|
|
|
if not load_config: |
|
return |
|
|
|
if kunitconfig_path: |
|
if os.path.isdir(kunitconfig_path): |
|
kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH) |
|
if not os.path.exists(kunitconfig_path): |
|
raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist') |
|
else: |
|
kunitconfig_path = get_kunitconfig_path(build_dir) |
|
if not os.path.exists(kunitconfig_path): |
|
shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path) |
|
|
|
self._kconfig = kunit_config.parse_file(kunitconfig_path) |
|
if kconfig_add: |
|
kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add)) |
|
self._kconfig.merge_in_entries(kconfig) |
|
|
|
|
|
def clean(self) -> bool: |
|
try: |
|
self._ops.make_mrproper() |
|
except ConfigError as e: |
|
logging.error(e) |
|
return False |
|
return True |
|
|
|
def validate_config(self, build_dir) -> bool: |
|
kconfig_path = get_kconfig_path(build_dir) |
|
validated_kconfig = kunit_config.parse_file(kconfig_path) |
|
if self._kconfig.is_subset_of(validated_kconfig): |
|
return True |
|
invalid = self._kconfig.entries() - validated_kconfig.entries() |
|
message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \ |
|
'This is probably due to unsatisfied dependencies.\n' \ |
|
'Missing: ' + ', '.join([str(e) for e in invalid]) |
|
if self._arch == 'um': |
|
message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \ |
|
'on a different architecture with something like "--arch=x86_64".' |
|
logging.error(message) |
|
return False |
|
|
|
def build_config(self, build_dir, make_options) -> bool: |
|
kconfig_path = get_kconfig_path(build_dir) |
|
if build_dir and not os.path.exists(build_dir): |
|
os.mkdir(build_dir) |
|
try: |
|
self._ops.make_arch_qemuconfig(self._kconfig) |
|
self._kconfig.write_to_file(kconfig_path) |
|
self._ops.make_olddefconfig(build_dir, make_options) |
|
except ConfigError as e: |
|
logging.error(e) |
|
return False |
|
if not self.validate_config(build_dir): |
|
return False |
|
|
|
old_path = get_old_kunitconfig_path(build_dir) |
|
if os.path.exists(old_path): |
|
os.remove(old_path) # write_to_file appends to the file |
|
self._kconfig.write_to_file(old_path) |
|
return True |
|
|
|
def _kunitconfig_changed(self, build_dir: str) -> bool: |
|
old_path = get_old_kunitconfig_path(build_dir) |
|
if not os.path.exists(old_path): |
|
return True |
|
|
|
old_kconfig = kunit_config.parse_file(old_path) |
|
return old_kconfig.entries() != self._kconfig.entries() |
|
|
|
def build_reconfig(self, build_dir, make_options) -> bool: |
|
"""Creates a new .config if it is not a subset of the .kunitconfig.""" |
|
kconfig_path = get_kconfig_path(build_dir) |
|
if not os.path.exists(kconfig_path): |
|
print('Generating .config ...') |
|
return self.build_config(build_dir, make_options) |
|
|
|
existing_kconfig = kunit_config.parse_file(kconfig_path) |
|
self._ops.make_arch_qemuconfig(self._kconfig) |
|
if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir): |
|
return True |
|
print('Regenerating .config ...') |
|
os.remove(kconfig_path) |
|
return self.build_config(build_dir, make_options) |
|
|
|
def build_kernel(self, alltests, jobs, build_dir, make_options) -> bool: |
|
try: |
|
if alltests: |
|
self._ops.make_allyesconfig(build_dir, make_options) |
|
self._ops.make_olddefconfig(build_dir, make_options) |
|
self._ops.make(jobs, build_dir, make_options) |
|
except (ConfigError, BuildError) as e: |
|
logging.error(e) |
|
return False |
|
return self.validate_config(build_dir) |
|
|
|
def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]: |
|
if not args: |
|
args = [] |
|
args.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt']) |
|
if filter_glob: |
|
args.append('kunit.filter_glob='+filter_glob) |
|
|
|
process = self._ops.start(args, build_dir) |
|
assert process.stdout is not None # tell mypy it's set |
|
|
|
# Enforce the timeout in a background thread. |
|
def _wait_proc(): |
|
try: |
|
process.wait(timeout=timeout) |
|
except Exception as e: |
|
print(e) |
|
process.terminate() |
|
process.wait() |
|
waiter = threading.Thread(target=_wait_proc) |
|
waiter.start() |
|
|
|
output = open(get_outfile_path(build_dir), 'w') |
|
try: |
|
# Tee the output to the file and to our caller in real time. |
|
for line in process.stdout: |
|
output.write(line) |
|
yield line |
|
# This runs even if our caller doesn't consume every line. |
|
finally: |
|
# Flush any leftover output to the file |
|
output.write(process.stdout.read()) |
|
output.close() |
|
process.stdout.close() |
|
|
|
waiter.join() |
|
subprocess.call(['stty', 'sane']) |
|
|
|
def signal_handler(self, sig, frame) -> None: |
|
logging.error('Build interruption occurred. Cleaning console.') |
|
subprocess.call(['stty', 'sane'])
|
|
|