import time
import threading
import gc
from enum import Enum
from typing import Union, Type, Optional
import stat
import enum_tools.documentation
from tritondse.config import Config
from tritondse.process_state import ProcessState
from tritondse.loaders.loader import Loader
from tritondse.seed import Seed
from tritondse.symbolic_executor import SymbolicExecutor
from tritondse.workspace import Workspace
from tritondse.coverage import GlobalCoverage
from tritondse.types import Addr
from tritondse.exception import StopExplorationException
from tritondse.seed_manager import SeedManager
from tritondse.seed_scheduler import SeedScheduler
from tritondse.callbacks import CallbackManager
import tritondse.logging
logger = tritondse.logging.get("explorator")
[docs]
@enum_tools.documentation.document_enum
class ExplorationStatus(Enum):
""" Enum representing the current state of the exploration """
NOT_RUNNING = 0 # doc: The explorator has not been started yet
RUNNING = 1 # doc: The explorator is running and performing exploration
IDLE = 2 # doc: The explorator is idle, it has run, but is out of inputs to process
STOPPED = 3 # doc: The explorator has been stopped by the user
TERMINATED = 4 # doc: The explorator has stopped normally, reached its goal (user-defined)
[docs]
class SymbolicExplorator(object):
"""
Symbolic Exploration. This class is in charge of iterating
executions with the different seeds available in the workspace
and generated along the way.
"""
def __init__(self, config: Config, loader: Loader = None, workspace: Workspace = None, executor_stop_at: Addr = None, seed_scheduler_class: Type[SeedScheduler] = None):
self.loader: Loader = loader #: Program being analyzed
self.config: Config = config #: Configuration file
self.cbm: CallbackManager = CallbackManager() #: CallbackManager to register callbacks
self._stop = False
self.ts: float = time.time() #: Timestamp (object instantiation)
self.uid_counter: int = 0
self.status: ExplorationStatus = ExplorationStatus.NOT_RUNNING #: status of the execution
self._executor_stop_at = executor_stop_at
# Initialize the workspace
if workspace:
self.workspace: Workspace = workspace #: exploration workspace
else:
self.workspace: Workspace = Workspace(self.config.workspace) #: workspace object
self.workspace.initialize(flush=self.config.workspace_reset)
# Save the configuration in the workspace
self.workspace.save_file("config.json", self.config.to_json())
# Save the binary in the workspace if not already done
if self.loader:
bin_path = self.workspace.get_binary_directory() / self.loader.bin_path.name
if not bin_path.exists(): # If the program is not yet present
self.workspace.save_file(bin_path, self.loader.bin_path.read_bytes())
self.loader.bin_path = bin_path # Patch its official new location
bin_path.chmod(stat.S_IRWXU) # Make it executable
# Configure logfile in workspace
tritondse.logging.enable_to_file(logger.level, self.workspace.logfile_path)
# Initialize coverage
self.coverage: GlobalCoverage = GlobalCoverage(self.config.coverage_strategy, self.config.branch_solving_strategy)
""" GlobalCoverage object holding information about the global coverage.
*(not really meant to be manipulated by the user)*
"""
# Load workspace global coverage if any
cov = self.workspace.get_metadata_file_path(GlobalCoverage.COVERAGE_FILE)
if cov.exists():
self.coverage = GlobalCoverage.from_file(cov)
# Initialize the seed manager
self.seeds_manager: SeedManager = SeedManager(self.coverage, self.workspace, self.config.smt_queries_limit, callback_manager=self.cbm, seed_scheduler_class=seed_scheduler_class)
""" Manager of seed, holding all seeds related data and various statistics """
# running executors (for debugging purposes)
self.current_executor: Optional[SymbolicExecutor] = None #: last symbolic executor executed
# General purpose attributes
self._exec_count = 0
self._total_emulation_time = 0
@property
def total_emulation_time(self) -> float:
""" Represent total emulation time. This includes all callbacks execution
but not the SMT solving time (performed at the end). """
return self._total_emulation_time
@property
def callback_manager(self) -> CallbackManager:
"""
CallbackManager global instance that will be transmitted to
all :py:obj:`SymbolicExecutor`.
:rtype: CallbackManager
"""
return self.cbm
@property
def execution_count(self) -> int:
"""
Get the number of execution performed.
:return: number of execution performed
:rtype: int
"""
return self._exec_count
def __time_delta(self):
return time.time() - self.ts
def _worker(self, seed, uid):
""" Worker thread """
logger.info(f'Pick-up seed: {seed.filename} (fresh: {seed.is_fresh()})')
if self.config.exploration_timeout and self.__time_delta() >= self.config.exploration_timeout:
logger.info('Exploration timeout')
self.stop_exploration()
return
# Execute the binary with seeds
cbs = None if self.cbm.is_empty() else self.cbm.fork()
logger.info(f"Initialize ProcessState with thread scheduling: {self.config.thread_scheduling}")
execution = SymbolicExecutor(self.config, seed=seed, workspace=self.workspace, uid=uid, callbacks=cbs)
if self.loader: # If doing the exploration from a program
execution.load(self.loader)
else:
execution.load_process(ProcessState())
# self.current_executor = execution
# increment exec_count
self._exec_count += 1
ts = time.time()
try:
execution.run(self._executor_stop_at)
expl_ts = time.time() - ts
except StopExplorationException:
expl_ts = time.time() - ts
logger.info("Exploration interrupted (coverage not integrated)")
self.stop_exploration()
if self.config.exploration_limit and (uid+1) >= self.config.exploration_limit:
logger.info('Exploration limit reached')
self.stop_exploration()
# Some analysis in post execution
solve_time = self.seeds_manager.post_execution(execution, seed, not self._stop)
self._total_emulation_time += expl_ts
logger.info(f"Emulation: {self._fmt_secs(expl_ts)} | Solving: {self._fmt_secs(solve_time)} | Elapsed: {self._fmt_secs(self.__time_delta())}\n")
[docs]
def step(self) -> None:
"""
Perform a single exploration step. That means it execute
a single :py:obj:`SymbolicExecutor`. Then it gives the hand
back to the user.
"""
# Take an input
seed = self.seeds_manager.pick_seed()
# If we don't have any new seed to process just switch exploration to idle
if seed is None:
logger.info("worklist of seed to process is empty")
self.status = ExplorationStatus.IDLE
return
# Iterate the callback to be called at each steps
for cb in self.cbm.get_exploration_step_callbacks():
cb(self)
# Execution into a thread
t = threading.Thread(
name='\033[0;%dm[exec:%08d]\033[0m' % ((31 + (self.uid_counter % 4)), self.uid_counter),
target=self._worker,
args=[seed, self.uid_counter],
daemon=True
)
t.start()
self.uid_counter += 1
while True:
t.join(0.001)
if not t.is_alive():
break
[docs]
def explore(self) -> ExplorationStatus:
"""
Start the symbolic exploration. That function
holds until the exploration is interrupted or finished.
:returns: the status of the exploration
:rtype: ExplorationStatus
"""
self.status = ExplorationStatus.RUNNING
try:
while self.seeds_manager.seeds_available() and not self._stop:
gc.collect()
self.step()
if self.status == ExplorationStatus.RUNNING:
if not self.seeds_manager.seeds_available():
logger.info("exploration step done (no new seed available)")
self.status = ExplorationStatus.IDLE
else:
logger.warning(f'should not exit step() in RUNNING state (stop? {self._stop}, seeds available? {self.seeds_manager.seeds_available()})')
except KeyboardInterrupt:
logger.warning("keyboard interrupt, stop symbolic exploration")
self.stop_exploration()
self.post_exploration()
logger.info(f"Total time of the exploration: {self._fmt_secs(self.__time_delta())}")
if self.status == ExplorationStatus.IDLE:
logger.info("Execution IDLE no seeds to execute")
return self.status
[docs]
def stop_exploration(self) -> None:
""" Interrupt the exploration """
self.status = ExplorationStatus.STOPPED
self._stop = True
[docs]
def terminate_exploration(self) -> None:
""" Terminate exploration with status terminated (normal shutdown) """
self.status = ExplorationStatus.TERMINATED
self._stop = True
@staticmethod
def _fmt_secs(seconds) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return (f"{int(h)}h" if h else '')+f"{int(m)}m{int(s)}s"
[docs]
def post_exploration(self) -> None:
""" Perform all calls to post exploration functions"""
self.seeds_manager.post_exploration()
self.coverage.post_exploration(self.workspace)