#!/usr/bin/python3
# SPDX-License-Identifier: MIT
import subprocess
import re
import enum
import pathlib
from testcrush.utils import get_logger, to_snake_case
from typing import Any
log = get_logger()
[docs]
class Compilation(enum.Enum):
"""Statuses for the VCS compilation of HDL sources."""
ERROR = "ERROR" # stderr contains text
SUCCESS = "SUCCESS" # None of the above
[docs]
class LogicSimulation(enum.Enum):
"""Statuses for the simv logic simulation of a given program."""
TIMEOUT = "TIMEOUT" # Endless loop
SIM_ERROR = "ERROR" # stderr contains text
SUCCESS = "SUCCESS" # None of the above
class LogicSimulationException(BaseException):
"""Custom exception for the simv logic simulation."""
def __init__(self, message="Error during VC Logic Simulation"):
self.message = message
super().__init__(self.message)
[docs]
class FaultSimulation(enum.Enum):
"""Statuses for the Z01X fault simulation."""
TIMEOUT = "TIMEOUT" # Wall-clock
FSIM_ERROR = "ERROR" # stderr contains text
SUCCESS = "SUCCESS" # None of the above
[docs]
class Fault:
"""
Generic representation of a **prime** fault
Each prime fault has two static attributes which are:
-``equivalent_faults`` (int): Corresponds to the total number of faults equivalent to this fault. Defaults to 1 i.e.
itself.
-``equivalent_to`` (Fault): A reference to the primary fault with which the current fault is equivalent. If the
current fault is prime. Defaults to ``None``.
When a fault is constructed it corresponds to a prime fault. It is up to the user to resolve any fault equivalence
by modifying the aforementioned attributes.
"""
def __init__(self, **fault_attributes: dict[str, Any]) -> 'Fault':
for attribute, value in fault_attributes.items():
setattr(self, attribute.replace(" ", "_"), value)
self.equivalent_faults: int = 1
self.equivalent_to: Fault = None
def __repr__(self):
attrs = ', '.join(f'{key}={value!r}' for key, value in self.__dict__.items()
if key not in ("equivalent_faults", "equivalent_to")) # Avoid recursive reprs
return f'{self.__class__.__name__}({attrs})'
def __str__(self):
return ', '.join(f'{key}: {value}' for key, value in self.__dict__.items()
if key not in ("equivalent_faults", "equivalent_to")) # Avoid recursive reprs
def __eq__(self, other):
if isinstance(other, Fault):
return self.__dict__ == other.__dict__
return False
[docs]
def set(self, attribute: str, value: Any) -> None:
setattr(self, attribute, value)
[docs]
def get(self, attribute: str, default: str | None = None) -> str | Any:
"""
Generic getter method for arbitrary attribute.
Args:
attribute (str): The requested attribute of the fault.
default (str | None): A default value to be used as a guard.
Returns:
str | Any: The fault attribute. If no cast has been performed on
the attribute then the default type is ``str``.
"""
return getattr(self, attribute.replace(" ", "_"), default)
[docs]
def cast_attribute(self, attribute: str, func: callable) -> None:
"""
Casts the type of the internal attribute
Args:
attribute (str): The requested attribute of the fault to be casted.
func (callable): A function to cast the fault attribute.
Returns:
None
Raises:
KeyError: If the requested attribute does not exist.
ValueError: If the cast cannot be performed e.g., when
``int('a')``.
"""
attribute = attribute.replace(" ", "_")
try:
self.__dict__[attribute] = func(getattr(self, attribute))
except KeyError:
log.error(f"Attribute {attribute} not a member of {self.__class__}")
except ValueError:
log.critical(f"Unable to cast to {repr(func)} of attribute {getattr(self, attribute)}")
exit(1)
[docs]
def is_prime(self) -> bool:
"""
Checks whether the fault is a prime fault, i.e., is not equivalent to any other fault.
Returns:
bool: True if fault is prime. False otherwise.
"""
return self.equivalent_to is None
[docs]
class TxtFaultReport:
"""
Manages the VC-Z01X text report.
"""
__slots__ = ["fault_report_path", "fault_report", "fault_list", "status_groups", "coverage"]
def __init__(self, fault_report: pathlib.Path) -> "TxtFaultReport":
self.fault_report_path = fault_report # Store the path, but don't read the file yet
self.fault_report = None
self.fault_list = None
self.status_groups = None
self.coverage = None
def __str__(self) -> str:
return f"{self.fault_report_path.resolve()}"
def _load_fault_report(self):
"""Load the fault report from the file."""
if not self.fault_report_path.exists():
raise FileNotFoundError(f"Fault report file {self.fault_report_path} not found.")
with open(self.fault_report_path) as src:
self.fault_report = src.read()
def _parse_sections(self):
"""Parse the required sections from the fault report."""
# Ensure the fault report is loaded before parsing
if self.fault_report is None:
raise ValueError("Fault report is not loaded.")
# Lazy import to avoid circular dependencies
from testcrush.grammars.transformers import FaultReportTransformerFactory
factory = FaultReportTransformerFactory()
for section in ["StatusGroups", "Coverage", "FaultList"]:
parser = factory(section)
try:
raw_section = self.extract(section)
except ValueError: # section doesn't exist
setattr(self, to_snake_case(section), None)
continue
log.debug(f"Parsing {section}")
setattr(self, to_snake_case(section), parser.parse(raw_section))
[docs]
def update(self):
"""Update and parse all sections once the fault report file is available."""
self._load_fault_report() # Read the file
self._parse_sections() # Parse all sections
[docs]
def compute_coverage(self, requested_formula: str = None, precision: int = 4) -> dict[str, float] | float:
"""
Manually computes the coverage based on the current fault report.
Args:
requested_formula (str, optional): The name of the coverage formula from the ``Coverage {}`` section of the
fault report. Defaults to None.
precision (int, optional): The requested float precision. Defaults to 4.
Returns:
dict[str, float] | float: If no formula name is provided, returns a dictionary mapping formula names to
their corresponding evaluated coverage values as floats. If a formula name is specified, returns only
the evaluated value for that formula.
.. todo::
* Implement default Test/Fault coverage computation per-Z01X if no ``Coverage{}`` section exists.
"""
log.info(f"Computing coverage {requested_formula}.")
self.update()
retval = list()
fault_statusses = dict()
for fault in self.fault_list:
status = fault.fault_status
if status in fault_statusses:
fault_statusses[status] += 1
else:
fault_statusses[status] = 1
status_groups = dict()
if self.status_groups:
for group, statuses in self.status_groups.items():
status_groups[group] = 0
for status in statuses:
if status in fault_statusses:
status_groups[group] += fault_statusses[status]
# We expect that if a coverage formula is specified
# i.e., Coverage{} exists, then there may be some
# variables there which may not exist in statuses
# or groups. Hence we must set them to 0.
non_present = dict()
if self.coverage:
for formula_name, formula in self.coverage.items():
for status_or_group in re.findall(r"[A-Z]{2}", formula):
if status_or_group not in fault_statusses and status_or_group not in status_groups:
non_present[status_or_group] = 0
retval.append((formula_name,
round(eval(formula, {**fault_statusses, **status_groups, **non_present}),
precision)))
# Else: TODO: Implement default coverage computation according to manual
return dict(retval)[requested_formula] if requested_formula else dict(retval)
[docs]
class ZoixInvoker:
"""A wrapper class to be used in handling calls to VCS-Z01X."""
def __init__(self) -> "ZoixInvoker":
...
[docs]
@staticmethod
def execute(instruction: str, timeout: float = None) -> tuple[str, str]:
"""
Executes a **bash** instruction and returns the ``stdout`` and ``stderr`` responses as a tuple.
Args:
instruction (str): The bash instruction to be executed.
Returns:
tuple(str, str): The stdout (index 0) and the stderr (index 1)
as strings.
"""
log.debug(f"Executing {instruction}...")
with subprocess.Popen(["/bin/bash", "-c", instruction],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True) as process:
try:
stdout, stderr = process.communicate(timeout=timeout)
return stdout, stderr
except subprocess.TimeoutExpired:
log.debug(f"TIMEOUT during the execution of:\n\t{instruction}")
process.kill()
return "TimeoutExpired", "TimeoutExpired"
[docs]
def compile_sources(self, *instructions: str) -> Compilation:
"""
Performs compilation of HDL files
Args:
instructions (str): A variadic number of bash shell instructions
Returns:
Compilation: A status Enum to signify the success or failure of the compilation.
- ERROR: if any text was found in the ``stderr`` stream during the execution of an instruction.
- SUCCESS: otherwise.
"""
compilation_status = Compilation.SUCCESS
for cmd in instructions:
stdout, stderr = self.execute(cmd)
if stderr:
compilation_status = Compilation.ERROR
break
return compilation_status
[docs]
def logic_simulate(self, *instructions: str, **kwargs) -> LogicSimulation:
"""
Performs logic simulation of user-defined firmware and captures the test application time.
A timeout value must be specified in order to avoid endless loops that will hang the program. There are
two important kwargs that the user must specify. The success regexp and the tat regexp. During a logic
simulation, the simulator typically stops when a ``$finish`` call is met. However, in a non-trivial DUT case
like e.g., a processor, there are many things that may go wrong like for instance an out-of-bounds read or
write from/to a memory. In that case, it is up to the designer to handle accordingly the situation e.g., issue
a ``$fatal`` call. Which means, that in order to be accurate and know whether the logic simulation terminates
gracefully, some sort of ``$display`` must be specified -or- at least the ``$finish`` statement must be invoked
from the correct place. For this reason, the success regexp is required in order to know that not only the logic
simulation ended but that it also ended without causing any kind of violation in the DUT.
When it comes to the tat regexp, this can be either a custom message again issued by the testbench or the time
of the simulation that the correct ``$finish`` statement was issued. It is up to the user to specify it. However
in order for the logic simulation to be considerred successful the success regexp AND the tat regexp must match
something.
Args:
instructions (str): A variadic number of bash instructions
kwargs: User-defined options needed for the evaluation of the result of the logic simulation.
These options are:
- **timeout** (float): A timeout in **seconds** to be used for **each** of the executed logic
simulation instructions.
- **simulation_ok_regex** (re.Pattern): A regular expression used for matching in every line of the
``stdout`` stream to mark the successful completion of the logic simulation.
- **test_application_time_regex** (re.Pattern): A regular expression used to match the line that reports
the test application time from the simulator.
- **test_application_time_regex_group_no** (int): The index of the capture group in the custom regular
expression for the TaT value. Default is 1, corresponding to the ``success_regexp`` group.
- **tat_value** (list): An **empty** list to store the TaT value after being successfully matched with
``success_regexp``. The list is used to mimic a pass-by-reference.
Returns:
LogicSimulation: A status Enum which is:
- TIMEOUT: if user defined timeout has been triggered.
- SIM_ERROR: if any text was found in the ``stderr`` stream during the execution of an instruction.
- SUCCESS: if the halting regexp matched text from the ``stdout`` stream.
"""
timeout: float = kwargs.get("timeout", None)
# The default regexp catches the line:
# $finish at simulation time XXXXXXYs
# where X = a digit and Y = time unit.
# Capturing of the simulation duration
# done for possible TaT purposes. NOTE
# that it is advised that BOTH regular
# expressions ARE specified and that
# the default value is not used. That
# is because it may be that the TB may
# have >=1 $finish call locations e.g.
# on an out-of-bounds read/write case.
# Hence, be as accurate as possible!!!
default_regexp = re.compile(r"\$finish[^0-9]+([0-9]+)[m|u|n|p]s", re.DOTALL)
success_regexp: re.Pattern = kwargs.get("simulation_ok_regex", default_regexp)
tat_regexp: re.Pattern = kwargs.get("test_application_time_regex", default_regexp)
# By default, a single capturing group
# is expected in the regexp, which maps
# to the TaT value. If a custom regexp
# is provided however, with >1 groups,
# then the user must specify which is
# the expected capture group.
tat_capture_group: int = kwargs.get("test_application_time_regex_group_no", 1)
# An empty mutable container is expected
# to store the TaT value matched from
# the regexp. It is used like that to
# mimic pass-by-reference and not alter
# the function's return value.
tat_value: list = kwargs.get("tat_value", [])
simulation_status = None
# Loop control flags
exit_success = False
tat_success = False
for cmd in instructions:
stdout, stderr = self.execute(cmd, timeout=timeout)
if stderr and stderr != "TimeoutExpired":
log.debug(f"Error during execution of {cmd}\n\
------[STDERR STREAM]------\n\
{'-'.join(stderr.splitlines(keepends=True))}\n\
---------------------------\n")
simulation_status = LogicSimulation.SIM_ERROR
break
elif stderr == stdout == "TimeoutExpired":
simulation_status = LogicSimulation.TIMEOUT
break
for line in stdout.splitlines():
log.debug(f"{cmd}: {line.rstrip()}")
# Exit success
success_match: re.Match = re.search(success_regexp, line)
if success_match:
log.debug(f"Exit Success: {success_match.groups()}")
exit_success = True
# TaT matching
tat_match: re.Match = re.search(tat_regexp, line)
if tat_match:
test_application_time = tat_match.group(tat_capture_group)
try:
tat_value.append(int(test_application_time))
tat_success = True
except ValueError:
raise LogicSimulationException(f"Test application time was not correctly captured \
{test_application_time=} and could not be converted to an integer. Perhaps there is something wrong with your regular \
expression '{tat_regexp}' ?")
log.debug(f"TaT Captured: {tat_match.groups()}")
if tat_success and exit_success:
break
if tat_success and exit_success:
log.debug(f"Simulation Success! {exit_success=} and {tat_success=}.")
simulation_status = LogicSimulation.SUCCESS
elif not (tat_success and exit_success) and simulation_status != LogicSimulation.TIMEOUT:
log.debug(f"Simulation Failed! {exit_success=} and {tat_success=}.")
simulation_status = LogicSimulation.SIM_ERROR
return simulation_status
[docs]
def fault_simulate(self, *instructions: str, **kwargs) -> FaultSimulation:
"""
Performs fault simulation of a user-defined firmware.
Args:
instructions (str): A variadic number of shell instructions
to invoke Z01X.
kwargs: User-defined options for fault simulation control.
- timeout (float): A timeout in **seconds** for each fsim
instruction.
- allow_regexs (list[re.Pattern]): Series of regexps to look for in
``stderr`` and allow continuation without raising any error
messages.
Returns:
FaultSimulation: A status Enum which is:
- TIMEOUT: if the timeout kwarg was provided and some
instruction exceeded it.
- FSIM_ERROR: if the ``stderr`` stream contains text during the
execution of an instruction.
- SUCCESS: if none of the above.
"""
fault_simulation_status = FaultSimulation.SUCCESS
timeout: float = kwargs.get("timeout", None)
allow: list[re.Pattern] = kwargs.get("allow_regexs", None)
for cmd in instructions:
stdout, stderr = self.execute(cmd, timeout=timeout)
if stderr and stderr != "TimeoutExpired":
if allow:
continue_execution = False
for regexp in allow:
if regexp.search(stderr):
log.debug(f"Allowing message {regexp.search(stderr)}")
continue_execution = True
break
if continue_execution:
continue
log.debug(f"Error during execution of {cmd}\n\
------[STDERR STREAM]------\n\
{'-'.join(stderr.splitlines(keepends=True))}\n\
---------------------------\n")
fault_simulation_status = FaultSimulation.FSIM_ERROR
break
elif stderr == stdout == "TimeoutExpired":
fault_simulation_status = FaultSimulation.TIMEOUT
break
return fault_simulation_status