# Patch summary (free text before the first "diff --git" header):
#   * CI: install the `RE` extra together with `evm`.
#   * qiling/arch/utils.py: when verbosity >= QL_VERBOSE.DISASM, hook the r2-backed
#     disassembler if `qiling.extensions.r2` can be imported, else the built-in one.
#   * qiling/extensions/r2/r2.py: add `Instruction`, `where`, `dis_nbytes`,
#     `disassembler`, `enable_disasm`; `at` now asks r2 (`fd`) instead of bisecting flags.
#   * examples/tests updated accordingly.
# NOTE(review): line breaks and indentation were reconstructed from a whitespace-collapsed
# copy; confirm context lines against the target tree before applying. The new-side blob
# ids on the `index` lines come from the original patch and are stale for edited files.
diff --git a/.github/workflows/build-ci.yml b/.github/workflows/build-ci.yml
index c28d542a3..1b652b8d3 100644
--- a/.github/workflows/build-ci.yml
+++ b/.github/workflows/build-ci.yml
@@ -80,7 +80,7 @@ jobs:
           cd ../qiling
           cd ../examples/rootfs/x86_linux/kernel && unzip -P infected m0hamed_rootkit.ko.zip
           cd ../../../../
-          pip3 install -e .[evm]
+          pip3 install -e .[evm,RE]
 
           if [ ${{ matrix.os }} == 'ubuntu-18.04' ] and [ ${{ matrix.python-version }} == '3.9' ]; then
               docker run -it --rm -v ${GITHUB_WORKSPACE}:/qiling qilingframework/qiling:dev bash -c "cd tests && ./test_onlinux.sh"
diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py
index d51dc9c7d..ebd54c452 100644
--- a/examples/extensions/r2/hello_r2.py
+++ b/examples/extensions/r2/hello_r2.py
@@ -16,7 +16,8 @@ def func(ql: Qiling, *args, **kwargs):
     return
 
 def my_sandbox(path, rootfs):
-    ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEBUG)
+    ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DISASM)
+    # QL_VERBOSE.DISASM will be monkey-patched when r2 is available
     r2 = R2(ql)
 
     # search bytes sequence using ql.mem.search
@@ -33,7 +34,7 @@ def my_sandbox(path, rootfs):
     # get function address and hook it
     ql.hook_address(func, r2.functions['main'].offset)
     # enable trace powered by r2 symsmap
-    r2.enable_trace()
+    # r2.enable_trace()
     ql.run()
 
 if __name__ == "__main__":
diff --git a/qiling/arch/utils.py b/qiling/arch/utils.py
index ab9583fb7..7c6bcd9c8 100644
--- a/qiling/arch/utils.py
+++ b/qiling/arch/utils.py
@@ -73,7 +73,12 @@ def ql_hook_block_disasm(ql: Qiling, address: int, size: int):
             self._block_hook = None
 
         if verbosity >= QL_VERBOSE.DISASM:
-            self._disasm_hook = self.ql.hook_code(self.disassembler)
+            try:  # monkey patch disassembler with the r2-backed one when available
+                from qiling.extensions.r2 import R2
+                r2 = R2(self.ql)
+                self._disasm_hook = self.ql.hook_code(r2.disassembler)
+            except ImportError:  # ModuleNotFoundError is a subclass of ImportError
+                self._disasm_hook = self.ql.hook_code(self.disassembler)
 
             if verbosity >= QL_VERBOSE.DUMP:
                 self._block_hook = self.ql.hook_block(ql_hook_block_disasm)
diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py
index 669efc311..13a655b2f 100644
--- a/qiling/extensions/r2/r2.py
+++ b/qiling/extensions/r2/r2.py
@@ -3,13 +3,13 @@
 # Cross Platform and Multi Architecture Advanced Binary Emulation Framework
 #
 
-import bisect
 import ctypes
 import json
+import re
 import libr
-from dataclasses import dataclass, fields
+from dataclasses import dataclass, field, fields
 from functools import cached_property, wraps
-from typing import TYPE_CHECKING, Dict, List, Literal, Tuple, Union
+from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Pattern, Tuple, Union
 from qiling.const import QL_ARCH
 from qiling.extensions import trace
 from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL
@@ -80,6 +80,20 @@ class Symbol(R2Data):
     is_imported: bool
 
 
+@dataclass(unsafe_hash=True, init=False)
+class Instruction(R2Data):
+    offset: int
+    size: int
+    opcode: str  # raw opcode
+    disasm: str = ''  # flag resolved opcode
+    bytes: bytes
+    type: str
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.bytes = bytes.fromhex(kwargs["bytes"])
+
+
 @dataclass(unsafe_hash=True, init=False)
 class Function(R2Data):
     name: str
@@ -90,7 +104,7 @@ class Function(R2Data):
 
 @dataclass(unsafe_hash=True, init=False)
 class Flag(R2Data):
-    offset: int
+    offset: int  # should be addr but r2 calls it offset
     name: str = ''
     size: int = 0
 
@@ -166,7 +180,9 @@ def _cmd(self, cmd: str) -> str:
             self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8")))
         return ctypes.string_at(r).decode('utf-8')
 
-    @staticmethod
+    def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]:
+        return json.loads(self._cmd(cmd))
+
     def aaa(fun):
         @wraps(fun)
         def wrapper(self):
@@ -176,9 +192,6 @@ def wrapper(self):
             return fun(self)
         return wrapper
 
-    def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]:
-        return json.loads(self._cmd(cmd))
-
     @cached_property
     def binfo(self) -> Dict[str, str]:
         return self._cmdj("iIj")
@@ -222,13 +235,24 @@ def flags(self) -> List[Flag]:
     def xrefs(self) -> List[Xref]:
         return [Xref(**dic) for dic in self._cmdj("axj")]
 
-    def at(self, addr: int) -> Tuple[Flag, int]:
-        # the most suitable flag should have address <= addr
-        # bisect_right find the insertion point, right side if value exists
-        idx = bisect.bisect_right(self.flags, Flag(offset=addr))
-        # minus 1 to find the corresponding flag
-        flag = self.flags[idx - 1]
-        return flag, addr - flag.offset
+    def at(self, addr: int, parse=False) -> Union[str, Tuple[str, int]]:
+        '''Given an address, return [name, offset] or "name + offset"'''
+        name = self._cmd(f'fd {addr}').strip()
+        if parse:
+            try:
+                name, offset = name.split(' + ')
+                offset = int(offset)
+            except ValueError:  # split fail when offset=0
+                offset = 0
+            return name, offset
+        return name
+
+    def where(self, name: str, offset: int = 0) -> int:
+        '''Given a name (+ offset), return its address (0 when not found)'''
+        if offset != 0:  # name can already have offset, multiple + is allowed
+            name += f' + {offset}'
+        addr = self._cmd(f'?v {name}').strip()  # 0x0 when name is not found
+        return int(addr, 16)
 
     def refrom(self, addr: int) -> List[Xref]:
         return [x for x in self.xrefs if x.fromaddr == addr]
@@ -240,6 +264,35 @@ def read(self, addr: int, size: int) -> bytes:
         hexstr = self._cmd(f"p8 {size} @ {addr}")
         return bytes.fromhex(hexstr)
 
+    def dis_nbytes(self, addr: int, size: int) -> List[Instruction]:
+        insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")]
+        return insts
+
+    def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Optional[Pattern[str]] = None) -> int:
+        '''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code
+            :param ql: Qiling instance
+            :param addr: start address for disassembly
+            :param size: size in bytes
+            :param filt: regex pattern to filter instructions
+            :return: progress of disassembler, should be equal to size if success
+        '''
+        anibbles = ql.arch.bits // 4
+        progress = 0
+        for inst in self.dis_nbytes(addr, size):
+            if inst.type.lower() == 'invalid':
+                break  # stop disasm
+            name, offset = self.at(inst.offset, parse=True)
+            if filt is None or filt.search(name):
+                ql.log.info(f'{inst.offset:0{anibbles}x} [{name:20s} + {offset:#08x}] {inst.bytes.hex(" "):20s} {inst.disasm}')
+            progress = inst.offset + inst.size - addr
+        if progress < size:
+            ql.arch.utils.disassembler(ql, addr + progress, size - progress)
+        return progress
+
+    def enable_disasm(self, filt_str: str = ''):
+        filt = re.compile(filt_str)
+        self.ql.hook_code(self.disassembler, filt)
+
     def enable_trace(self, mode='full'):
         # simple map from addr to flag name, cannot resolve addresses in the middle
         self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags}
diff --git a/tests/test_r2.py b/tests/test_r2.py
index ec28cd11b..97c180288 100644
--- a/tests/test_r2.py
+++ b/tests/test_r2.py
@@ -13,10 +13,16 @@
 
 class R2Test(unittest.TestCase):
     def test_shellcode_disasm(self):
-        ql = Qiling(code=EVM_CODE, archtype="evm", verbose=QL_VERBOSE.DEBUG)
+        ql = Qiling(code=EVM_CODE, archtype="evm", verbose=QL_VERBOSE.DISABLED)
         r2 = R2(ql)
         pd = r2._cmd("pd 32")
-        self.assertTrue('invalid' not in pd)
+        self.assertIn('callvalue', pd)
+
+    def test_addr_flag(self):
+        ql = Qiling(["../examples/rootfs/x86_windows/bin/x86_hello.exe"], "../examples/rootfs/x86_windows",
+                    verbose=QL_VERBOSE.DISABLED)  # x8864_hello does not have 'main'
+        r2 = R2(ql)
+        self.assertEqual(r2.at(r2.where('main')), 'main')
 
 
 if __name__ == "__main__":