diff --git a/examples/fuzzing/linux_x8664/fuzz_x8664_linux.py b/examples/fuzzing/linux_x8664/fuzz_x8664_linux.py index c7fb84db8..f24282104 100755 --- a/examples/fuzzing/linux_x8664/fuzz_x8664_linux.py +++ b/examples/fuzzing/linux_x8664/fuzz_x8664_linux.py @@ -17,35 +17,44 @@ $ rm -fr afl_outputs/default/ """ -# No more need for importing unicornafl, try afl.ql_afl_fuzz instead! - import os import sys -from typing import Optional +from typing import Sequence + +QLHOME = os.path.realpath(r'../../..') -sys.path.append("../../..") +sys.path.append(QLHOME) from qiling import Qiling from qiling.const import QL_VERBOSE from qiling.extensions import pipe from qiling.extensions import afl -def main(input_file: str): - ql = Qiling(["./x8664_fuzz"], "../../rootfs/x8664_linux", - verbose=QL_VERBOSE.OFF, # keep qiling logging off - console=False) # thwart program output + +def main(argv: Sequence[str], rootfs: str, infilename: str): + # initialize a qiling instance. + # note we keep verbosity off and thwart the program's output to gain some speed-up + ql = Qiling(argv, rootfs, verbose=QL_VERBOSE.OFF, console=False) + + # get the image base address + img = ql.loader.get_image_by_name('x8664_fuzz') + assert img is not None + + # fuzzing scope: the main function + main_begins = img.base + 0x1275 + main_ends = img.base + 0x1293 # redirect stdin to our mock to feed it with incoming fuzzed keystrokes ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno()) - def place_input_callback(ql: Qiling, input: bytes, persistent_round: int) -> Optional[bool]: + def place_input_callback(ql: Qiling, feed: bytes, round: int) -> bool: """Feed generated stimuli to the fuzzed target. This method is called with every fuzzing iteration. """ # feed fuzzed input to our mock stdin - ql.os.stdin.write(input) + ql.os.stdin.write(feed) # signal afl to proceed with this input return True @@ -54,23 +63,24 @@ def start_afl(ql: Qiling): """Have Unicorn fork and start instrumentation. """ - afl.ql_afl_fuzz(ql, input_file=input_file, place_input_callback=place_input_callback, exits=[ql.os.exit_point]) + afl.ql_afl_fuzz(ql, infilename, place_input_callback, [main_ends]) + + # set afl instrumentation [re]starting point + ql.hook_address(start_afl, main_begins) - # get image base address - ba = ql.loader.images[0].base + def __crash(ql: Qiling) -> None: + os.abort() # make the process crash whenever __stack_chk_fail@plt is about to be called. # this way afl will count stack protection violations as crashes - ql.hook_address(callback=lambda x: os.abort(), address=ba + 0x126e) - - # set afl instrumentation [re]starting point. we set it to 'main' - ql.hook_address(callback=start_afl, address=ba + 0x1275) + ql.hook_address(__crash, img.base + 0x126e) # okay, ready to roll ql.run() if __name__ == "__main__": - if len(sys.argv) == 1: - raise ValueError("No input file provided.") - - main(sys.argv[1]) + main( + rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(), + rf'{QLHOME}/examples/rootfs/x8664_linux', + rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a' + ) diff --git a/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py b/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py new file mode 100644 index 000000000..9b23f20bf --- /dev/null +++ b/examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +"""Simple example of how to use QlFuzzer to easily create a custom fuzzer that +leverages Qiling and AFLplusplus. + +Note: this example refers to linux_x8664/fuzz_x8664_linux.py + +Steps: + o Clone and build AFL++ + $ git clone https://github.com/AFLplusplus/AFLplusplus.git + $ make -C AFLplusplus + + o Build Unicorn support + $ ( cd AFLplusplus/unicorn_mode ; ./build_unicorn_support.sh ) + + o Start fuzzing + $ AFL_AUTORESUME=1 AFL_PATH="$(realpath ./AFLplusplus)" PATH="$AFL_PATH:$PATH" afl-fuzz -i afl_inputs -o afl_outputs -U -- python3 ./qlfuzzer_x8664_linux.py @@ + + o Cleanup results + $ rm -fr afl_outputs/default/ +""" + +from __future__ import annotations + +import os +import sys + +from typing import TYPE_CHECKING, Collection, Optional, Sequence + +# replace this if qiling is located elsewhere +QLHOME = os.path.realpath(r'../../..') + +sys.path.append(QLHOME) +from qiling.extensions import pipe +from qiling.extensions.afl.qlfuzzer import QlFuzzer + + +if TYPE_CHECKING: + from qiling import Qiling + + +class MyFuzzer(QlFuzzer): + """Custom fuzzer. + """ + + def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None: + super().setup(infilename, entry, exits, crashes) + + # redirect stdin to our mock to feed it with incoming fuzzed keystrokes + self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno()) + + def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool: + # feed fuzzed input as-is to our mock stdin + ql.os.stdin.write(stimuli) + + # signal afl to proceed with this input + return True + + +def main(argv: Sequence[str], rootfs: str, infilename: str): + # initialize our custom fuzzer + fuzzer = MyFuzzer(argv, rootfs) + + # calculate fuzzing scope effective addresses + main_begins = fuzzer.ea(0x1275) + main_ends = fuzzer.ea(0x1293) + + # make the process crash whenever __stack_chk_fail@plt is about to be called. + # this way afl will count stack protection violations as fuzzing crashes + stack_chk_fail = fuzzer.ea(0x126e) + + # set up fuzzing parameters + fuzzer.setup(infilename, main_begins, [main_ends], [stack_chk_fail]) + + # start fuzzing. + # + # note that although the main function is being fuzzed, we start emulating the program from its + # default starting point to make sure 'main' has all the necessary data initialized and ready. + fuzzer.run() + + +if __name__ == '__main__': + main( + rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(), + rf'{QLHOME}/examples/rootfs/x8664_linux', + rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a' + ) diff --git a/qiling/extensions/afl/afl.py b/qiling/extensions/afl/afl.py index 99c15a7d8..90dd7b8af 100644 --- a/qiling/extensions/afl/afl.py +++ b/qiling/extensions/afl/afl.py @@ -1,99 +1,130 @@ -from typing import List, Callable -from qiling.arch.arm import QlArchARM -from qiling.core import Qiling -from unicornafl import * -from unicorn import UcError + +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Collection, Optional, Callable + +from unicornafl import UcAflError, UC_AFL_RET_CALLED_TWICE, uc_afl_fuzz_custom +from unicorn import UcError, UC_ERR_OK + +from qiling import Qiling from qiling.exception import QlErrorNotImplemented + +if TYPE_CHECKING: + from unicorn import Uc + from ctypes import c_char, Array + +InputFeedingCallback = Callable[[Qiling, bytes, int], bool] +FuzzingCallback = Callable[[Qiling], int] +CrashValidationCallback = Callable[[Qiling, int, bytes, int], bool] + + def ql_afl_fuzz(ql: Qiling, input_file: str, - place_input_callback: Callable[["Qiling", bytes, int], bool], - exits: List[int], - validate_crash_callback: Callable[["Qiling", int, bytes, int], bool] = None, + place_input_callback: InputFeedingCallback, + exits: Collection[int], + validate_crash_callback: Optional[CrashValidationCallback] = None, always_validate: bool = False, - persistent_iters: int = 1): - """ Fuzz a range of code with afl++. - This function wraps some common logic with unicornafl.uc_afl_fuzz. - NOTE: If no afl-fuzz instance is found, this function is almost identical to ql.run. - :param Qiling ql: The Qiling instance. - :param str input_file: This usually is the input file name provided by the command argument. - :param Callable place_input_callback: This callback is triggered every time a new child is - generated. It returns True if the input is accepted, or the input would be skipped. - :param list exits: All possible exits. - :param Callable validate_crash_callback: This callback is triggered every time to check if we are crashed. - :param bool always_validate: If this is set to False, validate_crash_callback will be only triggered if - uc_emu_start (which is called internally by afl_fuzz) returns an error. Or the validate_crash_callback will - be triggered every time. - :param int persistent_iters: Fuzz how many times before forking a new child. - :raises UcAflError: If something wrong happens with the fuzzer. + persistent_iters: int = 1) -> None: + """Fuzz a range of code with afl++. + This function wraps some common logic with unicornafl.uc_afl_fuzz. + + Args: + ql: qiling instance + + filename: path to a file that contains an initial input data. this is usually + the filename provided as the fuzzer command line argument + + place_input_callback: a callback that is triggered whenever a new child process is created + and about to be fed with a new fuzzing input. the callback is responsible + to place the newly generated stimuli (as is, or manipulated to the users' + need) where the fuzzed program expects to find its input: e.g. stdin, + memory buffer, file, etc. based on the stimuli, the callback can decide + whether afl should proceed with this round (returns `True`) or discard + it (returns `False`) + + exits: addresses that mark a graceful completion of the fuzzed flow + + validate_crash_callback: a callback that is triggered to check whether the emulation has crashed + + always_validate: indicate whether the crash validating callback should be called on every + iteration (`True`) or only when emluation raises an exception (`False`, default) + + persistent_iters: Reuse the same process for this many fuzzing iterations before forking + a new child process (default: 1) + + Raises: + UcAflError: If something wrong happens with the fuzzer. + """ + + def __fuzzing_wrapper(ql: Qiling) -> int: + """Emulation wrapper. """ - def _dummy_fuzz_callback(_ql: "Qiling"): - if isinstance(_ql.arch, QlArchARM): - pc = _ql.arch.effective_pc - else: - pc = _ql.arch.regs.arch_pc - try: - _ql.uc.emu_start(pc, 0, 0, 0) - except UcError as e: - return e.errno - - return UC_ERR_OK - - return ql_afl_fuzz_custom(ql, input_file, place_input_callback, _dummy_fuzz_callback, exits, - validate_crash_callback, always_validate, persistent_iters) + # if we are fuzzin an arm code, make sure to take the effective pc + pc = getattr(ql.arch, 'effective_pc', ql.arch.regs.arch_pc) + + try: + ql.arch.uc.emu_start(pc, 0) + except UcError as err: + return err.errno + + return UC_ERR_OK + + def __null_crash_validation(ql: Qiling, result: int, input_bytes: bytes, round: int) -> bool: + return False + + ql_afl_fuzz_custom( + ql, + input_file, + place_input_callback, + __fuzzing_wrapper, + exits, + validate_crash_callback or __null_crash_validation, + always_validate, + persistent_iters) + def ql_afl_fuzz_custom(ql: Qiling, input_file: str, - place_input_callback: Callable[["Qiling", bytes, int], bool], - fuzzing_callback: Callable[["Qiling"], int], - exits: List[int] = [], - validate_crash_callback: Callable[["Qiling", bytes, int], bool] = None, + place_input_callback: InputFeedingCallback, + fuzzing_callback: FuzzingCallback, + exits: Collection[int], + validate_crash_callback: CrashValidationCallback, always_validate: bool = False, persistent_iters: int = 1): - ql.uc.ctl_exits_enabled(True) - ql.uc.ctl_set_exits(exits) - - def _ql_afl_place_input_wrapper(uc, input_bytes, iters, data): - (ql, cb, _, _) = data + def __place_input_wrapper(uc: Uc, input_bytes: Array[c_char], iters: int, context: Any) -> bool: + return place_input_callback(ql, input_bytes.value, iters) - if cb: - return cb(ql, input_bytes, iters) - else: - return False + def __validate_crash_wrapper(uc: Uc, result: int, input_bytes: bytes, iters: int, context: Any) -> bool: + return validate_crash_callback(ql, result, input_bytes, iters) - def _ql_afl_validate_wrapper(uc, result, input_bytes, iters, data): - (ql, _, cb, _) = data + def __fuzzing_wrapper(uc: Uc, context: Any) -> int: + return fuzzing_callback(ql) - if cb: - return cb(ql, result, input_bytes, iters) - else: - return False + uc = ql.arch.uc + uc.ctl_exits_enabled(True) + uc.ctl_set_exits(exits) - def _ql_afl_fuzzing_callback_wrapper(uc, data): - (ql, _, _, cb) = data + try: + uc_afl_fuzz_custom( + uc, + input_file, + __place_input_wrapper, + __fuzzing_wrapper, + __validate_crash_wrapper, + always_validate, + persistent_iters, + None) - return cb(ql) + except NameError as ex: + raise QlErrorNotImplemented('unicornafl is not installed or AFL++ is not supported on this platform') from ex - data = (ql, place_input_callback, validate_crash_callback, fuzzing_callback) - try: - # uc_afl_fuzz will never return non-zero value. - uc_afl_fuzz_custom(ql.uc, - input_file=input_file, - place_input_callback=_ql_afl_place_input_wrapper, - fuzzing_callback=_ql_afl_fuzzing_callback_wrapper, - validate_crash_callback=_ql_afl_validate_wrapper, - always_validate=always_validate, - persistent_iters=persistent_iters, - data=data) - except NameError as ex: - raise QlErrorNotImplemented("unicornafl is not installed or AFL++ is not supported on this platform") from ex - except UcAflError as ex: - if ex.errno != UC_AFL_RET_CALLED_TWICE: - # This one is special. Many fuzzing scripts start fuzzing in a Unicorn UC_HOOK_CODE callback and - # starts execution on the current address, which results in a duplicate UC_HOOK_CODE callback. To - # make unicornafl easy to use, we handle this siliently. - # - # For other exceptions, we raise them. - raise \ No newline at end of file + except UcAflError as ex: + if ex.errno != UC_AFL_RET_CALLED_TWICE: + # many fuzzing scripts start fuzzing with a Unicorn UC_HOOK_CODE callback and while + # starting execution at the current address. that results in a duplicated UC_HOOK_CODE + # callback. we handle this case siliently for simplicity + # + # For other exceptions, we raise them. + raise diff --git a/qiling/extensions/afl/qlfuzzer.py b/qiling/extensions/afl/qlfuzzer.py new file mode 100644 index 000000000..c7f7c588c --- /dev/null +++ b/qiling/extensions/afl/qlfuzzer.py @@ -0,0 +1,157 @@ +# @author: elicn + +import os + +from abc import ABC, abstractmethod +from typing import Any, Collection, Dict, Optional, Sequence + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions import afl +from qiling.os.const import POINTER + + +class QlFuzzer(ABC): + """Simplify Qiling-based fuzzing. + + Subclass it to easily implement a custom Qiling-based fuzzer. + """ + + @staticmethod + def __set_default(params: Dict[str, Any], name: str, value: Any) -> None: + """Set a default value to an option whose value was not specified. + + Args: + params: kwargs dictionary to modify + name: option name + value: default value to set + + Returns: None. however `params` dictionary is modified + """ + + if name not in params: + params[name] = value + + def __init__(self, argv: Sequence[str], rootfs: str, **kwargs) -> None: + """Initialize fuzzer instance. + + Parameters are identical to Qiling init. + """ + + # unless explicitly set otherwise, tune qiling for maximum performance + self.__set_default(kwargs, 'verbose', QL_VERBOSE.DISABLED) + self.__set_default(kwargs, 'log_devices', []) + self.__set_default(kwargs, 'console', False) + + self.ql = Qiling(argv, rootfs, **kwargs) + + def __install_crash_hooks(self, crashes: Collection[int]) -> None: + """Hook certain locations in code and make them simulate a crash so AFL would recognize + them as meaningful targets. + + Args: + crashes: executable addresses to hook + """ + + def __crash(ql: Qiling) -> None: + os.abort() + + for address in crashes: + self.ql.hook_address(__crash, address) + + def __install_kickoff_hook(self, infilename: str, entry: int, exits: Collection[int]) -> None: + def __kickoff(ql: Qiling): + """Have Unicorn forked and start instrumentation. + """ + + # this is just a one-time hook; remove it + ko_hook.remove() + + afl.ql_afl_fuzz(ql, infilename, self.feed_input, exits) + + # set afl instrumentation [re]starting point + ko_hook = self.ql.hook_address(__kickoff, entry) + + def stage_call_site(self, params: Sequence[int]) -> None: + """Stage parameters for a function call. + This method provides a convinient way to set up parameters when fuzzing a function call. + + Args: + params: a sequence of integer values to set as parameters + """ + + self.ql.os.fcall.writeParams([(POINTER, p) for p in params]) + + @abstractmethod + def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool: + """A callback method invoked by AFL whenever a new fuzzing stimuli is generated. + The method may manipulate the stimuli to its needs or use it as-is, and ultimately + responsible to place it where the fuzzed program expects its input to be found, e.g.: + stdin, file, socket, memory, etc. + + Args: + ql: qiling instance + stimuli: newly generated input to the fuzzed program + pround: iteration number within a persistent session. if persistency was not set, + round value is expected to be 0 every time + + Returns: a boolean indicator of whether AFL should proceed with this fuzzing iteration + or not (i.e. in case the generated stimuli does not satisfy fuzzing logic criteria) + """ + + def ea(self, offset: int, module: Optional[str] = None, *, casefold: bool = False) -> int: + """Get the effective address of a file offset. + + Args: + offset: file offset + module: module basename (the emulated binary, by default) + casefold: match module name case-insensitively. this becomes useful when windows + binaries load their libraries using arbitrary case names + + Returns: the effective address of `offset` using `module` base address. + Raises: `KeyError` if the requested module was not loaded + """ + + image = self.ql.loader.get_image_by_name(module or os.path.basename(self.ql.argv[0]), casefold=casefold) + + if image is None: + raise KeyError(f'could not find a loaded module named "{module}"') + + return image.base + offset + + def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None: + """Set up the fuzzing parameters. + + Args: + infilename: path of a file that contains an initial fuzzing input which does not crash + entry: fuzzing entry point. this is where AFL will keep resetting to on each iteration + exits: fuzzing exit points. reaching either one of these addresses means the fuzzing + iteration has ended gracefully and AFL should start a new one + crashes: simulate a crash on these addresses to make AFL mark it as a successfull case. + this is useful to mark "fuzzing points of interest" that would be otherwise overlooked + by AFL since they do not crash the program + + Notes: + - starting a fuzzing session without calling this method first will result in a dry-run + """ + + # set up hooks to simulate crashes + if crashes is not None: + self.__install_crash_hooks(crashes) + + # hook the fuzzing entry address to kick-off AFL + self.__install_kickoff_hook(infilename, entry, exits) + + def run(self, begin: Optional[int] = None) -> None: + """Start the fuzzing session. + + Args: + begin: emulation starting point. this may or may not be the same as the fuzzing entry + point, depending on whether the fuzzed code reply on global resources or prior + initialization. For example, fuzzing a 'main' function would require prior code to + initialize argc and argv, as opposed to a stand-alone (pure) function that only needs + its arguments and does not need any prior initialization to happen first. + If not set, emulation will start from the default starting point. + """ + + self.ql.run(begin)