diff --git a/examples/windows_trace.py b/examples/windows_trace.py index e218b8331..8394e3f9d 100644 --- a/examples/windows_trace.py +++ b/examples/windows_trace.py @@ -115,7 +115,6 @@ def emulate(path, rootfs, verbose=QL_VERBOSE.DEBUG, enable_trace=False): parser.add_argument("-t", "--trace", help="Enable full trace", action='store_true', default=False) parser.add_argument("-R", "--root", help="rootfs", default=None) parser.add_argument("-d", "--dump", help="Directory to dump memory regions to", default="dump") - #parser.add_argument("-a", "--automatize_input", help="Automatize writes on standard input", default=False) parser.add_argument("-p ", "--profile", help="customized profile", default="qiling/profiles/windows.ql") parser.add_argument('input', nargs='*') diff --git a/qiling/arch/x86_utils.py b/qiling/arch/x86_utils.py index 39573c12d..de6e24f24 100644 --- a/qiling/arch/x86_utils.py +++ b/qiling/arch/x86_utils.py @@ -4,7 +4,7 @@ from qiling import Qiling from qiling.arch.x86 import QlArchIntel from qiling.arch.x86_const import * -from qiling.exception import QlGDTError +from qiling.exception import QlGDTError, QlMemoryMappedError from qiling.os.memory import QlMemoryManager class GDTArray: @@ -50,8 +50,10 @@ class GDTManager: def __init__(self, ql: Qiling, base = QL_X86_GDT_ADDR, limit = QL_X86_GDT_LIMIT, num_entries = 16): ql.log.debug(f'Mapping GDT at {base:#x} with limit {limit:#x}') - if not ql.mem.is_mapped(base, limit): - ql.mem.map(base, limit, info="[GDT]") + if not ql.mem.is_available(base, limit): + raise QlMemoryMappedError('cannot map GDT, memory location is taken') + + ql.mem.map(base, limit, info="[GDT]") # setup GDT by writing to GDTR ql.arch.regs.write(UC_X86_REG_GDTR, (0, base, limit, 0x0)) diff --git a/qiling/core.py b/qiling/core.py index 684d8e74d..0c5cb929e 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -45,7 +45,7 @@ def __init__( filter = None, stop: QL_STOP = QL_STOP.NONE, *, - endian: QL_ENDIAN = None, + endian: 
Optional[QL_ENDIAN] = None, thumb: bool = False, libcache: bool = False ): @@ -414,8 +414,8 @@ def debug_stop(self) -> bool: return self._debug_stop @debug_stop.setter - def debug_stop(self, ds): - self._debug_stop = ds + def debug_stop(self, enabled: bool): + self._debug_stop = enabled @property def debugger(self) -> bool: diff --git a/qiling/core_hooks.py b/qiling/core_hooks.py index a389a9469..0a8e4cb83 100644 --- a/qiling/core_hooks.py +++ b/qiling/core_hooks.py @@ -8,7 +8,8 @@ # handling hooks # ############################################## -from typing import Callable, MutableMapping, MutableSequence +from typing import Any, Callable, MutableMapping, MutableSequence, Protocol +from typing import TYPE_CHECKING from unicorn import Uc from unicorn.unicorn_const import * @@ -18,6 +19,73 @@ from .const import QL_HOOK_BLOCK from .exception import QlErrorCoreHook +if TYPE_CHECKING: + from qiling import Qiling + +class MemHookCallback(Protocol): + def __call__(self, __ql: 'Qiling', __access: int, __address: int, __size: int, __value: int, *__context: Any) -> Any: + """Memory access hook callback. + + Args: + __ql : the associated qiling instance + __access : the intercepted memory access type, one of UC_MEM_* constants + __address : the target memory location + __size : size of intercepted memory access + __value : the value to write, for write operations, 0 for others + __context : additional context passed on hook creation. if no context was passed, this argument should be omitted + + Returns: + an integer with `QL_HOOK_BLOCK` mask set to block execution of remaining hooks + (if any) or `None` + """ + pass + +class TraceHookCalback(Protocol): + def __call__(self, __ql: 'Qiling', __address: int, __size: int, *__context: Any) -> Any: + """Execution hook callback. + + Args: + __ql : the associated qiling instance + __address : address of the instruction to be executed + __size : instruction size + __context : additional context passed on hook creation.
if no context was passed, this argument should be omitted + + Returns: + an integer with `QL_HOOK_BLOCK` mask set to block execution of remaining hooks + (if any) or `None` + """ + pass + +class AddressHookCallback(Protocol): + def __call__(self, __ql: 'Qiling', *__context: Any) -> Any: + """Address hook callback. + + Args: + __ql : the associated qiling instance + __context : additional context passed on hook creation. if no context was passed, this argument should be omitted + + Returns: + an integer with `QL_HOOK_BLOCK` mask set to block execution of remaining hooks + (if any) or `None` + """ + pass + +class InterruptHookCallback(Protocol): + def __call__(self, __ql: 'Qiling', intno: int, *__context: Any) -> Any: + """Interrupt hook callback. + + Args: + __ql : the associated qiling instance + __intno : the intercepted interrupt number + __context : additional context passed on hook creation. if no context was passed, this argument should be omitted + + Returns: + an integer with `QL_HOOK_BLOCK` mask set to block execution of remaining hooks + (if any) or `None` + """ + pass + + # Don't assume self is Qiling. class QlCoreHooks: def __init__(self, uc: Uc): @@ -37,6 +105,9 @@ def __init__(self, uc: Uc): # Callback definitions # ######################## def _hook_intr_cb(self, uc: Uc, intno: int, pack_data) -> None: + """Interrupt hooks dispatcher. + """ + ql, hook_type = pack_data handled = False @@ -60,15 +131,18 @@ def _hook_intr_cb(self, uc: Uc, intno: int, pack_data) -> None: def _hook_insn_cb(self, uc: Uc, *args): - ql, hook_type = args[-1] + """Instruction hooks dispatcher. 
+ """ + + *hook_args, (ql, insn_type) = args retval = None - if hook_type in self._insn_hook: - hooks_list = self._insn_hook[hook_type] + if insn_type in self._insn_hook: + hooks_list = self._insn_hook[insn_type] for hook in hooks_list: if hook.bound_check(ql.arch.regs.arch_pc): - ret = hook.call(ql, *args[:-1]) + ret = hook.call(ql, *hook_args) if type(ret) is tuple: ret, retval = ret @@ -81,13 +155,16 @@ def _hook_insn_cb(self, uc: Uc, *args): def _hook_trace_cb(self, uc: Uc, addr: int, size: int, pack_data) -> None: + """Code and block hooks dispatcher. + """ + ql, hook_type = pack_data if hook_type in self._hook: hooks_list = self._hook[hook_type] for hook in hooks_list: - if hook.bound_check(ql.arch.regs.arch_pc): + if hook.bound_check(addr, size): ret = hook.call(ql, addr, size) if type(ret) is int and ret & QL_HOOK_BLOCK: @@ -95,6 +172,9 @@ def _hook_trace_cb(self, uc: Uc, addr: int, size: int, pack_data) -> None: def _hook_mem_cb(self, uc: Uc, access: int, addr: int, size: int, value: int, pack_data): + """Memory access hooks dispatcher. + """ + ql, hook_type = pack_data handled = False @@ -116,6 +196,9 @@ def _hook_mem_cb(self, uc: Uc, access: int, addr: int, size: int, value: int, pa def _hook_insn_invalid_cb(self, uc: Uc, pack_data) -> None: + """Invalid instruction hooks dispatcher. + """ + ql, hook_type = pack_data handled = False @@ -134,6 +217,9 @@ def _hook_insn_invalid_cb(self, uc: Uc, pack_data) -> None: def _hook_addr_cb(self, uc: Uc, addr: int, size: int, pack_data): + """Address hooks dispatcher. 
+ """ + ql = pack_data if addr in self._addr_hook: @@ -148,10 +234,10 @@ def _hook_addr_cb(self, uc: Uc, addr: int, size: int, pack_data): ############### # Class Hooks # ############### - def _ql_hook_internal(self, hook_type, callback, user_data=None, *args) -> int: + def _ql_hook_internal(self, hook_type: int, callback: Callable, context: Any, *args) -> int: _callback = catch_KeyboardInterrupt(self, callback) - # pack user_data & callback for wrapper _callback - return self._h_uc.hook_add(hook_type, _callback, (self, user_data), 1, 0, *args) + + return self._h_uc.hook_add(hook_type, _callback, (self, context), 1, 0, *args) def _ql_hook_addr_internal(self, callback: Callable, address: int) -> int: @@ -175,7 +261,7 @@ def __handle_insn(t: int) -> None: ins_t = args[0] if ins_t not in self._insn_hook_fuc: - self._insn_hook_fuc[ins_t] = self._ql_hook_internal(t, self._hook_insn_cb, ins_t, *args) + self._insn_hook_fuc[ins_t] = self._ql_hook_internal(t, self._hook_insn_cb, ins_t, ins_t) if ins_t not in self._insn_hook: self._insn_hook[ins_t] = [] @@ -232,51 +318,202 @@ def __handle_invalid_insn(t: int) -> None: handler(t) - def ql_hook(self, hook_type: int, callback: Callable, user_data=None, begin=1, end=0, *args) -> HookRet: + def ql_hook(self, hook_type: int, callback: Callable, user_data: Any = None, begin: int = 1, end: int = 0, *args) -> HookRet: + """Intercept certain emulation events within a specified range. + + Args: + hook_type : event type to intercept; this argument is used as a bitmap and may encode multiple + events to hook with the same callback. see UC_HOOK_* constants for available events + callback : a method to call upon interception; callback signature may vary + depending on the hooked event type + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched.
+ + Returns: + Hook handle + """ + hook = Hook(callback, user_data, begin, end) self._ql_hook(hook_type, hook, *args) return HookRet(self, hook_type, hook) - def hook_code(self, callback, user_data=None, begin=1, end=0): + def hook_code(self, callback: TraceHookCalback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept assembly instructions before they get executed. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_CODE, callback, user_data, begin, end) + # TODO: remove; this is a special case of hook_intno(-1) def hook_intr(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_INTR, callback, user_data, begin, end) - def hook_block(self, callback, user_data=None, begin=1, end=0): + def hook_block(self, callback: TraceHookCalback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept landings in new basic blocks in a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_BLOCK, callback, user_data, begin, end) - def hook_mem_unmapped(self, callback, user_data=None, begin=1, end=0): + def hook_mem_unmapped(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept illegal accesses to unmapped memory in a specified range. 
+ + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_UNMAPPED, callback, user_data, begin, end) - def hook_mem_read_invalid(self, callback, user_data=None, begin=1, end=0): + def hook_mem_read_invalid(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept illegal reading attempts from a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_READ_INVALID, callback, user_data, begin, end) - def hook_mem_write_invalid(self, callback, user_data=None, begin=1, end=0): + def hook_mem_write_invalid(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept illegal writing attempts to a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. 
+ + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_WRITE_INVALID, callback, user_data, begin, end) - def hook_mem_fetch_invalid(self, callback, user_data=None, begin=1, end=0): + def hook_mem_fetch_invalid(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept illegal code fetching attempts from a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_FETCH_INVALID, callback, user_data, begin, end) - def hook_mem_valid(self, callback, user_data=None, begin=1, end=0): + def hook_mem_valid(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept benign memory accesses within a specified range. + This is equivalent to hooking memory reads, writes and fetches. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_VALID, callback, user_data, begin, end) - def hook_mem_invalid(self, callback, user_data=None, begin=1, end=0): + def hook_mem_invalid(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept invalid memory accesses within a specified range. + This is equivalent to hooking invalid memory reads, writes and fetches. 
+ + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_INVALID, callback, user_data, begin, end) - # a convenient API to set callback for a single address - def hook_address(self, callback, address, user_data=None): + def hook_address(self, callback: AddressHookCallback, address: int, user_data: Any = None) -> HookRet: + """Intercept execution from a certain memory address. + + Args: + callback : a method to call upon interception + address : memory location to watch + user_data : an additional context to pass to callback (default: `None`) + + Returns: + Hook handle + """ + hook = HookAddr(callback, address, user_data) if address not in self._addr_hook_fuc: @@ -287,37 +524,114 @@ def hook_address(self, callback, address, user_data=None): self._addr_hook[address].append(hook) - return HookRet(self, None, hook) + # note: assuming 0 is not a valid hook type + return HookRet(self, 0, hook) - def get_hook_address(self, address): - return self._addr_hook.get(address, []) + def hook_intno(self, callback: InterruptHookCallback, intno: int, user_data: Any = None) -> HookRet: + """Intercept interrupts. 
+ Args: + callback : a method to call upon interception + intno : interrupt vector number to intercept, or -1 for any + user_data : an additional context to pass to callback (default: `None`) + + Returns: + Hook handle + """ - def hook_intno(self, callback, intno, user_data=None): hook = HookIntr(callback, intno, user_data) self._ql_hook(UC_HOOK_INTR, hook) return HookRet(self, UC_HOOK_INTR, hook) - def hook_mem_read(self, callback, user_data=None, begin=1, end=0): + def hook_mem_read(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept benign memory reads from a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_READ, callback, user_data, begin, end) - def hook_mem_write(self, callback, user_data=None, begin=1, end=0): + def hook_mem_write(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept benign memory writes to a specified range. + + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_WRITE, callback, user_data, begin, end) - def hook_mem_fetch(self, callback, user_data=None, begin=1, end=0): + def hook_mem_fetch(self, callback: MemHookCallback, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept benign code fetches from a specified range.
+ + Args: + callback : a method to call upon interception + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + return self.ql_hook(UC_HOOK_MEM_FETCH, callback, user_data, begin, end) - def hook_insn(self, callback, arg1, user_data=None, begin=1, end=0): - return self.ql_hook(UC_HOOK_INSN, callback, user_data, begin, end, arg1) + def hook_insn(self, callback, insn_type: int, user_data: Any = None, begin: int = 1, end: int = 0) -> HookRet: + """Intercept execution of a certain instruction type within a specified range. + + Args: + callback : a method to call upon interception; the callback arguments list differs + based on the instruction type + insn_type : instruction type to intercept + user_data : an additional context to pass to callback (default: `None`) + begin : start of memory range to watch + end : end of memory range to watch + + Notes: + - The set of supported instruction types is very limited and defined by unicorn. + - If `begin` and `end` are not specified, the entire memory space will be watched. + + Returns: + Hook handle + """ + + return self.ql_hook(UC_HOOK_INSN, callback, user_data, begin, end, insn_type) + + + def hook_del(self, hret: HookRet) -> None: + """Unregister an existing hook and release its resources. 
+ Args: + hret : hook handle + """ - def hook_del(self, hret: HookRet): h = hret.obj hook_type = hret.type diff --git a/qiling/debugger/gdb/gdb.py b/qiling/debugger/gdb/gdb.py index a8aeceb18..f559a2a83 100644 --- a/qiling/debugger/gdb/gdb.py +++ b/qiling/debugger/gdb/gdb.py @@ -28,7 +28,7 @@ from qiling import Qiling from qiling.const import QL_ARCH, QL_ENDIAN, QL_OS from qiling.debugger import QlDebugger -from qiling.debugger.gdb import xmlregs +from qiling.debugger.gdb.xmlregs import QlGdbFeatures from qiling.debugger.gdb.utils import QlGdbUtils # gdb logging prompt @@ -98,7 +98,8 @@ def __init__(self, ql: Qiling, ip: str = '127.0.01', port: int = 9999): self.gdb = QlGdbUtils(ql, entry_point, exit_point) - self.regsmap = xmlregs.load_regsmap(self.ql.arch.type) + self.features = QlGdbFeatures(self.ql.arch.type, self.ql.os.type) + self.regsmap = self.features.regsmap def run(self): server = GdbSerialConn(self.ip, self.port, self.ql.log) @@ -451,22 +452,12 @@ def handle_q(subcmd: str) -> Reply: offset, length = (int(p, 16) for p in params.split(',')) if feature == 'features' and op == 'read': - xfercmd_abspath = os.path.dirname(os.path.abspath(__file__)) - xml_folder = self.ql.arch.type.name.lower() - xfercmd_file = os.path.join(xfercmd_abspath, 'xml', xml_folder, annex) - - if self.ql.os.type == QL_OS.WINDOWS: - self.ql.log.info(f'{PROMPT} Qiling does not support XML for this platform yet') - content = '' - - elif not os.path.exists(xfercmd_file): - self.ql.log.info(f'{PROMPT} XML file not found: "{xfercmd_file}"') - content = '' + if annex == r'target.xml': + content = self.features.tostring()[offset:offset + length] else: - with open(xfercmd_file, 'r') as f: - f.seek(offset, os.SEEK_SET) - content = f.read(length) + self.ql.log.info(f'{PROMPT} did not expect "{annex}" here') + content = '' return f'{"l" if len(content) < length else "m"}{content}' @@ -604,7 +595,7 @@ def handle_v(subcmd: str) -> Reply: if os.path.exists(host_path) and not 
path.startswith(r'/proc'): fd = os.open(host_path, flags, mode) - return f'F{fd}' + return f'F{fd:x}' elif op == 'pread': fd, count, offset = (int(p, 16) for p in params) diff --git a/qiling/debugger/gdb/xmlregs.py b/qiling/debugger/gdb/xmlregs.py index f872ee0fb..7385a303f 100644 --- a/qiling/debugger/gdb/xmlregs.py +++ b/qiling/debugger/gdb/xmlregs.py @@ -5,6 +5,7 @@ from typing import Iterator, Mapping, Optional, Sequence, Tuple from pathlib import PurePath +from xml.etree import ElementTree, ElementInclude from qiling.arch.arm_const import reg_map as arm_regs from qiling.arch.arm_const import reg_vfp as arm_regs_vfp @@ -12,7 +13,6 @@ from qiling.arch.arm64_const import reg_map_v as arm64_regs_v from qiling.arch.mips_const import reg_map as mips_regs_gpr from qiling.arch.mips_const import reg_map_fpu as mips_regs_fpu -from qiling.arch.x86_const import reg_map_16 as x86_regs_16 from qiling.arch.x86_const import reg_map_32 as x86_regs_32 from qiling.arch.x86_const import reg_map_64 as x86_regs_64 from qiling.arch.x86_const import reg_map_misc as x86_regs_misc @@ -21,92 +21,130 @@ from qiling.arch.x86_const import reg_map_xmm as x86_regs_xmm from qiling.arch.x86_const import reg_map_ymm as x86_regs_ymm -from qiling.const import QL_ARCH +from qiling.const import QL_ARCH, QL_OS RegEntry = Tuple[Optional[int], int, int] -# define a local dummy function to let us reference this module -__anchor__ = lambda x: x +class QlGdbFeatures: + def __init__(self, archtype: QL_ARCH, ostype: QL_OS): + xmltree = QlGdbFeatures.__load_target_xml(archtype, ostype) + regsmap = QlGdbFeatures.__load_regsmap(archtype, xmltree) -def __get_xml_path(archtype: QL_ARCH) -> Tuple[str, PurePath]: - import inspect + self.xmltree = xmltree + self.regsmap = regsmap - p = PurePath(inspect.getfile(__anchor__)) - basedir = p.parent / 'xml' / archtype.name.lower() - filename = basedir / 'target.xml' + def tostring(self) -> str: + root = self.xmltree.getroot() - return str(filename), basedir + return 
ElementTree.tostring(root, encoding='unicode', xml_declaration=True) -def __walk_xml_regs(filename: str, base_url: PurePath) -> Iterator[Tuple[int, str, int]]: - from xml.etree import ElementTree, ElementInclude + @staticmethod + def __get_xml_path(archtype: QL_ARCH) -> Tuple[str, PurePath]: + import inspect - tree = ElementTree.parse(filename) - root = tree.getroot() + p = PurePath(inspect.getfile(QlGdbFeatures)) + basedir = p.parent / 'xml' / archtype.name.lower() + filename = basedir / 'target.xml' - # NOTE: this is needed to load xinclude hrefs relative to the main xml file. starting - # from python 3.9 ElementInclude.include has an argument for that called 'base_url'. - # this is a workaround for earlier python versions such as 3.8 + return str(filename), basedir - def my_loader(base: PurePath): - def __wrapped(href: str, parse, encoding=None): - abshref = base / href + @staticmethod + def __load_target_xml(archtype: QL_ARCH, ostype: QL_OS) -> ElementTree.ElementTree: + filename, base_url = QlGdbFeatures.__get_xml_path(archtype) - return ElementInclude.default_loader(str(abshref), parse, encoding) + tree = ElementTree.parse(filename) - return __wrapped + # NOTE: this is needed to load xinclude hrefs relative to the main xml file. starting + # from python 3.9 ElementInclude.include has an argument for that called 'base_url'. 
+ # this is a workaround for earlier python versions such as 3.8 - ElementInclude.include(root, loader=my_loader(base_url)) + # + def my_loader(base: PurePath): + def __wrapped(href: str, parse, encoding=None): + abshref = base / href - regnum = -1 + return ElementInclude.default_loader(str(abshref), parse, encoding) - for reg in root.iter('reg'): - # if regnum is not specified, assume it follows the previous one - regnum = int(reg.get('regnum', regnum + 1)) + return __wrapped + # - name = reg.attrib['name'] - bitsize = reg.attrib['bitsize'] + # inline all xi:include elements + ElementInclude.include(tree.getroot(), loader=my_loader(base_url)) - yield regnum, name, int(bitsize) + # patch xml osabi element with the appropriate abi tag + osabi = tree.find('osabi') -def load_regsmap(archtype: QL_ARCH) -> Sequence[RegEntry]: - """Initialize registers map using available target XML files. + if osabi is not None: + # NOTE: the 'Windows' abi tag is supported starting from gdb 10. + # earlier gdb versions use 'Cygwin' instead - Args: - archtype: target architecture type + abitag = { + QL_OS.LINUX : 'GNU/Linux', + QL_OS.FREEBSD : 'FreeBSD', + QL_OS.MACOS : 'Darwin', + QL_OS.WINDOWS : 'Windows', + QL_OS.UEFI : 'Windows', + QL_OS.DOS : 'Windows', + QL_OS.QNX : 'QNX-Neutrino' + }.get(ostype, 'unknown') - Returns: a list representing registers data - """ + osabi.text = abitag - # retreive the relevant set of registers; their order of appearance is not - # important as it is determined by the info read from the xml files - ucregs: Mapping[str, int] = { - QL_ARCH.A8086 : dict(**x86_regs_32, **x86_regs_misc, **x86_regs_cr, **x86_regs_st), - QL_ARCH.X86 : dict(**x86_regs_32, **x86_regs_misc, **x86_regs_cr, **x86_regs_st, **x86_regs_xmm), - QL_ARCH.X8664 : dict(**x86_regs_64, **x86_regs_misc, **x86_regs_cr, **x86_regs_st, **x86_regs_xmm, **x86_regs_ymm), - QL_ARCH.ARM : dict(**arm_regs, **arm_regs_vfp), - QL_ARCH.CORTEX_M : arm_regs, - QL_ARCH.ARM64 : dict(**arm64_regs, 
**arm64_regs_v), - QL_ARCH.MIPS : dict(**mips_regs_gpr, **mips_regs_fpu) - }[archtype] + return tree - xmlpath = __get_xml_path(archtype) - regsinfo = sorted(__walk_xml_regs(*xmlpath)) + @staticmethod + def __walk_xml_regs(xmltree: ElementTree.ElementTree) -> Iterator[Tuple[int, str, int]]: + regnum = -1 - # pre-allocate regmap and occupy it with null entries - last_regnum = regsinfo[-1][0] - regmap: Sequence[RegEntry] = [(None, 0, 0)] * (last_regnum + 1) + for reg in xmltree.iter('reg'): + # if regnum is not specified, assume it follows the previous one + regnum = int(reg.get('regnum', regnum + 1)) - pos = 0 + name = reg.attrib['name'] + bitsize = reg.attrib['bitsize'] - for regnum, name, bitsize in sorted(regsinfo): - # reg value size in nibbles - nibbles = bitsize // 4 + yield regnum, name, int(bitsize) - regmap[regnum] = (ucregs.get(name), pos, nibbles) + @staticmethod + def __load_regsmap(archtype: QL_ARCH, xmltree: ElementTree.ElementTree) -> Sequence[RegEntry]: + """Initialize registers map using available target XML files. 
+ - # value position of next reg - pos += nibbles + Args: + archtype: target architecture type - return regmap + Returns: a list representing registers data + """ -__all__ = ['RegEntry', 'load_regsmap'] + # retrieve the relevant set of registers; their order of appearance is not + # important as it is determined by the info read from the xml files + ucregs: Mapping[str, int] = { + QL_ARCH.A8086 : dict(**x86_regs_32, **x86_regs_misc, **x86_regs_cr, **x86_regs_st), + QL_ARCH.X86 : dict(**x86_regs_32, **x86_regs_misc, **x86_regs_cr, **x86_regs_st, **x86_regs_xmm), + QL_ARCH.X8664 : dict(**x86_regs_64, **x86_regs_misc, **x86_regs_cr, **x86_regs_st, **x86_regs_xmm, **x86_regs_ymm), + QL_ARCH.ARM : dict(**arm_regs, **arm_regs_vfp), + QL_ARCH.CORTEX_M : arm_regs, + QL_ARCH.ARM64 : dict(**arm64_regs, **arm64_regs_v), + QL_ARCH.MIPS : dict(**mips_regs_gpr, **mips_regs_fpu) + }[archtype] + + regsinfo = sorted(QlGdbFeatures.__walk_xml_regs(xmltree)) + + # pre-allocate regmap and occupy it with null entries + last_regnum = regsinfo[-1][0] + regmap: Sequence[RegEntry] = [(None, 0, 0)] * (last_regnum + 1) + + pos = 0 + + for regnum, name, bitsize in sorted(regsinfo): + # reg value size in nibbles + nibbles = bitsize // 4 + + regmap[regnum] = (ucregs.get(name), pos, nibbles) + + # value position of next reg + pos += nibbles + + return regmap + + +__all__ = ['RegEntry', 'QlGdbFeatures'] diff --git a/qiling/debugger/qdb/qdb.py b/qiling/debugger/qdb/qdb.py index ef9fc0758..d8810ff29 100644 --- a/qiling/debugger/qdb/qdb.py +++ b/qiling/debugger/qdb/qdb.py @@ -353,7 +353,9 @@ def do_show(self, *args) -> None: show some runtime information """ - self.ql.mem.show_mapinfo() + for info_line in self.ql.mem.get_formatted_mapinfo(): + self.ql.log.info(info_line) + qdb_print(QDB_MSG.INFO, f"Breakpoints: {[hex(addr) for addr in self.bp_list.keys()]}") if self.rr: qdb_print(QDB_MSG.INFO, f"Snapshots: {len([st for st in self.rr.layers if isinstance(st, self.rr.DiffedState)])}") diff --git
a/qiling/loader/elf.py b/qiling/loader/elf.py index a1be0ef63..ada002294 100644 --- a/qiling/loader/elf.py +++ b/qiling/loader/elf.py @@ -368,7 +368,7 @@ def __push_str(top: int, s: str) -> int: _vsyscall_addr = int(self.profile.get('vsyscall_address'), 0) _vsyscall_size = int(self.profile.get('vsyscall_size'), 0) - if not self.ql.mem.is_mapped(_vsyscall_addr, _vsyscall_size): + if self.ql.mem.is_available(_vsyscall_addr, _vsyscall_size): # initialize with int3 instructions then insert syscall entry # each syscall should be 1KiB away self.ql.mem.map(_vsyscall_addr, _vsyscall_size, info="[vsyscall]") diff --git a/qiling/loader/pe.py b/qiling/loader/pe.py index 5366623f8..1b05e6d13 100644 --- a/qiling/loader/pe.py +++ b/qiling/loader/pe.py @@ -31,10 +31,10 @@ class QlPeCacheEntry(NamedTuple): class QlPeCache: @staticmethod def cache_filename(path: str) -> str: - dirname, basename = ntpath.split(path) + dirname, basename = os.path.split(path) # canonicalize basename while preserving the path - path = ntpath.join(dirname, basename.casefold()) + path = os.path.join(dirname, basename.casefold()) return f'{path}.cache2' diff --git a/qiling/os/memory.py b/qiling/os/memory.py index 2354f74f9..228d07d0a 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -4,7 +4,7 @@ # import os, re -from typing import Any, Callable, List, Mapping, MutableSequence, Optional, Sequence, Tuple +from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL @@ -127,8 +127,8 @@ def del_mapinfo(self, mem_s: int, mem_e: int): self.map_info = tmp_map_info - def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: int = None, mem_info: str = None): - tmp_map_info: MapInfoEntry = None + def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None): + tmp_map_info: Optional[MapInfoEntry] = None info_idx: 
int = None for idx, map_info in enumerate(self.map_info): @@ -149,12 +149,12 @@ def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: int = None, mem_info: st if mem_info is not None: self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4]) - def get_mapinfo(self) -> Sequence[Tuple[int, int, str, str, Optional[str]]]: + def get_mapinfo(self) -> Sequence[Tuple[int, int, str, str, str]]: """Get memory map info. Returns: A sequence of 5-tuples representing the memory map entries. Each tuple contains range start, range end, permissions, range label and path of - containing image (or None if not contained by any image) + containing image (or an empty string if not contained by any image) """ def __perms_mapping(ps: int) -> str: @@ -166,21 +166,21 @@ def __perms_mapping(ps: int) -> str: return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) - def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, Optional[str]]: + def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, str]: perms_str = __perms_mapping(perms) if hasattr(self.ql, 'loader'): image = self.ql.loader.find_containing_image(lbound) - container = image.path if image and not is_mmio else None + container = image.path if image and not is_mmio else '' else: - container = None + container = '' return (lbound, ubound, perms_str, label, container) return tuple(__process(*entry) for entry in self.map_info) - def show_mapinfo(self): - """Emit memory map info in a nicely formatted table. + def get_formatted_mapinfo(self) -> Sequence[str]: + """Get memory map info in a nicely formatted table. 
""" mapinfo = self.get_mapinfo() @@ -192,12 +192,17 @@ def show_mapinfo(self): len_addr = max(grouped[0]) len_label = max(grouped[1]) - # emit title row - self.ql.log.info(f'{"Start":{len_addr}s} {"End":{len_addr}s} {"Perm":5s} {"Label":{len_label}s} {"Image"}') + # pre-allocate table + table = [''] * (len(mapinfo) + 1) - # emit table rows - for lbound, ubound, perms, label, container in mapinfo: - self.ql.log.info(f'{lbound:0{len_addr}x} - {ubound:0{len_addr}x} {perms:5s} {label:{len_label}s} {container or ""}') + # add title row + table[0] = f'{"Start":{len_addr}s} {"End":{len_addr}s} {"Perm":5s} {"Label":{len_label}s} {"Image"}' + + # add table rows + for i, (lbound, ubound, perms, label, container) in enumerate(mapinfo, 1): + table[i] = f'{lbound:0{len_addr}x} - {ubound:0{len_addr}x} {perms:5s} {label:{len_label}s} {container}' + + return table # TODO: relying on the label string is risky; find a more reliable method def get_lib_base(self, filename: str) -> Optional[int]: @@ -210,7 +215,7 @@ def get_lib_base(self, filename: str) -> Optional[int]: return next((lbound for lbound, info in stripped if os.path.basename(info) == filename), None) - def align(self, value: int, alignment: int = None) -> int: + def align(self, value: int, alignment: Optional[int] = None) -> int: """Align a value down to the specified alignment boundary. If `value` is already aligned, the same value is returned. Commonly used to determine the base address of the enclosing page. @@ -232,7 +237,7 @@ def align(self, value: int, alignment: int = None) -> int: # round down to nearest alignment return value & ~(alignment - 1) - def align_up(self, value: int, alignment: int = None) -> int: + def align_up(self, value: int, alignment: Optional[int] = None) -> int: """Align a value up to the specified alignment boundary. If `value` is already aligned, the same value is returned. Commonly used to determine the end address of the enlosing page. 
@@ -280,7 +285,7 @@ def restore(self, mem_dict): self.ql.log.debug(f'restoring memory range: {lbound:#08x} {ubound:#08x} {label}') size = ubound - lbound - if not self.is_mapped(lbound, size): + if self.is_available(lbound, size): self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}') self.map(lbound, size, perms, label) @@ -305,13 +310,13 @@ def read(self, addr: int, size: int) -> bytearray: return self.ql.uc.mem_read(addr, size) - def read_ptr(self, addr: int, size: int=None) -> int: + def read_ptr(self, addr: int, size: int = 0) -> int: """Read an integer value from a memory address. Bytes read will be unpacked using emulated architecture properties. Args: addr: memory address to read - size: pointer size (in bytes): either 1, 2, 4, 8, or None for arch native size + size: pointer size (in bytes): either 1, 2, 4, 8, or 0 for arch native size Returns: integer value stored at the specified memory address """ @@ -341,14 +346,14 @@ def write(self, addr: int, data: bytes) -> None: self.ql.uc.mem_write(addr, data) - def write_ptr(self, addr: int, value: int, size: int=None) -> None: + def write_ptr(self, addr: int, value: int, size: int = 0) -> None: """Write an integer value to a memory address. Bytes written will be packed using emulated architecture properties. Args: addr: target memory address value: integer value to write - size: pointer size (in bytes): either 1, 2, 4, 8, or None for arch native size + size: pointer size (in bytes): either 1, 2, 4, 8, or 0 for arch native size """ if not size: @@ -366,18 +371,18 @@ def write_ptr(self, addr: int, value: int, size: int=None) -> None: self.write(addr, __pack(value)) - def search(self, needle: bytes, begin: int = None, end: int = None) -> Sequence[int]: + def search(self, needle: Union[bytes, Pattern[bytes]], begin: Optional[int] = None, end: Optional[int] = None) -> Sequence[int]: """Search for a sequence of bytes in memory. 
Args: - needle: bytes sequence to look for + needle: bytes sequence or regex pattern to look for begin: search starting address (or None to start at lowest avaiable address) end: search ending address (or None to end at highest avaiable address) Returns: addresses of all matches """ - # if starting point not set, search from the first mapped region + # if starting point not set, search from the first mapped region if begin is None: begin = self.map_info[0][0] @@ -391,6 +396,10 @@ def search(self, needle: bytes, begin: int = None, end: int = None) -> Sequence[ ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio in self.map_info if not (end < lbound or ubound < begin or is_mmio)] results = [] + # if needle is a bytes sequence use it verbatim, not as a pattern + if type(needle) is bytes: + needle = re.escape(needle) + for lbound, ubound in ranges: haystack = self.read(lbound, ubound - lbound) local_results = (match.start(0) + lbound for match in re.finditer(needle, haystack)) @@ -413,16 +422,41 @@ def unmap(self, addr: int, size: int) -> None: if (addr, addr + size) in self.mmio_cbs: del self.mmio_cbs[(addr, addr+size)] - def unmap_all(self): + def unmap_all(self) -> None: """Reclaim the entire memory space. """ for begin, end, _ in self.ql.uc.mem_regions(): self.unmap(begin, end - begin + 1) + def __mapped_regions(self) -> Iterator[Tuple[int, int]]: + """Iterate through all mapped memory regions, consolidating adjacent regions + together to a continuous one. Protection bits and labels are ignored. 
+ """ + + if not self.map_info: + return + + iter_memmap = iter(self.map_info) + + p_lbound, p_ubound, _, _, _ = next(iter_memmap) + + # map_info is assumed to contain non-overlapping regions sorted by lbound + for lbound, ubound, _, _, _ in iter_memmap: + if lbound == p_ubound: + p_ubound = ubound + else: + yield (p_lbound, p_ubound) + + p_lbound = lbound + p_ubound = ubound + + yield (p_lbound, p_ubound) + + def is_available(self, addr: int, size: int) -> bool: """Query whether the memory range starting at `addr` and is of length of `size` bytes - can be allocated. + is available for allocation. Returns: True if it can be allocated, False otherwise """ @@ -433,37 +467,23 @@ def is_available(self, addr: int, size: int) -> bool: end = addr + size # make sure neither begin nor end are enclosed within a mapped range, or entirely enclosing one - return not any((lbound <= begin < ubound) or (lbound < end <= ubound) or (begin <= lbound < ubound <= end) for lbound, ubound, _, _, _ in self.map_info) + return not any((lbound <= begin < ubound) or (lbound < end <= ubound) or (begin <= lbound < ubound <= end) for lbound, ubound in self.__mapped_regions()) def is_mapped(self, addr: int, size: int) -> bool: """Query whether the memory range starting at `addr` and is of length of `size` bytes - is mapped, either partially or entirely. + is fully mapped. - Returns: True if any part of the specified memory range is taken, False otherwise + Returns: True if the specified memory range is taken fully, False otherwise """ - return not self.is_available(addr, size) - - def is_free(self, address, size): - ''' - The main function of is_free first must fufull is_mapped condition. 
- then, check for is the mapped range empty, either fill with 0xFF or 0x00 - Returns true if mapped range is empty else return Flase - If not not mapped, map it and return true - ''' - if self.is_mapped(address, size) == True: - address_end = (address + size) - while address < address_end: - mem_read = self.ql.mem.read(address, 0x1) - if (mem_read[0] != 0x00) and (mem_read[0] != 0xFF): - return False - address += 1 - return True - else: - return True - - - def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, align: int = None) -> int: + assert size > 0, 'expected a positive size value' + + begin = addr + end = addr + size + + return any((lbound <= begin < end <= ubound) for lbound, ubound in self.__mapped_regions()) + + def find_free_space(self, size: int, minaddr: Optional[int] = None, maxaddr: Optional[int] = None, align: Optional[int] = None) -> int: """Locate an unallocated memory that is large enough to contain a range in size of `size` and based at `minaddr`. @@ -508,7 +528,7 @@ def find_free_space(self, size: int, minaddr: int = None, maxaddr: int = None, a raise QlOutOfMemory('Out Of Memory') - def map_anywhere(self, size: int, minaddr: int = None, maxaddr: int = None, align: int = None, perms: int = UC_PROT_ALL, info: str = None) -> int: + def map_anywhere(self, size: int, minaddr: Optional[int] = None, maxaddr: Optional[int] = None, align: Optional[int] = None, perms: int = UC_PROT_ALL, info: Optional[str] = None) -> int: """Map a region anywhere in memory. Args: @@ -543,7 +563,7 @@ def protect(self, addr: int, size: int, perms): self.change_mapinfo(aligned_address, aligned_address + aligned_size, mem_p = perms) - def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: str = None): + def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str] = None): """Map a new memory range. 
Args: @@ -722,7 +742,7 @@ def clear(self): self.current_alloc = 0 self.current_use = 0 - def _find(self, addr: int, inuse: bool = None) -> Optional[Chunk]: + def _find(self, addr: int, inuse: Optional[bool] = None) -> Optional[Chunk]: """Find a chunk starting at a specified address. Args: diff --git a/qiling/os/os.py b/qiling/os/os.py index 0b4a3c3a5..93c4c012e 100644 --- a/qiling/os/os.py +++ b/qiling/os/os.py @@ -252,5 +252,6 @@ def emu_error(self): finally: self.ql.log.error(f'PC = {pc:#0{self.ql.arch.pointersize * 2 + 2}x}{pc_info}\n') - self.ql.log.info(f'Memory map:') - self.ql.mem.show_mapinfo() + self.ql.log.error(f'Memory map:') + for info_line in self.ql.mem.get_formatted_mapinfo(): + self.ql.log.error(info_line) diff --git a/qiling/os/stats.py b/qiling/os/stats.py index 704304ad8..0755872fe 100644 --- a/qiling/os/stats.py +++ b/qiling/os/stats.py @@ -45,12 +45,6 @@ def summary(self) -> List[str]: for key, values in self.strings.items(): ret.append(f'{key}: {", ".join(str(word) for word in values)}') - ret.extend(QlOsStats._banner('registry keys accessed')) - - for key, values in self.syscalls.items(): - ret.append(f'{key}:') - ret.extend(f' {json.dumps(value):s}' for value in values) - return ret def log_api_call(self, address: int, name: str, params: Mapping, retval: Any, retaddr: int) -> None: @@ -107,7 +101,7 @@ def summary(self) -> List[str]: ret.extend(QlOsStats._banner('registry keys accessed')) - for key, values in self.syscalls.items(): + for key, values in self.registry.items(): ret.append(f'{key}:') ret.extend(f' {json.dumps(value):s}' for value in values) diff --git a/qiling/os/uefi/uefi.py b/qiling/os/uefi/uefi.py index fe55cc0f3..67163e90b 100644 --- a/qiling/os/uefi/uefi.py +++ b/qiling/os/uefi/uefi.py @@ -197,7 +197,9 @@ def emu_error(self): self.emit_stack() self.ql.log.error(f'Memory map:') - self.ql.mem.show_mapinfo() + for info_line in self.ql.mem.get_formatted_mapinfo(): + self.ql.log.error(info_line) + def set_api(self, target: str, 
handler: Callable, intercept: QL_INTERCEPT = QL_INTERCEPT.CALL): super().set_api(f'hook_{target}', handler, intercept) diff --git a/qiling/os/windows/dlls/kernel32/fileapi.py b/qiling/os/windows/dlls/kernel32/fileapi.py index 729eeee98..0ccf37e0e 100644 --- a/qiling/os/windows/dlls/kernel32/fileapi.py +++ b/qiling/os/windows/dlls/kernel32/fileapi.py @@ -26,18 +26,13 @@ def hook_GetFileType(ql: Qiling, address: int, params): hFile = params["hFile"] - if hFile in (STD_INPUT_HANDLE, STD_OUTPUT_HANDLE, STD_ERROR_HANDLE): - ret = FILE_TYPE_CHAR - else: - obj = ql.os.handle_manager.get(hFile) + handle = ql.os.handle_manager.get(hFile) - if obj is None: - raise QlErrorNotImplemented("API not implemented") - else: - # technically is not always a type_char but.. almost - ret = FILE_TYPE_CHAR + if handle is None: + raise QlErrorNotImplemented("API not implemented") - return ret + # technically is not always a type_char but.. almost + return FILE_TYPE_CHAR # HANDLE FindFirstFileA( # LPCSTR lpFileName, @@ -156,28 +151,16 @@ def hook_ReadFile(ql: Qiling, address: int, params): nNumberOfBytesToRead = params["nNumberOfBytesToRead"] lpNumberOfBytesRead = params["lpNumberOfBytesRead"] - if hFile == STD_INPUT_HANDLE: - if ql.os.automatize_input: - # TODO maybe insert a good random generation input - s = (b"A" * (nNumberOfBytesToRead - 1)) + b"\x00" - else: - ql.log.debug("Insert input") - s = ql.os.stdin.read(nNumberOfBytesToRead) + handle = ql.os.handle_manager.get(hFile) - slen = len(s) - read_len = slen + if handle is None: + ql.os.last_error = ERROR_INVALID_HANDLE + return 0 - if slen > nNumberOfBytesToRead: - s = s[:nNumberOfBytesToRead] - read_len = nNumberOfBytesToRead + data = handle.obj.read(nNumberOfBytesToRead) - ql.mem.write(lpBuffer, s) - ql.mem.write_ptr(lpNumberOfBytesRead, read_len, 4) - else: - f = ql.os.handle_manager.get(hFile).obj - data = f.read(nNumberOfBytesToRead) - ql.mem.write(lpBuffer, data) - ql.mem.write_ptr(lpNumberOfBytesRead, len(data), 4) + 
ql.mem.write(lpBuffer, data) + ql.mem.write_ptr(lpNumberOfBytesRead, len(data), 4) return 1 @@ -201,24 +184,20 @@ def hook_WriteFile(ql: Qiling, address: int, params): nNumberOfBytesToWrite = params["nNumberOfBytesToWrite"] lpNumberOfBytesWritten = params["lpNumberOfBytesWritten"] + handle = ql.os.handle_manager.get(hFile) + + if handle is None: + ql.os.last_error = ERROR_INVALID_HANDLE + return 0 + + fobj = handle.obj + data = ql.mem.read(lpBuffer, nNumberOfBytesToWrite) + if hFile == STD_OUTPUT_HANDLE: - s = ql.mem.read(lpBuffer, nNumberOfBytesToWrite) - ql.os.stdout.write(s) - ql.os.stats.log_string(s.decode()) - ql.mem.write_ptr(lpNumberOfBytesWritten, nNumberOfBytesToWrite, 4) - else: - f = ql.os.handle_manager.get(hFile) - - if f is None: - # Invalid handle - ql.os.last_error = ERROR_INVALID_HANDLE - return 0 - else: - f = f.obj - - buffer = ql.mem.read(lpBuffer, nNumberOfBytesToWrite) - nNumberOfBytesWritten = f.write(bytes(buffer)) - ql.mem.write_ptr(lpNumberOfBytesWritten, nNumberOfBytesWritten, 4) + ql.os.stats.log_string(data.decode()) + + written = fobj.write(bytes(data)) + ql.mem.write_ptr(lpNumberOfBytesWritten, written, 4) return 1 diff --git a/qiling/os/windows/handle.py b/qiling/os/windows/handle.py index b5592ec10..b5776c961 100644 --- a/qiling/os/windows/handle.py +++ b/qiling/os/windows/handle.py @@ -6,6 +6,8 @@ from typing import Any, MutableMapping, Optional +from qiling.os.windows.const import STD_ERROR_HANDLE, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE + class Handle: ID = 0xa0000000 @@ -26,9 +28,9 @@ def __eq__(self, other: 'Handle'): class HandleManager: # IO - STD_INPUT_HANDLE = Handle(id=0xfffffff6) - STD_OUTPUT_HANDLE = Handle(id=0xfffffff5) - STD_ERROR_HANDLE = Handle(id=0xfffffff4) + STDIN = Handle(id=STD_INPUT_HANDLE) + STDOUT = Handle(id=STD_OUTPUT_HANDLE) + STDERR = Handle(id=STD_ERROR_HANDLE) # Register HKEY_CLASSES_ROOT = Handle(id=0x80000000) @@ -44,9 +46,9 @@ class HandleManager: def __init__(self): self.handles: MutableMapping[int, 
Handle] = {} - self.append(HandleManager.STD_INPUT_HANDLE) - self.append(HandleManager.STD_OUTPUT_HANDLE) - self.append(HandleManager.STD_ERROR_HANDLE) + self.append(HandleManager.STDIN) + self.append(HandleManager.STDOUT) + self.append(HandleManager.STDERR) def append(self, handle: Handle) -> None: self.handles[handle.id] = handle diff --git a/qiling/os/windows/windows.py b/qiling/os/windows/windows.py index e9d2d0b0b..a0e615ac2 100644 --- a/qiling/os/windows/windows.py +++ b/qiling/os/windows/windows.py @@ -4,7 +4,7 @@ # import ntpath -from typing import Callable +from typing import Callable, TextIO from unicorn import UcError @@ -13,7 +13,7 @@ from qiling.arch.x86_utils import GDTManager, SegmentManager86, SegmentManager64 from qiling.cc import intel from qiling.const import QL_ARCH, QL_OS, QL_INTERCEPT -from qiling.exception import QlErrorSyscallError, QlErrorSyscallNotFound +from qiling.exception import QlErrorSyscallError, QlErrorSyscallNotFound, QlMemoryMappedError from qiling.os.fcall import QlFunctionCall from qiling.os.memory import QlMemoryHeap from qiling.os.os import QlOs @@ -88,11 +88,43 @@ def __make_fcall_selector(atype: QL_ARCH) -> Callable[[int], QlFunctionCall]: self.argv = self.ql.argv self.env = self.ql.env self.pid = self.profile.getint('KERNEL', 'pid') - self.automatize_input = self.profile.getboolean("MISC","automatize_input") self.services = {} self.load() + # only after handle manager has been set up we can assign the standard streams + self.stdin = self._stdin + self.stdout = self._stdout + self.stderr = self._stderr + + + @QlOs.stdin.setter + def stdin(self, stream: TextIO) -> None: + self._stdin = stream + + handle = self.handle_manager.get(const.STD_INPUT_HANDLE) + assert handle is not None + + handle.obj = stream + + @QlOs.stdout.setter + def stdout(self, stream: TextIO) -> None: + self._stdout = stream + + handle = self.handle_manager.get(const.STD_OUTPUT_HANDLE) + assert handle is not None + + handle.obj = stream + + 
@QlOs.stderr.setter + def stderr(self, stream: TextIO) -> None: + self._stderr = stream + + handle = self.handle_manager.get(const.STD_ERROR_HANDLE) + assert handle is not None + + handle.obj = stream + def load(self): self.setupGDT() @@ -116,11 +148,15 @@ def setupGDT(self): segm.setup_fs(FS_SEGMENT_ADDR, FS_SEGMENT_SIZE) segm.setup_gs(GS_SEGMENT_ADDR, GS_SEGMENT_SIZE) - if not self.ql.mem.is_mapped(FS_SEGMENT_ADDR, FS_SEGMENT_SIZE): - self.ql.mem.map(FS_SEGMENT_ADDR, FS_SEGMENT_SIZE, info='[FS]') + if not self.ql.mem.is_available(FS_SEGMENT_ADDR, FS_SEGMENT_SIZE): + raise QlMemoryMappedError('cannot map FS segment, memory location is taken') + + self.ql.mem.map(FS_SEGMENT_ADDR, FS_SEGMENT_SIZE, info='[FS]') + + if not self.ql.mem.is_available(GS_SEGMENT_ADDR, GS_SEGMENT_SIZE): + raise QlMemoryMappedError('cannot map GS segment, memory location is taken') - if not self.ql.mem.is_mapped(GS_SEGMENT_ADDR, GS_SEGMENT_SIZE): - self.ql.mem.map(GS_SEGMENT_ADDR, GS_SEGMENT_SIZE, info='[GS]') + self.ql.mem.map(GS_SEGMENT_ADDR, GS_SEGMENT_SIZE, info='[GS]') def __setup_components(self): diff --git a/qiling/profiles/windows.ql b/qiling/profiles/windows.ql index d0ab9a384..17488330a 100644 --- a/qiling/profiles/windows.ql +++ b/qiling/profiles/windows.ql @@ -40,7 +40,6 @@ split = False # maily for multiple times Ql run with one file # usage: append = test1 append = -automatize_input = False current_path = C:\ [SYSTEM] diff --git a/tests/test_elf.py b/tests/test_elf.py index 8ce4712a4..7eb18fecc 100644 --- a/tests/test_elf.py +++ b/tests/test_elf.py @@ -93,6 +93,8 @@ def dump(ql): ql.run(begin=hook_address) del ql + os.remove(snapshot) + def test_elf_linux_x86_snapshot_restore_reg(self): self._test_elf_linux_x86_snapshot_restore_common(reg=True, ctx=False) @@ -1119,28 +1121,28 @@ def test_memory_search(self): ql.mem.write(0x1FFB, b"\x1f\x00\x07\x53\x03\x06\x07\x1f\x1b") # Needle not in haystack - self.assertEqual([], 
ql.mem.search(re.escape(b"\x3a\x01\x0b\x03\x53\x29\x1b\x1c\x04\x0d\x11"))) + self.assertEqual([], ql.mem.search(b"\x3a\x01\x0b\x03\x53\x29\x1b\x1c\x04\x0d\x11")) # Needle appears several times in haystack - self.assertEqual([0x1000 + 24, 0x2000 + 38, 0x3000 + 24], ql.mem.search(re.escape(b"\x4f\x53\x06\x0d\x1e\x0d\x1a"))) + self.assertEqual([0x1000 + 24, 0x2000 + 38, 0x3000 + 24], ql.mem.search(b"\x4f\x53\x06\x0d\x1e\x0d\x1a")) # Needle inside haystack - self.assertEqual([0x1000 + 13], ql.mem.search(re.escape(b"\x0f\x01\x1e\x0d\x53\x11\x07\x1d\x53\x1d\x18"), begin=0x1000 + 10, end=0x1000 + 30)) + self.assertEqual([0x1000 + 13], ql.mem.search(b"\x0f\x01\x1e\x0d\x53\x11\x07\x1d\x53\x1d\x18", begin=0x1000 + 10, end=0x1000 + 30)) # Needle before haystack - self.assertEqual([], ql.mem.search(re.escape(b"\x04\x0d\x1c\x53\x11\x07\x1d\x53\x0c\x07\x1f\x06"), begin=0x1337)) + self.assertEqual([], ql.mem.search(b"\x04\x0d\x1c\x53\x11\x07\x1d\x53\x0c\x07\x1f\x06", begin=0x1337)) # Needle after haystack - self.assertEqual([], ql.mem.search(re.escape(b"\x1b\x09\x11\x53\x0f\x07\x07\x0c\x0a\x11\x0d"), end=0x3000 + 13)) + self.assertEqual([], ql.mem.search(b"\x1b\x09\x11\x53\x0f\x07\x07\x0c\x0a\x11\x0d", end=0x3000 + 13)) # Needle exactly inside haystack - self.assertEqual([0x2000 + 13], ql.mem.search(re.escape(b"\x1a\x1d\x06\x53\x09\x1a\x07\x1d\x06\x0c"), begin=0x2000 + 13, end=0x2000 + 23)) + self.assertEqual([0x2000 + 13], ql.mem.search(b"\x1a\x1d\x06\x53\x09\x1a\x07\x1d\x06\x0c", begin=0x2000 + 13, end=0x2000 + 23)) # Needle 'tears' two mapped regions - self.assertEqual([], ql.mem.search(re.escape(b"\x1f\x00\x07\x53\x03\x06\x07\x1f\x1b"), begin=0x1F00, end=0x200F)) + self.assertEqual([], ql.mem.search(b"\x1f\x00\x07\x53\x03\x06\x07\x1f\x1b", begin=0x1F00, end=0x200F)) # Needle is a regex - self.assertEqual([0x1000 + 11, 0x2000 + 11, 0x3000 + 43], ql.mem.search(b"\x09\x53(\x0f|\x1a|\x04)[^\x0d]")) + self.assertEqual([0x1000 + 11, 0x2000 + 11, 0x3000 + 43], 
ql.mem.search(re.compile(b"\x09\x53(\x0f|\x1a|\x04)[^\x0d]"))) del ql diff --git a/tests/test_elf_multithread.py b/tests/test_elf_multithread.py index 5e0e338a6..482a30256 100644 --- a/tests/test_elf_multithread.py +++ b/tests/test_elf_multithread.py @@ -29,9 +29,6 @@ def test_elf_linux_execve_x8664(self): def test_elf_linux_cloexec_x8664(self): - with open('../examples/rootfs/x8664_linux/testfile', 'wb') as f: - f.write(b'0123456789') - ql = Qiling(["../examples/rootfs/x8664_linux/bin/x8664_cloexec_test"], "../examples/rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG, diff --git a/tests/test_pathutils.py b/tests/test_pathutils.py index cf9edbe9c..de56694ed 100644 --- a/tests/test_pathutils.py +++ b/tests/test_pathutils.py @@ -28,11 +28,8 @@ def posix_to_native(rootfs: str, cwd: str, path: str) -> str: class TestPathUtils(unittest.TestCase): + @unittest.skipUnless(is_posix_host, 'POSIX host only') def test_convert_nt_to_posix(self): - # test only on a POSIX host - if not is_posix_host: - self.skipTest('POSIX host only') - rootfs = PurePosixPath(r'../examples/rootfs/x86_windows') expected = str(realpath(rootfs) / 'test') @@ -74,11 +71,8 @@ def test_convert_nt_to_posix(self): self.assertEqual(expected, nt_to_native(str(rootfs), 'C:\\Windows\\System32\\drivers', '..\\..\\test')) self.assertEqual(expected, nt_to_native(str(rootfs), 'C:\\Windows\\System32', '..\\xxxx\\..\\test')) + @unittest.skipUnless(is_nt_host, 'NT host only') def test_convert_posix_to_nt(self): - # test only on a Windows host - if not is_nt_host: - self.skipTest('NT host only') - rootfs = PureWindowsPath(r'../examples/rootfs/x86_linux') expected = str(realpath(rootfs) / 'test')