From c865be9953136b066b9e0b0500db7bec2f1cb499 Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Wed, 19 Jul 2023 20:52:15 -0600 Subject: [PATCH] Setup emulations and emulation args --- swat/commands/coverage.py | 1 + swat/commands/emulate.py | 92 +++++++++++++--------------- swat/emulations/base_emulation.py | 24 ++++++-- swat/emulations/persistence/t1098.py | 11 +++- swat/shell.py | 9 ++- swat/utils.py | 3 +- 6 files changed, 79 insertions(+), 61 deletions(-) diff --git a/swat/commands/coverage.py b/swat/commands/coverage.py index 0b20f60..910772f 100644 --- a/swat/commands/coverage.py +++ b/swat/commands/coverage.py @@ -1,3 +1,4 @@ + import argparse import pandas as pd diff --git a/swat/commands/emulate.py b/swat/commands/emulate.py index 1d97bae..2805c0d 100644 --- a/swat/commands/emulate.py +++ b/swat/commands/emulate.py @@ -1,8 +1,11 @@ + import importlib +import os from dataclasses import dataclass from pathlib import Path from swat.commands.base_command import BaseCommand +from swat.emulations.base_emulation import BaseEmulation EMULATIONS_DIR = Path(__file__).parent.parent.absolute() / 'emulations' @@ -17,59 +20,52 @@ class AttackData: class Command(BaseCommand): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - self.kwargs = kwargs - assert len(self.args) > 0, "No emulation command provided." - emulation_commands = [c for c in self.args if c.replace("-", "_") in - Command.__dict__ or c == "list-commands"] - if emulation_commands: - self.command = emulation_commands[0].replace("-", "_") - self.attack = None - else: - assert len(self.args) > 1, "No emulation command provided." - self.attack = AttackData(self.args[0], self.args[1]) - self.command = "emulate" - - def load_attack(self, attack: AttackData) -> any: - try: - attack_module = importlib.import_module(f"swat.emulations.{attack.tactic}.{attack.technique}") - emulation_module = getattr(attack_module, "Emulation") - return emulation_module(tactic=attack.tactic, technique=attack.technique, **self.kwargs) - except (ImportError, AttributeError) as e: - self.logger.error(f"{e}") - return None + self.emulation_command = None - def list_commands(self): - """List all available commands""" - commands = [method.replace("_", "-") for method in dir(self) if not method.startswith("_") - and callable(getattr(self, method)) and method != "execute" - and method != "list_commands" and method != "load_attack"] - return '|'.join(commands) + args = kwargs.pop('args', None) + if args: + self.emulation_name, *self.emulation_args = args + if self.emulation_name not in self.get_emulate_commands(): + self.logger.info(f"Unknown emulation command: {self.emulation_name}") + else: + emulation_command_class = self.load_emulation_command_class(self.emulation_name) + self.emulation_command = emulation_command_class(args=self.emulation_args, **kwargs) @staticmethod - def list_tactics(**kwargs): - """List all available tactics""" - tactics = "|".join([tactic.name for tactic in EMULATIONS_DIR.iterdir() if - tactic.is_dir() and not tactic.name.startswith('_')]) - return tactics + def get_dotted_command_path(command_name: str) -> str: + """Return the path to the command module.""" + path = list(EMULATIONS_DIR.rglob(f"{command_name}.py")) + assert len(path) == 1, f"Error: Ambiguous command '{command_name}' more than one found with that name" + dotted = str(path[0].relative_to(EMULATIONS_DIR)).replace(os.sep, '.')[:-3] + return dotted @staticmethod - def list_techniques(**kwargs): - """List all available techniques for a given tactic""" - tactic = kwargs.get('args')[1] - tactic_dir = EMULATIONS_DIR / tactic - if not tactic_dir.exists(): - return f"No techniques found for tactic: {tactic}" - techniques = '|'.join([technique.stem for technique in tactic_dir.glob('*.py') - if technique.stem != '__init__']) - return techniques + def get_emulate_commands() -> list[str]: + """Return a list of possible emulation commands.""" + commands = [c.stem for c in EMULATIONS_DIR.rglob('*.py') if not c.name.startswith('_') and + not c.parent.name == "emulations"] + return commands + + def load_emulation_command_class(self, name: str) -> type[BaseEmulation] | None: + # Dynamically import the command module + try: + dotted_command = self.get_dotted_command_path(name) + command_module = importlib.import_module(f"swat.emulations.{dotted_command}") + command_class = getattr(command_module, "Emulation") + except (ImportError, AttributeError) as e: + print(f"Error: Command '{self.emulation_name}' not found.") + return + + # Check if the command class is a subclass of BaseCommand + if not issubclass(command_class, BaseEmulation): + print(f"Error: Command '{self.emulation_command}' is not a valid command.") + return + + return command_class def execute(self) -> None: - if self.attack is not None: - self.logger.info(f"Loading emulation for {self.attack}") - emulate = self.load_attack(self.attack) - emulate.execute() - elif self.command == "list_commands": - self.logger.info(f"Available commands - {self.list_commands()}") + if not self.emulation_command: + self.logger.info(f"Available commands: " + '\n'.join(self.get_emulate_commands())) else: - self.logger.info(f"Executing command - {self.command}") - self.logger.info(f"Command results - {getattr(self, self.command)(args=self.args)}") + self.logger.info(f"Executing command: {self.emulation_name}") + self.emulation_command.execute() diff --git a/swat/emulations/base_emulation.py b/swat/emulations/base_emulation.py index ac2e4d1..8c32862 100644 --- a/swat/emulations/base_emulation.py +++ b/swat/emulations/base_emulation.py @@ -1,17 +1,33 @@ + import logging +from argparse import ArgumentParser from pathlib import Path +from .. import utils + class BaseEmulation: - def __init__(self, tactic: str, technique: str, credentials: Path = None, - token: Path = None, config: dict = None, **extra) -> None: - self.tactic = tactic - self.technique = technique + def __init__(self, credentials: Path, token: Path, config: dict, parser: ArgumentParser, args: list, + **extra) -> None: self.config = config self.credentials = credentials self.token = token self.logger = logging.getLogger(__name__) self.creds = extra['creds'] + self.tactic, self.technique = self.parse_attack_from_class() + + self.args = utils.validate_args(parser, args) + + def execute(self) -> None: raise NotImplementedError("The 'execute' method must be implemented in each emulation class.") + + def parse_attack_from_class(self) -> (str, str): + """Parse tactic and technique from path.""" + _, _, tactic, technique = self.__module__.split('.') + return tactic, technique.upper() + + def exec_str(self, description: str) -> str: + """Return standard execution log string.""" + return f"Executing emulation for: [{self.tactic} - {self.technique}] {description}" diff --git a/swat/emulations/persistence/t1098.py b/swat/emulations/persistence/t1098.py index 9090c3e..8e07ab2 100644 --- a/swat/emulations/persistence/t1098.py +++ b/swat/emulations/persistence/t1098.py @@ -1,9 +1,16 @@ + +import argparse + from swat.emulations.base_emulation import BaseEmulation class Emulation(BaseEmulation): def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) + self.parser = argparse.ArgumentParser(prog='T1098', + description='Account Manipulation', + usage='T1098 [options]') + super().__init__(parser=self.parser, **kwargs) def execute(self) -> None: - self.logger.info(f"Executing emulation for {self.tactic}/{self.technique}") + self.logger.info(self.exec_str(self.parser.description)) + self.logger.info("Hello, world, from T1098!") diff --git a/swat/shell.py b/swat/shell.py index 6bccfd2..78708d2 100644 --- a/swat/shell.py +++ b/swat/shell.py @@ -23,7 +23,8 @@ class SWATShell(cmd.Cmd): ██████╔╝░░╚██╔╝░╚██╔╝░██║░░██║░░░██║░░░ ╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝░░╚═╝░░░╚═╝░░░ :: Simple Workspace ATT&CK Tool :: -\n""" + +""" prompt = "SWAT> " def __init__(self, args: argparse.Namespace) -> None: @@ -33,12 +34,10 @@ def __init__(self, args: argparse.Namespace) -> None: def default(self, line: str) -> None: """Handle commands that are not recognized.""" - args_list = line.split() - command_name = args_list[0] + command_name, *args = line.split() # Create a new Namespace object containing the credentials and command arguments - new_args = dict(command=command_name, args=args_list[1:], - config=CONFIG, creds=self.creds, **(vars(self.args))) + new_args = dict(command=command_name, args=args, config=CONFIG, creds=self.creds, **(vars(self.args))) # Dynamically import the command module try: diff --git a/swat/utils.py b/swat/utils.py index 5c64ea7..027ed72 100644 --- a/swat/utils.py +++ b/swat/utils.py @@ -37,8 +37,7 @@ def validate_args(parser, args): try: parsed_args, unknown = parser.parse_known_args(args) if unknown: - unknown_args = [a for a in unknown if a.startswith('-')] - raise ValueError(f"Unknown arguments {unknown_args}") + raise ValueError(f"Unknown arguments {unknown}") except SystemExit: return None return parsed_args