diff --git a/qiling/hw/hw.py b/qiling/hw/hw.py index 33081a052..3d869d562 100644 --- a/qiling/hw/hw.py +++ b/qiling/hw/hw.py @@ -1,26 +1,70 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -import ctypes +from functools import cached_property +from typing import Any, Dict, List, Optional, Tuple -from qiling.core import Qiling +from qiling import Qiling from qiling.hw.peripheral import QlPeripheral from qiling.utils import ql_get_module_function from qiling.exception import QlErrorModuleFunctionNotFound +# should adhere to the QlMmioHandler interface, but not extend it directly to +# avoid potential pickling issues +class QlPripheralHandler: + def __init__(self, hwman: "QlHwManager", base: int, size: int, label: str) -> None: + self._hwman = hwman + self._base = base + self._size = size + self._label = label + + def __getstate__(self): + state = self.__dict__.copy() + del state['_hwman'] # remove non-pickleable reference + + return state + + @cached_property + def _mmio(self) -> bytearray: + """Get memory buffer used to back non-mapped hardware mmio regions. + """ + + return bytearray(self._size) + + def read(self, ql: Qiling, offset: int, size: int) -> int: + address = self._base + offset + hardware = self._hwman.find(address) + + if hardware: + return hardware.read(address - hardware.base, size) + + else: + ql.log.debug('[%s] read non-mapped hardware [%#010x]', self._label, address) + return int.from_bytes(self._mmio[offset:offset + size], byteorder='little') + + def write(self, ql: Qiling, offset: int, size: int, value: int) -> None: + address = self._base + offset + hardware = self._hwman.find(address) + + if hardware: + hardware.write(address - hardware.base, size, value) + + else: + ql.log.debug('[%s] write non-mapped hardware [%#010x] = %#010x', self._label, address, value) + self._mmio[offset:offset + size] = value.to_bytes(size, 'little') + + class QlHwManager: def __init__(self, ql: Qiling): self.ql = ql - self.entity = {} - self.region = {} - - self.stepable = {} + self.entity: Dict[str, QlPeripheral] = {} + self.region: Dict[str, List[Tuple[int, int]]] = {} - def create(self, label: str, struct: str=None, base: int=None, kwargs: dict={}) -> "QlPeripheral": + def create(self, label: str, struct: Optional[str] = None, base: Optional[int] = None, kwargs: Optional[Dict[str, Any]] = None) -> QlPeripheral: """ Create the peripheral accroding the label and envs. struct: Structure of the peripheral. Use defualt ql structure if not provide. @@ -30,39 +74,45 @@ def create(self, label: str, struct: str=None, base: int=None, kwargs: dict={}) if struct is None: struct, base, kwargs = self.load_env(label.upper()) + if kwargs is None: + kwargs = {} + try: - entity = ql_get_module_function('qiling.hw', struct)(self.ql, label, **kwargs) - - self.entity[label] = entity - if hasattr(entity, 'step'): - self.stepable[label] = entity - self.region[label] = [(lbound + base, rbound + base) for (lbound, rbound) in entity.region] + except QlErrorModuleFunctionNotFound: + self.ql.log.warning(f'could not create {struct}({label}): implementation not found') + else: + assert isinstance(entity, QlPeripheral) + assert isinstance(base, int) + + self.entity[label] = entity + self.region[label] = [(lbound + base, rbound + base) for (lbound, rbound) in entity.region] return entity - except QlErrorModuleFunctionNotFound: - self.ql.log.debug(f'The {struct}({label}) has not been implemented') - def delete(self, label: str): + # FIXME: what should we do if struct is not implemented? is it OK to return None , or we fail? + + def delete(self, label: str) -> None: """ Remove the peripheral """ + if label in self.entity: - self.entity.pop(label) - self.region.pop(label) - if label in self.stepable: - self.stepable.pop(label) + del self.entity[label] + + if label in self.region: + del self.region[label] - def load_env(self, label: str): + def load_env(self, label: str) -> Tuple[str, int, Dict[str, Any]]: """ Get peripheral information (structure, base address, initialization list) from env. Args: label (str): Peripheral Label - + """ args = self.ql.env[label] - + return args['struct'], args['base'], args.get("kwargs", {}) def load_all(self): @@ -70,48 +120,30 @@ def load_all(self): if args['type'] == 'peripheral': self.create(label.lower(), args['struct'], args['base'], args.get("kwargs", {})) - def find(self, address: int): + # TODO: this is wasteful. device mapping is known at creation time. at least we could cache lru entries + def find(self, address: int) -> Optional[QlPeripheral]: """ Find the peripheral at `address` """ - + for label in self.entity.keys(): for lbound, rbound in self.region[label]: if lbound <= address < rbound: return self.entity[label] + return None + def step(self): - """ Update all peripheral's state + """ Update all peripheral's state """ - for entity in self.stepable.values(): - entity.step() - - def setup_mmio(self, begin, size, info=""): - mmio = ctypes.create_string_buffer(size) - - def mmio_read_cb(ql, offset, size): - address = begin + offset - hardware = self.find(address) - - if hardware: - return hardware.read(address - hardware.base, size) - else: - ql.log.debug('%s Read non-mapped hardware [0x%08x]' % (info, address)) - - buf = ctypes.create_string_buffer(size) - ctypes.memmove(buf, ctypes.addressof(mmio) + offset, size) - return int.from_bytes(buf.raw, byteorder='little') - - def mmio_write_cb(ql, offset, size, value): - address = begin + offset - hardware = self.find(address) - - if hardware: - hardware.write(address - hardware.base, size, value) - else: - ql.log.debug('%s Write non-mapped hardware [0x%08x] = 0x%08x' % (info, address, value)) - ctypes.memmove(ctypes.addressof(mmio) + offset, (value).to_bytes(size, 'little'), size) - - self.ql.mem.map_mmio(begin, size, mmio_read_cb, mmio_write_cb, info=info) + + for ent in self.entity.values(): + if hasattr(ent, 'step'): + ent.step() + + def setup_mmio(self, begin: int, size: int, info: str) -> None: + dev = QlPripheralHandler(self, begin, size, info) + + self.ql.mem.map_mmio(begin, size, dev, info) def show_info(self): self.ql.log.info(f'{"Start":8s} {"End":8s} {"Label":8s} {"Class"}') @@ -131,8 +163,25 @@ def __getattr__(self, key): return self.entity.get(key) def save(self): - return {label : entity.save() for label, entity in self.entity.items()} + return { + 'entity': {label: entity.save() for label, entity in self.entity.items()}, + 'region': self.region + } def restore(self, saved_state): - for label, data in saved_state.items(): + entity = saved_state['entity'] + assert isinstance(entity, dict) + + region = saved_state['region'] + assert isinstance(region, dict) + + for label, data in entity.items(): self.entity[label].restore(data) + + self.region = region + + # a dirty hack to rehydrate non-pickleable hwman + # a proper fix would require a deeper refactoring to how peripherals are created and managed + for ph in self.ql.mem.mmio_cbs.values(): + if isinstance(ph, QlPripheralHandler): + setattr(ph, '_hwman', self) diff --git a/qiling/loader/mcu.py b/qiling/loader/mcu.py index 8a91d6334..3ad64c3bc 100644 --- a/qiling/loader/mcu.py +++ b/qiling/loader/mcu.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework -# Built on top of Unicorn emulator (www.unicorn-engine.org) +# Built on top of Unicorn emulator (www.unicorn-engine.org) import io @@ -27,7 +27,7 @@ def __init__(self, path): if addr != begin + len(stream): self.segments.append((begin, stream)) begin, stream = addr, data - + else: stream += data @@ -36,13 +36,13 @@ def __init__(self, path): def parse_line(self, line): if len(line) < 9: return - + desc = line[7: 9] - size = int(line[1: 3], 16) - + size = int(line[1: 3], 16) + addr = bytes.fromhex(line[3: 7]) - data = bytes.fromhex(line[9: 9 + size * 2]) - + data = bytes.fromhex(line[9: 9 + size * 2]) + if desc == '00': # Data offset = int.from_bytes(addr, byteorder='big') self.mem.append((self.base + offset, data)) @@ -52,20 +52,20 @@ def parse_line(self, line): elif desc == '04': # Extended Linear Address self.base = int.from_bytes(data, byteorder='big') * 0x10000 - + class QlLoaderMCU(QlLoader): def __init__(self, ql:Qiling): - super().__init__(ql) - + super().__init__(ql) + self.entry_point = 0 self.load_address = 0 self.filetype = self.guess_filetype() - + if self.filetype == 'elf': with open(self.ql.path, 'rb') as infile: self.elf = ELFFile(io.BytesIO(infile.read())) - + elif self.filetype == 'bin': self.map_address = self.argv[1] @@ -74,8 +74,8 @@ def __init__(self, ql:Qiling): def guess_filetype(self): if self.ql.path.endswith('.elf'): - return 'elf' - + return 'elf' + if self.ql.path.endswith('.bin'): return 'bin' @@ -83,7 +83,7 @@ def guess_filetype(self): return 'hex' return 'elf' - + def reset(self): if self.filetype == 'elf': for segment in self.elf.iter_segments(type='PT_LOAD'): @@ -99,7 +99,7 @@ def reset(self): for begin, data in self.ihex.segments: self.ql.mem.write(begin, data) - + self.ql.arch.init_context() self.entry_point = self.ql.arch.regs.read('pc') @@ -109,30 +109,34 @@ def load_profile(self): def load_env(self): for name, args in self.env.items(): memtype = args['type'] + if memtype == 'memory': size = args['size'] base = args['base'] self.ql.mem.map(base, size, info=f'[{name}]') - - if memtype == 'remap': - size = args['size'] - base = args['base'] - alias = args['alias'] - self.ql.hw.setup_remap(alias, base, size, info=f'[{name}]') - if memtype == 'mmio': + # elif memtype == 'remap': + # size = args['size'] + # base = args['base'] + # alias = args['alias'] + # self.ql.hw.setup_remap(alias, base, size, info=f'[{name}]') + + elif memtype == 'mmio': size = args['size'] base = args['base'] - self.ql.hw.setup_mmio(base, size, info=f'[{name}]') + self.ql.hw.setup_mmio(base, size, name) - if memtype == 'core': + elif memtype == 'core': self.ql.hw.create(name.lower()) + else: + self.ql.log.debug(f'ignoring unknown memory type "{memtype}" for {name}') + def run(self): self.load_profile() self.load_env() - + ## Handle interrupt from instruction execution self.ql.hook_intr(self.ql.arch.unicorn_exception_handler) - + self.reset() diff --git a/qiling/os/memory.py b/qiling/os/memory.py index 0939fc278..ec7aef19f 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -6,7 +6,7 @@ import bisect import os import re -from typing import Any, Callable, Iterator, List, Mapping, Optional, Pattern, Sequence, Tuple, Union +from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Pattern, Protocol, Sequence, Tuple, Union from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -20,6 +20,22 @@ MmioWriteCallback = Callable[[Qiling, int, int, int], None] +class QlMmioHandler(Protocol): + """A simple MMIO handler boilerplate that can be used to implement memory mapped devices. + + This should be extended to implement mapped devices state machines. Note that the read and write + methods are optional, where their existance indicates whether the device supports the corresponding + operation. That is, an unimplemented method means the corresponding operation will be silently + dropped. + """ + + def read(self, ql: Qiling, offset: int, size: int) -> int: + ... + + def write(self, ql: Qiling, offset: int, size: int, value: int) -> None: + ... + + class QlMemoryManager: """ some ideas and code from: @@ -29,7 +45,7 @@ class QlMemoryManager: def __init__(self, ql: Qiling, pagesize: int = 0x1000): self.ql = ql self.map_info: List[MapInfoEntry] = [] - self.mmio_cbs = {} + self.mmio_cbs: Dict[Tuple[int, int], QlMmioHandler] = {} bit_stuff = { 64: (1 << 64) - 1, @@ -272,7 +288,7 @@ def save(self): for lbound, ubound, perm, label, is_mmio in self.map_info: if is_mmio: - mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) + mem_dict['mmio'].append((lbound, ubound, perm, label, self.mmio_cbs[(lbound, ubound)])) else: data = self.read(lbound, ubound - lbound) mem_dict['ram'].append((lbound, ubound, perm, label, bytes(data))) @@ -294,12 +310,12 @@ def restore(self, mem_dict): self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}') self.write(lbound, data) - for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']: + for lbound, ubound, perms, label, handler in mem_dict['mmio']: self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}") size = ubound - lbound if not self.is_mapped(lbound, size): - self.map_mmio(lbound, size, read_cb, write_cb, info=label) + self.map_mmio(lbound, size, handler, label) def read(self, addr: int, size: int) -> bytearray: """Read bytes from memory. @@ -619,15 +635,15 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str self.ql.uc.mem_map(addr, size, perms) self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False) - def map_mmio(self, addr: int, size: int, read_cb: Optional[MmioReadCallback], write_cb: Optional[MmioWriteCallback], info: str = '[mmio]'): + def map_mmio(self, addr: int, size: int, handler: QlMmioHandler, info: str = '[mmio]'): # TODO: mmio memory overlap with ram? Is that possible? # TODO: Can read_cb or write_cb be None? How uc handle that access? prot = UC_PROT_NONE - if read_cb: + if hasattr(handler, 'read'): prot |= UC_PROT_READ - if write_cb: + if hasattr(handler, 'write'): prot |= UC_PROT_WRITE # generic mmio read wrapper @@ -642,10 +658,10 @@ def __mmio_write(uc, offset: int, size: int, value: int, user_data: MmioWriteCal cb(self.ql, offset, size, value) - self.ql.uc.mmio_map(addr, size, __mmio_read, read_cb, __mmio_write, write_cb) + self.ql.uc.mmio_map(addr, size, __mmio_read, handler.read, __mmio_write, handler.write) self.add_mapinfo(addr, addr + size, prot, info, is_mmio=True) - self.mmio_cbs[(addr, addr + size)] = (read_cb, write_cb) + self.mmio_cbs[(addr, addr + size)] = handler class Chunk: