Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/crackme_x86_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class Solver:
def __init__(self, invalid: bytes):
# create a silent qiling instance
self.ql = Qiling([rf"{ROOTFS}/bin/crackme.exe"], ROOTFS, verbose=QL_VERBOSE.OFF)
self.ql = Qiling([rf"{ROOTFS}/bin/crackme.exe"], ROOTFS, verbose=QL_VERBOSE.DISABLED)

self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno()) # take over the input to the program using a fake stdin
self.ql.os.stdout = pipe.NullOutStream(sys.stdout.fileno()) # disregard program output
Expand Down
12 changes: 11 additions & 1 deletion examples/crackme_x86_windows_setcallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,36 @@
from qiling import Qiling

def force_call_dialog_func(ql: Qiling):
# this hook is invoked after returning from DialogBoxParamA, so its
# stack frame content is still available to us.

# get DialogFunc address
lpDialogFunc = ql.unpack32(ql.mem.read(ql.arch.regs.esp - 0x8, 4))
lpDialogFunc = ql.stack_read(-8)

# setup stack for DialogFunc
ql.stack_push(0)
ql.stack_push(1001)
ql.stack_push(273)
ql.stack_push(0)
ql.stack_push(0x0401018)

# force EIP to DialogFunc
ql.arch.regs.eip = lpDialogFunc

def my_sandbox(path, rootfs):
ql = Qiling(path, rootfs)

# patch the input validation code: overwrite all its breaking points
# denoted with "jne 0x401135", so it would keep going even if there
# is an error
ql.patch(0x004010B5, b'\x90\x90')
ql.patch(0x004010CD, b'\x90\x90')
ql.patch(0x0040110B, b'\x90\x90')
ql.patch(0x00401112, b'\x90\x90')

# hook the instruction after returning from DialogBoxParamA
ql.hook_address(force_call_dialog_func, 0x00401016)

ql.run()

if __name__ == "__main__":
Expand Down
9 changes: 8 additions & 1 deletion examples/crackme_x86_windows_unpatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,28 @@
from qiling import Qiling

def force_call_dialog_func(ql: Qiling):
# this hook is invoked after returning from DialogBoxParamA, so its
# stack frame content is still available to us.

# get DialogFunc address
lpDialogFunc = ql.unpack32(ql.mem.read(ql.arch.regs.esp - 0x8, 4))
lpDialogFunc = ql.stack_read(-8)

# setup stack for DialogFunc
ql.stack_push(0)
ql.stack_push(1001)
ql.stack_push(273)
ql.stack_push(0)
ql.stack_push(0x0401018)

# force EIP to DialogFunc
ql.arch.regs.eip = lpDialogFunc

def our_sandbox(path, rootfs):
ql = Qiling(path, rootfs)

# hook the instruction after returning from DialogBoxParamA
ql.hook_address(force_call_dialog_func, 0x00401016)

ql.run()

if __name__ == "__main__":
Expand Down
5 changes: 4 additions & 1 deletion examples/hello_arm_qnx_customapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,26 @@
sys.path.append("..")

from qiling import Qiling
from qiling.const import QL_INTERCEPT, QL_CALL_BLOCK, QL_VERBOSE
from qiling.const import QL_INTERCEPT, QL_CALL_BLOCK
from qiling.os.const import STRING

def my_puts_onenter(ql: Qiling):
params = ql.os.resolve_fcall_params({'s': STRING})

print(f'puts("{params["s"]}")')

return QL_CALL_BLOCK

def my_printf_onenter(ql: Qiling):
params = ql.os.resolve_fcall_params({'s': STRING})

print(f'printf("{params["s"]}")')

return QL_CALL_BLOCK

def my_puts_onexit(ql: Qiling):
print(f'after puts')

return QL_CALL_BLOCK

if __name__ == "__main__":
Expand Down
12 changes: 8 additions & 4 deletions examples/hello_arm_uboot.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
from qiling.os.const import STRING

def get_kaimendaji_password():
def my_getenv(ql, *args, **kwargs):
env = {"ID": b"000000000000000", "ethaddr": b"11:22:33:44:55:66"}
def my_getenv(ql: Qiling):
env = {
"ID" : b"000000000000000",
"ethaddr" : b"11:22:33:44:55:66"
}

params = ql.os.resolve_fcall_params({'key': STRING})
value = env.get(params["key"], b"")

Expand All @@ -22,7 +26,7 @@ def my_getenv(ql, *args, **kwargs):
ql.arch.regs.r0 = value_addr
ql.arch.regs.arch_pc = ql.arch.regs.lr

def get_password(ql, *args, **kwargs):
def get_password(ql: Qiling):
password_raw = ql.mem.read(ql.arch.regs.r0, ql.arch.regs.r2)

password = ''
Expand All @@ -34,7 +38,7 @@ def get_password(ql, *args, **kwargs):

print("The password is: %s" % password)

def partial_run_init(ql):
def partial_run_init(ql: Qiling):
# argv prepare
ql.arch.regs.arch_sp -= 0x30
arg0_ptr = ql.arch.regs.arch_sp
Expand Down
17 changes: 10 additions & 7 deletions examples/hello_linuxx8664_intercept.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
sys.path.append("..")

from qiling import Qiling
from qiling.const import QL_INTERCEPT, QL_VERBOSE
from qiling.const import QL_INTERCEPT
from qiling.os.linux.syscall_nums import SYSCALL_NR

def write_onenter(ql: Qiling, arg1, arg2, arg3, *args):
def write_onenter(ql: Qiling, fd: int, buf: int, count: int):
print("enter write syscall!")
ql.arch.regs.rsi = arg2 + 1
ql.arch.regs.rdx = arg3 - 1

def write_onexit(ql: Qiling, arg1, arg2, arg3, *args):
ql.arch.regs.rsi = buf + 1
ql.arch.regs.rdx = count - 1

def write_onexit(ql: Qiling, fd: int, buf: int, count: int, retval: int):
print("exit write syscall!")
ql.arch.regs.rax = arg3 + 1

ql.arch.regs.rax = count + 1

if __name__ == "__main__":
ql = Qiling(["rootfs/x8664_linux/bin/x8664_hello"], "rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG)
ql = Qiling(["rootfs/x8664_linux/bin/x8664_hello"], "rootfs/x8664_linux")

ql.os.set_syscall(SYSCALL_NR.write, write_onenter, QL_INTERCEPT.ENTER)
ql.os.set_syscall(SYSCALL_NR.write, write_onexit, QL_INTERCEPT.EXIT)

ql.run()
1 change: 1 addition & 0 deletions examples/uefi_sanitized_heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def enable_sanitized_heap(ql, fault_rate=0):
heap.alloc(0x1000)

ql.os.heap = heap
ql.loader.dxe_context.heap = heap

def sanitized_emulate(path, rootfs, fault_type, verbose=QL_VERBOSE.DEBUG):
env = {'FaultType': fault_type}
Expand Down
55 changes: 33 additions & 22 deletions qiling/extensions/sanitizers/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import random
from enum import Enum

from qiling import Qiling

class CaneryType(Enum):
underflow = 0
overflow = 1
Expand All @@ -22,7 +24,9 @@ class QlSanitizedMemoryHeap():
ql.os.heap.uaf_handler = my_uaf_handler
"""

def __init__(self, ql, heap, fault_rate=0, canary_byte=b'\xCD'):
CANARY_SIZE = 4

def __init__(self, ql: Qiling, heap, fault_rate=0, canary_byte=b'\xCD'):
self.ql = ql
self.heap = heap
self.fault_rate = fault_rate
Expand Down Expand Up @@ -77,34 +81,39 @@ def bad_free_handler(ql, addr):
"""
pass

def alloc(self, size):
def alloc(self, size: int):
chance = random.randint(1, 100)
if chance <= self.fault_rate:
# Fail the allocation.
return 0

# Add 8 bytes to the requested size so as to accomodate the canaries.
addr = self.heap.alloc(size + 8)
self.ql.mem.write(addr, self.canary_byte * (size + 8))
addr = self.heap.alloc(size + self.CANARY_SIZE * 2)

canary_begins = addr
canary_ends = canary_begins + self.CANARY_SIZE - 1

# install underflow canary and detection hooks
self.ql.mem.write(canary_begins, self.canary_byte * self.CANARY_SIZE)
self.ql.hook_mem_write(self.bo_handler, begin=canary_begins, end=canary_ends)
self.ql.hook_mem_read(self.oob_handler, begin=canary_begins, end=canary_ends)
self.canaries.append((canary_begins, canary_ends, CaneryType.underflow))

# Install canary hooks for overflow/underflow detection.
underflow_canary = (addr, addr + 3, CaneryType.underflow)
self.ql.hook_mem_write(self.bo_handler, begin=underflow_canary[0], end=underflow_canary[1])
self.ql.hook_mem_read(self.oob_handler, begin=underflow_canary[0], end=underflow_canary[1])
self.canaries.append(underflow_canary)
canary_begins = addr + self.CANARY_SIZE + size
canary_ends = canary_begins + self.CANARY_SIZE - 1

overflow_canary = (addr + 4 + size, addr + 4 + size + 3, CaneryType.overflow)
self.ql.hook_mem_write(self.bo_handler, begin=overflow_canary[0], end=overflow_canary[1])
self.ql.hook_mem_read(self.oob_handler, begin=overflow_canary[0], end=overflow_canary[1])
self.canaries.append(overflow_canary)
# install overflow canary and detection hooks
self.ql.mem.write(canary_begins, self.canary_byte * self.CANARY_SIZE)
self.ql.hook_mem_write(self.bo_handler, begin=canary_begins, end=canary_ends)
self.ql.hook_mem_read(self.oob_handler, begin=canary_begins, end=canary_ends)
self.canaries.append((canary_begins, canary_ends, CaneryType.overflow))

return (addr + 4)
return (addr + self.CANARY_SIZE)

def size(self, addr):
return self.heap.size(addr - 4)
def size(self, addr: int):
return self.heap.size(addr - self.CANARY_SIZE)

def free(self, addr):
chunk = self.heap._find(addr - 4)
def free(self, addr: int) -> bool:
chunk = self.heap._find(addr - self.CANARY_SIZE)

if not chunk:
self.bad_free_handler(self.ql, addr)
Expand All @@ -118,13 +127,15 @@ def free(self, addr):

# Make sure the chunk won't be re-used by the underlying heap.
self.heap.chunks.remove(chunk)

return True

def validate(self):
for (canary_begin, canary_end, canery_type) in self.canaries:
def validate(self) -> bool:
for canary_begin, canary_end, _ in self.canaries:
size = canary_end - canary_begin + 1
canary = self.ql.mem.read(canary_begin, size)

if canary.count(self.canary_byte) != len(canary):
return False

return True