From c197553841c7f5faa471f0c9953c29362d321221 Mon Sep 17 00:00:00 2001 From: elicn Date: Sun, 17 Jul 2022 15:06:53 +0300 Subject: [PATCH] Fix examples --- examples/crackme_x86_windows.py | 2 +- examples/crackme_x86_windows_setcallback.py | 12 ++++- examples/crackme_x86_windows_unpatch.py | 9 +++- examples/hello_arm_qnx_customapi.py | 5 +- examples/hello_arm_uboot.py | 12 +++-- examples/hello_linuxx8664_intercept.py | 17 ++++--- examples/uefi_sanitized_heap.py | 1 + qiling/extensions/sanitizers/heap.py | 55 ++++++++++++--------- 8 files changed, 76 insertions(+), 37 deletions(-) diff --git a/examples/crackme_x86_windows.py b/examples/crackme_x86_windows.py index c47da07eb..f20eef8c1 100644 --- a/examples/crackme_x86_windows.py +++ b/examples/crackme_x86_windows.py @@ -15,7 +15,7 @@ class Solver: def __init__(self, invalid: bytes): # create a silent qiling instance - self.ql = Qiling([rf"{ROOTFS}/bin/crackme.exe"], ROOTFS, verbose=QL_VERBOSE.OFF) + self.ql = Qiling([rf"{ROOTFS}/bin/crackme.exe"], ROOTFS, verbose=QL_VERBOSE.DISABLED) self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno()) # take over the input to the program using a fake stdin self.ql.os.stdout = pipe.NullOutStream(sys.stdout.fileno()) # disregard program output diff --git a/examples/crackme_x86_windows_setcallback.py b/examples/crackme_x86_windows_setcallback.py index f873d7d83..0200f4722 100644 --- a/examples/crackme_x86_windows_setcallback.py +++ b/examples/crackme_x86_windows_setcallback.py @@ -9,26 +9,36 @@ from qiling import Qiling def force_call_dialog_func(ql: Qiling): + # this hook is invoked after returning from DialogBoxParamA, so its + # stack frame content is still available to us. + # get DialogFunc address - lpDialogFunc = ql.unpack32(ql.mem.read(ql.arch.regs.esp - 0x8, 4)) + lpDialogFunc = ql.stack_read(-8) + # setup stack for DialogFunc ql.stack_push(0) ql.stack_push(1001) ql.stack_push(273) ql.stack_push(0) ql.stack_push(0x0401018) + # force EIP to DialogFunc ql.arch.regs.eip = lpDialogFunc def my_sandbox(path, rootfs): ql = Qiling(path, rootfs) + # patch the input validation code: overwrite all its breaking points + # denoted with "jne 0x401135", so it would keep going even if there + # is an error ql.patch(0x004010B5, b'\x90\x90') ql.patch(0x004010CD, b'\x90\x90') ql.patch(0x0040110B, b'\x90\x90') ql.patch(0x00401112, b'\x90\x90') + # hook the instruction after returning from DialogBoxParamA ql.hook_address(force_call_dialog_func, 0x00401016) + ql.run() if __name__ == "__main__": diff --git a/examples/crackme_x86_windows_unpatch.py b/examples/crackme_x86_windows_unpatch.py index 1e43e50f6..ffcf0a52c 100644 --- a/examples/crackme_x86_windows_unpatch.py +++ b/examples/crackme_x86_windows_unpatch.py @@ -9,21 +9,28 @@ from qiling import Qiling def force_call_dialog_func(ql: Qiling): + # this hook is invoked after returning from DialogBoxParamA, so its + # stack frame content is still available to us. + # get DialogFunc address - lpDialogFunc = ql.unpack32(ql.mem.read(ql.arch.regs.esp - 0x8, 4)) + lpDialogFunc = ql.stack_read(-8) + # setup stack for DialogFunc ql.stack_push(0) ql.stack_push(1001) ql.stack_push(273) ql.stack_push(0) ql.stack_push(0x0401018) + # force EIP to DialogFunc ql.arch.regs.eip = lpDialogFunc def our_sandbox(path, rootfs): ql = Qiling(path, rootfs) + # hook the instruction after returning from DialogBoxParamA ql.hook_address(force_call_dialog_func, 0x00401016) + ql.run() if __name__ == "__main__": diff --git a/examples/hello_arm_qnx_customapi.py b/examples/hello_arm_qnx_customapi.py index 7924afd7c..8175e1be9 100644 --- a/examples/hello_arm_qnx_customapi.py +++ b/examples/hello_arm_qnx_customapi.py @@ -7,23 +7,26 @@ sys.path.append("..") from qiling import Qiling -from qiling.const import QL_INTERCEPT, QL_CALL_BLOCK, QL_VERBOSE +from qiling.const import QL_INTERCEPT, QL_CALL_BLOCK from qiling.os.const import STRING def my_puts_onenter(ql: Qiling): params = ql.os.resolve_fcall_params({'s': STRING}) print(f'puts("{params["s"]}")') + return QL_CALL_BLOCK def my_printf_onenter(ql: Qiling): params = ql.os.resolve_fcall_params({'s': STRING}) print(f'printf("{params["s"]}")') + return QL_CALL_BLOCK def my_puts_onexit(ql: Qiling): print(f'after puts') + return QL_CALL_BLOCK if __name__ == "__main__": diff --git a/examples/hello_arm_uboot.py b/examples/hello_arm_uboot.py index 9ed8fba51..90087f3d6 100644 --- a/examples/hello_arm_uboot.py +++ b/examples/hello_arm_uboot.py @@ -11,8 +11,12 @@ from qiling.os.const import STRING def get_kaimendaji_password(): - def my_getenv(ql, *args, **kwargs): - env = {"ID": b"000000000000000", "ethaddr": b"11:22:33:44:55:66"} + def my_getenv(ql: Qiling): + env = { + "ID" : b"000000000000000", + "ethaddr" : b"11:22:33:44:55:66" + } + params = ql.os.resolve_fcall_params({'key': STRING}) value = env.get(params["key"], b"") @@ -22,7 +26,7 @@ def my_getenv(ql, *args, **kwargs): ql.arch.regs.r0 = value_addr ql.arch.regs.arch_pc = ql.arch.regs.lr - def get_password(ql, *args, **kwargs): + def get_password(ql: Qiling): password_raw = ql.mem.read(ql.arch.regs.r0, ql.arch.regs.r2) password = '' @@ -34,7 +38,7 @@ def get_password(ql, *args, **kwargs): print("The password is: %s" % password) - def partial_run_init(ql): + def partial_run_init(ql: Qiling): # argv prepare ql.arch.regs.arch_sp -= 0x30 arg0_ptr = ql.arch.regs.arch_sp diff --git a/examples/hello_linuxx8664_intercept.py b/examples/hello_linuxx8664_intercept.py index 532970a47..d93b9c22b 100644 --- a/examples/hello_linuxx8664_intercept.py +++ b/examples/hello_linuxx8664_intercept.py @@ -7,21 +7,24 @@ sys.path.append("..") from qiling import Qiling -from qiling.const import QL_INTERCEPT, QL_VERBOSE +from qiling.const import QL_INTERCEPT from qiling.os.linux.syscall_nums import SYSCALL_NR -def write_onenter(ql: Qiling, arg1, arg2, arg3, *args): +def write_onenter(ql: Qiling, fd: int, buf: int, count: int): print("enter write syscall!") - ql.arch.regs.rsi = arg2 + 1 - ql.arch.regs.rdx = arg3 - 1 -def write_onexit(ql: Qiling, arg1, arg2, arg3, *args): + ql.arch.regs.rsi = buf + 1 + ql.arch.regs.rdx = count - 1 + +def write_onexit(ql: Qiling, fd: int, buf: int, count: int, retval: int): print("exit write syscall!") - ql.arch.regs.rax = arg3 + 1 + + ql.arch.regs.rax = count + 1 if __name__ == "__main__": - ql = Qiling(["rootfs/x8664_linux/bin/x8664_hello"], "rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG) + ql = Qiling(["rootfs/x8664_linux/bin/x8664_hello"], "rootfs/x8664_linux") ql.os.set_syscall(SYSCALL_NR.write, write_onenter, QL_INTERCEPT.ENTER) ql.os.set_syscall(SYSCALL_NR.write, write_onexit, QL_INTERCEPT.EXIT) + ql.run() diff --git a/examples/uefi_sanitized_heap.py b/examples/uefi_sanitized_heap.py index 1738025d9..321de48ac 100644 --- a/examples/uefi_sanitized_heap.py +++ b/examples/uefi_sanitized_heap.py @@ -27,6 +27,7 @@ def enable_sanitized_heap(ql, fault_rate=0): heap.alloc(0x1000) ql.os.heap = heap + ql.loader.dxe_context.heap = heap def sanitized_emulate(path, rootfs, fault_type, verbose=QL_VERBOSE.DEBUG): env = {'FaultType': fault_type} diff --git a/qiling/extensions/sanitizers/heap.py b/qiling/extensions/sanitizers/heap.py index 8fc476f79..7ca0f41ca 100644 --- a/qiling/extensions/sanitizers/heap.py +++ b/qiling/extensions/sanitizers/heap.py @@ -6,6 +6,8 @@ import random from enum import Enum +from qiling import Qiling + class CaneryType(Enum): underflow = 0 overflow = 1 @@ -22,7 +24,9 @@ class QlSanitizedMemoryHeap(): ql.os.heap.uaf_handler = my_uaf_handler """ - def __init__(self, ql, heap, fault_rate=0, canary_byte=b'\xCD'): + CANARY_SIZE = 4 + + def __init__(self, ql: Qiling, heap, fault_rate=0, canary_byte=b'\xCD'): self.ql = ql self.heap = heap self.fault_rate = fault_rate @@ -77,34 +81,39 @@ def bad_free_handler(ql, addr): """ pass - def alloc(self, size): + def alloc(self, size: int): chance = random.randint(1, 100) if chance <= self.fault_rate: # Fail the allocation. return 0 - # Add 8 bytes to the requested size so as to accomodate the canaries. - addr = self.heap.alloc(size + 8) - self.ql.mem.write(addr, self.canary_byte * (size + 8)) + addr = self.heap.alloc(size + self.CANARY_SIZE * 2) + + canary_begins = addr + canary_ends = canary_begins + self.CANARY_SIZE - 1 + + # install underflow canary and detection hooks + self.ql.mem.write(canary_begins, self.canary_byte * self.CANARY_SIZE) + self.ql.hook_mem_write(self.bo_handler, begin=canary_begins, end=canary_ends) + self.ql.hook_mem_read(self.oob_handler, begin=canary_begins, end=canary_ends) + self.canaries.append((canary_begins, canary_ends, CaneryType.underflow)) - # Install canary hooks for overflow/underflow detection. - underflow_canary = (addr, addr + 3, CaneryType.underflow) - self.ql.hook_mem_write(self.bo_handler, begin=underflow_canary[0], end=underflow_canary[1]) - self.ql.hook_mem_read(self.oob_handler, begin=underflow_canary[0], end=underflow_canary[1]) - self.canaries.append(underflow_canary) + canary_begins = addr + self.CANARY_SIZE + size + canary_ends = canary_begins + self.CANARY_SIZE - 1 - overflow_canary = (addr + 4 + size, addr + 4 + size + 3, CaneryType.overflow) - self.ql.hook_mem_write(self.bo_handler, begin=overflow_canary[0], end=overflow_canary[1]) - self.ql.hook_mem_read(self.oob_handler, begin=overflow_canary[0], end=overflow_canary[1]) - self.canaries.append(overflow_canary) + # install overflow canary and detection hooks + self.ql.mem.write(canary_begins, self.canary_byte * self.CANARY_SIZE) + self.ql.hook_mem_write(self.bo_handler, begin=canary_begins, end=canary_ends) + self.ql.hook_mem_read(self.oob_handler, begin=canary_begins, end=canary_ends) + self.canaries.append((canary_begins, canary_ends, CaneryType.overflow)) - return (addr + 4) + return (addr + self.CANARY_SIZE) - def size(self, addr): - return self.heap.size(addr - 4) + def size(self, addr: int): + return self.heap.size(addr - self.CANARY_SIZE) - def free(self, addr): - chunk = self.heap._find(addr - 4) + def free(self, addr: int) -> bool: + chunk = self.heap._find(addr - self.CANARY_SIZE) if not chunk: self.bad_free_handler(self.ql, addr) @@ -118,13 +127,15 @@ def free(self, addr): # Make sure the chunk won't be re-used by the underlying heap. self.heap.chunks.remove(chunk) + return True - def validate(self): - for (canary_begin, canary_end, canery_type) in self.canaries: + def validate(self) -> bool: + for canary_begin, canary_end, _ in self.canaries: size = canary_end - canary_begin + 1 canary = self.ql.mem.read(canary_begin, size) + if canary.count(self.canary_byte) != len(canary): return False + return True - \ No newline at end of file