diff --git a/qiling/os/memory.py b/qiling/os/memory.py index d858b0f7a..39878f759 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -4,7 +4,7 @@ # import os, re -from typing import Any, Callable, ClassVar, List, MutableSequence, Optional, Sequence, Tuple +from typing import Any, Callable, List, MutableSequence, Optional, Sequence, Tuple from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -12,8 +12,11 @@ from qiling.const import * from qiling.exception import * -# tuple: range start, range end, permissions mask, range label -MapInfoEntry = Tuple[int, int, int, str] +# tuple: range start, range end, permissions mask, range label, is mmio? +MapInfoEntry = Tuple[int, int, int, str, bool] + +MmioReadCallback = Callable[[Qiling, int, int], int] +MmioWriteCallback = Callable[[Qiling, int, int, int], None] class QlMemoryManager: """ @@ -37,10 +40,15 @@ def __init__(self, ql: Qiling): max_addr = bit_stuff[ql.archbit] - #self.read_ptr = read_ptr self.max_addr = max_addr self.max_mem_addr = max_addr + # memory page size + self.pagesize = 0x1000 + + # make sure pagesize is a power of 2 + assert self.pagesize & (self.pagesize - 1) == 0, 'pagesize has to be a power of 2' + def __read_string(self, addr: int) -> str: ret = bytearray() c = self.read(addr, 1) @@ -73,7 +81,7 @@ def string(self, addr: int, value=None, encoding='utf-8') -> Optional[str]: self.__write_string(addr, value, encoding) - def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio=False): + def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False): """Add a new memory range to map. Args: @@ -81,6 +89,7 @@ def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio mem_e: memory range end mem_p: permissions mask mem_info: map entry label + is_mmio: memory range is mmio """ self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio)) @@ -136,7 +145,7 @@ def __perms_mapping(ps: int) -> str: return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) - def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio) -> Tuple[int, int, str, str, Optional[str]]: + def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, Optional[str]]: perms_str = __perms_mapping(perms) if hasattr(self.ql, 'os'): @@ -164,7 +173,7 @@ def show_mapinfo(self): def get_lib_base(self, filename: str) -> int: return next((s for s, _, _, info, _ in self.map_info if os.path.split(info)[1] == filename), -1) - def align(self, addr: int, alignment: int = 0x1000) -> int: + def align(self, addr: int, alignment: int = None) -> int: """Round up to nearest alignment. Args: @@ -172,6 +181,9 @@ def align(self, addr: int, alignment: int = 0x1000) -> int: alignment: alignment granularity, must be a power of 2 """ + if alignment is None: + alignment = self.pagesize + # rounds up to nearest alignment mask = self.max_mem_addr & -alignment @@ -186,12 +198,12 @@ def save(self): "mmio" : [] } - for lbound, ubound, perm, label, mmio in self.map_info: - if not mmio: + for lbound, ubound, perm, label, is_mmio in self.map_info: + if is_mmio: + mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) + else: data = self.read(lbound, ubound - lbound) mem_dict['ram'].append((lbound, ubound, perm, label, bytes(data))) - else: - mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) return mem_dict @@ -200,18 +212,18 @@ def restore(self, mem_dict): """ for lbound, ubound, perms, label, data in mem_dict['ram']: - self.ql.log.debug(f'To restore mapping: {lbound:#08x} {ubound:#08x} {label}') + self.ql.log.debug(f'restoring memory range: {lbound:#08x} {ubound:#08x} {label}') size = ubound - lbound if not self.is_mapped(lbound, size): self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}') self.map(lbound, size, perms, label) - self.ql.log.debug(f'writing {lbound:#08x}, size = {size:#x}, write_size = {len(data):#x}') + self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}') self.write(lbound, data) - + for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']: - self.ql.log.debug(f"To restore mmio mapping: {lbound:#08x} {ubound:#08x} {label}") + self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}") #TODO: Handle overlapped MMIO? self.map_mmio(lbound, ubound - lbound, read_cb, write_cb, info=label) @@ -284,7 +296,8 @@ def search(self, needle: bytes, begin: int = None, end: int = None) -> Sequence[ assert begin < end, 'search arguments do not make sense' - ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, _ in self.map_info if not (end < lbound or ubound < begin)] + # narrow the search down to relevant ranges; mmio ranges are excluded due to potential read size effects + ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio in self.map_info if not (end < lbound or ubound < begin or is_mmio)] results = [] for lbound, ubound in ranges: @@ -305,6 +318,7 @@ def unmap(self, addr: int, size: int) -> None: self.del_mapinfo(addr, addr + size) self.ql.uc.mem_unmap(addr, size) + if (addr, addr + size) in self.mmio_cbs: del self.mmio_cbs[(addr, addr+size)] @@ -359,7 +373,7 @@ def is_free(self, address, size): return True - def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, align=0x1000) -> int: + def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, align: int = None) -> int: """Locate an unallocated memory that is large enough to contain a range in size of `size` and based at `minaddr`. @@ -374,6 +388,9 @@ def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, a Raises: QlOutOfMemory in case no available memory space found with the specified requirements """ + if align is None: + align = self.pagesize + # memory space bounds (exclusive) mem_lbound = 0 mem_ubound = self.max_addr + 1 @@ -401,7 +418,7 @@ def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, a raise QlOutOfMemory('Out Of Memory') - def map_anywhere(self, size: int, minaddr: int = None, maxaddr: int = None, align=0x1000, perms: int = UC_PROT_ALL, info: str = None) -> int: + def map_anywhere(self, size: int, minaddr: int = None, maxaddr: int = None, align: int = None, perms: int = UC_PROT_ALL, info: str = None) -> int: """Map a region anywhere in memory. Args: @@ -415,6 +432,9 @@ def map_anywhere(self, size: int, minaddr: int = None, maxaddr: int = None, alig Returns: mapped address """ + if align is None: + align = self.pagesize + addr = self.find_free_space(size, minaddr, maxaddr, align) self.map(addr, self.align(size), perms, info) @@ -425,8 +445,8 @@ def protect(self, addr: int, size: int, perms): # mask off perms bits that are not supported by unicorn perms &= UC_PROT_ALL - aligned_address = (addr >> 12) << 12 - aligned_size = self.align((addr & 0xFFF) + size) + aligned_address = addr & ~(self.pagesize - 1) + aligned_size = self.align((addr & (self.pagesize - 1)) + size) self.ql.uc.mem_protect(aligned_address, aligned_size, perms) @@ -451,30 +471,35 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: str = None): raise QlMemoryMappedError('Requested memory is unavailable') self.ql.uc.mem_map(addr, size, perms) - self.add_mapinfo(addr, addr + size, perms, info or '[mapped]') - - def _mmio_read_cb(self, uc, offset, size, data): - ql, cb = data - return cb(ql, offset, size) - - def _mmio_write_cb(self, uc, offset, size, value, data): - ql, cb = data - cb(ql, offset, size, value) + self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False) - def map_mmio(self, addr: int, size: int, read_cb: Callable, write_cb: Callable, info: str="[IO Memory]"): + def map_mmio(self, addr: int, size: int, read_cb: Optional[MmioReadCallback], write_cb: Optional[MmioWriteCallback], info: str = '[mmio]'): # TODO: mmio memory overlap with ram? Is that possible? # TODO: Can read_cb or write_cb be None? How uc handle that access? - prot = 0 - if read_cb is not None: + prot = UC_PROT_NONE + + if read_cb: prot |= UC_PROT_READ - - if write_cb is not None: + + if write_cb: prot |= UC_PROT_WRITE - self.ql.uc.mmio_map(addr, size, self._mmio_read_cb, (self.ql, read_cb), self._mmio_write_cb, (self.ql, write_cb)) - self.add_mapinfo(addr, addr+size, prot, info, True) + # generic mmio read wrapper + def __mmio_read(uc, offset: int, size: int, user_data: MmioReadCallback): + cb = user_data + + return cb(self.ql, offset, size) + + # generic mmio write wrapper + def __mmio_write(uc, offset: int, size: int, value: int, user_data: MmioWriteCallback): + cb = user_data + + cb(self.ql, offset, size, value) + + self.ql.uc.mmio_map(addr, size, __mmio_read, read_cb, __mmio_write, write_cb) + self.add_mapinfo(addr, addr + size, prot, info, is_mmio=True) - self.mmio_cbs[(addr, addr+size)] = (read_cb, write_cb) + self.mmio_cbs[(addr, addr + size)] = (read_cb, write_cb) # A Simple Heap Implementation class Chunk():