From a7497b46c02565161ba3f28fa4d7106a8c14697e Mon Sep 17 00:00:00 2001 From: elicn Date: Tue, 14 Jun 2022 15:12:57 +0300 Subject: [PATCH 1/2] Provide virtual path instead of host path when possible --- qiling/debugger/gdb/gdb.py | 18 +++++---- qiling/os/mapper.py | 10 +++-- qiling/os/path.py | 19 +++++++++- qiling/os/posix/syscall/unistd.py | 63 +++++++++++++++++-------------- 4 files changed, 70 insertions(+), 40 deletions(-) diff --git a/qiling/debugger/gdb/gdb.py b/qiling/debugger/gdb/gdb.py index f559a2a83..f3472af91 100644 --- a/qiling/debugger/gdb/gdb.py +++ b/qiling/debugger/gdb/gdb.py @@ -494,7 +494,7 @@ def handle_q(subcmd: str) -> Reply: return b'l' + auxv_data[:length] elif feature == 'exec-file' and op == 'read': - return f'l{os.path.abspath(self.ql.path)}' + return f'l{self.ql.os.path.host_to_virtual_path(self.ql.path)}' elif feature == 'libraries-svr4' and op == 'read': # TODO: this one requires information of loaded libraries which currently not provided @@ -584,16 +584,20 @@ def handle_v(subcmd: str) -> Reply: flags = int(flags, 16) mode = int(mode, 16) - # try to guess whether this is an emulated path or real one - if path.startswith(os.path.abspath(self.ql.rootfs)): - host_path = path + virtpath = self.ql.os.path.virtual_abspath(path) + + if virtpath.startswith(r'/proc'): + # TODO: we really need a centralized virtual filesystem to open + # both emulated (like procfs) and real files, and manage their + # file descriptors seamlessly + fd = -1 else: host_path = self.ql.os.path.virtual_to_host_path(path) - self.ql.log.debug(f'{PROMPT} target file: {host_path}') + self.ql.log.debug(f'{PROMPT} target host path: {host_path}') - if os.path.exists(host_path) and not path.startswith(r'/proc'): - fd = os.open(host_path, flags, mode) + if os.path.exists(host_path): + fd = os.open(host_path, flags, mode) return f'F{fd:x}' diff --git a/qiling/os/mapper.py b/qiling/os/mapper.py index 6b79524fb..c5b2914ae 100644 --- a/qiling/os/mapper.py +++ b/qiling/os/mapper.py @@ -101,15 +101,17 @@ def open_ql_file(self, path: str, openflags: int, openmode: int): if self.has_mapping(path): return self._open_mapping_ql_file(path, openflags, openmode) - real_path = self.path.transform_to_real_path(path) - return ql_file.open(real_path, openflags, openmode) + host_path = self.path.virtual_to_host_path(path) + + return ql_file.open(host_path, openflags, openmode) def open(self, path: str, openmode: str): if self.has_mapping(path): return self._open_mapping(path, openmode) - real_path = self.path.transform_to_real_path(path) - return open(real_path, openmode) + host_path = self.path.virtual_to_host_path(path) + + return open(host_path, openmode) def _parse_path(self, p: Union[os.PathLike, str]) -> str: fspath = getattr(p, '__fspath__', None) diff --git a/qiling/os/path.py b/qiling/os/path.py index 2cab19913..61c5b8db1 100644 --- a/qiling/os/path.py +++ b/qiling/os/path.py @@ -42,9 +42,9 @@ def __init__(self, rootfs: str, cwd: str, emulos: QL_OS) -> None: self.cwd = cwd # + # temporary aliases for backward compatibility self.transform_to_relative_path = self.virtual_abspath self.transform_to_real_path = self.virtual_to_host_path - self.transform_to_link_path = self.virtual_to_host_path # @staticmethod @@ -258,6 +258,23 @@ def __is_safe_host_path(self, hostpath: Path, strict: bool = False) -> bool: else: return True + def host_to_virtual_path(self, hostpath: str) -> str: + """Convert a host path to its corresponding virtual path relative to rootfs. + + Args: + hostpath : host path + + Returns: the corresponding virtual path + + Raises: `ValueError` in case the host path does not map to a rootfs location + """ + + resolved = Path(hostpath).resolve(strict=False) + virtpath = self._cwd_anchor / resolved.relative_to(self._rootfs_path) + + return str(virtpath) + + def virtual_abspath(self, virtpath: str) -> str: """Convert a relative virtual path to an absolute virtual path based on the current working directory. diff --git a/qiling/os/posix/syscall/unistd.py b/qiling/os/posix/syscall/unistd.py index fe0c01ca6..f7cb6cf94 100644 --- a/qiling/os/posix/syscall/unistd.py +++ b/qiling/os/posix/syscall/unistd.py @@ -338,23 +338,25 @@ def ql_syscall_write(ql: Qiling, fd: int, buf: int, count: int): def ql_syscall_readlink(ql: Qiling, path_name: int, path_buff: int, path_buffsize: int): pathname = ql.os.utils.read_cstring(path_name) # pathname = str(pathname, 'utf-8', errors="ignore") - real_path = ql.os.path.transform_to_link_path(pathname) - relative_path = ql.os.path.transform_to_relative_path(pathname) + host_path = ql.os.path.virtual_to_host_path(pathname) + virt_path = ql.os.path.virtual_abspath(pathname) - if not os.path.exists(real_path): - regreturn = -1 + # cover procfs psaudo files first + # TODO: /proc/self/root, /proc/self/cwd + if virt_path == r'/proc/self/exe': + p = ql.os.path.host_to_virtual_path(ql.path) + p = p.encode('utf-8') - elif relative_path == r'/proc/self/exe': - localpath = os.path.abspath(ql.path) - localpath = bytes(localpath, 'utf-8') + b'\x00' + ql.mem.write(path_buff, p + b'\x00') + regreturn = len(p) - ql.mem.write(path_buff, localpath) - regreturn = len(localpath) - 1 + elif os.path.exists(host_path): + regreturn = 0 else: - regreturn = 0 + regreturn = -1 - ql.log.debug("readlink(%s, 0x%x, 0x%x) = %d" % (relative_path, path_buff, path_buffsize, regreturn)) + ql.log.debug('readlink("%s", 0x%x, 0x%x) = %d' % (virt_path, path_buff, path_buffsize, regreturn)) return regreturn @@ -376,20 +378,20 @@ def ql_syscall_getcwd(ql: Qiling, path_buff: int, path_buffsize: int): def ql_syscall_chdir(ql: Qiling, path_name: int): pathname = ql.os.utils.read_cstring(path_name) - real_path = ql.os.path.transform_to_real_path(pathname) - relative_path = ql.os.path.transform_to_relative_path(pathname) + host_path = ql.os.path.virtual_to_host_path(pathname) + virt_path = ql.os.path.virtual_abspath(pathname) - if os.path.exists(real_path) and os.path.isdir(real_path): + if os.path.exists(host_path) and os.path.isdir(host_path): if ql.os.thread_management: - ql.os.thread_management.cur_thread.path.cwd = relative_path + ql.os.thread_management.cur_thread.path.cwd = virt_path else: - ql.os.path.cwd = relative_path + ql.os.path.cwd = virt_path regreturn = 0 - ql.log.debug("chdir(%s) = %d"% (relative_path, regreturn)) + ql.log.debug("chdir(%s) = %d"% (virt_path, regreturn)) else: regreturn = -1 - ql.log.warning("chdir(%s) = %d : not found" % (relative_path, regreturn)) + ql.log.warning("chdir(%s) = %d : not found" % (virt_path, regreturn)) return regreturn @@ -397,21 +399,26 @@ def ql_syscall_chdir(ql: Qiling, path_name: int): def ql_syscall_readlinkat(ql: Qiling, dfd: int, path: int, buf: int, bufsize: int): pathname = ql.os.utils.read_cstring(path) # pathname = str(pathname, 'utf-8', errors="ignore") - real_path = ql.os.path.transform_to_link_path(pathname) - relative_path = ql.os.path.transform_to_relative_path(pathname) + host_path = ql.os.path.virtual_to_host_path(pathname) + virt_path = ql.os.path.virtual_abspath(pathname) - if not os.path.exists(real_path): - regreturn = -1 + # cover procfs psaudo files first + # TODO: /proc/self/root, /proc/self/cwd + if virt_path == r'/proc/self/exe': + p = ql.os.path.host_to_virtual_path(ql.path) + p = p.encode('utf-8') - elif relative_path == r'/proc/self/exe': - localpath = os.path.abspath(ql.path) - localpath = bytes(localpath, 'utf-8') + b'\x00' + ql.mem.write(buf, p + b'\x00') + regreturn = len(p) - ql.mem.write(buf, localpath) - regreturn = len(localpath) -1 - else: + elif os.path.exists(host_path): regreturn = 0 + else: + regreturn = -1 + + ql.log.debug('readlinkat(%d, "%s", 0x%x, 0x%x) = %d' % (dfd, virt_path, buf, bufsize, regreturn)) + return regreturn From ce2b3769cc37f1d080e71521cbb39312541a4d54 Mon Sep 17 00:00:00 2001 From: elicn Date: Tue, 14 Jun 2022 15:15:45 +0300 Subject: [PATCH 2/2] Code quality improvements --- qiling/arch/x86_utils.py | 5 +- qiling/os/posix/filestruct.py | 196 +++++++++++++++--------------- qiling/os/posix/syscall/socket.py | 117 +++++++++--------- qiling/os/windows/registry.py | 2 +- 4 files changed, 165 insertions(+), 155 deletions(-) diff --git a/qiling/arch/x86_utils.py b/qiling/arch/x86_utils.py index de6e24f24..f0f58706f 100644 --- a/qiling/arch/x86_utils.py +++ b/qiling/arch/x86_utils.py @@ -1,5 +1,6 @@ from abc import abstractmethod +from typing import Optional from qiling import Qiling from qiling.arch.x86 import QlArchIntel @@ -32,7 +33,7 @@ def __setitem__(self, index: int, data: bytes) -> None: self.mem.write(self.base + (index * self.entsize), data) - def get_next_free(self, start: int = None, end: int = None) -> int: + def get_next_free(self, start: Optional[int] = None, end: Optional[int] = None) -> int: # The Linux kernel determines whether the segment is empty by judging whether the content in the current GDT segment is 0. null_entry = b'\x00' * self.entsize @@ -112,7 +113,7 @@ def get_entry(self, index: int) -> bytes: def set_entry(self, index: int, data: bytes) -> None: self.array[index] = data - def get_free_idx(self, start: int = None, end: int = None) -> int: + def get_free_idx(self, start: Optional[int] = None, end: Optional[int] = None) -> int: return self.array.get_next_free(start, end) diff --git a/qiling/os/posix/filestruct.py b/qiling/os/posix/filestruct.py index daa978ce8..f20fd38fd 100644 --- a/qiling/os/posix/filestruct.py +++ b/qiling/os/posix/filestruct.py @@ -3,133 +3,136 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # import os - -from qiling.exception import * +from socket import socket, AddressFamily, SocketKind +from typing import Union try: import fcntl except ImportError: pass -import socket class ql_socket: - def __init__(self, socket): + def __init__(self, socket: socket): self.__fd = socket.fileno() self.__socket = socket def __getstate__(self, *args, **kwargs): - _state = self.__dict__.copy() - _state["_ql_socket__socket"] = { - "family": self.__dict__["_ql_socket__socket"].family, - "type": self.__dict__["_ql_socket__socket"].type, - "proto": self.__dict__["_ql_socket__socket"].proto, - "laddr": self.__dict__["_ql_socket__socket"].getsockname(), - } + fname = f'_{self.__class__.__name__}__socket' + sock = self.__dict__[fname] + + _state[fname] = { + "family" : sock.family, + "type" : sock.type, + "proto" : sock.proto, + "laddr" : sock.getsockname(), + } return _state def __setstate__(self, state): self.__dict__ = state - + @classmethod - def open(self, socket_domain, socket_type, socket_protocol, opts=None): - s = socket.socket(socket_domain, socket_type, socket_protocol) - if opts: - s.setsockopt(*opts) - return self(s) - - def read(self, read_len): - return os.read(self.__fd, read_len) - - def write(self, write_buf): - return os.write(self.__fd, write_buf) - - def fileno(self): + def open(cls, domain: Union[AddressFamily, int], socktype: Union[SocketKind, int], protocol: int): + s = socket(domain, socktype, protocol) + + return cls(s) + + def read(self, length: int) -> bytes: + return os.read(self.__fd, length) + + def write(self, data: bytes) -> int: + return os.write(self.__fd, data) + + def fileno(self) -> int: return self.__fd - - def close(self): - return os.close(self.__fd) - - def fcntl(self, fcntl_cmd, fcntl_arg): + + def close(self) -> None: + os.close(self.__fd) + + def fcntl(self, cmd, arg): try: - return fcntl.fcntl(self.__fd, fcntl_cmd, fcntl_arg) + return fcntl.fcntl(self.__fd, cmd, arg) except Exception: pass - def ioctl(self, ioctl_cmd, ioctl_arg): + def ioctl(self, cmd, arg): try: - return fcntl.ioctl(self.__fd, ioctl_cmd, ioctl_arg) + return fcntl.ioctl(self.__fd, cmd, arg) except Exception: - pass - - def dup(self): + pass + + def dup(self) -> 'ql_socket': new_s = self.__socket.dup() - new_ql_socket = ql_socket(new_s) - return new_ql_socket - - def connect(self, connect_addr): - return self.__socket.connect(connect_addr) - - def shutdown(self, shutdown_how): - return self.__socket.shutdown(shutdown_how) - - def bind(self, bind_addr): - return self.__socket.bind(bind_addr) - - def listen(self, listen_num): - return self.__socket.listen(listen_num) + + return ql_socket(new_s) + + def connect(self, address) -> None: + self.__socket.connect(address) + + def shutdown(self, how: int) -> None: + return self.__socket.shutdown(how) + + def bind(self, address) -> None: + return self.__socket.bind(address) + + def listen(self, backlog: int) -> None: + return self.__socket.listen(backlog) def getsockname(self): return self.__socket.getsockname() - + def getpeername(self): return self.__socket.getpeername() - def getsockopt(self, level, optname, optlen): - return self.__socket.getsockopt(level, optname, optlen) + def getsockopt(self, level: int, optname: int, buflen: int): + return self.__socket.getsockopt(level, optname, buflen) - def setsockopt(self, level, optname, optval, optlen): - if optval is not None: - return self.__socket.setsockopt(level, optname, optval) + def setsockopt(self, level: int, optname: int, value: Union[int, bytes, None], optlen: int = 0) -> None: + if value is None: + self.__socket.setsockopt(level, optname, None, optlen) else: - return self.__socket.setsockopt(level, optname, None, optval) - + self.__socket.setsockopt(level, optname, value) + def accept(self): try: con, addr = self.__socket.accept() - new_ql_socket = ql_socket(con) except BlockingIOError: # For support non-blocking sockets - return None, None + addr = None + new_ql_socket = None + else: + new_ql_socket = ql_socket(con) + return new_ql_socket, addr - - def recv(self, recv_len, recv_flags): - return self.__socket.recv(recv_len, recv_flags) - - def send(self, send_buf, send_flags): - return self.__socket.send(send_buf, send_flags) - - def recvmsg(self, bufsize, ancbufsize, flags): + + def recv(self, bufsize: int, flags: int) -> bytes: + return self.__socket.recv(bufsize, flags) + + def send(self, data, flags: int) -> int: + return self.__socket.send(data, flags) + + def recvmsg(self, bufsize: int, ancbufsize: int, flags: int): return self.__socket.recvmsg(bufsize, ancbufsize, flags) - def recvfrom(self, recvfrom_len, recvfrom_flags): - return self.__socket.recvfrom(recvfrom_len, recvfrom_flags) + def recvfrom(self, bufsize: int, flags: int): + return self.__socket.recvfrom(bufsize, flags) def sendto(self, sendto_buf, sendto_flags, sendto_addr): return self.__socket.sendto(sendto_buf, sendto_flags, sendto_addr) @property - def family(self): + def family(self) -> AddressFamily: return self.__socket.family @property - def socktype(self): + def socktype(self) -> SocketKind: return self.__socket.type @property - def socket(self): + def socket(self) -> socket: return self.__socket # def __getattr__(self,name): @@ -139,41 +142,40 @@ def socket(self): # raise AttributeError("A instance has no attribute '%s'" % name) class ql_pipe: - def __init__(self, fd): + def __init__(self, fd: int): self.__fd = fd @classmethod - def open(self): + def open(cls): r, w = os.pipe() - return (self(r), self(w)) - - def read(self, read_len): - return os.read(self.__fd, read_len) - - def write(self, write_buf): - return os.write(self.__fd, write_buf) - - def fileno(self): + + return (cls(r), cls(w)) + + def read(self, length: int) -> bytes: + return os.read(self.__fd, length) + + def write(self, data: bytes) -> int: + return os.write(self.__fd, data) + + def fileno(self) -> int: return self.__fd - def close(self): - return os.close(self.__fd) - - def fcntl(self, fcntl_cmd, fcntl_arg): + def close(self) -> None: + os.close(self.__fd) + + def fcntl(self, cmd, arg): try: - return fcntl.fcntl(self.__fd, fcntl_cmd, fcntl_arg) + return fcntl.fcntl(self.__fd, cmd, arg) except Exception: pass - def ioctl(self, ioctl_cmd, ioctl_arg): + def ioctl(self, cmd, arg): try: - return fcntl.ioctl(self.__fd, ioctl_cmd, ioctl_arg) + return fcntl.ioctl(self.__fd, cmd, arg) except Exception: - pass - - def dup(self): - new_fd = os.dup(self.__fd) - new_ql_pipe = ql_pipe(new_fd) - return new_ql_pipe + pass + def dup(self) -> 'ql_pipe': + new_fd = os.dup(self.__fd) + return ql_pipe(new_fd) diff --git a/qiling/os/posix/syscall/socket.py b/qiling/os/posix/syscall/socket.py index a80c7b998..3eb4deefe 100644 --- a/qiling/os/posix/syscall/socket.py +++ b/qiling/os/posix/syscall/socket.py @@ -69,38 +69,44 @@ def ql_bin_to_ip(ip): def ql_syscall_socket(ql: Qiling, socket_domain, socket_type, socket_protocol): idx = next((i for i in range(NR_OPEN) if ql.os.fd[i] is None), -1) - regreturn = idx if idx != -1: + emu_socket_value = socket_type + # ql_socket.open should use host platform based socket_type. try: - emu_socket_value = socket_type emu_socket_type = socket_type_mapping(socket_type, ql.arch.type, ql.os.type) - socket_type = getattr(socket, emu_socket_type) - ql.log.debug(f'Convert emu_socket_type {emu_socket_type}:{emu_socket_value} to host platform based socket_type {emu_socket_type}:{socket_type}') + except KeyError: + ql.log.error(f'Cannot convert emu_socket_type {emu_socket_value} to host platform based socket_type') + raise + try: + socket_type = getattr(socket, emu_socket_type) except AttributeError: ql.log.error(f'Cannot convert emu_socket_type {emu_socket_type}:{emu_socket_value} to host platform based socket_type') raise - except Exception: - ql.log.error(f'Cannot convert emu_socket_type {emu_socket_value} to host platform based socket_type') - raise + ql.log.debug(f'Convert emu_socket_type {emu_socket_type}:{emu_socket_value} to host platform based socket_type {emu_socket_type}:{socket_type}') try: - if ql.verbose >= QL_VERBOSE.DEBUG: # set REUSEADDR options under debug mode - ql.os.fd[idx] = ql_socket.open(socket_domain, socket_type, socket_protocol, (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)) - else: - ql.os.fd[idx] = ql_socket.open(socket_domain, socket_type, socket_protocol) - except OSError as e: # May raise error: Protocol not supported + sock = ql_socket.open(socket_domain, socket_type, socket_protocol) + + # set REUSEADDR options under debug mode + if ql.verbose >= QL_VERBOSE.DEBUG: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + ql.os.fd[idx] = sock + + # May raise error: Protocol not supported + except OSError as e: ql.log.debug(f'{e}: {socket_domain=}, {socket_type=}, {socket_protocol=}') - regreturn = -1 + idx = -1 socket_type = socket_type_mapping(socket_type, ql.arch.type, ql.os.type) socket_domain = socket_domain_mapping(socket_domain, ql.arch.type, ql.os.type) - ql.log.debug("socket(%s, %s, %s) = %d" % (socket_domain, socket_type, socket_protocol, regreturn)) + ql.log.debug("socket(%s, %s, %s) = %d" % (socket_domain, socket_type, socket_protocol, idx)) - return regreturn + return idx def ql_syscall_connect(ql: Qiling, connect_sockfd, connect_addr, connect_addrlen): @@ -146,29 +152,31 @@ def ql_syscall_getsockopt(ql: Qiling, sockfd, level, optname, optval_addr, optle try: optlen = min(ql.unpack32s(ql.mem.read(optlen_addr, 4)), 1024) + if optlen < 0: return -EINVAL + emu_level = level + try: - emu_level = level emu_level_name = socket_level_mapping(emu_level, ql.arch.type, ql.os.type) - level = getattr(socket, emu_level_name) - ql.log.debug("Convert emu_level {}:{} to host platform based level {}:{}".format( - emu_level_name, emu_level, emu_level_name, level)) + except KeyError: + ql.log.error(f"Can't convert emu_level {emu_level} to host platform based level") + raise + try: + level = getattr(socket, emu_level_name) except AttributeError: - ql.log.error("Can't convert emu_level {}:{} to host platform based emu_level".format( - emu_level_name, emu_level)) + ql.log.error(f"Can't convert emu_level {emu_level_name}:{emu_level} to host platform based emu_level") raise - except Exception: - ql.log.error("Can't convert emu_level {} to host platform based level".format(emu_level)) - raise + ql.log.debug(f"Convert emu_level {emu_level_name}:{emu_level} to host platform based level {emu_level_name}:{level}") - try: - emu_opt = optname + emu_opt = optname + try: emu_level_name = socket_level_mapping(emu_level, ql.arch.type, ql.os.type) + # emu_opt_name is based on level if emu_level_name == "IPPROTO_IP": emu_opt_name = socket_ip_option_mapping(emu_opt, ql.arch.type, ql.os.type) @@ -180,18 +188,17 @@ def ql_syscall_getsockopt(ql: Qiling, sockfd, level, optname, optval_addr, optle if emu_opt_name.endswith("_NEW") or emu_opt_name.endswith("_OLD"): emu_opt_name = emu_opt_name[:-4] - optname = getattr(socket, emu_opt_name) - ql.log.debug("Convert emu_optname {}:{} to host platform based optname {}:{}".format( - emu_opt_name, emu_opt, emu_opt_name, optname)) + except KeyError: + ql.log.error(f"Can't convert emu_optname {emu_opt} to host platform based optname") + raise + try: + optname = getattr(socket, emu_opt_name) except AttributeError: - ql.log.error("Can't convert emu_optname {}:{} to host platform based emu_optname".format( - emu_opt_name, emu_opt)) + ql.log.error(f"Can't convert emu_optname {emu_opt_name}:{emu_opt} to host platform based emu_optname") raise - except Exception: - ql.log.error("Can't convert emu_optname {} to host platform based optname".format(emu_opt)) - raise + ql.log.debug(f"Convert emu_optname {emu_opt_name}:{emu_opt} to host platform based optname {emu_opt_name}:{optname}") optval = ql.os.fd[sockfd].getsockopt(level, optname, optlen) ql.mem.write(optval_addr, optval) @@ -210,26 +217,27 @@ def ql_syscall_setsockopt(ql: Qiling, sockfd, level, optname, optval_addr, optle ql.os.fd[sockfd].setsockopt(level, optname, None, optlen) else: try: + emu_level = level + try: - emu_level = level emu_level_name = socket_level_mapping(emu_level, ql.arch.type, ql.os.type) - level = getattr(socket, emu_level_name) - ql.log.debug("Convert emu_level {}:{} to host platform based level {}:{}".format( - emu_level_name, emu_level, emu_level_name, level)) + except KeyError: + ql.log.error(f"Can't convert emu_level {emu_level} to host platform based level") + raise + try: + level = getattr(socket, emu_level_name) except AttributeError: - ql.log.error("Can't convert emu_level {}:{} to host platform based emu_level".format( - emu_level_name, emu_level)) + ql.log.error(f"Can't convert emu_level {emu_level_name}:{emu_level} to host platform based emu_level") raise - except Exception: - ql.log.error("Can't convert emu_level {} to host platform based level".format(emu_level)) - raise + ql.log.debug(f"Convert emu_level {emu_level_name}:{emu_level} to host platform based level {emu_level_name}:{level}") - try: - emu_opt = optname + emu_opt = optname + try: emu_level_name = socket_level_mapping(emu_level, ql.arch.type, ql.os.type) + # emu_opt_name is based on level if emu_level_name == "IPPROTO_IP": emu_opt_name = socket_ip_option_mapping(emu_opt, ql.arch.type, ql.os.type) @@ -241,18 +249,17 @@ def ql_syscall_setsockopt(ql: Qiling, sockfd, level, optname, optval_addr, optle if emu_opt_name.endswith("_NEW") or emu_opt_name.endswith("_OLD"): emu_opt_name = emu_opt_name[:-4] - optname = getattr(socket, emu_opt_name) - ql.log.debug("Convert emu_optname {}:{} to host platform based optname {}:{}".format( - emu_opt_name, emu_opt, emu_opt_name, optname)) + except KeyError: + ql.log.error(f"Can't convert emu_optname {emu_opt} to host platform based optname") + raise + try: + optname = getattr(socket, emu_opt_name) except AttributeError: - ql.log.error("Can't convert emu_optname {}:{} to host platform based emu_optname".format( - emu_opt_name, emu_opt)) + ql.log.error(f"Can't convert emu_optname {emu_opt_name}:{emu_opt} to host platform based emu_optname") raise - except Exception: - ql.log.error("Can't convert emu_optname {} to host platform based optname".format(emu_opt)) - raise + ql.log.debug(f"Convert emu_optname {emu_opt_name}:{emu_opt} to host platform based optname {emu_opt_name}:{optname}") optval = ql.mem.read(optval_addr, optlen) ql.os.fd[sockfd].setsockopt(level, optname, optval, None) @@ -304,13 +311,13 @@ def ql_syscall_bind(ql: Qiling, bind_fd, bind_addr, bind_addrlen): # need a proper fix, for now ipv4 comes first elif sin_family == 2 and ql.os.bindtolocalhost == True: - ql.os.fd[bind_fd].bind(('127.0.0.1', port)) host = "127.0.0.1" + ql.os.fd[bind_fd].bind((host, port)) # IPv4 should comes first elif ql.os.ipv6 == True and sin_family == 10 and ql.os.bindtolocalhost == True: - ql.os.fd[bind_fd].bind(('::1', port)) host = "::1" + ql.os.fd[bind_fd].bind((host, port)) elif ql.os.bindtolocalhost == False: ql.os.fd[bind_fd].bind((host, port)) diff --git a/qiling/os/windows/registry.py b/qiling/os/windows/registry.py index 70dbff3cc..5cd0a6a68 100644 --- a/qiling/os/windows/registry.py +++ b/qiling/os/windows/registry.py @@ -74,7 +74,7 @@ def write(self, key: str, subkey: str, reg_type: int, data: Union[str, bytes, in def save(self, fname: str): if self.conf: with open(fname, 'wb') as ofile: - data = json.dumps(self.conf) + data = json.dumps(self.conf, indent=4) ofile.write(data.encode('utf-8'))