diff --git a/qiling/extensions/coverage/formats/base.py b/qiling/extensions/coverage/formats/base.py index d9fe7c34e..4ca162cb8 100644 --- a/qiling/extensions/coverage/formats/base.py +++ b/qiling/extensions/coverage/formats/base.py @@ -3,9 +3,14 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + -from qiling import Qiling +if TYPE_CHECKING: + from qiling import Qiling class QlBaseCoverage(ABC): @@ -15,25 +20,21 @@ class QlBaseCoverage(ABC): all the methods marked with the @abstractmethod decorator. """ + FORMAT_NAME: str + def __init__(self, ql: Qiling): super().__init__() self.ql = ql - @property - @staticmethod - @abstractmethod - def FORMAT_NAME() -> str: - raise NotImplementedError - @abstractmethod - def activate(self): + def activate(self) -> None: pass @abstractmethod - def deactivate(self): + def deactivate(self) -> None: pass @abstractmethod - def dump_coverage(self, coverage_file: str): + def dump_coverage(self, coverage_file: str) -> None: pass diff --git a/qiling/extensions/coverage/formats/drcov.py b/qiling/extensions/coverage/formats/drcov.py index 51a421946..bed0f8701 100644 --- a/qiling/extensions/coverage/formats/drcov.py +++ b/qiling/extensions/coverage/formats/drcov.py @@ -3,12 +3,20 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from ctypes import Structure -from ctypes import c_uint32, c_uint16 +from __future__ import annotations + +from ctypes import Structure, c_uint32, c_uint16 +from functools import lru_cache +from typing import TYPE_CHECKING, BinaryIO, Dict, Tuple from .base import QlBaseCoverage +if TYPE_CHECKING: + from qiling import Qiling + from qiling.loader.loader import QlLoader + + # Adapted from https://www.ayrx.me/drcov-file-format class bb_entry(Structure): _fields_ = [ @@ -29,36 +37,61 @@ class QlDrCoverage(QlBaseCoverage): FORMAT_NAME = "drcov" - def __init__(self, ql): + def __init__(self, ql: Qiling): super().__init__(ql) self.drcov_version = 2 self.drcov_flavor = 'drcov' - self.basic_blocks = [] + self.basic_blocks: Dict[int, bb_entry] = {} self.bb_callback = None - @staticmethod - def block_callback(ql, address, size, self): - for mod_id, mod in enumerate(ql.loader.images): - if mod.base <= address <= mod.end: - ent = bb_entry(address - mod.base, size, mod_id) - self.basic_blocks.append(ent) - break + @lru_cache(maxsize=64) + def _get_img_base(self, loader: QlLoader, address: int) -> Tuple[int, int]: + """Retrieve the containing image of a given address. + + Addresses are expected to be aligned to page boundary, and cached for faster retrieval. + """ + + return next((i, img.base) for i, img in enumerate(loader.images) if img.base <= address < img.end) + + def block_callback(self, ql: Qiling, address: int, size: int): + if address not in self.basic_blocks: + try: + # we rely on the fact that images are allocated on page size boundary and + # use it to speed up image retrieval. we align the basic block address to + # page boundary, knowing basic blocks within the same page belong to the + # same image. then we use the aligned address to retreive the containing + # image. returned values are cached so subsequent retrievals for basic + # blocks within the same page will return the cached value instead of + # going through the retreival process again (up to maxsize cached pages) - def activate(self): - self.bb_callback = self.ql.hook_block(self.block_callback, user_data=self) + i, img_base = self._get_img_base(ql.loader, address & ~(0x1000 - 1)) + except StopIteration: + pass + else: + self.basic_blocks[address] = bb_entry(address - img_base, size, i) - def deactivate(self): - self.ql.hook_del(self.bb_callback) + def activate(self) -> None: + self.bb_callback = self.ql.hook_block(self.block_callback) + + def deactivate(self) -> None: + if self.bb_callback: + self.ql.hook_del(self.bb_callback) + + def dump_coverage(self, coverage_file: str) -> None: + def __write_line(bio: BinaryIO, line: str) -> None: + bio.write(f'{line}\n'.encode()) - def dump_coverage(self, coverage_file): with open(coverage_file, "wb") as cov: - cov.write(f"DRCOV VERSION: {self.drcov_version}\n".encode()) - cov.write(f"DRCOV FLAVOR: {self.drcov_flavor}\n".encode()) - cov.write(f"Module Table: version {self.drcov_version}, count {len(self.ql.loader.images)}\n".encode()) - cov.write("Columns: id, base, end, entry, checksum, timestamp, path\n".encode()) + __write_line(cov, f"DRCOV VERSION: {self.drcov_version}") + __write_line(cov, f"DRCOV FLAVOR: {self.drcov_flavor}") + __write_line(cov, f"Module Table: version {self.drcov_version}, count {len(self.ql.loader.images)}") + __write_line(cov, "Columns: id, base, end, entry, checksum, timestamp, path") + for mod_id, mod in enumerate(self. ql.loader.images): - cov.write(f"{mod_id}, {mod.base}, {mod.end}, 0, 0, 0, {mod.path}\n".encode()) - cov.write(f"BB Table: {len(self.basic_blocks)} bbs\n".encode()) - for bb in self.basic_blocks: + __write_line(cov, f"{mod_id}, {mod.base}, {mod.end}, 0, 0, 0, {mod.path}") + + __write_line(cov, f"BB Table: {len(self.basic_blocks)} bbs") + + for bb in self.basic_blocks.values(): cov.write(bytes(bb)) diff --git a/qiling/extensions/coverage/formats/drcov_exact.py b/qiling/extensions/coverage/formats/drcov_exact.py index 685c6c044..4f7082789 100644 --- a/qiling/extensions/coverage/formats/drcov_exact.py +++ b/qiling/extensions/coverage/formats/drcov_exact.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # @@ -17,10 +17,6 @@ class QlDrCoverageExact(QlDrCoverage): FORMAT_NAME = "drcov_exact" - def __init__(self, ql): - super().__init__(ql) - - def activate(self): + def activate(self) -> None: # We treat every instruction as a block on its own. - self.bb_callback = self.ql.hook_code(self.block_callback, user_data=self) - \ No newline at end of file + self.bb_callback = self.ql.hook_code(self.block_callback) diff --git a/qiling/extensions/coverage/formats/ezcov.py b/qiling/extensions/coverage/formats/ezcov.py index b25218691..46290e4c5 100644 --- a/qiling/extensions/coverage/formats/ezcov.py +++ b/qiling/extensions/coverage/formats/ezcov.py @@ -1,19 +1,30 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from collections import namedtuple -from os.path import basename +from __future__ import annotations + +import os +from typing import Any, TYPE_CHECKING, List, NamedTuple from .base import QlBaseCoverage +if TYPE_CHECKING: + from qiling import Qiling + + # Adapted from https://github.com/nccgroup/Cartographer/blob/main/EZCOV.md#coverage-data -class bb_entry(namedtuple('bb_entry', 'offset size mod_id')): - def csvline(self): - offset = '0x{:08x}'.format(self.offset) +class bb_entry(NamedTuple): + offset: int + size: int + mod_id: Any + + def as_csv(self) -> str: + offset = f'{self.offset:#010x}' mod_id = f"[ {self.mod_id if self.mod_id is not None else ''} ]" + return f"{offset},{self.size},{mod_id}\n" class QlEzCoverage(QlBaseCoverage): @@ -27,29 +38,30 @@ class QlEzCoverage(QlBaseCoverage): FORMAT_NAME = "ezcov" - def __init__(self, ql): + def __init__(self, ql: Qiling): super().__init__(ql) + self.ezcov_version = 1 - self.ezcov_flavor = 'ezcov' - self.basic_blocks = [] - self.bb_callback = None + self.ezcov_flavor = 'ezcov' + self.basic_blocks: List[bb_entry] = [] + self.bb_callback = None - @staticmethod - def block_callback(ql, address, size, self): - mod = ql.loader.find_containing_image(address) - if mod is not None: - ent = bb_entry(address - mod.base, size, basename(mod.path)) - self.basic_blocks.append(ent) + def block_callback(self, ql: Qiling, address: int, size: int): + img = ql.loader.find_containing_image(address) - def activate(self): - self.bb_callback = self.ql.hook_block(self.block_callback, user_data=self) + if img is not None: + self.basic_blocks.append(bb_entry(address - img.base, size, os.path.basename(img.path))) - def deactivate(self): - self.ql.hook_del(self.bb_callback) + def activate(self) -> None: + self.bb_callback = self.ql.hook_block(self.block_callback) - def dump_coverage(self, coverage_file): + def deactivate(self) -> None: + if self.bb_callback: + self.ql.hook_del(self.bb_callback) + + def dump_coverage(self, coverage_file: str) -> None: with open(coverage_file, "w") as cov: cov.write(f"EZCOV VERSION: {self.ezcov_version}\n") cov.write("# Qiling EZCOV exporter tool\n") - for bb in self.basic_blocks: - cov.write(bb.csvline()) \ No newline at end of file + + cov.writelines(bb.as_csv() for bb in self.basic_blocks) diff --git a/qiling/extensions/tracing/formats/base.py b/qiling/extensions/tracing/formats/base.py index 145944340..dbb9c78af 100644 --- a/qiling/extensions/tracing/formats/base.py +++ b/qiling/extensions/tracing/formats/base.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # This code structure is copied and modified from the coverage extension @@ -12,24 +12,20 @@ class QlBaseTrace(ABC): To add support for a new coverage format, just derive from this class and implement all the methods marked with the @abstractmethod decorator. """ - + + FORMAT_NAME: str + def __init__(self): super().__init__() - @property - @staticmethod - @abstractmethod - def FORMAT_NAME(): - raise NotImplementedError - @abstractmethod - def activate(self): + def activate(self) -> None: pass @abstractmethod - def deactivate(self): + def deactivate(self) -> None: pass @abstractmethod - def dump_trace(self, trace_file): + def dump_trace(self, trace_file: str) -> None: pass \ No newline at end of file diff --git a/qiling/loader/blob.py b/qiling/loader/blob.py index 382dbb33c..b8831a552 100644 --- a/qiling/loader/blob.py +++ b/qiling/loader/blob.py @@ -4,9 +4,10 @@ # from qiling import Qiling -from qiling.loader.loader import QlLoader +from qiling.loader.loader import QlLoader, Image from qiling.os.memory import QlMemoryHeap + class QlLoaderBLOB(QlLoader): def __init__(self, ql: Qiling): super().__init__(ql) @@ -16,13 +17,19 @@ def __init__(self, ql: Qiling): def run(self): self.load_address = self.ql.os.entry_point # for consistency - self.ql.mem.map(self.ql.os.entry_point, self.ql.os.code_ram_size, info="[code]") - self.ql.mem.write(self.ql.os.entry_point, self.ql.code) + code_begins = self.load_address + code_size = self.ql.os.code_ram_size + code_ends = code_begins + code_size - heap_address = self.ql.os.entry_point + self.ql.os.code_ram_size - heap_size = int(self.ql.os.profile.get("CODE", "heap_size"), 16) - self.ql.os.heap = QlMemoryHeap(self.ql, heap_address, heap_address + heap_size) + self.ql.mem.map(code_begins, code_size, info="[code]") + self.ql.mem.write(code_begins, self.ql.code) - self.ql.arch.regs.arch_sp = heap_address - 0x1000 + # allow image-related functionalities + self.images.append(Image(code_begins, code_ends, 'blob_code')) + + # FIXME: heap starts above end of ram?? + heap_base = code_ends + heap_size = int(self.ql.os.profile.get("CODE", "heap_size"), 16) + self.ql.os.heap = QlMemoryHeap(self.ql, heap_base, heap_base + heap_size) - return + self.ql.arch.regs.arch_sp = code_ends - 0x1000