Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions qiling/debugger/qdb/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class color:
UNDERLINE = '\033[4m'
BOLD = '\033[1m'
END = '\033[0m'
RESET = '\x1b[39m'



FORMAT_LETTER = {
Expand Down
74 changes: 50 additions & 24 deletions qiling/debugger/qdb/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from qiling.const import QL_ARCH

from .utils import dump_regs, get_arm_flags, disasm, parse_int
from .utils import dump_regs, get_arm_flags, disasm, parse_int, handle_bnj
from .const import *


Expand Down Expand Up @@ -87,7 +87,7 @@ def unpack(bs, sz):

for line in range(lines):
offset = line * sz * 4
print("0x{addr+offset:x}:\t", end="")
print(f"0x{addr+offset:x}:\t", end="")

idx = line * ql.pointersize
for each in mem_read[idx:idx+ql.pointersize]:
Expand Down Expand Up @@ -120,17 +120,19 @@ def _try_read(ql: Qiling, address: int, size: int) -> Optional[bytes]:

# divider printer
@contextmanager
def context_printer(ql: Qiling, field_name: str, ruler: str = "=") -> None:
_height, _width = get_terminal_size()
print(field_name, ruler * (_width - len(field_name) - 1))
def context_printer(ql: Qiling, field_name: str, ruler: str = "─") -> None:
height, width = get_terminal_size()
bar = (width - len(field_name)) // 2 - 1
print(ruler * bar, field_name, ruler * bar)
yield
print(ruler * _width)
if "DISASM" in field_name:
print(ruler * width)


def context_reg(ql: Qiling, saved_states: Optional[Mapping[str, int]] = None, /, *args, **kwargs) -> None:

# context render for registers
with context_printer(ql, "[Registers]"):
with context_printer(ql, "[ REGISTERS ]"):

_cur_regs = dump_regs(ql)

Expand Down Expand Up @@ -194,33 +196,56 @@ def context_reg(ql: Qiling, saved_states: Optional[Mapping[str, int]] = None, /,
print(color.GREEN, "[{cpsr[mode]} mode], Thumb: {cpsr[thumb]}, FIQ: {cpsr[fiq]}, IRQ: {cpsr[irq]}, NEG: {cpsr[neg]}, ZERO: {cpsr[zero]}, Carry: {cpsr[carry]}, Overflow: {cpsr[overflow]}".format(cpsr=get_arm_flags(ql.reg.cpsr)), color.END, sep="")

# context render for Stack
with context_printer(ql, "[Stack]", ruler="-"):
with context_printer(ql, "[ STACK ]", ruler=""):

for idx in range(8):
_addr = ql.reg.arch_sp + idx * 4
_val = ql.mem.read(_addr, ql.pointersize)
print(f"$sp+0x{idx*4:02x}|[0x{_addr:08x}]=> 0x{ql.unpack(_val):08x}", end="")
for idx in range(10):
addr = ql.reg.arch_sp + idx * ql.pointersize
val = ql.mem.read(addr, ql.pointersize)
print(f"$sp+0x{idx*ql.pointersize:02x}[0x{addr:08x}] —▸ 0x{ql.unpack(val):08x}", end="")

try: # try to deference wether its a pointer
_deref = ql.mem.read(_addr, ql.pointersize)
buf = ql.mem.read(addr, ql.pointersize)
except:
_deref = None
buf = None

if (addr := ql.unpack(buf)):
try: # try to deference again
buf = ql.mem.read(addr, ql.pointersize)
except:
buf = None

if buf:
try:
s = ql.mem.string(addr)
except:
s = None

if s and s.isprintable():
print(f" ◂— {ql.mem.string(addr)}", end="")
else:
print(f" ◂— 0x{ql.unpack(buf):08x}", end="")
print()

if _deref:
print(f" => 0x{ql.unpack(_deref):08x}")

def print_asm(ql: Qiling, insn: CsInsn, to_jump: Optional[bool] = None, address: int = None) -> None:

def print_asm(ql: Qiling, ins: CsInsn) -> None:
fmt = (ins.address, ins.mnemonic.ljust(6), ins.op_str)
if ql.reg.arch_pc == ins.address:
print(f"PC ==> 0x{fmt[0]:x}\t{fmt[1]} {fmt[2]}")
else:
print(f"\t0x{fmt[0]:x}\t{fmt[1]} {fmt[2]}")
opcode = "".join(f"{b:02x}" for b in insn.bytes)
trace_line = f"0x{insn.address:08x} │ {opcode:10s} {insn.mnemonic:10} {insn.op_str:35s}"

cursor = " "
if ql.reg.arch_pc == insn.address:
cursor = "►"

jump_sign = " "
if to_jump and address != ql.reg.arch_pc+4:
jump_sign = f"{color.RED}✓{color.END}"

print(f"{jump_sign} {cursor} {color.DARKGRAY}{trace_line}{color.END}")


def context_asm(ql: Qiling, address: int) -> None:

with context_printer(ql, field_name="[Code]"):
with context_printer(ql, field_name="[ DISASM ]"):

# assembly before current location

Expand Down Expand Up @@ -249,7 +274,8 @@ def context_asm(ql: Qiling, address: int) -> None:
# assembly for current location

cur_ins = disasm(ql, address)
print_asm(ql, cur_ins)
to_jump, next_stop = handle_bnj(ql, address)
print_asm(ql, cur_ins, to_jump=to_jump)

# assembly after current location

Expand Down
55 changes: 27 additions & 28 deletions qiling/debugger/qdb/qdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class QlQdb(cmd.Cmd, QlDebugger):

def __init__(self: QlQdb, ql: Qiling, init_hook: str = "", rr: bool = False) -> None:

self._ql = ql
self.prompt = "(Qdb) "
self.ql = ql
self.prompt = f"{color.BOLD}{color.RED}Qdb> {color.END}"
self.breakpoints = {}
self._saved_reg_dump = None
self.bp_list = {}
Expand All @@ -35,7 +35,7 @@ def __init__(self: QlQdb, ql: Qiling, init_hook: str = "", rr: bool = False) ->
super().__init__()

# setup a breakpoint at entry point or user specified address
address = self._ql.loader.entry_point if not init_hook else parse_int(init_hook)
address = self.ql.loader.entry_point if not init_hook else parse_int(init_hook)
self.set_breakpoint(address, is_temp=True)
self.dbg_hook()

Expand All @@ -44,23 +44,23 @@ def dbg_hook(self: QlQdb):
hook every instruction with callback funtion _bp_handler
"""

self._ql.hook_code(self._bp_handler)
self.ql.hook_code(self._bp_handler)

@property
def cur_addr(self: QlQdb) -> int:
"""
getter for current address of qiling instance
"""

return self._ql.reg.arch_pc
return self.ql.reg.arch_pc

@cur_addr.setter
def cur_addr(self: QlQdb, address: int) -> None:
"""
setter for current address of qiling instance
"""

self._ql.reg.arch_pc = address
self.ql.reg.arch_pc = address

def _bp_handler(self: QlQdb, *args) -> None:
"""
Expand All @@ -83,21 +83,21 @@ def _save(self: QlQdb, *args) -> None:
internal function for saving state of qiling instance
"""

self._states_list.append(self._ql.save())
self._states_list.append(self.ql.save())

def _restore(self: QlQdb, *args) -> None:
"""
internal function for restoring state of qiling instance
"""

self._ql.restore(self._states_list.pop())
self.ql.restore(self._states_list.pop())

def _run(self: Qldbg, *args) -> None:
"""
internal function for launching qiling instance
"""

self._ql.run()
self.ql.run()

def parseline(self: QlQdb, line: str) -> Tuple[Optional[str], Optional[str], str]:
"""
Expand Down Expand Up @@ -150,12 +150,12 @@ def do_run(self: QlQdb, *args) -> None:
launch qiling instance from a fresh start
"""

self._ql = Qiling(
argv=self._ql.argv,
rootfs=self._ql.rootfs,
verbose=self._ql.verbose,
console=self._ql.console,
log_file=self._ql.log_file,
self.ql = Qiling(
argv=self.ql.argv,
rootfs=self.ql.rootfs,
verbose=self.ql.verbose,
console=self.ql.console,
log_file=self.ql.log_file,
)

self.dbg_hook()
Expand All @@ -166,8 +166,8 @@ def do_context(self: QlQdb, *args) -> None:
show context information for current location
"""

context_reg(self._ql, self._saved_reg_dump)
context_asm(self._ql, self.cur_addr)
context_reg(self.ql, self._saved_reg_dump)
context_asm(self.ql, self.cur_addr)

def do_backward(self: QlQdb, *args) -> None:
"""
Expand All @@ -187,13 +187,13 @@ def do_step(self: QlQdb, *args) -> Optional[bool, None]:
execute one instruction at a time
"""

if self._ql is None:
if self.ql is None:
print(f"{color.RED}[!] The program is not being run.{color.END}")

else:
self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self._ql.reg.save().items()))
self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self.ql.reg.save().items()))

next_stop = handle_bnj(self._ql, self.cur_addr)
_, next_stop = handle_bnj(self.ql, self.cur_addr)

if next_stop is CODE_END:
return True
Expand Down Expand Up @@ -226,8 +226,8 @@ def do_start(self: QlQdb, address: str = "", *args) -> None:
pause at entry point by setting a temporary breakpoint on it
"""

# entry = self._ql.loader.entry_point # ld.so
# entry = self._ql.loader.elf_entry # .text of binary
# entry = self.ql.loader.entry_point # ld.so
# entry = self.ql.loader.elf_entry # .text of binary

self._run()

Expand All @@ -236,7 +236,7 @@ def do_breakpoint(self: QlQdb, address: str = "") -> None:
set breakpoint on specific address
"""

# address = parse_int(address) if address else self._ql.reg.arch_pc
# address = parse_int(address) if address else self.ql.reg.arch_pc
address = parse_int(address) if address else self.cur_addr

self.set_breakpoint(address)
Expand All @@ -263,7 +263,7 @@ def do_examine(self: QlQdb, line: str) -> None:
"""

try:
if not examine_mem(self._ql, line):
if not examine_mem(self.ql, line):
self.do_help("examine")
except:
print(f"{color.RED}[!] something went wrong ...{color.END}")
Expand All @@ -273,7 +273,7 @@ def do_show(self: QlQdb, *args) -> None:
show some runtime information
"""

self._ql.mem.show_mapinfo()
self.ql.mem.show_mapinfo()
print(f"Breakpoints: {[hex(addr) for addr in self.bp_list.keys()]}")

def do_disassemble(self: QlQdb, address: str, /, *args, **kwargs) -> None:
Expand All @@ -282,7 +282,7 @@ def do_disassemble(self: QlQdb, address: str, /, *args, **kwargs) -> None:
"""

try:
context_asm(self._ql, parse_int(address), 4)
context_asm(self.ql, parse_int(address), 4)
except:
print(f"{color.RED}[!] something went wrong ...{color.END}")

Expand All @@ -300,9 +300,8 @@ def do_quit(self: QlQdb, *args) -> bool:
"""
exit Qdb and stop running qiling instance
"""
# breakpoint()

self._ql.stop()
self.ql.stop()
return True

do_r = do_run
Expand Down
14 changes: 8 additions & 6 deletions qiling/debugger/qdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _get_mode(bits):

# parse unsigned integer from string
def parse_int(s: str) -> int:
return int(s, 16) if s.startswith("0x") else int(s)
return int(s, 0)


# check wether negative value or not
Expand Down Expand Up @@ -103,11 +103,12 @@ def is_thumb(bits: int) -> bool:
return bits & 0x00000020 != 0


def disasm(ql: Qiling, address: int) -> Optional[int]:
# md = ql.create_disassembler()
def disasm(ql: Qiling, address: int, detail: bool = False) -> Optional[int]:

md = ql.disassembler
md.detail = detail
try:
ret = next(ql.disassembler.disasm(_read_inst(ql, address), address))
ret = next(md.disasm(_read_inst(ql, address), address))

except StopIteration:
ret = None
Expand Down Expand Up @@ -350,7 +351,7 @@ def regdst_eq_pc(op_str):
if ret_addr & 1:
ret_addr -= 1

return ret_addr
return (to_jump, ret_addr)


def handle_bnj_mips(ql: Qiling, cur_addr: str) -> int:
Expand All @@ -369,6 +370,7 @@ def _read_reg(regs, _reg):
# default breakpoint address if no jumps and branches here
ret_addr = cur_addr + MIPS_INST_SIZE

to_jump = False
if line.mnemonic.startswith('j') or line.mnemonic.startswith('b'):

# make sure at least delay slot executed
Expand Down Expand Up @@ -409,7 +411,7 @@ def _read_reg(regs, _reg):
# target address is always the rightmost one
ret_addr = targets[-1]

return ret_addr
return (to_jump, ret_addr)

class Breakpoint(object):
"""
Expand Down