diff --git a/qiling/core.py b/qiling/core.py index 321c72b4e..704efae3c 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -499,18 +499,20 @@ def stop_options(self) -> QL_STOP: """ return self._stop_options - def __enable_bin_patch(self): - - for addr, code in self.patch_bin: - self.mem.write(self.loader.load_address + addr, code) + def do_bin_patch(self): + ba = self.loader.load_address + for offset, code in self.patch_bin: + self.mem.write(ba + offset, code) - def enable_lib_patch(self): - for addr, code, filename in self.patch_lib: - try: - self.mem.write(self.mem.get_lib_base(filename) + addr, code) - except: - raise RuntimeError("Fail to patch %s at address 0x%x" % (filename, addr)) + def do_lib_patch(self): + for offset, code, filename in self.patch_lib: + ba = self.mem.get_lib_base(filename) + + if ba is None: + raise RuntimeError(f'Patch failed: there is no loaded library named "{filename}"') + + self.mem.write(ba + offset, code) def _init_stop_guard(self): if not self.stop_options: @@ -572,7 +574,7 @@ def run(self, begin=None, end=None, timeout=0, count=0, code=None): debugger = debugger_setup(self._debugger, self) # patch binary - self.__enable_bin_patch() + self.do_bin_patch() if self.baremetal: if self.count <= 0: diff --git a/qiling/loader/elf.py b/qiling/loader/elf.py index 77dba9ffc..a68bc6288 100644 --- a/qiling/loader/elf.py +++ b/qiling/loader/elf.py @@ -146,41 +146,7 @@ def seg_perm_to_uc_prot(perm: int) -> int: return prot - @staticmethod - def align(value: int, alignment: int) -> int: - """Align a value down to the specified alignment boundary. If `value` is already - aligned, the same value is returned. Commonly used to determine the base address - of the enclosing page. - - Args: - value: numberic value to align - alignment: alignment boundary; must be a power of 2 - - Returns: - Value aligned down to boundary - """ - - return value & ~(alignment - 1) - - @staticmethod - def align_up(value: int, alignment: int) -> int: - """Align a value up to the specified alignment boundary. If `value` is already - aligned, the same value is returned. Commonly used to determine the end address - of the enlosing page. - - Args: - value: numberic value to align - alignment: alignment boundary; must be a power of 2 - - Returns: - Value aligned up to boundary - """ - - return (value + alignment - 1) & ~(alignment - 1) - def load_with_ld(self, elffile: ELFFile, stack_addr: int, load_address: int, argv: Sequence[str] = [], env: Mapping[str, str] = {}): - pagesize = 0x1000 - # get list of loadable segments; these segments will be loaded to memory seg_pt_load = tuple(seg for seg in elffile.iter_segments() if seg['p_type'] == 'PT_LOAD') @@ -192,8 +158,8 @@ def load_with_ld(self, elffile: ELFFile, stack_addr: int, load_address: int, arg # iterate over loadable segments by vaddr for seg in sorted(seg_pt_load, key=lambda s: s['p_vaddr']): - lbound = QlLoaderELF.align(load_address + seg['p_vaddr'], pagesize) - ubound = QlLoaderELF.align_up(load_address + seg['p_vaddr'] + seg['p_memsz'], pagesize) + lbound = self.ql.mem.align(load_address + seg['p_vaddr']) + ubound = self.ql.mem.align_up(load_address + seg['p_vaddr'] + seg['p_memsz']) perms = QlLoaderELF.seg_perm_to_uc_prot(seg['p_flags']) if load_regions: @@ -233,7 +199,7 @@ def load_with_ld(self, elffile: ELFFile, stack_addr: int, load_address: int, arg # map the memory regions for lbound, ubound, perms in load_regions: try: - self.ql.mem.map(lbound, ubound - lbound, perms, info=self.path) + self.ql.mem.map(lbound, ubound - lbound, perms, info=os.path.basename(self.path)) except QlMemoryMappedError: self.ql.log.exception(f'Failed to map {lbound:#x}-{ubound:#x}') else: @@ -249,12 +215,15 @@ def load_with_ld(self, elffile: ELFFile, stack_addr: int, load_address: int, arg mem_start = min(seg['p_vaddr'] for seg in seg_pt_load) mem_end = max(seg['p_vaddr'] + seg['p_memsz'] for seg in seg_pt_load) - mem_start = QlLoaderELF.align(mem_start, pagesize) - mem_end = QlLoaderELF.align_up(mem_end, pagesize) + mem_start = self.ql.mem.align(mem_start) + mem_end = self.ql.mem.align_up(mem_end) self.ql.log.debug(f'mem_start : {mem_start:#x}') self.ql.log.debug(f'mem_end : {mem_end:#x}') + # by convention the loaded binary is first on the list + self.images.append(Image(load_address + mem_start, load_address + mem_end, os.path.abspath(self.path))) + # note: 0x2000 is the size of [hook_mem] self.brk_address = load_address + mem_end + 0x2000 @@ -280,11 +249,14 @@ def load_with_ld(self, elffile: ELFFile, stack_addr: int, load_address: int, arg # determine memory size needed for interpreter interp_mem_size = max((seg['p_vaddr'] + seg['p_memsz']) for seg in interp_seg_pt_load) - interp_mem_size = QlLoaderELF.align_up(interp_mem_size, pagesize) + interp_mem_size = self.ql.mem.align_up(interp_mem_size) self.ql.log.debug(f'Interpreter size: {interp_mem_size:#x}') # map memory for interpreter - self.ql.mem.map(interp_address, interp_mem_size, info=os.path.abspath(interp_local_path)) + self.ql.mem.map(interp_address, interp_mem_size, info=os.path.basename(interp_local_path)) + + # add interpreter to the loaded images list + self.images.append(Image(interp_address, interp_address + interp_mem_size, os.path.abspath(interp_local_path))) # load interpterter segments data to memory for seg in interp_seg_pt_load: @@ -314,7 +286,7 @@ def __push_str(top: int, s: str) -> int: """ data = (s if isinstance(s, bytes) else s.encode("utf-8")) + b'\x00' - top = QlLoaderELF.align(top - len(data), self.ql.arch.pointersize) + top = self.ql.mem.align(top - len(data), self.ql.arch.pointersize) self.ql.mem.write(top, data) return top @@ -364,7 +336,7 @@ def __push_str(top: int, s: str) -> int: (AUX.AT_PHDR, elf_phdr + mem_start), (AUX.AT_PHENT, elf_phent), (AUX.AT_PHNUM, elf_phnum), - (AUX.AT_PAGESZ, pagesize), + (AUX.AT_PAGESZ, self.ql.mem.pagesize), (AUX.AT_BASE, interp_address), (AUX.AT_FLAGS, 0), (AUX.AT_ENTRY, elf_entry), @@ -386,7 +358,7 @@ def __push_str(top: int, s: str) -> int: for key, val in aux_entries: elf_table.extend(self.ql.pack(key) + self.ql.pack(val)) - new_stack = QlLoaderELF.align(new_stack - len(elf_table), 0x10) + new_stack = self.ql.mem.align(new_stack - len(elf_table), 0x10) self.ql.mem.write(new_stack, bytes(elf_table)) # if enabled, gdb would need to retrieve aux vector data. @@ -397,7 +369,6 @@ def __push_str(top: int, s: str) -> int: self.elf_entry = elf_entry self.stack_address = new_stack self.load_address = load_address - self.images.append(Image(load_address, load_address + mem_end, self.path)) self.init_sp = self.ql.arch.regs.arch_sp self.ql.os.entry_point = self.entry_point = entry_point @@ -511,7 +482,7 @@ def __get_symbol(name: str) -> Optional[Symbol]: # we need to lookup from address to symbol, so we can find the right callback # for sys_xxx handler for syscall, the address must be aligned to pointer size if symbol_name.startswith('sys_'): - self.ql.os.hook_addr = QlLoaderELF.align_up(self.ql.os.hook_addr, self.ql.arch.pointersize) + self.ql.os.hook_addr = self.ql.mem.align_up(self.ql.os.hook_addr, self.ql.arch.pointersize) self.import_symbols[self.ql.os.hook_addr] = symbol_name @@ -635,7 +606,7 @@ def load_driver(self, elffile: ELFFile, stack_addr: int, loadbase: int = 0) -> N self.ql.os.entry_point = self.entry_point = entry_point self.elf_entry = self.ql.os.elf_entry = self.ql.os.entry_point - self.stack_address = QlLoaderELF.align(stack_addr, self.ql.arch.pointersize) + self.stack_address = self.ql.mem.align(stack_addr, self.ql.arch.pointersize) self.load_address = loadbase # remember address of syscall table, so external tools can access to it diff --git a/qiling/loader/loader.py b/qiling/loader/loader.py index 2afe1a138..472881cbf 100644 --- a/qiling/loader/loader.py +++ b/qiling/loader/loader.py @@ -3,7 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from typing import Any, Mapping, MutableSequence, NamedTuple +from typing import Any, Mapping, MutableSequence, NamedTuple, Optional from qiling import Qiling @@ -17,6 +17,14 @@ def __init__(self, ql: Qiling): self.images: MutableSequence[Image] = [] self.skip_exit_check = False + def find_containing_image(self, address: int) -> Optional[Image]: + """Retrieve the image object that contains the specified address. + + Returns: image containing the specified address, or `None` if not found + """ + + return next((image for image in self.images if image.base <= address < image.end), None) + def save(self) -> Mapping[str, Any]: saved_state = { 'images': [tuple(img) for img in self.images] diff --git a/qiling/loader/pe_uefi.py b/qiling/loader/pe_uefi.py index 9414ebf64..a20bf9444 100644 --- a/qiling/loader/pe_uefi.py +++ b/qiling/loader/pe_uefi.py @@ -119,7 +119,7 @@ def map_and_load(self, path: str, context: UefiContext, exec_now: bool=False): self.install_loaded_image_protocol(image_base, image_size) - # this would be used later be os.find_containing_image + # this would be used later be loader.find_containing_image self.images.append(Image(image_base, image_base + image_size, path)) # update next memory slot to allow sequencial loading. its availability diff --git a/qiling/os/freebsd/freebsd.py b/qiling/os/freebsd/freebsd.py index 800d31f07..6b368466e 100644 --- a/qiling/os/freebsd/freebsd.py +++ b/qiling/os/freebsd/freebsd.py @@ -44,7 +44,7 @@ def run(self): else: if self.ql.loader.elf_entry != self.ql.loader.entry_point: self.ql.emu_start(self.ql.loader.entry_point, self.ql.loader.elf_entry, self.ql.timeout) - self.ql.enable_lib_patch() + self.ql.do_lib_patch() self.ql.emu_start(self.ql.loader.elf_entry, self.exit_point, self.ql.timeout, self.ql.count) diff --git a/qiling/os/linux/linux.py b/qiling/os/linux/linux.py index 75b4f5813..21ab6d5dd 100644 --- a/qiling/os/linux/linux.py +++ b/qiling/os/linux/linux.py @@ -149,7 +149,7 @@ def run(self): if self.ql.arch.type == QL_ARCH.ARM and entry_address & 1 == 1: entry_address -= 1 self.ql.emu_start(self.ql.loader.entry_point, entry_address, self.ql.timeout) - self.ql.enable_lib_patch() + self.ql.do_lib_patch() self.run_function_after_load() self.ql.loader.skip_exit_check = False self.ql.write_exit_trap() diff --git a/qiling/os/linux/thread.py b/qiling/os/linux/thread.py index 32d15a028..04a6c1be2 100644 --- a/qiling/os/linux/thread.py +++ b/qiling/os/linux/thread.py @@ -586,7 +586,7 @@ def _prepare_lib_patch(self): if self.ql.arch.regs.arch_pc != entry_address: self.ql.log.error(f"{self.cur_thread} Expect {hex(self.ql.loader.elf_entry)} but get {hex(self.ql.arch.regs.arch_pc)} when running loader.") raise QlErrorExecutionStop('Dynamic library .init() failed!') - self.ql.enable_lib_patch() + self.ql.do_lib_patch() self.ql.os.run_function_after_load() self.ql.loader.skip_exit_check = False self.ql.write_exit_trap() diff --git a/qiling/os/memory.py b/qiling/os/memory.py index 394b121c2..843059534 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -170,8 +170,8 @@ def __perms_mapping(ps: int) -> str: def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, Optional[str]]: perms_str = __perms_mapping(perms) - if hasattr(self.ql, 'os'): - image = self.ql.os.find_containing_image(lbound) + if hasattr(self.ql, 'loader'): + image = self.ql.loader.find_containing_image(lbound) container = image.path if image and not is_mmio else None else: container = None @@ -184,26 +184,44 @@ def show_mapinfo(self): """Emit memory map info in a nicely formatted table. """ + mapinfo = self.get_mapinfo() + + # determine columns sizes based on the longest value for each field + lengths = ((len(f'{ubound:#x}'), len(label)) for _, ubound, _, label, _ in mapinfo) + grouped = tuple(zip(*lengths)) + + len_addr = max(grouped[0]) + len_label = max(grouped[1]) + # emit title row - self.ql.log.info(f'{"Start":8s} {"End":8s} {"Perm":5s} {"Label":12s} {"Image"}') + self.ql.log.info(f'{"Start":{len_addr}s} {"End":{len_addr}s} {"Perm":5s} {"Label":{len_label}s} {"Image"}') # emit table rows - for lbound, ubound, perms, label, container in self.get_mapinfo(): - self.ql.log.info(f'{lbound:08x} - {ubound:08x} {perms:5s} {label:12s} {container or ""}') + for lbound, ubound, perms, label, container in mapinfo: + self.ql.log.info(f'{lbound:0{len_addr}x} - {ubound:0{len_addr}x} {perms:5s} {label:{len_label}s} {container or ""}') # TODO: relying on the label string is risky; find a more reliable method - def get_lib_base(self, filename: str) -> int: - return next((s for s, _, _, info, _ in self.map_info if os.path.split(info)[1] == filename), -1) + def get_lib_base(self, filename: str) -> Optional[int]: + # regex pattern to capture boxed labels prefixes + p = re.compile(r'^\[.+\]\s*') + + # some info labels may be prefixed by boxed label which breaks the search by basename. + # iterate through all info labels and remove all boxed prefixes, if any + stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _ in self.map_info) + + return next((lbound for lbound, info in stripped if os.path.basename(info) == filename), None) def align(self, value: int, alignment: int = None) -> int: - """Align a value down to the specified alignment boundary. + """Align a value down to the specified alignment boundary. If `value` is already + aligned, the same value is returned. Commonly used to determine the base address + of the enclosing page. Args: value: a value to align - alignment: alignment boundary; must be a power of 2. if not specified - value will be aligned to page size + alignment: alignment boundary; must be a power of 2. if not specified value + will be aligned to page size - Returns: aligned value + Returns: value aligned down to boundary """ if alignment is None: @@ -216,14 +234,16 @@ def align(self, value: int, alignment: int = None) -> int: return value & ~(alignment - 1) def align_up(self, value: int, alignment: int = None) -> int: - """Align a value up to the specified alignment boundary. + """Align a value up to the specified alignment boundary. If `value` is already + aligned, the same value is returned. Commonly used to determine the end address + of the enlosing page. Args: value: value to align - alignment: alignment boundary; must be a power of 2. if not specified - value will be aligned to page size + alignment: alignment boundary; must be a power of 2. if not specified value + will be aligned to page size - Returns: aligned value + Returns: value aligned up to boundary """ if alignment is None: diff --git a/qiling/os/os.py b/qiling/os/os.py index 7654933c7..a06dacb46 100644 --- a/qiling/os/os.py +++ b/qiling/os/os.py @@ -222,11 +222,6 @@ def set_api(self, api_name: str, intercept_function: Callable, intercept: QL_INT else: self.add_function_hook(api_name, intercept_function, intercept) - def find_containing_image(self, pc: int): - for image in self.ql.loader.images: - if image.base <= pc < image.end: - return image - # os main method; derivatives must implement one of their own def run(self) -> None: raise NotImplementedError @@ -256,7 +251,7 @@ def emu_error(self): self.ql.log.error('Disassembly:') self.ql.arch.utils.disassembler(self.ql, pc, 64) - containing_image = self.find_containing_image(pc) + containing_image = self.ql.loader.find_containing_image(pc) pc_info = f' ({containing_image.path} + {pc - containing_image.base:#x})' if containing_image else '' finally: self.ql.log.error(f'PC = {pc:#0{self.ql.arch.pointersize * 2 + 2}x}{pc_info}\n') diff --git a/qiling/os/posix/syscall/mman.py b/qiling/os/posix/syscall/mman.py index cf7585e9c..0a09f1f8e 100755 --- a/qiling/os/posix/syscall/mman.py +++ b/qiling/os/posix/syscall/mman.py @@ -3,7 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from unicorn import UC_PROT_ALL +import os from qiling import Qiling from qiling.exception import QlMemoryMappedError @@ -135,7 +135,7 @@ def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, f ql.log.debug("mem write : " + hex(len(data))) ql.log.debug("mem mmap : " + mem_info) - ql.mem.change_mapinfo(mmap_base, mmap_base + mmap_size, mem_info=("[%s] " % api_name) + mem_info) + ql.mem.change_mapinfo(mmap_base, mmap_base + mmap_size, mem_info=f'[{api_name}] {os.path.basename(mem_info)}') try: ql.mem.write(mmap_base, data) except Exception as e: diff --git a/qiling/os/uefi/uefi.py b/qiling/os/uefi/uefi.py index 709000dbb..3923a3c26 100644 --- a/qiling/os/uefi/uefi.py +++ b/qiling/os/uefi/uefi.py @@ -185,7 +185,7 @@ def emu_error(self): self.emit_hexdump(pc, data) self.emit_disasm(pc, data) - containing_image = self.find_containing_image(pc) + containing_image = self.ql.loader.find_containing_image(pc) pc_info = f' ({containing_image.path} + {pc - containing_image.base:#x})' if containing_image else '' finally: self.ql.log.error(f'PC = {pc:#010x}{pc_info}') diff --git a/qiling/utils.py b/qiling/utils.py index 1952fe70e..fc246c27b 100644 --- a/qiling/utils.py +++ b/qiling/utils.py @@ -502,9 +502,10 @@ def os_setup(ostype: QL_OS, ql): def profile_setup(ql, ostype: QL_OS, filename: Optional[str]): ql.log.debug(f'Profile: {filename or "default"}') - if ql.baremetal: + # mcu uses a yaml-based config + if ostype == QL_OS.MCU: if filename: - with open(filename) as f: + with open(filename) as f: config = yaml.load(f, Loader=yaml.Loader) else: config = {} diff --git a/qltool b/qltool index 266dc0ba2..5467c0dd0 100755 --- a/qltool +++ b/qltool @@ -226,9 +226,6 @@ if __name__ == '__main__': if options.subcommand == 'examples': handle_examples(parser) - if options.debug_stop and options.verbose not in (QL_VERBOSE.DEBUG, QL_VERBOSE.DUMP): - parser.error('the debug_stop option requires verbose to be set to either "debug" or "dump"') - # ql file setup if options.subcommand == 'run': ql = handle_run(options) @@ -247,6 +244,9 @@ if __name__ == '__main__': ql.debugger = argval if options.debug_stop: + if options.verbose not in (QL_VERBOSE.DEBUG, QL_VERBOSE.DUMP): + parser.error('the debug_stop option requires verbose to be set to either "debug" or "dump"') + ql.debug_stop = True if options.root: diff --git a/tests/test_peshellcode.py b/tests/test_peshellcode.py index 2d6ade205..72ed657f4 100644 --- a/tests/test_peshellcode.py +++ b/tests/test_peshellcode.py @@ -5,36 +5,42 @@ import sys, unittest -from binascii import unhexlify - sys.path.append("..") -from qiling import * -from qiling.exception import * -from qiling.const import QL_VERBOSE +from qiling import Qiling -X86_WIN = unhexlify( - 'fce8820000006089e531c0648b50308b520c8b52148b72280fb74a2631ffac3c617c022c20c1cf0d01c7e2f252578b52108b4a3c8b4c1178e34801d1518b592001d38b4918e33a498b348b01d631ffacc1cf0d01c738e075f6037df83b7d2475e4588b582401d3668b0c4b8b581c01d38b048b01d0894424245b5b61595a51ffe05f5f5a8b12eb8d5d6a01eb2668318b6f87ffd5bbf0b5a25668a695bd9dffd53c067c0a80fbe07505bb4713726f6a0053ffd5e8d5ffffff63616c6300' -) +X86_WIN = bytes.fromhex(''' + fce8820000006089e531c0648b50308b520c8b52148b72280fb74a2631ffac3c + 617c022c20c1cf0d01c7e2f252578b52108b4a3c8b4c1178e34801d1518b5920 + 01d38b4918e33a498b348b01d631ffacc1cf0d01c738e075f6037df83b7d2475 + e4588b582401d3668b0c4b8b581c01d38b048b01d0894424245b5b61595a51ff + e05f5f5a8b12eb8d5d6a01eb2668318b6f87ffd5bbf0b5a25668a695bd9dffd5 + 3c067c0a80fbe07505bb4713726f6a0053ffd5e8d5ffffff63616c6300 +''') -X8664_WIN = unhexlify( - 'fc4881e4f0ffffffe8d0000000415141505251564831d265488b52603e488b52183e488b52203e488b72503e480fb74a4a4d31c94831c0ac3c617c022c2041c1c90d4101c1e2ed5241513e488b52203e8b423c4801d03e8b80880000004885c0746f4801d0503e8b48183e448b40204901d0e35c48ffc93e418b34884801d64d31c94831c0ac41c1c90d4101c138e075f13e4c034c24084539d175d6583e448b40244901d0663e418b0c483e448b401c4901d03e418b04884801d0415841585e595a41584159415a4883ec204152ffe05841595a3e488b12e949ffffff5d49c7c1000000003e488d95fe0000003e4c8d850f0100004831c941ba45835607ffd54831c941baf0b5a256ffd548656c6c6f2c2066726f6d204d534621004d657373616765426f7800' -) +X8664_WIN = bytes.fromhex(''' + fc4881e4f0ffffffe8d0000000415141505251564831d265488b52603e488b52 + 183e488b52203e488b72503e480fb74a4a4d31c94831c0ac3c617c022c2041c1 + c90d4101c1e2ed5241513e488b52203e8b423c4801d03e8b80880000004885c0 + 746f4801d0503e8b48183e448b40204901d0e35c48ffc93e418b34884801d64d + 31c94831c0ac41c1c90d4101c138e075f13e4c034c24084539d175d6583e448b + 40244901d0663e418b0c483e448b401c4901d03e418b04884801d0415841585e + 595a41584159415a4883ec204152ffe05841595a3e488b12e949ffffff5d49c7 + c1000000003e488d95fe0000003e4c8d850f0100004831c941ba45835607ffd5 + 4831c941baf0b5a256ffd548656c6c6f2c2066726f6d204d534621004d657373 + 616765426f7800 +''') -POINTER_TEST = unhexlify( - '1122334455667788' -) +POINTER_TEST = bytes.fromhex('1122334455667788') class PEShellcodeTest(unittest.TestCase): def test_windowssc_x86(self): - ql = Qiling(code=X86_WIN, archtype="x86", ostype="windows", rootfs="../examples/rootfs/x86_windows", - verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(code=X86_WIN, archtype="x86", ostype="windows", rootfs="../examples/rootfs/x86_windows") ql.run() del ql def test_windowssc_x64(self): - ql = Qiling(code=X8664_WIN, archtype="x8664", ostype="windows", rootfs="../examples/rootfs/x8664_windows", - verbose=QL_VERBOSE.DEBUG) + ql = Qiling(code=X8664_WIN, archtype="x8664", ostype="windows", rootfs="../examples/rootfs/x8664_windows") ql.run() del ql @@ -50,7 +56,7 @@ def test_read_ptr32(self): del ql def test_read_ptr64(self): - ql = Qiling(code=POINTER_TEST, archtype="x8664", ostype="windows", rootfs="../examples/rootfs/x86_windows") + ql = Qiling(code=POINTER_TEST, archtype="x8664", ostype="windows", rootfs="../examples/rootfs/x8664_windows") addr = ql.loader.entry_point self.assertEqual(0x11, ql.mem.read_ptr(addr, 1))