diff --git a/qiling/core_hooks.py b/qiling/core_hooks.py index 36a13393e..343736f6b 100644 --- a/qiling/core_hooks.py +++ b/qiling/core_hooks.py @@ -8,77 +8,65 @@ # handling hooks # ############################################## +from typing import Callable, MutableMapping, MutableSequence -from unicorn import * +from unicorn import Uc +from unicorn.unicorn_const import * from .core_hooks_types import Hook, HookAddr, HookIntr, HookRet from .utils import catch_KeyboardInterrupt from .const import QL_HOOK_BLOCK, QL_ARCH_NONEOS from .exception import QlErrorCoreHook - # Don't assume self is Qiling. class QlCoreHooks: - def __init__(self, uc=None): + def __init__(self, uc: Uc): self._h_uc = uc - self._hook = {} - self._hook_fuc = {} - self._insn_hook = {} - self._insn_hook_fuc = {} - self._addr_hook = {} - self._addr_hook_fuc = {} - - self.hook_intr_fuc = None - self.hook_insn_fuc = None - self.hook_code_fuc = None - self.hook_block_fuc = None - self.hook_mem_read_unmapped_fuc = None - self.hook_mem_write_unmapped_fuc = None - self.hook_mem_fetch_unmapped_fuc = None - self.hook_mem_read_prot_fuc = None - self.hook_mem_write_prot_fuc = None - self.hook_mem_fetch_prot_fuc = None - self.hook_mem_read_fuc = None - self.hook_mem_write_fuc = None - self.hook_mem_fetch_fuc = None - self.hook_mem_read_after_fuc = None - self.hook_insn_invalid_fuc = None + + self._hook: MutableMapping[int, MutableSequence[Hook]] = {} + self._hook_fuc: MutableMapping[int, int] = {} + + self._insn_hook: MutableMapping[int, MutableSequence[Hook]] = {} + self._insn_hook_fuc: MutableMapping[int, int] = {} + + self._addr_hook: MutableMapping[int, MutableSequence[HookAddr]] = {} + self._addr_hook_fuc: MutableMapping[int, int] = {} ######################## # Callback definitions # ######################## - def _callback_type3(self, uc, intno, pack_data): - ql, user_data, callback = pack_data # unpack what we packed for hook_add() - if user_data: - return callback(ql, intno, user_data) - return callback(ql, intno) # callback does not require user_data - - - def _hook_intr_cb(self, uc, intno, pack_data): + def _hook_intr_cb(self, uc: Uc, intno: int, pack_data) -> None: ql, hook_type = pack_data - catched = False - if hook_type in self._hook.keys(): - for h in self._hook[hook_type]: - if h.check(ql, intno): - catched = True - ret = h.call(ql, intno) - if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: + handled = False + + if hook_type in self._hook: + # the hooks list might change from within a hook method. + # iterating over a copy of the list would be a safer practice + hooks_list = self._hook[hook_type] + + for hook in hooks_list: + if hook.check(ql, intno): + handled = True + ret = hook.call(ql, intno) + + if type(ret) is int and ret & QL_HOOK_BLOCK: break - - if catched == False: - raise QlErrorCoreHook("_hook_intr_cb : catched == False") - - def _hook_insn_cb(self, uc, *args): + if not handled: + raise QlErrorCoreHook("_hook_intr_cb : not handled") + + + def _hook_insn_cb(self, uc: Uc, *args): ql, hook_type = args[-1] + retval = None - if hook_type in self._insn_hook.keys(): - retval = None + if hook_type in self._insn_hook: + hooks_list = self._insn_hook[hook_type] - for h in self._insn_hook[hook_type]: - if h.bound_check(ql.reg.arch_pc): - ret = h.call(ql, *args[ : -1]) + for hook in hooks_list: + if hook.bound_check(ql.reg.arch_pc): + ret = hook.call(ql, *args[:-1]) if type(ret) is tuple: ret, retval = ret @@ -86,173 +74,174 @@ def _hook_insn_cb(self, uc, *args): if type(ret) is int and ret & QL_HOOK_BLOCK: break - # use the last return value received - return retval + # use the last return value received + return retval - def _callback_type4(self, uc, addr, size, pack_data): - ql, user_data, callback = pack_data - if user_data: - return callback(ql, addr, size, user_data) - return callback(ql, addr, size) + def _hook_trace_cb(self, uc: Uc, addr: int, size: int, pack_data) -> None: + ql, hook_type = pack_data - def _callback_type4a(self, uc, _addr, _size, pack_data): - ql, user_data, callback = pack_data - if user_data: - return callback(ql, user_data) - return callback(ql) + if hook_type in self._hook: + hooks_list = self._hook[hook_type] + for hook in hooks_list: + if hook.bound_check(ql.reg.arch_pc): + ret = hook.call(ql, addr, size) - def _hook_trace_cb(self, uc, addr, size, pack_data): - ql, hook_type = pack_data - if hook_type in self._hook.keys(): - for h in self._hook[hook_type]: - if h.bound_check(ql.reg.arch_pc): - ret = h.call(ql, addr, size) - if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: + if type(ret) is int and ret & QL_HOOK_BLOCK: break - def _callback_type6(self, uc, access, addr, size, value, pack_data): - ql, user_data, callback = pack_data - if user_data: - return callback(ql, addr, size, value, user_data) - return callback(ql, addr, size, value) - - - def _hook_mem_cb(self, uc, access, addr, size, value, pack_data): + def _hook_mem_cb(self, uc: Uc, access: int, addr: int, size: int, value: int, pack_data): ql, hook_type = pack_data handled = False - if hook_type in self._hook.keys(): - for h in self._hook[hook_type]: - if h.bound_check(addr, size): + + if hook_type in self._hook: + hooks_list = self._hook[hook_type] + + for hook in hooks_list: + if hook.bound_check(addr, size): handled = True - ret = h.call(ql, access, addr, size, value) - if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: + ret = hook.call(ql, access, addr, size, value) + + if type(ret) is int and ret & QL_HOOK_BLOCK: break - - if hook_type in (UC_HOOK_MEM_READ_UNMAPPED, UC_HOOK_MEM_WRITE_UNMAPPED, UC_HOOK_MEM_FETCH_UNMAPPED, UC_HOOK_MEM_READ_PROT, UC_HOOK_MEM_WRITE_PROT, UC_HOOK_MEM_FETCH_PROT): - if handled == False: - raise QlErrorCoreHook("_hook_mem_cb : handled == False") - return True + if not handled and hook_type in (UC_HOOK_MEM_READ_UNMAPPED, UC_HOOK_MEM_WRITE_UNMAPPED, UC_HOOK_MEM_FETCH_UNMAPPED, UC_HOOK_MEM_READ_PROT, UC_HOOK_MEM_WRITE_PROT, UC_HOOK_MEM_FETCH_PROT): + raise QlErrorCoreHook("_hook_mem_cb : not handled") - def _callback_x86_syscall(self, uc, pack_data): - ql, user_data, callback = pack_data - if user_data: - return callback(ql, user_data) - return callback(ql) + return True - def _hook_insn_invalid_cb(self, uc, pack_data): + def _hook_insn_invalid_cb(self, uc: Uc, pack_data) -> None: ql, hook_type = pack_data - catched = False - if hook_type in self._hook.keys(): - for h in self._hook[hook_type]: - catched = True - ret = h.call(ql) - if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: + handled = False + + if hook_type in self._hook: + hooks_list = self._hook[hook_type] + + for hook in hooks_list: + handled = True + ret = hook.call(ql) + + if type(ret) is int and ret & QL_HOOK_BLOCK: break - - if catched == False: - raise QlErrorCoreHook("_hook_intr_invalid_cb : catched == False") + + if not handled: + raise QlErrorCoreHook("_hook_intr_invalid_cb : not handled") - def _hook_addr_cb(self, uc, addr, size, pack_data): + def _hook_addr_cb(self, uc: Uc, addr: int, size: int, pack_data): ql, addr = pack_data - if addr in self._addr_hook.keys(): - for h in self._addr_hook[addr]: - ret = h.call(ql, addr, size) - if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: + + if addr in self._addr_hook: + hooks_list = self._addr_hook[addr] + + for hook in hooks_list: + ret = hook.call(ql, addr, size) + + if type(ret) is int and ret & QL_HOOK_BLOCK: break ############### # Class Hooks # ############### - def _ql_hook_internal(self, hook_type, callback, user_data=None, *args): + def _ql_hook_internal(self, hook_type, callback, user_data=None, *args) -> int: _callback = (catch_KeyboardInterrupt(self))(callback) # pack user_data & callback for wrapper _callback return self._h_uc.hook_add(hook_type, _callback, (self, user_data), 1, 0, *args) - def _ql_hook_addr_internal(self, callback, user_data, address): + def _ql_hook_addr_internal(self, callback: Callable, address: int) -> int: _callback = (catch_KeyboardInterrupt(self))(callback) # pack user_data & callback for wrapper _callback - return self._h_uc.hook_add(UC_HOOK_CODE, _callback, (self, user_data), address, address) - - - def _ql_hook(self, hook_type, h, *args): - base_type = [ - UC_HOOK_INTR, - UC_HOOK_INSN, - UC_HOOK_CODE, - UC_HOOK_BLOCK, - UC_HOOK_MEM_READ_UNMAPPED, - UC_HOOK_MEM_WRITE_UNMAPPED, - UC_HOOK_MEM_FETCH_UNMAPPED, - UC_HOOK_MEM_READ_PROT, - UC_HOOK_MEM_WRITE_PROT, - UC_HOOK_MEM_FETCH_PROT, - UC_HOOK_MEM_READ, - UC_HOOK_MEM_WRITE, - UC_HOOK_MEM_FETCH, - UC_HOOK_MEM_READ_AFTER, - UC_HOOK_INSN_INVALID - ] - for t in base_type: - if (t & hook_type) != 0: - if t in (UC_HOOK_INTR, ): - if t not in self._hook_fuc.keys(): - self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_intr_cb, t) - - if t not in self._hook.keys(): - self._hook[t] = [] - self._hook[t].append(h) - - if t in (UC_HOOK_INSN, ): - ins_t = args[0] - if ins_t not in self._insn_hook_fuc.keys(): - self._insn_hook_fuc[ins_t] = self._ql_hook_internal(t, self._hook_insn_cb, ins_t, *args) - - if ins_t not in self._insn_hook.keys(): - self._insn_hook[ins_t] = [] - self._insn_hook[ins_t].append(h) - - if t in (UC_HOOK_CODE, UC_HOOK_BLOCK): - if t not in self._hook_fuc.keys(): - self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_trace_cb, t) - - if t not in self._hook.keys(): - self._hook[t] = [] - self._hook[t].append(h) - - if t in (UC_HOOK_MEM_READ_UNMAPPED, UC_HOOK_MEM_WRITE_UNMAPPED, UC_HOOK_MEM_FETCH_UNMAPPED, UC_HOOK_MEM_READ_PROT, UC_HOOK_MEM_WRITE_PROT, UC_HOOK_MEM_FETCH_PROT, UC_HOOK_MEM_READ, UC_HOOK_MEM_WRITE, UC_HOOK_MEM_FETCH, UC_HOOK_MEM_READ_AFTER): - if t not in self._hook_fuc.keys(): - self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_mem_cb, t) - - if t not in self._hook.keys(): - self._hook[t] = [] - self._hook[t].append(h) - - if t in (UC_HOOK_INSN_INVALID, ): - if t not in self._hook_fuc.keys(): - self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_insn_invalid_cb, t) - - if t not in self._hook.keys(): - self._hook[t] = [] - self._hook[t].append(h) - - - def ql_hook(self, hook_type, callback, user_data=None, begin=1, end=0, *args): - h = Hook(callback, user_data, begin, end) - self._ql_hook(hook_type, h, *args) - return HookRet(self, hook_type, h) + return self._h_uc.hook_add(UC_HOOK_CODE, _callback, (self, address), address, address) + + + def _ql_hook(self, hook_type: int, h: Hook, *args) -> None: + + def __handle_intr(t: int) -> None: + if t not in self._hook_fuc: + self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_intr_cb, t) + + if t not in self._hook: + self._hook[t] = [] + + self._hook[t].append(h) + + def __handle_insn(t: int) -> None: + ins_t = args[0] + + if ins_t not in self._insn_hook_fuc: + self._insn_hook_fuc[ins_t] = self._ql_hook_internal(t, self._hook_insn_cb, ins_t, *args) + + if ins_t not in self._insn_hook: + self._insn_hook[ins_t] = [] + + self._insn_hook[ins_t].append(h) + + def __handle_trace(t: int) -> None: + if t not in self._hook_fuc: + self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_trace_cb, t) + + if t not in self._hook: + self._hook[t] = [] + + self._hook[t].append(h) + + def __handle_mem(t: int) -> None: + if t not in self._hook_fuc: + self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_mem_cb, t) + + if t not in self._hook: + self._hook[t] = [] + + self._hook[t].append(h) + + def __handle_invalid_insn(t: int) -> None: + if t not in self._hook_fuc: + self._hook_fuc[t] = self._ql_hook_internal(t, self._hook_insn_invalid_cb, t) + + if t not in self._hook: + self._hook[t] = [] + + self._hook[t].append(h) + + type_handlers = { + UC_HOOK_INTR : __handle_intr, + UC_HOOK_INSN : __handle_insn, + UC_HOOK_CODE : __handle_trace, + UC_HOOK_BLOCK : __handle_trace, + UC_HOOK_MEM_READ_UNMAPPED : __handle_mem, + UC_HOOK_MEM_WRITE_UNMAPPED : __handle_mem, + UC_HOOK_MEM_FETCH_UNMAPPED : __handle_mem, + UC_HOOK_MEM_READ_PROT : __handle_mem, + UC_HOOK_MEM_WRITE_PROT : __handle_mem, + UC_HOOK_MEM_FETCH_PROT : __handle_mem, + UC_HOOK_MEM_READ : __handle_mem, + UC_HOOK_MEM_WRITE : __handle_mem, + UC_HOOK_MEM_FETCH : __handle_mem, + UC_HOOK_MEM_READ_AFTER : __handle_mem, + UC_HOOK_INSN_INVALID : __handle_invalid_insn + } + + for t, handler in type_handlers.items(): + if hook_type & t: + handler(t) + + + def ql_hook(self, hook_type: int, callback: Callable, user_data=None, begin=1, end=0, *args) -> HookRet: + hook = Hook(callback, user_data, begin, end) + self._ql_hook(hook_type, hook, *args) + + return HookRet(self, hook_type, hook) def hook_code(self, callback, user_data=None, begin=1, end=0): if self.archtype in QL_ARCH_NONEOS: from .arch.evm.hooks import ql_evm_hooks return ql_evm_hooks(self, 'HOOK_CODE', callback, user_data, begin, end) + return self.ql_hook(UC_HOOK_CODE, callback, user_data, begin, end) @@ -282,36 +271,40 @@ def hook_mem_fetch_invalid(self, callback, user_data=None, begin=1, end=0): def hook_mem_valid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_VALID, callback, user_data, begin, end) - + + def hook_mem_invalid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_INVALID, callback, user_data, begin, end) # a convenient API to set callback for a single address def hook_address(self, callback, address, user_data=None): - h = HookAddr(callback, address, user_data) - + hook = HookAddr(callback, address, user_data) + if self.archtype in QL_ARCH_NONEOS: from .arch.evm.hooks import evm_hook_address - return evm_hook_address(self, 'HOOK_ADDR', h, address) + return evm_hook_address(self, 'HOOK_ADDR', hook, address) - if address not in self._addr_hook_fuc.keys(): - self._addr_hook_fuc[address] = self._ql_hook_addr_internal(self._hook_addr_cb, address, address) + if address not in self._addr_hook_fuc: + self._addr_hook_fuc[address] = self._ql_hook_addr_internal(self._hook_addr_cb, address) - if address not in self._addr_hook.keys(): + if address not in self._addr_hook: self._addr_hook[address] = [] - self._addr_hook[address].append(h) - return HookRet(self, None, h) - + self._addr_hook[address].append(hook) + + return HookRet(self, None, hook) + def get_hook_address(self, address): return self._addr_hook.get(address, []) + def hook_intno(self, callback, intno, user_data=None): - h = HookIntr(callback, intno, user_data) - self._ql_hook(UC_HOOK_INTR, h) - return HookRet(self, UC_HOOK_INTR, h) + hook = HookIntr(callback, intno, user_data) + self._ql_hook(UC_HOOK_INTR, hook) + + return HookRet(self, UC_HOOK_INTR, hook) def hook_mem_read(self, callback, user_data=None, begin=1, end=0): @@ -330,6 +323,7 @@ def hook_insn(self, callback, arg1, user_data=None, begin=1, end=0): if self.archtype in QL_ARCH_NONEOS: from .arch.evm.hooks import evm_hook_insn return evm_hook_insn(self, 'HOOK_INSN', callback, arg1, user_data, begin, end) + return self.ql_hook(UC_HOOK_INSN, callback, user_data, begin, end, arg1) @@ -340,79 +334,88 @@ def hook_del(self, *args): if isinstance(args[0], HookRet): args[0].remove() return - else: - hook_type, h = args + + hook_type, h = args if self.archtype in QL_ARCH_NONEOS: from .arch.evm.hooks import evm_hook_del return evm_hook_del(hook_type, h) - base_type = [ - UC_HOOK_INTR, - UC_HOOK_INSN, - UC_HOOK_CODE, - UC_HOOK_BLOCK, - UC_HOOK_MEM_READ_UNMAPPED, - UC_HOOK_MEM_WRITE_UNMAPPED, - UC_HOOK_MEM_FETCH_UNMAPPED, - UC_HOOK_MEM_READ_PROT, - UC_HOOK_MEM_WRITE_PROT, - UC_HOOK_MEM_FETCH_PROT, - UC_HOOK_MEM_READ, - UC_HOOK_MEM_WRITE, - UC_HOOK_MEM_FETCH, - UC_HOOK_MEM_READ_AFTER, - UC_HOOK_INSN_INVALID - ] + def __handle_common(t: int) -> None: + if t in self._hook: + if h in self._hook[t]: + del self._hook[t][self._hook[t].index(h)] + + if len(self._hook[t]) == 0: + self._h_uc.hook_del(self._hook_fuc[t]) + del self._hook_fuc[t] + + def __handle_insn(t: int) -> None: + if t in self._insn_hook: + if h in self._insn_hook[t]: + del self._insn_hook[t][self._insn_hook[t].index(h)] + + if len(self._insn_hook[t]) == 0: + self._h_uc.hook_del(self._insn_hook_fuc[t]) + del self._insn_hook_fuc[t] + + def __handle_addr(t: int) -> None: + if t in self._addr_hook: + if h in self._addr_hook[t]: + del self._addr_hook[t][self._addr_hook[t].index(h)] + + if len(self._addr_hook[t]) == 0: + self._h_uc.hook_del(self._addr_hook_fuc[t]) + del self._addr_hook_fuc[t] + + type_handlers = { + UC_HOOK_INTR : __handle_common, + UC_HOOK_INSN : __handle_insn, + UC_HOOK_CODE : __handle_common, + UC_HOOK_BLOCK : __handle_common, + UC_HOOK_MEM_READ_UNMAPPED : __handle_common, + UC_HOOK_MEM_WRITE_UNMAPPED : __handle_common, + UC_HOOK_MEM_FETCH_UNMAPPED : __handle_common, + UC_HOOK_MEM_READ_PROT : __handle_common, + UC_HOOK_MEM_WRITE_PROT : __handle_common, + UC_HOOK_MEM_FETCH_PROT : __handle_common, + UC_HOOK_MEM_READ : __handle_common, + UC_HOOK_MEM_WRITE : __handle_common, + UC_HOOK_MEM_FETCH : __handle_common, + UC_HOOK_MEM_READ_AFTER : __handle_common, + UC_HOOK_INSN_INVALID : __handle_common + } + + # address hooks are a special case of UC_HOOK_CODE and + # should be handled separately if isinstance(h, HookAddr): - if h.addr in self._addr_hook.keys(): - if h in self._addr_hook[h.addr]: - del self._addr_hook[h.addr][self._addr_hook[h.addr].index(h)] - - if len(self._addr_hook[h.addr]) == 0: - self._h_uc.hook_del(self._addr_hook_fuc[h.addr]) - del self._addr_hook_fuc[h.addr] - + __handle_addr(h.addr) return - for t in base_type: - if (t & hook_type) != 0: - if t in (UC_HOOK_INTR, UC_HOOK_CODE, UC_HOOK_BLOCK, UC_HOOK_MEM_READ_UNMAPPED, UC_HOOK_MEM_WRITE_UNMAPPED, UC_HOOK_MEM_FETCH_UNMAPPED, UC_HOOK_MEM_READ_PROT, UC_HOOK_MEM_WRITE_PROT, UC_HOOK_MEM_FETCH_PROT, UC_HOOK_MEM_READ, UC_HOOK_MEM_WRITE, UC_HOOK_MEM_FETCH, UC_HOOK_MEM_READ_AFTER, UC_HOOK_INSN_INVALID): - if t in self._hook.keys(): - if h in self._hook[t]: - del self._hook[t][self._hook[t].index(h)] - - if len(self._hook[t]) == 0: - self._h_uc.hook_del(self._hook_fuc[t]) - del self._hook_fuc[t] - - if t in (UC_HOOK_INSN, ): - if t in self._insn_hook.keys(): - if h in self._insn_hook[t]: - del self._insn_hook[t][self._insn_hook[t].index(h)] - - if len(self._insn_hook[t]) == 0: - self._h_uc.hook_del(self._insn_hook_fuc[t]) - del self._insn_hook_fuc[t] - + for t, handler in type_handlers.items(): + if hook_type & t: + handler(t) + def clear_hooks(self): - for i in self._hook_fuc.keys(): - self._h_uc.hook_del(self._hook_fuc[i]) - - for i in self._insn_hook_fuc.keys(): - self._h_uc.hook_del(self._insn_hook_fuc[i]) + for ptr in self._hook_fuc.values(): + self._h_uc.hook_del(ptr) + + for ptr in self._insn_hook_fuc.values(): + self._h_uc.hook_del(ptr) - for i in self._addr_hook_fuc.keys(): - self._h_uc.hook_del(self._addr_hook_fuc[i]) + for ptr in self._addr_hook_fuc.values(): + self._h_uc.hook_del(ptr) self.clear_ql_hooks() - - + + def clear_ql_hooks(self): self._hook = {} self._hook_fuc = {} + self._insn_hook = {} self._insn_hook_fuc = {} + self._addr_hook = {} self._addr_hook_fuc = {}