Source code for tritondse.coverage

# built-in imports
from __future__ import annotations
import hashlib
import struct
from pathlib import Path
from typing import List, Generator, Tuple, Set, Union, Dict, Optional
from collections import Counter
from enum import IntFlag, Enum, auto
import pickle
import enum_tools.documentation

# third-party imports
from triton import AST_NODE

# local imports
from tritondse.types import Addr, PathConstraint, PathBranch, SolverStatus, PathHash, Edge, SymExType
import tritondse.logging

logger = tritondse.logging.get('coverage')

CovItem = Union[Addr, Edge, PathHash, Tuple[PathHash, Edge]]
"""
Variant type representing a coverage item.
It can be:

* an address :py:obj:`tritondse.types.Addr` for block coverage
* an edge :py:obj:`tritondse.types.Edge` for edge coverage
* a string :py:obj:`tritondse.types.PathHash` for path coverage
* a tuple of both a Pathhash and an edge
"""


[docs] @enum_tools.documentation.document_enum class CoverageStrategy(str, Enum): """ Coverage strategy (metric) enum. This enum will change whether a given branch have to be solved or not. """ BLOCK = "block" # doc: block coverage, only tracks new basic blocks covered EDGE = "edge" # doc: edge coverage, tracks CFGs edges covered PATH = "path" # doc: tracks any new path covered PREFIXED_EDGE = "PREFIXED_EDGE" # doc: edge coverage but also taking in account path prefix
[docs] @enum_tools.documentation.document_enum class BranchSolvingStrategy(IntFlag): """ Branch strategy enumerate. It defines the manner with which branches are checked with SMT on a single trace, namely a :py:obj:`CoverageSingleRun`. For a given branch that has not been covered strategies are: * ``ALL_NOT_COVERED``: check by SMT all occurrences * ``FIRST_LAST_NOT_COVERED``: check only the first and last occurrence in the trace """ ALL_NOT_COVERED = auto() # doc: check by SMT all occurrences of a given branch (true by default) FIRST_LAST_NOT_COVERED = auto() # doc: check by SMT the first and last occurrence of a given branch UNSAT_ONCE = auto() # doc: if a branch is UNSAT do not try solving it again TIMEOUT_ONCE = auto() # doc: if a branch is TIMEOUT do not try solving it again TIMEOUT_ALWAYS = auto() # doc: always try solving again a TIMEOUT branch (incompatible with :py:enum:mem:`TIMEOUT_ONCE`) COVER_SYM_DYNJUMP = auto() # doc: try covering dynamic jumps on a symbolic register or memory value COVER_SYM_READ = auto() # doc: try enumerating values for symbolic reads COVER_SYM_WRITE = auto() # doc: try enumerating values for symbolic writes SOUND_MEM_ACCESS = auto() # doc: enables adding a constraint when using a symbolic read/write or jump MANUAL = auto() # doc: disable automatic branch solving after an execution (has to be done manually in callbacks)
[docs] class CoverageSingleRun(object): """ Coverage produced by a **Single Execution** Depending on the strategy given to the constructor it stores different data. """ def __init__(self, strategy: CoverageStrategy): """ :param strategy: Strategy to employ :type strategy: CoverageStrategy """ self.strategy: CoverageStrategy = strategy #: Coverage strategy # For instruction coverage self.covered_instructions: Dict[Addr, int] = Counter() """ Instruction coverage. Counter for code coverage) """ self.covered_items: Dict[CovItem, int] = Counter() """ Stores covered items whatever they are """ self.not_covered_items: Set[CovItem] = set() self._not_covered_items_mirror: Dict[CovItem, List[str]] = {} # solely used for prefixed-edge """ CovItems not covered in the trace. It thus represent what can be covered by the trace (input). We call it coverage objectives.""" # For path coverage self._current_path: List[Addr] = [] """ List of addresses forming the path currently being taken """ self._current_path_hash = hashlib.md5()
[docs] def add_covered_address(self, address: Addr) -> None: """ Add an instruction address covered. *(Called by :py:obj:`SymbolicExecutor` for each instruction executed)* :param address: The address of the instruction :type address: :py:obj:`tritondse.types.Addr` """ self.covered_instructions[address] += 1
[docs] def add_covered_dynamic_branch(self, source: Addr, target: Addr) -> None: """ Add a dynamic branch covered. The branch will be encoded according to the coverage strategy. :param source: Address of the dynamic jump :param target: Target address on which the jump is performed :return: """ if self.strategy == CoverageStrategy.BLOCK: pass # Target address will be covered anyway if self.strategy == CoverageStrategy.EDGE: self.covered_items[(source, target)] += 1 self.not_covered_items.discard((source, target)) # Remove it from non-taken if it was inside if self.strategy == CoverageStrategy.PATH: self._current_path.append(target) self._current_path_hash.update(struct.pack("<Q", target)) self.covered_items[self._current_path_hash.hexdigest()] += 1 if self.strategy == CoverageStrategy.PREFIXED_EDGE: # Add covered as covered self.covered_items[("", (source, target))] += 1 # update the current path hash etc self._current_path.append(target) self._current_path_hash.update(struct.pack("<Q", target))
[docs] def add_covered_branch(self, program_counter: Addr, taken_addr: Addr, not_taken_addr: Addr) -> None: """ Add a branch to our covered branches list. Each branch is encoded according to the coverage strategy. For code coverage, the branch encoding is the address of the instruction. For edge coverage, the branch encoding is the tupe (src address, dst address). For path coverage, the branch encoding is the MD5 of the conjunction of all taken branch addresses. :param program_counter: The address taken in by the branch :type program_counter: :py:obj:`tritondse.types.Addr` :param taken_addr: Target address of branch taken :type taken_addr: Addr :param not_taken_addr: Target address of branch **not** taken :type not_taken_addr: Addr """ if self.strategy == CoverageStrategy.BLOCK: self.covered_items[taken_addr] += 1 self.not_covered_items.discard(taken_addr) # remove address from non-covered if inside if not_taken_addr not in self.covered_items: # Keep the address that has not been covered (and could have) self.not_covered_items.add(not_taken_addr) if self.strategy == CoverageStrategy.EDGE: taken_tuple, not_taken_tuple = (program_counter, taken_addr), (program_counter, not_taken_addr) self.covered_items[taken_tuple] += 1 self.not_covered_items.discard(taken_tuple) # Remove it from non-taken if it was inside if not_taken_tuple not in self.covered_items: # Add the not taken tuple in non-covered self.not_covered_items.add(not_taken_tuple) if self.strategy == CoverageStrategy.PATH: self._current_path.append(taken_addr) # Compute the hash of the not taken path and add it to non-covered paths not_taken_path_hash = self._current_path_hash.copy() not_taken_path_hash.update(struct.pack('<Q', not_taken_addr)) self.not_covered_items.add(not_taken_path_hash.hexdigest()) # Update the current path hash and add it to hashes self._current_path_hash.update(struct.pack("<Q", taken_addr)) self.covered_items[self._current_path_hash.hexdigest()] += 1 if self.strategy == CoverageStrategy.PREFIXED_EDGE: taken_tuple, not_taken_tuple = (program_counter, taken_addr), (program_counter, not_taken_addr) _, not_taken = (self._current_path_hash.hexdigest(), taken_tuple), (self._current_path_hash.hexdigest(), not_taken_tuple) gtaken, gnot_taken = ("", taken_tuple), ("", not_taken_tuple) # Add covered as covered self.covered_items[gtaken] += 1 # Find all items in not_covered that have this edge if taken_tuple in self._not_covered_items_mirror: # if a not_covered match this edge for prefix in self._not_covered_items_mirror[taken_tuple]: # iterate all the prefixes self.not_covered_items.discard((prefix, taken_tuple)) # and discard them self._not_covered_items_mirror.pop(taken_tuple) # finally discard the entry # look if not_taken edge not in covered if gnot_taken not in self.covered_items: self.not_covered_items.add(not_taken) if not_taken[1] not in self._not_covered_items_mirror: self._not_covered_items_mirror[not_taken[1]] = [not_taken[0]] else: self._not_covered_items_mirror[not_taken[1]].append(not_taken[0]) # update the current path hash etc self._current_path.append(taken_addr) self._current_path_hash.update(struct.pack("<Q", taken_addr))
@property def unique_instruction_covered(self) -> int: """ :return: The number of unique instructions covered """ return len(self.covered_instructions) @property def unique_covitem_covered(self) -> int: """ :return: The number of unique edges covered """ return len(self.covered_items) @property def total_instruction_executed(self) -> int: """ :return: The number of total instruction executed """ return sum(self.covered_instructions.values())
[docs] def post_execution(self) -> None: """ Function is called after each execution for post-processing or clean-up. *(Not doing anything at the moment)* """ pass
[docs] def is_covered(self, item: CovItem) -> bool: """ Return whether the item has been covered or not. **The item should match the strategy** :param item: An address, an edge or a path :type item: CovItem :return: bool """ if self.strategy == CoverageStrategy.PREFIXED_EDGE: try: return ('', item[1]) in self.covered_items except TypeError: # in case of ellipsis return False else: return item in self.covered_items
[docs] def pp_item(self, covitem: CovItem) -> str: """ Pretty print a CovItem according the coverage strategy :param covitem: An address, an edge or a path :return: str """ if self.strategy == CoverageStrategy.BLOCK: return f"0x{covitem:08x}" elif self.strategy == CoverageStrategy.EDGE: return f"(0x{covitem[0]:08x}-0x{covitem[1]:08x})" elif self.strategy == CoverageStrategy.PATH: return covitem[:10] # already a hash str elif self.strategy == CoverageStrategy.PREFIXED_EDGE: return f"({covitem[0][:6]}_0x{covitem[1][0]:08x}-0x{covitem[1][1]:08x})"
[docs] def difference(self, other: CoverageSingleRun) -> Set[CovItem]: if self.strategy == other.strategy: return self.covered_items.keys() - other.covered_items.keys() else: logger.error("Trying to make difference of coverage with different strategies") return set()
def __sub__(self, other) -> Set[CovItem]: return self.difference(other)
[docs] class GlobalCoverage(CoverageSingleRun): """ Global Coverage. Represent the overall coverage of the exploration. It is filled by iteratively call merge with the :py:obj:`CoverageSingleRun` objects created during exploration. """ COVERAGE_FILE = "coverage.json" def __init__(self, strategy: CoverageStrategy, branch_strategy: BranchSolvingStrategy): """ :param strategy: Coverage strategy to use :type strategy: CoverageStrategy :param branch_strategy: Branch checking strategies :type branch_strategy: BranchSolvingStrategy """ super().__init__(strategy) self.branch_strategy = branch_strategy # Keep pending items to be covered (code, edge, path) self.pending_coverage: Set[CovItem] = set() """ Set of pending coverage items. These are items for which a branch as already been solved and """ self.uncoverable_items: Dict[CovItem, SolverStatus] = {} """ CovItems that are determined not to be coverable. """ self.covered_symbolic_pointers: Set[Addr] = set() """ Set of addresses for which pointers have been enumerated """
[docs] def iter_new_paths(self, path_constraints: List[PathConstraint]) -> Generator[Tuple[SymExType, List[PathConstraint], PathBranch, CovItem, int], Optional[SolverStatus], None]: """ The function iterate the given path predicate and yield PatchConstraint to consider as-is and PathBranch representing the new branch to take. It acts as a black-box so that the SeedManager does not have to know what strategy is being used under the hood. From an implementation perspective the goal of the function is to manipulate the path WITHOUT doing any SMT related things. :param path_constraints: list of path constraint to iterate :return: generator of path constraint and branches to solve. The first tuple item is a list of PathConstraint to add in the path predicate and the second is the branch to solve (but not to keep in path predicate) """ if BranchSolvingStrategy.MANUAL in self.branch_strategy: logger.info(f'Branch solving strategy set to MANUAL.') return pending_csts = [] current_hash = hashlib.md5() # Current path hash for PATH coverage # NOTE: When we arrive here the CoverageSingleRun associated with the path_constraints # has already been merge. Thus covered, pending etc. do include ones of the CoverageSingleRuns not_covered_items = self._get_items_trace(path_constraints) # Map of CovItem -> [idx1, idx2, ..., .] (occurrence in list) # is_ok_with_branch_strategy = lambda covitem, idx: True if self.strategy == CoverageStrategy.PATH else (idx in occurrence_map[covitem]) # Re-iterate through all path constraints to solve them concretely (with knowledge of what is beyond in the trace) for i, pc in enumerate(path_constraints): if pc.isMultipleBranches(): # If there is a condition for branch in pc.getBranchConstraints(): # Get all branches # Get the constraint of the branch which has not been taken. if not branch['isTaken']: covitem = self._get_covitem(current_hash, branch) generic_covitem = ('', covitem[1]) if self.strategy == CoverageStrategy.PREFIXED_EDGE else covitem # print(f"Covitem: {covitem}: {covitem not in self.covered_items} | {covitem not in self.pending_coverage} | {covitem not in self.uncoverable_items} | {i in not_covered_items.get(covitem, [])} | {i} | {not_covered_items.get(covitem)}") # Not covered in: previous runs | yet to be covered by a seed already SAT | not uncoverable | parts of items to solve if generic_covitem not in self.covered_items and \ generic_covitem not in self.pending_coverage and \ covitem not in self.uncoverable_items and \ i in not_covered_items.get(covitem, []): # Send the branch to solve to the function iterating res = yield SymExType.CONDITIONAL_JMP, pending_csts, branch, covitem, i # If path SAT add it to pending coverage if res == SolverStatus.SAT: self.pending_coverage.add(generic_covitem) elif res == SolverStatus.UNSAT: if BranchSolvingStrategy.UNSAT_ONCE in self.branch_strategy: self.uncoverable_items[covitem] = res elif self.strategy in [CoverageStrategy.PATH, CoverageStrategy.PREFIXED_EDGE]: self.uncoverable_items[covitem] = res # paths, and prefixed-edge ensure to be unique thus drop if unsat elif res == SolverStatus.TIMEOUT: if BranchSolvingStrategy.TIMEOUT_ONCE in self.branch_strategy: self.uncoverable_items[covitem] = res elif res == SolverStatus.UNKNOWN: pass else: # status == None logger.debug(f'Branch skipped!') pending_csts = [] # reset pending constraint added else: pass # Branch was taken do nothing # Add it the path predicate constraints and update current path hash pending_csts.append(pc) current_hash.update(struct.pack("<Q", pc.getTakenAddress())) else: cmt = pc.getComment() if (cmt.startswith("dyn-jmp") and BranchSolvingStrategy.COVER_SYM_DYNJUMP in self.branch_strategy) or \ (cmt.startswith("sym-read") and BranchSolvingStrategy.COVER_SYM_READ in self.branch_strategy) or \ (cmt.startswith("sym-write") and BranchSolvingStrategy.COVER_SYM_WRITE in self.branch_strategy): typ, offset, addr = cmt.split(":") typ = SymExType(typ) offset, addr = int(offset), int(addr) if addr not in self.covered_symbolic_pointers: # if the address pointer has never been covered pred = pc.getTakenPredicate() if pred.getType() == AST_NODE.EQUAL: p1, p2 = pred.getChildren() if p2.getType() == AST_NODE.BV: logger.info(f"Try to enumerate value {offset}:0x{addr:02x}: {p1}") res = yield typ, pending_csts, p1, (addr, p2.evaluate()), i self.covered_symbolic_pointers.add(addr) # add the pointer in covered regardless of result else: logger.warning(f"memory constraint unexpected pattern: {pred}") else: logger.warning(f"memory constraint unexpected pattern: {pred}") if BranchSolvingStrategy.SOUND_MEM_ACCESS in self.branch_strategy: pending_csts.append(pc) # if sound add the mem dereference as a constraint in path predicate # NOTE: in both case the branch is not taken in account in the current_path_hash else: # Routines, or user-defined constraints thus add it all the time. pending_csts.append(pc)
def _get_covitem(self, path_hash, branch: PathBranch) -> CovItem: src, dst = branch['srcAddr'], branch['dstAddr'] # Check if the target is new with regard to the strategy if self.strategy == CoverageStrategy.BLOCK: return dst elif self.strategy == CoverageStrategy.EDGE: return src, dst elif self.strategy == CoverageStrategy.PATH: # Have to fork the hash of the current pc for each branch we want to revert forked_hash = path_hash.copy() forked_hash.update(struct.pack("<Q", dst)) return forked_hash.hexdigest() elif self.strategy == CoverageStrategy.PREFIXED_EDGE: return path_hash.hexdigest(), (src, dst) else: assert False def _get_items_trace(self, path_constraints: List[PathConstraint]) -> Dict[CovItem, List[int]]: """ Iterate the all trace and retrieve all covered and not covered CovItem. For non-covered one it filters instances to check. """ not_covered = {} current_hash = hashlib.md5() # Current path hash for PATH coverage for i, pc in enumerate(path_constraints): if pc.isMultipleBranches(): # If there is a condition for branch in pc.getBranchConstraints(): # Get all branches if not branch['isTaken']: covitem = self._get_covitem(current_hash, branch) if covitem in not_covered: not_covered[covitem].append(i) else: not_covered[covitem] = [i] current_hash.update(struct.pack("<Q", pc.getTakenAddress())) # compute current path hash along the way else: pass # Ignore all other dynamic constraints in path computation # Now filter the map according to the branch solving strategy if BranchSolvingStrategy.FIRST_LAST_NOT_COVERED in self.branch_strategy: if self.strategy == CoverageStrategy.PREFIXED_EDGE: # Black magic m = {("", e): [] for h, e in not_covered.keys()} # Map: ("", edge) -> [] for (h, e), v in not_covered.items(): # fill map with all occurrences edges regardless of path m[("", e)].extend(v) for k in m.keys(): # iterate the result and only keep min and max occurrence idxs = m[k] if len(idxs) > 2: m[k] = [min(idxs), max(idxs)] for k in not_covered.keys(): # Push back resulting list in not_covered items not_covered[k] = m[('', k[1])] else: # Straightforward for k in not_covered.keys(): lst = not_covered[k] if len(lst) > 2: not_covered[k] = [lst[0], lst[-1]] # Only keep first and last iteration else: # ALL_NOT_COVERED pass # Keep all occurrences return not_covered
[docs] def merge(self, other: CoverageSingleRun) -> None: """ Merge a CoverageSingeRun instance into this instance :param other: The CoverageSingleRun to merge into self :type other: CoverageSingleRun """ assert self.strategy == other.strategy # Update instruction coverage for code coverage (in all cases keep code coverage) self.covered_instructions.update(other.covered_instructions) # Remove covered items from pending ones self.pending_coverage.difference_update(other.covered_items) # Update covered items self.covered_items.update(other.covered_items) # Update non-covered ones if self.strategy == CoverageStrategy.PREFIXED_EDGE: # More complex as not_covered as covitem: (hash, edge) while covered has covitems: ("", edge) # remove self not covered that are now covered for _, edge in other.covered_items.keys(): if edge in self._not_covered_items_mirror: for prefix in self._not_covered_items_mirror[edge]: # iterate all the prefixes self.not_covered_items.discard((prefix, edge)) # and discard them self._not_covered_items_mirror.pop(edge) # finally discard the entry # Only add other not covered if still not covered for prefix, edge in other.not_covered_items: if ("", edge) not in self.covered_items: self.not_covered_items.add((prefix, edge)) if edge not in self._not_covered_items_mirror: self._not_covered_items_mirror[edge] = [prefix] else: self._not_covered_items_mirror[edge].append(prefix) else: # Straightforward set difference self.not_covered_items.update(other.not_covered_items - self.covered_items.keys())
[docs] def can_improve_coverage(self, other: CoverageSingleRun) -> bool: """ Check if some of the non-covered are not already in the global coverage Used to know if an input is relevant to keep or not :param other: The CoverageSingleRun to check against our global coverage state :return: bool """ return bool(self.new_items_to_cover(other))
[docs] def can_cover_symbolic_pointers(self, execution: 'SymbolicExecutor') -> bool: """ Determines if this execution has symbolic memory accesses to enumerate. If so we may want to enumerate them even though """ path_constraints = execution.pstate.get_path_constraints() for pc in path_constraints: if not pc.isMultipleBranches(): # If there isn't a condition i.e. it's a sym ptr access cmt = pc.getComment() if (cmt.startswith("dyn-jmp") and BranchSolvingStrategy.COVER_SYM_DYNJUMP in self.branch_strategy) or \ (cmt.startswith("sym-read") and BranchSolvingStrategy.COVER_SYM_READ in self.branch_strategy) or \ (cmt.startswith("sym-write") and BranchSolvingStrategy.COVER_SYM_WRITE in self.branch_strategy): typ, offset, addr = cmt.split(":") typ = SymExType(typ) offset, addr = int(offset), int(addr) if addr not in self.covered_symbolic_pointers: # if the address pointer has never been covered return True return False
[docs] def new_items_to_cover(self, other: CoverageSingleRun) -> Set[CovItem]: """ Return all coverage items (addresses, edges, paths) that the given CoverageSingleRun can cover if it is possible to negate their branches :param other: The CoverageSingleRun to check with our global coverage state :return: A set of CovItem """ assert self.strategy == other.strategy # Take not covered_items (potential candidates) subtract already covered items, uncoverable and pending ones. # Resulting covitem are really new ones that the trace brings return other.not_covered_items - self.covered_items.keys() - self.uncoverable_items.keys() - self.pending_coverage
[docs] def improve_coverage(self, other: CoverageSingleRun) -> bool: """ Checks if the given object do cover new covitem than the current coverage. More concretely it performs the difference between the two covered dicts. If ``other`` contains new items return True. :param other: coverage on which to check coverage :return: Whether the coverage covers new items """ return bool(other.covered_items.keys() - self.covered_items.keys())
[docs] @staticmethod def from_file(file: Union[str, Path]) -> 'GlobalCoverage': with open(file, "rb") as f: obj = pickle.load(f) return obj
[docs] def to_file(self, file: Union[str, Path]) -> None: copy = self._current_path_hash self._current_path_hash = None with open(file, "wb") as f: pickle.dump(self, f) self._current_path_hash = copy
[docs] def post_exploration(self, workspace: 'Workspace') -> None: """ Function called at the very end of the exploration. It saves the coverage in the workspace. :param workspace: Workspace in which to save coverage :type workspace: Workspace """ # Save the coverage self.to_file(workspace.get_metadata_file_path(self.COVERAGE_FILE))
[docs] def clone(self) -> 'GlobalCoverage': cov2 = GlobalCoverage(self.strategy, self.branch_strategy) # Copy items from the CoverageSingleRun cov2.covered_instructions = Counter({k: v for k, v in self.covered_instructions.items()}) cov2.covered_items = Counter({k: v for k, v in self.covered_items.items()}) cov2.not_covered_items = {x for x in self.not_covered_items} cov2._not_covered_items_mirror = {k: v for k, v in self._not_covered_items_mirror.items()} cov2._current_path = self._current_path[:] self._current_path: List[Addr] = [] self._current_path_hash = self._current_path_hash.copy() # Copy items from the global coverage cov2.pending_coverage = {x for x in self.pending_coverage} cov2.uncoverable_items = {k: v for k, v in self.uncoverable_items.items()} cov2.covered_symbolic_pointers = {x for x in self.covered_symbolic_pointers} return cov2