Source code for tritondse.trace

# built-in imports
import json
import os
import subprocess
from pathlib import Path
import time
from typing import List, Optional, Union
from collections import Counter


# local imports
import tritondse # NOTE We need this import so we can use it to determine the path of this file.
from tritondse import Config, Program, SymbolicExecutor, CoverageStrategy, CoverageSingleRun
import tritondse.logging

# Module-level logger shared by the tracers in this file, obtained from
# tritondse.logging under the name "tracer".
logger = tritondse.logging.get("tracer")


class TraceException(Exception):
    """Error raised when a trace cannot be produced.

    In this module it is raised by :py:meth:`QBDITrace.run` when the QBDI
    tracer exceeds its timeout.
    """
class Trace:
    """Common interface of the execution tracers.

    Concrete tracers override :py:meth:`run`, :py:meth:`from_file`,
    :py:attr:`coverage` and :py:attr:`trace`; the base versions of those
    members only raise :py:obj:`NotImplementedError`.
    """

    def __init__(self):
        """The base class holds no state of its own."""

    @staticmethod
    def run(strategy: CoverageStrategy, binary_path: str, args: List[str], output_path: str, dump_trace: bool = False, stdin_file=None) -> bool:
        """Run the given binary and record its coverage.

        :param strategy: Coverage strategy.
        :type strategy: :py:obj:`CoverageStrategy`.
        :param binary_path: Path to the binary.
        :type binary_path: :py:obj:`str`.
        :param args: List of arguments to pass to the binary.
        :type args: :py:obj:`List[str]`.
        :param output_path: File where to store the trace.
        :type output_path: :py:obj:`str`.
        :param dump_trace: Enable gathering of the trace.
        :type dump_trace: :py:obj:`bool`.
        :param stdin_file: Path to the file that will act as stdin.
        :return: boolean status of the run (semantics are defined by the subclass).
        :raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    @property
    def trace(self) -> List[int]:
        """Recorded trace; must be provided by subclasses.

        :raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    @staticmethod
    def from_file(file: Union[str, Path]) -> 'QBDITrace':
        """Build a trace object from a previously written file.

        :param file: Path of the file to load.
        :raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    @property
    def coverage(self) -> CoverageSingleRun:
        """Coverage generated by the trace; must be provided by subclasses.

        :return: CoverageSingleRun object
        :raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def get_coverage(self) -> CoverageSingleRun:
        """Return the execution coverage (method form of :py:attr:`coverage`).

        :return: :py:obj:`CoverageSingleRun`.
        """
        return self.coverage

    @property
    def strategy(self) -> CoverageStrategy:
        """Coverage strategy this trace was generated with.

        It is read from the ``strategy`` attribute of :py:attr:`coverage`.

        :return: :py:obj:`CoverageStrategy`
        """
        run_coverage = self.coverage
        return run_coverage.strategy
class TritonTrace(Trace):
    """Tracer that executes the program with :py:obj:`SymbolicExecutor`.

    NOTE(review): persisting the coverage to disk and loading it back are not
    implemented yet (see the FIXME notes below), so only the in-process run
    is functional.
    """

    def __init__(self):
        super().__init__()
        # Coverage of the run; stays None until it is assigned by a run.
        self._coverage = None

    @staticmethod
    def run(strategy: CoverageStrategy, binary_path: str, args: List[str], output_path: str, dump_trace: bool = False, stdin_file=None) -> bool:
        """Execute ``binary_path`` under the symbolic executor.

        NOTE(review): ``args``, ``output_path`` and ``dump_trace`` are accepted
        for interface compatibility with :py:meth:`Trace.run` but are not used
        by this implementation.

        :param strategy: Coverage strategy given to the executor configuration.
        :param binary_path: Path to the binary to load.
        :param args: Unused.
        :param output_path: Unused (the coverage is not written yet).
        :param dump_trace: Unused.
        :param stdin_file: Optional path of a file that replaces the stdin
            (file descriptor 0) of the *current* process.
        :return: always ``False`` for now, since no coverage file is produced
            at ``output_path``.
        """
        # Override stdin with the input file.
        if stdin_file:
            stdin_fd = os.open(stdin_file, os.O_RDONLY)
            try:
                os.dup2(stdin_fd, 0)
            finally:
                # The duplicate on fd 0 is the one in use; release the
                # temporary descriptor unless os.open() handed back fd 0 itself.
                if stdin_fd != 0:
                    os.close(stdin_fd)

        config = Config(coverage_strategy=strategy)
        se = SymbolicExecutor(config)
        se.load(Program(binary_path))
        se.run()

        trace = TritonTrace()
        trace._coverage = se.coverage

        # FIXME: Writing the coverage to a file
        # Nothing has been written to output_path, so report that no coverage
        # file is available instead of implicitly returning None.
        return False

    @staticmethod
    def from_file(file: Union[str, Path]) -> 'TritonTrace':
        """Load a trace from a file (not implemented yet).

        :param file: Path of the coverage file.
        :raise NotImplementedError: always; reading a coverage file is not
            supported for this tracer yet.
        """
        # FIXME: Reading coverage file from a file
        raise NotImplementedError("TritonTrace.from_file is not implemented")

    @property
    def coverage(self) -> CoverageSingleRun:
        """Coverage stored on this trace (``None`` if none was assigned)."""
        return self._coverage
class QBDITrace(Trace):
    """Tracer that runs the target through ``python -m pyqbdipreload``.

    The coverage is written by the QBDI script to a JSON file, which is then
    loaded back with :py:meth:`from_file`.
    """

    # Script handed to pyqbdipreload; located in the tritondse package directory.
    QBDI_SCRIPT_FILEPATH = Path(tritondse.__file__).parent / 'qbdi_trace.py'

    def __init__(self):
        super().__init__()
        self._coverage = None   # CoverageSingleRun, set by from_file()
        self._trace = None      # value of the 'trace' entry of the coverage file
        self.modules = {}       # value of the 'modules_base' entry of the coverage file

    @staticmethod
    def run(strategy: CoverageStrategy, binary_path: str, args: List[str], output_path: str, dump_trace: bool = False, stdin_file=None, timeout=None, cwd=None) -> bool:
        """Trace ``binary_path`` with QBDI and store the result in ``output_path``.

        The settings are passed to the QBDI script through the
        ``PYQBDIPRELOAD_COVERAGE_STRATEGY``, ``PYQBDIPRELOAD_OUTPUT_FILEPATH``
        and ``PYQBDIPRELOAD_DUMP_TRACE`` environment variables; ``LD_BIND_NOW``
        is set to ``1``. stdout and stderr of the traced process are discarded.

        :param strategy: Coverage strategy.
        :type strategy: :py:obj:`CoverageStrategy`.
        :param binary_path: Path to the binary.
        :type binary_path: :py:obj:`str`.
        :param args: List of arguments to pass to the binary (may be ``None``).
        :type args: :py:obj:`List[str]`.
        :param output_path: File where to store the trace.
        :param dump_trace: Enable gathering of the trace.
        :param stdin_file: Path to the file that will act as stdin.
        :param timeout: Time limit; when given, the command is wrapped with the
            ``timeout`` utility and the same value bounds the wait on the process.
        :param cwd: Working directory of the traced process.
        :return: ``True`` if ``output_path`` exists after the run.
        :raise FileNotFoundError: if ``binary_path`` or ``stdin_file`` does not exist.
        :raise TraceException: if waiting for the tracer exceeds ``timeout``.
        """
        if not Path(binary_path).exists():
            raise FileNotFoundError(f'Binary not found: {binary_path}')
        if stdin_file and not Path(stdin_file).exists():
            raise FileNotFoundError(f'stdin file not found: {stdin_file}')

        # Build an argument vector and run it without a shell, so that paths
        # and arguments containing spaces or shell metacharacters reach the
        # target unchanged.
        cmdline = []
        if timeout is not None:
            # Only wrap with `timeout` when a limit is given: `timeout None ...`
            # is rejected by the utility and no trace would be produced.
            cmdline += ['timeout', str(timeout)]
        cmdline += ['python', '-m', 'pyqbdipreload', str(QBDITrace.QBDI_SCRIPT_FILEPATH), str(binary_path)]
        cmdline += list(args) if args else []

        logger.debug(f'Command line: {" ".join(cmdline)}')

        # Set environment variables. Start from the inherited environment and
        # apply the tracer settings last, so the explicit arguments of this
        # call take precedence over pre-existing variables.
        environ = dict(os.environ)
        environ.update({
            'PYQBDIPRELOAD_COVERAGE_STRATEGY': strategy.name,
            'PYQBDIPRELOAD_OUTPUT_FILEPATH': str(output_path),
            'PYQBDIPRELOAD_DUMP_TRACE': str(dump_trace),
            'LD_BIND_NOW': '1',
        })

        # Open stdin file if it is present.
        stdin_fp = open(stdin_file, 'rb') if stdin_file else None
        try:
            # Run QBDI tool.
            process = subprocess.Popen(cmdline,
                                       stdin=stdin_fp,
                                       stdout=subprocess.DEVNULL,
                                       stderr=subprocess.DEVNULL,
                                       cwd=cwd,
                                       env=environ)
            try:
                process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired as exc:
                # The `timeout` wrapper is in charge of terminating the target;
                # just reap the process before reporting the failure.
                process.wait()
                raise TraceException('QBDI tracer timeout expired') from exc
        finally:
            # Close the stdin file on every path, including the timeout one.
            if stdin_fp:
                stdin_fp.close()

        return Path(output_path).exists()

    @staticmethod
    def from_file(coverage_path: str) -> 'QBDITrace':
        """Load coverage from a file.

        The file is JSON with the entries ``coverage_strategy`` (name of a
        :py:obj:`CoverageStrategy` member), ``covered_instructions`` (mapping
        whose keys are converted to ``int``), ``covered_items`` (triples
        ``(src, dst, not_taken)``), ``trace`` and ``modules_base``.

        :param coverage_path: Path to the coverage file.
        :type coverage_path: :py:obj:`str`.
        :return: trace object populated from the file.
        """
        trace = QBDITrace()

        logger.debug(f'Loading coverage file: {coverage_path}')
        with open(coverage_path, 'rb') as fd:
            data = json.load(fd)

        cov = CoverageSingleRun(CoverageStrategy[data["coverage_strategy"]])
        # JSON object keys are strings; convert them back to integers.
        cov.covered_instructions = Counter({int(k): v for k, v in data["covered_instructions"].items()})

        for (src, dst, not_taken) in data["covered_items"]:
            if not_taken is None:
                # No "not taken" target recorded: registered as a dynamic branch.
                cov.add_covered_dynamic_branch(src, dst)
            else:
                cov.add_covered_branch(src, dst, not_taken)

        trace._coverage = cov
        trace._trace = data['trace']
        trace.modules = data['modules_base']
        return trace

    @property
    def coverage(self) -> CoverageSingleRun:
        """
        CoverageSingleRun associated with the trace.

        A warning is logged when no coverage has been loaded yet.

        :return: coverage object (``None`` if nothing was loaded)
        """
        if self._coverage is None:
            logger.warning("Please .run() the trace before querying coverage")
        return self._coverage

    @property
    def trace(self) -> List[int]:
        """
        List of addresses executed.

        :return: list of addresses (``None`` if nothing was loaded)
        """
        return self._trace
if __name__ == "__main__":
    import sys

    # Manual smoke test: trace one program with QBDI using the EDGE strategy
    # and load the resulting coverage file back.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: trace.py program [args]")
        sys.exit(1)

    tritondse.logging.enable()

    cov_file = "/tmp/test.cov"
    traced = QBDITrace.run(CoverageStrategy.EDGE, cli_args[0], cli_args[1:], cov_file, dump_trace=False)
    if not traced:
        print("Something went wrong during trace generation")
    else:
        coverage = QBDITrace.from_file(cov_file)