From 5456e42bff6fcf2cdaf8ba18a3892cbd33fd7f1c Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 14 Jun 2022 16:28:10 +0800 Subject: [PATCH 01/25] feat: add r2 extension --- examples/extensions/r2/hello_r2.py | 30 ++++++ qiling/extensions/r2/__init__.py | 1 + qiling/extensions/r2/r2.py | 156 +++++++++++++++++++++++++++++ setup.py | 3 + 4 files changed, 190 insertions(+) create mode 100644 examples/extensions/r2/hello_r2.py create mode 100644 qiling/extensions/r2/__init__.py create mode 100644 qiling/extensions/r2/r2.py diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py new file mode 100644 index 000000000..91f91891f --- /dev/null +++ b/examples/extensions/r2/hello_r2.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +import sys +sys.path.append('..') + +from qiling import * +from qiling.const import QL_VERBOSE +from qiling.extensions.r2 import R2 + + +def func(ql: Qiling, *args, **kwargs): + ql.os.stdout.write(b"=====hooked main=====!\n") + return + +def my_sandbox(path, rootfs): + ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEFAULT) + r2 = R2(ql) + assert(ql.loader.images[0].base == r2.baddr) + addrs = ql.mem.search(b'Hello world!') + addr = r2.strings['Hello world!'].vaddr + assert(addr == addrs[0]) + ql.mem.write(addr, b"No hello, Bye!\x00") + ql.hook_address(func, r2.functions['main'].offset) + ql.run() + +if __name__ == "__main__": + my_sandbox(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows") diff --git a/qiling/extensions/r2/__init__.py b/qiling/extensions/r2/__init__.py new file mode 100644 index 000000000..d8f86c32a --- /dev/null +++ b/qiling/extensions/r2/__init__.py @@ -0,0 +1 @@ +from .r2 import R2 diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py new file mode 100644 index 000000000..7771bd5db --- /dev/null +++ b/qiling/extensions/r2/r2.py @@ -0,0 +1,156 @@ 
+#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +import functools +import json +import ctypes +import libr +from dataclasses import dataclass, fields +from enum import Enum +from qiling.core import Qiling + +@dataclass(unsafe_hash=True) +class Function: + name: str + offset: int + size: int + signature: str + + def __init__(self, **kwargs): + names = set([f.name for f in fields(self)]) + for k, v in kwargs.items(): + if k in names: + setattr(self, k, v) + + +@dataclass(unsafe_hash=True) +class Section: + name: str + size: int + vsize: int + paddr: int + vaddr: int + perm: str # TODO: use int or enum + + def __init__(self, **kwargs): + names = set([f.name for f in fields(self)]) + for k, v in kwargs.items(): + if k in names: + setattr(self, k, v) + + +@dataclass(unsafe_hash=True) +class String: + string: str + vaddr: int + paddr: int + size: int + length: int + section: str = None + + def __init__(self, **kwargs): + names = set([f.name for f in fields(self)]) + for k, v in kwargs.items(): + if k in names: + setattr(self, k, v) + + +@dataclass(unsafe_hash=True) +class Symbol: + # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h + class SymbolType(str, Enum): + NOTYPE = "NOTYPE" + OBJ = "OBJ" + FUNC = "FUNC" + FIELD = "FIELD" + IFACE = "IFACE" + METH = "METH" + STATIC = "STATIC" + SECT = "SECT" + FILE = "FILE" + COMMON = "COMMON" + TLS = "TLS" + NUM = "NUM" + LOOS = "LOOS" + HIOS = "HIOS" + LOPROC = "LOPROC" + HIPROC = "HIPROC" + SPCL = "SPCL" + UNK = "UNK" + + class SymbolBind(str, Enum): + LOCAL = "LOCAL" + GLOBAL = "GLOBAL" + WEAK = "WEAK" + NUM = "NUM" + LOOS = "LOOS" + HIOS = "HIOS" + LOPROC = "LOPROC" + HIPROC = "HIPROC" + IMPORT = "IMPORT" + UNKNOWN = "UNKNOWN" + + name: str + realname: str + bind: str + size: int + type: SymbolType + vaddr: int + paddr: int + is_imported: bool + + def __init__(self, **kwargs): + names = set([f.name for f in fields(self)]) + for k, v in 
kwargs.items(): + if k in names: + setattr(self, k, v) + + +class R2: + def __init__(self, ql: Qiling): + super().__init__() + path = ql.path.encode() + self._r2c = libr.r_core.r_core_new() + fh = libr.r_core.r_core_file_open(self._r2c, path, 0b101, 0) + libr.r_core.r_core_bin_load(self._r2c, path, (1 << 64) - 1) + + def _cmd(self, cmd: str): + r = libr.r_core.r_core_cmd_str( + self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) + return ctypes.string_at(r).decode('utf-8') + + @functools.cached_property + def sections(self) -> dict[str, Section]: + res = self._cmd("iSj") + sec_lst = json.loads(res) + return {dic['name']: Section(**dic) for dic in sec_lst} + + @functools.cached_property + def strings(self) -> dict[str, String]: + res = self._cmd("izzj") + str_lst = json.loads(res) + return {dic['string']: String(**dic) for dic in str_lst} + + @functools.cached_property + def symbols(self) -> dict[str, Symbol]: + res = self._cmd("isj") + sym_lst = json.loads(res) + return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} + + @functools.cached_property + def functions(self) -> dict[str, Function]: + self._cmd("aaa") + res = self._cmd("aflj") + fcn_lst = json.loads(res) + return {dic['name']: Function(**dic) for dic in fcn_lst} + + @functools.cached_property + def baddr(self) -> int: + _bin = ctypes.cast(self._r2c.contents.bin, + ctypes.POINTER(libr.r_bin.RBin)) + return libr.r_bin.r_bin_get_baddr(_bin) + + def __del__(self): + libr.r_core.r_core_free(self._r2c) diff --git a/setup.py b/setup.py index 0626555f2..31b13372b 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,9 @@ "fuzz" : [ "unicornafl>=2.0.0;platform_system=='Windows'", "fuzzercorn>=0.0.1;platform_system=='Linux'" + ], + "SCA" : [ + "r2libr" ] } From 9900c1010233d4c9401a9054def540b1c5021006 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 14 Jun 2022 21:20:00 +0800 Subject: [PATCH 02/25] style: avoid importing namespace pollution --- 
examples/extensions/r2/hello_r2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 91f91891f..5ba16d0df 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -6,7 +6,7 @@ import sys sys.path.append('..') -from qiling import * +from qiling import Qiling from qiling.const import QL_VERBOSE from qiling.extensions.r2 import R2 From ebfa2d5b41583e8f5fdd3b01ce0f476d5f0a7ef2 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sat, 18 Jun 2022 09:49:45 +0800 Subject: [PATCH 03/25] Add base class R2Data --- qiling/extensions/r2/r2.py | 39 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 7771bd5db..829f06452 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -12,21 +12,27 @@ from qiling.core import Qiling @dataclass(unsafe_hash=True) -class Function: +class R2Data: + def __init__(self, **kwargs): + names = set([f.name for f in fields(self)]) + for k, v in kwargs.items(): + if k in names: + setattr(self, k, v) + + +@dataclass(unsafe_hash=True) +class Function(R2Data): name: str offset: int size: int signature: str def __init__(self, **kwargs): - names = set([f.name for f in fields(self)]) - for k, v in kwargs.items(): - if k in names: - setattr(self, k, v) + super().__init__(**kwargs) @dataclass(unsafe_hash=True) -class Section: +class Section(R2Data): name: str size: int vsize: int @@ -35,14 +41,10 @@ class Section: perm: str # TODO: use int or enum def __init__(self, **kwargs): - names = set([f.name for f in fields(self)]) - for k, v in kwargs.items(): - if k in names: - setattr(self, k, v) - + super().__init__(**kwargs) @dataclass(unsafe_hash=True) -class String: +class String(R2Data): string: str vaddr: int paddr: int @@ -51,14 +53,11 @@ class String: section: str = 
None def __init__(self, **kwargs): - names = set([f.name for f in fields(self)]) - for k, v in kwargs.items(): - if k in names: - setattr(self, k, v) + super().__init__(**kwargs) @dataclass(unsafe_hash=True) -class Symbol: +class Symbol(R2Data): # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h class SymbolType(str, Enum): NOTYPE = "NOTYPE" @@ -101,12 +100,6 @@ class SymbolBind(str, Enum): paddr: int is_imported: bool - def __init__(self, **kwargs): - names = set([f.name for f in fields(self)]) - for k, v in kwargs.items(): - if k in names: - setattr(self, k, v) - class R2: def __init__(self, ql: Qiling): From c1dcc23055479c9d1627214165025051ee293531 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sat, 18 Jun 2022 15:10:13 +0800 Subject: [PATCH 04/25] Support shellcode mode and set r2 asm arch and bits - eliminate magic number of baseaddr and loadaddr - update example of shellcode mode --- examples/extensions/r2/hello_r2.py | 23 +++++++++++++++---- qiling/extensions/r2/r2.py | 36 ++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 5ba16d0df..bf2b79feb 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -5,6 +5,7 @@ import sys sys.path.append('..') +from binascii import unhexlify from qiling import Qiling from qiling.const import QL_VERBOSE @@ -18,13 +19,27 @@ def func(ql: Qiling, *args, **kwargs): def my_sandbox(path, rootfs): ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEFAULT) r2 = R2(ql) - assert(ql.loader.images[0].base == r2.baddr) - addrs = ql.mem.search(b'Hello world!') - addr = r2.strings['Hello world!'].vaddr - assert(addr == addrs[0]) + + # search bytes sequence using ql.mem.search + addrs = ql.mem.search(b'Hello worl') # return all matching results + # search string using r2 + addr = r2.strings['Hello world!'].vaddr # key must be 
exactly same + print(r2.strings['Hello world!'].__class__) + # write to string using ql.mem.write ql.mem.write(addr, b"No hello, Bye!\x00") + + # get function address and hook it ql.hook_address(func, r2.functions['main'].offset) ql.run() if __name__ == "__main__": my_sandbox(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows") + + # test shellcode mode + ARM64_LIN = unhexlify('420002ca210080d2400080d2c81880d2010000d4e60300aa01020010020280d2681980d2010000d4410080d2420002cae00306aa080380d2010000d4210400f165ffff54e0000010420002ca210001caa81b80d2010000d4020004d27f0000012f62696e2f736800') + print("\nLinux ARM 64bit Shellcode") + ql = Qiling(code=ARM64_LIN, archtype="arm64", ostype="linux", verbose=QL_VERBOSE.DEBUG) + r2 = R2(ql) + # disassemble 32 instructions + print(r2._cmd('pd 32')) + ql.run() diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 829f06452..b5f9c54cc 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -3,13 +3,14 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +import ctypes import functools import json -import ctypes import libr from dataclasses import dataclass, fields from enum import Enum from qiling.core import Qiling +from unicorn import UC_PROT_READ, UC_PROT_EXEC, UC_PROT_ALL @dataclass(unsafe_hash=True) class R2Data: @@ -102,14 +103,35 @@ class SymbolBind(str, Enum): class R2: - def __init__(self, ql: Qiling): + def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() - path = ql.path.encode() - self._r2c = libr.r_core.r_core_new() - fh = libr.r_core.r_core_file_open(self._r2c, path, 0b101, 0) - libr.r_core.r_core_bin_load(self._r2c, path, (1 << 64) - 1) + self.ql = ql + self.baseaddr = baseaddr # r2 -B [baddr] set base address for PIE binaries + self.loadaddr = loadaddr # r2 -m [addr] map file at given address - def _cmd(self, cmd: str): + self._r2c = libr.r_core.r_core_new() + if ql.code: + self._setup_code() + else: + 
self._setup_file() + + # set architecture and bits for r2 asm + self._cmd(f"e,asm.arch={ql.arch.type.name.lower().removesuffix('64')},asm.bits={ql.arch.bits}") + + def _setup_code(self): + sz = len(self.ql.code) + path = f'malloc://{sz}'.encode() + fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) + libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) + cmd = f'wx {self.ql.code.hex()}' + self._cmd(cmd) + + def _setup_file(self): + path = self.ql.path.encode() + fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) + libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) + + def _cmd(self, cmd: str) -> str: r = libr.r_core.r_core_cmd_str( self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') From a550019bce7a843b2ccb04d919d9b1408c4f8851 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 19 Jun 2022 21:43:53 +0800 Subject: [PATCH 05/25] Clean r2 extension code - Remove redundant __init__ - Abstract `cmdj` to parse json in only one place - avoid importing whole functools --- qiling/extensions/r2/r2.py | 45 +++++++++++++++----------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index b5f9c54cc..190052917 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -4,15 +4,15 @@ # import ctypes -import functools import json import libr from dataclasses import dataclass, fields from enum import Enum +from functools import cached_property from qiling.core import Qiling from unicorn import UC_PROT_READ, UC_PROT_EXEC, UC_PROT_ALL -@dataclass(unsafe_hash=True) + class R2Data: def __init__(self, **kwargs): names = set([f.name for f in fields(self)]) @@ -21,18 +21,15 @@ def __init__(self, **kwargs): setattr(self, k, v) -@dataclass(unsafe_hash=True) +@dataclass(unsafe_hash=True, init=False) class 
Function(R2Data): name: str offset: int size: int signature: str - def __init__(self, **kwargs): - super().__init__(**kwargs) - -@dataclass(unsafe_hash=True) +@dataclass(unsafe_hash=True, init=False) class Section(R2Data): name: str size: int @@ -41,10 +38,8 @@ class Section(R2Data): vaddr: int perm: str # TODO: use int or enum - def __init__(self, **kwargs): - super().__init__(**kwargs) -@dataclass(unsafe_hash=True) +@dataclass(unsafe_hash=True, init=False) class String(R2Data): string: str vaddr: int @@ -53,11 +48,8 @@ class String(R2Data): length: int section: str = None - def __init__(self, **kwargs): - super().__init__(**kwargs) - -@dataclass(unsafe_hash=True) +@dataclass(unsafe_hash=True, init=False) class Symbol(R2Data): # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h class SymbolType(str, Enum): @@ -136,32 +128,31 @@ def _cmd(self, cmd: str) -> str: self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') - @functools.cached_property + def _cmdj(self, cmd: str) -> list[dict]: + return json.loads(self._cmd(cmd)) + + @cached_property def sections(self) -> dict[str, Section]: - res = self._cmd("iSj") - sec_lst = json.loads(res) + sec_lst = self._cmdj("iSj") return {dic['name']: Section(**dic) for dic in sec_lst} - @functools.cached_property + @cached_property def strings(self) -> dict[str, String]: - res = self._cmd("izzj") - str_lst = json.loads(res) + str_lst = self._cmdj("izzj") return {dic['string']: String(**dic) for dic in str_lst} - @functools.cached_property + @cached_property def symbols(self) -> dict[str, Symbol]: - res = self._cmd("isj") - sym_lst = json.loads(res) + sym_lst = self._cmdj("isj") return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} - @functools.cached_property + @cached_property def functions(self) -> dict[str, Function]: self._cmd("aaa") - res = self._cmd("aflj") - fcn_lst = json.loads(res) + fcn_lst = self._cmdj("aflj") return {dic['name']: 
Function(**dic) for dic in fcn_lst} - @functools.cached_property + @cached_property def baddr(self) -> int: _bin = ctypes.cast(self._r2c.contents.bin, ctypes.POINTER(libr.r_bin.RBin)) From ca755a28a1cbd6563f478ee767140b8498b713e8 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 19 Jun 2022 21:50:16 +0800 Subject: [PATCH 06/25] Replace binascii.unhexlify with bytes.fromhex in r2 example --- examples/extensions/r2/hello_r2.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index bf2b79feb..9a8d84b33 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -5,7 +5,6 @@ import sys sys.path.append('..') -from binascii import unhexlify from qiling import Qiling from qiling.const import QL_VERBOSE @@ -24,7 +23,6 @@ def my_sandbox(path, rootfs): addrs = ql.mem.search(b'Hello worl') # return all matching results # search string using r2 addr = r2.strings['Hello world!'].vaddr # key must be exactly same - print(r2.strings['Hello world!'].__class__) # write to string using ql.mem.write ql.mem.write(addr, b"No hello, Bye!\x00") @@ -36,7 +34,7 @@ def my_sandbox(path, rootfs): my_sandbox(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows") # test shellcode mode - ARM64_LIN = unhexlify('420002ca210080d2400080d2c81880d2010000d4e60300aa01020010020280d2681980d2010000d4410080d2420002cae00306aa080380d2010000d4210400f165ffff54e0000010420002ca210001caa81b80d2010000d4020004d27f0000012f62696e2f736800') + ARM64_LIN = bytes.fromhex('420002ca210080d2400080d2c81880d2010000d4e60300aa01020010020280d2681980d2010000d4410080d2420002cae00306aa080380d2010000d4210400f165ffff54e0000010420002ca210001caa81b80d2010000d4020004d27f0000012f62696e2f736800') print("\nLinux ARM 64bit Shellcode") ql = Qiling(code=ARM64_LIN, archtype="arm64", ostype="linux", verbose=QL_VERBOSE.DEBUG) r2 = R2(ql) From 
e2cf9e2328db5c8a80087c7edd497317dd9d34e6 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 21 Jun 2022 20:38:52 +0800 Subject: [PATCH 07/25] style: improve typing - replace str enum with typing.Literal - avoid generic typing annotation for compatibility with Python 3.8 --- qiling/extensions/r2/r2.py | 50 +++++++++----------------------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 190052917..c4ffb48df 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -9,6 +9,7 @@ from dataclasses import dataclass, fields from enum import Enum from functools import cached_property +from typing import Dict, List, Literal from qiling.core import Qiling from unicorn import UC_PROT_READ, UC_PROT_EXEC, UC_PROT_ALL @@ -36,7 +37,7 @@ class Section(R2Data): vsize: int paddr: int vaddr: int - perm: str # TODO: use int or enum + perm: str # TODO: use enum or literal @dataclass(unsafe_hash=True, init=False) @@ -52,41 +53,14 @@ class String(R2Data): @dataclass(unsafe_hash=True, init=False) class Symbol(R2Data): # see https://github.com/rizinorg/rizin/blob/dev/librz/include/rz_bin.h - class SymbolType(str, Enum): - NOTYPE = "NOTYPE" - OBJ = "OBJ" - FUNC = "FUNC" - FIELD = "FIELD" - IFACE = "IFACE" - METH = "METH" - STATIC = "STATIC" - SECT = "SECT" - FILE = "FILE" - COMMON = "COMMON" - TLS = "TLS" - NUM = "NUM" - LOOS = "LOOS" - HIOS = "HIOS" - LOPROC = "LOPROC" - HIPROC = "HIPROC" - SPCL = "SPCL" - UNK = "UNK" - - class SymbolBind(str, Enum): - LOCAL = "LOCAL" - GLOBAL = "GLOBAL" - WEAK = "WEAK" - NUM = "NUM" - LOOS = "LOOS" - HIOS = "HIOS" - LOPROC = "LOPROC" - HIPROC = "HIPROC" - IMPORT = "IMPORT" - UNKNOWN = "UNKNOWN" + SymbolType = Literal["NOTYPE", "OBJ", "FUNC", "FIELD", "IFACE", "METH", "STATIC", "SECT", + "FILE", "COMMON", "TLS", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "SPCL", "UNK"] + + SymbolBind = Literal["LOCAL", "GLOBAL", 
"WEAK", "NUM", "LOOS", "HIOS", "LOPROC", "HIPROC", "IMPORT", "UNKNOWN"] name: str realname: str - bind: str + bind: SymbolBind size: int type: SymbolType vaddr: int @@ -128,26 +102,26 @@ def _cmd(self, cmd: str) -> str: self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') - def _cmdj(self, cmd: str) -> list[dict]: + def _cmdj(self, cmd: str) -> List[Dict]: return json.loads(self._cmd(cmd)) @cached_property - def sections(self) -> dict[str, Section]: + def sections(self) -> Dict[str, Section]: sec_lst = self._cmdj("iSj") return {dic['name']: Section(**dic) for dic in sec_lst} @cached_property - def strings(self) -> dict[str, String]: + def strings(self) -> Dict[str, String]: str_lst = self._cmdj("izzj") return {dic['string']: String(**dic) for dic in str_lst} @cached_property - def symbols(self) -> dict[str, Symbol]: + def symbols(self) -> Dict[str, Symbol]: sym_lst = self._cmdj("isj") return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} @cached_property - def functions(self) -> dict[str, Function]: + def functions(self) -> Dict[str, Function]: self._cmd("aaa") fcn_lst = self._cmdj("aflj") return {dic['name']: Function(**dic) for dic in fcn_lst} From 1252a389d61c5923c40fe921dffd6444796cfc13 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 21 Jun 2022 20:46:43 +0800 Subject: [PATCH 08/25] Add r2libr==5.7.0 to essential dependency --- setup.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 31b13372b..f12ab1b13 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,8 @@ "gevent>=20.9.0", "multiprocess>=0.70.12.2", "windows-curses>=2.1.0;platform_system=='Windows'", - "pyyaml>=6.0" + "pyyaml>=6.0", + "r2libr==5.7.0", ] extras = { @@ -43,9 +44,6 @@ "unicornafl>=2.0.0;platform_system=='Windows'", "fuzzercorn>=0.0.1;platform_system=='Linux'" ], - "SCA" : [ - "r2libr" - ] } with open("README.md", "r", encoding="utf-8") as ld: 
From f31b08357fbe8c8048d5bb3fa72cb07fa17b0220 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sat, 2 Jul 2022 20:47:07 +0800 Subject: [PATCH 09/25] Convert rwx perm str to UC_PROT const integer --- qiling/extensions/r2/r2.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index c4ffb48df..0aad5028c 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -11,7 +11,7 @@ from functools import cached_property from typing import Dict, List, Literal from qiling.core import Qiling -from unicorn import UC_PROT_READ, UC_PROT_EXEC, UC_PROT_ALL +from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL class R2Data: @@ -37,7 +37,24 @@ class Section(R2Data): vsize: int paddr: int vaddr: int - perm: str # TODO: use enum or literal + perm: int + + @staticmethod + def perm2uc(permstr: str) -> int: + '''convert "-rwx" to unicorn const''' + perm = UC_PROT_NONE + dic = { + "r": UC_PROT_READ, + "w": UC_PROT_WRITE, + "x": UC_PROT_EXEC, + } + for ch in permstr: + perm += dic.get(ch, 0) + return perm + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.perm = Section.perm2uc(self.perm) @dataclass(unsafe_hash=True, init=False) From 24ec8fde0d5247610aeefb3050f8f305ab9bb84c Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 3 Jul 2022 11:34:28 +0800 Subject: [PATCH 10/25] Add `read` function to r2 - use `s addr`, `p8 size` to get hex data - return bytes that can be used to write memory --- qiling/extensions/r2/r2.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 0aad5028c..12ddf78c8 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -121,6 +121,11 @@ def _cmd(self, cmd: str) -> str: def _cmdj(self, cmd: str) -> List[Dict]: return 
json.loads(self._cmd(cmd)) + + def read(self, addr: int, size: int) -> bytes: + self._cmd(f"s {addr}") + hexstr = self._cmd(f"p8 {size}") + return bytes.fromhex(hexstr) @cached_property def sections(self) -> Dict[str, Section]: From faf1537b150b162c4d64be05adb3d0657bce1b19 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 4 Jul 2022 20:51:39 +0800 Subject: [PATCH 11/25] Add functions to get binary info r2 command `iIj` return JSON of binary info like baddr and bintype --- qiling/extensions/r2/r2.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 12ddf78c8..886e854f0 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -9,7 +9,7 @@ from dataclasses import dataclass, fields from enum import Enum from functools import cached_property -from typing import Dict, List, Literal +from typing import Dict, List, Literal, Union from qiling.core import Qiling from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -119,7 +119,7 @@ def _cmd(self, cmd: str) -> str: self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') - def _cmdj(self, cmd: str) -> List[Dict]: + def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: return json.loads(self._cmd(cmd)) def read(self, addr: int, size: int) -> bytes: @@ -148,11 +148,17 @@ def functions(self) -> Dict[str, Function]: fcn_lst = self._cmdj("aflj") return {dic['name']: Function(**dic) for dic in fcn_lst} + @cached_property + def binfo(self) -> Dict[str, str]: + return self._cmdj("iIj") + @cached_property def baddr(self) -> int: - _bin = ctypes.cast(self._r2c.contents.bin, - ctypes.POINTER(libr.r_bin.RBin)) - return libr.r_bin.r_bin_get_baddr(_bin) + return self.binfo["baddr"] + + @cached_property + def bintype(self) -> str: + return self.binfo["bintype"] def __del__(self): 
libr.r_core.r_core_free(self._r2c) From cee426538d85c24595dfc0b332971ae2ec124f85 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 5 Jul 2022 00:06:06 +0800 Subject: [PATCH 12/25] Add function to get associated flag at an address - flag is a bookmark that associate a name with a given offset - memory address in qiling can be interpreted better --- examples/extensions/r2/hello_r2.py | 2 ++ qiling/extensions/r2/r2.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 9a8d84b33..b48d91296 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -21,8 +21,10 @@ def my_sandbox(path, rootfs): # search bytes sequence using ql.mem.search addrs = ql.mem.search(b'Hello worl') # return all matching results + print(r2.at(addrs[0])) # find the corresponding flag at the address # search string using r2 addr = r2.strings['Hello world!'].vaddr # key must be exactly same + print(addrs[0], addr) # write to string using ql.mem.write ql.mem.write(addr, b"No hello, Bye!\x00") diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 886e854f0..0f878bc21 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -3,6 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +import bisect import ctypes import json import libr @@ -84,6 +85,14 @@ class Symbol(R2Data): paddr: int is_imported: bool +@dataclass(unsafe_hash=True, init=False) +class Flag(R2Data): + offset: int + name: str = '' + size: int = 0 + + def __lt__(self, other): + return self.offset < other.offset class R2: def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): @@ -148,6 +157,17 @@ def functions(self) -> Dict[str, Function]: fcn_lst = self._cmdj("aflj") return {dic['name']: Function(**dic) for dic in fcn_lst} + @cached_property + def flags(self) -> List[Flag]: + return 
[Flag(**dic) for dic in self._cmdj("fj")] + + def at(self, addr: int) -> Flag: + # the most suitable flag should have address <= addr + # bisect_right find the insertion point, right side if value exists + idx = bisect.bisect_right(self.flags, Flag(offset=addr)) + # minus 1 to find the corresponding flag + return self.flags[idx - 1] + @cached_property def binfo(self) -> Dict[str, str]: return self._cmdj("iIj") From e4aad7dc938166a1c236db226cfdf7f2baea2d3f Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Tue, 5 Jul 2022 10:35:28 +0800 Subject: [PATCH 13/25] style: simplify `read` function --- qiling/extensions/r2/r2.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 0f878bc21..97fbf0ebb 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -132,8 +132,7 @@ def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: return json.loads(self._cmd(cmd)) def read(self, addr: int, size: int) -> bytes: - self._cmd(f"s {addr}") - hexstr = self._cmd(f"p8 {size}") + hexstr = self._cmd(f"p8 {size} @ {addr}") return bytes.fromhex(hexstr) @cached_property From f6cf40aa763c5e1cdc515f75b3eef2a3b95ae3df Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 6 Jul 2022 10:35:58 +0800 Subject: [PATCH 14/25] refactor: simplify setup_code and setup_file - set arch and bits for r2 asm only in shellcode mode --- qiling/extensions/r2/r2.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 97fbf0ebb..ba67d244e 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -103,23 +103,20 @@ def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): self._r2c = libr.r_core.r_core_new() if ql.code: - self._setup_code() + self._setup_code(ql.code) else: - self._setup_file() + 
self._setup_file(ql.path) - # set architecture and bits for r2 asm - self._cmd(f"e,asm.arch={ql.arch.type.name.lower().removesuffix('64')},asm.bits={ql.arch.bits}") - - def _setup_code(self): - sz = len(self.ql.code) - path = f'malloc://{sz}'.encode() + def _setup_code(self, code: bytes): + path = f'malloc://{len(code)}'.encode() fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - cmd = f'wx {self.ql.code.hex()}' - self._cmd(cmd) + self._cmd(f'wx {code.hex()}') + # set architecture and bits for r2 asm + self._cmd(f"e,asm.arch={self.ql.arch.type.name.lower().removesuffix('64')},asm.bits={self.ql.arch.bits}") - def _setup_file(self): - path = self.ql.path.encode() + def _setup_file(self, path: str): + path = path.encode() fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) From 682ee74a8760c6d80f75dd57c6f28ef1208169ef Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 6 Jul 2022 10:51:33 +0800 Subject: [PATCH 15/25] Improve `at` function to get offset to flag --- examples/extensions/r2/hello_r2.py | 2 +- qiling/extensions/r2/r2.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index b48d91296..6f955cec2 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -21,7 +21,7 @@ def my_sandbox(path, rootfs): # search bytes sequence using ql.mem.search addrs = ql.mem.search(b'Hello worl') # return all matching results - print(r2.at(addrs[0])) # find the corresponding flag at the address + print(r2.at(addrs[0])) # find corresponding flag at the address and the offset to the flag # search string using r2 addr = r2.strings['Hello world!'].vaddr # key must be exactly same print(addrs[0], addr) diff --git 
a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index ba67d244e..f286063b9 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, fields from enum import Enum from functools import cached_property -from typing import Dict, List, Literal, Union +from typing import Dict, List, Literal, Tuple, Union from qiling.core import Qiling from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -157,12 +157,13 @@ def functions(self) -> Dict[str, Function]: def flags(self) -> List[Flag]: return [Flag(**dic) for dic in self._cmdj("fj")] - def at(self, addr: int) -> Flag: + def at(self, addr: int) -> Tuple[Flag, int]: # the most suitable flag should have address <= addr # bisect_right find the insertion point, right side if value exists idx = bisect.bisect_right(self.flags, Flag(offset=addr)) # minus 1 to find the corresponding flag - return self.flags[idx - 1] + flag = self.flags[idx - 1] + return flag, addr - flag.offset @cached_property def binfo(self) -> Dict[str, str]: From 027a926b2d9fa232f5bdaf8e91dcdd29105b0522 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 6 Jul 2022 10:54:45 +0800 Subject: [PATCH 16/25] PoC of symbol resolved trace powered by r2 --- examples/extensions/r2/hello_r2.py | 6 ++++-- qiling/extensions/r2/r2.py | 9 +++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 6f955cec2..f3769cbe8 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -16,11 +16,11 @@ def func(ql: Qiling, *args, **kwargs): return def my_sandbox(path, rootfs): - ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEBUG) r2 = R2(ql) # search bytes sequence using ql.mem.search - addrs = ql.mem.search(b'Hello worl') # return all matching results + 
addrs = ql.mem.search(b'llo worl') # return all matching results print(r2.at(addrs[0])) # find corresponding flag at the address and the offset to the flag # search string using r2 addr = r2.strings['Hello world!'].vaddr # key must be exactly same @@ -30,6 +30,8 @@ def my_sandbox(path, rootfs): # get function address and hook it ql.hook_address(func, r2.functions['main'].offset) + # enable trace powered by r2 symsmap + r2.enable_trace() ql.run() if __name__ == "__main__": diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index f286063b9..dc86c9452 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -12,6 +12,7 @@ from functools import cached_property from typing import Dict, List, Literal, Tuple, Union from qiling.core import Qiling +from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -165,6 +166,14 @@ def at(self, addr: int) -> Tuple[Flag, int]: flag = self.flags[idx - 1] return flag, addr - flag.offset + def enable_trace(self, mode='full'): + # simple map from addr to flag name, cannot resolve addresses in the middle + self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags} + if mode == 'full': + trace.enable_full_trace(self.ql) + elif mode == 'history': + trace.enable_history_trace(self.ql) + @cached_property def binfo(self) -> Dict[str, str]: return self._cmdj("iIj") From a9d75944abfd4f54f29fcd4ebaa2a83ee009d3ba Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 6 Jul 2022 15:13:22 +0800 Subject: [PATCH 17/25] feat: PoC of r2 xref --- examples/extensions/r2/hello_r2.py | 2 ++ qiling/extensions/r2/r2.py | 27 ++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index f3769cbe8..5864bfa49 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ 
-32,6 +32,8 @@ def my_sandbox(path, rootfs): ql.hook_address(func, r2.functions['main'].offset) # enable trace powered by r2 symsmap r2.enable_trace() + # print xref to string "Hello world!" + print(r2.refto(addr)) ql.run() if __name__ == "__main__": diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index dc86c9452..f506c08f9 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, fields from enum import Enum from functools import cached_property -from typing import Dict, List, Literal, Tuple, Union +from typing import Dict, List, Literal, Optional, Tuple, Union from qiling.core import Qiling from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -95,6 +95,21 @@ class Flag(R2Data): def __lt__(self, other): return self.offset < other.offset +@dataclass(unsafe_hash=True, init=False) +class Xref(R2Data): + name: str + fromaddr: int # from is reserved word in Python + refname: str + addr: int + type: str + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.fromaddr = kwargs["from"] + + def __lt__(self, other): + return self.fromaddr < other.fromaddr + class R2: def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() @@ -158,6 +173,10 @@ def functions(self) -> Dict[str, Function]: def flags(self) -> List[Flag]: return [Flag(**dic) for dic in self._cmdj("fj")] + @cached_property + def xrefs(self) -> Dict[int, Xref]: + return {dic['from']: Xref(**dic) for dic in self._cmdj("axj")} + def at(self, addr: int) -> Tuple[Flag, int]: # the most suitable flag should have address <= addr # bisect_right find the insertion point, right side if value exists @@ -166,6 +185,12 @@ def at(self, addr: int) -> Tuple[Flag, int]: flag = self.flags[idx - 1] return flag, addr - flag.offset + def refrom(self, addr: int) -> Optional[Xref]: + return self.xrefs.get(addr) + + def refto(self, 
addr: int) -> List[Xref]: + return [xref for xref in self.xrefs.values() if xref.addr == addr] + def enable_trace(self, mode='full'): # simple map from addr to flag name, cannot resolve addresses in the middle self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags} From 5ca42773b695ed3d38c70e8c8950f8b3e85b1e76 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 6 Jul 2022 15:56:54 +0800 Subject: [PATCH 18/25] refactor: use decorator to analyze before call methods --- examples/extensions/r2/hello_r2.py | 4 ++-- qiling/extensions/r2/r2.py | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 5864bfa49..d51dc9c7d 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -25,6 +25,8 @@ def my_sandbox(path, rootfs): # search string using r2 addr = r2.strings['Hello world!'].vaddr # key must be exactly same print(addrs[0], addr) + # print xref to string "Hello world!" + print(r2.refto(addr)) # write to string using ql.mem.write ql.mem.write(addr, b"No hello, Bye!\x00") @@ -32,8 +34,6 @@ def my_sandbox(path, rootfs): ql.hook_address(func, r2.functions['main'].offset) # enable trace powered by r2 symsmap r2.enable_trace() - # print xref to string "Hello world!" 
- print(r2.refto(addr)) ql.run() if __name__ == "__main__": diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index f506c08f9..a845df611 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -9,7 +9,7 @@ import libr from dataclasses import dataclass, fields from enum import Enum -from functools import cached_property +from functools import cached_property, wraps from typing import Dict, List, Literal, Optional, Tuple, Union from qiling.core import Qiling from qiling.extensions import trace @@ -116,7 +116,7 @@ def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): self.ql = ql self.baseaddr = baseaddr # r2 -B [baddr] set base address for PIE binaries self.loadaddr = loadaddr # r2 -m [addr] map file at given address - + self.analyzed = False self._r2c = libr.r_core.r_core_new() if ql.code: self._setup_code(ql.code) @@ -141,6 +141,16 @@ def _cmd(self, cmd: str) -> str: self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') + @staticmethod + def aaa(fun): + @wraps(fun) + def wrapper(self): + if self.analyzed is False: + self._cmd("aaa") + self.analyzed = True + return fun(self) + return wrapper + def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: return json.loads(self._cmd(cmd)) @@ -164,16 +174,18 @@ def symbols(self) -> Dict[str, Symbol]: return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} @cached_property + @aaa def functions(self) -> Dict[str, Function]: - self._cmd("aaa") fcn_lst = self._cmdj("aflj") return {dic['name']: Function(**dic) for dic in fcn_lst} @cached_property + @aaa def flags(self) -> List[Flag]: return [Flag(**dic) for dic in self._cmdj("fj")] @cached_property + @aaa def xrefs(self) -> Dict[int, Xref]: return {dic['from']: Xref(**dic) for dic in self._cmdj("axj")} From cf51ee94de284c11413b3f7b1c29b6d39313a21d Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Thu, 14 Jul 2022 09:36:37 +0800 Subject: 
[PATCH 19/25] style(r2): adjust order of classes and methods --- qiling/extensions/r2/r2.py | 52 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index a845df611..20b456d8f 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -24,14 +24,6 @@ def __init__(self, **kwargs): setattr(self, k, v) -@dataclass(unsafe_hash=True, init=False) -class Function(R2Data): - name: str - offset: int - size: int - signature: str - - @dataclass(unsafe_hash=True, init=False) class Section(R2Data): name: str @@ -86,6 +78,15 @@ class Symbol(R2Data): paddr: int is_imported: bool + +@dataclass(unsafe_hash=True, init=False) +class Function(R2Data): + name: str + offset: int + size: int + signature: str + + @dataclass(unsafe_hash=True, init=False) class Flag(R2Data): offset: int @@ -95,6 +96,7 @@ class Flag(R2Data): def __lt__(self, other): return self.offset < other.offset + @dataclass(unsafe_hash=True, init=False) class Xref(R2Data): name: str @@ -110,11 +112,13 @@ def __init__(self, **kwargs): def __lt__(self, other): return self.fromaddr < other.fromaddr + class R2: def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() self.ql = ql - self.baseaddr = baseaddr # r2 -B [baddr] set base address for PIE binaries + # r2 -B [baddr] set base address for PIE binaries + self.baseaddr = baseaddr self.loadaddr = loadaddr # r2 -m [addr] map file at given address self.analyzed = False self._r2c = libr.r_core.r_core_new() @@ -154,9 +158,17 @@ def wrapper(self): def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: return json.loads(self._cmd(cmd)) - def read(self, addr: int, size: int) -> bytes: - hexstr = self._cmd(f"p8 {size} @ {addr}") - return bytes.fromhex(hexstr) + @cached_property + def binfo(self) -> Dict[str, str]: + return self._cmdj("iIj") + + @cached_property + def baddr(self) -> int: + return self.binfo["baddr"] + + 
@cached_property + def bintype(self) -> str: + return self.binfo["bintype"] @cached_property def sections(self) -> Dict[str, Section]: @@ -203,6 +215,10 @@ def refrom(self, addr: int) -> Optional[Xref]: def refto(self, addr: int) -> List[Xref]: return [xref for xref in self.xrefs.values() if xref.addr == addr] + def read(self, addr: int, size: int) -> bytes: + hexstr = self._cmd(f"p8 {size} @ {addr}") + return bytes.fromhex(hexstr) + def enable_trace(self, mode='full'): # simple map from addr to flag name, cannot resolve addresses in the middle self.ql.loader.symsmap = {flag.offset: flag.name for flag in self.flags} @@ -211,17 +227,5 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) - @cached_property - def binfo(self) -> Dict[str, str]: - return self._cmdj("iIj") - - @cached_property - def baddr(self) -> int: - return self.binfo["baddr"] - - @cached_property - def bintype(self) -> str: - return self.binfo["bintype"] - def __del__(self): libr.r_core.r_core_free(self._r2c) From 6b1a3477e6204d77dca2929d5dc04e0f295619fd Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 15 Jul 2022 21:36:03 +0800 Subject: [PATCH 20/25] refactor(r2): get xrefs as list --- qiling/extensions/r2/r2.py | 48 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 20b456d8f..56f56ca05 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -16,6 +16,19 @@ from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL +def perm2uc(permstr: str) -> int: + '''convert "-rwx" to unicorn const''' + perm = UC_PROT_NONE + dic = { + "r": UC_PROT_READ, + "w": UC_PROT_WRITE, + "x": UC_PROT_EXEC, + } + for ch in permstr: + perm += dic.get(ch, 0) + return perm + + class R2Data: def __init__(self, **kwargs): names = set([f.name for f in fields(self)]) @@ -33,22 
+46,9 @@ class Section(R2Data): vaddr: int perm: int - @staticmethod - def perm2uc(permstr: str) -> int: - '''convert "-rwx" to unicorn const''' - perm = UC_PROT_NONE - dic = { - "r": UC_PROT_READ, - "w": UC_PROT_WRITE, - "x": UC_PROT_EXEC, - } - for ch in permstr: - perm += dic.get(ch, 0) - return perm - def __init__(self, **kwargs): super().__init__(**kwargs) - self.perm = Section.perm2uc(self.perm) + self.perm = perm2uc(self.perm) @dataclass(unsafe_hash=True, init=False) @@ -99,15 +99,19 @@ def __lt__(self, other): @dataclass(unsafe_hash=True, init=False) class Xref(R2Data): + XrefType = Literal["NULL", "CODE", "CALL", "DATA", "STRN", "UNKN"] + name: str fromaddr: int # from is reserved word in Python - refname: str + type: XrefType + perm: int addr: int - type: str + refname: str def __init__(self, **kwargs): super().__init__(**kwargs) self.fromaddr = kwargs["from"] + self.perm = perm2uc(self.perm) def __lt__(self, other): return self.fromaddr < other.fromaddr @@ -133,7 +137,7 @@ def _setup_code(self, code: bytes): libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) self._cmd(f'wx {code.hex()}') # set architecture and bits for r2 asm - self._cmd(f"e,asm.arch={self.ql.arch.type.name.lower().removesuffix('64')},asm.bits={self.ql.arch.bits}") + self._cmd(f"e,asm.arch={self.ql.arch.type.name.lower().rstrip('64')},asm.bits={self.ql.arch.bits}") def _setup_file(self, path: str): path = path.encode() @@ -198,8 +202,8 @@ def flags(self) -> List[Flag]: @cached_property @aaa - def xrefs(self) -> Dict[int, Xref]: - return {dic['from']: Xref(**dic) for dic in self._cmdj("axj")} + def xrefs(self) -> List[Xref]: + return [Xref(**dic) for dic in self._cmdj("axj")] def at(self, addr: int) -> Tuple[Flag, int]: # the most suitable flag should have address <= addr @@ -209,11 +213,11 @@ def at(self, addr: int) -> Tuple[Flag, int]: flag = self.flags[idx - 1] return flag, addr - flag.offset - def refrom(self, addr: int) -> Optional[Xref]: - return self.xrefs.get(addr) + def 
refrom(self, addr: int) -> List[Xref]: + return [x for x in self.xrefs if x.fromaddr == addr] def refto(self, addr: int) -> List[Xref]: - return [xref for xref in self.xrefs.values() if xref.addr == addr] + return [x for x in self.xrefs if x.addr == addr] def read(self, addr: int, size: int) -> bytes: hexstr = self._cmd(f"p8 {size} @ {addr}") From 86c3c9f2f3a5b662616ed54721ede68757a9f8a7 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 15 Jul 2022 21:46:06 +0800 Subject: [PATCH 21/25] refactor: convert ql arch to r2 without removesuffix() --- qiling/extensions/r2/r2.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 56f56ca05..6f12a444c 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -131,13 +131,19 @@ def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): else: self._setup_file(ql.path) + def _qlarch2r(self, archname: str) -> str: + archname = archname.lower() + # str.removesuffix() is not available until Python 3.9 + return archname[:-2] if archname.endswith('64') else archname + def _setup_code(self, code: bytes): path = f'malloc://{len(code)}'.encode() fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) self._cmd(f'wx {code.hex()}') # set architecture and bits for r2 asm - self._cmd(f"e,asm.arch={self.ql.arch.type.name.lower().rstrip('64')},asm.bits={self.ql.arch.bits}") + arch = self._qlarch2r(self.ql.arch.type.name) + self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") def _setup_file(self, path: str): path = path.encode() From b2a2311cb3965680e66157a5f11211a6d0b19c8a Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 17 Jul 2022 00:31:57 +0800 Subject: [PATCH 22/25] fix(r2): set r2 arch from QlArch properly --- qiling/extensions/r2/r2.py | 21 
++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 6f12a444c..dc65e7b9e 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -11,6 +11,7 @@ from enum import Enum from functools import cached_property, wraps from typing import Dict, List, Literal, Optional, Tuple, Union +from qiling.const import QL_ARCH from qiling.core import Qiling from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -131,10 +132,20 @@ def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): else: self._setup_file(ql.path) - def _qlarch2r(self, archname: str) -> str: - archname = archname.lower() - # str.removesuffix() is not available until Python 3.9 - return archname[:-2] if archname.endswith('64') else archname + def _qlarch2r(self, archtype: QL_ARCH) -> str: + return { + QL_ARCH.X86: "x86", + QL_ARCH.X8664: "x86", + QL_ARCH.ARM: "arm", + QL_ARCH.ARM64: "arm", + QL_ARCH.A8086: "x86", + QL_ARCH.EVM: "evm.cs", + QL_ARCH.CORTEX_M: "arm", + QL_ARCH.MIPS: "mips", + QL_ARCH.RISCV: "riscv", + QL_ARCH.RISCV64: "riscv", + QL_ARCH.PPC: "ppc", + }[archtype] def _setup_code(self, code: bytes): path = f'malloc://{len(code)}'.encode() @@ -142,7 +153,7 @@ def _setup_code(self, code: bytes): libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) self._cmd(f'wx {code.hex()}') # set architecture and bits for r2 asm - arch = self._qlarch2r(self.ql.arch.type.name) + arch = self._qlarch2r(self.ql.arch.type) self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") def _setup_file(self, path: str): From 48c36991af93693525c8f2c3bb57159f18fb0e52 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 17 Jul 2022 10:09:51 +0800 Subject: [PATCH 23/25] test(r2): Add test for evm shellcode disasm --- tests/test_r2.py | 23 +++++++++++++++++++++++ 1 file changed, 23 
insertions(+) create mode 100644 tests/test_r2.py diff --git a/tests/test_r2.py b/tests/test_r2.py new file mode 100644 index 000000000..ec28cd11b --- /dev/null +++ b/tests/test_r2.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +import sys, unittest + +sys.path.append("..") +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions.r2.r2 import R2 + + +EVM_CODE = bytes.fromhex("6060604052341561000f57600080fd5b60405160208061031c833981016040528080519060200190919050508060018190556000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000208190555050610299806100836000396000f300606060405260043610610057576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806318160ddd1461005c57806370a0823114610085578063a9059cbb146100d2575b600080fd5b341561006757600080fd5b61006f61012c565b6040518082815260200191505060405180910390f35b341561009057600080fd5b6100bc600480803573ffffffffffffffffffffffffffffffffffffffff16906020019091905050610132565b6040518082815260200191505060405180910390f35b34156100dd57600080fd5b610112600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190505061017a565b604051808215151515815260200191505060405180910390f35b60015481565b60008060008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020549050919050565b600080826000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000205403101515156101cb57600080fd5b816000803373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060008282540392505081905550816000808573ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000206000828254019250508190555060019050929150505600a165627a7a7230582098f1551a391a3e65b3ce45cfa2b3fa5f91eea9a3e7181a81454e025ea0d7151c002
9") + + +class R2Test(unittest.TestCase): + def test_shellcode_disasm(self): + ql = Qiling(code=EVM_CODE, archtype="evm", verbose=QL_VERBOSE.DEBUG) + r2 = R2(ql) + pd = r2._cmd("pd 32") + self.assertTrue('invalid' not in pd) + + +if __name__ == "__main__": + unittest.main() From 7c0fd6a8232337d4fa6e0f8518c4eb7536996bae Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 17 Jul 2022 11:58:26 +0800 Subject: [PATCH 24/25] style(r2): remove unnecessary import - "Qiling" is only used for type hint --- qiling/extensions/r2/r2.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index dc65e7b9e..669efc311 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -8,14 +8,14 @@ import json import libr from dataclasses import dataclass, fields -from enum import Enum from functools import cached_property, wraps -from typing import Dict, List, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Dict, List, Literal, Tuple, Union from qiling.const import QL_ARCH -from qiling.core import Qiling from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL +if TYPE_CHECKING: + from qiling.core import Qiling def perm2uc(permstr: str) -> int: '''convert "-rwx" to unicorn const''' @@ -119,7 +119,7 @@ def __lt__(self, other): class R2: - def __init__(self, ql: Qiling, baseaddr=(1 << 64) - 1, loadaddr=0): + def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() self.ql = ql # r2 -B [baddr] set base address for PIE binaries From 7ce81099e18deddc43a6950741cc590272c33435 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 18 Jul 2022 00:15:12 +0800 Subject: [PATCH 25/25] chore: update r2libr and move to extra dependency --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/setup.py b/setup.py index f12ab1b13..84b0648c8 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,6 @@ "multiprocess>=0.70.12.2", "windows-curses>=2.1.0;platform_system=='Windows'", "pyyaml>=6.0", - "r2libr==5.7.0", ] extras = { @@ -44,6 +43,9 @@ "unicornafl>=2.0.0;platform_system=='Windows'", "fuzzercorn>=0.0.1;platform_system=='Linux'" ], + "RE": [ + "r2libr>=5.7.4", + ] } with open("README.md", "r", encoding="utf-8") as ld: