diff --git a/pyproject.toml b/pyproject.toml index 46ffd7f7c..8c2cdfa47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ keywords = [ [tool.poetry.dependencies] python = "^3.8" capstone = "^4" -unicorn = "2.0.1.post1" +unicorn = "2.1.3" pefile = ">=2022.5.30" python-registry = "^1.3.1" keystone-engine = "^0.9.2" diff --git a/qiling/arch/arm.py b/qiling/arch/arm.py index 1a5b7f8c2..8f9b71d38 100644 --- a/qiling/arch/arm.py +++ b/qiling/arch/arm.py @@ -13,6 +13,7 @@ from qiling import Qiling from qiling.arch.arch import QlArch from qiling.arch import arm_const +from qiling.arch.cpr import QlCprManager from qiling.arch.models import ARM_CPU_MODEL from qiling.arch.register import QlRegisterManager from qiling.const import QL_ARCH, QL_ENDIAN @@ -69,6 +70,17 @@ def is_thumb(self) -> bool: def endian(self) -> QL_ENDIAN: return QL_ENDIAN.EB if self.regs.cpsr & (1 << 9) else QL_ENDIAN.EL + @cached_property + def cpr(self) -> QlCprManager: + """Coprocessor Registers. + """ + + regs_map = dict( + **arm_const.reg_cpr + ) + + return QlCprManager(self.uc, regs_map) + @property def effective_pc(self) -> int: """Get effective PC value, taking Thumb mode into account. @@ -119,6 +131,6 @@ def assembler(self) -> Ks: def enable_vfp(self) -> None: # set full access to cp10 and cp11 - self.regs.c1_c0_2 = self.regs.c1_c0_2 | (0b11 << 20) | (0b11 << 22) + self.cpr.CPACR |= (0b11 << 20) | (0b11 << 22) - self.regs.fpexc = (1 << 30) + self.regs.fpexc = (0b1 << 30) diff --git a/qiling/arch/arm64.py b/qiling/arch/arm64.py index ba9f69b35..bfe54e38e 100644 --- a/qiling/arch/arm64.py +++ b/qiling/arch/arm64.py @@ -13,6 +13,7 @@ from qiling import Qiling from qiling.arch.arch import QlArch from qiling.arch import arm64_const +from qiling.arch.cpr64 import QlCpr64Manager from qiling.arch.models import ARM64_CPU_MODEL from qiling.arch.register import QlRegisterManager from qiling.const import QL_ARCH, QL_ENDIAN @@ -56,6 +57,17 @@ def regs(self) -> QlRegisterManager: def endian(self) -> QL_ENDIAN: return QL_ENDIAN.EL + @cached_property + def cpr(self) -> QlCpr64Manager: + """Coprocessor Registers. + """ + + regs_map = dict( + **arm64_const.reg_cpr + ) + + return QlCpr64Manager(self.uc, regs_map) + @cached_property def disassembler(self) -> Cs: return Cs(CS_ARCH_ARM64, CS_MODE_ARM) @@ -65,4 +77,4 @@ def assembler(self) -> Ks: return Ks(KS_ARCH_ARM64, KS_MODE_ARM) def enable_vfp(self): - self.regs.cpacr_el1 = self.regs.cpacr_el1 | 0x300000 + self.regs.cpacr_el1 |= (0b11 << 20) diff --git a/qiling/arch/arm64_const.py b/qiling/arch/arm64_const.py index 0845df59a..eaadb8363 100644 --- a/qiling/arch/arm64_const.py +++ b/qiling/arch/arm64_const.py @@ -5,6 +5,33 @@ from unicorn.arm64_const import * + +# coprocessor registers +reg_cpr = { + 'TPIDR_EL0': (3, 3, 13, 0, 2), + 'TPIDRRO_EL0': (3, 3, 13, 0, 3), + 'TPIDR_EL1': (3, 0, 13, 0, 4), + 'ELR_EL1': (3, 0, 4, 0, 1), + 'ELR_EL2': (3, 4, 4, 0, 1), + 'ELR_EL3': (3, 6, 4, 0, 1), + 'SP_EL0': (3, 0, 4, 1, 0), + 'SP_EL1': (3, 4, 4, 1, 0), + 'SP_EL2': (3, 6, 4, 1, 0), + 'TTBR0_EL1': (3, 0, 2, 0, 0), + 'TTBR1_EL1': (3, 0, 2, 0, 1), + 'ESR_EL1': (3, 0, 5, 2, 0), + 'ESR_EL2': (3, 4, 5, 2, 0), + 'ESR_EL3': (3, 6, 5, 2, 0), + 'FAR_EL1': (3, 0, 6, 0, 0), + 'FAR_EL2': (3, 4, 6, 0, 0), + 'FAR_EL3': (3, 6, 6, 0, 0), + 'PAR_EL1': (3, 0, 7, 4, 0), + 'MAIR_EL1': (3, 0, 10, 2, 0), + 'VBAR_EL1': (3, 0, 12, 0, 0), + 'VBAR_EL2': (3, 4, 12, 0, 0), + 'VBAR_EL3': (3, 6, 12, 0, 0) +} + reg_map = { "x0": UC_ARM64_REG_X0, "x1": UC_ARM64_REG_X1, @@ -41,8 +68,6 @@ "pc": UC_ARM64_REG_PC, "lr": UC_ARM64_REG_LR, "cpacr_el1": UC_ARM64_REG_CPACR_EL1, - "tpidr_el0": UC_ARM64_REG_TPIDR_EL0, - "pstate": UC_ARM64_REG_PSTATE } reg_map_b = { diff --git a/qiling/arch/arm_const.py b/qiling/arch/arm_const.py index 1726a8071..5310938cb 100644 --- a/qiling/arch/arm_const.py +++ b/qiling/arch/arm_const.py @@ -5,27 +5,34 @@ from unicorn.arm_const import * + +# coprocessor registers +reg_cpr = { + 'CPACR': (15, 0, 1, 0, 2, 0, False), + 'TPIDRURO': (15, 0, 13, 0, 3, 0, False) +} + reg_map = { - "r0": UC_ARM_REG_R0, - "r1": UC_ARM_REG_R1, - "r2": UC_ARM_REG_R2, - "r3": UC_ARM_REG_R3, - "r4": UC_ARM_REG_R4, - "r5": UC_ARM_REG_R5, - "r6": UC_ARM_REG_R6, - "r7": UC_ARM_REG_R7, - "r8": UC_ARM_REG_R8, - "r9": UC_ARM_REG_R9, + "r0": UC_ARM_REG_R0, + "r1": UC_ARM_REG_R1, + "r2": UC_ARM_REG_R2, + "r3": UC_ARM_REG_R3, + "r4": UC_ARM_REG_R4, + "r5": UC_ARM_REG_R5, + "r6": UC_ARM_REG_R6, + "r7": UC_ARM_REG_R7, + "r8": UC_ARM_REG_R8, + "r9": UC_ARM_REG_R9, "r10": UC_ARM_REG_R10, "r11": UC_ARM_REG_R11, "r12": UC_ARM_REG_R12, - "sp": UC_ARM_REG_SP, - "lr": UC_ARM_REG_LR, - "pc": UC_ARM_REG_PC, + "sp": UC_ARM_REG_SP, + "lr": UC_ARM_REG_LR, + "pc": UC_ARM_REG_PC, - "cpsr": UC_ARM_REG_CPSR, - "c1_c0_2": UC_ARM_REG_C1_C0_2, - "c13_c0_3": UC_ARM_REG_C13_C0_3, + "apsr": UC_ARM_REG_APSR, + "cpsr": UC_ARM_REG_CPSR, + "spsr": UC_ARM_REG_SPSR, "fpexc": UC_ARM_REG_FPEXC } diff --git a/qiling/arch/arm_utils.py b/qiling/arch/arm_utils.py index 14181a699..8a8830b80 100644 --- a/qiling/arch/arm_utils.py +++ b/qiling/arch/arm_utils.py @@ -3,11 +3,13 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +from typing import Mapping + from qiling import Qiling from qiling.const import QL_ENDIAN -def init_linux_traps(ql: Qiling, address_map) -> None: +def init_linux_traps(ql: Qiling, address_map: Mapping[str, int]) -> None: # If the compiler for the target does not provides some primitives for some # reasons (e.g. target limitations), the kernel is responsible to assist # with these operations. @@ -16,51 +18,32 @@ def init_linux_traps(ql: Qiling, address_map) -> None: # https://elixir.bootlin.com/linux/latest/source/arch/arm/kernel/entry-armv.S#L899 trap_map = { - 'memory_barrier': # @ 0xffff0fa0 - # mcr p15, 0, r0, c7, c10, 5 - # nop - # mov pc, lr - ''' - ba 0f 07 ee - 00 f0 20 e3 - 0e f0 a0 e1 - ''', + 'memory_barrier': bytes.fromhex( + 'ba 0f 07 ee' # mcr p15, 0, r0, c7, c10, 5 + '00 f0 20 e3' # nop + '0e f0 a0 e1' # mov pc, lr + ), - 'cmpxchg': # @ 0xffff0fc0 - # ldr r3, [r2] - # subs r3, r3, r0 - # streq r1, [r2] - # rsbs r0, r3, #0 - # mov pc, lr - ''' - 00 30 92 e5 - 00 30 53 e0 - 00 10 82 05 - 00 00 73 e2 - 0e f0 a0 e1 - ''', + 'cmpxchg': bytes.fromhex( + '00 30 92 e5' # ldr r3, [r2] + '00 30 53 e0' # subs r3, r3, r0 + '00 10 82 05' # streq r1, [r2] + '00 00 73 e2' # rsbs r0, r3, #0 + '0e f0 a0 e1' # mov pc, lr + ), - 'get_tls': # @ 0xffff0fe0 - # ldr r0, [pc, #(16 - 8)] - # mov pc, lr - # mrc p15, 0, r0, c13, c0, 3 - # padding (e7 fd de f1) - # data: - # "\x00\x00\x00\x00" - # "\x00\x00\x00\x00" - # "\x00\x00\x00\x00" - ''' - 08 00 9f e5 - 0e f0 a0 e1 - 70 0f 1d ee - e7 fd de f1 - 00 00 00 00 - 00 00 00 00 - 00 00 00 00 - ''' + 'get_tls': bytes.fromhex( + '08 00 9f e5' # ldr r0, [pc, #(16 - 8)] + '0e f0 a0 e1' # mov pc, lr + '70 0f 1d ee' # mrc p15, 0, r0, c13, c0, 3 + 'e7 fd de f1' # padding (e7 fd de f1) + '00 00 00 00' # data + '00 00 00 00' # data + '00 00 00 00' # data + ) } if address_map: @@ -74,16 +57,14 @@ def init_linux_traps(ql: Qiling, address_map) -> None: ql.mem.map(base, size, info="[arm_traps]") - for trap_name, trap_hex in trap_map.items(): - trap_code = bytes.fromhex(trap_hex) - - if ql.arch.endian == QL_ENDIAN.EB: + for trap_name, trap_code in trap_map.items(): + if ql.arch.endian is QL_ENDIAN.EB: trap_code = swap_endianness(trap_code) if trap_name in address_map: ql.mem.write(address_map[trap_name], trap_code) - ql.log.debug(f'Set kernel trap: {trap_name} at {address_map[trap_name]:#x}') + ql.log.debug(f'Setting kernel trap {trap_name} at {address_map[trap_name]:#x}') def swap_endianness(s: bytes, blksize: int = 4) -> bytes: diff --git a/qiling/arch/cpr.py b/qiling/arch/cpr.py new file mode 100644 index 000000000..d1eb22575 --- /dev/null +++ b/qiling/arch/cpr.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +from __future__ import annotations + +import weakref + +from typing import TYPE_CHECKING, Dict, Mapping, Tuple + +if TYPE_CHECKING: + from unicorn import Uc + +_CPR_T = Tuple[int, int, int, int, int, int, bool] + + +class QlCprManager: + """Enables access to ARM coprocessor registers. + """ + + # for more information about various aarch32 coprocessor register, pelase refer to: + # https://developer.arm.com/documentation/ddi0601/latest/AArch32-Registers + + def __init__(self, uc: Uc, regs_map: Mapping[str, _CPR_T]) -> None: + """Initialize the coprocessor registers manager. + """ + + # this funny way of initialization is used to avoid calling self setattr and + # getattr upon init. if it did, it would go into an endless recursion + self.register_mapping: Dict[str, _CPR_T] + super().__setattr__('register_mapping', regs_map) + + self.uc: Uc = weakref.proxy(uc) + + def __getattr__(self, name: str) -> int: + if name in self.register_mapping: + return self.read(*self.register_mapping[name]) + + else: + return super().__getattribute__(name) + + def __setattr__(self, name: str, value: int) -> None: + if name in self.register_mapping: + self.write(*self.register_mapping[name], value) + + else: + super().__setattr__(name, value) + + def read(self, coproc: int, opc1: int, crn: int, crm: int, opc2: int, el: int, is_64: bool) -> int: + """Read a coprocessor register value. + + Args: + coproc : coprocessor to access, value varies between 0 and 15 + opc1 : opcode 1, value varies between 0 and 7 + crn : coprocessor register to access (CRn), value varies between 0 and 15 + crm : additional coprocessor register to access (CRm), value varies between 0 and 15 + opc2 : opcode 2, value varies between 0 and 7 + el : the exception level the coprocessor register belongs to, value varies between 0 and 3 + is_64 : indicates whether this is a 64-bit register + + Returns: value of coprocessor register + """ + + return self.uc.cpr_read(coproc, opc1, crn, crm, opc2, el, is_64) + + def write(self, coproc: int, opc1: int, crn: int, crm: int, opc2: int, el: int, is_64: bool, value: int) -> None: + """Write a coprocessor register value. + + Args: + coproc : coprocessor to access, value varies between 0 and 15 + opc1 : opcode 1, value varies between 0 and 7 + crn : coprocessor register to access (CRn), value varies between 0 and 15 + crm : additional coprocessor register to access (CRm), value varies between 0 and 15 + opc2 : opcode 2, value varies between 0 and 7 + el : the exception level the coprocessor register belongs to, value varies between 0 and 3 + is_64 : indicates whether this is a 64-bit register + value : value to write + """ + + self.uc.cpr_write(coproc, opc1, crn, crm, opc2, el, is_64, value) + + def save(self) -> Dict[str, int]: + """Save registers. + """ + + return dict((name, self.read(*reg)) for name, reg in self.register_mapping.items()) + + def restore(self, context: Mapping[str, int]) -> None: + """Restore registers. + """ + + for name, val in context.items(): + self.write(*self.register_mapping[name], val) + + +__all__ = ['QlCprManager'] diff --git a/qiling/arch/cpr64.py b/qiling/arch/cpr64.py new file mode 100644 index 000000000..255ae9359 --- /dev/null +++ b/qiling/arch/cpr64.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +from __future__ import annotations + +import weakref + +from typing import TYPE_CHECKING, Dict, Mapping, Tuple + +if TYPE_CHECKING: + from unicorn import Uc + +_CPR_T = Tuple[int, int, int, int, int] + + +class QlCpr64Manager: + """Enables access to ARM64 coprocessor registers. + """ + + # for more information about various aarch32 coprocessor register, pelase refer to: + # https://developer.arm.com/documentation/ddi0601/latest/AArch64-Registers + + def __init__(self, uc: Uc, regs_map: Mapping[str, _CPR_T]) -> None: + """Initialize the coprocessor registers manager. + """ + + # this funny way of initialization is used to avoid calling self setattr and + # getattr upon init. if it did, it would go into an endless recursion + self.register_mapping: Dict[str, _CPR_T] + super().__setattr__('register_mapping', regs_map) + + self.uc: Uc = weakref.proxy(uc) + + def __getattr__(self, name: str) -> int: + if name in self.register_mapping: + return self.read(*self.register_mapping[name]) + + else: + return super().__getattribute__(name) + + def __setattr__(self, name: str, value: int) -> None: + if name in self.register_mapping: + self.write(*self.register_mapping[name], value) + + else: + super().__setattr__(name, value) + + def read(self, op0: int, op1: int, crn: int, crm: int, op2: int) -> int: + """Read a coprocessor register value. + + Args: + op0 : opcode 0, value varies between 0 and 3 + op1 : opcode 1, value varies between 0 and 7 + crn : coprocessor register to access (CRn), value varies between 0 and 15 + crm : additional coprocessor register to access (CRm), value varies between 0 and 15 + op2 : opcode 2, value varies between 0 and 7 + + Returns: value of coprocessor register + """ + + return self.uc.cpr_read(op0, op1, crn, crm, op2) + + def write(self, op0: int, op1: int, crn: int, crm: int, op2: int, value: int) -> None: + """Write a coprocessor register value. + + Args: + op0 : opcode 0, value varies between 0 and 3 + op1 : opcode 1, value varies between 0 and 7 + crn : coprocessor register to access (CRn), value varies between 0 and 15 + crm : additional coprocessor register to access (CRm), value varies between 0 and 15 + op2 : opcode 2, value varies between 0 and 7 + value : value to write + """ + + self.uc.cpr_write(op0, op1, crn, crm, op2, value) + + def save(self) -> Dict[str, int]: + """Save registers. + """ + + return dict((name, self.read(*reg)) for name, reg in self.register_mapping.items()) + + def restore(self, context: Mapping[str, int]) -> None: + """Restore registers. + """ + + for name, val in context.items(): + self.write(*self.register_mapping[name], val) + + +__all__ = ['QlCpr64Manager'] diff --git a/qiling/arch/mips.py b/qiling/arch/mips.py index 87a703b37..4785d5b9e 100644 --- a/qiling/arch/mips.py +++ b/qiling/arch/mips.py @@ -39,9 +39,7 @@ def uc(self) -> Uc: @cached_property def regs(self) -> QlRegisterManager: regs_map = dict( - **mips_const.reg_map, - **mips_const.reg_map_afpr128, - **mips_const.reg_map_fpu + **mips_const.reg_map ) pc_reg = 'pc' diff --git a/qiling/arch/mips_const.py b/qiling/arch/mips_const.py index 1c4467ebb..c7f1a5722 100644 --- a/qiling/arch/mips_const.py +++ b/qiling/arch/mips_const.py @@ -39,6 +39,8 @@ "r30": UC_MIPS_REG_30, "r31": UC_MIPS_REG_31, + # aliases + "pc": UC_MIPS_REG_PC, "zero": UC_MIPS_REG_ZERO, "at": UC_MIPS_REG_AT, "v0": UC_MIPS_REG_V0, @@ -69,52 +71,14 @@ "k1": UC_MIPS_REG_K1, "gp": UC_MIPS_REG_GP, "sp": UC_MIPS_REG_SP, + "fp": UC_MIPS_REG_FP, "s8": UC_MIPS_REG_S8, "ra": UC_MIPS_REG_RA, - "status": UC_MIPS_REG_INVALID, - "lo": UC_MIPS_REG_LO, + "hi": UC_MIPS_REG_HI, - "badvaddr": UC_MIPS_REG_INVALID, - "cause": UC_MIPS_REG_INVALID, - "pc": UC_MIPS_REG_PC -} + "lo": UC_MIPS_REG_LO, -reg_map_afpr128 = { "cp0_config3": UC_MIPS_REG_CP0_CONFIG3, - "cp0_userlocal": UC_MIPS_REG_CP0_USERLOCAL -} - -reg_map_fpu = { - "f0": UC_MIPS_REG_F0, - "f1": UC_MIPS_REG_F1, - "f2": UC_MIPS_REG_F2, - "f3": UC_MIPS_REG_F3, - "f4": UC_MIPS_REG_F4, - "f5": UC_MIPS_REG_F5, - "f6": UC_MIPS_REG_F6, - "f7": UC_MIPS_REG_F7, - "f8": UC_MIPS_REG_F8, - "f9": UC_MIPS_REG_F9, - "f10": UC_MIPS_REG_F10, - "f11": UC_MIPS_REG_F11, - "f12": UC_MIPS_REG_F12, - "f13": UC_MIPS_REG_F13, - "f14": UC_MIPS_REG_F14, - "f15": UC_MIPS_REG_F15, - "f16": UC_MIPS_REG_F16, - "f17": UC_MIPS_REG_F17, - "f18": UC_MIPS_REG_F18, - "f19": UC_MIPS_REG_F19, - "f20": UC_MIPS_REG_F20, - "f21": UC_MIPS_REG_F21, - "f22": UC_MIPS_REG_F22, - "f23": UC_MIPS_REG_F23, - "f24": UC_MIPS_REG_F24, - "f25": UC_MIPS_REG_F25, - "f26": UC_MIPS_REG_F26, - "f27": UC_MIPS_REG_F27, - "f28": UC_MIPS_REG_F28, - "f29": UC_MIPS_REG_F29, - "f30": UC_MIPS_REG_F30, - "f31": UC_MIPS_REG_F31 + "cp0_userlocal": UC_MIPS_REG_CP0_USERLOCAL, + "cp0_status": UC_MIPS_REG_CP0_STATUS } diff --git a/qiling/arch/msr.py b/qiling/arch/msr.py index 8ea13cafc..84cac5819 100644 --- a/qiling/arch/msr.py +++ b/qiling/arch/msr.py @@ -3,7 +3,14 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from unicorn import Uc +from __future__ import annotations + +import weakref + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from unicorn import Uc class QlMsrManager: @@ -11,7 +18,7 @@ class QlMsrManager: """ def __init__(self, uc: Uc) -> None: - self.uc = uc + self.uc: Uc = weakref.proxy(uc) def read(self, msr: int) -> int: """Read a model-specific register value. diff --git a/qiling/arch/x86.py b/qiling/arch/x86.py index e4b64a62f..b5b34642b 100644 --- a/qiling/arch/x86.py +++ b/qiling/arch/x86.py @@ -89,7 +89,9 @@ def regs(self) -> QlRegisterManager: **x86_const.reg_map_cr, **x86_const.reg_map_dr, **x86_const.reg_map_st, - **x86_const.reg_map_misc + **x86_const.reg_map_misc, + **x86_const.reg_map_xmm, + **x86_const.reg_map_ymm ) pc_reg = 'eip' @@ -118,6 +120,7 @@ def regs(self) -> QlRegisterManager: **x86_const.reg_map_32, **x86_const.reg_map_64, **x86_const.reg_map_cr, + **x86_const.reg_map_cr_64, **x86_const.reg_map_dr, **x86_const.reg_map_st, **x86_const.reg_map_misc, @@ -126,7 +129,9 @@ def regs(self) -> QlRegisterManager: **x86_const.reg_map_64_d, **x86_const.reg_map_seg_base, **x86_const.reg_map_xmm, + **x86_const.reg_map_xmm_64, **x86_const.reg_map_ymm, + **x86_const.reg_map_ymm_64, **x86_const.reg_map_zmm ) diff --git a/qiling/arch/x86_const.py b/qiling/arch/x86_const.py index 0bdfe5b46..f4a6035e4 100644 --- a/qiling/arch/x86_const.py +++ b/qiling/arch/x86_const.py @@ -152,7 +152,10 @@ "cr1": UC_X86_REG_CR1, "cr2": UC_X86_REG_CR2, "cr3": UC_X86_REG_CR3, - "cr4": UC_X86_REG_CR4, + "cr4": UC_X86_REG_CR4 +} + +reg_map_cr_64 = { "cr8": UC_X86_REG_CR8 } @@ -207,7 +210,10 @@ "xmm4": UC_X86_REG_XMM4, "xmm5": UC_X86_REG_XMM5, "xmm6": UC_X86_REG_XMM6, - "xmm7": UC_X86_REG_XMM7, + "xmm7": UC_X86_REG_XMM7 +} + +reg_map_xmm_64 = { "xmm8": UC_X86_REG_XMM8, "xmm9": UC_X86_REG_XMM9, "xmm10": UC_X86_REG_XMM10, @@ -243,6 +249,9 @@ "ymm5": UC_X86_REG_YMM5, "ymm6": UC_X86_REG_YMM6, "ymm7": UC_X86_REG_YMM7, +} + +reg_map_ymm_64 = { "ymm8": UC_X86_REG_YMM8, "ymm9": UC_X86_REG_YMM9, "ymm10": UC_X86_REG_YMM10, diff --git a/qiling/cc/__init__.py b/qiling/cc/__init__.py index 126bf620a..99c9e5643 100644 --- a/qiling/cc/__init__.py +++ b/qiling/cc/__init__.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework -from typing import Callable, Sequence, Tuple +from typing import Callable, Sequence, Tuple, Union from qiling.arch.arch import QlArch @@ -175,3 +175,9 @@ def reserve(self, nslots: int) -> None: si = self._argregs[:nslots].count(None) self.arch.regs.arch_sp -= (self._shadow + si) * self._asize + + +def make_arg_list(*args, maxargs: int = 16) -> Tuple[Union[int, None]]: + assert len(args) <= maxargs + + return args + (None,) * (maxargs - len(args)) diff --git a/qiling/cc/arm.py b/qiling/cc/arm.py index 7974503e0..51d798b23 100644 --- a/qiling/cc/arm.py +++ b/qiling/cc/arm.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework from unicorn.arm_const import UC_ARM_REG_R0, UC_ARM_REG_R1, UC_ARM_REG_R2, UC_ARM_REG_R3 @@ -8,13 +8,15 @@ UC_ARM64_REG_X4, UC_ARM64_REG_X5, UC_ARM64_REG_X6, UC_ARM64_REG_X7 ) -from qiling.cc import QlCommonBaseCC +from qiling.cc import QlCommonBaseCC, make_arg_list class QlArmBaseCC(QlCommonBaseCC): """Calling convention base class for ARM-based systems. Supports arguments passing over registers and stack. """ + _retaddr_on_stack = False + @staticmethod def getNumSlots(argbits: int) -> int: return 1 @@ -24,14 +26,12 @@ def setReturnAddress(self, addr: int) -> None: def unwind(self, nslots: int) -> int: # TODO: cleanup? - return self.arch.stack_pop() + return self.arch.regs.lr class aarch64(QlArmBaseCC): - _retaddr_on_stack = False _retreg = UC_ARM64_REG_X0 - _argregs = (UC_ARM64_REG_X0, UC_ARM64_REG_X1, UC_ARM64_REG_X2, UC_ARM64_REG_X3, UC_ARM64_REG_X4, UC_ARM64_REG_X5, UC_ARM64_REG_X6, UC_ARM64_REG_X7) + (None, ) * 8 + _argregs = make_arg_list(UC_ARM64_REG_X0, UC_ARM64_REG_X1, UC_ARM64_REG_X2, UC_ARM64_REG_X3, UC_ARM64_REG_X4, UC_ARM64_REG_X5, UC_ARM64_REG_X6, UC_ARM64_REG_X7) class aarch32(QlArmBaseCC): - _retaddr_on_stack = False _retreg = UC_ARM_REG_R0 - _argregs = (UC_ARM_REG_R0, UC_ARM_REG_R1, UC_ARM_REG_R2, UC_ARM_REG_R3) + (None, ) * 12 + _argregs = make_arg_list(UC_ARM_REG_R0, UC_ARM_REG_R1, UC_ARM_REG_R2, UC_ARM_REG_R3) diff --git a/qiling/cc/intel.py b/qiling/cc/intel.py index b916b659b..ca1796034 100644 --- a/qiling/cc/intel.py +++ b/qiling/cc/intel.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework from unicorn.x86_const import ( @@ -8,7 +8,7 @@ UC_X86_REG_R10 ) -from qiling.cc import QlCommonBaseCC +from qiling.cc import QlCommonBaseCC, make_arg_list class QlIntelBaseCC(QlCommonBaseCC): """Calling convention base class for Intel-based systems. @@ -60,7 +60,7 @@ class amd64(QlIntel64): First 6 arguments are passed in regs, the rest are passed on the stack. """ - _argregs = (UC_X86_REG_RDI, UC_X86_REG_RSI, UC_X86_REG_RDX, UC_X86_REG_R10, UC_X86_REG_R8, UC_X86_REG_R9) + (None, ) * 10 + _argregs = make_arg_list(UC_X86_REG_RDI, UC_X86_REG_RSI, UC_X86_REG_RDX, UC_X86_REG_R10, UC_X86_REG_R8, UC_X86_REG_R9) class ms64(QlIntel64): """Default calling convention for Windows and UEFI (x86-64). @@ -70,7 +70,7 @@ class ms64(QlIntel64): to the first arguments passed in regs. """ - _argregs = (UC_X86_REG_RCX, UC_X86_REG_RDX, UC_X86_REG_R8, UC_X86_REG_R9) + (None, ) * 12 + _argregs = make_arg_list(UC_X86_REG_RCX, UC_X86_REG_RDX, UC_X86_REG_R8, UC_X86_REG_R9) _shadow = 4 class macosx64(QlIntel64): @@ -78,7 +78,7 @@ class macosx64(QlIntel64): First 6 arguments are passed in regs, the rest are passed on the stack. """ - _argregs = (UC_X86_REG_RDI, UC_X86_REG_RSI, UC_X86_REG_RDX, UC_X86_REG_RCX, UC_X86_REG_R8, UC_X86_REG_R9) + (None, ) * 10 + _argregs = make_arg_list(UC_X86_REG_RDI, UC_X86_REG_RSI, UC_X86_REG_RDX, UC_X86_REG_RCX, UC_X86_REG_R8, UC_X86_REG_R9) class cdecl(QlIntel32): """Calling convention used by all operating systems (x86). @@ -87,7 +87,7 @@ class cdecl(QlIntel32): The caller is resopnsible to unwind the stack. """ - _argregs = (None, ) * 16 + _argregs = make_arg_list() class stdcall(QlIntel32): """Calling convention used by all operating systems (x86). @@ -96,7 +96,7 @@ class stdcall(QlIntel32): The callee is resopnsible to unwind the stack. """ - _argregs = (None, ) * 16 + _argregs = make_arg_list() def unwind(self, nslots: int) -> int: retaddr = super().unwind(nslots) diff --git a/qiling/cc/mips.py b/qiling/cc/mips.py index edc5c7302..9ebf23375 100644 --- a/qiling/cc/mips.py +++ b/qiling/cc/mips.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework from unicorn.mips_const import UC_MIPS_REG_V0, UC_MIPS_REG_A0, UC_MIPS_REG_A1, UC_MIPS_REG_A2, UC_MIPS_REG_A3 -from qiling.cc import QlCommonBaseCC +from qiling.cc import QlCommonBaseCC, make_arg_list class mipso32(QlCommonBaseCC): _retreg = UC_MIPS_REG_V0 - _argregs = (UC_MIPS_REG_A0, UC_MIPS_REG_A1, UC_MIPS_REG_A2, UC_MIPS_REG_A3) + (None, ) * 12 + _argregs = make_arg_list(UC_MIPS_REG_A0, UC_MIPS_REG_A1, UC_MIPS_REG_A2, UC_MIPS_REG_A3) _shadow = 4 _retaddr_on_stack = False diff --git a/qiling/cc/ppc.py b/qiling/cc/ppc.py index af3ceef5d..2440fab15 100644 --- a/qiling/cc/ppc.py +++ b/qiling/cc/ppc.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework -from qiling.cc import QlCommonBaseCC - from unicorn.ppc_const import ( UC_PPC_REG_3, UC_PPC_REG_4, UC_PPC_REG_5, UC_PPC_REG_6, UC_PPC_REG_7, UC_PPC_REG_8, ) +from qiling.cc import QlCommonBaseCC, make_arg_list + class ppc(QlCommonBaseCC): """Default calling convention for PPC @@ -16,7 +16,7 @@ class ppc(QlCommonBaseCC): """ _retreg = UC_PPC_REG_3 - _argregs = (UC_PPC_REG_3, UC_PPC_REG_4, UC_PPC_REG_5, UC_PPC_REG_6, UC_PPC_REG_7, UC_PPC_REG_8) + (None, ) * 10 + _argregs = make_arg_list(UC_PPC_REG_3, UC_PPC_REG_4, UC_PPC_REG_5, UC_PPC_REG_6, UC_PPC_REG_7, UC_PPC_REG_8) @staticmethod def getNumSlots(argbits: int): diff --git a/qiling/cc/riscv.py b/qiling/cc/riscv.py index a864bfff3..3a360bd8d 100644 --- a/qiling/cc/riscv.py +++ b/qiling/cc/riscv.py @@ -1,21 +1,22 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework -from qiling.cc import QlCommonBaseCC - from unicorn.riscv_const import ( UC_RISCV_REG_A0, UC_RISCV_REG_A1, UC_RISCV_REG_A2, UC_RISCV_REG_A3, UC_RISCV_REG_A4, UC_RISCV_REG_A5 ) +from qiling.cc import QlCommonBaseCC, make_arg_list + + class riscv(QlCommonBaseCC): """Default calling convention for RISCV First 6 arguments are passed in regs, the rest are passed on the stack. """ _retreg = UC_RISCV_REG_A0 - _argregs = (UC_RISCV_REG_A0, UC_RISCV_REG_A1, UC_RISCV_REG_A2, UC_RISCV_REG_A3, UC_RISCV_REG_A4, UC_RISCV_REG_A5) + (None, ) * 10 + _argregs = make_arg_list(UC_RISCV_REG_A0, UC_RISCV_REG_A1, UC_RISCV_REG_A2, UC_RISCV_REG_A3, UC_RISCV_REG_A4, UC_RISCV_REG_A5) @staticmethod def getNumSlots(argbits: int): diff --git a/qiling/core.py b/qiling/core.py index 6c15bf2e7..e7a5df004 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -628,6 +628,9 @@ def save(self, reg=True, mem=True, hw=False, fd=False, cpu_context=False, os=Fal if reg: saved_states["reg"] = self.arch.regs.save() + if self.arch.type in (QL_ARCH.ARM, QL_ARCH.ARM64): + saved_states["cpr"] = self.arch.cpr.save() + if mem: saved_states["mem"] = self.mem.save() @@ -679,6 +682,9 @@ def restore(self, saved_states: Mapping[str, Any] = {}, *, snapshot: Optional[st if "reg" in saved_states: self.arch.regs.restore(saved_states["reg"]) + if self.arch.type in (QL_ARCH.ARM, QL_ARCH.ARM64): + self.arch.cpr.restore(saved_states["cpr"]) + if "hw" in saved_states: self.hw.restore(saved_states['hw']) diff --git a/qiling/debugger/gdb/xmlregs.py b/qiling/debugger/gdb/xmlregs.py index 89b68964b..4749b2111 100644 --- a/qiling/debugger/gdb/xmlregs.py +++ b/qiling/debugger/gdb/xmlregs.py @@ -18,8 +18,7 @@ reg_map_v as arm64_regs_v ) from qiling.arch.mips_const import ( - reg_map as mips_regs_gpr, - reg_map_fpu as mips_regs_fpu + reg_map as mips_regs_gpr ) from qiling.arch.x86_const import ( reg_map_32 as x86_regs_32, @@ -135,7 +134,7 @@ def __load_regsmap(archtype: QL_ARCH, xmltree: ElementTree.ElementTree) -> Seque QL_ARCH.ARM: dict(**arm_regs, **arm_regs_vfp, **arm_regs_q, **arm_regs_s), QL_ARCH.CORTEX_M: arm_regs, QL_ARCH.ARM64: dict(**arm64_regs, **arm64_regs_v), - QL_ARCH.MIPS: dict(**mips_regs_gpr, **mips_regs_fpu) + QL_ARCH.MIPS: dict(**mips_regs_gpr) }[archtype] regsinfo = sorted(QlGdbFeatures.__walk_xml_regs(xmltree)) diff --git a/qiling/debugger/qdb/branch_predictor/__init__.py b/qiling/debugger/qdb/branch_predictor/__init__.py index 69e9e6d9f..5004ec348 100644 --- a/qiling/debugger/qdb/branch_predictor/__init__.py +++ b/qiling/debugger/qdb/branch_predictor/__init__.py @@ -3,6 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +from .branch_predictor import BranchPredictor from .branch_predictor_x86 import BranchPredictorX86 from .branch_predictor_mips import BranchPredictorMIPS from .branch_predictor_arm import BranchPredictorARM, BranchPredictorCORTEX_M diff --git a/qiling/debugger/qdb/branch_predictor/branch_predictor.py b/qiling/debugger/qdb/branch_predictor/branch_predictor.py index 4b3e891be..713661501 100644 --- a/qiling/debugger/qdb/branch_predictor/branch_predictor.py +++ b/qiling/debugger/qdb/branch_predictor/branch_predictor.py @@ -3,32 +3,28 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # - - +from abc import abstractmethod from ..context import Context -from ..misc import read_int -class BranchPredictor(Context): + +class Prophecy: """ - Base class for predictor + container for storing result of the predictor + @going: indicate the certian branch will be taken or not + @where: where will it go if going is true """ - class Prophecy: - """ - container for storing result of the predictor - @going: indicate the certian branch will be taken or not - @where: where will it go if going is true - """ + def __init__(self): + self.going = False + self.where = None - def __init__(self): - self.going = False - self.where = None + def __iter__(self): + return iter((self.going, self.where)) - def __iter__(self): - return iter((self.going, self.where)) - - def __init__(self, ql): - super().__init__(ql) +class BranchPredictor(Context): + """ + Base class for predictor + """ def read_reg(self, reg_name): """ @@ -37,9 +33,8 @@ def read_reg(self, reg_name): return self.ql.arch.regs.read(reg_name) - def predict(self): + @abstractmethod + def predict(self) -> Prophecy: """ Try to predict certian branch will be taken or not based on current context """ - - return NotImplementedError diff --git a/qiling/debugger/qdb/branch_predictor/branch_predictor_arm.py b/qiling/debugger/qdb/branch_predictor/branch_predictor_arm.py index 02712bd5a..bb5cd0f61 100644 --- a/qiling/debugger/qdb/branch_predictor/branch_predictor_arm.py +++ b/qiling/debugger/qdb/branch_predictor/branch_predictor_arm.py @@ -6,7 +6,10 @@ from .branch_predictor import * -from ..arch import ArchARM, ArchCORTEX_M +from ..arch import ArchARM +from ..misc import read_int + + class BranchPredictorARM(BranchPredictor, ArchARM): """ @@ -41,7 +44,7 @@ def get_cpsr(bits: int) -> (bool, bool, bool, bool): ) def predict(self, pref_addr=None): - prophecy = self.Prophecy() + prophecy = Prophecy() cur_addr = self.cur_addr if pref_addr is None else pref_addr line = self.disasm(cur_addr) @@ -244,8 +247,8 @@ def predict(self, pref_addr=None): _, r = line.op_str.split(", ") prophecy.where = self.read_reg(r) - if prophecy.where & 1: - prophecy.where -= 1 + if prophecy.where is not None: + prophecy.where &= ~0b1 return prophecy diff --git a/qiling/debugger/qdb/branch_predictor/branch_predictor_mips.py b/qiling/debugger/qdb/branch_predictor/branch_predictor_mips.py index 0dcc9467f..a111df8f6 100644 --- a/qiling/debugger/qdb/branch_predictor/branch_predictor_mips.py +++ b/qiling/debugger/qdb/branch_predictor/branch_predictor_mips.py @@ -39,7 +39,7 @@ def read_reg(self, reg_name): return self.signed_val(getattr(self.ql.arch.regs, reg_name)) def predict(self): - prophecy = self.Prophecy() + prophecy = Prophecy() line = self.disasm(self.cur_addr) if line.mnemonic == self.CODE_END: # indicates program extied diff --git a/qiling/debugger/qdb/branch_predictor/branch_predictor_x86.py b/qiling/debugger/qdb/branch_predictor/branch_predictor_x86.py index 656c64daa..dd1e34fee 100644 --- a/qiling/debugger/qdb/branch_predictor/branch_predictor_x86.py +++ b/qiling/debugger/qdb/branch_predictor/branch_predictor_x86.py @@ -5,7 +5,7 @@ -import ast, re +import re from .branch_predictor import * from ..arch import ArchX86 @@ -27,7 +27,7 @@ def __init__(self, ql): ArchX86.__init__(self) def predict(self): - prophecy = self.Prophecy() + prophecy = Prophecy() line = self.disasm(self.cur_addr) jump_table = { @@ -109,7 +109,7 @@ def predict(self): for each_reg in filter(lambda r: len(r) == 3, self.ql.arch.regs.register_mapping.keys()): if each_reg in new_line: new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) - + for each_reg in filter(lambda r: len(r) == 2, self.ql.arch.regs.register_mapping.keys()): if each_reg in new_line: new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) diff --git a/qiling/debugger/qdb/branch_predictor/branch_predictor_x8664.py b/qiling/debugger/qdb/branch_predictor/branch_predictor_x8664.py index eda3a5923..1350c9bb3 100644 --- a/qiling/debugger/qdb/branch_predictor/branch_predictor_x8664.py +++ b/qiling/debugger/qdb/branch_predictor/branch_predictor_x8664.py @@ -5,7 +5,7 @@ -import ast, re +import re from .branch_predictor import * from ..arch import ArchX8664 @@ -27,7 +27,7 @@ def __init__(self, ql): ArchX8664.__init__(self) def predict(self): - prophecy = self.Prophecy() + prophecy = Prophecy() line = self.disasm(self.cur_addr) jump_table = { @@ -109,7 +109,7 @@ def predict(self): for each_reg in filter(lambda r: len(r) == 3, self.ql.arch.regs.register_mapping.keys()): if each_reg in new_line: new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) - + for each_reg in filter(lambda r: len(r) == 2, self.ql.arch.regs.register_mapping.keys()): if each_reg in new_line: new_line = re.sub(each_reg, hex(self.read_reg(each_reg)), new_line) diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index d4ba75ca3..fe4a68d61 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -5,11 +5,11 @@ import cmd -from typing import Optional, Tuple, Union, List +from typing import Callable, Optional, Tuple, Union, List from contextlib import contextmanager from qiling import Qiling -from qiling.const import QL_OS, QL_ARCH, QL_ENDIAN, QL_STATE, QL_VERBOSE +from qiling.const import QL_OS, QL_ARCH, QL_ENDIAN, QL_VERBOSE from qiling.debugger import QlDebugger from .utils import setup_context_render, setup_branch_predictor, setup_address_marker, SnapshotManager, run_qdb_script @@ -20,6 +20,30 @@ from .utils import QDB_MSG, qdb_print +def save_reg_dump(func: Callable) -> Callable[..., None]: + """Decorator for saving registers dump. + """ + + def inner(self: 'QlQdb', *args, **kwargs) -> None: + self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self.ql.arch.regs.save().items())) + + func(self, *args, **kwargs) + + return inner + +def check_ql_alive(func: Callable) -> Callable[..., None]: + """Decorator for checking whether ql instance is alive. + """ + + def inner(self: 'QlQdb', *args, **kwargs) -> None: + if self.ql is None: + qdb_print(QDB_MSG.ERROR, "The program is not being run.") + else: + func(self, *args, **kwargs) + + return inner + + class QlQdb(cmd.Cmd, QlDebugger): """ The built-in debugger of Qiling Framework @@ -160,30 +184,6 @@ def _save(self, reg=True, mem=True, hw=False, fd=False, cpu_context=False, os=Fa yield self self.ql.restore(saved_states) - def save_reg_dump(func) -> None: - """ - decorator function for saving register dump - """ - - def inner(self, *args, **kwargs): - self._saved_reg_dump = dict(filter(lambda d: isinstance(d[0], str), self.ql.arch.regs.save().items())) - func(self, *args, **kwargs) - - return inner - - def check_ql_alive(func) -> None: - """ - decorator function for checking ql instance is alive - """ - - def inner(self, *args, **kwargs): - if self.ql is None: - qdb_print(QDB_MSG.ERROR, "The program is not being run.") - else: - func(self, *args, **kwargs) - - return inner - def parseline(self, line: str) -> Tuple[Optional[str], Optional[str], str]: """ Parse the line into a command name and a string containing @@ -269,17 +269,17 @@ def do_step_over(self, *args) -> Optional[bool]: prophecy = self.predictor.predict() if prophecy.going: + self.set_breakpoint(prophecy.where, is_temp=True) + + else: cur_insn = self.predictor.disasm(self.cur_addr) bp_addr = self.cur_addr + cur_insn.size - if self.ql.arch.type == QL_ARCH.MIPS: + if self.ql.arch.type is QL_ARCH.MIPS: bp_addr += cur_insn.size self.set_breakpoint(bp_addr, is_temp=True) - else: - self.set_breakpoint(prophecy.where, is_temp=True) - self._run() @SnapshotManager.snapshot diff --git a/qiling/debugger/qdb/render/__init__.py b/qiling/debugger/qdb/render/__init__.py index 78acc1646..1625a52ae 100644 --- a/qiling/debugger/qdb/render/__init__.py +++ b/qiling/debugger/qdb/render/__init__.py @@ -3,7 +3,8 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +from .render import ContextRender from .render_x86 import ContextRenderX86 from .render_mips import ContextRenderMIPS from .render_arm import ContextRenderARM, ContextRenderCORTEX_M -from .render_x8664 import ContextRenderX8664 \ No newline at end of file +from .render_x8664 import ContextRenderX8664 diff --git a/qiling/debugger/qdb/utils.py b/qiling/debugger/qdb/utils.py index 85189bfed..c5f0d4456 100644 --- a/qiling/debugger/qdb/utils.py +++ b/qiling/debugger/qdb/utils.py @@ -4,34 +4,37 @@ # from __future__ import annotations -from typing import Callable, Optional, Mapping, Tuple +from typing import TYPE_CHECKING, Callable, Dict, Mapping, Tuple, Type from capstone import CsInsn -from qiling import Qiling from qiling.const import QL_ARCH -from .context import Context - from .render import ( - ContextRenderX86, - ContextRenderX8664, - ContextRenderARM, - ContextRenderCORTEX_M, - ContextRenderMIPS - ) + ContextRender, + ContextRenderX86, + ContextRenderX8664, + ContextRenderARM, + ContextRenderCORTEX_M, + ContextRenderMIPS +) from .branch_predictor import ( - BranchPredictorX86, - BranchPredictorX8664, - BranchPredictorARM, - BranchPredictorCORTEX_M, - BranchPredictorMIPS, - ) + BranchPredictor, + BranchPredictorX86, + BranchPredictorX8664, + BranchPredictorARM, + BranchPredictorCORTEX_M, + BranchPredictorMIPS, +) from .const import color, QDB_MSG - + +if TYPE_CHECKING: + from qiling import Qiling + from .qdb import QlQdb + def qdb_print(msgtype: QDB_MSG, msg: str) -> None: """ @@ -51,15 +54,13 @@ def print_info(msg): print(color_coated) -""" - - class Marker provide the ability for marking an address as a more easier rememberable alias - -""" def setup_address_marker(): class Marker: + """provide the ability to mark an address as a more easier rememberable alias + """ + def __init__(self): self._mark_list = {} @@ -111,43 +112,45 @@ def mark(self, sym: str, loc: int): return Marker() -""" - helper functions for setting proper branch predictor and context render depending on different arch +# helper functions for setting proper branch predictor and context render depending on different arch +def setup_branch_predictor(ql: Qiling) -> BranchPredictor: + """Setup BranchPredictor according to arch. + """ -""" + preds: Dict[QL_ARCH, Type[BranchPredictor]] = { + QL_ARCH.X86: BranchPredictorX86, + QL_ARCH.X8664: BranchPredictorX8664, + QL_ARCH.ARM: BranchPredictorARM, + QL_ARCH.CORTEX_M: BranchPredictorCORTEX_M, + QL_ARCH.MIPS: BranchPredictorMIPS + } -def setup_branch_predictor(ql): - """ - setup BranchPredictor correspondingly - """ + p = preds[ql.arch.type] - return { - QL_ARCH.X86: BranchPredictorX86, - QL_ARCH.X8664: BranchPredictorX8664, - QL_ARCH.ARM: BranchPredictorARM, - QL_ARCH.CORTEX_M: BranchPredictorCORTEX_M, - QL_ARCH.MIPS: BranchPredictorMIPS, - }.get(ql.arch.type)(ql) + return p(ql) -def setup_context_render(ql, predictor): - """ - setup context render correspondingly +def setup_context_render(ql: Qiling, predictor: BranchPredictor) -> ContextRender: + """Setup context render according to arch. """ - return { - QL_ARCH.X86: ContextRenderX86, - QL_ARCH.X8664: ContextRenderX8664, - QL_ARCH.ARM: ContextRenderARM, - QL_ARCH.CORTEX_M: ContextRenderCORTEX_M, - QL_ARCH.MIPS: ContextRenderMIPS, - }.get(ql.arch.type)(ql, predictor) + rends: Dict[QL_ARCH, Type[ContextRender]] = { + QL_ARCH.X86: ContextRenderX86, + QL_ARCH.X8664: ContextRenderX8664, + QL_ARCH.ARM: ContextRenderARM, + QL_ARCH.CORTEX_M: ContextRenderCORTEX_M, + QL_ARCH.MIPS: ContextRenderMIPS + } + + r = rends[ql.arch.type] -def run_qdb_script(qdb, filename: str) -> None: + return r(ql, predictor) + +def run_qdb_script(qdb: QlQdb, filename: str) -> None: with open(filename) as fd: for line in iter(fd.readline, ""): - # skip commented and empty line + # skip commented and empty line if line.startswith("#") or line == "\n": continue @@ -159,17 +162,12 @@ def run_qdb_script(qdb, filename: str) -> None: func() -""" +class SnapshotManager: + """for functioning differential snapshot - For supporting Qdb features like: + Supports Qdb features like: 1. record/replay debugging 2. memory access in gdb-style - -""" - -class SnapshotManager: - """ - for functioning differential snapshot """ class State: @@ -178,7 +176,7 @@ class State: """ def __init__(self, saved_state): - self.reg, self.ram = SnapshotManager.transform(saved_state) + self.reg, self.ram, self.xreg = SnapshotManager.transform(saved_state) class DiffedState: """ @@ -186,7 +184,7 @@ class DiffedState: """ def __init__(self, diffed_st): - self.reg, self.ram = diffed_st + self.reg, self.ram, self.xreg = diffed_st @staticmethod def transform(st): @@ -194,18 +192,17 @@ def transform(st): transform saved context into binary set """ - reg = st["reg"] if "reg" in st else st[0] - - if "mem" not in st: - return (reg, st[1]) + reg = st.get("reg", {}) + mem = st.get("mem", []) + xreg = st.get("cpr") or st.get("msr") or {} ram = [] - for mem_seg in st["mem"]["ram"]: + for mem_seg in mem["ram"]: lbound, ubound, perms, label, raw_bytes = mem_seg rb_set = {(idx, val) for idx, val in enumerate(raw_bytes)} ram.append((lbound, ubound, perms, label, rb_set)) - return (reg, ram) + return (reg, ram, xreg) def __init__(self, ql): self.ql = ql @@ -258,16 +255,17 @@ def diff(self, before_st, after_st): # prev_st = self.layers.pop() diffed_reg = self.diff_reg(before_st.reg, after_st.reg) diffed_ram = self.diff_ram(before_st.ram, after_st.ram) + diffed_xreg = self.diff_reg(before_st.xreg, after_st.xreg) # diffed_reg = self.diff_reg(prev_st.reg, cur_st.reg) # diffed_ram = self.diff_ram(prev_st.ram, cur_st.ram) - return self.DiffedState((diffed_reg, diffed_ram)) + return self.DiffedState((diffed_reg, diffed_ram, diffed_xreg)) def snapshot(func): """ decorator function for saving differential context on certian qdb command """ - def magic(self, *args, **kwargs): + def magic(self: QlQdb, *args, **kwargs): if self.rr: # save State before execution p_st = self.rr._save() @@ -297,8 +295,25 @@ def restore(self): for reg_name, reg_value in prev_st.reg.items(): cur_st.reg[reg_name] = reg_value - to_be_restored = {"reg": cur_st.reg} - + for reg_name, reg_value in prev_st.xreg.items(): + cur_st.xreg[reg_name] = reg_value + + to_be_restored = { + "reg": cur_st.reg, + + # though we have arch-specific context to restore, we want to keep this arch-agnostic. + # one way to work around that is to include 'xreg' both as msr (intel) and cpr (arm). + # only the relevant one will be picked up while the other one will be discarded + "msr": cur_st.xreg, + "cpr": cur_st.xreg + } + + # FIXME: not sure how this one even works. while curr_st is a fresh qiling snapshot, + # prev_st is a DiffedState which does not hold a complete state but only a diff between + # two points which seem to be unrelated here. + # + # this code only patches current memory content with the diff between points a and b while + # we may be already be at point c. if getattr(prev_st, "ram", None) and prev_st.ram != cur_st.ram: ram = [] @@ -314,12 +329,9 @@ def restore(self): bs = bytes(dict(sorted(cur_rb_dict.items())).values()) ram.append((*cur_others, bs)) - to_be_restored.update({"mem": {"ram": ram, "mmio": {}}}) + to_be_restored["mem"] = { + "ram": ram, + "mmio": {} + } self.ql.restore(to_be_restored) - - - - -if __name__ == "__main__": - pass diff --git a/qiling/extensions/multitask.py b/qiling/extensions/multitask.py index 87915f1b8..dd8cbe0ee 100644 --- a/qiling/extensions/multitask.py +++ b/qiling/extensions/multitask.py @@ -148,6 +148,7 @@ def __exit__(self, *args, **kwargs): # # Bear in mind that only one task can be picked to emulate at # the same time. +@ucsubclass class MultiTaskUnicorn(Uc): def __init__(self, arch: int, mode: int, cpu: Optional[int], interval: Optional[int] = 100): diff --git a/qiling/loader/elf.py b/qiling/loader/elf.py index 1a00bcec7..076cb8f0b 100644 --- a/qiling/loader/elf.py +++ b/qiling/loader/elf.py @@ -325,39 +325,44 @@ def __push_str(top: int, s: str) -> int: elf_phent = elffile['e_phentsize'] elf_phnum = elffile['e_phnum'] - if self.ql.arch.bits == 64: - elf_hwcap = 0x078bfbfd - elif self.ql.arch.bits == 32: - elf_hwcap = 0x1fb8d7 + # for more details on the following values see: + # https://github.com/google/cpu_features/blob/main/include/internal/hwcaps.h + hwcap_values = { + (QL_ARCH.ARM, QL_ENDIAN.EL, 32): 0x001fb8d7, + (QL_ARCH.ARM, QL_ENDIAN.EB, 32): 0xd7b81f00, + (QL_ARCH.ARM64, QL_ENDIAN.EL, 64): 0x078bfbfd + } - if self.ql.arch.endian == QL_ENDIAN.EB: - # FIXME: considering this is a 32 bits value, it is not a big-endian version of the - # value above like it is meant to be, since the one above has an implied leading zero - # byte (i.e. 0x001fb8d7) which the EB value didn't take into account - elf_hwcap = 0xd7b81f + # determine hwcap value by arch properties; if not found default to 0 + hwcap = hwcap_values.get((self.ql.arch.type, self.ql.arch.endian, self.ql.arch.bits), 0) # setup aux vector - auxv_entries = ( - (AUXV.AT_HWCAP, elf_hwcap), - (AUXV.AT_PAGESZ, self.ql.mem.pagesize), - (AUXV.AT_CLKTCK, 100), + auxv_entries = [ (AUXV.AT_PHDR, elf_phdr), (AUXV.AT_PHENT, elf_phent), (AUXV.AT_PHNUM, elf_phnum), - (AUXV.AT_BASE, interp_address), + (AUXV.AT_PAGESZ, self.ql.mem.pagesize), + ] + + if interp_path: + auxv_entries.append((AUXV.AT_BASE, interp_address)) + + auxv_entries.extend([ (AUXV.AT_FLAGS, 0), (AUXV.AT_ENTRY, self.elf_entry), (AUXV.AT_UID, self.ql.os.uid), (AUXV.AT_EUID, self.ql.os.euid), (AUXV.AT_GID, self.ql.os.gid), (AUXV.AT_EGID, self.ql.os.egid), + (AUXV.AT_CLKTCK, 100), + (AUXV.AT_PLATFORM, cpustraddr), + (AUXV.AT_HWCAP, hwcap), (AUXV.AT_SECURE, 0), (AUXV.AT_RANDOM, randstraddr), (AUXV.AT_HWCAP2, 0), (AUXV.AT_EXECFN, execfn), - (AUXV.AT_PLATFORM, cpustraddr), (AUXV.AT_NULL, 0) - ) + ]) bytes_before_auxv = len(elf_table) @@ -366,7 +371,13 @@ def __push_str(top: int, s: str) -> int: elf_table.extend(self.ql.pack(key)) elf_table.extend(self.ql.pack(val)) - new_stack = self.ql.mem.align(new_stack - len(elf_table), 0x10) + sp_align = self.ql.arch.pointersize + + # mips requires doubleword alignment + if self.ql.arch.type is QL_ARCH.MIPS: + sp_align *= 2 + + new_stack = self.ql.mem.align(new_stack - len(elf_table), sp_align) self.ql.mem.write(new_stack, bytes(elf_table)) self.auxv = new_stack + bytes_before_auxv diff --git a/qiling/log.py b/qiling/log.py index d83854999..2735b8b77 100644 --- a/qiling/log.py +++ b/qiling/log.py @@ -9,7 +9,6 @@ import logging import os import re -import sys import weakref from typing import TYPE_CHECKING, Collection, IO, Optional, Protocol, Union, runtime_checkable diff --git a/qiling/os/freebsd/freebsd.py b/qiling/os/freebsd/freebsd.py index 5bcb920d0..0ff495200 100644 --- a/qiling/os/freebsd/freebsd.py +++ b/qiling/os/freebsd/freebsd.py @@ -6,8 +6,10 @@ from unicorn import UcError from unicorn.x86_const import UC_X86_INS_SYSCALL -from qiling.arch.x86_utils import GDTManager, SegmentManager86 +from qiling.arch.x86_utils import GDTManager, SegmentManager64 +from qiling.cc import intel from qiling.const import QL_OS +from qiling.os.fcall import QlFunctionCall from qiling.os.posix.posix import QlOsPosix @@ -15,7 +17,9 @@ class QlOsFreebsd(QlOsPosix): type = QL_OS.FREEBSD def __init__(self, ql): - super(QlOsFreebsd, self).__init__(ql) + super().__init__(ql) + + self.fcall = QlFunctionCall(ql, intel.amd64(ql.arch)) self.load() @@ -23,7 +27,7 @@ def load(self): gdtm = GDTManager(self.ql) # setup gdt and segments selectors - segm = SegmentManager86(self.ql.arch, gdtm) + segm = SegmentManager64(self.ql.arch, gdtm) segm.setup_cs_ds_ss_es(0, 4 << 30) self.ql.hook_insn(self.hook_syscall, UC_X86_INS_SYSCALL) diff --git a/qiling/os/freebsd/syscall.py b/qiling/os/freebsd/syscall.py index bb1c563bd..fc03d524a 100644 --- a/qiling/os/freebsd/syscall.py +++ b/qiling/os/freebsd/syscall.py @@ -85,9 +85,9 @@ def ql_syscall___sysctl(ql: Qiling, name: int, namelen: int, old: int, oldlenp: ql.log.warning("Unknown oid name!") for i, v in enumerate(out_vecs): - ql.mem.write_ptr(old + i * 4, v, 32, signed=True) + ql.mem.write_ptr(old + i * 4, v, 4, signed=True) - ql.mem.write_ptr(oldlenp, out_len, 32, signed=True) + ql.mem.write_ptr(oldlenp, out_len, 4, signed=True) return -ENOENT @@ -97,7 +97,7 @@ def ql_syscall___sysctl(ql: Qiling, name: int, namelen: int, old: int, oldlenp: return -1 # Ignore oldlenp check. - ql.mem.write_ptr(old, FREEBSD_OSRELDATE, 32, signed=True) + ql.mem.write_ptr(old, FREEBSD_OSRELDATE, 4, signed=True) return 0 return 0 diff --git a/qiling/os/linux/syscall.py b/qiling/os/linux/syscall.py index 6092eefd0..d97de91bf 100644 --- a/qiling/os/linux/syscall.py +++ b/qiling/os/linux/syscall.py @@ -68,11 +68,12 @@ def ql_syscall_set_thread_area(ql: Qiling, u_info_addr: int): def ql_syscall_set_tls(ql: Qiling, address: int): - if ql.arch.type == QL_ARCH.ARM: - ql.arch.regs.c13_c0_3 = address + if ql.arch.type is QL_ARCH.ARM: + ql.arch.cpr.TPIDRURO = address ql.mem.write_ptr(ql.arch.arm_get_tls_addr + 16, address, 4) ql.arch.regs.r0 = address - ql.log.debug("settls(0x%x)" % address) + + ql.log.debug("settls(%#x)", address) def ql_syscall_clock_gettime(ql: Qiling, clock_id: int, tp: int): ts_obj = __get_timespec_obj(ql.arch.bits) diff --git a/qiling/os/linux/thread.py b/qiling/os/linux/thread.py index a4162743d..27efa73f3 100644 --- a/qiling/os/linux/thread.py +++ b/qiling/os/linux/thread.py @@ -3,7 +3,8 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -import gevent, os +import os +import gevent from typing import Callable, Sequence from abc import abstractmethod @@ -239,7 +240,7 @@ def _run(self): self.ql.log.debug(f"Scheduled from {hex(start_address)}.") try: # Known issue for timeout: https://github.com/unicorn-engine/unicorn/issues/1355 - self.ql.emu_start(start_address, self.exit_point, count=31337) + self.ql.emu_start(start_address, self.exit_point, count=32337) except UcError as e: self.ql.os.emu_error() self.ql.log.exception("") @@ -473,56 +474,64 @@ def clone(self): class QlLinuxARMThread(QlLinuxThread): """docstring for QlLinuxARMThread""" def __init__(self, ql, start_address, exit_point, context = None, set_child_tid_addr = None, thread_id = None): - super(QlLinuxARMThread, self).__init__(ql, start_address, exit_point, context, set_child_tid_addr, thread_id) - self.tls = 0 + super().__init__(ql, start_address, exit_point, context, set_child_tid_addr, thread_id) + self.tls = 0 - def set_thread_tls(self, tls_addr): + def set_thread_tls(self, tls_addr: int) -> None: self.tls = tls_addr - self.ql.arch.regs.c13_c0_3 = self.tls - self.ql.log.debug(f"Set c13_c0_3 to {hex(self.ql.arch.regs.c13_c0_3)}") + self.ql.arch.cpr.TPIDRURO = self.tls - def save(self): + self.ql.log.debug(f"Setting TPIDRURO to {self.tls:#010x}") + + def save(self) -> None: self.save_context() - self.tls = self.ql.arch.regs.c13_c0_3 - self.ql.log.debug(f"Saved context. c13_c0_3={hex(self.ql.arch.regs.c13_c0_3)}") + self.tls = self.ql.arch.cpr.TPIDRURO + self.ql.log.debug(f"Context saved. TPIDRURO = {self.tls:#010x}") - def restore(self): + def restore(self) -> None: self.restore_context() self.set_thread_tls(self.tls) - self.ql.log.debug(f"Restored context. c13_c0_3={hex(self.ql.arch.regs.c13_c0_3)}") + + self.ql.log.debug(f"Context restored. TPIDRURO = {self.ql.arch.cpr.TPIDRURO:#010x}") def clone(self): - new_thread = super(QlLinuxARMThread, self).clone() + new_thread = super().clone() new_thread.tls = self.tls + return new_thread class QlLinuxARM64Thread(QlLinuxThread): """docstring for QlLinuxARM64Thread""" def __init__(self, ql, start_address, exit_point, context = None, set_child_tid_addr = None, thread_id = None): - super(QlLinuxARM64Thread, self).__init__(ql, start_address, exit_point, context, set_child_tid_addr, thread_id) + super().__init__(ql, start_address, exit_point, context, set_child_tid_addr, thread_id) + self.tls = 0 - def set_thread_tls(self, tls_addr): + def set_thread_tls(self, tls_addr: int) -> None: self.tls = tls_addr - self.ql.arch.regs.tpidr_el0 = self.tls - self.ql.log.debug(f"Set tpidr_el0 to {hex(self.ql.arch.regs.tpidr_el0)}") + self.ql.arch.cpr.TPIDR_EL0 = self.tls - def save(self): + self.ql.log.debug(f"Setting TPIDR_EL0 to {self.tls:#010x}") + + def save(self) -> None: self.save_context() - self.tls = self.ql.arch.regs.tpidr_el0 - self.ql.log.debug(f"Saved context. tpidr_el0={hex(self.ql.arch.regs.tpidr_el0)}") + self.tls = self.ql.arch.cpr.TPIDR_EL0 - def restore(self): + self.ql.log.debug(f"Context saved. TPIDR_EL0 = {self.tls:#010x}") + + def restore(self) -> None: self.restore_context() self.set_thread_tls(self.tls) - self.ql.log.debug(f"Restored context. tpidr_el0={hex(self.ql.arch.regs.tpidr_el0)}") + + self.ql.log.debug(f"Context restored. TPIDR_EL0 = {self.ql.arch.cpr.TPIDR_EL0:#010x}") def clone(self): - new_thread = super(QlLinuxARM64Thread, self).clone() + new_thread = super().clone() new_thread.tls = self.tls + return new_thread class QlLinuxThreadManagement: diff --git a/qiling/os/os.py b/qiling/os/os.py index 9108f7024..636e089c4 100644 --- a/qiling/os/os.py +++ b/qiling/os/os.py @@ -246,10 +246,15 @@ def stop(self): self.ql.emu_stop() def emu_error(self): - self.ql.log.error(f'CPU Context:') + self.ql.log.error('CPU Context:') + for reg in self.ql.arch.regs.register_mapping: - if isinstance(reg, str): - self.ql.log.error(f'{reg}\t: {self.ql.arch.regs.read(reg):#x}') + try: + value = f'{self.ql.arch.regs.read(reg):#x}' + except UcError: + value = 'n/a' + + self.ql.log.error(f'{reg}\t: {value}') pc = self.ql.arch.regs.arch_pc diff --git a/qiling/profiles/linux.ql b/qiling/profiles/linux.ql index 3aae7fcd9..eac82348b 100644 --- a/qiling/profiles/linux.ql +++ b/qiling/profiles/linux.ql @@ -18,7 +18,8 @@ stack_address = 0x7ff0d000 stack_size = 0x30000 load_address = 0x56555000 interp_address = 0x047ba000 -mmap_address = 0x90000000 +# used to be 0x90000000, but changed to comply with MIPS reserved areas +mmap_address = 0x01000000 [KERNEL] diff --git a/tests/test_android.py b/tests/test_android.py index 1556bc02d..89e13a6f3 100644 --- a/tests/test_android.py +++ b/tests/test_android.py @@ -8,6 +8,7 @@ sys.path.append("..") from qiling import Qiling +from qiling.const import QL_VERBOSE from qiling.os.mapper import QlFsMappedObject from qiling.os.posix import syscall @@ -53,7 +54,7 @@ def test_android_arm64(self): 'ANDROID_ROOT': r'/system' } - ql = Qiling([test_binary], rootfs, env, profile={'OS64': OVERRIDES}, multithread=True) + ql = Qiling([test_binary], rootfs, env, profile={'OS64': OVERRIDES}, verbose=QL_VERBOSE.DEBUG, multithread=True) ql.os.set_syscall("close", my_syscall_close) ql.add_fs_mapper("/proc/self/task/2000/maps", Fake_maps(ql)) @@ -70,7 +71,7 @@ def test_android_arm(self): 'ANDROID_ROOT': r'/system' } - ql = Qiling([test_binary], rootfs, env, profile={'OS32': OVERRIDES}, multithread=True) + ql = Qiling([test_binary], rootfs, env, profile={'OS32': OVERRIDES}, verbose=QL_VERBOSE.DEBUG, multithread=True) ql.os.set_syscall("close", my_syscall_close) ql.add_fs_mapper("/proc/self/task/2000/maps", Fake_maps(ql)) diff --git a/tests/test_elf_multithread.py b/tests/test_elf_multithread.py index be48773f1..9b60e8d17 100644 --- a/tests/test_elf_multithread.py +++ b/tests/test_elf_multithread.py @@ -6,6 +6,7 @@ import http.client import platform import re +import socket import sys import os import threading @@ -16,8 +17,8 @@ sys.path.append("..") from qiling import Qiling -from qiling.const import * -from qiling.exception import * +from qiling.arch.models import X86_CPU_MODEL +from qiling.const import QL_VERBOSE, QL_INTERCEPT from qiling.os.filestruct import ql_file from qiling.os.stats import QlOsNullStats @@ -76,7 +77,7 @@ def check_write(ql: Qiling, fd: int, write_buf, count: int): logged.extend(content.decode().splitlines()) - ql = Qiling([fr'{X86_LINUX_ROOTFS}/bin/x86_multithreading'], X86_LINUX_ROOTFS, multithread=True, verbose=QL_VERBOSE.DEBUG) + ql = Qiling([fr'{X86_LINUX_ROOTFS}/bin/x86_multithreading'], X86_LINUX_ROOTFS, cputype=X86_CPU_MODEL.INTEL_HASWELL, multithread=True, verbose=QL_VERBOSE.DEBUG) ql.os.stats = QlOsNullStats() ql.os.set_syscall("write", check_write, QL_INTERCEPT.ENTER) @@ -114,7 +115,7 @@ def check_write(ql: Qiling, fd: int, write_buf, count: int): logged.extend(content.decode().splitlines()) - ql = Qiling([fr'{X64_LINUX_ROOTFS}/bin/x8664_multithreading'], X64_LINUX_ROOTFS, multithread=True, verbose=QL_VERBOSE.DEBUG) + ql = Qiling([fr'{X64_LINUX_ROOTFS}/bin/x8664_multithreading'], X64_LINUX_ROOTFS, cputype=X86_CPU_MODEL.INTEL_HASWELL, multithread=True, verbose=QL_VERBOSE.DEBUG) ql.os.stats = QlOsNullStats() ql.os.set_syscall("write", check_write, QL_INTERCEPT.ENTER) @@ -557,6 +558,26 @@ def picohttpd(): conn = http.client.HTTPConnection('localhost', PORT, timeout=10) conn.request('GET', '/') + # libc uses statx to query stdout stats, but fails because 'stdout' is not a valid path + # on the hosting paltform. it prints out the "Server started" message, but stdout is not + # found and the message is kept buffered in. + # + # later on, picohttpd dups the client socket into stdout fd and uses ordinary printf to + # send data out. however, when the "successful" message is sent, it is sent along with + # the buffered message, which arrives first and raises a http.client.BadStatusLine exception + # as it reads as a malformed http response. + # + # here we first peek at the incoming buffer to see whether we got a surplus prefix message. + # in case we have, we discard it first and only then read the http response + + # look for the http response header + incoming = conn.sock.recv(96, socket.MSG_PEEK) + surplus = incoming.find(b'HTTP/1.1') + + # if incoming buffer has a surplus prefix, discard it + if surplus > 0: + conn.sock.recv(surplus) + response = conn.getresponse() feedback = response.read() self.assertEqual('httpd_test_successful', feedback.decode()) @@ -576,6 +597,14 @@ def picohttpd(): conn = http.client.HTTPConnection('localhost', PORT, timeout=10) conn.request('GET', '/') + # look for the http response header + incoming = conn.sock.recv(96, socket.MSG_PEEK) + surplus = incoming.find(b'HTTP/1.1') + + # if incoming buffer has a surplus prefix, discard it + if surplus > 0: + conn.sock.recv(surplus) + response = conn.getresponse() feedback = response.read() self.assertEqual('httpd_test_successful', feedback.decode()) @@ -592,22 +621,20 @@ def picohttpd(): time.sleep(1) - # armeb libc uses statx to query stdout stats, but fails because 'stdout' is not a valid - # path on the hosting paltform. it prints out the "Server started" message, but stdout is - # not found and the message is kept buffered in. - # - # later on, picohttpd dups the client socket into stdout fd and uses ordinary printf to - # send data out. however, when the "successful" message is sent, it is sent along with - # the buffered message, which arrives first and raises a http.client.BadStatusLine exception - # as it reads as a malformed http response. - # - # here we use a raw 'recv' method instead of 'getresponse' to work around that. - conn = http.client.HTTPConnection('localhost', PORT, timeout=10) conn.request('GET', '/') - feedback = conn.sock.recv(96).decode() - self.assertTrue(feedback.endswith('httpd_test_successful')) + # look for the http response header + incoming = conn.sock.recv(96, socket.MSG_PEEK) + surplus = incoming.find(b'HTTP/1.1') + + # if incoming buffer has a surplus prefix, discard it + if surplus > 0: + conn.sock.recv(surplus) + + response = conn.getresponse() + feedback = response.read() + self.assertEqual('httpd_test_successful', feedback.decode()) if __name__ == "__main__": diff --git a/tests/test_macho.py b/tests/test_macho.py index d002209ee..191e01d1c 100644 --- a/tests/test_macho.py +++ b/tests/test_macho.py @@ -6,19 +6,24 @@ import unittest import sys + sys.path.append("..") from qiling import Qiling +from qiling.arch.models import X86_CPU_MODEL from qiling.const import QL_VERBOSE +ROOTFS = r'../examples/rootfs/x8664_macos' + + class MACHOTest(unittest.TestCase): def test_macho_macos_x8664(self): - ql = Qiling(["../examples/rootfs/x8664_macos/bin/x8664_hello"], "../examples/rootfs/x8664_macos", verbose=QL_VERBOSE.DEBUG) + ql = Qiling([fr'{ROOTFS}/bin/x8664_hello'], ROOTFS, cputype=X86_CPU_MODEL.INTEL_HASWELL, verbose=QL_VERBOSE.DEBUG) ql.run() def test_usercorn_x8664(self): - ql = Qiling(["../examples/rootfs/x8664_macos/bin/x8664_hello_usercorn"], "../examples/rootfs/x8664_macos", verbose=QL_VERBOSE.DEBUG) + ql = Qiling([fr'{ROOTFS}/bin/x8664_hello_usercorn'], ROOTFS, cputype=X86_CPU_MODEL.INTEL_HASWELL, verbose=QL_VERBOSE.DEBUG) ql.run() diff --git a/tests/test_tendaac15_httpd.py b/tests/test_tendaac15_httpd.py index cbc7ac821..9bc7bb253 100644 --- a/tests/test_tendaac15_httpd.py +++ b/tests/test_tendaac15_httpd.py @@ -47,22 +47,19 @@ def nvram_listener(): sock.listen(1) data = bytearray() + connection, _ = sock.accept() + try: + while True: + data += connection.recv(1024) - while True: - connection, _ = sock.accept() - - try: - while True: - data += connection.recv(1024) - - if b"lan.webiplansslen" in data: - connection.send(b'192.168.170.169') - else: - break + if b"lan.webiplansslen" in data: + connection.send(b'192.168.170.169') + else: + break - data.clear() - finally: - connection.close() + data.clear() + finally: + connection.close() def patcher(ql: Qiling): br0_addr = ql.mem.search(b'br0\x00') @@ -74,7 +71,7 @@ def my_tenda(): ql = Qiling(["../examples/rootfs/arm_tendaac15/bin/httpd"], "../examples/rootfs/arm_tendaac15", verbose=QL_VERBOSE.DEBUG) ql.add_fs_mapper("/dev/urandom", "/dev/urandom") ql.hook_address(patcher, ql.loader.elf_entry) - ql.run() + ql.run(count=825000) if __name__ == "__main__": threads = [ @@ -103,7 +100,9 @@ def my_tenda(): response = conn.getresponse() self.assertIn(b"Please update your documents to reflect the new location.", response.read()) - + for th in threads: + th.join(timeout=20.0) + self.assertFalse(th.is_alive()) if __name__ == "__main__": unittest.main()