Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 108 additions & 59 deletions qiling/hw/hw.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,70 @@
#!/usr/bin/env python3
#
#
# Cross Platform and Multi Architecture Advanced Binary Emulation Framework
#

import ctypes
from functools import cached_property
from typing import Any, Dict, List, Optional, Tuple

from qiling.core import Qiling
from qiling import Qiling
from qiling.hw.peripheral import QlPeripheral
from qiling.utils import ql_get_module_function
from qiling.exception import QlErrorModuleFunctionNotFound


# should adhere to the QlMmioHandler interface, but not extend it directly to
# avoid potential pickling issues
class QlPripheralHandler:
def __init__(self, hwman: "QlHwManager", base: int, size: int, label: str) -> None:
self._hwman = hwman
self._base = base
self._size = size
self._label = label

def __getstate__(self):
state = self.__dict__.copy()
del state['_hwman'] # remove non-pickleable reference

return state

@cached_property
def _mmio(self) -> bytearray:
"""Get memory buffer used to back non-mapped hardware mmio regions.
"""

return bytearray(self._size)

def read(self, ql: Qiling, offset: int, size: int) -> int:
address = self._base + offset
hardware = self._hwman.find(address)

if hardware:
return hardware.read(address - hardware.base, size)

else:
ql.log.debug('[%s] read non-mapped hardware [%#010x]', self._label, address)
return int.from_bytes(self._mmio[offset:offset + size], byteorder='little')

def write(self, ql: Qiling, offset: int, size: int, value: int) -> None:
address = self._base + offset
hardware = self._hwman.find(address)

if hardware:
hardware.write(address - hardware.base, size, value)

else:
ql.log.debug('[%s] write non-mapped hardware [%#010x] = %#010x', self._label, address, value)
self._mmio[offset:offset + size] = value.to_bytes(size, 'little')


class QlHwManager:
def __init__(self, ql: Qiling):
self.ql = ql

self.entity = {}
self.region = {}

self.stepable = {}
self.entity: Dict[str, QlPeripheral] = {}
self.region: Dict[str, List[Tuple[int, int]]] = {}

def create(self, label: str, struct: str=None, base: int=None, kwargs: dict={}) -> "QlPeripheral":
def create(self, label: str, struct: Optional[str] = None, base: Optional[int] = None, kwargs: Optional[Dict[str, Any]] = None) -> QlPeripheral:
""" Create the peripheral accroding the label and envs.

struct: Structure of the peripheral. Use defualt ql structure if not provide.
Expand All @@ -30,88 +74,76 @@ def create(self, label: str, struct: str=None, base: int=None, kwargs: dict={})
if struct is None:
struct, base, kwargs = self.load_env(label.upper())

if kwargs is None:
kwargs = {}

try:

entity = ql_get_module_function('qiling.hw', struct)(self.ql, label, **kwargs)

self.entity[label] = entity
if hasattr(entity, 'step'):
self.stepable[label] = entity

self.region[label] = [(lbound + base, rbound + base) for (lbound, rbound) in entity.region]
except QlErrorModuleFunctionNotFound:
self.ql.log.warning(f'could not create {struct}({label}): implementation not found')

else:
assert isinstance(entity, QlPeripheral)
assert isinstance(base, int)

self.entity[label] = entity
self.region[label] = [(lbound + base, rbound + base) for (lbound, rbound) in entity.region]

return entity
except QlErrorModuleFunctionNotFound:
self.ql.log.debug(f'The {struct}({label}) has not been implemented')

def delete(self, label: str):
# FIXME: what should we do if struct is not implemented? is it OK to return None , or we fail?

def delete(self, label: str) -> None:
""" Remove the peripheral
"""

if label in self.entity:
self.entity.pop(label)
self.region.pop(label)
if label in self.stepable:
self.stepable.pop(label)
del self.entity[label]

if label in self.region:
del self.region[label]

def load_env(self, label: str):
def load_env(self, label: str) -> Tuple[str, int, Dict[str, Any]]:
""" Get peripheral information (structure, base address, initialization list) from env.

Args:
label (str): Peripheral Label

"""
args = self.ql.env[label]

return args['struct'], args['base'], args.get("kwargs", {})

def load_all(self):
for label, args in self.ql.env.items():
if args['type'] == 'peripheral':
self.create(label.lower(), args['struct'], args['base'], args.get("kwargs", {}))

def find(self, address: int):
# TODO: this is wasteful. device mapping is known at creation time. at least we could cache lru entries
def find(self, address: int) -> Optional[QlPeripheral]:
""" Find the peripheral at `address`
"""

for label in self.entity.keys():
for lbound, rbound in self.region[label]:
if lbound <= address < rbound:
return self.entity[label]

return None

def step(self):
""" Update all peripheral's state
""" Update all peripheral's state
"""
for entity in self.stepable.values():
entity.step()

def setup_mmio(self, begin, size, info=""):
mmio = ctypes.create_string_buffer(size)

def mmio_read_cb(ql, offset, size):
address = begin + offset
hardware = self.find(address)

if hardware:
return hardware.read(address - hardware.base, size)
else:
ql.log.debug('%s Read non-mapped hardware [0x%08x]' % (info, address))

buf = ctypes.create_string_buffer(size)
ctypes.memmove(buf, ctypes.addressof(mmio) + offset, size)
return int.from_bytes(buf.raw, byteorder='little')

def mmio_write_cb(ql, offset, size, value):
address = begin + offset
hardware = self.find(address)

if hardware:
hardware.write(address - hardware.base, size, value)
else:
ql.log.debug('%s Write non-mapped hardware [0x%08x] = 0x%08x' % (info, address, value))
ctypes.memmove(ctypes.addressof(mmio) + offset, (value).to_bytes(size, 'little'), size)

self.ql.mem.map_mmio(begin, size, mmio_read_cb, mmio_write_cb, info=info)

for ent in self.entity.values():
if hasattr(ent, 'step'):
ent.step()

def setup_mmio(self, begin: int, size: int, info: str) -> None:
dev = QlPripheralHandler(self, begin, size, info)

self.ql.mem.map_mmio(begin, size, dev, info)

def show_info(self):
self.ql.log.info(f'{"Start":8s} {"End":8s} {"Label":8s} {"Class"}')
Expand All @@ -131,8 +163,25 @@ def __getattr__(self, key):
return self.entity.get(key)

def save(self):
return {label : entity.save() for label, entity in self.entity.items()}
return {
'entity': {label: entity.save() for label, entity in self.entity.items()},
'region': self.region
}

def restore(self, saved_state):
for label, data in saved_state.items():
entity = saved_state['entity']
assert isinstance(entity, dict)

region = saved_state['region']
assert isinstance(region, dict)

for label, data in entity.items():
self.entity[label].restore(data)

self.region = region

# a dirty hack to rehydrate non-pickleable hwman
# a proper fix would require a deeper refactoring to how peripherals are created and managed
for ph in self.ql.mem.mmio_cbs.values():
if isinstance(ph, QlPripheralHandler):
setattr(ph, '_hwman', self)
60 changes: 32 additions & 28 deletions qiling/loader/mcu.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
#
#
# Cross Platform and Multi Architecture Advanced Binary Emulation Framework
# Built on top of Unicorn emulator (www.unicorn-engine.org)
# Built on top of Unicorn emulator (www.unicorn-engine.org)


import io
Expand All @@ -27,7 +27,7 @@ def __init__(self, path):
if addr != begin + len(stream):
self.segments.append((begin, stream))
begin, stream = addr, data

else:
stream += data

Expand All @@ -36,13 +36,13 @@ def __init__(self, path):
def parse_line(self, line):
if len(line) < 9:
return

desc = line[7: 9]
size = int(line[1: 3], 16)
size = int(line[1: 3], 16)

addr = bytes.fromhex(line[3: 7])
data = bytes.fromhex(line[9: 9 + size * 2])
data = bytes.fromhex(line[9: 9 + size * 2])

if desc == '00': # Data
offset = int.from_bytes(addr, byteorder='big')
self.mem.append((self.base + offset, data))
Expand All @@ -52,20 +52,20 @@ def parse_line(self, line):

elif desc == '04': # Extended Linear Address
self.base = int.from_bytes(data, byteorder='big') * 0x10000


class QlLoaderMCU(QlLoader):
def __init__(self, ql:Qiling):
super().__init__(ql)
super().__init__(ql)

self.entry_point = 0
self.load_address = 0
self.filetype = self.guess_filetype()

if self.filetype == 'elf':
with open(self.ql.path, 'rb') as infile:
self.elf = ELFFile(io.BytesIO(infile.read()))

elif self.filetype == 'bin':
self.map_address = self.argv[1]

Expand All @@ -74,16 +74,16 @@ def __init__(self, ql:Qiling):

def guess_filetype(self):
if self.ql.path.endswith('.elf'):
return 'elf'
return 'elf'

if self.ql.path.endswith('.bin'):
return 'bin'

if self.ql.path.endswith('.hex'):
return 'hex'

return 'elf'

def reset(self):
if self.filetype == 'elf':
for segment in self.elf.iter_segments(type='PT_LOAD'):
Expand All @@ -99,7 +99,7 @@ def reset(self):
for begin, data in self.ihex.segments:
self.ql.mem.write(begin, data)


self.ql.arch.init_context()
self.entry_point = self.ql.arch.regs.read('pc')

Expand All @@ -109,30 +109,34 @@ def load_profile(self):
def load_env(self):
for name, args in self.env.items():
memtype = args['type']

if memtype == 'memory':
size = args['size']
base = args['base']
self.ql.mem.map(base, size, info=f'[{name}]')

if memtype == 'remap':
size = args['size']
base = args['base']
alias = args['alias']
self.ql.hw.setup_remap(alias, base, size, info=f'[{name}]')

if memtype == 'mmio':
# elif memtype == 'remap':
# size = args['size']
# base = args['base']
# alias = args['alias']
# self.ql.hw.setup_remap(alias, base, size, info=f'[{name}]')

elif memtype == 'mmio':
size = args['size']
base = args['base']
self.ql.hw.setup_mmio(base, size, info=f'[{name}]')
self.ql.hw.setup_mmio(base, size, name)

if memtype == 'core':
elif memtype == 'core':
self.ql.hw.create(name.lower())

else:
self.ql.log.debug(f'ignoring unknown memory type "{memtype}" for {name}')

def run(self):
self.load_profile()
self.load_env()

## Handle interrupt from instruction execution
self.ql.hook_intr(self.ql.arch.unicorn_exception_handler)

self.reset()
Loading
Loading