diff --git a/qiling/core.py b/qiling/core.py index 4e99a39be..1e8b62d69 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -8,6 +8,7 @@ # See https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports if TYPE_CHECKING: + from os import PathLike from unicorn.unicorn import Uc from configparser import ConfigParser from logging import Logger @@ -651,10 +652,13 @@ def restore(self, saved_states: Mapping[str, Any] = {}, *, snapshot: str = None) # Map "ql_path" to any objects which implements QlFsMappedObject. - def add_fs_mapper(self, ql_path, real_dest): + def add_fs_mapper(self, ql_path: Union["PathLike", str], real_dest): self.os.fs_mapper.add_fs_mapping(ql_path, real_dest) - + # Remove "ql_path" mapping. + def remove_fs_mapper(self, ql_path: Union["PathLike", str]): + self.os.fs_mapper.remove_fs_mapping(ql_path) + # push to stack bottom, and update stack register def stack_push(self, data): return self.arch.stack_push(data) diff --git a/qiling/debugger/gdb/gdb.py b/qiling/debugger/gdb/gdb.py index f022c28c0..efb051f7e 100644 --- a/qiling/debugger/gdb/gdb.py +++ b/qiling/debugger/gdb/gdb.py @@ -13,9 +13,10 @@ # gdb remote protocol: # https://sourceware.org/gdb/current/onlinedocs/gdb/Remote-Protocol.html -import os, socket, re +import os, socket, re, tempfile from logging import Logger -from typing import Iterator, Optional, Union +from typing import Iterator, Mapping, Optional, Union +import typing from unicorn import UcError from unicorn.unicorn_const import ( @@ -30,6 +31,8 @@ from qiling.debugger import QlDebugger from qiling.debugger.gdb.xmlregs import QlGdbFeatures from qiling.debugger.gdb.utils import QlGdbUtils +from qiling.os.linux.procfs import QlProcFS +from qiling.os.mapper import QlFsMappedCallable, QlFsMappedObject # gdb logging prompt PROMPT = r'gdb>' @@ -101,6 +104,11 @@ def __init__(self, ql: Qiling, ip: str = '127.0.01', port: int = 9999): self.features = QlGdbFeatures(self.ql.arch.type, self.ql.os.type) self.regsmap = self.features.regsmap + # https://sourceware.org/bugzilla/show_bug.cgi?id=17760 + # 42000 is the magic pid to indicate the remote process. + self.ql.add_fs_mapper(r'/proc/42000/maps', QlFsMappedCallable(QlProcFS.self_map, self.ql.mem)) + self.fake_procfs: Mapping[int, typing.IO] = {} + def run(self): server = GdbSerialConn(self.ip, self.port, self.ql.log) killed = False @@ -582,11 +590,15 @@ def handle_v(subcmd: str) -> Reply: virtpath = self.ql.os.path.virtual_abspath(path) - if virtpath.startswith(r'/proc'): - # TODO: we really need a centralized virtual filesystem to open - # both emulated (like procfs) and real files, and manage their - # file descriptors seamlessly - fd = -1 + if virtpath.startswith(r'/proc') and self.ql.os.fs_mapper.has_mapping(virtpath): + # Mapped object by itself is not backed with a host fd and thus a tempfile can + # 1. Make pread easy to implement and avoid duplicate code like seek, fd etc. + # 2. Avoid fd clash if we assign a generated fd. + tfile = tempfile.TemporaryFile("rb+") + tfile.write(self.ql.os.fs_mapper.open(virtpath, "rb+").read()) + tfile.seek(0, os.SEEK_SET) + fd = tfile.fileno() + self.fake_procfs[fd] = tfile else: host_path = self.ql.os.path.virtual_to_host_path(path) @@ -609,6 +621,10 @@ def handle_v(subcmd: str) -> Reply: fd = int(fd, 16) os.close(fd) + + if fd in self.fake_procfs: + del self.fake_procfs[fd] + return 'F0' return REPLY_EMPTY diff --git a/qiling/extensions/pipe.py b/qiling/extensions/pipe.py index f046ae005..f88c27135 100644 --- a/qiling/extensions/pipe.py +++ b/qiling/extensions/pipe.py @@ -2,52 +2,20 @@ # # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # - +import io +import os from typing import TextIO from qiling.os.posix import stat -class SimpleStringBuffer(TextIO): +class SimpleStringBuffer(io.BytesIO): """Simple FIFO pipe. """ def __init__(self): - self.buff = bytearray() - - def read(self, n: int = -1) -> bytes: - if n == -1: - ret = self.buff - rem = bytearray() - else: - ret = self.buff[:n] - rem = self.buff[n:] - - self.buff = rem - - return bytes(ret) - - def readline(self, limit: int = -1) -> bytes: - ret = bytearray() - - while not (ret.endswith(b'\n') or len(ret) == limit): - ret.extend(self.read(1)) - - return bytes(ret) - - def write(self, s: bytes) -> int: - self.buff.extend(s) - - return len(s) - - def flush(self) -> None: - pass - - def writable(self) -> bool: - return True - - def readable(self) -> bool: - return True - + super().__init__() + + # Compatible with old implementation def seek(self, offset: int, origin: int = 0) -> int: # Imitate os.lseek raise OSError("Illega Seek") @@ -55,9 +23,22 @@ def seek(self, offset: int, origin: int = 0) -> int: def seekable(self) -> bool: return False + def write(self, buf: bytes) -> int: + # For the FIFO stream, the write doesn't change pos. + pos = super().tell() + super().seek(0, os.SEEK_END) + ret = super().write(buf) + super().seek(pos) + return ret + + # Compatible with previous TextIO + @property + def name(self): + return None + class SimpleStreamBase: - def __init__(self, fd: int, *args): - super().__init__(*args) + def __init__(self, fd: int): + super().__init__() self.__fd = fd self.__closed = False @@ -65,6 +46,7 @@ def __init__(self, fd: int, *args): def close(self) -> None: self.__closed = True + @property def closed(self) -> bool: return self.__closed @@ -99,72 +81,9 @@ def flush(self) -> None: def writable(self) -> bool: return True -class SimpleBufferedStream(TextIO): +class SimpleBufferedStream(io.BytesIO): """Simple buffered IO. """ def __init__(self): - self.buff = bytearray() - self.cur = 0 - - def seek(self, offset: int, origin: int = 0) -> int: - if origin == 0: # SEEK_SET - base = 0 - elif origin == 1: # SEEK_CUR - base = self.cur - else: # SEEK_END - base = len(self.buff) - - if base + offset >= len(self.buff): - self.cur = len(self.buff) - elif base + offset <= 0: - self.cur = 0 - else: - self.cur = base + offset - - return self.cur - - def tell(self) -> int: - return self.cur - - def read(self, n: int = -1) -> bytes: - if n == -1: - ret = self.buff[self.cur:] - self.cur = len(self.buff) - else: - ret = self.buff[self.cur:self.cur + n] - - if self.cur + n >= len(self.buff): - self.cur = len(self.buff) - else: - self.cur = self.cur + n - - return bytes(ret) - - def readline(self, limit: int = -1) -> bytes: - ret = bytearray() - - while not (ret.endswith(b'\n') or len(ret) == limit): - ret.extend(self.read(1)) - - return bytes(ret) - - def write(self, s: bytes) -> int: - self.buff = self.buff[:self.cur] - self.buff.extend(s) - - self.cur += len(s) - - return len(s) - - def flush(self) -> None: - pass - - def writable(self) -> bool: - return True - - def readable(self) -> bool: - return True - - def seekable(self) -> bool: - return True \ No newline at end of file + super.__init__() \ No newline at end of file diff --git a/qiling/os/linux/linux.py b/qiling/os/linux/linux.py index 4aed180db..4198b2c35 100644 --- a/qiling/os/linux/linux.py +++ b/qiling/os/linux/linux.py @@ -15,6 +15,7 @@ from qiling.os.fcall import QlFunctionCall from qiling.os.const import * from qiling.os.linux.procfs import QlProcFS +from qiling.os.mapper import QlFsMappedCallable from qiling.os.posix.posix import QlOsPosix from . import futex @@ -121,11 +122,11 @@ def load(self): def setup_procfs(self): - self.fs_mapper.add_fs_mapping(r'/proc/self/auxv', QlProcFS.self_auxv(self)) - self.fs_mapper.add_fs_mapping(r'/proc/self/cmdline', QlProcFS.self_cmdline(self)) - self.fs_mapper.add_fs_mapping(r'/proc/self/environ', QlProcFS.self_environ(self)) - self.fs_mapper.add_fs_mapping(r'/proc/self/exe', QlProcFS.self_exe(self)) - + self.fs_mapper.add_fs_mapping(r'/proc/self/auxv', QlFsMappedCallable(QlProcFS.self_auxv, self)) + self.fs_mapper.add_fs_mapping(r'/proc/self/cmdline', QlFsMappedCallable(QlProcFS.self_cmdline, self)) + self.fs_mapper.add_fs_mapping(r'/proc/self/environ', QlFsMappedCallable(QlProcFS.self_environ, self)) + self.fs_mapper.add_fs_mapping(r'/proc/self/exe', QlFsMappedCallable(QlProcFS.self_exe, self)) + self.fs_mapper.add_fs_mapping(r'/proc/self/maps', QlFsMappedCallable(QlProcFS.self_map, self.ql.mem)) def hook_syscall(self, ql, intno = None): return self.load_syscall() diff --git a/qiling/os/linux/procfs.py b/qiling/os/linux/procfs.py index fa1bed298..6166bf684 100644 --- a/qiling/os/linux/procfs.py +++ b/qiling/os/linux/procfs.py @@ -1,76 +1,18 @@ +import io from typing import TYPE_CHECKING, AnyStr, Optional, Sized +from qiling.os.mapper import QlFsMappedObject + if TYPE_CHECKING: from qiling.os.linux.linux import QlOsLinux - - -class QlFileSeekable: - - def __init__(self): - self.buff: Sized - self.pos = 0 - - def seek(self, offset: int, whence: int) -> int: - assert whence in (0, 1, 2) - - # SEEK_SET - if whence == 0: - pos = offset - - # SEEK_CUR - elif whence == 1: - pos = self.pos + offset - - # SEEK_END - elif whence == 2: - pos = len(self.buff) + offset - - # make sure pos is within reasonabe boundaries - self.pos = min(max(pos, 0), len(self.buff)) - - return self.pos - - def ftell(self) -> int: - return self.pos - - -class QlFileReadable: - - def __init__(self, *, content: Optional[bytearray] = None): - self.buff = content or bytearray() - self.pos = 0 - - def read(self, length: int = -1) -> bytes: - if length == -1: - length = len(self.buff) - - content = self.buff[self.pos:length] - self.pos = min(self.pos + length, len(self.buff)) - - return bytes(content) - - -class QlFileProcFS(QlFileReadable, QlFileSeekable): - - def __init__(self, content: bytearray): - QlFileReadable.__init__(self, content=content) - QlFileSeekable.__init__(self) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_tb): - self.close() - - def close(self): - pass + from qiling.os.memory import QlMemoryManager class QlProcFS: @staticmethod - def self_auxv(os: 'QlOsLinux') -> QlFileProcFS: + def self_auxv(os: 'QlOsLinux') -> QlFsMappedObject: nbytes = os.ql.arch.bits // 8 auxv_addr = os.ql.loader.auxv @@ -85,20 +27,20 @@ def self_auxv(os: 'QlOsLinux') -> QlFileProcFS: auxv_data.extend(os.ql.mem.read(auxv_addr, nbytes)) auxv_addr += nbytes - - return QlFileProcFS(content=auxv_data) + + return io.BytesIO(bytes(auxv_data)) @staticmethod - def self_cmdline(os: 'QlOsLinux') -> QlFileProcFS: + def self_cmdline(os: 'QlOsLinux') -> QlFsMappedObject: entries = (arg.encode('utf-8') for arg in os.ql.argv) - cmdline = bytearray(b'\x00'.join(entries) + b'\x00') + cmdline = b'\x00'.join(entries) + b'\x00' - return QlFileProcFS(content=cmdline) + return io.BytesIO(cmdline) @staticmethod - def self_environ(os: 'QlOsLinux') -> QlFileProcFS: + def self_environ(os: 'QlOsLinux') -> QlFsMappedObject: def __to_bytes(s: AnyStr) -> bytes: if isinstance(s, str): return s.encode('utf-8') @@ -106,14 +48,24 @@ def __to_bytes(s: AnyStr) -> bytes: return s entries = (b'='.join((__to_bytes(k), __to_bytes(v))) for k, v in os.ql.env.items()) - environ = bytearray(b'\x00'.join(entries) + b'\x00') + environ = b'\x00'.join(entries) + b'\x00' - return QlFileProcFS(content=environ) + return io.BytesIO(environ) @staticmethod - def self_exe(os: 'QlOsLinux') -> QlFileProcFS: + def self_exe(os: 'QlOsLinux') -> QlFsMappedObject: with open(os.ql.path, 'rb') as exefile: - content = bytearray(exefile.read()) + content = exefile.read() + + return io.BytesIO(content) + + @staticmethod + def self_map(mem: 'QlMemoryManager') -> QlFsMappedObject: + content = b"" + mapinfo = mem.get_mapinfo() + + for lbound, ubound, perms, label, container in mapinfo: + content += f"{lbound:x}-{ubound:x}\t{perms}p\t0\t00:00\t0\t{container if container else label}\n".encode("utf-8") - return QlFileProcFS(content=content) + return io.BytesIO(content) diff --git a/qiling/os/mapper.py b/qiling/os/mapper.py index c5b2914ae..ffeaeadfa 100644 --- a/qiling/os/mapper.py +++ b/qiling/os/mapper.py @@ -57,6 +57,19 @@ def readline(self, end = b'\n'): def name(self): raise NotImplementedError("QlFsMappedObject property not implemented: name") +# This is a helper class to allow users to pass any class to add_fs_mapper +# +# Everytime open is called on the mapped path, cls(*args, **kwargs) will be called. +class QlFsMappedCallable: + + def __init__(self, cls, *args, **kwargs) -> None: + self._args = args + self._kwargs = kwargs + self._cls = cls + + def __call__(self) -> QlFsMappedObject: + return self._cls(*self._args, **self._kwargs) + class QlFsMapper: def __init__(self, path: QlOsPath): @@ -69,7 +82,7 @@ def _open_mapping_ql_file(self, ql_path: str, openflags: int, openmode: int): if isinstance(real_dest, str): obj = ql_file.open(real_dest, openflags, openmode) - elif inspect.isclass(real_dest): + elif callable(real_dest): obj = real_dest() else: @@ -83,7 +96,7 @@ def _open_mapping(self, ql_path: str, openmode: str): if isinstance(real_dest, str): obj = open(real_dest, openmode) - elif inspect.isclass(real_dest): + elif callable(real_dest): obj = real_dest() else: @@ -125,18 +138,26 @@ def _parse_path(self, p: Union[os.PathLike, str]) -> str: return p - def add_fs_mapping(self, ql_path: Union[os.PathLike, str], real_dest: Any) -> None: + def add_fs_mapping(self, ql_path: Union[os.PathLike, str], real_dest: Union[str, QlFsMappedObject, QlFsMappedCallable]) -> None: """Map an object to Qiling emulated file system. Args: ql_path: Emulated path which should be convertable to a string or a hashable object. e.g. pathlib.Path - real_dest: Mapped object, can be a string, an object or a class. + real_dest: Mapped object, can be a string, an object or a callable(class). string: mapped path in the host machine, e.g. '/dev/urandom' -> '/dev/urandom' object: mapped object, will be returned each time the emulated path has been opened - class: mapped class, will be used to create a new instance each time the emulated path has been opened + class: mapped callable, will be used to create a new instance each time the emulated path has been opened """ ql_path = self._parse_path(ql_path) real_dest = self._parse_path(real_dest) self._mapping[ql_path] = real_dest + + def remove_fs_mapping(self, ql_path: Union[os.PathLike, str]): + """Remove a mapping from the fs mapper. + + Args: + ql_path (Union[os.PathLike, str]): The mapped path. + """ + del self._mapping[self._parse_path(ql_path)]