Source code for tritondse.heap_allocator

# local imports
from tritondse.types import Addr, ByteSize, Perm
from tritondse.memory import Memory
from tritondse.exception import AllocatorException
import tritondse.logging

logger = tritondse.logging.get('heapallocator')


[docs] class HeapAllocator(object): """ Custom tiny heap allocator. Used by built-ins routines like malloc/free. This allocation manager also provides an API enabling checking whether a pointer is allocated freed etc. .. warning:: This allocator is very simple and does not perform any coalescing of freed memory areas. Thus, it may not correctly model the behavior of libc allocator. """ def __init__(self, start: Addr, end: Addr, memory: Memory): """ Class constructor. Takes heap bounds as parameter. :param start: Where the heap area can start :type start: :py:obj:`tritondse.types.Addr` :param end: Where the heap area must end :type start: :py:obj:`tritondse.types.Addr` :param memory: Memory: Memory object on which to perform allocations """ # Range of the memory mapping self.start: Addr = start #: Starting address of the heap self.end: Addr = end #: Ending address of the heap self._curr_offset: Addr = self.start #: Heap current offset address self._memory = memory # Pools memory self.alloc_pool = dict() # {ptr: MemMap} self.free_pool = dict() # {size: set(MemMap ...)} # TODO: For a to-the-moon allocator, we could merge freed chunks. Like 4 chunks of 1 byte into one chunk of 4 bytes. # TODO: For a to-the-moon allocator, we could split a big chunk into two chunks when asking an allocation.
[docs] def alloc(self, size: ByteSize) -> Addr: """ Performs an allocation of the given byte size. :param size: Byte size to allocate :type size: :py:obj:`tritondse.types.ByteSize` :raise AllocatorException: if not memory is available :return: The pointer address allocated :rtype: :py:obj:`tritondse.types.Addr` """ if size <= 0: logger.error(f"Heap: invalid allocation size {size}") return 0 ptr = None for sz in sorted(x for x in self.free_pool if x >= size): # get the free chunk ptr = self.free_pool[sz].pop().start # If the set is empty after the pop(), remove the entry if not self.free_pool[sz]: del self.free_pool[sz] break if ptr is None: # We did not find reusable freed ptr ptr = self._curr_offset self._curr_offset += size # Now we can allocate the chunk mmap = self._memory.map(ptr, size, Perm.R | Perm.W, 'heap') self.alloc_pool.update({ptr: mmap}) return ptr
[docs] def free(self, ptr: Addr) -> None: """ Free the given memory chunk. :param ptr: Address to free :type ptr: :py:obj:`tritondse.types.Addr` :raise AllocatorException: if the pointer has already been freed or if it has never been allocated """ if self.is_ptr_freed(ptr): raise AllocatorException('Double free or corruption!') if not self.is_ptr_allocated(ptr): raise AllocatorException(f'Invalid pointer ({hex(ptr)})') # Add the chunk into our free_pool memmap = self.alloc_pool[ptr] if memmap.size in self.free_pool: self.free_pool[memmap.size].add(memmap) else: self.free_pool[memmap.size] = {memmap} # Remove the chunk from our alloc_pool self._memory.unmap(ptr) del self.alloc_pool[ptr]
[docs] def is_ptr_allocated(self, ptr: Addr) -> bool: """ Check whether a given address has been allocated :param ptr: Address to check :type ptr: :py:obj:`tritondse.types.Addr` :return: True if pointer points to an allocated memory region """ return self._memory.is_mapped(ptr, 1)
[docs] def is_ptr_freed(self, ptr: Addr) -> bool: """ Check whether a given pointer has recently been freed. :param ptr: Address to check :type ptr: :py:obj:`tritondse.types.Addr` :return: True if pointer has been freed, False otherwise """ # FIXME: This function is linear in the size of chunks. Can make it logarithmic for size, chunks in self.free_pool.items(): for chunk in chunks: if chunk.start <= ptr < chunk.start + size: return True return False