Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions swat/commands/coverage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

import argparse

import pandas as pd
Expand Down
92 changes: 44 additions & 48 deletions swat/commands/emulate.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@

import importlib
import os
from dataclasses import dataclass
from pathlib import Path

from swat.commands.base_command import BaseCommand
from swat.emulations.base_emulation import BaseEmulation

EMULATIONS_DIR = Path(__file__).parent.parent.absolute() / 'emulations'

Expand All @@ -17,59 +20,52 @@ class AttackData:
class Command(BaseCommand):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.kwargs = kwargs
assert len(self.args) > 0, "No emulation command provided."
emulation_commands = [c for c in self.args if c.replace("-", "_") in
Command.__dict__ or c == "list-commands"]
if emulation_commands:
self.command = emulation_commands[0].replace("-", "_")
self.attack = None
else:
assert len(self.args) > 1, "No emulation command provided."
self.attack = AttackData(self.args[0], self.args[1])
self.command = "emulate"

def load_attack(self, attack: AttackData) -> any:
try:
attack_module = importlib.import_module(f"swat.emulations.{attack.tactic}.{attack.technique}")
emulation_module = getattr(attack_module, "Emulation")
return emulation_module(tactic=attack.tactic, technique=attack.technique, **self.kwargs)
except (ImportError, AttributeError) as e:
self.logger.error(f"{e}")
return None
self.emulation_command = None

def list_commands(self):
"""List all available commands"""
commands = [method.replace("_", "-") for method in dir(self) if not method.startswith("_")
and callable(getattr(self, method)) and method != "execute"
and method != "list_commands" and method != "load_attack"]
return '|'.join(commands)
args = kwargs.pop('args', None)
if args:
self.emulation_name, *self.emulation_args = args
if self.emulation_name not in self.get_emulate_commands():
self.logger.info(f"Unknown emulation command: {self.emulation_name}")
else:
emulation_command_class = self.load_emulation_command_class(self.emulation_name)
self.emulation_command = emulation_command_class(args=self.emulation_args, **kwargs)

@staticmethod
def list_tactics(**kwargs):
"""List all available tactics"""
tactics = "|".join([tactic.name for tactic in EMULATIONS_DIR.iterdir() if
tactic.is_dir() and not tactic.name.startswith('_')])
return tactics
def get_dotted_command_path(command_name: str) -> str:
"""Return the path to the command module."""
path = list(EMULATIONS_DIR.rglob(f"{command_name}.py"))
assert len(path) == 1, f"Error: Ambiguous command '{command_name}' more than one found with that name"
dotted = str(path[0].relative_to(EMULATIONS_DIR)).replace(os.sep, '.')[:-3]
return dotted

@staticmethod
def list_techniques(**kwargs):
"""List all available techniques for a given tactic"""
tactic = kwargs.get('args')[1]
tactic_dir = EMULATIONS_DIR / tactic
if not tactic_dir.exists():
return f"No techniques found for tactic: {tactic}"
techniques = '|'.join([technique.stem for technique in tactic_dir.glob('*.py')
if technique.stem != '__init__'])
return techniques
def get_emulate_commands() -> list[str]:
"""Return a list of possible emulation commands."""
commands = [c.stem for c in EMULATIONS_DIR.rglob('*.py') if not c.name.startswith('_') and
not c.parent.name == "emulations"]
return commands

def load_emulation_command_class(self, name: str) -> type[BaseEmulation] | None:
# Dynamically import the command module
try:
dotted_command = self.get_dotted_command_path(name)
command_module = importlib.import_module(f"swat.emulations.{dotted_command}")
command_class = getattr(command_module, "Emulation")
except (ImportError, AttributeError) as e:
print(f"Error: Command '{self.emulation_name}' not found.")
return

# Check if the command class is a subclass of BaseCommand
if not issubclass(command_class, BaseEmulation):
print(f"Error: Command '{self.emulation_command}' is not a valid command.")
return

return command_class

def execute(self) -> None:
if self.attack is not None:
self.logger.info(f"Loading emulation for {self.attack}")
emulate = self.load_attack(self.attack)
emulate.execute()
elif self.command == "list_commands":
self.logger.info(f"Available commands - {self.list_commands()}")
if not self.emulation_command:
self.logger.info(f"Available commands: " + '\n'.join(self.get_emulate_commands()))
else:
self.logger.info(f"Executing command - {self.command}")
self.logger.info(f"Command results - {getattr(self, self.command)(args=self.args)}")
self.logger.info(f"Executing command: {self.emulation_name}")
self.emulation_command.execute()
24 changes: 20 additions & 4 deletions swat/emulations/base_emulation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@

import logging
from argparse import ArgumentParser
from pathlib import Path

from .. import utils


class BaseEmulation:
def __init__(self, tactic: str, technique: str, credentials: Path = None,
token: Path = None, config: dict = None, **extra) -> None:
self.tactic = tactic
self.technique = technique
def __init__(self, credentials: Path, token: Path, config: dict, parser: ArgumentParser, args: list,
**extra) -> None:
self.config = config
self.credentials = credentials
self.token = token
self.logger = logging.getLogger(__name__)
self.creds = extra['creds']

self.tactic, self.technique = self.parse_attack_from_class()

self.args = utils.validate_args(parser, args)


def execute(self) -> None:
raise NotImplementedError("The 'execute' method must be implemented in each emulation class.")

def parse_attack_from_class(self) -> (str, str):
"""Parse tactic and technique from path."""
_, _, tactic, technique = self.__module__.split('.')
return tactic, technique.upper()

def exec_str(self, description: str) -> str:
"""Return standard execution log string."""
return f"Executing emulation for: [{self.tactic} - {self.technique}] {description}"
11 changes: 9 additions & 2 deletions swat/emulations/persistence/t1098.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@

import argparse

from swat.emulations.base_emulation import BaseEmulation


class Emulation(BaseEmulation):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.parser = argparse.ArgumentParser(prog='T1098',
description='Account Manipulation',
usage='T1098 [options]')
super().__init__(parser=self.parser, **kwargs)

def execute(self) -> None:
self.logger.info(f"Executing emulation for {self.tactic}/{self.technique}")
self.logger.info(self.exec_str(self.parser.description))
self.logger.info("Hello, world, from T1098!")
9 changes: 4 additions & 5 deletions swat/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class SWATShell(cmd.Cmd):
██████╔╝░░╚██╔╝░╚██╔╝░██║░░██║░░░██║░░░
╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝░░╚═╝░░░╚═╝░░░
:: Simple Workspace ATT&CK Tool ::
\n"""

"""
prompt = "SWAT> "

def __init__(self, args: argparse.Namespace) -> None:
Expand All @@ -33,12 +34,10 @@ def __init__(self, args: argparse.Namespace) -> None:

def default(self, line: str) -> None:
"""Handle commands that are not recognized."""
args_list = line.split()
command_name = args_list[0]
command_name, *args = line.split()

# Create a new Namespace object containing the credentials and command arguments
new_args = dict(command=command_name, args=args_list[1:],
config=CONFIG, creds=self.creds, **(vars(self.args)))
new_args = dict(command=command_name, args=args, config=CONFIG, creds=self.creds, **(vars(self.args)))

# Dynamically import the command module
try:
Expand Down
3 changes: 1 addition & 2 deletions swat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ def validate_args(parser, args):
try:
parsed_args, unknown = parser.parse_known_args(args)
if unknown:
unknown_args = [a for a in unknown if a.startswith('-')]
raise ValueError(f"Unknown arguments {unknown_args}")
raise ValueError(f"Unknown arguments {unknown}")
except SystemExit:
return None
return parsed_args
Expand Down