Source code for tritondse.seed_scheduler

# built-in imports
import json
from typing import Optional

# Local imports
from tritondse.seed import Seed
from tritondse.coverage import GlobalCoverage
from tritondse.workspace import Workspace
import tritondse.logging

logger = tritondse.logging.get("seedscheduler")


[docs] class SeedScheduler: """ Abstract class for all seed selection strategies. This class provides the base methods that all subclasses should implement to be compliant with the interface. """
[docs] def has_seed_remaining(self) -> bool: """ Returns true if there are still seeds to be processed in the scheduler :returns: true if there are seeds to process """ raise NotImplementedError()
[docs] def add(self, seed: Seed) -> None: """ Add a new seed in the scheduler :param seed: Seed to add in the scheduler :type seed: Seed """ raise NotImplementedError()
[docs] def update_worklist(self, coverage: GlobalCoverage) -> None: """ Call after every execution. That function might help the scheduler with some of its internal states. For instance the scheduler keep some seeds meant to cover an address which is now covered, it can just drop these seeds. :param coverage: global coverage of the exploration :type coverage: GlobalCoverage """ raise NotImplementedError()
[docs] def can_solve_models(self) -> bool: """ Function called by the seed manager to know if it can start negating branches to discover new paths. Some seed scheduler might want to run concretely all inputs first before starting negating branches. :return: true if the :py:obj:`SeedManager` can negate branches """ raise NotImplementedError()
[docs] def pick(self) -> Optional[Seed]: """ Return the next seed to execute. :returns: seed to execute :rtype: Seed """ raise NotImplementedError()
[docs] def post_execution(self) -> None: """ Called at the end of each execution after the generation of new seeds through SMT. Last thing called before starting the next iteration. """ pass
[docs] def post_exploration(self, workspace: Workspace) -> None: """ Called at the end of the exploration to perform some clean-up or anything else. """ pass
[docs] class WorklistAddressToSet(SeedScheduler): """ This worklist classifies seeds by addresses. We map a seed X to an address Y, if the seed X has been generated to reach the address Y. When the method pick() is called, seeds covering a new address 'Y' are selected first. Otherwise, anyone is taken. """ def __init__(self, manager: 'SeedManager'): self.manager = manager self.cov = None self.worklist = dict() # {CovItem: set(Seed)}
[docs] def __len__(self) -> int: """ Number of pending seeds to execute """ count = 0 for k in list(self.worklist): # Copy in list to avoid race-condition v = self.worklist[k] count += len(v) return count
[docs] def has_seed_remaining(self) -> bool: """ Returns true if there are still seeds in the worklist """ return len(self) != 0
[docs] def add(self, seed: Seed) -> None: """ Add a seed to the worklist """ for obj in seed.coverage_objectives: if obj in self.worklist: self.worklist[obj].add(seed) else: self.worklist[obj] = {seed}
[docs] def update_worklist(self, coverage: GlobalCoverage) -> None: """ Update the coverage state of the woklist with the global one """ self.cov = coverage
[docs] def can_solve_models(self) -> bool: """ Always true. This strategy always allows solving branches. As a consequence it might try to solve a branch already covered in a seed not run yet. But this enables iterating a seed only once. :returns: True """ return True
[docs] def pick(self) -> Optional[Seed]: """ Return the next seed to execute :returns: next seed to execute (first one covering new addresses, otherwise any other) :rtype: Seed """ seed_picked = None item_picked = None to_remove = set() for k, v in self.worklist.items(): # If the set is empty remove the entry if not len(v): to_remove.add(k) continue # If the address has never been executed, return the seed if not self.cov.is_covered(k): seed_picked = v.pop() item_picked = k if not len(v): to_remove.add(k) break # If all addresses have been executed, just pick a random seed if not seed_picked: for k, v in self.worklist.items(): if v: seed_picked = v.pop() item_picked = k if not len(v): to_remove.add(k) break # If we did not have a seed again the worklist is definitely empty if not seed_picked: return None # Pop the seed from all worklist[X] where it is for obj in seed_picked.coverage_objectives: if obj != item_picked: # already poped it from item_picked thus only pop the other if obj in self.worklist and seed_picked in self.worklist[obj]: self.worklist[obj].remove(seed_picked) if not self.worklist[obj]: to_remove.add(obj) # Garbage the worklist for i in to_remove: self.worklist.pop(i) return seed_picked
[docs] class WorklistRand(SeedScheduler): """ Trivial strategy that returns any Seed without any classification. It uses a Set for insertion and pop (which is random) for picking seeds. """ def __init__(self, manager: 'SeedManager'): self.worklist = set() # set(Seed) self.cov = None
[docs] def __len__(self) -> int: """ Number of pending seeds to execute """ return len(self.worklist)
[docs] def has_seed_remaining(self) -> bool: """ Returns true if there are still seeds in the worklist """ return len(self) != 0
[docs] def add(self, seed: Seed) -> None: """ Add a seed to the worklist :param seed: Seed to add to this rand scheduler :type seed: Seed """ self.worklist.add(seed)
[docs] def update_worklist(self, coverage: GlobalCoverage) -> None: """ Update the coverage state of the worklist with the global one """ self.cov = coverage
[docs] def can_solve_models(self) -> bool: """ Always true """ return True
[docs] def pick(self) -> Optional[Seed]: """ Return the next seed to execute. The method pop() removes a random element from the set and returns the removed element. Unlike, a stack a random element is popped off the set. :returns: next seed to execute :rtype: Seed """ return self.worklist.pop() if self.worklist else None
[docs] class FreshSeedPrioritizerWorklist(SeedScheduler): """ Strategy that first execute all seeds without negating branches in order to get the most updated coverage and which then re-run all relevant seeds to negate their branches. This worklist works as follows: - return first fresh seeds first to get them executed (to improve coverage) - keep the seed in the worklist up until it gets dropped or thoroughly processed - if no fresh seed is available, iterates seed that will generate coverage """ def __init__(self, manager: 'SeedManager'): self.manager = manager self.fresh = [] # Seed never processed (list to make sure we can pop first one received) self.worklist = dict() # CovItem -> set(Seed)
[docs] def __len__(self) -> int: """ Number of pending seeds to execute """ s = set() for seeds in list(self.worklist.values()): s.update(seeds) return len(self.fresh) + len(s)
[docs] def has_seed_remaining(self) -> bool: """ Returns true if there are still seeds in the worklist """ return len(self) != 0
[docs] def add(self, seed: Seed) -> None: """ Add a seed to the worklist :param seed: seed to add to the scheduler :type seed: Seed """ if seed.coverage_objectives: # If the seed already have coverage objectives for item in seed.coverage_objectives: # Add it in our worklist if item in self.worklist: self.worklist[item].add(seed) else: self.worklist[item] = {seed} # seed.coverage_objectives.clear() # Flush the objectives else: # Otherwise it is fresh self.fresh.append(seed)
[docs] def update_worklist(self, coverage: GlobalCoverage) -> None: """ Update the coverage state of the worklist with the global one """ # Iterate the worklist to see if some items have now been covered # and are thus not interesting anymore to_remove = [x for x in self.worklist if coverage.is_covered(x)] for item in to_remove: for seed in self.worklist.pop(item): seed.coverage_objectives.remove(item) if not seed.coverage_objectives: # The seed cannot improve the coverage of anything self.manager.drop_seed(seed)
[docs] def can_solve_models(self) -> bool: """ Returns True if there are no "fresh" seeds to execute. :returns: True if all fresh seeds have been executed. """ return not self.fresh
[docs] def pick(self) -> Optional[Seed]: """ Return the next seed to execute """ # Pop first fresh seed if self.fresh: return self.fresh.pop(0) # Return first item as it is the older # Then pop seed meant to crash if ... in self.worklist: # If we have specific seeds (mostly generated by sanitizers) it = self.worklist[...].pop() if not self.worklist[...]: # Remove the key if now empty self.worklist.pop(...) return it if not self.worklist: return None # Then pop traditional coverage seeds k = list(self.worklist.keys())[0] # arbitrary covitem seed = self.worklist[k].pop() # remove first seed inside for it in seed.coverage_objectives: # Remove the seed from all worklist[x] if it != k: # we already popped the item from k self.worklist[it].remove(seed) # remove the seed from that covitem set if not self.worklist[it]: # remove the whole covitem if empty self.worklist.pop(it) return seed
[docs] def post_execution(self) -> None: """ Solely used to show intermediate statistics """ logger.info(f"Seed Scheduler: worklist:{len(self)} Coverage objectives:{len(self.worklist)} (fresh:{len(self.fresh)})")
[docs] def post_exploration(self, workspace: Workspace) -> None: """ At the end of the execution, print the worklist to know its state before exit. """ workspace.save_metadata_file("coverage_objectives.json", json.dumps(list(self.worklist.keys())))