def func(ql: Qiling, *args, **kwargs):
    """Address hook: announce on the emulated stdout that main was reached."""
    ql.os.stdout.write(b"=====hooked main=====!\n")


def my_sandbox(path, rootfs):
    """Emulate *path* under *rootfs* while exercising the R2 helper.

    Looks up the "Hello world!" string through both Qiling memory search and
    r2, prints its cross references, overwrites it, hooks ``main`` and runs
    the emulation with r2-backed tracing enabled.
    """
    ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEBUG)
    r2 = R2(ql)

    # ql.mem.search yields every address where the byte sequence occurs
    matches = ql.mem.search(b'llo worl')
    first_match = matches[0]
    # resolve the address to (flag, offset from that flag)
    print(r2.at(first_match))

    # r2 string lookup needs the exact string as key
    hello_addr = r2.strings['Hello world!'].vaddr
    print(first_match, hello_addr)

    # cross references pointing at the string
    print(r2.refto(hello_addr))

    # patch the string in emulated memory
    ql.mem.write(hello_addr, b"No hello, Bye!\x00")

    # hook the address r2 reports for main
    main_addr = r2.functions['main'].offset
    ql.hook_address(func, main_addr)

    # tracing uses the symbol map that r2 flags provide
    r2.enable_trace()
    ql.run()


if __name__ == "__main__":
    my_sandbox(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows")

    # shellcode mode: no file on disk, raw bytes only
    ARM64_LIN = bytes.fromhex('420002ca210080d2400080d2c81880d2010000d4e60300aa01020010020280d2681980d2010000d4410080d2420002cae00306aa080380d2010000d4210400f165ffff54e0000010420002ca210001caa81b80d2010000d4020004d27f0000012f62696e2f736800')
    print("\nLinux ARM 64bit Shellcode")
    ql = Qiling(code=ARM64_LIN, archtype="arm64", ostype="linux", verbose=QL_VERBOSE.DEBUG)
    r2 = R2(ql)
    # ask r2 to disassemble 32 instructions
    print(r2._cmd('pd 32'))
    ql.run()
def perm2uc(permstr: str) -> int:
    """Convert an r2 permission string such as "-rwx" to a unicorn UC_PROT_* mask.

    Characters other than "r", "w" and "x" contribute nothing.
    """
    prot_of = {
        "r": UC_PROT_READ,
        "w": UC_PROT_WRITE,
        "x": UC_PROT_EXEC,
    }
    perm = UC_PROT_NONE
    for ch in permstr:
        # OR instead of add: a repeated letter must not carry into another bit
        perm |= prot_of.get(ch, 0)
    return perm


class R2Data:
    """Base for r2 JSON records: copies only the keys that are declared fields."""

    def __init__(self, **kwargs):
        names = {f.name for f in fields(self)}
        for k, v in kwargs.items():
            # keys that are not dataclass fields of the subclass are dropped
            if k in names:
                setattr(self, k, v)


@dataclass(unsafe_hash=True, init=False)
class Section(R2Data):
    """One entry of the r2 section list; ``perm`` is stored as a unicorn mask."""
    name: str
    size: int
    vsize: int
    paddr: int
    vaddr: int
    perm: int

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # r2 supplies the permission as a string; keep the numeric mask instead
        self.perm = perm2uc(self.perm)


@dataclass(unsafe_hash=True, init=False)
class String(R2Data):
    """One entry of the r2 string list."""
    string: str
    vaddr: int
    paddr: int
    size: int
    length: int
    section: Union[str, None] = None


@dataclass(unsafe_hash=True, init=False)
class Symbol(R2Data):
    """One entry of the r2 symbol list."""
    # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h
    SymbolType = Literal["NOTYPE", "OBJ", "FUNC", "FIELD", "IFACE", "METH", "STATIC", "SECT",
                         "FILE", "COMMON", "TLS", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "SPCL", "UNK"]

    SymbolBind = Literal["LOCAL", "GLOBAL", "WEAK", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "IMPORT", "UNKNOWN"]

    name: str
    realname: str
    bind: SymbolBind
    size: int
    type: SymbolType
    vaddr: int
    paddr: int
    is_imported: bool


@dataclass(unsafe_hash=True, init=False)
class Function(R2Data):
    """One entry of the r2 function list."""
    name: str
    offset: int
    size: int
    signature: str


@dataclass(unsafe_hash=True, init=False)
class Flag(R2Data):
    """One r2 flag; instances order by ``offset``."""
    offset: int
    name: str = ''
    size: int = 0

    def __lt__(self, other):
        return self.offset < other.offset


@dataclass(unsafe_hash=True, init=False)
class Xref(R2Data):
    """One r2 cross reference; instances order by ``fromaddr``."""
    XrefType = Literal["NULL", "CODE", "CALL", "DATA", "STRN", "UNKN"]

    name: str
    fromaddr: int  # from is reserved word in Python
    type: XrefType
    perm: int
    addr: int
    refname: str

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # the JSON key is "from", which cannot be a Python attribute name
        self.fromaddr = kwargs["from"]
        self.perm = perm2uc(self.perm)

    def __lt__(self, other):
        return self.fromaddr < other.fromaddr


def _requires_analysis(fun):
    """Decorator: run the r2 ``aaa`` command once per instance before calling *fun*."""
    @wraps(fun)
    def wrapper(self, *args, **kwargs):
        if not self.analyzed:
            self._cmd("aaa")
            self.analyzed = True
        return fun(self, *args, **kwargs)
    return wrapper


class R2:
    """Wrapper around a libr r2 core created for a Qiling instance."""

    def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0):
        """Create the r2 core and load either ``ql.code`` or the file at ``ql.path``.

        Args:
            ql: the Qiling instance to inspect.
            baseaddr: base address handed to the r2 binary loader.
            loadaddr: address at which the file or code buffer is opened.
        """
        super().__init__()
        self.ql = ql
        # r2 -B [baddr] set base address for PIE binaries
        self.baseaddr = baseaddr
        self.loadaddr = loadaddr  # r2 -m [addr] map file at given address
        self.analyzed = False
        self._r2c = libr.r_core.r_core_new()
        if ql.code:
            self._setup_code(ql.code)
        else:
            self._setup_file(ql.path)

    def _qlarch2r(self, archtype: "QL_ARCH") -> str:
        """Map a Qiling architecture type to the r2 ``asm.arch`` name.

        Raises:
            KeyError: if *archtype* has no entry in the table below.
        """
        return {
            QL_ARCH.X86: "x86",
            QL_ARCH.X8664: "x86",
            QL_ARCH.ARM: "arm",
            QL_ARCH.ARM64: "arm",
            QL_ARCH.A8086: "x86",
            QL_ARCH.EVM: "evm.cs",
            QL_ARCH.CORTEX_M: "arm",
            QL_ARCH.MIPS: "mips",
            QL_ARCH.RISCV: "riscv",
            QL_ARCH.RISCV64: "riscv",
            QL_ARCH.PPC: "ppc",
        }[archtype]

    def _setup_code(self, code: bytes):
        """Open a malloc:// buffer sized for *code*, write the bytes and set asm arch/bits."""
        path = f'malloc://{len(code)}'.encode()
        libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr)
        libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)
        self._cmd(f'wx {code.hex()}')
        # set architecture and bits for r2 asm
        arch = self._qlarch2r(self.ql.arch.type)
        self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}")

    def _setup_file(self, path: str):
        """Open the file at *path* with read and execute permission and load it."""
        path = path.encode()
        libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr)
        libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)

    def _cmd(self, cmd: str) -> str:
        """Run an r2 command on the core and return its output decoded as UTF-8."""
        # NOTE(review): the pointer returned by r_core_cmd_str is never released
        # here - confirm whether libr expects the caller to free it.
        r = libr.r_core.r_core_cmd_str(
            self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8")))
        return ctypes.string_at(r).decode('utf-8')

    # Kept as a public attribute for existing callers. The methods below are
    # decorated with the plain function because a staticmethod object is not
    # callable inside the class body before Python 3.10.
    aaa = staticmethod(_requires_analysis)

    def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]:
        """Run an r2 command and parse its output as JSON."""
        return json.loads(self._cmd(cmd))

    @cached_property
    def binfo(self) -> Dict[str, str]:
        """Parsed output of the r2 ``iIj`` command (cached)."""
        return self._cmdj("iIj")

    @cached_property
    def baddr(self) -> int:
        """The ``baddr`` entry of :attr:`binfo`."""
        return self.binfo["baddr"]

    @cached_property
    def bintype(self) -> str:
        """The ``bintype`` entry of :attr:`binfo`."""
        return self.binfo["bintype"]

    @cached_property
    def sections(self) -> Dict[str, Section]:
        """Sections from ``iSj`` keyed by name; a later duplicate name wins."""
        sec_lst = self._cmdj("iSj")
        return {dic['name']: Section(**dic) for dic in sec_lst}

    @cached_property
    def strings(self) -> Dict[str, String]:
        """Strings from ``izzj`` keyed by their exact text."""
        str_lst = self._cmdj("izzj")
        return {dic['string']: String(**dic) for dic in str_lst}

    @cached_property
    def symbols(self) -> Dict[str, int]:
        """Symbols from ``isj``: maps each symbol name to its ``vaddr``."""
        sym_lst = self._cmdj("isj")
        return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst}

    @cached_property
    @_requires_analysis
    def functions(self) -> Dict[str, Function]:
        """Functions from ``aflj`` keyed by name (runs ``aaa`` first if needed)."""
        fcn_lst = self._cmdj("aflj")
        return {dic['name']: Function(**dic) for dic in fcn_lst}

    @cached_property
    @_requires_analysis
    def flags(self) -> List[Flag]:
        """Flags from ``fj`` in the order r2 reports them (runs ``aaa`` first if needed)."""
        return [Flag(**dic) for dic in self._cmdj("fj")]

    @cached_property
    @_requires_analysis
    def xrefs(self) -> List[Xref]:
        """Cross references from ``axj`` (runs ``aaa`` first if needed)."""
        return [Xref(**dic) for dic in self._cmdj("axj")]

    @cached_property
    def _flags_by_offset(self) -> List[Flag]:
        """Flags sorted by offset; bisect in :meth:`at` requires sorted input."""
        # sorted() is stable, so flags sharing an offset keep their r2 order
        return sorted(self.flags)

    def at(self, addr: int) -> Tuple[Flag, int]:
        """Return the closest flag at or below *addr* and the distance to it.

        Raises:
            IndexError: if no flag has an offset <= *addr* (this includes the
                case of an empty flag list).
        """
        ordered = self._flags_by_offset
        # bisect_right gives the insertion point to the right of equal offsets,
        # so the element just before it is the last flag with offset <= addr
        idx = bisect.bisect_right(ordered, Flag(offset=addr))
        if idx == 0:
            # without this guard, index -1 would silently pick the LAST flag
            raise IndexError(f"no flag at or below address {addr:#x}")
        flag = ordered[idx - 1]
        return flag, addr - flag.offset

    def refrom(self, addr: int) -> List[Xref]:
        """Cross references whose source address equals *addr*."""
        return [x for x in self.xrefs if x.fromaddr == addr]

    def refto(self, addr: int) -> List[Xref]:
        """Cross references whose target address equals *addr*."""
        return [x for x in self.xrefs if x.addr == addr]

    def read(self, addr: int, size: int) -> bytes:
        """Read *size* bytes at *addr* through the r2 ``p8`` command."""
        hexstr = self._cmd(f"p8 {size} @ {addr}")
        return bytes.fromhex(hexstr)

    def enable_trace(self, mode='full'):
        """Install flag names as the loader's symbol map and enable tracing.

        *mode* 'full' and 'history' select the matching tracer from
        ``qiling.extensions.trace``; any other value only installs the map.
        """
        # simple map from addr to flag name, cannot resolve addresses in the middle
        self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags}
        if mode == 'full':
            trace.enable_full_trace(self.ql)
        elif mode == 'history':
            trace.enable_history_trace(self.ql)

    def __del__(self):
        # __init__ may have failed before the core was created
        r2c = getattr(self, "_r2c", None)
        if r2c is not None:
            libr.r_core.r_core_free(r2c)
            # guard against a second explicit call freeing the core twice
            self._r2c = None
"gevent>=20.9.0", "multiprocess>=0.70.12.2", "windows-curses>=2.1.0;platform_system=='Windows'", - "pyyaml>=6.0" + "pyyaml>=6.0", ] extras = { @@ -42,6 +42,9 @@ "fuzz" : [ "unicornafl>=2.0.0;platform_system=='Windows'", "fuzzercorn>=0.0.1;platform_system=='Linux'" + ], + "RE": [ + "r2libr>=5.7.4", ] } diff --git a/tests/test_r2.py b/tests/test_r2.py new file mode 100644 index 000000000..ec28cd11b --- /dev/null +++ b/tests/test_r2.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +import sys, unittest + +sys.path.append("..") +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions.r2.r2 import R2 + + +EVM_CODE = bytes.fromhex("6060604052341561000f57600080fd5b60405160208061031c833981016040528080519060200190919050508060018190556000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000208190555050610299806100836000396000f300606060405260043610610057576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806318160ddd1461005c57806370a0823114610085578063a9059cbb146100d2575b600080fd5b341561006757600080fd5b61006f61012c565b6040518082815260200191505060405180910390f35b341561009057600080fd5b6100bc600480803573ffffffffffffffffffffffffffffffffffffffff16906020019091905050610132565b6040518082815260200191505060405180910390f35b34156100dd57600080fd5b610112600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190505061017a565b604051808215151515815260200191505060405180910390f35b60015481565b60008060008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020549050919050565b600080826000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000205403101515156101cb57600080fd5b816000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020600082825403925050819055508160
class R2Test(unittest.TestCase):
    """Smoke test for the R2 extension in shellcode mode."""

    def test_shellcode_disasm(self):
        """Disassembling the EVM bytecode must not produce 'invalid' in the output."""
        ql = Qiling(code=EVM_CODE, archtype="evm", verbose=QL_VERBOSE.DEBUG)
        r2 = R2(ql)
        pd = r2._cmd("pd 32")
        # assertNotIn shows the offending text on failure, unlike assertTrue
        self.assertNotIn('invalid', pd)


if __name__ == "__main__":
    unittest.main()