diff --git a/examples/uboot_bin.ql b/examples/uboot_bin.ql index 1402374e1..b7f7216c8 100644 --- a/examples/uboot_bin.ql +++ b/examples/uboot_bin.ql @@ -4,17 +4,5 @@ entry_point = 0x80800000 heap_size = 0x300000 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / diff --git a/qiling/loader/elf.py b/qiling/loader/elf.py index ada002294..fba93a018 100644 --- a/qiling/loader/elf.py +++ b/qiling/loader/elf.py @@ -15,7 +15,6 @@ from elftools.elf.relocation import RelocationHandler from elftools.elf.sections import Symbol, SymbolTableSection from elftools.elf.descriptions import describe_reloc_type -from elftools.elf.segments import InterpSegment from unicorn.unicorn_const import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC from qiling import Qiling @@ -100,7 +99,7 @@ def run(self): # is it a driver? if elftype == 'ET_REL': - self.load_driver(elffile, stack_address + stack_size) + self.load_driver(elffile, stack_address + stack_size, loadbase=0x8000000) self.ql.hook_code(hook_kernel_api) # is it an executable? @@ -271,7 +270,7 @@ def __push_str(top: int, s: str) -> int: Top of stack remains aligned to pointer size """ - data = (s if isinstance(s, bytes) else s.encode("utf-8")) + b'\x00' + data = s.encode('utf-8') + b'\x00' top = self.ql.mem.align(top - len(data), self.ql.arch.pointersize) self.ql.mem.write(top, data) @@ -365,28 +364,31 @@ def __push_str(top: int, s: str) -> int: # map vsyscall section for some specific needs if self.ql.arch.type == QL_ARCH.X8664 and self.ql.os.type == QL_OS.LINUX: - _vsyscall_addr = int(self.profile.get('vsyscall_address'), 0) - _vsyscall_size = int(self.profile.get('vsyscall_size'), 0) + vsyscall_addr = self.profile.getint('vsyscall_address') - if self.ql.mem.is_available(_vsyscall_addr, _vsyscall_size): + vsyscall_ids = ( + SYSCALL_NR.gettimeofday, + SYSCALL_NR.time, + SYSCALL_NR.getcpu + ) + + # each syscall should be 1KiB away + entry_size = 1024 + vsyscall_size = self.ql.mem.align_up(len(vsyscall_ids) * entry_size) + + if self.ql.mem.is_available(vsyscall_addr, vsyscall_size): # initialize with int3 instructions then insert syscall entry - # each syscall should be 1KiB away - self.ql.mem.map(_vsyscall_addr, _vsyscall_size, info="[vsyscall]") - self.ql.mem.write(_vsyscall_addr, _vsyscall_size * b'\xcc') + self.ql.mem.map(vsyscall_addr, vsyscall_size, info="[vsyscall]") assembler = self.ql.arch.assembler def __assemble(asm: str) -> bytes: bs, _ = assembler.asm(asm) return bytes(bs) - _vsyscall_ids = ( - SYSCALL_NR.gettimeofday, - SYSCALL_NR.time, - SYSCALL_NR.getcpu - ) + for i, scid in enumerate(vsyscall_ids): + entry = __assemble(f'mov rax, {scid:#x}; syscall; ret') - for i, scid in enumerate(_vsyscall_ids): - self.ql.mem.write(_vsyscall_addr + i * 1024, __assemble(f'mov rax, {scid:#x}; syscall; ret')) + self.ql.mem.write(vsyscall_addr + i * entry_size, entry.ljust(entry_size, b'\xcc')) def lkm_get_init(self, elffile: ELFFile) -> int: """Get file offset of the init_module function. @@ -400,7 +402,7 @@ def lkm_get_init(self, elffile: ELFFile) -> int: if syms: sym = syms[0] addr = sym['st_value'] + elffile.get_section(sym['st_shndx'])['sh_offset'] - self.ql.log.info(f'init_module = {addr:#x}') + return addr raise QlErrorELFFormat('invalid module: symbol init_module not found') @@ -479,6 +481,10 @@ def __get_symbol(name: str) -> Optional[Symbol]: rev_reloc_symbols[symbol_name] = self.ql.os.hook_addr sym_offset = self.ql.os.hook_addr - mem_start self.ql.os.hook_addr += self.ql.arch.pointersize + + elif _symbol['st_shndx'] == 'SHN_ABS': + rev_reloc_symbols[symbol_name] = _symbol['st_value'] + else: # local symbol _section = elffile.get_section(_symbol['st_shndx']) @@ -521,12 +527,12 @@ def __get_symbol(name: str) -> Optional[Symbol]: elif desc in ('R_386_PC32', 'R_386_PLT32'): val = ql.mem.read_ptr(loc, 4) - val = rev_reloc_symbols[symbol_name] + val - loc + val += rev_reloc_symbols[symbol_name] - loc ql.mem.write_ptr(loc, (val & 0xFFFFFFFF), 4) elif desc in ('R_386_32', 'R_MIPS_32'): val = ql.mem.read_ptr(loc, 4) - val = rev_reloc_symbols[symbol_name] + val + val += rev_reloc_symbols[symbol_name] ql.mem.write_ptr(loc, (val & 0xFFFFFFFF), 4) elif desc == 'R_MIPS_HI16': @@ -551,23 +557,9 @@ def __get_symbol(name: str) -> Optional[Symbol]: def load_driver(self, elffile: ELFFile, stack_addr: int, loadbase: int = 0) -> None: elfdata_mapping = self.get_elfdata_mapping(elffile) - # Determine the range of memory space opened up - # mem_start = -1 - # mem_end = -1 - # - # for i in super().parse_program_header(ql): - # if i['p_type'] == PT_LOAD: - # if mem_start > i['p_vaddr'] or mem_start == -1: - # mem_start = i['p_vaddr'] - # if mem_end < i['p_vaddr'] + i['p_memsz'] or mem_end == -1: - # mem_end = i['p_vaddr'] + i['p_memsz'] - # - # mem_start = int(mem_start // 0x1000) * 0x1000 - # mem_end = int(mem_end // 0x1000 + 1) * 0x1000 - - # FIXME - mem_start = 0x1000 - mem_end = mem_start + (len(elfdata_mapping) // 0x1000 + 1) * 0x1000 + # FIXME: determine true memory boundaries, taking relocation into account (if requested) + mem_start = 0 + mem_end = mem_start + self.ql.mem.align_up(len(elfdata_mapping), 0x1000) # map some memory to intercept external functions of Linux kernel self.ql.mem.map(API_HOOK_MEM, 0x1000, info="[api_mem]") @@ -579,15 +571,17 @@ def load_driver(self, elffile: ELFFile, stack_addr: int, loadbase: int = 0) -> N self.ql.mem.map(loadbase + mem_start, mem_end - mem_start, info=self.ql.path) self.ql.mem.write(loadbase + mem_start, elfdata_mapping) - entry_point = self.lkm_get_init(elffile) + loadbase + mem_start + init_module = self.lkm_get_init(elffile) + loadbase + mem_start + self.ql.log.debug(f'init_module : {init_module:#x}') + self.brk_address = mem_end + loadbase # Set MMAP addr - mmap_address = int(self.profile.get('mmap_address'), 0) + mmap_address = self.profile.getint('mmap_address') self.ql.log.debug(f'mmap_address is : {mmap_address:#x}') # self.ql.os.elf_entry = self.elf_entry = loadbase + elfhead['e_entry'] - self.ql.os.entry_point = self.entry_point = entry_point + self.ql.os.entry_point = self.entry_point = init_module self.elf_entry = self.ql.os.elf_entry = self.ql.os.entry_point self.stack_address = self.ql.mem.align(stack_addr, self.ql.arch.pointersize) @@ -625,6 +619,25 @@ def load_driver(self, elffile: ELFFile, stack_addr: int, loadbase: int = 0) -> N self.import_symbols[self.ql.os.hook_addr + 2 * self.ql.arch.pointersize] = hook_sys_open def get_elfdata_mapping(self, elffile: ELFFile) -> bytes: + # from io import BytesIO + # + # rh = RelocationHandler(elffile) + # + # for sec in elffile.iter_sections(): + # rs = rh.find_relocations_for_section(sec) + # + # if rs is not None: + # ss = BytesIO(sec.data()) + # rh.apply_section_relocations(ss, rs) + # + # # apply changes to stream + # elffile.stream.seek(sec['sh_offset']) + # elffile.stream.write(ss.getbuffer()) + # + # TODO: need to patch hooked symbols with their hook targets + # (e.g. replace calls to 'printk' with the hooked address that + # was allocate for it) + elfdata_mapping = bytearray() # pick up elf header @@ -634,7 +647,15 @@ def get_elfdata_mapping(self, elffile: ELFFile) -> bytes: elfdata_mapping.extend(elf_header) - # pick up loadable sections and relocate them if needed + # FIXME: normally the address of a section would be determined by its 'sh_addr' value. + # in case of a relocatable object all its sections' sh_addr will be set to zero, so + # the value in 'sh_offset' should be used to determine the final address. + # see: https://refspecs.linuxbase.org/elf/gabi4+/ch4.sheader.html + # + # here we presume this a relocatable object and don't do any relocation (that is, it + # is relocated to 0) + + # pick up loadable sections for sec in elffile.iter_sections(): if sec['sh_flags'] & SH_FLAGS.SHF_ALLOC: # pad aggregated elf data to the offset of the current section diff --git a/qiling/os/mapper.py b/qiling/os/mapper.py index c5b2914ae..8c0a3ec22 100644 --- a/qiling/os/mapper.py +++ b/qiling/os/mapper.py @@ -103,6 +103,9 @@ def open_ql_file(self, path: str, openflags: int, openmode: int): host_path = self.path.virtual_to_host_path(path) + if not self.path.is_safe_host_path(host_path): + raise PermissionError(f'unsafe path: {host_path}') + return ql_file.open(host_path, openflags, openmode) def open(self, path: str, openmode: str): @@ -111,6 +114,9 @@ def open(self, path: str, openmode: str): host_path = self.path.virtual_to_host_path(path) + if not self.path.is_safe_host_path(host_path): + raise PermissionError(f'unsafe path: {host_path}') + return open(host_path, openmode) def _parse_path(self, p: Union[os.PathLike, str]) -> str: diff --git a/qiling/os/windows/const.py b/qiling/os/windows/const.py index e7c6d8fe4..33e85310b 100644 --- a/qiling/os/windows/const.py +++ b/qiling/os/windows/const.py @@ -12,12 +12,16 @@ ERROR_PATH_NOT_FOUND = 0x3 ERROR_ACCESS_DENIED = 0x5 ERROR_INVALID_HANDLE = 0x6 +ERROR_GEN_FAILURE = 0x1f +ERROR_FILE_EXISTS = 0x50 ERROR_INVALID_PARAMETER = 0x57 ERROR_BUFFER_OVERFLOW = 0x6F ERROR_INSUFFICIENT_BUFFER = 0x7A +ERROR_BAD_PATHNAME = 0xa1 ERROR_ALREADY_EXISTS = 0xB7 ERROR_MORE_DATA = 0xEA ERROR_NO_MORE_ITEMS = 0x103 +ERROR_DIRECTORY = 0x10b ERROR_NOT_OWNER = 0x120 ERROR_NO_UNICODE_TRANSLATION = 0x459 ERROR_OLD_WIN_VERSION = 0x47E diff --git a/qiling/os/windows/dlls/kernel32/fileapi.py b/qiling/os/windows/dlls/kernel32/fileapi.py index 0ccf37e0e..8f90153e0 100644 --- a/qiling/os/windows/dlls/kernel32/fileapi.py +++ b/qiling/os/windows/dlls/kernel32/fileapi.py @@ -190,13 +190,12 @@ def hook_WriteFile(ql: Qiling, address: int, params): ql.os.last_error = ERROR_INVALID_HANDLE return 0 - fobj = handle.obj data = ql.mem.read(lpBuffer, nNumberOfBytesToWrite) - if hFile == STD_OUTPUT_HANDLE: + if hFile in (STD_OUTPUT_HANDLE, STD_ERROR_HANDLE): ql.os.stats.log_string(data.decode()) - written = fobj.write(bytes(data)) + written = handle.obj.write(bytes(data)) ql.mem.write_ptr(lpNumberOfBytesWritten, written, 4) return 1 @@ -211,11 +210,10 @@ def _CreateFile(ql: Qiling, address: int, params): # hTemplateFile = params["hTemplateFile"] # access mask DesiredAccess - mode = "" if dwDesiredAccess & GENERIC_WRITE: - mode += "wb" + mode = "wb" else: - mode += "rb" + mode = "rb" try: f = ql.os.fs_mapper.open(s_lpFileName, mode) @@ -270,11 +268,13 @@ def hook_CreateFileA(ql: Qiling, address: int, params): def hook_CreateFileW(ql: Qiling, address: int, params): return _CreateFile(ql, address, params) -def _GetTempPath(ql: Qiling, address: int, params, wide: bool): - temp_path = ntpath.join(ql.rootfs, 'Windows', 'Temp') +def _GetTempPath(ql: Qiling, address: int, params, *, wide: bool): + vtmpdir = ntpath.join(ql.os.windir, 'Temp') + htmpdir = ql.os.path.virtual_to_host_path(vtmpdir) - if not os.path.exists(temp_path): - os.makedirs(temp_path, 0o755) + if ql.os.path.is_safe_host_path(htmpdir): + if not os.path.exists(htmpdir): + os.makedirs(htmpdir, 0o755) nBufferLength = params['nBufferLength'] lpBuffer = params['lpBuffer'] @@ -282,7 +282,7 @@ def _GetTempPath(ql: Qiling, address: int, params, wide: bool): enc = 'utf-16le' if wide else 'utf-8' # temp dir path has to end with a path separator - tmpdir = f'{ntpath.join(ql.os.windir, "Temp")}{ntpath.sep}'.encode(enc) + tmpdir = f'{vtmpdir}{ntpath.sep}'.encode(enc) cstr = tmpdir + '\x00'.encode(enc) if nBufferLength >= len(cstr): @@ -324,29 +324,26 @@ def hook_GetTempPathA(ql: Qiling, address: int, params): 'cchBuffer' : DWORD }) def hook_GetShortPathNameW(ql: Qiling, address: int, params): - paths = params["lpszLongPath"].split("\\") - dst = params["lpszShortPath"] - max_size = params["cchBuffer"] - res = paths[0] + lpszLongPath = params['lpszLongPath'] + lpszShortPath = params['lpszShortPath'] + cchBuffer = params['cchBuffer'] - for path in paths[1:]: - nameAndExt = path.split(".") - name = nameAndExt[0] - ext = "" if len(nameAndExt) == 1 else "." + nameAndExt[1] + def __shorten(p: str) -> str: + name, ext = ntpath.splitext(p) - if len(name) > 8: - name = name[:6] + "~1" + return f'{(name[:6] + "~1") if len(name) > 8 else name}{ext}' - res += "\\" + name + ext + shortpath = ntpath.join(*(__shorten(elem) for elem in lpszLongPath.split(ntpath.sep))) + encoded = f'{shortpath}\x00'.encode('utf-16le') - res += "\x00" - res = res.encode("utf-16le") + if len(shortpath) > cchBuffer: + return len(shortpath) + 1 - if max_size < len(res): - return len(res) + if lpszShortPath: + ql.mem.write(lpszShortPath, encoded) - ql.mem.write(dst, res) - return len(res) - 1 + # on succes, return chars count excluding null-term + return len(shortpath) # BOOL GetVolumeInformationW( @@ -471,18 +468,20 @@ def hook_GetDiskFreeSpaceW(ql: Qiling, address: int, params): 'lpSecurityAttributes' : LPSECURITY_ATTRIBUTES }) def hook_CreateDirectoryA(ql: Qiling, address: int, params): - path_name = params["lpPathName"] - target_dir = os.path.join(ql.rootfs, path_name.replace("\\", os.sep)) - ql.log.info('TARGET_DIR = %s' % target_dir) + lpPathName = params['lpPathName'] - # Verify the directory is in ql.rootfs to ensure no path traversal has taken place - real_path = ql.os.path.transform_to_real_path(path_name) + dst = ql.os.path.virtual_to_host_path(lpPathName) + + if not ql.os.path.is_safe_host_path(dst): + ql.os.last_error = ERROR_GEN_FAILURE + return 0 - if os.path.exists(real_path): + if os.path.exists(dst): ql.os.last_error = ERROR_ALREADY_EXISTS return 0 - os.mkdir(real_path) + os.mkdir(dst, 0o755) + return 1 # DWORD GetFileSize( @@ -494,13 +493,20 @@ def hook_CreateDirectoryA(ql: Qiling, address: int, params): 'lpFileSizeHigh' : LPDWORD }) def hook_GetFileSize(ql: Qiling, address: int, params): + hFile = params["hFile"] + + handle = ql.os.handle_manager.get(hFile) + + if handle is None: + ql.os.last_error = ERROR_INVALID_HANDLE + return -1 # INVALID_FILE_SIZE + try: - handle = ql.os.handle_manager.get(params['hFile']) return os.path.getsize(handle.obj.name) except: ql.os.last_error = ERROR_INVALID_HANDLE - return 0xFFFFFFFF #INVALID_FILE_SIZE + return -1 # INVALID_FILE_SIZE def _CreateFileMapping(ql: Qiling, address: int, params): hFile = params['hFile'] @@ -619,14 +625,23 @@ def hook_UnmapViewOfFile(ql: Qiling, address: int, params): 'bFailIfExists' : BOOL }) def hook_CopyFileA(ql: Qiling, address: int, params): - lpExistingFileName = ql.os.path.transform_to_real_path(params["lpExistingFileName"]) - lpNewFileName = ql.os.path.transform_to_real_path(params["lpNewFileName"]) - bFailIfExists = params["bFailIfExists"] + lpExistingFileName = params['lpExistingFileName'] + lpNewFileName = params['lpNewFileName'] + bFailIfExists = params['bFailIfExists'] - if bFailIfExists and os.path.exists(lpNewFileName): + src = ql.os.path.virtual_to_host_path(lpExistingFileName) + dst = ql.os.path.virtual_to_host_path(lpNewFileName) + + if not ql.os.path.is_safe_host_path(src) or not ql.os.path.is_safe_host_path(dst): + ql.os.last_error = ERROR_GEN_FAILURE + return 0 + + if bFailIfExists and os.path.exists(dst): + ql.os.last_error = ERROR_FILE_EXISTS return 0 - copyfile(lpExistingFileName, lpNewFileName) + copyfile(src, dst) + return 1 # BOOL SetFileAttributesA( @@ -663,6 +678,22 @@ def hook_SetFileApisToANSI(ql: Qiling, address: int, params): def hook_SetFileApisToOEM(ql: Qiling, address: int, params): pass +def _DeleteFile(ql: Qiling, address: int, params): + lpFileName = params["lpFileName"] + + dst = ql.os.path.virtual_to_host_path(lpFileName) + + if not ql.os.path.is_safe_host_path(dst): + ql.os.last_error = ERROR_GEN_FAILURE + return 0 + + try: + os.remove(dst) + except OSError: + return 0 + + return 1 + # BOOL DeleteFileA( # LPCSTR lpFileName # ); @@ -670,12 +701,7 @@ def hook_SetFileApisToOEM(ql: Qiling, address: int, params): 'lpFileName' : LPCSTR }) def hook_DeleteFileA(ql: Qiling, address: int, params): - lpFileName = ql.os.path.transform_to_real_path(params["lpFileName"]) - try: - os.remove(lpFileName) - return 1 - except: - return 0 + return _DeleteFile(ql, address, params) # BOOL DeleteFileW( # LPCWSTR lpFileName @@ -684,9 +710,4 @@ def hook_DeleteFileA(ql: Qiling, address: int, params): 'lpFileName' : LPCWSTR }) def hook_DeleteFileW(ql: Qiling, address: int, params): - lpFileName = ql.os.path.transform_to_real_path(params["lpFileName"]) - try: - os.remove(lpFileName) - return 1 - except: - return 0 + return _DeleteFile(ql, address, params) diff --git a/qiling/profiles/dos.ql b/qiling/profiles/dos.ql index c1bf3edc0..550d0f8d0 100644 --- a/qiling/profiles/dos.ql +++ b/qiling/profiles/dos.ql @@ -1,10 +1,3 @@ -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - [KERNEL] version = 7 ticks_per_second = 18.206 @@ -17,9 +10,5 @@ stack_size = 0x4000 base_address = 0x7000 [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = automatize_input = False current_path = A:\ \ No newline at end of file diff --git a/qiling/profiles/freebsd.ql b/qiling/profiles/freebsd.ql index 7829ce0e4..ef66a4dca 100644 --- a/qiling/profiles/freebsd.ql +++ b/qiling/profiles/freebsd.ql @@ -17,19 +17,7 @@ gid = 1000 pid = 1996 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / diff --git a/qiling/profiles/linux.ql b/qiling/profiles/linux.ql index f810f94b0..a0a628ad3 100644 --- a/qiling/profiles/linux.ql +++ b/qiling/profiles/linux.ql @@ -11,7 +11,6 @@ load_address = 0x555555554000 interp_address = 0x7ffff7dd5000 mmap_address = 0x7fffb7dd6000 vsyscall_address = 0xffffffffff600000 -vsyscall_size = 0x1000 [OS32] @@ -28,19 +27,7 @@ gid = 1000 pid = 1996 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / diff --git a/qiling/profiles/macos.ql b/qiling/profiles/macos.ql index 50445e42b..b3624c0f1 100644 --- a/qiling/profiles/macos.ql +++ b/qiling/profiles/macos.ql @@ -24,19 +24,7 @@ gid = 1000 pid = 1996 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / diff --git a/qiling/profiles/qnx.ql b/qiling/profiles/qnx.ql index a52b0c56b..5e0c7b14a 100644 --- a/qiling/profiles/qnx.ql +++ b/qiling/profiles/qnx.ql @@ -15,27 +15,17 @@ cpupage_address = 0xfc4048d8 cpupage_tls_address = 0xfc405000 tls_data_address = 0xfc406000 + [KERNEL] uid = 1000 gid = 1000 pid = 1996 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / + [NETWORK] # To use IPv6 or not, to avoid binary double bind. ipv6 and ipv4 bind the same port at the same time bindtolocalhost = True diff --git a/qiling/profiles/windows.ql b/qiling/profiles/windows.ql index 8dbf184ba..15cc2f39b 100644 --- a/qiling/profiles/windows.ql +++ b/qiling/profiles/windows.ql @@ -30,18 +30,7 @@ pid = 1996 parent_pid = 0 shell_pid = 10 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = C:\ [SYSTEM] diff --git a/tests/profiles/append_test.ql b/tests/profiles/append_test.ql deleted file mode 100644 index 5d3b7ea24..000000000 --- a/tests/profiles/append_test.ql +++ /dev/null @@ -1,5 +0,0 @@ -[LOG] -dir = log_test - -[MISC] -append = append_test diff --git a/tests/profiles/uboot_bin.ql b/tests/profiles/uboot_bin.ql index 1402374e1..b7f7216c8 100644 --- a/tests/profiles/uboot_bin.ql +++ b/tests/profiles/uboot_bin.ql @@ -4,17 +4,5 @@ entry_point = 0x80800000 heap_size = 0x300000 -[LOG] -# log directory output -# usage: dir = qlog -dir = -# split log file, use with multithread -split = False - - [MISC] -# append string into different logs -# maily for multiple times Ql run with one file -# usage: append = test1 -append = current_path = / diff --git a/tests/test_elf.py b/tests/test_elf.py index 7eb18fecc..48d071865 100644 --- a/tests/test_elf.py +++ b/tests/test_elf.py @@ -209,8 +209,12 @@ def test_elf_linux_x8664_static(self): def test_elf_linux_x86(self): - ql = Qiling(["../examples/rootfs/x86_linux/bin/x86_hello"], "../examples/rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG, log_file="test.qlog") + filename = 'test.qlog' + + ql = Qiling(["../examples/rootfs/x86_linux/bin/x86_hello"], "../examples/rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG, log_file=filename) ql.run() + + os.remove(filename) del ql @@ -352,7 +356,7 @@ def my_puts(ql): all_mem = ql.mem.save() ql.mem.restore(all_mem) - ql = Qiling(["../examples/rootfs/arm_linux/bin/arm_hello"], "../examples/rootfs/arm_linux", verbose=QL_VERBOSE.DEBUG, profile='profiles/append_test.ql') + ql = Qiling(["../examples/rootfs/arm_linux/bin/arm_hello"], "../examples/rootfs/arm_linux", verbose=QL_VERBOSE.DEBUG) ql.os.set_api('puts', my_puts) ql.run() del ql @@ -1073,7 +1077,7 @@ def test_elf_linux_x8664_getdents(self): del ql def test_elf_linux_armeb(self): - ql = Qiling(["../examples/rootfs/armeb_linux/bin/armeb_hello"], "../examples/rootfs/armeb_linux", verbose=QL_VERBOSE.DEBUG, profile='profiles/append_test.ql') + ql = Qiling(["../examples/rootfs/armeb_linux/bin/armeb_hello"], "../examples/rootfs/armeb_linux", verbose=QL_VERBOSE.DEBUG) ql.run() del ql diff --git a/tests/test_elf_ko.py b/tests/test_elf_ko.py index 69c61a059..9e5153736 100644 --- a/tests/test_elf_ko.py +++ b/tests/test_elf_ko.py @@ -5,8 +5,6 @@ import os, sys, unittest -from unicorn import UcError - sys.path.append("..") from qiling import Qiling from qiling.const import QL_INTERCEPT, QL_VERBOSE @@ -19,61 +17,64 @@ class ELF_KO_Test(unittest.TestCase): @unittest.skipIf(IS_FAST_TEST, 'fast test') def test_demigod_m0hamed_x86(self): - checklist = {} + checklist = [] @linux_kernel_api(params={ "format": STRING }) - def my_printk(ql: Qiling, address: int, params): - ql.log.info(f'oncall printk: params = {params}') + def __my_printk(ql: Qiling, address: int, params): + ql.log.info(f'my printk: {params=}') - checklist['oncall'] = params['format'] + checklist.append(params['format']) return 0 ql = Qiling(["../examples/rootfs/x86_linux/kernel/m0hamed_rootkit.ko"], "../examples/rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG) - ql.os.set_api("printk", my_printk) + ql.os.set_api("printk", __my_printk) ba = ql.loader.load_address + ql.run(ba + 0x01e0, ba + 0x01fa) - try: - ql.run(ba + 0x11e0, ba + 0x11fa) - except UcError as e: - self.fail(e) - else: - self.assertEqual("DONT YOU EVER TRY TO READ THIS FILE OR I AM GOING TO DESTROY YOUR MOST SECRET DREAMS", checklist['oncall']) + self.assertEqual("DONT YOU EVER TRY TO READ THIS FILE OR I AM GOING TO DESTROY YOUR MOST SECRET DREAMS", checklist.pop(0)) + self.assertEqual(len(checklist), 0) def test_demigod_hello_x8664(self): - checklist = {} + checklist = [] - def my_onenter(ql: Qiling, address: int, params): - ql.log.info(f'onenter printk: params = {params}') + def __onenter_printk(ql: Qiling, address: int, params): + ql.log.info(f'about to enter printk: {params=}') - checklist['onenter'] = params['format'] + checklist.append(params['format']) ql = Qiling(["../examples/rootfs/x8664_linux/kernel/hello.ko"], "../examples/rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG) - ql.os.set_api("printk", my_onenter, QL_INTERCEPT.ENTER) + ql.os.set_api("printk", __onenter_printk, QL_INTERCEPT.ENTER) ba = ql.loader.load_address - ql.run(ba + 0x1064, ba + 0x107e) + ql.run(ba + 0x64, ba + 0x7e) # run lkm_example_init + ql.run(ba + 0x7f, ba + 0x90) # run lkm_example_exit - self.assertEqual("\x016Hello, World: %p!\n", checklist['onenter']) + self.assertIn('Hello', checklist.pop(0)) + self.assertIn('Goodbye', checklist.pop(0)) + self.assertEqual(len(checklist), 0) def test_demigod_hello_mips32(self): - checklist = {} + checklist = [] - def my_onexit(ql: Qiling, address: int, params, retval: int): - ql.log.info(f'onexit printk: params = {params}') + def __onexit_printk(ql: Qiling, address: int, params, retval: int): + ql.log.info(f'done with printk: {params=}') - checklist['onexit'] = params['format'] + checklist.append(params['format']) ql = Qiling(["../examples/rootfs/mips32_linux/kernel/hello.ko"], "../examples/rootfs/mips32_linux", verbose=QL_VERBOSE.DEBUG) - ql.os.set_api("printk", my_onexit, QL_INTERCEPT.EXIT) + ql.os.set_api("printk", __onexit_printk, QL_INTERCEPT.EXIT) ba = ql.loader.load_address - ql.run(ba + 0x1060, ba + 0x1084) + ql.run(ba + 0x60, ba + 0x84) # run hello + ql.run(ba + 0x88, ba + 0x98) # run goodbye + + self.assertIn('Hello', checklist.pop(0)) + self.assertEqual(len(checklist), 0) - self.assertEqual("\x016Hello, World!\n", checklist['onexit']) if __name__ == "__main__": unittest.main() diff --git a/tests/test_elf_multithread.py b/tests/test_elf_multithread.py index 482a30256..443f63acc 100644 --- a/tests/test_elf_multithread.py +++ b/tests/test_elf_multithread.py @@ -34,12 +34,20 @@ def test_elf_linux_cloexec_x8664(self): verbose=QL_VERBOSE.DEBUG, multithread=True) - err = ql_file.open('output.txt', os.O_RDWR | os.O_CREAT, 0o777) + filename = 'output.txt' + err = ql_file.open(filename, os.O_RDWR | os.O_CREAT, 0o777) + ql.os.stderr = err ql.run() - os.close(err.fileno()) - with open('output.txt', 'rb') as f: - self.assertTrue(b'fail' in f.read()) + err.close() + + with open(filename, 'rb') as f: + content = f.read() + + # cleanup + os.remove(filename) + + self.assertIn(b'fail', content) del ql @@ -92,7 +100,7 @@ def check_write(ql, write_fd, write_buf, write_count, *args, **kw): except: pass buf_out = None - ql = Qiling(["../examples/rootfs/x8664_linux/bin/x8664_multithreading"], "../examples/rootfs/x8664_linux", multithread=True, profile= "profiles/append_test.ql") + ql = Qiling(["../examples/rootfs/x8664_linux/bin/x8664_multithreading"], "../examples/rootfs/x8664_linux", multithread=True) ql.os.set_syscall("write", check_write, QL_INTERCEPT.ENTER) ql.run() diff --git a/tests/test_pe.py b/tests/test_pe.py index f1cb73fb2..b89bf334c 100644 --- a/tests/test_pe.py +++ b/tests/test_pe.py @@ -68,8 +68,7 @@ class PETest(unittest.TestCase): def test_pe_win_x8664_hello(self): def _t(): - ql = Qiling(["../examples/rootfs/x8664_windows/bin/x8664_hello.exe"], "../examples/rootfs/x8664_windows", - verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(["../examples/rootfs/x8664_windows/bin/x8664_hello.exe"], "../examples/rootfs/x8664_windows") ql.run() del ql return True @@ -79,8 +78,7 @@ def _t(): def test_pe_win_x86_hello(self): def _t(): - ql = Qiling(["../examples/rootfs/x86_windows/bin/x86_hello.exe"], "../examples/rootfs/x86_windows", - verbose=QL_VERBOSE.DEFAULT, profile="profiles/append_test.ql") + ql = Qiling(["../examples/rootfs/x86_windows/bin/x86_hello.exe"], "../examples/rootfs/x86_windows") ql.run() del ql return True @@ -90,8 +88,7 @@ def _t(): def test_pe_win_x8664_file_upx(self): def _t(): - ql = Qiling(["../examples/rootfs/x8664_windows/bin/x8664_file_upx.exe"], "../examples/rootfs/x8664_windows", - verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(["../examples/rootfs/x8664_windows/bin/x8664_file_upx.exe"], "../examples/rootfs/x8664_windows") ql.run() del ql return True @@ -101,8 +98,7 @@ def _t(): def test_pe_win_x86_file_upx(self): def _t(): - ql = Qiling(["../examples/rootfs/x86_windows/bin/x86_file_upx.exe"], "../examples/rootfs/x86_windows", - verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(["../examples/rootfs/x86_windows/bin/x86_file_upx.exe"], "../examples/rootfs/x86_windows") ql.run() del ql return True