From 8ce387c679811d9f7c4b68c3585705399e60367c Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Tue, 8 Feb 2022 02:32:34 +0800 Subject: [PATCH 1/7] implement partial jumping instruction handler for x86 --- qiling/debugger/qdb/qdb.py | 18 ++++-- qiling/debugger/qdb/utils.py | 107 ++++++++++++++++++++++++++++++++--- 2 files changed, 113 insertions(+), 12 deletions(-) diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index 819c04b51..4fb4c95e5 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -13,7 +13,7 @@ from qiling.debugger import QlDebugger from .frontend import context_reg, context_asm, examine_mem -from .utils import _parse_int, handle_bnj, is_thumb, CODE_END, parse_int +from .utils import handle_bnj, is_thumb, CODE_END, parse_int from .utils import Breakpoint, TempBreakpoint from .const import * @@ -40,6 +40,16 @@ def dbg_hook(self: QlQdb, init_hook: str): # self.ql.loader.entry_point # ld.so # self.ql.loader.elf_entry # .text of binary + if self.ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): + # def pause(ql, addr, size): + # data = ql.mem.read(addr, size) + # ql.disassembler.detail = True + # breakpoint() + # ret = next(ql.disassembler.disasm(data, addr)) + # print(ret) + + self.ql.hook_code(handle_bnj) + if init_hook and self.ql.loader.entry_point != init_hook: self.do_breakpoint(init_hook) @@ -222,10 +232,10 @@ def do_step(self: QlQdb, *args) -> Optional[bool]: if self.rr: self._save() - _, next_stop = handle_bnj(self.ql, self.cur_addr) + # _, next_stop = handle_bnj(self.ql, self.cur_addr) - if next_stop is CODE_END: - return True + # if next_stop is CODE_END: + # return True if self.ql.archtype == QL_ARCH.CORTEX_M: self.ql.arch.step() diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index f32003205..29c6f3950 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -103,13 +103,14 @@ def signed_val(val: int) -> int: # handle braches and jumps so we can set berakpoint properly -def handle_bnj(ql: Qiling, cur_addr: str) -> Callable[[Qiling, str], int]: +def handle_bnj(ql: Qiling, cur_addr: str, size: int) -> Callable[[Qiling, str], int]: return { QL_ARCH.MIPS : handle_bnj_mips, QL_ARCH.ARM : handle_bnj_arm, QL_ARCH.ARM_THUMB: handle_bnj_arm, QL_ARCH.CORTEX_M : handle_bnj_arm, - }.get(ql.archtype)(ql, cur_addr) + QL_ARCH.X86 : handle_bnj_x86, + }.get(ql.archtype)(ql, cur_addr, size) def get_cpsr(bits: int) -> (bool, bool, bool, bool): @@ -121,6 +122,27 @@ def get_cpsr(bits: int) -> (bool, bool, bool, bool): ) +def get_x86_eflags(bits: int) -> Dict[str, bool]: + return { + "CF" : bits & 0x0001 != 0, # CF, carry flag + "PF" : bits & 0x0004 != 0, # PF, parity flag + "AF" : bits & 0x0010 != 0, # AF, adjust flag + "ZF" : bits & 0x0040 != 0, # ZF, zero flag + "SF" : bits & 0x0080 != 0, # SF, sign flag + "OF" : bits & 0x0800 != 0, # OF, overflow flag + } + +def get_eflags(bits: int) -> (bool, bool, bool, bool, bool, bool): + return ( + bits & 0x0001 != 0, # CF, carry flag + bits & 0x0004 != 0, # PF, parity flag + bits & 0x0010 != 0, # AF, adjust flag + bits & 0x0040 != 0, # ZF, zero flag + bits & 0x0080 != 0, # SF, sign flag + bits & 0x0800 != 0, # OF, overflow flag + ) + + def is_thumb(bits: int) -> bool: return bits & 0x00000020 != 0 @@ -160,8 +182,77 @@ def _read_inst(ql: Qiling, addr: int) -> int: return result +def handle_bnj_x86(ql: Qilng, cur_addr: str, size: int) -> (bool, int): + + # FIXME: NO HANDLE BRANCH AND JUMP FOR X86 FOR NOW + + ret_addr = None + + md = ql.disassembler + data = ql.mem.read(cur_addr, size) + line = next(md.disasm(data, cur_addr)) + + jump_table = { + + # conditional jump + "jo" : (lambda C, P, A, Z, S, O: O == 1), + "jno" : (lambda C, P, A, Z, S, O: O == 0), + + "js" : (lambda C, P, A, Z, S, O: S == 1), + "jns" : (lambda C, P, A, Z, S, O: S == 0), + + "je" : (lambda C, P, A, Z, S, O: Z == 1), + "jz" : (lambda C, P, A, Z, S, O: Z == 1), + + "jne" : (lambda C, P, A, Z, S, O: Z == 0), + "jnz" : (lambda C, P, A, Z, S, O: Z == 0), + + "jb" : (lambda C, P, A, Z, S, O: C == 1), + "jc" : (lambda C, P, A, Z, S, O: C == 1), + "jnae" : (lambda C, P, A, Z, S, O: C == 1), + + "jnb" : (lambda C, P, A, Z, S, O: C == 0), + "jnc" : (lambda C, P, A, Z, S, O: C == 0), + "jae" : (lambda C, P, A, Z, S, O: C == 0), + + "jbe" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), + "jna" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), + + "ja" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), + "jnbe" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), + + "jl" : (lambda C, P, A, Z, S, O: S != O), + "jnge" : (lambda C, P, A, Z, S, O: S != O), + + "jge" : (lambda C, P, A, Z, S, O: S == O), + "jnl" : (lambda C, P, A, Z, S, O: S == O), + + "jle" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), + "jng" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), + + "jg" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), + "jnle" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), + + "jp" : (lambda C, P, A, Z, S, O: P == 1), + "jpe" : (lambda C, P, A, Z, S, O: P == 1), + + "jnp" : (lambda C, P, A, Z, S, O: P == 0), + "jpo" : (lambda C, P, A, Z, S, O: P == 0), + + # unconditional jump + + "call" : (lambda *_: True), + + } + + to_jump = False + if line.mnemonic in jump_table: + to_jump = jump_table.get(line.mnemonic)(*get_eflags(ql.reg.ef)) + + return (to_jump, ret_addr) + -def handle_bnj_arm(ql: Qiling, cur_addr: str) -> int: +def handle_bnj_arm(ql: Qiling, cur_addr: str) -> (bool, int): def _read_reg_val(regs, _reg): return getattr(ql.reg, _reg.replace("ip", "r12").replace("fp", "r11")) @@ -383,7 +474,7 @@ def regdst_eq_pc(op_str): return (to_jump, ret_addr) -def handle_bnj_mips(ql: Qiling, cur_addr: str) -> int: +def handle_bnj_mips(ql: Qiling, cur_addr: str) -> (bool, int): MIPS_INST_SIZE = 4 def _read_reg(regs, _reg): @@ -413,10 +504,10 @@ def _read_reg(regs, _reg): ] to_jump = { - "j" : (lambda _: True), # uncontitional jump - "jr" : (lambda _: True), # uncontitional jump - "jal" : (lambda _: True), # uncontitional jump - "jalr" : (lambda _: True), # uncontitional jump + "j" : (lambda _: True), # unconditional jump + "jr" : (lambda _: True), # unconditional jump + "jal" : (lambda _: True), # unconditional jump + "jalr" : (lambda _: True), # unconditional jump "b" : (lambda _: True), # unconditional branch "bl" : (lambda _: True), # unconditional branch "bal" : (lambda _: True), # unconditional branch From 98452a841abf1247ac9ab186e7aaae5fe7fa74ea Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Wed, 9 Feb 2022 19:10:00 +0800 Subject: [PATCH 2/7] add jump hint --- qiling/debugger/qdb/frontend.py | 12 ++++++++---- qiling/debugger/qdb/qdb.py | 16 +++------------- qiling/debugger/qdb/utils.py | 24 ++++++++---------------- 3 files changed, 19 insertions(+), 33 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 03f584d8f..5a3def3a6 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -301,7 +301,7 @@ def print_asm(ql: Qiling, insn: CsInsn, to_jump: Optional[bool] = None, address: cursor = "►" jump_sign = " " - if to_jump and address != ql.reg.arch_pc+4: + if to_jump: jump_sign = f"{color.RED}✓{color.END}" print(f"{jump_sign} {cursor} {color.DARKGRAY}{trace_line}{color.END}") @@ -314,18 +314,22 @@ def context_asm(ql: Qiling, address: int) -> None: if ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): past_list = [] - # assembly before current location + # assembly for current location line = disasm(ql, address) + to_jump, next_stop = handle_bnj(ql, address) + print_asm(ql, line, to_jump=to_jump) + + # assembly after current location + acc_size = line.size while line and len(past_list) != 10: - past_list.append(line) next_start = address + acc_size line = disasm(ql, next_start) acc_size += line.size + past_list.append(line) - # print four insns before current location for line in past_list[:-1]: print_asm(ql, line) diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index 541e8d684..167349536 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -40,16 +40,6 @@ def dbg_hook(self: QlQdb, init_hook: str): # self.ql.loader.entry_point # ld.so # self.ql.loader.elf_entry # .text of binary - if self.ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - # def pause(ql, addr, size): - # data = ql.mem.read(addr, size) - # ql.disassembler.detail = True - # breakpoint() - # ret = next(ql.disassembler.disasm(data, addr)) - # print(ret) - - self.ql.hook_code(handle_bnj) - if init_hook and self.ql.loader.entry_point != init_hook: self.do_breakpoint(init_hook) @@ -232,10 +222,10 @@ def do_step(self: QlQdb, *args) -> Optional[bool]: if self.rr: self._save() - # _, next_stop = handle_bnj(self.ql, self.cur_addr) + _, next_stop = handle_bnj(self.ql, self.cur_addr) - # if next_stop is CODE_END: - # return True + if next_stop is CODE_END: + return True if self.ql.archtype == QL_ARCH.CORTEX_M: self.ql.arch.step() diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index 8e2715a9b..90bff6733 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -112,14 +112,14 @@ def signed_val(val: int) -> int: # handle braches and jumps so we can set berakpoint properly -def handle_bnj(ql: Qiling, cur_addr: str, size: int) -> Callable[[Qiling, str], int]: +def handle_bnj(ql: Qiling, cur_addr: str) -> Callable[[Qiling, str], int]: return { QL_ARCH.MIPS : handle_bnj_mips, QL_ARCH.ARM : handle_bnj_arm, QL_ARCH.ARM_THUMB: handle_bnj_arm, QL_ARCH.CORTEX_M : handle_bnj_arm, QL_ARCH.X86 : handle_bnj_x86, - }.get(ql.archtype)(ql, cur_addr, size) + }.get(ql.archtype)(ql, cur_addr) def get_cpsr(bits: int) -> (bool, bool, bool, bool): @@ -141,15 +141,6 @@ def get_x86_eflags(bits: int) -> Dict[str, bool]: "OF" : bits & 0x0800 != 0, # OF, overflow flag } -def get_eflags(bits: int) -> (bool, bool, bool, bool, bool, bool): - return ( - bits & 0x0001 != 0, # CF, carry flag - bits & 0x0004 != 0, # PF, parity flag - bits & 0x0010 != 0, # AF, adjust flag - bits & 0x0040 != 0, # ZF, zero flag - bits & 0x0080 != 0, # SF, sign flag - bits & 0x0800 != 0, # OF, overflow flag - ) def is_thumb(bits: int) -> bool: return bits & 0x00000020 != 0 @@ -196,15 +187,14 @@ def _read_inst(ql: Qiling, addr: int) -> int: return result -def handle_bnj_x86(ql: Qilng, cur_addr: str, size: int) -> (bool, int): + +def handle_bnj_x86(ql: Qilng, cur_addr: str) -> (bool, int): # FIXME: NO HANDLE BRANCH AND JUMP FOR X86 FOR NOW ret_addr = None - md = ql.disassembler - data = ql.mem.read(cur_addr, size) - line = next(md.disasm(data, cur_addr)) + line = disasm(ql, cur_addr) jump_table = { @@ -256,12 +246,14 @@ def handle_bnj_x86(ql: Qilng, cur_addr: str, size: int) -> (bool, int): # unconditional jump "call" : (lambda *_: True), + "jmp" : (lambda *_: True), } to_jump = False if line.mnemonic in jump_table: - to_jump = jump_table.get(line.mnemonic)(*get_eflags(ql.reg.ef)) + eflags = get_x86_eflags(ql.reg.ef).values() + to_jump = jump_table.get(line.mnemonic)(*eflags) return (to_jump, ret_addr) From 3b865bf1f6ce58ce1debecd1912b9fda9d75afa9 Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Thu, 10 Feb 2022 01:59:05 +0800 Subject: [PATCH 3/7] massive refactor --- qiling/debugger/qdb/frontend.py | 469 +++++++++++++++-------- qiling/debugger/qdb/qdb.py | 25 +- qiling/debugger/qdb/utils.py | 658 +++++++++++++++----------------- 3 files changed, 620 insertions(+), 532 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 5a3def3a6..4f9e5e614 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -13,9 +13,22 @@ import unicorn -from .utils import dump_regs, get_x86_eflags, get_arm_flags, disasm, _parse_int, handle_bnj from .const import * +from .utils import disasm, get_x86_eflags, setup_branch_predictor +def setup_ctx_manager(ql: Qiling) -> CtxManager: + + if ql.archtype == QL_ARCH.MIPS: + return CtxManager_MIPS(ql) + + elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB): + return CtxManager_ARM(ql) + + elif ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): + return CtxManager_X86(ql) + + elif ql.archtype == QL_ARCH.CORTEX_M: + return CtxManager_CORTEX_M(ql) # read data from memory of qiling instance def examine_mem(ql: Qiling, line: str) -> Union[bool, (str, int, int)]: @@ -146,228 +159,348 @@ def _try_read(ql: Qiling, address: int, size: int) -> Optional[bytes]: return (result, err_msg) -# divider printer -@contextmanager -def context_printer(ql: Qiling, field_name: str, ruler: str = "─") -> None: - height, width = get_terminal_size() - bar = (width - len(field_name)) // 2 - 1 - print(ruler * bar, field_name, ruler * bar) - yield - if "DISASM" in field_name: - print(ruler * width) - - -def context_reg(ql: Qiling, saved_states: Optional[Mapping[str, int]] = None, /, *args, **kwargs) -> None: - - # context render for registers - with context_printer(ql, "[ REGISTERS ]"): - - _cur_regs = dump_regs(ql) - - _colors = (color.DARKCYAN, color.BLUE, color.RED, color.YELLOW, color.GREEN, color.PURPLE, color.CYAN, color.WHITE) - - if ql.archtype == QL_ARCH.MIPS: - - _cur_regs.update({"fp": _cur_regs.pop("s8")}) +""" - if saved_states is not None: - _saved_states = copy.deepcopy(saved_states) - _saved_states.update({"fp": _saved_states.pop("s8")}) - _diff = [k for k in _cur_regs if _cur_regs[k] != _saved_states[k]] + Context Manager for rendering UI - else: - _diff = None - - lines = "" - for idx, r in enumerate(_cur_regs, 1): - line = "{}{}: 0x{{:08x}} {}\t".format(_colors[(idx-1) // 4], r, color.END) - - if _diff and r in _diff: - line = f"{color.UNDERLINE}{color.BOLD}{line}" +""" - if idx % 4 == 0 and idx != 32: - line += "\n" +COLORS = (color.DARKCYAN, color.BLUE, color.RED, color.YELLOW, color.GREEN, color.PURPLE, color.CYAN, color.WHITE) - lines += line - - print(lines.format(*_cur_regs.values())) +# divider printer +def context_printer(field_name, ruler="─"): + def decorator(context_dumper): + def wrapper(*args, **kwargs): + height, width = get_terminal_size() + bar = (width - len(field_name)) // 2 - 1 + print(ruler * bar, field_name, ruler * bar) + context_dumper(*args, **kwargs) + if "DISASM" in field_name: + print(ruler * width) + return wrapper + return decorator + +class CtxManager(object): + + def __init__(self, ql): + self.ql = ql + self.predictor = setup_branch_predictor(ql) + + def print_asm(self, insn: CsInsn, to_jump: Optional[bool] = None, address: int = None) -> None: + + opcode = "".join(f"{b:02x}" for b in insn.bytes) + if self.ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): + trace_line = f"0x{insn.address:08x} │ {opcode:20s} {insn.mnemonic:10} {insn.op_str:35s}" + else: + trace_line = f"0x{insn.address:08x} │ {opcode:10s} {insn.mnemonic:10} {insn.op_str:35s}" - elif ql.archtype == QL_ARCH.X86: + cursor = " " + if self.ql.reg.arch_pc == insn.address: + cursor = "►" - if saved_states is not None: - _saved_states = copy.deepcopy(saved_states) - _diff = [k for k in _cur_regs if _cur_regs[k] != _saved_states[k]] + jump_sign = " " + if to_jump: + jump_sign = f"{color.RED}✓{color.END}" - else: - _diff = None + print(f"{jump_sign} {cursor} {color.DARKGRAY}{trace_line}{color.END}") - lines = "" - for idx, r in enumerate(_cur_regs, 1): - if len(r) == 2: - line = "{}{}: 0x{{:08x}} {}\t\t".format(_colors[(idx-1) // 4], r, color.END) - else: - line = "{}{}: 0x{{:08x}} {}\t".format(_colors[(idx-1) // 4], r, color.END) + def dump_regs(self): + return {reg_name: getattr(self.ql.reg, reg_name) for reg_name in self.regs} - if _diff and r in _diff: - line = f"{color.UNDERLINE}{color.BOLD}{line}" + def context_reg(self, saved_states): + return NotImplementedError - if idx % 4 == 0 and idx != 32: - line += "\n" + @context_printer("[ STACK ]") + def context_stack(self): - lines += line + for idx in range(10): + addr = self.ql.reg.arch_sp + idx * self.ql.pointersize + val = self.ql.mem.read(addr, self.ql.pointersize) + print(f"$sp+0x{idx*self.ql.pointersize:02x}│ [0x{addr:08x}] —▸ 0x{self.ql.unpack(val):08x}", end="") - print(lines.format(*_cur_regs.values())) - print(color.GREEN, "EFLAGS: [CF: {flags[CF]}, PF: {flags[PF]}, AF: {flags[AF]}, ZF: {flags[ZF]}, SF: {flags[SF]}, OF: {flags[OF]}]".format(flags=get_x86_eflags(ql.reg.ef)), color.END, sep="") + # try to dereference wether it's a pointer + if (buf := _try_read(self.ql, addr, self.ql.pointersize))[0] is not None: - elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB, QL_ARCH.CORTEX_M): + if (addr := self.ql.unpack(buf[0])): - _cur_regs.update({"sl": _cur_regs.pop("r10")}) - _cur_regs.update({"ip": _cur_regs.pop("r12")}) - _cur_regs.update({"fp": _cur_regs.pop("r11")}) + # try to dereference again + if (buf := _try_read(self.ql, addr, self.ql.pointersize))[0] is not None: + try: + s = self.ql.mem.string(addr) + except: + s = None - regs_in_row = 4 - if ql.archtype == QL_ARCH.CORTEX_M: - regs_in_row = 3 + if s and s.isprintable(): + print(f" ◂— {self.ql.mem.string(addr)}", end="") + else: + print(f" ◂— 0x{self.ql.unpack(buf[0]):08x}", end="") + print() - # for re-order - _cur_regs.update({"xpsr": _cur_regs.pop("xpsr")}) - _cur_regs.update({"control": _cur_regs.pop("control")}) - _cur_regs.update({"primask": _cur_regs.pop("primask")}) - _cur_regs.update({"faultmask": _cur_regs.pop("faultmask")}) - _cur_regs.update({"basepri": _cur_regs.pop("basepri")}) + @context_printer("[ DISASM ]") + def context_asm(self): + # assembly before current location + past_list = [] + cur_addr = self.ql.reg.arch_pc - _diff = None - if saved_states is not None: - _saved_states = copy.deepcopy(saved_states) - _saved_states.update({"sl": _saved_states.pop("r10")}) - _saved_states.update({"ip": _saved_states.pop("r12")}) - _saved_states.update({"fp": _saved_states.pop("r11")}) - _diff = [k for k in _cur_regs if _cur_regs[k] != _saved_states[k]] + line = disasm(self.ql, cur_addr-0x10) - lines = "" - for idx, r in enumerate(_cur_regs, 1): + while line: + if line.address == cur_addr: + break - line = "{}{:}: 0x{{:08x}} {} ".format(_colors[(idx-1) // regs_in_row], r, color.END) + addr = line.address + line.size + line = disasm(self.ql, addr) - if _diff and r in _diff: - line = "{}{}".format(color.UNDERLINE, color.BOLD) + line + if not line: + break - if idx % regs_in_row == 0: - line += "\n" + past_list.append(line) - lines += line + # print four insns before current location + for line in past_list[:-1][:4]: + self.print_asm(line) - print(lines.format(*_cur_regs.values())) - print(color.GREEN, "[{cpsr[mode]} mode], Thumb: {cpsr[thumb]}, FIQ: {cpsr[fiq]}, IRQ: {cpsr[irq]}, NEG: {cpsr[neg]}, ZERO: {cpsr[zero]}, Carry: {cpsr[carry]}, Overflow: {cpsr[overflow]}".format(cpsr=get_arm_flags(ql.reg.cpsr)), color.END, sep="") + # assembly for current location - if ql.archtype != QL_ARCH.CORTEX_M: - # context render for Stack, skip this for CORTEX_M - with context_printer(ql, "[ STACK ]", ruler="─"): + cur_ins = disasm(self.ql, cur_addr) + to_jump, _ = self.predictor.predict() + self.print_asm(cur_ins, to_jump=to_jump) - for idx in range(10): - addr = ql.reg.arch_sp + idx * ql.pointersize - val = ql.mem.read(addr, ql.pointersize) - print(f"$sp+0x{idx*ql.pointersize:02x}│ [0x{addr:08x}] —▸ 0x{ql.unpack(val):08x}", end="") + # assembly after current location - # try to dereference wether it's a pointer - if (buf := _try_read(ql, addr, ql.pointersize))[0] is not None: + forward_insn_size = cur_ins.size + for _ in range(5): + forward_insn = disasm(self.ql, cur_addr+forward_insn_size) + if forward_insn: + self.print_asm(forward_insn) + forward_insn_size += forward_insn.size - if (addr := ql.unpack(buf[0])): +class CtxManager_ARM(CtxManager): + def __init__(self, ql): + super().__init__(ql) - # try to dereference again - if (buf := _try_read(ql, addr, ql.pointersize))[0] is not None: - try: - s = ql.mem.string(addr) - except: - s = None + self.regs = ( + "r0", "r1", "r2", "r3", + "r4", "r5", "r6", "r7", + "r8", "r9", "r10", "r11", + "r12", "sp", "lr", "pc", + ) - if s and s.isprintable(): - print(f" ◂— {ql.mem.string(addr)}", end="") - else: - print(f" ◂— 0x{ql.unpack(buf[0]):08x}", end="") - print() + @staticmethod + def get_flags(bits: int) -> Mapping[str, int]: + def _get_mode(bits): + return { + 0b10000: "User", + 0b10001: "FIQ", + 0b10010: "IRQ", + 0b10011: "Supervisor", + 0b10110: "Monitor", + 0b10111: "Abort", + 0b11010: "Hypervisor", + 0b11011: "Undefined", + 0b11111: "System", + }.get(bits & 0x00001f) -def print_asm(ql: Qiling, insn: CsInsn, to_jump: Optional[bool] = None, address: int = None) -> None: + return { + "mode": _get_mode(bits), + "thumb": bits & 0x00000020 != 0, + "fiq": bits & 0x00000040 != 0, + "irq": bits & 0x00000080 != 0, + "neg": bits & 0x80000000 != 0, + "zero": bits & 0x40000000 != 0, + "carry": bits & 0x20000000 != 0, + "overflow": bits & 0x10000000 != 0, + } + + @context_printer("[ REGISTERS ]") + def context_reg(self, saved_reg_dump): + cur_regs = self.dump_regs() + + cur_regs.update({"sl": cur_regs.pop("r10")}) + cur_regs.update({"ip": cur_regs.pop("r12")}) + cur_regs.update({"fp": cur_regs.pop("r11")}) + + regs_in_row = 4 + + diff = None + if saved_reg_dump is not None: + reg_dump = copy.deepcopy(saved_reg_dump) + reg_dump.update({"sl": reg_dump.pop("r10")}) + reg_dump.update({"ip": reg_dump.pop("r12")}) + reg_dump.update({"fp": reg_dump.pop("r11")}) + diff = [k for k in cur_regs if cur_regs[k] != reg_dump[k]] + + lines = "" + for idx, r in enumerate(cur_regs, 1): + + line = "{}{:}: 0x{{:08x}} {} ".format(COLORS[(idx-1) // regs_in_row], r, color.END) + + if diff and r in diff: + line = f"{color.UNDERLINE}{color.BOLD}{line}" + + if idx % regs_in_row == 0: + line += "\n" + + lines += line + + print(lines.format(*cur_regs.values())) + print(color.GREEN, "[{cpsr[mode]} mode], Thumb: {cpsr[thumb]}, FIQ: {cpsr[fiq]}, IRQ: {cpsr[irq]}, NEG: {cpsr[neg]}, ZERO: {cpsr[zero]}, Carry: {cpsr[carry]}, Overflow: {cpsr[overflow]}".format(cpsr=self.get_flags(self.ql.reg.cpsr)), color.END, sep="") + + +class CtxManager_MIPS(CtxManager): + def __init__(self, ql): + super().__init__(ql) + + self.regs = ( + "gp", "at", "v0", "v1", + "a0", "a1", "a2", "a3", + "t0", "t1", "t2", "t3", + "t4", "t5", "t6", "t7", + "t8", "t9", "sp", "s8", + "s0", "s1", "s2", "s3", + "s4", "s5", "s6", "s7", + "ra", "k0", "k1", "pc", + ) + + @context_printer("[ REGISTERS ]") + def context_reg(self, saved_reg_dump): + + cur_regs = self.dump_regs() + + cur_regs.update({"fp": cur_regs.pop("s8")}) + + diff = None + if saved_reg_dump is not None: + reg_dump = copy.deepcopy(saved_reg_dump) + reg_dump.update({"fp": saved_reg_dump.pop("s8")}) + diff = [k for k in cur_regs if cur_regs[k] != reg_dump[k]] + + lines = "" + for idx, r in enumerate(cur_regs, 1): + line = "{}{}: 0x{{:08x}} {}\t".format(COLORS[(idx-1) // 4], r, color.END) + + if diff and r in diff: + line = f"{color.UNDERLINE}{color.BOLD}{line}" + + if idx % 4 == 0 and idx != 32: + line += "\n" + + lines += line + + print(lines.format(*cur_regs.values())) + +class CtxManager_X86(CtxManager): + def __init__(self, ql): + super().__init__(ql) + + self.regs = ( + "eax", "ebx", "ecx", "edx", + "esp", "ebp", "esi", "edi", + "eip", "ss", "cs", "ds", "es", + "fs", "gs", "ef", + ) + @context_printer("[ REGISTERS ]") + def context_reg(self, saved_reg_dump): + cur_regs = self.dump_regs() + + diff = None + if saved_reg_dump is not None: + reg_dump = copy.deepcopy(saved_reg_dump) + diff = [k for k in cur_regs if cur_regs[k] != saved_reg_dump[k]] - opcode = "".join(f"{b:02x}" for b in insn.bytes) - if ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - trace_line = f"0x{insn.address:08x} │ {opcode:20s} {insn.mnemonic:10} {insn.op_str:35s}" - else: - trace_line = f"0x{insn.address:08x} │ {opcode:10s} {insn.mnemonic:10} {insn.op_str:35s}" + lines = "" + for idx, r in enumerate(cur_regs, 1): + if len(r) == 2: + line = "{}{}: 0x{{:08x}} {}\t\t".format(COLORS[(idx-1) // 4], r, color.END) + else: + line = "{}{}: 0x{{:08x}} {}\t".format(COLORS[(idx-1) // 4], r, color.END) - cursor = " " - if ql.reg.arch_pc == insn.address: - cursor = "►" + if diff and r in diff: + line = f"{color.UNDERLINE}{color.BOLD}{line}" - jump_sign = " " - if to_jump: - jump_sign = f"{color.RED}✓{color.END}" + if idx % 4 == 0 and idx != 32: + line += "\n" - print(f"{jump_sign} {cursor} {color.DARKGRAY}{trace_line}{color.END}") + lines += line + print(lines.format(*cur_regs.values())) + print(color.GREEN, "EFLAGS: [CF: {flags[CF]}, PF: {flags[PF]}, AF: {flags[AF]}, ZF: {flags[ZF]}, SF: {flags[SF]}, OF: {flags[OF]}]".format(flags=get_x86_eflags(self.ql.reg.ef)), color.END, sep="") -def context_asm(ql: Qiling, address: int) -> None: + @context_printer("[ DISASM ]") + def context_asm(self): + past_list = [] + cur_addr = self.ql.reg.arch_pc - with context_printer(ql, field_name="[ DISASM ]"): + # assembly before current location - if ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - past_list = [] + line = disasm(self.ql, cur_addr) + acc_size = line.size - # assembly for current location + while line and len(past_list) != 10: + past_list.append(line) + next_start = cur_addr + acc_size + line = disasm(self.ql, next_start) + acc_size += line.size - line = disasm(ql, address) - to_jump, next_stop = handle_bnj(ql, address) - print_asm(ql, line, to_jump=to_jump) + # print four insns before current location + for line in past_list[:-1]: + self.print_asm(line) - # assembly after current location - acc_size = line.size +class CtxManager_CORTEX_M(CtxManager): + def __init__(self, ql): + super().__init__(ql) - while line and len(past_list) != 10: - next_start = address + acc_size - line = disasm(ql, next_start) - acc_size += line.size - past_list.append(line) + self.regs = ( + "r0", "r1", "r2", "r3", + "r4", "r5", "r6", "r7", + "r8", "r9", "r10", "r11", + "r12", "sp", "lr", "pc", + "xpsr", "control", "primask", "basepri", "faultmask" + ) - for line in past_list[:-1]: - print_asm(ql, line) + @context_printer("[ REGISTERS ]") + def context_reg(self, saved_reg_dump): - else: + cur_regs.update({"sl": cur_regs.pop("r10")}) + cur_regs.update({"ip": cur_regs.pop("r12")}) + cur_regs.update({"fp": cur_regs.pop("r11")}) - # assembly before current location + regs_in_row = 3 - past_list = [] + # for re-order + cur_regs.update({"xpsr": cur_regs.pop("xpsr")}) + cur_regs.update({"control": cur_regs.pop("control")}) + cur_regs.update({"primask": cur_regs.pop("primask")}) + cur_regs.update({"faultmask": cur_regs.pop("faultmask")}) + cur_regs.update({"basepri": cur_regs.pop("basepri")}) - line = disasm(ql, address-0x10) + diff = None + if saved_reg_dump is not None: + reg_dump = copy.deepcopy(saved_reg_dump) + reg_dump.update({"sl": reg_dump.pop("r10")}) + reg_dump.update({"ip": reg_dump.pop("r12")}) + reg_dump.update({"fp": reg_dump.pop("r11")}) + diff = [k for k in cur_regs if cur_regs[k] != reg_dump[k]] - while line: - if line.address == address: - break + lines = "" + for idx, r in enumerate(_cur_regs, 1): - addr = line.address + line.size - line = disasm(ql, addr) + line = "{}{:}: 0x{{:08x}} {} ".format(_colors[(idx-1) // regs_in_row], r, color.END) - if not line: - break + if _diff and r in _diff: + line = "{}{}".format(color.UNDERLINE, color.BOLD) + line - past_list.append(line) + if idx % regs_in_row == 0: + line += "\n" - # print four insns before current location - for line in past_list[:-1][:4]: - print_asm(ql, line) + lines += line - # assembly for current location + print(lines.format(cur_regs.values())) + print(color.GREEN, "[{cpsr[mode]} mode], Thumb: {cpsr[thumb]}, FIQ: {cpsr[fiq]}, IRQ: {cpsr[irq]}, NEG: {cpsr[neg]}, ZERO: {cpsr[zero]}, Carry: {cpsr[carry]}, Overflow: {cpsr[overflow]}".format(cpsr=get_arm_flags(self.ql.reg.cpsr)), color.END, sep="") - cur_ins = disasm(ql, address) - to_jump, next_stop = handle_bnj(ql, address) - print_asm(ql, cur_ins, to_jump=to_jump) - # assembly after current location +if __name__ == "__main__": + pass - forward_insn_size = cur_ins.size - for _ in range(5): - forward_insn = disasm(ql, address+forward_insn_size) - if forward_insn: - print_asm(ql, forward_insn) - forward_insn_size += forward_insn.size diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index 167349536..a55b9d810 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -12,8 +12,9 @@ from qiling.const import QL_ARCH, QL_VERBOSE from qiling.debugger import QlDebugger -from .frontend import context_reg, context_asm, examine_mem -from .utils import handle_bnj, is_thumb, CODE_END, parse_int +# from .frontend import context_reg, context_asm, examine_mem +from .frontend import examine_mem, setup_ctx_manager +from .utils import _parse_int, is_thumb, parse_int from .utils import Breakpoint, TempBreakpoint from .const import * @@ -31,6 +32,8 @@ def __init__(self: QlQdb, ql: Qiling, init_hook: str = "", rr: bool = False) -> if self.rr: self._states_list = [] + self.ctx = setup_ctx_manager(ql) + super().__init__() self.dbg_hook(init_hook) @@ -191,8 +194,9 @@ def do_context(self: QlQdb, *args) -> None: show context information for current location """ - context_reg(self.ql, self._saved_reg_dump) - context_asm(self.ql, self.cur_addr) + self.ctx.context_reg(self._saved_reg_dump) + self.ctx.context_stack() + self.ctx.context_asm() def do_backward(self: QlQdb, *args) -> None: """ @@ -222,22 +226,17 @@ def do_step(self: QlQdb, *args) -> Optional[bool]: if self.rr: self._save() - _, next_stop = handle_bnj(self.ql, self.cur_addr) + # next_stop = self.predictor.predict() - if next_stop is CODE_END: - return True + # if next_stop is True: + # return True if self.ql.archtype == QL_ARCH.CORTEX_M: self.ql.arch.step() self.ql.count -= 1 else: - if self.ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - count = 1 - else: - count = 1 if next_stop == self.cur_addr + 4 else 2 - - self._run(count=count) + self._run(count=1) self.do_context() diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index 90bff6733..d93f8dfa0 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -5,92 +5,15 @@ from __future__ import annotations from typing import Callable, Optional, Mapping -from functools import partial from qiling.const import * -CODE_END = True - - -def dump_regs(ql: Qiling) -> Mapping[str, int]: - - if ql.archtype == QL_ARCH.MIPS: - - _reg_order = ( - "gp", "at", "v0", "v1", - "a0", "a1", "a2", "a3", - "t0", "t1", "t2", "t3", - "t4", "t5", "t6", "t7", - "t8", "t9", "sp", "s8", - "s0", "s1", "s2", "s3", - "s4", "s5", "s6", "s7", - "ra", "k0", "k1", "pc", - ) - - elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB): - - _reg_order = ( - "r0", "r1", "r2", "r3", - "r4", "r5", "r6", "r7", - "r8", "r9", "r10", "r11", - "r12", "sp", "lr", "pc", - ) - - elif ql.archtype == QL_ARCH.X86: - - _reg_order = ( - "eax", "ebx", "ecx", "edx", - "esp", "ebp", "esi", "edi", - "eip", "ss", "cs", "ds", "es", - "fs", "gs", "ef", - ) - - elif ql.archtype == QL_ARCH.CORTEX_M: - - _reg_order = ( - "r0", "r1", "r2", "r3", - "r4", "r5", "r6", "r7", - "r8", "r9", "r10", "r11", - "r12", "sp", "lr", "pc", - "xpsr", "control", "primask", "basepri", "faultmask" - ) - - return {reg_name: getattr(ql.reg, reg_name) for reg_name in _reg_order} - - -def get_arm_flags(bits: int) -> Mapping[str, int]: - - def _get_mode(bits): - return { - 0b10000: "User", - 0b10001: "FIQ", - 0b10010: "IRQ", - 0b10011: "Supervisor", - 0b10110: "Monitor", - 0b10111: "Abort", - 0b11010: "Hypervisor", - 0b11011: "Undefined", - 0b11111: "System", - }.get(bits & 0x00001f) - - return { - "mode": _get_mode(bits), - "thumb": bits & 0x00000020 != 0, - "fiq": bits & 0x00000040 != 0, - "irq": bits & 0x00000080 != 0, - "neg": bits & 0x80000000 != 0, - "zero": bits & 0x40000000 != 0, - "carry": bits & 0x20000000 != 0, - "overflow": bits & 0x10000000 != 0, - } - - # parse unsigned integer from string def _parse_int(s: str) -> int: return int(s, 0) -# function dectorator for parse argument as integer +# function dectorator for parsing argument as integer def parse_int(func: Callable) -> Callable: def wrap(qdb, s: str = "") -> int: assert type(s) is str @@ -101,27 +24,17 @@ def wrap(qdb, s: str = "") -> int: return func(qdb, ret) return wrap + # check wether negative value or not def is_negative(i: int) -> int: return i & (1 << 31) -# convert valu to signed +# signed value convertion def signed_val(val: int) -> int: return (val-1 << 32) if is_negative(val) else val -# handle braches and jumps so we can set berakpoint properly -def handle_bnj(ql: Qiling, cur_addr: str) -> Callable[[Qiling, str], int]: - return { - QL_ARCH.MIPS : handle_bnj_mips, - QL_ARCH.ARM : handle_bnj_arm, - QL_ARCH.ARM_THUMB: handle_bnj_arm, - QL_ARCH.CORTEX_M : handle_bnj_arm, - QL_ARCH.X86 : handle_bnj_x86, - }.get(ql.archtype)(ql, cur_addr) - - def get_cpsr(bits: int) -> (bool, bool, bool, bool): return ( bits & 0x10000000 != 0, # V, overflow flag @@ -147,6 +60,9 @@ def is_thumb(bits: int) -> bool: def disasm(ql: Qiling, address: int, detail: bool = False) -> Optional[int]: + """ + helper function for disassembling + """ md = ql.disassembler md.detail = detail @@ -160,7 +76,6 @@ def disasm(ql: Qiling, address: int, detail: bool = False) -> Optional[int]: def _read_inst(ql: Qiling, addr: int) -> int: - result = ql.mem.read(addr, 4) if ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB, QL_ARCH.CORTEX_M): @@ -187,357 +102,397 @@ def _read_inst(ql: Qiling, addr: int) -> int: return result +""" + Try to predict certian branch will be taken or not based on current context +""" -def handle_bnj_x86(ql: Qilng, cur_addr: str) -> (bool, int): - - # FIXME: NO HANDLE BRANCH AND JUMP FOR X86 FOR NOW +def setup_branch_predictor(ql: Qiling) -> BranchPredictor: - ret_addr = None - - line = disasm(ql, cur_addr) - - jump_table = { - - # conditional jump - "jo" : (lambda C, P, A, Z, S, O: O == 1), - "jno" : (lambda C, P, A, Z, S, O: O == 0), - - "js" : (lambda C, P, A, Z, S, O: S == 1), - "jns" : (lambda C, P, A, Z, S, O: S == 0), - - "je" : (lambda C, P, A, Z, S, O: Z == 1), - "jz" : (lambda C, P, A, Z, S, O: Z == 1), + if ql.archtype == QL_ARCH.MIPS: + return BranchPredictor_MIPS(ql) - "jne" : (lambda C, P, A, Z, S, O: Z == 0), - "jnz" : (lambda C, P, A, Z, S, O: Z == 0), + elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB): + return BranchPredictor_ARM(ql) - "jb" : (lambda C, P, A, Z, S, O: C == 1), - "jc" : (lambda C, P, A, Z, S, O: C == 1), - "jnae" : (lambda C, P, A, Z, S, O: C == 1), + elif ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): + return BranchPredictor_X86(ql) - "jnb" : (lambda C, P, A, Z, S, O: C == 0), - "jnc" : (lambda C, P, A, Z, S, O: C == 0), - "jae" : (lambda C, P, A, Z, S, O: C == 0), + elif ql.archtype == QL_ARCH.CORTEX_M: + return BranchPredictor_CORTEX_M(ql) - "jbe" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), - "jna" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), +class BranchPredictor(object): + def __init__(self, ql): + self.ql = ql - "ja" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), - "jnbe" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), + def read_reg(self, reg_name): + return NotImplementedError - "jl" : (lambda C, P, A, Z, S, O: S != O), - "jnge" : (lambda C, P, A, Z, S, O: S != O), + def resolve(self): + return NotImplementedError - "jge" : (lambda C, P, A, Z, S, O: S == O), - "jnl" : (lambda C, P, A, Z, S, O: S == O), +class BranchPredictor_ARM(BranchPredictor): + def __init__(self, ql): + super().__init__(ql) - "jle" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), - "jng" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), + self.INST_SIZE = 4 + self.THUMB_INST_SIZE = 2 + self.CODE_END = "udf" - "jg" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), - "jnle" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), + def read_reg(self, reg_name): + reg_name = reg_name.replace("ip", "r12").replace("fp", "r11") + return getattr(self.ql.reg, reg_name) - "jp" : (lambda C, P, A, Z, S, O: P == 1), - "jpe" : (lambda C, P, A, Z, S, O: P == 1), + def regdst_eq_pc(self, op_str): + return op_str.partition(", ")[0] == "pc" - "jnp" : (lambda C, P, A, Z, S, O: P == 0), - "jpo" : (lambda C, P, A, Z, S, O: P == 0), + def predict(self): + cur_addr = self.ql.reg.arch_pc + line = disasm(self.ql, cur_addr) + ret_addr = cur_addr + line.size - # unconditional jump + if line.mnemonic == self.CODE_END: # indicates program exited + return True - "call" : (lambda *_: True), - "jmp" : (lambda *_: True), + jump_table = { + # unconditional branch + "b" : (lambda *_: True), + "bl" : (lambda *_: True), + "bx" : (lambda *_: True), + "blx" : (lambda *_: True), + "b.w" : (lambda *_: True), - } + # branch on equal, Z == 1 + "beq" : (lambda V, C, Z, N: Z == 1), + "bxeq" : (lambda V, C, Z, N: Z == 1), + "beq.w": (lambda V, C, Z, N: Z == 1), - to_jump = False - if line.mnemonic in jump_table: - eflags = get_x86_eflags(ql.reg.ef).values() - to_jump = jump_table.get(line.mnemonic)(*eflags) + # branch on not equal, Z == 0 + "bne" : (lambda V, C, Z, N: Z == 0), + "bxne" : (lambda V, C, Z, N: Z == 0), + "bne.w": (lambda V, C, Z, N: Z == 0), - return (to_jump, ret_addr) + # branch on signed greater than, Z == 0 and N == V + "bgt" : (lambda V, C, Z, N: (Z == 0 and N == V)), + "bgt.w": (lambda V, C, Z, N: (Z == 0 and N == V)), + # branch on signed less than, N != V + "blt" : (lambda V, C, Z, N: N != V), -def handle_bnj_arm(ql: Qiling, cur_addr: str) -> (bool, int): + # branch on signed greater than or equal, N == V + "bge" : (lambda V, C, Z, N: N == V), - def _read_reg_val(regs, _reg): - return getattr(ql.reg, _reg.replace("ip", "r12").replace("fp", "r11")) + # branch on signed less than or queal + "ble" : (lambda V, C, Z, N: Z == 1 or N != V), - def regdst_eq_pc(op_str): - return op_str.partition(", ")[0] == "pc" + # branch on unsigned higher or same (or carry set), C == 1 + "bhs" : (lambda V, C, Z, N: C == 1), + "bcs" : (lambda V, C, Z, N: C == 1), - read_inst = partial(_read_inst, ql) - read_reg_val = partial(_read_reg_val, ql.reg) + # branch on unsigned lower (or carry clear), C == 0 + "bcc" : (lambda V, C, Z, N: C == 0), + "blo" : (lambda V, C, Z, N: C == 0), + "bxlo" : (lambda V, C, Z, N: C == 0), + "blo.w": (lambda V, C, Z, N: C == 0), - ARM_INST_SIZE = 4 - ARM_THUMB_INST_SIZE = 2 + # branch on negative or minus, N == 1 + "bmi" : (lambda V, C, Z, N: N == 1), - line = disasm(ql, cur_addr) - ret_addr = cur_addr + line.size + # branch on positive or plus, N == 0 + "bpl" : (lambda V, C, Z, N: N == 0), - if line.mnemonic == "udf": # indicates program exited - return CODE_END + # branch on signed overflow + "bvs" : (lambda V, C, Z, N: V == 1), - jump_table = { - # unconditional branch - "b" : (lambda *_: True), - "bl" : (lambda *_: True), - "bx" : (lambda *_: True), - "blx" : (lambda *_: True), - "b.w" : (lambda *_: True), + # branch on no signed overflow + "bvc" : (lambda V, C, Z, N: V == 0), - # branch on equal, Z == 1 - "beq" : (lambda V, C, Z, N: Z == 1), - "bxeq" : (lambda V, C, Z, N: Z == 1), - "beq.w": (lambda V, C, Z, N: Z == 1), + # branch on unsigned higher + "bhi" : (lambda V, C, Z, N: (Z == 0 and C == 1)), + "bxhi" : (lambda V, C, Z, N: (Z == 0 and C == 1)), + "bhi.w": (lambda V, C, Z, N: (Z == 0 and C == 1)), - # branch on not equal, Z == 0 - "bne" : (lambda V, C, Z, N: Z == 0), - "bxne" : (lambda V, C, Z, N: Z == 0), - "bne.w": (lambda V, C, Z, N: Z == 0), + # branch on unsigned lower + "bls" : (lambda V, C, Z, N: (C == 0 or Z == 1)), + "bls.w": (lambda V, C, Z, N: (C == 0 or Z == 1)), + } - # branch on signed greater than, Z == 0 and N == V - "bgt" : (lambda V, C, Z, N: (Z == 0 and N == V)), - "bgt.w": (lambda V, C, Z, N: (Z == 0 and N == V)), + cb_table = { + # branch on equal to zero + "cbz" : (lambda r: r == 0), - # branch on signed less than, N != V - "blt" : (lambda V, C, Z, N: N != V), + # branch on not equal to zero + "cbnz": (lambda r: r != 0), + } - # branch on signed greater than or equal, N == V - "bge" : (lambda V, C, Z, N: N == V), + to_jump = False + if line.mnemonic in jump_table: + to_jump = jump_table.get(line.mnemonic)(*get_cpsr(self.ql.reg.cpsr)) - # branch on signed less than or queal - "ble" : (lambda V, C, Z, N: Z == 1 or N != V), + elif line.mnemonic in cb_table: + to_jump = cb_table.get(line.mnemonic)(self.read_reg(line.op_str.split(", ")[0])) - # branch on unsigned higher or same (or carry set), C == 1 - "bhs" : (lambda V, C, Z, N: C == 1), - "bcs" : (lambda V, C, Z, N: C == 1), + if to_jump: + if "#" in line.op_str: + ret_addr = _parse_int(line.op_str.split("#")[-1]) + else: + ret_addr = read_reg_val(line.op_str) - # branch on unsigned lower (or carry clear), C == 0 - "bcc" : (lambda V, C, Z, N: C == 0), - "blo" : (lambda V, C, Z, N: C == 0), - "bxlo" : (lambda V, C, Z, N: C == 0), - "blo.w": (lambda V, C, Z, N: C == 0), + if regdst_eq_pc(line.op_str): + next_addr = cur_addr + line.size + n2_addr = next_addr + len(read_inst(next_addr)) + ret_addr += len(read_inst(n2_addr)) + len(read_inst(next_addr)) - # branch on negative or minus, N == 1 - "bmi" : (lambda V, C, Z, N: N == 1), + elif line.mnemonic.startswith("it"): + # handle IT block here - # branch on positive or plus, N == 0 - "bpl" : (lambda V, C, Z, N: N == 0), + cond_met = { + "eq": lambda V, C, Z, N: (Z == 1), + "ne": lambda V, C, Z, N: (Z == 0), + "ge": lambda V, C, Z, N: (N == V), + "hs": lambda V, C, Z, N: (C == 1), + "lo": lambda V, C, Z, N: (C == 0), + "mi": lambda V, C, Z, N: (N == 1), + "pl": lambda V, C, Z, N: (N == 0), + "ls": lambda V, C, Z, N: (C == 0 or Z == 1), + "le": lambda V, C, Z, N: (Z == 1 or N != V), + "hi": lambda V, C, Z, N: (Z == 0 and C == 1), + }.get(line.op_str)(*get_cpsr(ql.reg.cpsr)) - # branch on signed overflow - "bvs" : (lambda V, C, Z, N: V == 1), + it_block_range = [each_char for each_char in line.mnemonic[1:]] - # branch on no signed overflow - "bvc" : (lambda V, C, Z, N: V == 0), + next_addr = cur_addr + self.THUMB_INST_SIZE + for each in it_block_range: + _inst = read_inst(next_addr) + n2_addr = handle_bnj_arm(ql, next_addr) - # branch on unsigned higher - "bhi" : (lambda V, C, Z, N: (Z == 0 and C == 1)), - "bxhi" : (lambda V, C, Z, N: (Z == 0 and C == 1)), - "bhi.w": (lambda V, C, Z, N: (Z == 0 and C == 1)), + if (cond_met and each == "t") or (not cond_met and each == "e"): + if n2_addr != (next_addr+len(_inst)): # branch detected + break - # branch on unsigned lower - "bls" : (lambda V, C, Z, N: (C == 0 or Z == 1)), - "bls.w": (lambda V, C, Z, N: (C == 0 or Z == 1)), - } + next_addr += len(_inst) - cb_table = { - # branch on equal to zero - "cbz" : (lambda r: r == 0), + ret_addr = next_addr - # branch on not equal to zero - "cbnz": (lambda r: r != 0), - } + elif line.mnemonic in ("ldr",): - to_jump = False - if line.mnemonic in jump_table: - to_jump = jump_table.get(line.mnemonic)(*get_cpsr(ql.reg.cpsr)) + if self.regdst_eq_pc(line.op_str): + _, _, rn_offset = line.op_str.partition(", ") + r, _, imm = rn_offset.strip("[]!").partition(", #") - elif line.mnemonic in cb_table: - to_jump = cb_table.get(line.mnemonic)(read_reg_val(line.op_str.split(", ")[0])) + if "]" in rn_offset.split(", ")[1]: # pre-indexed immediate + ret_addr = ql.unpack32(ql.mem.read(_parse_int(imm) + read_reg_val(r), self.INST_SIZE)) - if to_jump: - if "#" in line.op_str: - ret_addr = _parse_int(line.op_str.split("#")[-1]) - else: - ret_addr = read_reg_val(line.op_str) + else: # post-indexed immediate + # FIXME: weired behavior, immediate here does not apply + ret_addr = ql.unpack32(ql.mem.read(read_reg_val(r), self.INST_SIZE)) - if regdst_eq_pc(line.op_str): - next_addr = cur_addr + line.size - n2_addr = next_addr + len(read_inst(next_addr)) - ret_addr += len(read_inst(n2_addr)) + len(read_inst(next_addr)) + elif line.mnemonic in ("addls", "addne", "add") and self.regdst_eq_pc(line.op_str): + V, C, Z, N = get_cpsr(ql.reg.cpsr) + r0, r1, r2, *imm = line.op_str.split(", ") - elif line.mnemonic.startswith("it"): - # handle IT block here + # program counter is awalys 8 bytes ahead when it comes with pc, need to add extra 8 bytes + extra = 8 if 'pc' in (r0, r1, r2) else 0 - cond_met = { - "eq": lambda V, C, Z, N: (Z == 1), - "ne": lambda V, C, Z, N: (Z == 0), - "ge": lambda V, C, Z, N: (N == V), - "hs": lambda V, C, Z, N: (C == 1), - "lo": lambda V, C, Z, N: (C == 0), - "mi": lambda V, C, Z, N: (N == 1), - "pl": lambda V, C, Z, N: (N == 0), - "ls": lambda V, C, Z, N: (C == 0 or Z == 1), - "le": lambda V, C, Z, N: (Z == 1 or N != V), - "hi": lambda V, C, Z, N: (Z == 0 and C == 1), - }.get(line.op_str)(*get_cpsr(ql.reg.cpsr)) + if imm: + expr = imm[0].split() + # TODO: should support more bit shifting and rotating operation + if expr[0] == "lsl": # logical shift left + n = _parse_int(expr[-1].strip("#")) * 2 - it_block_range = [each_char for each_char in line.mnemonic[1:]] + if line.mnemonic == "addls" and (C == 0 or Z == 1): + ret_addr = extra + read_reg_val(r1) + read_reg_val(r2) * n + + elif line.mnemonic == "add" or (line.mnemonic == "addne" and Z == 0): + ret_addr = extra + read_reg_val(r1) + (read_reg_val(r2) * n if imm else read_reg_val(r2)) + + elif line.mnemonic in ("tbh", "tbb"): + + cur_addr += self.INST_SIZE + r0, r1, *imm = line.op_str.strip("[]").split(", ") + + if imm: + expr = imm[0].split() + if expr[0] == "lsl": # logical shift left + n = _parse_int(expr[-1].strip("#")) * 2 + + if line.mnemonic == "tbh": + + r1 = read_reg_val(r1) * n + + elif line.mnemonic == "tbb": + + r1 = read_reg_val(r1) + + to_add = int.from_bytes(ql.mem.read(cur_addr+r1, 2 if line.mnemonic == "tbh" else 1), byteorder="little") * n + ret_addr = cur_addr + to_add + + elif line.mnemonic.startswith("pop") and "pc" in line.op_str: + + ret_addr = ql.stack_read(line.op_str.strip("{}").split(", ").index("pc") * self.INST_SIZE) + if not { # step to next instruction if cond does not meet + "pop" : lambda *_: True, + "pop.w": lambda *_: True, + "popeq": lambda V, C, Z, N: (Z == 1), + "popne": lambda V, C, Z, N: (Z == 0), + "pophi": lambda V, C, Z, N: (C == 1), + "popge": lambda V, C, Z, N: (N == V), + "poplt": lambda V, C, Z, N: (N != V), + }.get(line.mnemonic)(*get_cpsr(ql.reg.cpsr)): - next_addr = cur_addr + ARM_THUMB_INST_SIZE - for each in it_block_range: - _inst = read_inst(next_addr) - n2_addr = handle_bnj_arm(ql, next_addr) + ret_addr = cur_addr + self.INST_SIZE - if (cond_met and each == "t") or (not cond_met and each == "e"): - if n2_addr != (next_addr+len(_inst)): # branch detected - break + elif line.mnemonic == "sub" and self.regdst_eq_pc(line.op_str): + _, r, imm = line.op_str.split(", ") + ret_addr = read_reg_val(r) - _parse_int(imm.strip("#")) - next_addr += len(_inst) + elif line.mnemonic == "mov" and self.regdst_eq_pc(line.op_str): + _, r = line.op_str.split(", ") + ret_addr = read_reg_val(r) - ret_addr = next_addr + if ret_addr & 1: + ret_addr -= 1 - elif line.mnemonic in ("ldr",): + return (to_jump, ret_addr) - if regdst_eq_pc(line.op_str): - _, _, rn_offset = line.op_str.partition(", ") - r, _, imm = rn_offset.strip("[]!").partition(", #") +class BranchPredictor_MIPS(BranchPredictor): + def __init__(self, ql): + self.CODE_END = "break" + self.INST_SIZE = 4 + self.ql = ql - if "]" in rn_offset.split(", ")[1]: # pre-indexed immediate - ret_addr = ql.unpack32(ql.mem.read(_parse_int(imm) + read_reg_val(r), ARM_INST_SIZE)) + def read_reg(self, reg_name): + reg_name = reg_name.strip("$").replace("fp", "s8") + return signed_val(getattr(self.ql.reg, reg_name)) - else: # post-indexed immediate - # FIXME: weired behavior, immediate here does not apply - ret_addr = ql.unpack32(ql.mem.read(read_reg_val(r), ARM_INST_SIZE)) + def predict(self): - elif line.mnemonic in ("addls", "addne", "add") and regdst_eq_pc(line.op_str): - V, C, Z, N = get_cpsr(ql.reg.cpsr) - r0, r1, r2, *imm = line.op_str.split(", ") + cur_addr = self.ql.reg.arch_pc + line = disasm(self.ql, cur_addr) - # program counter is awalys 8 bytes ahead when it comes with pc, need to add extra 8 bytes - extra = 8 if 'pc' in (r0, r1, r2) else 0 + if line.mnemonic == self.CODE_END: # indicates program extied + return True - if imm: - expr = imm[0].split() - # TODO: should support more bit shifting and rotating operation - if expr[0] == "lsl": # logical shift left - n = _parse_int(expr[-1].strip("#")) * 2 + ret_addr = None + to_jump = False + if line.mnemonic.startswith('j') or line.mnemonic.startswith('b'): - if line.mnemonic == "addls" and (C == 0 or Z == 1): - ret_addr = extra + read_reg_val(r1) + read_reg_val(r2) * n + # make sure at least delay slot executed + ret_addr = cur_addr + self.INST_SIZE - elif line.mnemonic == "add" or (line.mnemonic == "addne" and Z == 0): - ret_addr = extra + read_reg_val(r1) + (read_reg_val(r2) * n if imm else read_reg_val(r2)) + # get registers or memory address from op_str + targets = [ + self.read_reg(each) + if '$' in each else _parse_int(each) + for each in line.op_str.split(", ") + ] - elif line.mnemonic in ("tbh", "tbb"): + to_jump = { + "j" : (lambda _: True), # unconditional jump + "jr" : (lambda _: True), # unconditional jump + "jal" : (lambda _: True), # unconditional jump + "jalr" : (lambda _: True), # unconditional jump + "b" : (lambda _: True), # unconditional branch + "bl" : (lambda _: True), # unconditional branch + "bal" : (lambda _: True), # unconditional branch + "beq" : (lambda r0, r1, _: r0 == r1), # branch on equal + "bne" : (lambda r0, r1, _: r0 != r1), # branch on not equal + "blt" : (lambda r0, r1, _: r0 < r1), # branch on r0 less than r1 + "bgt" : (lambda r0, r1, _: r0 > r1), # branch on r0 greater than r1 + "ble" : (lambda r0, r1, _: r0 <= r1), # brach on r0 less than or equal to r1 + "bge" : (lambda r0, r1, _: r0 >= r1), # branch on r0 greater than or equal to r1 + "beqz" : (lambda r, _: r == 0), # branch on equal to zero + "bnez" : (lambda r, _: r != 0), # branch on not equal to zero + "bgtz" : (lambda r, _: r > 0), # branch on greater than zero + "bltz" : (lambda r, _: r < 0), # branch on less than zero + "bltzal" : (lambda r, _: r < 0), # branch on less than zero and link + "blez" : (lambda r, _: r <= 0), # branch on less than or equal to zero + "bgez" : (lambda r, _: r >= 0), # branch on greater than or equal to zero + "bgezal" : (lambda r, _: r >= 0), # branch on greater than or equal to zero and link + }.get(line.mnemonic)(*targets) - cur_addr += ARM_INST_SIZE - r0, r1, *imm = line.op_str.strip("[]").split(", ") + if to_jump: + # target address is always the rightmost one + ret_addr = targets[-1] - if imm: - expr = imm[0].split() - if expr[0] == "lsl": # logical shift left - n = _parse_int(expr[-1].strip("#")) * 2 + return (to_jump, ret_addr) - if line.mnemonic == "tbh": +class BranchPredictor_X86(BranchPredictor): + def __init__(self, ql): + super().__init__(ql) - r1 = read_reg_val(r1) * n + def predict(self): - elif line.mnemonic == "tbb": + cur_addr = self.ql.reg.arch_pc + line = disasm(self.ql, cur_addr) - r1 = read_reg_val(r1) + jump_table = { - to_add = int.from_bytes(ql.mem.read(cur_addr+r1, 2 if line.mnemonic == "tbh" else 1), byteorder="little") * n - ret_addr = cur_addr + to_add + # conditional jump + "jo" : (lambda C, P, A, Z, S, O: O == 1), + "jno" : (lambda C, P, A, Z, S, O: O == 0), - elif line.mnemonic.startswith("pop") and "pc" in line.op_str: + "js" : (lambda C, P, A, Z, S, O: S == 1), + "jns" : (lambda C, P, A, Z, S, O: S == 0), - ret_addr = ql.stack_read(line.op_str.strip("{}").split(", ").index("pc") * ARM_INST_SIZE) - if not { # step to next instruction if cond does not meet - "pop" : lambda *_: True, - "pop.w": lambda *_: True, - "popeq": lambda V, C, Z, N: (Z == 1), - "popne": lambda V, C, Z, N: (Z == 0), - "pophi": lambda V, C, Z, N: (C == 1), - "popge": lambda V, C, Z, N: (N == V), - "poplt": lambda V, C, Z, N: (N != V), - }.get(line.mnemonic)(*get_cpsr(ql.reg.cpsr)): + "je" : (lambda C, P, A, Z, S, O: Z == 1), + "jz" : (lambda C, P, A, Z, S, O: Z == 1), - ret_addr = cur_addr + ARM_INST_SIZE + "jne" : (lambda C, P, A, Z, S, O: Z == 0), + "jnz" : (lambda C, P, A, Z, S, O: Z == 0), - elif line.mnemonic == "sub" and regdst_eq_pc(line.op_str): - _, r, imm = line.op_str.split(", ") - ret_addr = read_reg_val(r) - _parse_int(imm.strip("#")) + "jb" : (lambda C, P, A, Z, S, O: C == 1), + "jc" : (lambda C, P, A, Z, S, O: C == 1), + "jnae" : (lambda C, P, A, Z, S, O: C == 1), - elif line.mnemonic == "mov" and regdst_eq_pc(line.op_str): - _, r = line.op_str.split(", ") - ret_addr = read_reg_val(r) + "jnb" : (lambda C, P, A, Z, S, O: C == 0), + "jnc" : (lambda C, P, A, Z, S, O: C == 0), + "jae" : (lambda C, P, A, Z, S, O: C == 0), - if ret_addr & 1: - ret_addr -= 1 + "jbe" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), + "jna" : (lambda C, P, A, Z, S, O: C == 1 or Z == 1), - return (to_jump, ret_addr) + "ja" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), + "jnbe" : (lambda C, P, A, Z, S, O: C == 0 and Z == 0), + "jl" : (lambda C, P, A, Z, S, O: S != O), + "jnge" : (lambda C, P, A, Z, S, O: S != O), -def handle_bnj_mips(ql: Qiling, cur_addr: str) -> (bool, int): - MIPS_INST_SIZE = 4 + "jge" : (lambda C, P, A, Z, S, O: S == O), + "jnl" : (lambda C, P, A, Z, S, O: S == O), - def _read_reg(regs, _reg): - return signed_val(getattr(regs, _reg.strip('$').replace("fp", "s8"))) + "jle" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), + "jng" : (lambda C, P, A, Z, S, O: Z == 1 or S != O), - read_reg_val = partial(_read_reg, ql.reg) + "jg" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), + "jnle" : (lambda C, P, A, Z, S, O: Z == 0 or S == O), - line = disasm(ql, cur_addr) + "jp" : (lambda C, P, A, Z, S, O: P == 1), + "jpe" : (lambda C, P, A, Z, S, O: P == 1), - if line.mnemonic == "break": # indicates program extied - return CODE_END + "jnp" : (lambda C, P, A, Z, S, O: P == 0), + "jpo" : (lambda C, P, A, Z, S, O: P == 0), - # default breakpoint address if no jumps and branches here - ret_addr = cur_addr + MIPS_INST_SIZE + # unconditional jump - to_jump = False - if line.mnemonic.startswith('j') or line.mnemonic.startswith('b'): + "call" : (lambda *_: True), + "jmp" : (lambda *_: True), - # make sure at least delay slot executed - ret_addr += MIPS_INST_SIZE + } - # get registers or memory address from op_str - targets = [ - read_reg_val(each) - if '$' in each else _parse_int(each) - for each in line.op_str.split(", ") - ] + to_jump = False + ret_addr = None + if line.mnemonic in jump_table: + eflags = get_x86_eflags(self.ql.reg.ef).values() + to_jump = jump_table.get(line.mnemonic)(*eflags) - to_jump = { - "j" : (lambda _: True), # unconditional jump - "jr" : (lambda _: True), # unconditional jump - "jal" : (lambda _: True), # unconditional jump - "jalr" : (lambda _: True), # unconditional jump - "b" : (lambda _: True), # unconditional branch - "bl" : (lambda _: True), # unconditional branch - "bal" : (lambda _: True), # unconditional branch - "beq" : (lambda r0, r1, _: r0 == r1), # branch on equal - "bne" : (lambda r0, r1, _: r0 != r1), # branch on not equal - "blt" : (lambda r0, r1, _: r0 < r1), # branch on r0 less than r1 - "bgt" : (lambda r0, r1, _: r0 > r1), # branch on r0 greater than r1 - "ble" : (lambda r0, r1, _: r0 <= r1), # brach on r0 less than or equal to r1 - "bge" : (lambda r0, r1, _: r0 >= r1), # branch on r0 greater than or equal to r1 - "beqz" : (lambda r, _: r == 0), # branch on equal to zero - "bnez" : (lambda r, _: r != 0), # branch on not equal to zero - "bgtz" : (lambda r, _: r > 0), # branch on greater than zero - "bltz" : (lambda r, _: r < 0), # branch on less than zero - "bltzal" : (lambda r, _: r < 0), # branch on less than zero and link - "blez" : (lambda r, _: r <= 0), # branch on less than or equal to zero - "bgez" : (lambda r, _: r >= 0), # branch on greater than or equal to zero - "bgezal" : (lambda r, _: r >= 0), # branch on greater than or equal to zero and link - }.get(line.mnemonic)(*targets) + if to_jump: + ret_addr = _parse_int(line.op_str) - if to_jump: - # target address is always the rightmost one - ret_addr = targets[-1] + return (to_jump, ret_addr) - return (to_jump, ret_addr) +class BranchPredictor_CORTEXT_M(BranchPredictor_ARM): + def __init__(self, ql): + super().__init__(ql) class Breakpoint(object): """ @@ -556,5 +511,6 @@ def __init__(self, address): super().__init__(address) + if __name__ == "__main__": pass From 7374d37f4afe83ab96aeb04fef79dc3c9f0abb05 Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Thu, 10 Feb 2022 10:05:43 +0800 Subject: [PATCH 4/7] add basic support for X86 branch prediction --- qiling/debugger/qdb/frontend.py | 10 ++++-- qiling/debugger/qdb/utils.py | 55 ++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 4f9e5e614..7c1eeec06 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -432,12 +432,16 @@ def context_asm(self): past_list = [] cur_addr = self.ql.reg.arch_pc + cur_insn = disasm(self.ql, cur_addr) + to_jump, _ = self.predictor.predict() + self.print_asm(cur_insn, to_jump=to_jump) + # assembly before current location - line = disasm(self.ql, cur_addr) - acc_size = line.size + line = disasm(self.ql, cur_addr+cur_insn.size) + acc_size = line.size + cur_insn.size - while line and len(past_list) != 10: + while line and len(past_list) != 8: past_list.append(line) next_start = cur_addr + acc_size line = disasm(self.ql, next_start) diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index d93f8dfa0..87c029c57 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -5,8 +5,8 @@ from __future__ import annotations from typing import Callable, Optional, Mapping - from qiling.const import * +import ast, re # parse unsigned integer from string def _parse_int(s: str) -> int: @@ -125,7 +125,7 @@ def __init__(self, ql): self.ql = ql def read_reg(self, reg_name): - return NotImplementedError + return getattr(self.ql.reg, reg_name) def resolve(self): return NotImplementedError @@ -426,8 +426,8 @@ def predict(self): line = disasm(self.ql, cur_addr) jump_table = { - # conditional jump + "jo" : (lambda C, P, A, Z, S, O: O == 1), "jno" : (lambda C, P, A, Z, S, O: O == 0), @@ -479,13 +479,56 @@ def predict(self): } + jump_reg_table = { + "jcxz" : (lambda cx: cx == 0), + "jecxz" : (lambda ecx: ecx == 0), + "jrcxz" : (lambda rcx: rcx == 0), + } + to_jump = False - ret_addr = None if line.mnemonic in jump_table: eflags = get_x86_eflags(self.ql.reg.ef).values() to_jump = jump_table.get(line.mnemonic)(*eflags) - if to_jump: + elif line.mnemonic in jump_reg_table: + to_jump = jump_reg_table.get(line.mnemonic)(self.ql.reg.ecx) + + ret_addr = None + + if to_jump: + takeaway_list = ["ptr", "dword", "[", "]"] + class AST_checker(ast.NodeVisitor): + def generic_visit(self, node): + if type(node) in (ast.Module, ast.Expr, ast.BinOp, ast.Constant, ast.Add, ast.Mult, ast.Sub): + ast.NodeVisitor.generic_visit(self, node) + else: + raise ParseError("malform or invalid ast node") + + if len(line.op_str.split()) > 1: + new_line = line.op_str.replace(":", "+") + for each in takeaway_list: + new_line = new_line.replace(each, " ") + + new_line = " ".join(new_line.split()) + for each_reg in filter(lambda r: len(r) == 3, self.ql.reg.register_mapping.keys()): + if each_reg in new_line: + new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) + + for each_reg in filter(lambda r: len(r) == 2, self.ql.reg.register_mapping.keys()): + if each_reg in new_line: + new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) + + checker = AST_checker() + ast_tree = ast.parse(new_line) + + checker.visit(ast_tree) + + ret_addr = eval(new_line) + + elif line.op_str in self.ql.reg.register_mapping: + ret_addr = getattr(self.ql.reg, line.op_str) + + else: ret_addr = _parse_int(line.op_str) return (to_jump, ret_addr) @@ -510,6 +553,8 @@ class TempBreakpoint(Breakpoint): def __init__(self, address): super().__init__(address) +class ParseError(Exception): + pass if __name__ == "__main__": From 24f5e3d9e3cc9830cc47ff0bd052fa7aab0e6185 Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Fri, 11 Feb 2022 02:11:01 +0800 Subject: [PATCH 5/7] fix typo and minor refactor --- qiling/debugger/qdb/frontend.py | 39 +++++++++++++++---------------- qiling/debugger/qdb/qdb.py | 6 ++--- qiling/debugger/qdb/utils.py | 41 +++++++++++++++------------------ 3 files changed, 39 insertions(+), 47 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 7c1eeec06..3613210f2 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -5,30 +5,15 @@ from __future__ import annotations from typing import Optional, Mapping, Iterable, Union - import copy, math, os -from contextlib import contextmanager - -from qiling.const import QL_ARCH import unicorn -from .const import * -from .utils import disasm, get_x86_eflags, setup_branch_predictor - -def setup_ctx_manager(ql: Qiling) -> CtxManager: - - if ql.archtype == QL_ARCH.MIPS: - return CtxManager_MIPS(ql) - - elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB): - return CtxManager_ARM(ql) +from qiling.const import QL_ARCH - elif ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - return CtxManager_X86(ql) +from .utils import disasm, get_x86_eflags, setup_branch_predictor +from .const import color, SIZE_LETTER, FORMAT_LETTER - elif ql.archtype == QL_ARCH.CORTEX_M: - return CtxManager_CORTEX_M(ql) # read data from memory of qiling instance def examine_mem(ql: Qiling, line: str) -> Union[bool, (str, int, int)]: @@ -84,7 +69,7 @@ def extract_count(t): if elem in ql.reg.register_mapping.keys(): items.append(getattr(ql.reg, elem, None)) else: - items.append(_parse_int(elem)) + items.append(read_int(elem)) addr = sum(items) @@ -167,7 +152,7 @@ def _try_read(ql: Qiling, address: int, size: int) -> Optional[bytes]: COLORS = (color.DARKCYAN, color.BLUE, color.RED, color.YELLOW, color.GREEN, color.PURPLE, color.CYAN, color.WHITE) -# divider printer +# decorator function for printing divider def context_printer(field_name, ruler="─"): def decorator(context_dumper): def wrapper(*args, **kwargs): @@ -180,8 +165,18 @@ def wrapper(*args, **kwargs): return wrapper return decorator -class CtxManager(object): +def setup_ctx_manager(ql: Qiling) -> CtxManager: + return { + QL_ARCH.X86: CtxManager_X86, + QL_ARCH.ARM: CtxManager_ARM, + QL_ARCH.ARM_THUMB: CtxManager_ARM, + QL_ARCH.CORTEX_M: CtxManager_ARM, + QL_ARCH.MIPS: CtxManager_MIPS, + }.get(ql.archtype)(ql) + + +class CtxManager(object): def __init__(self, ql): self.ql = ql self.predictor = setup_branch_predictor(ql) @@ -275,6 +270,7 @@ def context_asm(self): self.print_asm(forward_insn) forward_insn_size += forward_insn.size + class CtxManager_ARM(CtxManager): def __init__(self, ql): super().__init__(ql) @@ -390,6 +386,7 @@ def context_reg(self, saved_reg_dump): print(lines.format(*cur_regs.values())) + class CtxManager_X86(CtxManager): def __init__(self, ql): super().__init__(ql) diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index a55b9d810..74b16d7aa 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -12,11 +12,10 @@ from qiling.const import QL_ARCH, QL_VERBOSE from qiling.debugger import QlDebugger -# from .frontend import context_reg, context_asm, examine_mem from .frontend import examine_mem, setup_ctx_manager -from .utils import _parse_int, is_thumb, parse_int +from .utils import is_thumb, parse_int from .utils import Breakpoint, TempBreakpoint -from .const import * +from .const import color class QlQdb(cmd.Cmd, QlDebugger): @@ -355,6 +354,7 @@ def do_EOF(self: QlQdb, *args) -> None: if input(f"{color.RED}[!] Are you sure about saying good bye ~ ? [Y/n]{color.END} ").strip() == "Y": self.do_quit() + do_r = do_run do_s = do_step do_q = do_quit diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index 87c029c57..eeb322535 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -9,7 +9,7 @@ import ast, re # parse unsigned integer from string -def _parse_int(s: str) -> int: +def read_int(s: str) -> int: return int(s, 0) @@ -18,7 +18,7 @@ def parse_int(func: Callable) -> Callable: def wrap(qdb, s: str = "") -> int: assert type(s) is str try: - ret = _parse_int(s) + ret = read_int(s) except: ret = None return func(qdb, ret) @@ -107,18 +107,13 @@ def _read_inst(ql: Qiling, addr: int) -> int: """ def setup_branch_predictor(ql: Qiling) -> BranchPredictor: - - if ql.archtype == QL_ARCH.MIPS: - return BranchPredictor_MIPS(ql) - - elif ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB): - return BranchPredictor_ARM(ql) - - elif ql.archtype in (QL_ARCH.X86, QL_ARCH.X8664): - return BranchPredictor_X86(ql) - - elif ql.archtype == QL_ARCH.CORTEX_M: - return BranchPredictor_CORTEX_M(ql) + return { + QL_ARCH.X86: BranchPredictor_X86, + QL_ARCH.ARM: BranchPredictor_ARM, + QL_ARCH.ARM_THUMB: BranchPredictor_ARM, + QL_ARCH.CORTEX_M: BranchPredictor_CORTEX_M, + QL_ARCH.MIPS: BranchPredictor_MIPS, + }.get(ql.archtype)(ql) class BranchPredictor(object): def __init__(self, ql): @@ -127,7 +122,7 @@ def __init__(self, ql): def read_reg(self, reg_name): return getattr(self.ql.reg, reg_name) - def resolve(self): + def predict(self): return NotImplementedError class BranchPredictor_ARM(BranchPredictor): @@ -233,7 +228,7 @@ def predict(self): if to_jump: if "#" in line.op_str: - ret_addr = _parse_int(line.op_str.split("#")[-1]) + ret_addr = read_int(line.op_str.split("#")[-1]) else: ret_addr = read_reg_val(line.op_str) @@ -280,7 +275,7 @@ def predict(self): r, _, imm = rn_offset.strip("[]!").partition(", #") if "]" in rn_offset.split(", ")[1]: # pre-indexed immediate - ret_addr = ql.unpack32(ql.mem.read(_parse_int(imm) + read_reg_val(r), self.INST_SIZE)) + ret_addr = ql.unpack32(ql.mem.read(read_int(imm) + read_reg_val(r), self.INST_SIZE)) else: # post-indexed immediate # FIXME: weired behavior, immediate here does not apply @@ -297,7 +292,7 @@ def predict(self): expr = imm[0].split() # TODO: should support more bit shifting and rotating operation if expr[0] == "lsl": # logical shift left - n = _parse_int(expr[-1].strip("#")) * 2 + n = read_int(expr[-1].strip("#")) * 2 if line.mnemonic == "addls" and (C == 0 or Z == 1): ret_addr = extra + read_reg_val(r1) + read_reg_val(r2) * n @@ -313,7 +308,7 @@ def predict(self): if imm: expr = imm[0].split() if expr[0] == "lsl": # logical shift left - n = _parse_int(expr[-1].strip("#")) * 2 + n = read_int(expr[-1].strip("#")) * 2 if line.mnemonic == "tbh": @@ -343,7 +338,7 @@ def predict(self): elif line.mnemonic == "sub" and self.regdst_eq_pc(line.op_str): _, r, imm = line.op_str.split(", ") - ret_addr = read_reg_val(r) - _parse_int(imm.strip("#")) + ret_addr = read_reg_val(r) - read_int(imm.strip("#")) elif line.mnemonic == "mov" and self.regdst_eq_pc(line.op_str): _, r = line.op_str.split(", ") @@ -382,7 +377,7 @@ def predict(self): # get registers or memory address from op_str targets = [ self.read_reg(each) - if '$' in each else _parse_int(each) + if '$' in each else read_int(each) for each in line.op_str.split(", ") ] @@ -529,11 +524,11 @@ def generic_visit(self, node): ret_addr = getattr(self.ql.reg, line.op_str) else: - ret_addr = _parse_int(line.op_str) + ret_addr = read_int(line.op_str) return (to_jump, ret_addr) -class BranchPredictor_CORTEXT_M(BranchPredictor_ARM): +class BranchPredictor_CORTEX_M(BranchPredictor_ARM): def __init__(self, ql): super().__init__(ql) From cfca0cc5fcb4e6f8d77e23aa8cdeddff45592a92 Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Sat, 12 Feb 2022 00:01:26 +0800 Subject: [PATCH 6/7] try to adopt new strategy for starting and pausing --- qiling/debugger/qdb/frontend.py | 16 +++--- qiling/debugger/qdb/qdb.py | 57 +++++++++++++++----- qiling/debugger/qdb/utils.py | 94 ++++++++++++++++++--------------- 3 files changed, 103 insertions(+), 64 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 3613210f2..3e34abc3d 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -210,8 +210,8 @@ def context_stack(self): for idx in range(10): addr = self.ql.reg.arch_sp + idx * self.ql.pointersize - val = self.ql.mem.read(addr, self.ql.pointersize) - print(f"$sp+0x{idx*self.ql.pointersize:02x}│ [0x{addr:08x}] —▸ 0x{self.ql.unpack(val):08x}", end="") + if (val := _try_read(self.ql, addr, self.ql.pointersize)[0]): + print(f"$sp+0x{idx*self.ql.pointersize:02x}│ [0x{addr:08x}] —▸ 0x{self.ql.unpack(val):08x}", end="") # try to dereference wether it's a pointer if (buf := _try_read(self.ql, addr, self.ql.pointersize))[0] is not None: @@ -257,13 +257,13 @@ def context_asm(self): # assembly for current location - cur_ins = disasm(self.ql, cur_addr) - to_jump, _ = self.predictor.predict() - self.print_asm(cur_ins, to_jump=to_jump) + cur_insn = disasm(self.ql, cur_addr) + prophecy = self.predictor.predict() + self.print_asm(cur_insn, to_jump=prophecy.going) # assembly after current location - forward_insn_size = cur_ins.size + forward_insn_size = cur_insn.size for _ in range(5): forward_insn = disasm(self.ql, cur_addr+forward_insn_size) if forward_insn: @@ -430,8 +430,8 @@ def context_asm(self): cur_addr = self.ql.reg.arch_pc cur_insn = disasm(self.ql, cur_addr) - to_jump, _ = self.predictor.predict() - self.print_asm(cur_insn, to_jump=to_jump) + prophecy = self.predictor.predict() + self.print_asm(cur_insn, to_jump=prophecy.going) # assembly before current location diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index 74b16d7aa..4bb5c64df 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -13,8 +13,8 @@ from qiling.debugger import QlDebugger from .frontend import examine_mem, setup_ctx_manager -from .utils import is_thumb, parse_int -from .utils import Breakpoint, TempBreakpoint +from .utils import is_thumb, parse_int, setup_branch_predictor, disasm +from .utils import Breakpoint, TempBreakpoint, read_inst from .const import color @@ -32,6 +32,7 @@ def __init__(self: QlQdb, ql: Qiling, init_hook: str = "", rr: bool = False) -> self._states_list = [] self.ctx = setup_ctx_manager(ql) + self.predictor = setup_branch_predictor(ql) super().__init__() @@ -42,7 +43,16 @@ def dbg_hook(self: QlQdb, init_hook: str): # self.ql.loader.entry_point # ld.so # self.ql.loader.elf_entry # .text of binary + def bp_handler(ql, address, size, bp_list): + if address in bp_list: + ql.emu_stop() + _ = bp_list.pop(address) + self.do_context() + + self.ql.hook_code(bp_handler, self.bp_list) + if init_hook and self.ql.loader.entry_point != init_hook: + breakpoint() self.do_breakpoint(init_hook) self.cur_addr = self.ql.loader.entry_point @@ -210,9 +220,9 @@ def do_backward(self: QlQdb, *args) -> None: self._restore() self.do_context() - def do_step(self: QlQdb, *args) -> Optional[bool]: + def do_step_in(self: QlQdb, *args) -> Optional[bool]: """ - execute one instruction at a time + execute one instruction at a time, will enter subroutine """ if self.ql is None: @@ -225,20 +235,40 @@ def do_step(self: QlQdb, *args) -> Optional[bool]: if self.rr: self._save() - # next_stop = self.predictor.predict() + prophecy = self.predictor.predict() - # if next_stop is True: - # return True + if prophecy.where is True: + return True if self.ql.archtype == QL_ARCH.CORTEX_M: - self.ql.arch.step() self.ql.count -= 1 + self.set_breakpoint(self.cur_addr, is_temp=True) - else: - self._run(count=1) + self._run(count=1) self.do_context() + def do_step_over(self: QlQdb, *args) -> Option[bool]: + """ + execute one instruction at a time, but WON't enter subroutine + """ + + if self.ql is None: + print(f"{color.RED}[!] The program is not being run.{color.END}") + + else: + + prophecy = self.predictor.predict() + + if prophecy.going: + cur_insn = disasm(self.ql, self.cur_addr) + self.set_breakpoint(self.cur_addr + cur_insn.size, is_temp=True) + + else: + self.set_breakpoint(prophecy.where, is_temp=True) + + self._run(self.cur_addr) + def set_breakpoint(self: QlQdb, address: int, is_temp: bool = False) -> None: """ internal function for placing breakpoint @@ -246,9 +276,9 @@ def set_breakpoint(self: QlQdb, address: int, is_temp: bool = False) -> None: bp = TempBreakpoint(address) if is_temp else Breakpoint(address) - if self.ql.archtype != QL_ARCH.CORTEX_M: + # if self.ql.archtype != QL_ARCH.CORTEX_M: # skip hook_address for cortex_m - bp.hook = self.ql.hook_address(self._bp_handler, address) + # bp.hook = self.ql.hook_address(self._bp_handler, address) self.bp_list.update({address: bp}) @@ -356,7 +386,8 @@ def do_EOF(self: QlQdb, *args) -> None: do_r = do_run - do_s = do_step + do_s = do_step_in + do_n = do_step_over do_q = do_quit do_x = do_examine do_p = do_backward diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index eeb322535..dd9f5f8d7 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -6,6 +6,8 @@ from __future__ import annotations from typing import Callable, Optional, Mapping from qiling.const import * + +from collections import namedtuple import ast, re # parse unsigned integer from string @@ -67,7 +69,7 @@ def disasm(ql: Qiling, address: int, detail: bool = False) -> Optional[int]: md = ql.disassembler md.detail = detail try: - ret = next(md.disasm(_read_inst(ql, address), address)) + ret = next(md.disasm(read_inst(ql, address), address)) except StopIteration: ret = None @@ -75,7 +77,7 @@ def disasm(ql: Qiling, address: int, detail: bool = False) -> Optional[int]: return ret -def _read_inst(ql: Qiling, addr: int) -> int: +def read_inst(ql: Qiling, addr: int) -> int: result = ql.mem.read(addr, 4) if ql.archtype in (QL_ARCH.ARM, QL_ARCH.ARM_THUMB, QL_ARCH.CORTEX_M): @@ -115,6 +117,14 @@ def setup_branch_predictor(ql: Qiling) -> BranchPredictor: QL_ARCH.MIPS: BranchPredictor_MIPS, }.get(ql.archtype)(ql) +class Prophecy(object): + def __init__(self): + self.going = False + self.where = None + + def __iter__(self): + return iter((self.going, self.where)) + class BranchPredictor(object): def __init__(self, ql): self.ql = ql @@ -141,9 +151,10 @@ def regdst_eq_pc(self, op_str): return op_str.partition(", ")[0] == "pc" def predict(self): + prophecy = Prophecy() cur_addr = self.ql.reg.arch_pc line = disasm(self.ql, cur_addr) - ret_addr = cur_addr + line.size + prophecy.where = cur_addr + line.size if line.mnemonic == self.CODE_END: # indicates program exited return True @@ -219,23 +230,22 @@ def predict(self): "cbnz": (lambda r: r != 0), } - to_jump = False if line.mnemonic in jump_table: - to_jump = jump_table.get(line.mnemonic)(*get_cpsr(self.ql.reg.cpsr)) + prophecy.going = jump_table.get(line.mnemonic)(*get_cpsr(self.ql.reg.cpsr)) elif line.mnemonic in cb_table: - to_jump = cb_table.get(line.mnemonic)(self.read_reg(line.op_str.split(", ")[0])) + prophecy.going = cb_table.get(line.mnemonic)(self.read_reg(line.op_str.split(", ")[0])) - if to_jump: + if prophecy.going: if "#" in line.op_str: - ret_addr = read_int(line.op_str.split("#")[-1]) + prophecy.where = read_int(line.op_str.split("#")[-1]) else: - ret_addr = read_reg_val(line.op_str) + prophecy.where = read_reg_val(line.op_str) if regdst_eq_pc(line.op_str): next_addr = cur_addr + line.size n2_addr = next_addr + len(read_inst(next_addr)) - ret_addr += len(read_inst(n2_addr)) + len(read_inst(next_addr)) + prophecy.where += len(read_inst(n2_addr)) + len(read_inst(next_addr)) elif line.mnemonic.startswith("it"): # handle IT block here @@ -266,7 +276,7 @@ def predict(self): next_addr += len(_inst) - ret_addr = next_addr + prophecy.where = next_addr elif line.mnemonic in ("ldr",): @@ -275,11 +285,11 @@ def predict(self): r, _, imm = rn_offset.strip("[]!").partition(", #") if "]" in rn_offset.split(", ")[1]: # pre-indexed immediate - ret_addr = ql.unpack32(ql.mem.read(read_int(imm) + read_reg_val(r), self.INST_SIZE)) + prophecy.where = ql.unpack32(ql.mem.read(read_int(imm) + read_reg_val(r), self.INST_SIZE)) else: # post-indexed immediate # FIXME: weired behavior, immediate here does not apply - ret_addr = ql.unpack32(ql.mem.read(read_reg_val(r), self.INST_SIZE)) + prophecy.where = ql.unpack32(ql.mem.read(read_reg_val(r), self.INST_SIZE)) elif line.mnemonic in ("addls", "addne", "add") and self.regdst_eq_pc(line.op_str): V, C, Z, N = get_cpsr(ql.reg.cpsr) @@ -295,10 +305,10 @@ def predict(self): n = read_int(expr[-1].strip("#")) * 2 if line.mnemonic == "addls" and (C == 0 or Z == 1): - ret_addr = extra + read_reg_val(r1) + read_reg_val(r2) * n + prophecy.where = extra + read_reg_val(r1) + read_reg_val(r2) * n elif line.mnemonic == "add" or (line.mnemonic == "addne" and Z == 0): - ret_addr = extra + read_reg_val(r1) + (read_reg_val(r2) * n if imm else read_reg_val(r2)) + prophecy.where = extra + read_reg_val(r1) + (read_reg_val(r2) * n if imm else read_reg_val(r2)) elif line.mnemonic in ("tbh", "tbb"): @@ -319,11 +329,11 @@ def predict(self): r1 = read_reg_val(r1) to_add = int.from_bytes(ql.mem.read(cur_addr+r1, 2 if line.mnemonic == "tbh" else 1), byteorder="little") * n - ret_addr = cur_addr + to_add + prophecy.where = cur_addr + to_add elif line.mnemonic.startswith("pop") and "pc" in line.op_str: - ret_addr = ql.stack_read(line.op_str.strip("{}").split(", ").index("pc") * self.INST_SIZE) + prophecy.where = ql.stack_read(line.op_str.strip("{}").split(", ").index("pc") * self.INST_SIZE) if not { # step to next instruction if cond does not meet "pop" : lambda *_: True, "pop.w": lambda *_: True, @@ -334,45 +344,44 @@ def predict(self): "poplt": lambda V, C, Z, N: (N != V), }.get(line.mnemonic)(*get_cpsr(ql.reg.cpsr)): - ret_addr = cur_addr + self.INST_SIZE + prophecy.where = cur_addr + self.INST_SIZE elif line.mnemonic == "sub" and self.regdst_eq_pc(line.op_str): _, r, imm = line.op_str.split(", ") - ret_addr = read_reg_val(r) - read_int(imm.strip("#")) + prophecy.where = read_reg_val(r) - read_int(imm.strip("#")) elif line.mnemonic == "mov" and self.regdst_eq_pc(line.op_str): _, r = line.op_str.split(", ") - ret_addr = read_reg_val(r) + prophecy.where = read_reg_val(r) - if ret_addr & 1: - ret_addr -= 1 + if prophecy.where & 1: + prophecy.where -= 1 - return (to_jump, ret_addr) + return prophecy class BranchPredictor_MIPS(BranchPredictor): def __init__(self, ql): + super().__init__(ql) self.CODE_END = "break" self.INST_SIZE = 4 - self.ql = ql def read_reg(self, reg_name): reg_name = reg_name.strip("$").replace("fp", "s8") return signed_val(getattr(self.ql.reg, reg_name)) def predict(self): - + prophecy = Prophecy() cur_addr = self.ql.reg.arch_pc line = disasm(self.ql, cur_addr) if line.mnemonic == self.CODE_END: # indicates program extied return True - ret_addr = None - to_jump = False + prophecy.where = cur_addr + self.INST_SIZE if line.mnemonic.startswith('j') or line.mnemonic.startswith('b'): # make sure at least delay slot executed - ret_addr = cur_addr + self.INST_SIZE + prophecy.where += self.INST_SIZE # get registers or memory address from op_str targets = [ @@ -381,7 +390,7 @@ def predict(self): for each in line.op_str.split(", ") ] - to_jump = { + prophecy.going = { "j" : (lambda _: True), # unconditional jump "jr" : (lambda _: True), # unconditional jump "jal" : (lambda _: True), # unconditional jump @@ -405,18 +414,18 @@ def predict(self): "bgezal" : (lambda r, _: r >= 0), # branch on greater than or equal to zero and link }.get(line.mnemonic)(*targets) - if to_jump: + if prophecy.going: # target address is always the rightmost one - ret_addr = targets[-1] + prophecy.where = targets[-1] - return (to_jump, ret_addr) + return prophecy class BranchPredictor_X86(BranchPredictor): def __init__(self, ql): super().__init__(ql) def predict(self): - + prophecy = Prophecy() cur_addr = self.ql.reg.arch_pc line = disasm(self.ql, cur_addr) @@ -480,17 +489,14 @@ def predict(self): "jrcxz" : (lambda rcx: rcx == 0), } - to_jump = False if line.mnemonic in jump_table: eflags = get_x86_eflags(self.ql.reg.ef).values() - to_jump = jump_table.get(line.mnemonic)(*eflags) + prophecy.going = jump_table.get(line.mnemonic)(*eflags) elif line.mnemonic in jump_reg_table: - to_jump = jump_reg_table.get(line.mnemonic)(self.ql.reg.ecx) - - ret_addr = None + prophecy.going = jump_reg_table.get(line.mnemonic)(self.ql.reg.ecx) - if to_jump: + if prophecy.going: takeaway_list = ["ptr", "dword", "[", "]"] class AST_checker(ast.NodeVisitor): def generic_visit(self, node): @@ -518,15 +524,17 @@ def generic_visit(self, node): checker.visit(ast_tree) - ret_addr = eval(new_line) + prophecy.where = eval(new_line) elif line.op_str in self.ql.reg.register_mapping: - ret_addr = getattr(self.ql.reg, line.op_str) + prophecy.where = getattr(self.ql.reg, line.op_str) else: - ret_addr = read_int(line.op_str) + prophecy.where = read_int(line.op_str) + else: + prophecy.where = cur_addr + line.size - return (to_jump, ret_addr) + return prophecy class BranchPredictor_CORTEX_M(BranchPredictor_ARM): def __init__(self, ql): From 8b4fa21ab63d3ad4e98120f30f7360285e1bb434 Mon Sep 17 00:00:00 2001 From: ucgJhe Date: Sat, 12 Feb 2022 11:15:29 +0800 Subject: [PATCH 7/7] fix asm listing for cortex_m and breakpoint --- qiling/debugger/qdb/frontend.py | 2 +- qiling/debugger/qdb/qdb.py | 68 ++++++++++++++------------------- qiling/debugger/qdb/utils.py | 29 +++++++------- 3 files changed, 44 insertions(+), 55 deletions(-) diff --git a/qiling/debugger/qdb/frontend.py b/qiling/debugger/qdb/frontend.py index 3e34abc3d..cf4a1d087 100644 --- a/qiling/debugger/qdb/frontend.py +++ b/qiling/debugger/qdb/frontend.py @@ -252,7 +252,7 @@ def context_asm(self): past_list.append(line) # print four insns before current location - for line in past_list[:-1][:4]: + for line in past_list[:-1]: self.print_asm(line) # assembly for current location diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index 4bb5c64df..d6366254a 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -44,15 +44,26 @@ def dbg_hook(self: QlQdb, init_hook: str): # self.ql.loader.elf_entry # .text of binary def bp_handler(ql, address, size, bp_list): - if address in bp_list: - ql.emu_stop() - _ = bp_list.pop(address) + + if (bp := self.bp_list.get(address, None)): + + if isinstance(bp, TempBreakpoint): + # remove TempBreakpoint once hitted + self.del_breakpoint(bp) + + else: + if bp.hitted: + return + + print(f"{color.CYAN}[+] hit breakpoint at 0x{self.cur_addr:08x}{color.END}") + bp.hitted = True + + ql.stop() self.do_context() self.ql.hook_code(bp_handler, self.bp_list) if init_hook and self.ql.loader.entry_point != init_hook: - breakpoint() self.do_breakpoint(init_hook) self.cur_addr = self.ql.loader.entry_point @@ -82,27 +93,6 @@ def cur_addr(self: QlQdb, address: int) -> None: self.ql.reg.arch_pc = address - def _bp_handler(self: QlQdb, *args) -> None: - """ - internal function for handling once breakpoint hitted - """ - - if (bp := self.bp_list.get(self.cur_addr, None)): - - if isinstance(bp, TempBreakpoint): - # remove TempBreakpoint once hitted - self.del_breakpoint(bp) - - else: - if bp.hitted: - return - - print(f"{color.CYAN}[+] hit breakpoint at 0x{self.cur_addr:08x}{color.END}") - bp.hitted = True - - self.ql.stop() - self.do_context() - def _save(self: QlQdb, *args) -> None: """ internal function for saving state of qiling instance @@ -220,6 +210,12 @@ def do_backward(self: QlQdb, *args) -> None: self._restore() self.do_context() + def update_reg_dump(self: QlQdb) -> None: + """ + internal function for updating registers dump + """ + self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self.ql.reg.save().items())) + def do_step_in(self: QlQdb, *args) -> Optional[bool]: """ execute one instruction at a time, will enter subroutine @@ -229,8 +225,7 @@ def do_step_in(self: QlQdb, *args) -> Optional[bool]: print(f"{color.RED}[!] The program is not being run.{color.END}") else: - # save reg dump for data highlighting changes - self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self.ql.reg.save().items())) + self.update_reg_dump() if self.rr: self._save() @@ -241,10 +236,9 @@ def do_step_in(self: QlQdb, *args) -> Optional[bool]: return True if self.ql.archtype == QL_ARCH.CORTEX_M: - self.ql.count -= 1 - self.set_breakpoint(self.cur_addr, is_temp=True) - - self._run(count=1) + self.ql.arch.step() + else: + self._run(count=1) self.do_context() @@ -259,6 +253,7 @@ def do_step_over(self: QlQdb, *args) -> Option[bool]: else: prophecy = self.predictor.predict() + self.update_reg_dump() if prophecy.going: cur_insn = disasm(self.ql, self.cur_addr) @@ -267,7 +262,7 @@ def do_step_over(self: QlQdb, *args) -> Option[bool]: else: self.set_breakpoint(prophecy.where, is_temp=True) - self._run(self.cur_addr) + self._run() def set_breakpoint(self: QlQdb, address: int, is_temp: bool = False) -> None: """ @@ -276,19 +271,14 @@ def set_breakpoint(self: QlQdb, address: int, is_temp: bool = False) -> None: bp = TempBreakpoint(address) if is_temp else Breakpoint(address) - # if self.ql.archtype != QL_ARCH.CORTEX_M: - # skip hook_address for cortex_m - # bp.hook = self.ql.hook_address(self._bp_handler, address) - self.bp_list.update({address: bp}) def del_breakpoint(self: QlQdb, bp: Union[Breakpoint, TempBreakpoint]) -> None: """ - internal function for removing breakpoints + internal function for removing breakpoint """ - if self.bp_list.pop(bp.addr, None): - bp.hook.remove() + self.bp_list.pop(bp.addr, None) def do_start(self: QlQdb, *args) -> None: """ diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index dd9f5f8d7..9178a2621 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -240,9 +240,9 @@ def predict(self): if "#" in line.op_str: prophecy.where = read_int(line.op_str.split("#")[-1]) else: - prophecy.where = read_reg_val(line.op_str) + prophecy.where = self.read_reg(line.op_str) - if regdst_eq_pc(line.op_str): + if self.regdst_eq_pc(line.op_str): next_addr = cur_addr + line.size n2_addr = next_addr + len(read_inst(next_addr)) prophecy.where += len(read_inst(n2_addr)) + len(read_inst(next_addr)) @@ -285,11 +285,11 @@ def predict(self): r, _, imm = rn_offset.strip("[]!").partition(", #") if "]" in rn_offset.split(", ")[1]: # pre-indexed immediate - prophecy.where = ql.unpack32(ql.mem.read(read_int(imm) + read_reg_val(r), self.INST_SIZE)) + prophecy.where = ql.unpack32(ql.mem.read(read_int(imm) + self.read_reg(r), self.INST_SIZE)) else: # post-indexed immediate # FIXME: weired behavior, immediate here does not apply - prophecy.where = ql.unpack32(ql.mem.read(read_reg_val(r), self.INST_SIZE)) + prophecy.where = ql.unpack32(ql.mem.read(self.read_reg(r), self.INST_SIZE)) elif line.mnemonic in ("addls", "addne", "add") and self.regdst_eq_pc(line.op_str): V, C, Z, N = get_cpsr(ql.reg.cpsr) @@ -305,10 +305,10 @@ def predict(self): n = read_int(expr[-1].strip("#")) * 2 if line.mnemonic == "addls" and (C == 0 or Z == 1): - prophecy.where = extra + read_reg_val(r1) + read_reg_val(r2) * n + prophecy.where = extra + self.read_reg(r1) + self.read_reg(r2) * n elif line.mnemonic == "add" or (line.mnemonic == "addne" and Z == 0): - prophecy.where = extra + read_reg_val(r1) + (read_reg_val(r2) * n if imm else read_reg_val(r2)) + prophecy.where = extra + self.read_reg(r1) + (self.read_reg(r2) * n if imm else self.read_reg(r2)) elif line.mnemonic in ("tbh", "tbb"): @@ -322,11 +322,11 @@ def predict(self): if line.mnemonic == "tbh": - r1 = read_reg_val(r1) * n + r1 = self.read_reg(r1) * n elif line.mnemonic == "tbb": - r1 = read_reg_val(r1) + r1 = self.read_reg(r1) to_add = int.from_bytes(ql.mem.read(cur_addr+r1, 2 if line.mnemonic == "tbh" else 1), byteorder="little") * n prophecy.where = cur_addr + to_add @@ -348,11 +348,11 @@ def predict(self): elif line.mnemonic == "sub" and self.regdst_eq_pc(line.op_str): _, r, imm = line.op_str.split(", ") - prophecy.where = read_reg_val(r) - read_int(imm.strip("#")) + prophecy.where = self.read_reg(r) - read_int(imm.strip("#")) elif line.mnemonic == "mov" and self.regdst_eq_pc(line.op_str): _, r = line.op_str.split(", ") - prophecy.where = read_reg_val(r) + prophecy.where = self.read_reg(r) if prophecy.where & 1: prophecy.where -= 1 @@ -544,17 +544,16 @@ class Breakpoint(object): """ dummy class for breakpoint """ - def __init__(self, address: int): - self.addr = address + def __init__(self, addr): + self.addr = addr self.hitted = False - self.hook = None class TempBreakpoint(Breakpoint): """ dummy class for temporay breakpoint """ - def __init__(self, address): - super().__init__(address) + def __init__(self, addr): + super().__init__(addr) class ParseError(Exception): pass