-
Notifications
You must be signed in to change notification settings - Fork 778
feat: add r2 extension #1172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add r2 extension #1172
Changes from all commits
5456e42
9900c10
ebfa2d5
c1dcc23
a550019
ca755a2
e2cf9e2
1252a38
f31b083
24ec8fd
faf1537
cee4265
e4aad7d
f6cf40a
682ee74
027a926
a9d7594
5ca4277
cf51ee9
6b1a347
86c3c9f
b2a2311
48c3699
7c0fd6a
7ce8109
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| #!/usr/bin/env python3 | ||
| # | ||
| # Cross Platform and Multi Architecture Advanced Binary Emulation Framework | ||
| # | ||
|
|
||
| import sys | ||
| sys.path.append('..') | ||
|
|
||
| from qiling import Qiling | ||
| from qiling.const import QL_VERBOSE | ||
| from qiling.extensions.r2 import R2 | ||
|
|
||
|
|
||
| def func(ql: Qiling, *args, **kwargs): | ||
| ql.os.stdout.write(b"=====hooked main=====!\n") | ||
| return | ||
|
|
||
| def my_sandbox(path, rootfs): | ||
| ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEBUG) | ||
| r2 = R2(ql) | ||
|
|
||
| # search bytes sequence using ql.mem.search | ||
| addrs = ql.mem.search(b'llo worl') # return all matching results | ||
| print(r2.at(addrs[0])) # find corresponding flag at the address and the offset to the flag | ||
| # search string using r2 | ||
| addr = r2.strings['Hello world!'].vaddr # key must be exactly same | ||
| print(addrs[0], addr) | ||
| # print xref to string "Hello world!" | ||
| print(r2.refto(addr)) | ||
| # write to string using ql.mem.write | ||
| ql.mem.write(addr, b"No hello, Bye!\x00") | ||
|
|
||
| # get function address and hook it | ||
| ql.hook_address(func, r2.functions['main'].offset) | ||
|
elicn marked this conversation as resolved.
|
||
| # enable trace powered by r2 symsmap | ||
| r2.enable_trace() | ||
| ql.run() | ||
|
|
||
| if __name__ == "__main__": | ||
| my_sandbox(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows") | ||
|
|
||
| # test shellcode mode | ||
| ARM64_LIN = bytes.fromhex('420002ca210080d2400080d2c81880d2010000d4e60300aa01020010020280d2681980d2010000d4410080d2420002cae00306aa080380d2010000d4210400f165ffff54e0000010420002ca210001caa81b80d2010000d4020004d27f0000012f62696e2f736800') | ||
| print("\nLinux ARM 64bit Shellcode") | ||
| ql = Qiling(code=ARM64_LIN, archtype="arm64", ostype="linux", verbose=QL_VERBOSE.DEBUG) | ||
| r2 = R2(ql) | ||
| # disassemble 32 instructions | ||
| print(r2._cmd('pd 32')) | ||
| ql.run() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| from .r2 import R2 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,252 @@ | ||
| #!/usr/bin/env python3 | ||
| # | ||
| # Cross Platform and Multi Architecture Advanced Binary Emulation Framework | ||
| # | ||
|
|
||
| import bisect | ||
| import ctypes | ||
| import json | ||
| import libr | ||
| from dataclasses import dataclass, fields | ||
| from functools import cached_property, wraps | ||
| from typing import TYPE_CHECKING, Dict, List, Literal, Tuple, Union | ||
| from qiling.const import QL_ARCH | ||
| from qiling.extensions import trace | ||
| from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL | ||
|
|
||
| if TYPE_CHECKING: | ||
| from qiling.core import Qiling | ||
|
|
||
| def perm2uc(permstr: str) -> int: | ||
| '''convert "-rwx" to unicorn const''' | ||
| perm = UC_PROT_NONE | ||
| dic = { | ||
| "r": UC_PROT_READ, | ||
| "w": UC_PROT_WRITE, | ||
| "x": UC_PROT_EXEC, | ||
| } | ||
| for ch in permstr: | ||
| perm += dic.get(ch, 0) | ||
| return perm | ||
|
|
||
|
|
||
| class R2Data: | ||
| def __init__(self, **kwargs): | ||
|
chinggg marked this conversation as resolved.
|
||
| names = set([f.name for f in fields(self)]) | ||
| for k, v in kwargs.items(): | ||
| if k in names: | ||
| setattr(self, k, v) | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class Section(R2Data): | ||
| name: str | ||
| size: int | ||
| vsize: int | ||
| paddr: int | ||
| vaddr: int | ||
| perm: int | ||
|
|
||
| def __init__(self, **kwargs): | ||
| super().__init__(**kwargs) | ||
| self.perm = perm2uc(self.perm) | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class String(R2Data): | ||
| string: str | ||
| vaddr: int | ||
| paddr: int | ||
| size: int | ||
| length: int | ||
| section: str = None | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class Symbol(R2Data): | ||
| # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h | ||
| SymbolType = Literal["NOTYPE", "OBJ", "FUNC", "FIELD", "IFACE", "METH", "STATIC", "SECT", | ||
| "FILE", "COMMON", "TLS", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "SPCL", "UNK"] | ||
|
|
||
| SymbolBind = Literal["LOCAL", "GLOBAL", "WEAK", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "IMPORT", "UNKNOWN"] | ||
|
|
||
| name: str | ||
| realname: str | ||
| bind: SymbolBind | ||
| size: int | ||
| type: SymbolType | ||
| vaddr: int | ||
| paddr: int | ||
| is_imported: bool | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class Function(R2Data): | ||
| name: str | ||
| offset: int | ||
| size: int | ||
| signature: str | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class Flag(R2Data): | ||
| offset: int | ||
| name: str = '' | ||
| size: int = 0 | ||
|
|
||
| def __lt__(self, other): | ||
| return self.offset < other.offset | ||
|
|
||
|
|
||
| @dataclass(unsafe_hash=True, init=False) | ||
| class Xref(R2Data): | ||
| XrefType = Literal["NULL", "CODE", "CALL", "DATA", "STRN", "UNKN"] | ||
|
|
||
| name: str | ||
| fromaddr: int # from is reserved word in Python | ||
| type: XrefType | ||
| perm: int | ||
| addr: int | ||
| refname: str | ||
|
|
||
| def __init__(self, **kwargs): | ||
| super().__init__(**kwargs) | ||
| self.fromaddr = kwargs["from"] | ||
| self.perm = perm2uc(self.perm) | ||
|
|
||
| def __lt__(self, other): | ||
| return self.fromaddr < other.fromaddr | ||
|
|
||
|
|
||
| class R2: | ||
| def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): | ||
| super().__init__() | ||
| self.ql = ql | ||
| # r2 -B [baddr] set base address for PIE binaries | ||
| self.baseaddr = baseaddr | ||
| self.loadaddr = loadaddr # r2 -m [addr] map file at given address | ||
| self.analyzed = False | ||
| self._r2c = libr.r_core.r_core_new() | ||
| if ql.code: | ||
| self._setup_code(ql.code) | ||
| else: | ||
| self._setup_file(ql.path) | ||
|
|
||
| def _qlarch2r(self, archtype: QL_ARCH) -> str: | ||
| return { | ||
| QL_ARCH.X86: "x86", | ||
| QL_ARCH.X8664: "x86", | ||
| QL_ARCH.ARM: "arm", | ||
| QL_ARCH.ARM64: "arm", | ||
| QL_ARCH.A8086: "x86", | ||
| QL_ARCH.EVM: "evm.cs", | ||
| QL_ARCH.CORTEX_M: "arm", | ||
| QL_ARCH.MIPS: "mips", | ||
| QL_ARCH.RISCV: "riscv", | ||
| QL_ARCH.RISCV64: "riscv", | ||
| QL_ARCH.PPC: "ppc", | ||
| }[archtype] | ||
|
|
||
| def _setup_code(self, code: bytes): | ||
| path = f'malloc://{len(code)}'.encode() | ||
| fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) | ||
| libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) | ||
| self._cmd(f'wx {code.hex()}') | ||
| # set architecture and bits for r2 asm | ||
| arch = self._qlarch2r(self.ql.arch.type) | ||
| self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") | ||
|
|
||
| def _setup_file(self, path: str): | ||
| path = path.encode() | ||
| fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) | ||
| libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) | ||
|
|
||
| def _cmd(self, cmd: str) -> str: | ||
|
chinggg marked this conversation as resolved.
|
||
| r = libr.r_core.r_core_cmd_str( | ||
| self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) | ||
| return ctypes.string_at(r).decode('utf-8') | ||
|
|
||
| @staticmethod | ||
| def aaa(fun): | ||
| @wraps(fun) | ||
| def wrapper(self): | ||
| if self.analyzed is False: | ||
| self._cmd("aaa") | ||
| self.analyzed = True | ||
| return fun(self) | ||
| return wrapper | ||
|
|
||
| def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: | ||
| return json.loads(self._cmd(cmd)) | ||
|
|
||
| @cached_property | ||
| def binfo(self) -> Dict[str, str]: | ||
| return self._cmdj("iIj") | ||
|
|
||
| @cached_property | ||
| def baddr(self) -> int: | ||
| return self.binfo["baddr"] | ||
|
|
||
| @cached_property | ||
| def bintype(self) -> str: | ||
| return self.binfo["bintype"] | ||
|
|
||
| @cached_property | ||
| def sections(self) -> Dict[str, Section]: | ||
| sec_lst = self._cmdj("iSj") | ||
| return {dic['name']: Section(**dic) for dic in sec_lst} | ||
|
|
||
| @cached_property | ||
| def strings(self) -> Dict[str, String]: | ||
| str_lst = self._cmdj("izzj") | ||
| return {dic['string']: String(**dic) for dic in str_lst} | ||
|
|
||
| @cached_property | ||
| def symbols(self) -> Dict[str, Symbol]: | ||
| sym_lst = self._cmdj("isj") | ||
| return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} | ||
|
|
||
| @cached_property | ||
| @aaa | ||
| def functions(self) -> Dict[str, Function]: | ||
| fcn_lst = self._cmdj("aflj") | ||
| return {dic['name']: Function(**dic) for dic in fcn_lst} | ||
|
|
||
| @cached_property | ||
| @aaa | ||
| def flags(self) -> List[Flag]: | ||
| return [Flag(**dic) for dic in self._cmdj("fj")] | ||
|
|
||
| @cached_property | ||
| @aaa | ||
| def xrefs(self) -> List[Xref]: | ||
| return [Xref(**dic) for dic in self._cmdj("axj")] | ||
|
|
||
| def at(self, addr: int) -> Tuple[Flag, int]: | ||
| # the most suitable flag should have address <= addr | ||
| # bisect_right find the insertion point, right side if value exists | ||
| idx = bisect.bisect_right(self.flags, Flag(offset=addr)) | ||
| # minus 1 to find the corresponding flag | ||
| flag = self.flags[idx - 1] | ||
| return flag, addr - flag.offset | ||
|
|
||
| def refrom(self, addr: int) -> List[Xref]: | ||
| return [x for x in self.xrefs if x.fromaddr == addr] | ||
|
|
||
| def refto(self, addr: int) -> List[Xref]: | ||
| return [x for x in self.xrefs if x.addr == addr] | ||
|
|
||
| def read(self, addr: int, size: int) -> bytes: | ||
| hexstr = self._cmd(f"p8 {size} @ {addr}") | ||
| return bytes.fromhex(hexstr) | ||
|
|
||
| def enable_trace(self, mode='full'): | ||
| # simple map from addr to flag name, cannot resolve addresses in the middle | ||
| self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags} | ||
| if mode == 'full': | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use an enumeration instead.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a PoC. The trace extension made by @elicn need a |
||
| trace.enable_full_trace(self.ql) | ||
| elif mode == 'history': | ||
| trace.enable_history_trace(self.ql) | ||
|
|
||
| def __del__(self): | ||
| libr.r_core.r_core_free(self._r2c) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| import sys, unittest | ||
|
|
||
| sys.path.append("..") | ||
| from qiling import Qiling | ||
| from qiling.const import QL_VERBOSE | ||
| from qiling.extensions.r2.r2 import R2 | ||
|
|
||
|
|
||
| EVM_CODE = bytes.fromhex("6060604052341561000f57600080fd5b60405160208061031c833981016040528080519060200190919050508060018190556000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000208190555050610299806100836000396000f300606060405260043610610057576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806318160ddd1461005c57806370a0823114610085578063a9059cbb146100d2575b600080fd5b341561006757600080fd5b61006f61012c565b6040518082815260200191505060405180910390f35b341561009057600080fd5b6100bc600480803573ffffffffffffffffffffffffffffffffffffffff16906020019091905050610132565b6040518082815260200191505060405180910390f35b34156100dd57600080fd5b610112600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190505061017a565b604051808215151515815260200191505060405180910390f35b60015481565b60008060008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020549050919050565b600080826000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000205403101515156101cb57600080fd5b816000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060008282540392505081905550816000808573ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000206000828254019250508190555060019050929150505600a165627a7a7230582098f1551a391a3e65b3ce45cfa2b3fa5f91eea9a3e7181a81454e025ea0d7151c0029") | ||
|
|
||
|
|
||
| class R2Test(unittest.TestCase): | ||
| def test_shellcode_disasm(self): | ||
| ql = Qiling(code=EVM_CODE, archtype="evm", verbose=QL_VERBOSE.DEBUG) | ||
| r2 = R2(ql) | ||
| pd = r2._cmd("pd 32") | ||
| self.assertTrue('invalid' not in pd) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
Uh oh!
There was an error while loading. Please reload this page.