diff --git a/qiling/extensions/tracing/README.md b/qiling/extensions/tracing/README.md new file mode 100644 index 000000000..08d2abf9d --- /dev/null +++ b/qiling/extensions/tracing/README.md @@ -0,0 +1 @@ +For latest documentation, please visit https://docs.qiling.io \ No newline at end of file diff --git a/qiling/extensions/tracing/__init__.py b/qiling/extensions/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/qiling/extensions/tracing/formats/__init__.py b/qiling/extensions/tracing/formats/__init__.py new file mode 100644 index 000000000..e9dc6e169 --- /dev/null +++ b/qiling/extensions/tracing/formats/__init__.py @@ -0,0 +1,2 @@ +# This code structure is copied and modified from the coverage extension +__all__ = ["base", "tenet"] \ No newline at end of file diff --git a/qiling/extensions/tracing/formats/base.py b/qiling/extensions/tracing/formats/base.py new file mode 100644 index 000000000..145944340 --- /dev/null +++ b/qiling/extensions/tracing/formats/base.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# This code structure is copied and modified from the coverage extension + +from abc import ABC, abstractmethod + + +class QlBaseTrace(ABC): + """ + An abstract base class for trace collectors. + To add support for a new coverage format, just derive from this class and implement + all the methods marked with the @abstractmethod decorator. + """ + + def __init__(self): + super().__init__() + + @property + @staticmethod + @abstractmethod + def FORMAT_NAME(): + raise NotImplementedError + + @abstractmethod + def activate(self): + pass + + @abstractmethod + def deactivate(self): + pass + + @abstractmethod + def dump_trace(self, trace_file): + pass \ No newline at end of file diff --git a/qiling/extensions/tracing/formats/registers.py b/qiling/extensions/tracing/formats/registers.py new file mode 100644 index 000000000..9d0431e52 --- /dev/null +++ b/qiling/extensions/tracing/formats/registers.py @@ -0,0 +1,111 @@ +import unicorn +from qiling.arch import arm64, arm, x86 + + + +class ArchRegs(): + """ + Stores architecture's registers relevant to tracing in a format + universally usable by different tracers + """ + + arm_registers = { + '$r0': unicorn.arm_const.UC_ARM_REG_R0, + '$r1': unicorn.arm_const.UC_ARM_REG_R1, + '$r2': unicorn.arm_const.UC_ARM_REG_R2, + '$r3': unicorn.arm_const.UC_ARM_REG_R3, + '$r4': unicorn.arm_const.UC_ARM_REG_R4, + '$r5': unicorn.arm_const.UC_ARM_REG_R5, + '$r6': unicorn.arm_const.UC_ARM_REG_R6, + '$r7': unicorn.arm_const.UC_ARM_REG_R7, + '$r8': unicorn.arm_const.UC_ARM_REG_R8, + '$r9': unicorn.arm_const.UC_ARM_REG_R9, + '$r10': unicorn.arm_const.UC_ARM_REG_R10, + '$r11': unicorn.arm_const.UC_ARM_REG_R11, + '$r12': unicorn.arm_const.UC_ARM_REG_R12, + '$sp': unicorn.arm_const.UC_ARM_REG_SP, + '$lr': unicorn.arm_const.UC_ARM_REG_LR, + '$pc': unicorn.arm_const.UC_ARM_REG_PC, + '$cpsr': unicorn.arm_const.UC_ARM_REG_CPSR + } + arm64_registers = { + "$pc": unicorn.arm64_const.UC_ARM64_REG_PC, + "$sp": unicorn.arm64_const.UC_ARM64_REG_SP, + "$x0": unicorn.arm64_const.UC_ARM64_REG_X0, + "$x1": unicorn.arm64_const.UC_ARM64_REG_X1, + "$x2": unicorn.arm64_const.UC_ARM64_REG_X2, + "$x3": unicorn.arm64_const.UC_ARM64_REG_X3, + "$x4": unicorn.arm64_const.UC_ARM64_REG_X4, + "$x5": unicorn.arm64_const.UC_ARM64_REG_X5, + "$x6": unicorn.arm64_const.UC_ARM64_REG_X6, + "$x7": unicorn.arm64_const.UC_ARM64_REG_X7, + "$x8": unicorn.arm64_const.UC_ARM64_REG_X8, + "$x9": unicorn.arm64_const.UC_ARM64_REG_X9, + "$x10": unicorn.arm64_const.UC_ARM64_REG_X10, + "$x11": unicorn.arm64_const.UC_ARM64_REG_X11, + "$x12": unicorn.arm64_const.UC_ARM64_REG_X12, + "$x13": unicorn.arm64_const.UC_ARM64_REG_X13, + "$x14": unicorn.arm64_const.UC_ARM64_REG_X14, + "$x15": unicorn.arm64_const.UC_ARM64_REG_X15, + "$x16": unicorn.arm64_const.UC_ARM64_REG_X16, + "$x17": unicorn.arm64_const.UC_ARM64_REG_X17, + "$x18": unicorn.arm64_const.UC_ARM64_REG_X18, + "$x19": unicorn.arm64_const.UC_ARM64_REG_X19, + "$x20": unicorn.arm64_const.UC_ARM64_REG_X20, + "$x21": unicorn.arm64_const.UC_ARM64_REG_X21, + "$x22": unicorn.arm64_const.UC_ARM64_REG_X22, + "$x23": unicorn.arm64_const.UC_ARM64_REG_X23, + "$x24": unicorn.arm64_const.UC_ARM64_REG_X24, + "$x25": unicorn.arm64_const.UC_ARM64_REG_X25, + "$x26": unicorn.arm64_const.UC_ARM64_REG_X26, + "$x27": unicorn.arm64_const.UC_ARM64_REG_X27, + "$x28": unicorn.arm64_const.UC_ARM64_REG_X28, + "$x29": unicorn.arm64_const.UC_ARM64_REG_X29, + } + + x86_registers = { + '$eax': unicorn.x86_const.UC_X86_REG_EAX, + '$ebx': unicorn.x86_const.UC_X86_REG_EBX, + '$ecx': unicorn.x86_const.UC_X86_REG_ECX, + '$edx': unicorn.x86_const.UC_X86_REG_ECX, + '$ebp': unicorn.x86_const.UC_X86_REG_EBP, + '$esp': unicorn.x86_const.UC_X86_REG_ESP, + '$esi': unicorn.x86_const.UC_X86_REG_ESI, + '$edi': unicorn.x86_const.UC_X86_REG_EDI, + '$eip': unicorn.x86_const.UC_X86_REG_EIP, + } + x86_64_registers = { + '$rax': unicorn.x86_const.UC_X86_REG_RAX , + '$rbx': unicorn.x86_const.UC_X86_REG_RBX, + '$rcx': unicorn.x86_const.UC_X86_REG_RCX, + '$rdx': unicorn.x86_const.UC_X86_REG_RCX, + '$rbp': unicorn.x86_const.UC_X86_REG_RBP, + '$rsp': unicorn.x86_const.UC_X86_REG_RSP, + '$rsi': unicorn.x86_const.UC_X86_REG_RSI, + '$rdi': unicorn.x86_const.UC_X86_REG_RDI, + '$rip': unicorn.x86_const.UC_X86_REG_RIP, + '$r8': unicorn.x86_const.UC_X86_REG_R8, + '$r9': unicorn.x86_const.UC_X86_REG_R9, + '$r10': unicorn.x86_const.UC_X86_REG_R10, + '$r11': unicorn.x86_const.UC_X86_REG_R11, + '$r12': unicorn.x86_const.UC_X86_REG_R12, + '$r13': unicorn.x86_const.UC_X86_REG_R13, + '$r14': unicorn.x86_const.UC_X86_REG_R14, + '$r15': unicorn.x86_const.UC_X86_REG_R15, + } + + def __init__(self, arch): + if isinstance(arch, arm.QlArchARM): + self.registers = self.arm_registers + self.pc_key = "$pc" + elif isinstance(arch,arm64.QlArchARM64): + self.registers = self.arm64_registers + self.pc_key = "$pc" + elif isinstance(arch,x86.QlArchX86): + self.registers = self.x86_registers + self.pc_key = "$eip" + elif isinstance(arch,x86.QlArchX8664): + self.registers = self.x86_64_registers + self.pc_key = "$rip" + else: + raise("Unsupported arch") diff --git a/qiling/extensions/tracing/formats/tenet.py b/qiling/extensions/tracing/formats/tenet.py new file mode 100644 index 000000000..161e37ceb --- /dev/null +++ b/qiling/extensions/tracing/formats/tenet.py @@ -0,0 +1,103 @@ +# This code structure is copied and modified from the coverage extension + +import qiling +from qiling.const import QL_ENDIAN +from .base import QlBaseTrace +from .registers import ArchRegs +from unicorn.unicorn_const import UC_MEM_READ, UC_MEM_WRITE + +class QlDrTrace(QlBaseTrace): + """ + Traces emulation and puts it into a format viewable in Tenet + IDAPro plugin Tenet: https://github.com/gaasedelen/tenet + """ + + FORMAT_NAME = "tenet" + + def __init__(self, ql: qiling.Qiling): + super().__init__() + self.ql = ql + self.deltas = [] + self.current_delta = [] + self.current_pc = 0x0 + + self.arch_regs = ArchRegs(ql.arch) + self.register_values= dict() + + # Initialize with ridiculous value so first delta isn't missed + for register in self.arch_regs.registers: + self.register_values[register] = 0xFEEDBABE + + + def _add_delta(self): + # Cover glitch cases where nothing changed + if self.current_delta != []: + # Join all delta fragments into delta line and append + self.deltas.append(",".join(self.current_delta)) + self.current_delta = [] + return + + + @staticmethod + def mem_access_callback(ql, access, address, size, value, self): + access_type = None + # Set delta based on access type + if access == UC_MEM_READ: + # Since we are reading memory, we just read it ourselves + access_type = "mr" + value = ql.mem.read(address, size) + delta = f"{access_type}={hex(address)}:{value.hex()}" + elif access == UC_MEM_WRITE: + # Hook is before it's written, so we have to use the "value" + access_type = "mw" + if ql.arch.endian == QL_ENDIAN.EL: + endian = 'little' + else: + endian = 'big' + if value < 0: + sign = True + else: + sign = False + value = int.to_bytes(value, size, endian, signed=sign) + delta = f"{access_type}={hex(address)}:{value.hex()}" + else: + print("Invalid access type") + return + # =:... + self.current_delta.append(delta) + return + + @staticmethod + def code_callback(ql, address, size, self): + # Check if PC changed for next delta + pc = ql.arch.regs.read(self.arch_regs.registers[self.arch_regs.pc_key]) + if pc != self.current_pc: + self._add_delta() + self.current_pc = pc + # Go through each register and see if it changed + for register in self.arch_regs.registers: + value = ql.arch.regs.read(self.arch_regs.registers[register]) + if value != self.register_values[register]: + # = + delta = f"{register[1::]}={hex(value)}" + self.current_delta.append(delta) + # Update value + self.register_values[register] = value + + return + + def activate(self): + self.code_callback = self.ql.hook_code(self.code_callback, user_data=self) + self.mem_write_callback = self.ql.hook_mem_read(self.mem_access_callback, user_data=self) + self.mem_read_callback = self.ql.hook_mem_write(self.mem_access_callback, user_data=self) + + def deactivate(self): + self.ql.hook_del(self.code_callback) + self.ql.hook_del(self.mem_write_callback) + self.ql.hook_del(self.mem_read_callback) + + def dump_trace(self, trace_file: str): + with open(trace_file, "w") as trace: + # Write out each delta on a separate line + for delta in self.deltas: + trace.write(delta + "\n") diff --git a/qiling/extensions/tracing/utils.py b/qiling/extensions/tracing/utils.py new file mode 100644 index 000000000..a3d8ab1ab --- /dev/null +++ b/qiling/extensions/tracing/utils.py @@ -0,0 +1,46 @@ +# This code structure is copied and modified from the coverage extension + +from contextlib import contextmanager +from .formats import * + + +# Returns subclasses recursively. +def get_all_subclasses(cls): + all_subclasses = [] + + for subclass in cls.__subclasses__(): + all_subclasses.append(subclass) + all_subclasses.extend(get_all_subclasses(subclass)) + + return all_subclasses + +class TraceFactory(): + def __init__(self): + self.trace_collectors = {subcls.FORMAT_NAME:subcls for subcls in get_all_subclasses(base.QlBaseTrace)} + + @property + def formats(self): + return self.trace_collectors.keys() + + def get_trace_collector(self, ql, name): + return self.trace_collectors[name](ql) + +factory = TraceFactory() + +@contextmanager +def collect_trace(ql, name: str, trace_file: str): + """ + Context manager for emulating a given piece of code with tracing. + Example: + with collect_trace(ql, 'tenet', 'trace.0.log'): + ql.run(...) + """ + + trace = factory.get_trace_collector(ql, name) + trace.activate() + try: + yield + finally: + trace.deactivate() + if trace_file: + trace.dump_trace(trace_file)