#!/usr/bin/python3
# SPDX-License-Identifier: MIT
import pathlib
import re
import random
import csv
import time
import sqlite3
import io
import os
import testcrush.grammars.transformers as transformers
from testcrush.utils import get_logger, compile_assembly, zip_archive, Singleton, addr2line, reap_process_tree
from testcrush import asm, zoix
from typing import Any
log = get_logger()
class CSVCompactionStatistics(metaclass=Singleton):
"""Manages I/O operations on the CSV file which logs the statistics of the A0."""
_header = ["asm_source", "removed_codeline", "compiles", "lsim_ok",
"tat", "fsim_ok", "coverage", "verdict"]
def __init__(self, output: pathlib.Path) -> 'CSVCompactionStatistics':
self._file = open(output, 'w')
self.writer: csv._writer = csv.writer(self._file)
self.writer.writerow(self._header)
def __iadd__(self, rowline: dict):
self.writer.writerow(rowline.values())
self._file.flush()
return self
class Preprocessor(metaclass=Singleton):
"""Filters out candidate instructions"""
_trace_db = ".trace.db"
def __init__(self, fault_list: list[zoix.Fault], **kwargs) -> 'Preprocessor':
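        """
        Args:
            fault_list (list[zoix.Fault]): The fault list whose attributes drive candidate pruning.

        Keyword Args:
            processor_name (str): Selects the trace parser produced by ``transformers.TraceTransformerFactory``.
            processor_trace (str): Path to the processor trace file which is parsed and stored in the trace DB.
            elf_file (str): The ELF file used to map program counter values back to assembly lines via ``addr2line``.
            zoix_to_trace (dict[str, str]): Maps Z01X fault attribute names to trace column names.
        """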
factory = transformers.TraceTransformerFactory()
parser = factory(kwargs.get("processor_name"))
processor_trace = kwargs.get("processor_trace")
with open(processor_trace) as src:
trace_raw = src.read()
self.trace = parser.parse(trace_raw)
self.fault_list: list[zoix.Fault] = fault_list
self.elf = kwargs.get("elf_file")
self.zoix2trace = kwargs.get("zoix_to_trace")
self._create_trace_db()
def _create_trace_db(self):
"""
        Transforms the trace of the DUT into a single-table SQLite database. The header of the CSV is mapped to the
        DB column names and the CSV body is transformed into DB row entries.
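
        For instance, a trace like the following (column names are illustrative)::

            Time,Cycle,PC,Instruction
            10ns,1,00000004,and
            20ns,2,00000008,or

        is stored as a table ``trace("Time", "Cycle", "PC", "Instruction")`` with one row per CSV body line.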
"""
# If pre-existent db is found, delete it.
db = pathlib.Path(self._trace_db)
if db.exists():
            log.debug(f"Database {self._trace_db} exists. Overwriting it.")
db.unlink()
con = sqlite3.connect(self._trace_db)
cursor = con.cursor()
header: list[str] = self.trace[0].split(',')
header = list(map(lambda column_name: f"\"{column_name}\"", header))
header = ", ".join(header)
cursor.execute(f"CREATE TABLE trace({header})")
body: list[str] = self.trace[1:]
with io.StringIO('\n'.join(body)) as source:
for row in csv.reader(source):
cursor.execute(f"INSERT INTO trace VALUES ({', '.join(['?'] * len(row))})", row)
con.commit()
con.close()
log.debug(f"Database {self._trace_db} created.")
def query_trace_db(self, select: str, where: dict[str, str],
history: int = 5, allow_multiple: bool = False) -> list[tuple[str, ...]]:
"""
Perform a query with the specified parameters.
Assuming that the DB looks like this:
::
Time || Cycle || PC || Instruction
-----||-------||----------||------------
10ns || 1 || 00000004 || and
20ns || 2 || 00000008 || or <-*
30ns || 3 || 0000000c || xor <-|
40ns || 4 || 00000010 || sll <-|
50ns || 5 || 00000014 || j <-|
60ns || 6 || 0000004c || addi <-*
70ns || 7 || 00000050 || wfi
        If you perform a query with ``select="PC"`` and ``where={"PC": "0000004c", "Time": "60ns"}``, the search
        results in a window of 1+4 ``PC`` values, indicated by ``<-`` in the snapshot above. The size of the window
        defaults to 5 but can be freely selected by the user through ``history``.
Args:
select (str): The field to select in the query.
where (dict[str, str]): A dictionary specifying conditions to filter the query.
            history (int, optional): The total number of rows returned per match, i.e., the matching row plus the
                                     rows preceding it. Defaults to 5.
            allow_multiple (bool, optional): Whether the ``where`` conditions may match multiple rows. If ``False``,
                                             multiple matches raise a ``ValueError``. Defaults to ``False``.
Returns:
            list[tuple[str, ...]]: A list of query results (tuples of strings) matching the criteria.
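
        A minimal usage sketch, matching the snapshot above (the ``preprocessor`` instance name is illustrative)::

            window = preprocessor.query_trace_db(select="PC",
                                                 where={"PC": "0000004c", "Time": "60ns"})
            # window == [("00000008",), ("0000000c",), ("00000010",),
            #            ("00000014",), ("0000004c",)]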
"""
db = pathlib.Path(self._trace_db)
if not db.exists():
raise FileNotFoundError("Trace DB not found")
columns = where.keys()
query = f"""
SELECT ROWID
FROM trace
WHERE {' AND '.join([f'{x} = ?' for x in columns])}
"""
values = where.values()
with sqlite3.connect(db) as con:
cursor = con.cursor()
cursor.execute(query, tuple(values))
rowids = cursor.fetchall()
if not rowids:
raise ValueError(f"No row found for {', '.join([f'{k}={v}' for k, v in where.items()])}")
if len(rowids) > 1 and not allow_multiple:
                raise ValueError("Query resulted in multiple ROWIDs for "
                                 f"{', '.join([f'{k}={v}' for k, v in where.items()])}")
result = list()
for rowid, in rowids:
query_with_history = f"""
SELECT {'"'+select+'"' if select != '*' else select} FROM trace
WHERE ROWID <= ?
ORDER BY ROWID DESC
LIMIT ?
"""
cursor.execute(query_with_history, (rowid, history))
result += cursor.fetchall()[::-1]
return result
    def prune_candidates(self, candidates: list[tuple[int, asm.Codeline]], mapping: dict[str, int]):
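        """
        Prunes ``candidates`` in place: codelines whose program counters appear in the trace windows
        derived from the fault attributes are filtered out.

        Args:
            candidates (list[tuple[int, asm.Codeline]]): ``(asm_id, codeline)`` pairs of removal candidates.
            mapping (dict[str, int]): Maps assembly source file names to their ``asm_id``.
        """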
# 1. Gather attribute pairs and rank them
attributes = list()
for fault in self.fault_list:
if hasattr(fault, "fault_attributes"):
entry = {self.zoix2trace[k]: fault.fault_attributes[k] for k in self.zoix2trace.keys()}
if entry not in attributes:
attributes.append(entry)
# 2. Query the database for PC windows
# TODO: How to specify the column name of the trace? ask explicitly for PC?
pcs = list()
for entry in attributes:
try:
window = [pc for (pc,) in self.query_trace_db(select="PC", where=entry, history=4)]
except ValueError:
continue
if window not in pcs:
pcs.append(window)
# Flatten the list
pcs = [pc for window in pcs for pc in window]
# 3. Find the asm source and line numbers and filter out the candidates
removed = list()
for pc in pcs:
asm_file, lineno = addr2line(self.elf, pc)
            if lineno in removed:
                log.warning(f"Line {lineno} has already been removed. Skipping.")
                continue
            if not asm_file:
                log.warning(f"Program counter {pc} not found in {self.elf}. Skipping.")
                continue
if asm_file not in mapping:
log.warning(f"PC value {pc} maps to line {lineno} of {asm_file} which isn't in asm sources. Skipping.")
continue
before = len(candidates)
candidates[:] = list(filter(lambda entry: not ((entry[0] == mapping[asm_file]) and
(entry[1] == lineno - 1)),
candidates))
after = len(candidates)
if before != after:
removed.append(lineno)
class A0(metaclass=Singleton):
"""Implements the A0 compaction algorithm"""
def __init__(self, isa: pathlib.Path, a0_asm_sources: list[str], a0_settings: dict[str, Any]) -> "A0":
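        """
        Args:
            isa (pathlib.Path): Path to the ISA definition consumed by ``asm.ISA``.
            a0_asm_sources (list[str]): Paths of the STL assembly files to be compacted.
            a0_settings (dict[str, Any]): Settings read below, expected to provide the keys
                ``assembly_compilation_instructions``, ``vcs_compilation_instructions``,
                ``vcs_logic_simulation_instructions``, ``vcs_logic_simulation_control``,
                ``zoix_fault_simulation_instructions``, ``zoix_fault_simulation_control``,
                ``fsim_report`` and ``coverage_formula``.
        """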
log.debug(f"Generating AssemblyHandlers for {a0_asm_sources}")
a0_asm_sources = list(map(pathlib.Path, a0_asm_sources))
self.assembly_sources: list[asm.AssemblyHandler] = [asm.AssemblyHandler(asm.ISA(isa), asm_file,
chunksize=1)
for asm_file in a0_asm_sources]
# Flatten candidates list
        self.all_instructions: list[tuple[int, asm.Codeline]] = [(asm_id, codeline)
                                                                 for asm_id, handler in enumerate(self.assembly_sources)
                                                                 for codeline in handler.get_code()]
        self.path_to_id = {v.name: k for k, v in enumerate(a0_asm_sources)}
self.assembly_compilation_instructions: list[str] = a0_settings.get("assembly_compilation_instructions")
self.zoix_compilation_args: list[str] = a0_settings.get("vcs_compilation_instructions")
log.debug(f"VCS compilation instructions for HDL sources set to {self.zoix_compilation_args}")
self.zoix_lsim_args: list[str] = a0_settings.get("vcs_logic_simulation_instructions")
log.debug(f"VCS logic simulation instructions are {self.zoix_lsim_args}")
self.zoix_lsim_kwargs: dict[str, float | re.Pattern | int | list] = \
{k: v for k, v in a0_settings.get("vcs_logic_simulation_control").items()}
log.debug(f"VCS logic simulation control parameters are: {self.zoix_lsim_kwargs}")
self.zoix_fsim_args: list[str] = a0_settings.get("zoix_fault_simulation_instructions")
self.zoix_fsim_kwargs: dict[str, float] = \
{k: v for k, v in a0_settings.get("zoix_fault_simulation_control").items()}
self.fsim_report: zoix.TxtFaultReport = zoix.TxtFaultReport(pathlib.Path(a0_settings.get("fsim_report")))
log.debug(f"Z01X fault report is set to: {self.fsim_report}")
self.coverage_formula: str = a0_settings.get("coverage_formula")
log.debug(f"The coverage formula that will be used is: {self.coverage_formula}")
self.vc_zoix: zoix.ZoixInvoker = zoix.ZoixInvoker()
@staticmethod
def evaluate(previous_result: tuple[int, float],
new_result: tuple[int, float]) -> bool:
"""
        Evaluates the new results with respect to the previous ones. Specifically, it checks whether the new
        TaT is less than or equal to the old one and the new coverage is greater than or equal to the old one.

        Args:
            previous_result (tuple[int, float]): The old TaT (int) and coverage (float) values.
            new_result (tuple[int, float]): The new TaT (int) and coverage (float) values.

        Returns:
            bool: ``True`` if new TaT <= old TaT and new coverage >= old coverage, ``False`` otherwise.
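
        For example (values are illustrative)::

            >>> A0.evaluate((1200, 91.5), (1100, 91.5))
            True
            >>> A0.evaluate((1200, 91.5), (1300, 92.0))
            False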
"""
old_tat, old_coverage = previous_result
new_tat, new_coverage = new_result
return (new_tat <= old_tat) and (new_coverage >= old_coverage)
def _coverage(self, precision: int = 4) -> float:
"""
Args:
            precision (int, optional): Specifies the precision of the coverage value when this is computed.
                                       Defaults to 4.
Returns:
float: The fault coverage.
"""
coverage_formula = self.coverage_formula
return self.fsim_report.compute_coverage(requested_formula=coverage_formula, precision=precision)
def pre_run(self) -> tuple[int, float]:
"""
Extracts the initial test application time and coverage of the STL.
The test application time is extracted by a logic simulation of the STL
whereas the coverage is computed by performing a fault simulation.
Returns:
tuple[int, float]: The test application time (index 0) and the
coverage of the STL (index 1)
Raises:
SystemExit: If HDL sources cannot be compiled (if specified) or
if logic simulation cannot be performed.
            TimeoutError: If logic or fault simulation times out.
"""
vc_zoix = self.vc_zoix
test_application_time = list()
compile_assembly(*self.assembly_compilation_instructions)
if self.zoix_compilation_args:
comp = vc_zoix.compile_sources(*self.zoix_compilation_args)
if comp == zoix.Compilation.ERROR:
log.critical("Unable to compile HDL sources!")
exit(1)
print("Initial logic simulation for TaT computation.")
try:
lsim = vc_zoix.logic_simulate(*self.zoix_lsim_args,
**self.zoix_lsim_kwargs,
tat_value=test_application_time)
except zoix.LogicSimulationException:
log.critical("Unable to perform logic simulation for TaT computation")
exit(1)
if lsim != zoix.LogicSimulation.SUCCESS:
log.critical("Error during initial logic simulation! Check the debug log!")
exit(1)
print("Initial fault simulation for coverage computation.")
fsim = vc_zoix.fault_simulate(*self.zoix_fsim_args, **self.zoix_fsim_kwargs)
if fsim != zoix.FaultSimulation.SUCCESS:
log.critical("Error during initial fault simulation! Check the debug log!")
exit(1)
coverage = self._coverage()
return (test_application_time.pop(), coverage)
def run(self, initial_stl_stats: tuple[int, float], times_to_shuffle: int = 100) -> None:
"""
Main loop of the A0 algorithm
1. Removal of a random instruction
2. Cross-compilation
2.1 If FAIL, Restore
3. Logic simulation
3.1 If ERROR or TIMEOUT, Restore
4. Fault simulation
4.1 If ERROR or TIMEOUT, Restore
5. Evaluation
6. Goto 1.
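
        A typical driver sequence invoking this loop (variable names are illustrative)::

            a0 = A0(isa, asm_sources, a0_settings)
            initial_stats = a0.pre_run()
            a0.run(initial_stats)
            a0.post_run()
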
Args:
            initial_stl_stats (tuple[int, float]): The test application time (int) and coverage (float) of the
                                                   original STL.
            times_to_shuffle (int, optional): Number of times to shuffle the assembly candidates. Defaults to 100.
Returns:
None
"""
def _restore(asm_source: int) -> None:
"""
Invokes the ``restore()`` function of a specific assembly handler.
"""
self.assembly_sources[asm_source].restore()
# To be used for generated file suffixes
unique_id = time.strftime("%d_%b_%H%M", time.gmtime())
# Step 1: Compute initial stats of the STL
initial_tat, initial_coverage = initial_stl_stats
log.debug(f"Initial coverage {initial_coverage}, TaT {initial_tat}")
# Z01X alias
vc_zoix = self.vc_zoix
# Statistics
stats_filename = f"a0_statistics_{unique_id}.csv"
stats = CSVCompactionStatistics(pathlib.Path(stats_filename))
# Keep a backup of all sources since
# they will be modified in-place.
        zip_archive(f"../backup_{unique_id}", *[handler.get_asm_source() for handler in self.assembly_sources])
# Randomize order for Step 2
for _ in range(times_to_shuffle):
random.shuffle(self.all_instructions)
iteration_stats = dict.fromkeys(CSVCompactionStatistics._header)
iteration_stats["tat"] = initial_tat
iteration_stats["coverage"] = initial_coverage
total_iterations = len(self.all_instructions)
# Step 2: Select instructions in a random order
old_stl_stats = (initial_tat, initial_coverage)
while len(self.all_instructions) != 0:
print(f"""
#############
# ITERATION {total_iterations - len(self.all_instructions) + 1} / {total_iterations}
#############
""")
# Update statistics
if any(iteration_stats.values()):
stats += iteration_stats
iteration_stats = dict.fromkeys(CSVCompactionStatistics._header)
asm_id, codeline = self.all_instructions.pop(0)
asm_source_file = self.assembly_sources[asm_id].get_asm_source().name
iteration_stats["asm_source"] = asm_source_file
iteration_stats["removed_codeline"] = codeline
print(f"Removing {codeline} of assembly source {asm_source_file}")
# Step 3: Removal of the selected instruction
handler = self.assembly_sources[asm_id]
handler.remove(codeline)
# +-+-+-+ +-+-+-+-+-+-+-+
# |A|S|M| |C|O|M|P|I|L|E|
# +-+-+-+ +-+-+-+-+-+-+-+
print("\tCross-compiling assembly sources.")
asm_compilation = compile_assembly(*self.assembly_compilation_instructions)
if not asm_compilation:
print(f"\t{asm_source_file} does not compile after the removal of: {codeline}. Restoring!")
iteration_stats["compiles"] = "NO"
iteration_stats["verdict"] = "Restore"
_restore(asm_id)
continue
# +-+-+-+ +-+-+-+-+-+-+-+
# |V|C|S| |C|O|M|P|I|L|E|
# +-+-+-+ +-+-+-+-+-+-+-+
if self.zoix_compilation_args:
comp = vc_zoix.compile_sources(*self.zoix_compilation_args)
if comp == zoix.Compilation.ERROR:
log.critical("Unable to compile HDL sources!")
exit(1)
# +-+-+-+ +-+-+-+-+
# |V|C|S| |L|S|I|M|
# +-+-+-+ +-+-+-+-+
test_application_time = list()
try:
print("\tInitiating logic simulation.")
lsim = vc_zoix.logic_simulate(*self.zoix_lsim_args,
**self.zoix_lsim_kwargs,
tat_value=test_application_time)
except zoix.LogicSimulationException:
log.critical("Unable to perform logic simulation for TaT computation. Simulation status not set!")
exit(1)
if lsim != zoix.LogicSimulation.SUCCESS:
print(f"\tLogic simulation of {asm_source_file} resulted in {lsim.value} after removing {codeline}.")
print("\tRestoring.")
iteration_stats["compiles"] = "YES"
iteration_stats["lsim_ok"] = f"NO-{lsim.value}"
iteration_stats["verdict"] = "Restore"
_restore(asm_id)
continue
test_application_time = test_application_time.pop(0)
# +-+-+-+ +-+-+-+-+
# |V|C|S| |F|S|I|M|
# +-+-+-+ +-+-+-+-+
print("\tInitiating fault simulation.")
fsim = vc_zoix.fault_simulate(*self.zoix_fsim_args, **self.zoix_fsim_kwargs)
if fsim != zoix.FaultSimulation.SUCCESS:
print(f"\tFault simulation of {asm_source_file} resulted in a {fsim.value} after removing {codeline}.")
print("\tRestoring.")
iteration_stats["compiles"] = "YES"
iteration_stats["lsim_ok"] = "YES"
iteration_stats["tat"] = str(test_application_time)
iteration_stats["fsim_ok"] = f"NO-{fsim.value}"
iteration_stats["verdict"] = "Restore"
_restore(asm_id)
continue
print("\t\tComputing coverage.")
coverage = self._coverage()
new_stl_stats = (test_application_time, coverage)
iteration_stats["compiles"] = "YES"
iteration_stats["lsim_ok"] = "YES"
iteration_stats["tat"] = str(test_application_time)
iteration_stats["fsim_ok"] = "YES"
iteration_stats["coverage"] = str(coverage)
            # Step 4: Coverage and TaT evaluation. With respect to
            # the paper, the evaluation happens on the coverage
            # value, i.e., new >= old, rather than on a comparison
            # of the sets of detected faults.
if self.evaluate(old_stl_stats, new_stl_stats):
                print("\tSTL has better stats than before!"
                      f"\n\t\tOld TaT: {old_stl_stats[0]} | Old Coverage: {old_stl_stats[1]}"
                      f"\n\t\tNew TaT: {new_stl_stats[0]} | New Coverage: {new_stl_stats[1]}"
                      "\n\tProceeding!")
old_stl_stats = new_stl_stats
iteration_stats["verdict"] = "Proceed"
else:
                print("\tSTL has worse stats than before!"
                      f"\n\t\tOld TaT: {old_stl_stats[0]} | Old Coverage: {old_stl_stats[1]}"
                      f"\n\t\tNew TaT: {new_stl_stats[0]} | New Coverage: {new_stl_stats[1]}"
                      "\n\tRestoring!")
iteration_stats["verdict"] = "Restore"
_restore(asm_id)
def post_run(self) -> None:
""" Cleanup any VC-Z01X stopped processes """
reap_process_tree(os.getpid())