Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions qiling/arch/x86_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from abc import abstractmethod
from typing import Optional

from qiling import Qiling
from qiling.arch.x86 import QlArchIntel
Expand Down Expand Up @@ -32,7 +33,7 @@ def __setitem__(self, index: int, data: bytes) -> None:

self.mem.write(self.base + (index * self.entsize), data)

def get_next_free(self, start: int = None, end: int = None) -> int:
def get_next_free(self, start: Optional[int] = None, end: Optional[int] = None) -> int:
# The Linux kernel determines whether the segment is empty by judging whether the content in the current GDT segment is 0.
null_entry = b'\x00' * self.entsize

Expand Down Expand Up @@ -112,7 +113,7 @@ def get_entry(self, index: int) -> bytes:
def set_entry(self, index: int, data: bytes) -> None:
self.array[index] = data

def get_free_idx(self, start: int = None, end: int = None) -> int:
def get_free_idx(self, start: Optional[int] = None, end: Optional[int] = None) -> int:
return self.array.get_next_free(start, end)


Expand Down
18 changes: 11 additions & 7 deletions qiling/debugger/gdb/gdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def handle_q(subcmd: str) -> Reply:
return b'l' + auxv_data[:length]

elif feature == 'exec-file' and op == 'read':
return f'l{os.path.abspath(self.ql.path)}'
return f'l{self.ql.os.path.host_to_virtual_path(self.ql.path)}'

elif feature == 'libraries-svr4' and op == 'read':
# TODO: this one requires information of loaded libraries which currently not provided
Expand Down Expand Up @@ -584,16 +584,20 @@ def handle_v(subcmd: str) -> Reply:
flags = int(flags, 16)
mode = int(mode, 16)

# try to guess whether this is an emulated path or real one
if path.startswith(os.path.abspath(self.ql.rootfs)):
host_path = path
virtpath = self.ql.os.path.virtual_abspath(path)

if virtpath.startswith(r'/proc'):
# TODO: we really need a centralized virtual filesystem to open
# both emulated (like procfs) and real files, and manage their
# file descriptors seamlessly
fd = -1
else:
host_path = self.ql.os.path.virtual_to_host_path(path)

self.ql.log.debug(f'{PROMPT} target file: {host_path}')
self.ql.log.debug(f'{PROMPT} target host path: {host_path}')

if os.path.exists(host_path) and not path.startswith(r'/proc'):
fd = os.open(host_path, flags, mode)
if os.path.exists(host_path):
fd = os.open(host_path, flags, mode)

return f'F{fd:x}'

Expand Down
10 changes: 6 additions & 4 deletions qiling/os/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,17 @@ def open_ql_file(self, path: str, openflags: int, openmode: int):
if self.has_mapping(path):
return self._open_mapping_ql_file(path, openflags, openmode)

real_path = self.path.transform_to_real_path(path)
return ql_file.open(real_path, openflags, openmode)
host_path = self.path.virtual_to_host_path(path)

return ql_file.open(host_path, openflags, openmode)

def open(self, path: str, openmode: str):
if self.has_mapping(path):
return self._open_mapping(path, openmode)

real_path = self.path.transform_to_real_path(path)
return open(real_path, openmode)
host_path = self.path.virtual_to_host_path(path)

return open(host_path, openmode)

def _parse_path(self, p: Union[os.PathLike, str]) -> str:
fspath = getattr(p, '__fspath__', None)
Expand Down
19 changes: 18 additions & 1 deletion qiling/os/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def __init__(self, rootfs: str, cwd: str, emulos: QL_OS) -> None:
self.cwd = cwd

# <TEMPORARY>
# temporary aliases for backward compatibility
self.transform_to_relative_path = self.virtual_abspath
self.transform_to_real_path = self.virtual_to_host_path
self.transform_to_link_path = self.virtual_to_host_path
# </TEMPORARY>

@staticmethod
Expand Down Expand Up @@ -258,6 +258,23 @@ def __is_safe_host_path(self, hostpath: Path, strict: bool = False) -> bool:
else:
return True

def host_to_virtual_path(self, hostpath: str) -> str:
"""Convert a host path to its corresponding virtual path relative to rootfs.

Args:
hostpath : host path

Returns: the corresponding virtual path

Raises: `ValueError` in case the host path does not map to a rootfs location
"""

resolved = Path(hostpath).resolve(strict=False)
virtpath = self._cwd_anchor / resolved.relative_to(self._rootfs_path)

return str(virtpath)


def virtual_abspath(self, virtpath: str) -> str:
"""Convert a relative virtual path to an absolute virtual path based
on the current working directory.
Expand Down
196 changes: 99 additions & 97 deletions qiling/os/posix/filestruct.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,133 +3,136 @@
# Cross Platform and Multi Architecture Advanced Binary Emulation Framework
#
import os

from qiling.exception import *
from socket import socket, AddressFamily, SocketKind
from typing import Union

try:
import fcntl
except ImportError:
pass
import socket

class ql_socket:
def __init__(self, socket):
def __init__(self, socket: socket):
self.__fd = socket.fileno()
self.__socket = socket

def __getstate__(self, *args, **kwargs):

_state = self.__dict__.copy()

_state["_ql_socket__socket"] = {
"family": self.__dict__["_ql_socket__socket"].family,
"type": self.__dict__["_ql_socket__socket"].type,
"proto": self.__dict__["_ql_socket__socket"].proto,
"laddr": self.__dict__["_ql_socket__socket"].getsockname(),
}
fname = f'_{self.__class__.__name__}__socket'
sock = self.__dict__[fname]

_state[fname] = {
"family" : sock.family,
"type" : sock.type,
"proto" : sock.proto,
"laddr" : sock.getsockname(),
}

return _state

def __setstate__(self, state):
self.__dict__ = state

@classmethod
def open(self, socket_domain, socket_type, socket_protocol, opts=None):
s = socket.socket(socket_domain, socket_type, socket_protocol)
if opts:
s.setsockopt(*opts)
return self(s)

def read(self, read_len):
return os.read(self.__fd, read_len)

def write(self, write_buf):
return os.write(self.__fd, write_buf)

def fileno(self):
def open(cls, domain: Union[AddressFamily, int], socktype: Union[SocketKind, int], protocol: int):
s = socket(domain, socktype, protocol)

return cls(s)

def read(self, length: int) -> bytes:
return os.read(self.__fd, length)

def write(self, data: bytes) -> int:
return os.write(self.__fd, data)

def fileno(self) -> int:
return self.__fd
def close(self):
return os.close(self.__fd)
def fcntl(self, fcntl_cmd, fcntl_arg):

def close(self) -> None:
os.close(self.__fd)

def fcntl(self, cmd, arg):
try:
return fcntl.fcntl(self.__fd, fcntl_cmd, fcntl_arg)
return fcntl.fcntl(self.__fd, cmd, arg)
except Exception:
pass

def ioctl(self, ioctl_cmd, ioctl_arg):
def ioctl(self, cmd, arg):
try:
return fcntl.ioctl(self.__fd, ioctl_cmd, ioctl_arg)
return fcntl.ioctl(self.__fd, cmd, arg)
except Exception:
pass
def dup(self):
pass

def dup(self) -> 'ql_socket':
new_s = self.__socket.dup()
new_ql_socket = ql_socket(new_s)
return new_ql_socket

def connect(self, connect_addr):
return self.__socket.connect(connect_addr)
def shutdown(self, shutdown_how):
return self.__socket.shutdown(shutdown_how)
def bind(self, bind_addr):
return self.__socket.bind(bind_addr)
def listen(self, listen_num):
return self.__socket.listen(listen_num)

return ql_socket(new_s)

def connect(self, address) -> None:
self.__socket.connect(address)

def shutdown(self, how: int) -> None:
return self.__socket.shutdown(how)

def bind(self, address) -> None:
return self.__socket.bind(address)

def listen(self, backlog: int) -> None:
return self.__socket.listen(backlog)

def getsockname(self):
return self.__socket.getsockname()

def getpeername(self):
return self.__socket.getpeername()

def getsockopt(self, level, optname, optlen):
return self.__socket.getsockopt(level, optname, optlen)
def getsockopt(self, level: int, optname: int, buflen: int):
return self.__socket.getsockopt(level, optname, buflen)

def setsockopt(self, level, optname, optval, optlen):
if optval is not None:
return self.__socket.setsockopt(level, optname, optval)
def setsockopt(self, level: int, optname: int, value: Union[int, bytes, None], optlen: int = 0) -> None:
if value is None:
self.__socket.setsockopt(level, optname, None, optlen)
else:
return self.__socket.setsockopt(level, optname, None, optval)
self.__socket.setsockopt(level, optname, value)

def accept(self):
try:
con, addr = self.__socket.accept()
new_ql_socket = ql_socket(con)
except BlockingIOError:
# For support non-blocking sockets
return None, None
addr = None
new_ql_socket = None
else:
new_ql_socket = ql_socket(con)

return new_ql_socket, addr
def recv(self, recv_len, recv_flags):
return self.__socket.recv(recv_len, recv_flags)
def send(self, send_buf, send_flags):
return self.__socket.send(send_buf, send_flags)

def recvmsg(self, bufsize, ancbufsize, flags):

def recv(self, bufsize: int, flags: int) -> bytes:
return self.__socket.recv(bufsize, flags)

def send(self, data, flags: int) -> int:
return self.__socket.send(data, flags)

def recvmsg(self, bufsize: int, ancbufsize: int, flags: int):
return self.__socket.recvmsg(bufsize, ancbufsize, flags)

def recvfrom(self, recvfrom_len, recvfrom_flags):
return self.__socket.recvfrom(recvfrom_len, recvfrom_flags)
def recvfrom(self, bufsize: int, flags: int):
return self.__socket.recvfrom(bufsize, flags)

def sendto(self, sendto_buf, sendto_flags, sendto_addr):
return self.__socket.sendto(sendto_buf, sendto_flags, sendto_addr)

@property
def family(self):
def family(self) -> AddressFamily:
return self.__socket.family

@property
def socktype(self):
def socktype(self) -> SocketKind:
return self.__socket.type

@property
def socket(self):
def socket(self) -> socket:
return self.__socket

# def __getattr__(self,name):
Expand All @@ -139,41 +142,40 @@ def socket(self):
# raise AttributeError("A instance has no attribute '%s'" % name)

class ql_pipe:
def __init__(self, fd):
def __init__(self, fd: int):
self.__fd = fd

@classmethod
def open(self):
def open(cls):
r, w = os.pipe()
return (self(r), self(w))

def read(self, read_len):
return os.read(self.__fd, read_len)

def write(self, write_buf):
return os.write(self.__fd, write_buf)

def fileno(self):

return (cls(r), cls(w))

def read(self, length: int) -> bytes:
return os.read(self.__fd, length)

def write(self, data: bytes) -> int:
return os.write(self.__fd, data)

def fileno(self) -> int:
return self.__fd

def close(self):
return os.close(self.__fd)
def fcntl(self, fcntl_cmd, fcntl_arg):
def close(self) -> None:
os.close(self.__fd)

def fcntl(self, cmd, arg):
try:
return fcntl.fcntl(self.__fd, fcntl_cmd, fcntl_arg)
return fcntl.fcntl(self.__fd, cmd, arg)
except Exception:
pass

def ioctl(self, ioctl_cmd, ioctl_arg):
def ioctl(self, cmd, arg):
try:
return fcntl.ioctl(self.__fd, ioctl_cmd, ioctl_arg)
return fcntl.ioctl(self.__fd, cmd, arg)
except Exception:
pass

def dup(self):
new_fd = os.dup(self.__fd)
new_ql_pipe = ql_pipe(new_fd)
return new_ql_pipe
pass

def dup(self) -> 'ql_pipe':
new_fd = os.dup(self.__fd)

return ql_pipe(new_fd)
Loading