Document Vault bootstrap usage and extend helper configuration#174
Document Vault bootstrap usage and extend helper configuration#174
Conversation
Reviewer's GuideThis PR introduces a new Python-based, idempotent Vault appliance bootstrap helper with configurable API endpoint and CA bundle support, extends the pytest suite to validate its behavior across discovery, initialization, reuse, and error scenarios, and adds comprehensive documentation covering module design and usage instructions. Sequence diagram for Vault appliance bootstrap processsequenceDiagram
participant User as actor User
participant Helper as "bootstrap_vault_appliance.py"
participant DO as "DigitalOcean API"
participant Vault as "Vault Appliance"
participant Secrets as "DigitalOcean Secrets Manager"
User->>Helper: Run bootstrap script with options
Helper->>DO: Discover droplet IP via tag
Helper->>Vault: Verify Vault systemd unit is active (via SSH)
Helper->>Vault: Check Vault status (via CLI)
alt Vault not initialized
Helper->>Vault: Initialize Vault (operator init)
Helper->>Secrets: Store unseal keys and root token
Helper->>Vault: Unseal Vault
Helper->>Vault: Enable KV v2 engine
Helper->>Vault: Enable AppRole and write policy
Helper->>Secrets: Store AppRole role-id and secret-id
else Vault initialized
alt Vault sealed
Helper->>Secrets: Load unseal keys
Helper->>Vault: Unseal Vault
end
Helper->>Secrets: Load root token
Helper->>Vault: Enable KV v2 engine
Helper->>Vault: Enable AppRole and write policy
Helper->>Secrets: Store AppRole role-id and secret-id
end
Class diagram for the new Vault bootstrap helperclassDiagram
class BootstrapOptions {
+environment: str
+droplet_tag: str
+ssh_user: str
+mount_path: str
+approle_name: str
+policy_name: str
+secret_prefix: str
+key_shares: int
+key_threshold: int
+vault_address: str | None
+ca_cert_path: str | None
}
class CommandRunner {
-_local
+run(command: str, *args: str, env: dict[str, str] | None = None): str
}
class SecretStore {
-_runner: CommandRunner
-_prefix: str
+get(suffix: str): str | None
+put(suffix: str, value: str): None
}
class bootstrap {
+bootstrap(options: BootstrapOptions, runner: CommandRunner | None = None): None
}
BootstrapOptions <.. CommandRunner : uses
CommandRunner <.. SecretStore : uses
SecretStore <.. bootstrap : uses
BootstrapOptions <.. bootstrap : uses
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Warning Rate limit exceeded@leynos has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 12 minutes and 30 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. Summary by CodeRabbit
WalkthroughAdd a Vault appliance bootstrapper script with DigitalOcean/doctl integration, secrets storage, and idempotent provisioning (init, unseal, KV v2, AppRole). Add supporting test infrastructure (cmd-mox), pytest config, and documentation detailing design and operational steps. Introduce a scripts package initialiser. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Operator
participant Script as bootstrap_vault_appliance.py
participant Runner as CommandRunner
participant DO as doctl (Droplets/Secrets)
participant SSH as ssh
participant Vault as vault CLI
Operator->>Script: Run with options
Script->>Runner: doctl compute droplet list --tag <tag>
Runner->>DO: Execute
DO-->>Runner: IP address
Runner-->>Script: IP address
Script->>Runner: ssh <user>@<ip> "systemctl is-active vault"
Runner->>SSH: Execute
SSH-->>Runner: active
Runner-->>Script: OK
Script->>Vault: vault status (VAULT_ADDR/VAULT_CACERT)
alt Uninitialised
Script->>Vault: vault operator init -key-shares N -key-threshold T
Vault-->>Script: unseal keys + root token
Script->>Runner: doctl secrets put unseal-1..N, root-token
Runner->>DO: Store secrets
DO-->>Runner: OK
loop Unseal T times
Script->>Vault: vault operator unseal <key>
Vault-->>Script: OK
end
else Sealed
Script->>Runner: doctl secrets get unseal-*
Runner->>DO: Fetch secrets
DO-->>Runner: keys
loop Unseal T times
Script->>Vault: vault operator unseal <key>
Vault-->>Script: OK
end
end
Script->>Vault: vault secrets enable -path=<mount> kv-v2 (if missing)
Vault-->>Script: OK/exists
Script->>Vault: vault policy write <policy> <tmpfile>
Vault-->>Script: OK
Script->>Vault: vault auth enable approle (if needed)
Script->>Vault: vault write auth/approle/role/<name> ...
Vault-->>Script: role-id, secret-id
Script->>Runner: doctl secrets put role-id, secret-id
Runner->>DO: Store
DO-->>Runner: OK
Runner-->>Script: Done
Script-->>Operator: Exit 0
sequenceDiagram
autonumber
actor Operator
participant Script
participant Runner
participant DO as doctl (Secrets)
participant Vault
Operator->>Script: Re-run bootstrap
Script->>Vault: vault status (with VAULT_ADDR[/CACERT])
alt Already initialised and unsealed
Script->>Vault: ensure KV v2 mount
Script->>Vault: ensure AppRole + policy
Script->>Runner: refresh missing secrets (if any)
Runner->>DO: put/get
DO-->>Runner: OK
else Sealed
Script->>Runner: doctl secrets get unseal-*
Runner->>DO: Fetch keys
DO-->>Runner: keys
Script->>Vault: unseal with keys
end
Script-->>Operator: Exit 0
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Comment |
There was a problem hiding this comment.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- Consider modularizing the bootstrap script by splitting droplet discovery, Vault initialization, and secret‐store interactions into separate classes or modules to improve readability and testability.
- Avoid catching all Exceptions in main; handle known error cases explicitly or re‐raise unexpected exceptions so failures aren't silently masked.
- Centralize building and injecting VAULT_ADDR/VAULT_CACERT/VAULT_TOKEN environment settings into a helper or context manager to reduce repetition across command invocations.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider modularizing the bootstrap script by splitting droplet discovery, Vault initialization, and secret‐store interactions into separate classes or modules to improve readability and testability.
- Avoid catching all Exceptions in main; handle known error cases explicitly or re‐raise unexpected exceptions so failures aren't silently masked.
- Centralize building and injecting VAULT_ADDR/VAULT_CACERT/VAULT_TOKEN environment settings into a helper or context manager to reduce repetition across command invocations.
## Individual Comments
### Comment 1
<location> `scripts/bootstrap_vault_appliance.py:262-269` </location>
<code_context>
+ root_token: str | None = init_data.get("root_token")
+ if not root_token:
+ raise RuntimeError("Vault did not return a root token during initialisation.")
+ for index, key in enumerate(unseal_keys, start=1):
+ secrets.put(f"unseal-{index}", key)
+ secrets.put("root-token", root_token)
+ return {"unseal_keys": list(unseal_keys), "root_token": root_token}
</code_context>
<issue_to_address>
**suggestion:** Unseal key enumeration may mismatch with key_shares if Vault changes output format.
Validate that the number of unseal keys matches options.key_shares before storing, to prevent mismatches if Vault output changes.
```suggestion
unseal_keys: Iterable[str] = init_data.get("unseal_keys_b64", [])
if len(unseal_keys) != options.key_shares:
raise RuntimeError(
f"Vault returned {len(unseal_keys)} unseal keys, but options.key_shares is {options.key_shares}. Aborting to prevent mismatched key storage."
)
root_token: str | None = init_data.get("root_token")
if not root_token:
raise RuntimeError("Vault did not return a root token during initialisation.")
for index, key in enumerate(unseal_keys, start=1):
secrets.put(f"unseal-{index}", key)
secrets.put("root-token", root_token)
return {"unseal_keys": list(unseal_keys), "root_token": root_token}
```
</issue_to_address>
### Comment 2
<location> `scripts/bootstrap_vault_appliance.py:272-278` </location>
<code_context>
+
+def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]:
+ keys: list[str] = []
+ for index in range(1, options.key_shares + 1):
+ key = secrets.get(f"unseal-{index}")
+ if key:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Loading unseal keys may skip missing keys without warning.
Log a warning if the number of loaded keys is less than expected to help with troubleshooting.
```suggestion
import logging
def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]:
keys: list[str] = []
for index in range(1, options.key_shares + 1):
key = secrets.get(f"unseal-{index}")
if key:
keys.append(key)
if len(keys) < options.key_shares:
logging.warning(
"Loaded %d unseal keys, but expected %d. Some keys may be missing.",
len(keys),
options.key_shares,
)
return keys
```
</issue_to_address>
### Comment 3
<location> `scripts/bootstrap_vault_appliance.py:291-292` </location>
<code_context>
+ raise RuntimeError(
+ "Insufficient unseal keys available; aborting to keep Vault sealed."
+ )
+ for key in keys[: options.key_threshold]:
+ runner.run("vault", "operator", "unseal", key, env=env)
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Unseal process does not handle errors from vault operator unseal.
Check the result of each unseal command and report or abort on failure to improve reliability.
```suggestion
for key in keys[: options.key_threshold]:
result = runner.run("vault", "operator", "unseal", key, env=env)
if hasattr(result, "returncode"):
if result.returncode != 0:
raise RuntimeError(
f"Vault unseal failed for key: {key!r} with exit code {result.returncode}.\n"
f"Output: {getattr(result, 'stdout', '')}\nError: {getattr(result, 'stderr', '')}"
)
elif result is False or result is None:
raise RuntimeError(
f"Vault unseal failed for key: {key!r}. Command runner returned {result!r}."
)
```
</issue_to_address>
### Comment 4
<location> `scripts/bootstrap_vault_appliance.py:299` </location>
<code_context>
+ mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env))
+ mount_path = f"{options.mount_path}/"
+ current = mounts.get(mount_path)
+ if current and current.get("type") == "kv" and current.get("options", {}).get("version") == "2":
+ return
+ runner.run(
</code_context>
<issue_to_address>
**suggestion:** KV mount check may not handle partial or misconfigured mounts.
Warn or handle cases where an existing mount is present but misconfigured, as enabling a new mount may leave the old one and cause confusion.
</issue_to_address>
### Comment 5
<location> `scripts/tests/test_bootstrap_vault_appliance.py:28-37` </location>
<code_context>
+def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None:
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for error conditions during Vault initialisation and configuration.
Include test cases for scenarios where Vault commands fail, such as inactive systemd units, failed initialisation, or unexpected secrets manager responses, to verify error handling.
Suggested implementation:
```python
def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None:
registry = CommandRegistry()
doctl = registry.create("doctl")
vault = registry.create("vault")
ssh = registry.create("ssh")
runner = CommandRunner(local_module=registry.local_proxy)
options = make_options()
doctl.queue(
"compute",
def test_bootstrap_vault_systemd_inactive(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate systemd unit inactive
mocker.patch.object(vault, "status", return_value="inactive")
options = make_options()
with pytest.raises(RuntimeError, match="Vault systemd unit is inactive"):
# Replace with actual bootstrap call that checks systemd status
bootstrap_vault(options, runner, vault)
def test_bootstrap_vault_initialisation_failure(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate vault init command failure
mocker.patch.object(vault, "init", side_effect=Exception("Vault initialisation failed"))
options = make_options()
with pytest.raises(Exception, match="Vault initialisation failed"):
# Replace with actual bootstrap call that initialises vault
bootstrap_vault(options, runner, vault)
def test_bootstrap_vault_secrets_manager_error(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate secrets manager error
mocker.patch("scripts.bootstrap_vault_appliance.SecretsManager.store", side_effect=Exception("Secrets manager error"))
options = make_options()
with pytest.raises(Exception, match="Secrets manager error"):
# Replace with actual bootstrap call that stores secrets
bootstrap_vault(options, runner, vault)
```
- You may need to import `pytest` and `mocker` if not already present.
- Replace `bootstrap_vault(options, runner, vault)` with the actual function that performs the bootstrap logic in your codebase.
- Adjust the error messages and mocks to match your implementation details.
- Ensure that the error handling in your bootstrap logic raises the expected exceptions.
</issue_to_address>
### Comment 6
<location> `scripts/tests/test_bootstrap_vault_appliance.py:310-319` </location>
<code_context>
+def test_bootstrap_aborts_when_unseal_keys_missing() -> None:
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for missing root token when Vault is initialized.
Please also add a test for the scenario where the root token is missing after Vault initialization to verify correct error handling.
</issue_to_address>
### Comment 7
<location> `scripts/bootstrap_vault_appliance.py:45` </location>
<code_context>
+ ca_cert_path: str | None = None
+
+
+class CommandRunner:
+ """Thin wrapper around :mod:`plumbum` for deterministic command execution."""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider removing the CommandRunner class and using simple helper functions with partial binding to reduce mental overhead and code complexity.
```markdown
The indirection from `CommandRunner` + per‐call `env` + tiny one‐off helpers adds a lot of mental overhead. You can collapse all of that by:
1. Dropping `CommandRunner` entirely and using a single helper (or two)
2. Binding `vault`/`doctl` + their shared `env` once via `functools.partial`
3. Inlining the few JSON parses and error checks
For example, at the top of your file replace:
```python
class CommandRunner:
def __init__(self, local_module=local):
self._local = local_module
def run(self, command: str, *args: str, env=None) -> str:
cmd = self._local[command]
if env: cmd = cmd.with_env(**env)
if args: cmd = cmd[args]
return cmd()
```
with:
```python
import subprocess
from functools import partial
def run(cmd: list[str], env: dict[str,str] | None = None) -> str:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
return result.stdout
# bind once in `bootstrap()`:
vault_env = { "VAULT_ADDR": address, **({"VAULT_CACERT": ca_cert} if ca_cert else {}) }
vault = partial(run, ["vault"], env=vault_env)
doctl = partial(run, ["doctl"])
```
Then your calls become:
```python
# before
status = runner.run("ssh", f"{user}@{ip}", "sudo", "systemctl", "is-active", "vault")
# after
status = vault(["ssh", f"{user}@{ip}", "sudo", "systemctl", "is-active", "vault"]).strip()
# before
mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env))
# after
mounts = json.loads(vault(["secrets", "list", "-format=json"]))
```
— deleting `CommandRunner`, removing per‐call `env` args, and collapsing `SecretStore` methods into a couple of free functions will shave off hundreds of lines and flatten nesting without losing any functionality.
</issue_to_address>
### Comment 8
<location> `scripts/bootstrap_vault_appliance.py:1` </location>
<code_context>
+#!/usr/bin/env -S uv run python
+# /// script
+# requires-python = ">=3.13"
</code_context>
<issue_to_address>
**issue (review_instructions):** Add behavioural and unit tests for the new bootstrap_vault_appliance.py script.
This script introduces significant new functionality for bootstrapping a Vault appliance. You must provide both behavioural and unit tests to verify its correct operation, including happy and unhappy paths, as required by the review instructions.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*`
**Instructions:**
For any new feature or change to an existing feature, both behavioural *and* unit tests are required.
</details>
</issue_to_address>
### Comment 9
<location> `scripts/bootstrap_vault_appliance.py:412` </location>
<code_context>
+ try:
+ options = parse_args(argv)
+ bootstrap(options)
+ except Exception as exc: # noqa: BLE001 - top-level guard
+ print(f"Error: {exc}", file=sys.stderr)
+ return 1
</code_context>
<issue_to_address>
**issue (review_instructions):** Demonstrate error handling and failure modes with tests.
The script contains error handling for various failure modes (e.g., missing secrets, unhealthy Vault service). You must provide tests that demonstrate these error paths are handled as expected.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*`
**Instructions:**
Bug fixes *must* be demonstrated by a test.
</details>
</issue_to_address>
### Comment 10
<location> `docs/vault-applience-bootstrap-process.md:55` </location>
<code_context>
+- `--mount-path`, `--approle-name`, and `--policy-name` customise the KV engine
+ path and DOKS AppRole naming.
+
+All options accept the values documented in `scripts/bootstrap_vault_appliance.py
+--help`.
+
</code_context>
<issue_to_address>
**suggestion (review_instructions):** This line exceeds 80 columns; please wrap paragraphs and bullets to 80 columns.
Please ensure all paragraphs and bullet points are wrapped to 80 columns for readability.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Paragraphs and bullets must be wrapped to 80 columns
</details>
</issue_to_address>
### Comment 11
<location> `scripts/tests/_vendor/cmd_mox/__init__.py:86` </location>
<code_context>
return MockCommandInvocation(self, tuple(), env)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace `tuple()` with `()` ([`tuple-literal`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/tuple-literal))
```suggestion
return MockCommandInvocation(self, (), env)
```
<br/><details><summary>Explanation</summary>The most concise and Pythonic way to create an empty tuple is to use the `()`
literal.
This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:
```python
x = ("first", "second")
```
Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:
```
$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
```
```
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop
```
</details>
</issue_to_address>
### Comment 12
<location> `scripts/bootstrap_vault_appliance.py:275-276` </location>
<code_context>
def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]:
keys: list[str] = []
for index in range(1, options.key_shares + 1):
key = secrets.get(f"unseal-{index}")
if key:
keys.append(key)
return keys
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
```suggestion
if key := secrets.get(f"unseal-{index}"):
```
</issue_to_address>
### Comment 13
<location> `scripts/tests/_vendor/cmd_mox/__init__.py:41-42` </location>
<code_context>
def with_env(self, **env: str) -> "MockCommandInvocation":
merged = dict(self._env)
merged.update(env)
return MockCommandInvocation(self._command, self._args, merged)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge dictionary updates via the union operator [×2] ([`dict-assign-update-to-union`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/dict-assign-update-to-union/))
```suggestion
merged = dict(self._env) | env
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| unseal_keys: Iterable[str] = init_data.get("unseal_keys_b64", []) | ||
| root_token: str | None = init_data.get("root_token") | ||
| if not root_token: | ||
| raise RuntimeError("Vault did not return a root token during initialisation.") | ||
| for index, key in enumerate(unseal_keys, start=1): | ||
| secrets.put(f"unseal-{index}", key) | ||
| secrets.put("root-token", root_token) | ||
| return {"unseal_keys": list(unseal_keys), "root_token": root_token} |
There was a problem hiding this comment.
suggestion: Unseal key enumeration may mismatch with key_shares if Vault changes output format.
Validate that the number of unseal keys matches options.key_shares before storing, to prevent mismatches if Vault output changes.
| unseal_keys: Iterable[str] = init_data.get("unseal_keys_b64", []) | |
| root_token: str | None = init_data.get("root_token") | |
| if not root_token: | |
| raise RuntimeError("Vault did not return a root token during initialisation.") | |
| for index, key in enumerate(unseal_keys, start=1): | |
| secrets.put(f"unseal-{index}", key) | |
| secrets.put("root-token", root_token) | |
| return {"unseal_keys": list(unseal_keys), "root_token": root_token} | |
| unseal_keys: Iterable[str] = init_data.get("unseal_keys_b64", []) | |
| if len(unseal_keys) != options.key_shares: | |
| raise RuntimeError( | |
| f"Vault returned {len(unseal_keys)} unseal keys, but options.key_shares is {options.key_shares}. Aborting to prevent mismatched key storage." | |
| ) | |
| root_token: str | None = init_data.get("root_token") | |
| if not root_token: | |
| raise RuntimeError("Vault did not return a root token during initialisation.") | |
| for index, key in enumerate(unseal_keys, start=1): | |
| secrets.put(f"unseal-{index}", key) | |
| secrets.put("root-token", root_token) | |
| return {"unseal_keys": list(unseal_keys), "root_token": root_token} |
| def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]: | ||
| keys: list[str] = [] | ||
| for index in range(1, options.key_shares + 1): | ||
| key = secrets.get(f"unseal-{index}") | ||
| if key: | ||
| keys.append(key) | ||
| return keys |
There was a problem hiding this comment.
suggestion (bug_risk): Loading unseal keys may skip missing keys without warning.
Log a warning if the number of loaded keys is less than expected to help with troubleshooting.
| def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]: | |
| keys: list[str] = [] | |
| for index in range(1, options.key_shares + 1): | |
| key = secrets.get(f"unseal-{index}") | |
| if key: | |
| keys.append(key) | |
| return keys | |
| import logging | |
| def load_unseal_keys(options: BootstrapOptions, secrets: SecretStore) -> list[str]: | |
| keys: list[str] = [] | |
| for index in range(1, options.key_shares + 1): | |
| key = secrets.get(f"unseal-{index}") | |
| if key: | |
| keys.append(key) | |
| if len(keys) < options.key_shares: | |
| logging.warning( | |
| "Loaded %d unseal keys, but expected %d. Some keys may be missing.", | |
| len(keys), | |
| options.key_shares, | |
| ) | |
| return keys |
| for key in keys[: options.key_threshold]: | ||
| runner.run("vault", "operator", "unseal", key, env=env) |
There was a problem hiding this comment.
suggestion (bug_risk): Unseal process does not handle errors from vault operator unseal.
Check the result of each unseal command and report or abort on failure to improve reliability.
| for key in keys[: options.key_threshold]: | |
| runner.run("vault", "operator", "unseal", key, env=env) | |
| for key in keys[: options.key_threshold]: | |
| result = runner.run("vault", "operator", "unseal", key, env=env) | |
| if hasattr(result, "returncode"): | |
| if result.returncode != 0: | |
| raise RuntimeError( | |
| f"Vault unseal failed for key: {key!r} with exit code {result.returncode}.\n" | |
| f"Output: {getattr(result, 'stdout', '')}\nError: {getattr(result, 'stderr', '')}" | |
| ) | |
| elif result is False or result is None: | |
| raise RuntimeError( | |
| f"Vault unseal failed for key: {key!r}. Command runner returned {result!r}." | |
| ) |
| mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env)) | ||
| mount_path = f"{options.mount_path}/" | ||
| current = mounts.get(mount_path) | ||
| if current and current.get("type") == "kv" and current.get("options", {}).get("version") == "2": |
There was a problem hiding this comment.
suggestion: KV mount check may not handle partial or misconfigured mounts.
Warn or handle cases where an existing mount is present but misconfigured, as enabling a new mount may leave the old one and cause confusion.
| def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None: | ||
| registry = CommandRegistry() | ||
| doctl = registry.create("doctl") | ||
| vault = registry.create("vault") | ||
| ssh = registry.create("ssh") | ||
| runner = CommandRunner(local_module=registry.local_proxy) | ||
|
|
||
| options = make_options() | ||
|
|
||
| doctl.queue( |
There was a problem hiding this comment.
suggestion (testing): Consider adding tests for error conditions during Vault initialisation and configuration.
Include test cases for scenarios where Vault commands fail, such as inactive systemd units, failed initialisation, or unexpected secrets manager responses, to verify error handling.
Suggested implementation:
def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None:
registry = CommandRegistry()
doctl = registry.create("doctl")
vault = registry.create("vault")
ssh = registry.create("ssh")
runner = CommandRunner(local_module=registry.local_proxy)
options = make_options()
doctl.queue(
"compute",
def test_bootstrap_vault_systemd_inactive(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate systemd unit inactive
mocker.patch.object(vault, "status", return_value="inactive")
options = make_options()
with pytest.raises(RuntimeError, match="Vault systemd unit is inactive"):
# Replace with actual bootstrap call that checks systemd status
bootstrap_vault(options, runner, vault)
def test_bootstrap_vault_initialisation_failure(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate vault init command failure
mocker.patch.object(vault, "init", side_effect=Exception("Vault initialisation failed"))
options = make_options()
with pytest.raises(Exception, match="Vault initialisation failed"):
# Replace with actual bootstrap call that initialises vault
bootstrap_vault(options, runner, vault)
def test_bootstrap_vault_secrets_manager_error(tmp_path, mocker) -> None:
registry = CommandRegistry()
vault = registry.create("vault")
runner = CommandRunner(local_module=registry.local_proxy)
# Simulate secrets manager error
mocker.patch("scripts.bootstrap_vault_appliance.SecretsManager.store", side_effect=Exception("Secrets manager error"))
options = make_options()
with pytest.raises(Exception, match="Secrets manager error"):
# Replace with actual bootstrap call that stores secrets
bootstrap_vault(options, runner, vault)- You may need to import
pytestandmockerif not already present. - Replace
bootstrap_vault(options, runner, vault)with the actual function that performs the bootstrap logic in your codebase. - Adjust the error messages and mocks to match your implementation details.
- Ensure that the error handling in your bootstrap logic raises the expected exceptions.
| def test_bootstrap_aborts_when_unseal_keys_missing() -> None: | ||
| registry = CommandRegistry() | ||
| doctl = registry.create("doctl") | ||
| vault = registry.create("vault") | ||
| ssh = registry.create("ssh") | ||
| runner = CommandRunner(local_module=registry.local_proxy) | ||
|
|
||
| options = replace(make_options(), vault_address=None) | ||
|
|
||
| doctl.queue( |
There was a problem hiding this comment.
suggestion (testing): Consider adding a test for missing root token when Vault is initialized.
Please also add a test for the scenario where the root token is missing after Vault initialization to verify correct error handling.
| ca_cert_path: str | None = None | ||
|
|
||
|
|
||
| class CommandRunner: |
There was a problem hiding this comment.
issue (complexity): Consider removing the CommandRunner class and using simple helper functions with partial binding to reduce mental overhead and code complexity.
The indirection from `CommandRunner` + per‐call `env` + tiny one‐off helpers adds a lot of mental overhead. You can collapse all of that by:
1. Dropping `CommandRunner` entirely and using a single helper (or two)
2. Binding `vault`/`doctl` + their shared `env` once via `functools.partial`
3. Inlining the few JSON parses and error checks
For example, at the top of your file replace:
```python
class CommandRunner:
def __init__(self, local_module=local):
self._local = local_module
def run(self, command: str, *args: str, env=None) -> str:
cmd = self._local[command]
if env: cmd = cmd.with_env(**env)
if args: cmd = cmd[args]
return cmd()with:
import subprocess
from functools import partial
def run(cmd: list[str], env: dict[str,str] | None = None) -> str:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
return result.stdout
# bind once in `bootstrap()`:
vault_env = { "VAULT_ADDR": address, **({"VAULT_CACERT": ca_cert} if ca_cert else {}) }
vault = partial(run, ["vault"], env=vault_env)
doctl = partial(run, ["doctl"])Then your calls become:
# before
status = runner.run("ssh", f"{user}@{ip}", "sudo", "systemctl", "is-active", "vault")
# after
status = vault(["ssh", f"{user}@{ip}", "sudo", "systemctl", "is-active", "vault"]).strip()
# before
mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env))
# after
mounts = json.loads(vault(["secrets", "list", "-format=json"]))— deleting CommandRunner, removing per‐call env args, and collapsing SecretStore methods into a couple of free functions will shave off hundreds of lines and flatten nesting without losing any functionality.
| return MockCommandInvocation(self, values) | ||
|
|
||
| def with_env(self, **env: str) -> MockCommandInvocation: | ||
| return MockCommandInvocation(self, tuple(), env) |
There was a problem hiding this comment.
suggestion (code-quality): Replace tuple() with () (tuple-literal)
| return MockCommandInvocation(self, tuple(), env) | |
| return MockCommandInvocation(self, (), env) |
Explanation
The most concise and Pythonic way to create an empty tuple is to use the()literal.
This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:
x = ("first", "second")Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:
$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop
| key = secrets.get(f"unseal-{index}") | ||
| if key: |
There was a problem hiding this comment.
suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)
| key = secrets.get(f"unseal-{index}") | |
| if key: | |
| if key := secrets.get(f"unseal-{index}"): |
| merged = dict(self._env) | ||
| merged.update(env) |
There was a problem hiding this comment.
suggestion (code-quality): Merge dictionary updates via the union operator [×2] (dict-assign-update-to-union)
| merged = dict(self._env) | |
| merged.update(env) | |
| merged = dict(self._env) | env |
There was a problem hiding this comment.
Actionable comments posted: 24
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (7)
docs/vault-appliance-module-design.md(1 hunks)docs/vault-applience-bootstrap-process.md(1 hunks)scripts/__init__.py(1 hunks)scripts/bootstrap_vault_appliance.py(1 hunks)scripts/tests/_vendor/cmd_mox/__init__.py(1 hunks)scripts/tests/conftest.py(1 hunks)scripts/tests/test_bootstrap_vault_appliance.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: - Keep cyclomatic complexity ≤ 12
- Follow single responsibility and CQRS (command/query segregation)
- Docstrings must follow the
numpystyle guide. Use a single-line summary for private functions and methods, and full structured docs for all public interfaces.- Move conditionals with >2 branches to predicate/helper functions
- Avoid
eval,exec,pickle, monkey-patching,ctypes, unsafe shell- Every module must begin with a triple-quoted docstring explaining its purpose, utility, and usage, including example calls if appropriate.
- Comments and docs must follow en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Lint suppressions:
- Blanket
# noqa, file-level skips, and categories are forbidden- Only narrow in-line disables (
# noqa: XYZ) are permitted, and must be accompanied byFIXME:or a ticket link, and used only as a last resort.- Use
pytestfixtures for shared setup (conftest.pyorfixtures/)- Replace duplicate tests with
@pytest.mark.parametrize- Prefer
pytest-mockorunittest.mockfor stubs/mocks- Use
assert …, "message"over bare asserts- Reflect all API/behaviour changes in
docs/and update roadmap on completion- Files must not exceed 400 logical lines:
- Decompose large modules into subpackages
- Split large
match/caseor dispatch tables by domain and collocate with targets if appropriate- Move bulky data (fixtures, templates) to external files for parsing at runtime
- Mutable defaults and shadowed built-ins are forbidden
- All code must have clear type hints using modern style (
A | B,list[str],class Foo[A]:,type Bar = int, etc.), with ABC imports drawn from the correct stdlib module.- All path manipulation must be performed using pathlib for cross platform safety. Do not use string manipulation for extraction of path components or filename elements.
Files:
scripts/__init__.pyscripts/tests/conftest.pyscripts/tests/_vendor/cmd_mox/__init__.pyscripts/tests/test_bootstrap_vault_appliance.pyscripts/bootstrap_vault_appliance.py
docs/**/*.md
📄 CodeRabbit inference engine (docs/documentation-style-guide.md)
docs/**/*.md: Use British English based on the Oxford English Dictionary (en-oxendict) in documentation
The word “outwith” is acceptable in documentation
Use the Oxford comma
Treat company names as collective nouns (e.g., “Lille Industries are…”)
Write headings in sentence case
Use Markdown headings (#, ##, ###, …) in order without skipping levels
Follow markdownlint recommendations
Provide code blocks and lists using standard Markdown syntax
Always provide a language identifier for fenced code blocks; use plaintext for non-code text
Use '-' as the first-level bullet and renumber lists when items change
Prefer inline links using text or angle-bracketed URLs
Ensure blank lines before and after bulleted lists and fenced code blocks
Ensure tables have a delimiter line below the header row
Expand any uncommon acronym on first use (e.g., Continuous Integration (CI))
Wrap paragraphs at 80 columns
Wrap code at 120 columns (within documentation)
Do not wrap tables
Use footnotes referenced with [^label]
Include Mermaid diagrams where they add clarity
When embedding figures, use Markdown image syntaxand provide brief, descriptive alt text
Add a short description before each Mermaid diagram for screen readers
docs/**/*.md: Use docs/ as the source of truth for requirements, dependencies, and architectural decisions, and proactively update it when things change
Documentation in docs/ must use en-GB-oxendict spelling and grammar (LICENSE naming is exempt)
Keep TypeScript examples in docs small and compilable (typecheck during builds); mirror production patterns
Files:
docs/vault-applience-bootstrap-process.mddocs/vault-appliance-module-design.md
docs/**/*.{md,mdx}
📄 CodeRabbit inference engine (docs/contents.md)
Documentation style guide – conventions for clear, consistent docs
Files:
docs/vault-applience-bootstrap-process.mddocs/vault-appliance-module-design.md
**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
**/*.md: Validate Markdown with make markdownlint; wrap paragraphs/bullets at 80 columns, code blocks at 120, do not wrap tables/headings; use dashes for bullets and GFM footnotes
Run make fmt after documentation changes to format Markdown and fix table markup; validate Mermaid diagrams with make nixie
Files:
docs/vault-applience-bootstrap-process.mddocs/vault-appliance-module-design.md
⚙️ CodeRabbit configuration file
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
docs/vault-applience-bootstrap-process.mddocs/vault-appliance-module-design.md
🧬 Code graph analysis (2)
scripts/tests/test_bootstrap_vault_appliance.py (2)
scripts/tests/_vendor/cmd_mox/__init__.py (3)
CommandRegistry(99-113)create(104-107)queue(55-56)scripts/bootstrap_vault_appliance.py (4)
BootstrapOptions(29-42)CommandRunner(45-57)bootstrap(372-405)get(70-92)
scripts/bootstrap_vault_appliance.py (1)
scripts/tests/_vendor/cmd_mox/__init__.py (2)
with_env(40-43)with_env(85-86)
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Sourcery review
🔇 Additional comments (9)
scripts/__init__.py (1)
1-3: LGTM!The package initialiser correctly establishes the scripts namespace with a clear module docstring and future annotations import.
docs/vault-applience-bootstrap-process.md (1)
3-87: LGTM!The documentation clearly explains the bootstrap process, prerequisites, running instructions, secrets management, and troubleshooting. The structure, language, and formatting follow the guidelines correctly.
docs/vault-appliance-module-design.md (2)
59-72: LGTM!The bootstrap helper strategy documentation is clear, comprehensive, and follows en-GB-oxendict spelling and grammar conventions. The structure and content align well with the implementation details.
59-72: Correct the American spelling.Line 59 contains "behavior" which should be "behaviour" to follow en-GB-oxendict spelling conventions.
As per coding guidelines.
Apply this diff:
-- **Bootstrap helper strategy (2024-12-06).** The Python bootstrapper resolves - the appliance IP via DigitalOcean tags before touching Vault. It records the - generated root token and unseal shares in DigitalOcean Secrets Manager using - a configurable prefix so repeated runs converge without manual data capture. - Subsequent invocations read the stored material, unseal Vault only when - necessary, and ensure the `secret/` KV v2 engine plus the `doks` AppRole exist - with deterministic TTLs. The helper writes the AppRole policy from a - temporary file to sidestep shell heredocs and stores the resulting role and - secret identifiers alongside the unseal keys for reuse by CI workflows. Tests - exercise happy and unhappy paths via `cmd-mox` backed command mocks to - validate DigitalOcean, Vault, and SSH interactions without hitting live APIs. - Operators can override the Vault address when routing through the managed - load balancer and supply a custom CA bundle so Vault CLI calls validate the - TLS chain without disabling verification. +- **Bootstrap helper strategy (2024-12-06).** The Python bootstrapper resolves + the appliance IP via DigitalOcean tags before touching Vault. It records the + generated root token and unseal shares in DigitalOcean Secrets Manager using + a configurable prefix so repeated runs converge without manual data capture. + Subsequent invocations read the stored material, unseal Vault only when + necessary, and ensure the `secret/` KV v2 engine plus the `doks` AppRole exist + with deterministic TTLs. The helper writes the AppRole policy from a + temporary file to sidestep shell heredocs and stores the resulting role and + secret identifiers alongside the unseal keys for reuse by CI workflows. Tests + exercise happy and unhappy paths via `cmd-mox` backed command mocks to + validate DigitalOcean, Vault, and SSH interactions without hitting live APIs. + Operators can override the Vault address when routing through the managed + load balancer and supply a custom CA bundle so Vault CLI calls validate the + TLS chain without disabling verification.Wait, I misread. Let me re-check the line. Actually, the text does not contain "behavior", I was mistaken. Let me re-examine.
Actually, looking at line 59-72, I don't see "behavior" in the provided text. The word "idempotent" appears on line 12 in the context, but the changed lines 59-72 do not contain "behavior". Let me withdraw that comment.
scripts/tests/test_bootstrap_vault_appliance.py (3)
28-176: LGTM!The test correctly validates the full bootstrap flow, including droplet discovery, Vault initialisation, unseal key provisioning, AppRole setup, secret storage, and environment propagation (VAULT_TOKEN, VAULT_ADDR). The assertions confirm that the policy write occurs exactly once with the correct environment.
178-308: LGTM!The test correctly validates that the bootstrap flow reuses existing configuration without re-initialising Vault. The assertions confirm that no init calls are made and that the initial Vault interaction applies VAULT_ADDR and VAULT_CACERT correctly.
310-359: LGTM!The test correctly validates that the bootstrap flow aborts with a RuntimeError when unseal keys are missing. The assertions confirm that the error message matches "Insufficient unseal keys" and that VAULT_CACERT is not set when the CA path is absent.
scripts/bootstrap_vault_appliance.py (2)
1-419: Verify that the file does not exceed 400 logical lines.The file has 419 total lines. Ensure that the number of logical lines (excluding blank lines, comments, and docstrings) does not exceed 400. If it does, decompose the module into subpackages or split functions into separate files.
As per coding guidelines.
372-405: LGTM!The
bootstrapfunction correctly orchestrates the entire workflow: discovering the droplet, verifying the Vault service, reading status, initialising or unsealing as needed, and ensuring the KV mount and AppRole exist. The logic is idempotent and handles both fresh installations and existing configurations appropriately.
| @@ -0,0 +1,87 @@ | |||
| # Vault appliance bootstrap process | |||
There was a problem hiding this comment.
Correct the filename typo.
The filename contains "applience" but should be "appliance". Rename the file to vault-appliance-bootstrap-process.md to match the correct spelling.
🤖 Prompt for AI Agents
In docs/vault-applience-bootstrap-process.md around lines 1 to 1, the filename
contains a typo ("applience"); rename the file to
docs/vault-appliance-bootstrap-process.md so the spelling is correct and update
any references (links, TOCs, scripts, CI configs) to point to the new filename
to avoid broken links.
| class CommandRunner: | ||
| """Thin wrapper around :mod:`plumbum` for deterministic command execution.""" | ||
|
|
||
| def __init__(self, local_module=local): |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a type hint to the parameter.
The local_module parameter in CommandRunner.__init__ requires a type hint. Use Any from the typing module or a protocol if a more specific type is available.
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/bootstrap_vault_appliance.py around line 48, the __init__ signature
def __init__(self, local_module=local): is missing a type hint for local_module;
update the parameter to include a type (e.g., local_module: Any = local), add
the necessary import from typing (from typing import Any) at the top of the
file, or use a more specific protocol type if available per project conventions,
and ensure the default value remains unchanged.
| def parse_args(argv: Sequence[str] | None = None) -> BootstrapOptions: | ||
| parser = argparse.ArgumentParser(description="Bootstrap the Vault appliance") | ||
| parser.add_argument( | ||
| "--environment", | ||
| required=True, | ||
| help="Logical environment identifier (for example 'dev').", | ||
| ) | ||
| parser.add_argument( | ||
| "--droplet-tag", | ||
| required=True, | ||
| help="DigitalOcean tag used to discover the Vault droplet.", | ||
| ) | ||
| parser.add_argument( | ||
| "--ssh-user", | ||
| default="root", | ||
| help="SSH user with access to the appliance (default: root).", | ||
| ) | ||
| parser.add_argument( | ||
| "--mount-path", | ||
| default="secret", | ||
| help="KV v2 mount point used for application secrets (default: secret).", | ||
| ) | ||
| parser.add_argument( | ||
| "--approle-name", | ||
| default="doks-deployer", | ||
| help="Name of the AppRole consumed by the DOKS workflow.", | ||
| ) | ||
| parser.add_argument( | ||
| "--policy-name", | ||
| default="doks-deployer", | ||
| help="Name of the policy bound to the DOKS AppRole.", | ||
| ) | ||
| parser.add_argument( | ||
| "--secret-prefix", | ||
| required=True, | ||
| help="Prefix for DigitalOcean Secrets that store unseal material.", | ||
| ) | ||
| parser.add_argument( | ||
| "--key-shares", | ||
| type=int, | ||
| default=5, | ||
| help="Number of unseal key shares to generate when initialising Vault.", | ||
| ) | ||
| parser.add_argument( | ||
| "--key-threshold", | ||
| type=int, | ||
| default=3, | ||
| help="Number of unseal shares required to unseal Vault.", | ||
| ) | ||
| parser.add_argument( | ||
| "--vault-address", | ||
| help=( | ||
| "Override the Vault API address. Defaults to the discovered droplet" | ||
| " IP on port 8200 over HTTPS." | ||
| ), | ||
| ) | ||
| parser.add_argument( | ||
| "--ca-cert-path", | ||
| help=( | ||
| "Optional path to the Vault certificate authority bundle. When set," | ||
| " exported as VAULT_CACERT for CLI calls." | ||
| ), | ||
| ) | ||
| args = parser.parse_args(argv) | ||
| return BootstrapOptions( | ||
| environment=args.environment, | ||
| droplet_tag=args.droplet_tag, | ||
| ssh_user=args.ssh_user, | ||
| mount_path=args.mount_path.rstrip("/"), | ||
| approle_name=args.approle_name, | ||
| policy_name=args.policy_name, | ||
| secret_prefix=args.secret_prefix, | ||
| key_shares=args.key_shares, | ||
| key_threshold=args.key_threshold, | ||
| vault_address=args.vault_address.rstrip("/") if args.vault_address else None, | ||
| ca_cert_path=args.ca_cert_path, | ||
| ) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the public function.
The parse_args function requires a structured docstring following the numpy style guide, including parameters, returns, and a summary.
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/bootstrap_vault_appliance.py around lines 123 to 199, the public
function parse_args is missing a numpy-style docstring; add a concise
numpy-style docstring immediately below the def line that includes a one-line
summary, a Parameters section documenting argv (type and purpose), a Returns
section documenting the BootstrapOptions return type and what it contains, and
(optionally) Notes or Raises if relevant; keep wording brief and accurate and
follow numpy formatting conventions so the function is properly documented for
users and tooling.
| def discover_droplet_ip(options: BootstrapOptions, runner: CommandRunner) -> str: | ||
| output = runner.run( | ||
| "doctl", | ||
| "compute", | ||
| "droplet", | ||
| "list", | ||
| "--tag-name", | ||
| options.droplet_tag, | ||
| "--format", | ||
| "PublicIPv4", | ||
| "--no-header", | ||
| ) | ||
| addresses = [line.strip() for line in output.splitlines() if line.strip()] | ||
| if not addresses: | ||
| raise RuntimeError( | ||
| f"No droplets tagged '{options.droplet_tag}' found in DigitalOcean." | ||
| ) | ||
| if len(addresses) > 1: | ||
| raise RuntimeError( | ||
| "Multiple droplets matched the Vault tag; aborting to avoid ambiguity." | ||
| ) | ||
| return addresses[0] |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the public function.
The discover_droplet_ip function requires a structured docstring following the numpy style guide, including parameters, returns, raises, and a summary.
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/bootstrap_vault_appliance.py around lines 202 to 223, add a
numpy-style docstring to the public function discover_droplet_ip describing what
the function does, its parameters (options: BootstrapOptions, runner:
CommandRunner), the return value (str — the discovered public IPv4 address), and
the exceptions raised (RuntimeError when no droplets found or when multiple
matches exist); keep the docstring concise with a one-line summary, a short
description if needed, and explicit "Parameters", "Returns", and "Raises"
sections formatted to follow numpy docstring conventions.
| def verify_vault_service(ip: str, options: BootstrapOptions, runner: CommandRunner) -> None: | ||
| status = runner.run( | ||
| "ssh", | ||
| f"{options.ssh_user}@{ip}", | ||
| "sudo", | ||
| "systemctl", | ||
| "is-active", | ||
| "vault", | ||
| ).strip() | ||
| if status != "active": | ||
| raise RuntimeError(f"Vault systemd unit is not active (reported '{status}').") | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the public function.
The verify_vault_service function requires a structured docstring following the numpy style guide, including parameters, raises, and a summary.
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/bootstrap_vault_appliance.py around lines 226 to 237, the public
function verify_vault_service is missing a numpy-style docstring; add a
docstring immediately under the def that briefly summarizes the function,
documents parameters (ip: str, options: BootstrapOptions, runner: CommandRunner)
with types, and documents raises (RuntimeError) with conditions, following numpy
docstring format (short summary line, blank line, Parameters:, Raises:). Keep it
concise and ensure types and the condition that the vault systemd unit is not
active are included.
| @@ -0,0 +1,359 @@ | |||
| from __future__ import annotations | |||
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a module docstring.
Every module must begin with a triple-quoted docstring explaining its purpose, utility, and usage. Add a docstring describing that this module contains tests for the Vault appliance bootstrap script.
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/tests/test_bootstrap_vault_appliance.py at line 1, the module is
missing the required triple-quoted module docstring; add a top-of-file docstring
(as the first statement) that briefly explains that this module contains tests
for the Vault appliance bootstrap script, describing its purpose, what it tests,
and any usage notes per the coding guidelines.
| def make_options() -> BootstrapOptions: | ||
| return BootstrapOptions( | ||
| environment="dev", | ||
| droplet_tag="vault-dev", | ||
| ssh_user="root", | ||
| mount_path="secret", | ||
| approle_name="doks", | ||
| policy_name="doks", | ||
| secret_prefix="dev-vault", | ||
| key_shares=5, | ||
| key_threshold=3, | ||
| vault_address="https://vault.dev.example:8200", | ||
| ) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the helper function.
The make_options helper requires a docstring (single-line summary is sufficient for private functions).
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/tests/test_bootstrap_vault_appliance.py around lines 13 to 25, the
helper function make_options is missing a docstring; add a single-line docstring
immediately below the def that succinctly describes the function (e.g., "Return
a BootstrapOptions instance preconfigured for vault appliance tests"), using a
standard Python string literal directly under the function definition so the
function has an inline docstring.
| def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None: | ||
| registry = CommandRegistry() | ||
| doctl = registry.create("doctl") | ||
| vault = registry.create("vault") | ||
| ssh = registry.create("ssh") | ||
| runner = CommandRunner(local_module=registry.local_proxy) | ||
|
|
||
| options = make_options() | ||
|
|
||
| doctl.queue( | ||
| "compute", | ||
| "droplet", | ||
| "list", | ||
| "--tag-name", | ||
| options.droplet_tag, | ||
| "--format", | ||
| "PublicIPv4", | ||
| "--no-header", | ||
| stdout="203.0.113.10\n", | ||
| ) | ||
| ssh.queue( | ||
| f"{options.ssh_user}@203.0.113.10", | ||
| "sudo", | ||
| "systemctl", | ||
| "is-active", | ||
| "vault", | ||
| stdout="active\n", | ||
| ) | ||
| vault.queue( | ||
| "status", | ||
| "-format=json", | ||
| stdout=json.dumps({"initialized": False, "sealed": True}), | ||
| ) | ||
| init_payload = json.dumps( | ||
| { | ||
| "unseal_keys_b64": [ | ||
| "key-1", | ||
| "key-2", | ||
| "key-3", | ||
| "key-4", | ||
| "key-5", | ||
| ], | ||
| "root_token": "root-token", | ||
| } | ||
| ) | ||
| vault.queue( | ||
| "operator", | ||
| "init", | ||
| "-key-shares", | ||
| str(options.key_shares), | ||
| "-key-threshold", | ||
| str(options.key_threshold), | ||
| "-format=json", | ||
| stdout=init_payload, | ||
| ) | ||
| for index in range(1, options.key_shares + 1): | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-unseal-{index}", | ||
| "--data", | ||
| f"key-{index}", | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-root-token", | ||
| "--data", | ||
| "root-token", | ||
| ) | ||
| for index in range(1, options.key_threshold + 1): | ||
| vault.queue("operator", "unseal", f"key-{index}") | ||
| vault.queue( | ||
| "status", | ||
| "-format=json", | ||
| stdout=json.dumps({"initialized": True, "sealed": False}), | ||
| ) | ||
| vault.queue( | ||
| "secrets", | ||
| "list", | ||
| "-format=json", | ||
| stdout=json.dumps({}), | ||
| ) | ||
| vault.queue( | ||
| "secrets", | ||
| "enable", | ||
| "-path", | ||
| options.mount_path, | ||
| "kv-v2", | ||
| ) | ||
| vault.queue( | ||
| "auth", | ||
| "list", | ||
| "-format=json", | ||
| stdout=json.dumps({}), | ||
| ) | ||
| vault.queue("auth", "enable", "approle") | ||
| vault.queue() | ||
| vault.queue( | ||
| "write", | ||
| f"auth/approle/role/{options.approle_name}", | ||
| f"token_policies={options.policy_name}", | ||
| "secret_id_ttl=24h", | ||
| "token_ttl=1h", | ||
| "token_max_ttl=4h", | ||
| ) | ||
| vault.queue( | ||
| "read", | ||
| "-field=role_id", | ||
| f"auth/approle/role/{options.approle_name}/role-id", | ||
| stdout="role-123\n", | ||
| ) | ||
| vault.queue( | ||
| "write", | ||
| "-f", | ||
| "-field=secret_id", | ||
| f"auth/approle/role/{options.approle_name}/secret-id", | ||
| stdout="secret-456\n", | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-role-id", | ||
| "--data", | ||
| "role-123", | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-secret-id", | ||
| "--data", | ||
| "secret-456", | ||
| ) | ||
|
|
||
| bootstrap(options, runner=runner) | ||
|
|
||
| policy_calls = [call for call in vault.calls if call.args[:2] == ("policy", "write")] | ||
| assert len(policy_calls) == 1 | ||
| assert policy_calls[0].env.get("VAULT_TOKEN") == "root-token" | ||
| assert policy_calls[0].env.get("VAULT_ADDR") == options.vault_address | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the test function.
The test function requires a docstring explaining what behaviour it validates (full bootstrap flow including initialisation, unsealing, KV mount, AppRole setup, and environment propagation).
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/tests/test_bootstrap_vault_appliance.py around lines 28 to 176, the
test function test_bootstrap_initialises_and_configures_vault is missing a
docstring; add a concise docstring immediately under the def line describing
what the test validates (the full bootstrap flow: Vault initialization,
unsealing, KV mount, AppRole setup, secret storage in Doctl, and propagation of
VAULT_TOKEN/VAULT_ADDR to commands) — one short sentence or a brief two-line
description is sufficient and should follow the project's docstring style.
| def test_bootstrap_reuses_existing_configuration() -> None: | ||
| registry = CommandRegistry() | ||
| doctl = registry.create("doctl") | ||
| vault = registry.create("vault") | ||
| ssh = registry.create("ssh") | ||
| runner = CommandRunner(local_module=registry.local_proxy) | ||
|
|
||
| options = replace(make_options(), ca_cert_path="/tmp/vault-ca.pem") | ||
|
|
||
| doctl.queue( | ||
| "compute", | ||
| "droplet", | ||
| "list", | ||
| "--tag-name", | ||
| options.droplet_tag, | ||
| "--format", | ||
| "PublicIPv4", | ||
| "--no-header", | ||
| stdout="203.0.113.10\n", | ||
| ) | ||
| ssh.queue( | ||
| f"{options.ssh_user}@203.0.113.10", | ||
| "sudo", | ||
| "systemctl", | ||
| "is-active", | ||
| "vault", | ||
| stdout="active\n", | ||
| ) | ||
| vault.queue( | ||
| "status", | ||
| "-format=json", | ||
| stdout=json.dumps({"initialized": True, "sealed": False}), | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "get", | ||
| f"{options.secret_prefix}-root-token", | ||
| "--output", | ||
| "json", | ||
| stdout=json.dumps({"secret": {"value": "root-token"}}), | ||
| ) | ||
| vault.queue( | ||
| "status", | ||
| "-format=json", | ||
| stdout=json.dumps({"initialized": True, "sealed": False}), | ||
| ) | ||
| vault.queue( | ||
| "secrets", | ||
| "list", | ||
| "-format=json", | ||
| stdout=json.dumps( | ||
| {"secret/": {"type": "kv", "options": {"version": "2"}}} | ||
| ), | ||
| ) | ||
| vault.queue( | ||
| "auth", | ||
| "list", | ||
| "-format=json", | ||
| stdout=json.dumps({"approle/": {}}), | ||
| ) | ||
| vault.queue() | ||
| vault.queue( | ||
| "write", | ||
| f"auth/approle/role/{options.approle_name}", | ||
| f"token_policies={options.policy_name}", | ||
| "secret_id_ttl=24h", | ||
| "token_ttl=1h", | ||
| "token_max_ttl=4h", | ||
| ) | ||
| vault.queue( | ||
| "read", | ||
| "-field=role_id", | ||
| f"auth/approle/role/{options.approle_name}/role-id", | ||
| stdout="role-abc\n", | ||
| ) | ||
| vault.queue( | ||
| "write", | ||
| "-f", | ||
| "-field=secret_id", | ||
| f"auth/approle/role/{options.approle_name}/secret-id", | ||
| stdout="secret-xyz\n", | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-role-id", | ||
| "--data", | ||
| "role-abc", | ||
| exit_code=10, | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "update", | ||
| f"{options.secret_prefix}-role-id", | ||
| "--data", | ||
| "role-abc", | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "create", | ||
| f"{options.secret_prefix}-secret-id", | ||
| "--data", | ||
| "secret-xyz", | ||
| exit_code=10, | ||
| ) | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "update", | ||
| f"{options.secret_prefix}-secret-id", | ||
| "--data", | ||
| "secret-xyz", | ||
| ) | ||
|
|
||
| bootstrap(options, runner=runner) | ||
|
|
||
| init_calls = [call for call in vault.calls if call.args[:2] == ("operator", "init")] | ||
| assert not init_calls | ||
| first_call = vault.calls[0] | ||
| assert first_call.env.get("VAULT_ADDR") == options.vault_address | ||
| assert first_call.env.get("VAULT_CACERT") == "/tmp/vault-ca.pem" | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the test function.
The test function requires a docstring explaining what behaviour it validates (reusing existing configuration without re-initialising Vault, and propagating CA certificate path).
As per coding guidelines.
🤖 Prompt for AI Agents
In scripts/tests/test_bootstrap_vault_appliance.py around lines 178 to 308, the
test function test_bootstrap_reuses_existing_configuration() is missing a
docstring; add a concise one-line docstring at the top of the function
(immediately under the def) that states the behaviour being validated (that
existing Vault configuration is reused, Vault is not re-initialised, and the CA
cert path is propagated to VAULT_CACERT).
| def test_bootstrap_aborts_when_unseal_keys_missing() -> None: | ||
| registry = CommandRegistry() | ||
| doctl = registry.create("doctl") | ||
| vault = registry.create("vault") | ||
| ssh = registry.create("ssh") | ||
| runner = CommandRunner(local_module=registry.local_proxy) | ||
|
|
||
| options = replace(make_options(), vault_address=None) | ||
|
|
||
| doctl.queue( | ||
| "compute", | ||
| "droplet", | ||
| "list", | ||
| "--tag-name", | ||
| options.droplet_tag, | ||
| "--format", | ||
| "PublicIPv4", | ||
| "--no-header", | ||
| stdout="203.0.113.10\n", | ||
| ) | ||
| ssh.queue( | ||
| f"{options.ssh_user}@203.0.113.10", | ||
| "sudo", | ||
| "systemctl", | ||
| "is-active", | ||
| "vault", | ||
| stdout="active\n", | ||
| ) | ||
| vault.queue( | ||
| "status", | ||
| "-format=json", | ||
| stdout=json.dumps({"initialized": True, "sealed": True}), | ||
| ) | ||
| for index in range(1, options.key_shares + 1): | ||
| doctl.queue( | ||
| "secrets", | ||
| "manager", | ||
| "secrets", | ||
| "get", | ||
| f"{options.secret_prefix}-unseal-{index}", | ||
| "--output", | ||
| "json", | ||
| exit_code=1, | ||
| ) | ||
|
|
||
| with pytest.raises(RuntimeError, match="Insufficient unseal keys"): | ||
| bootstrap(options, runner=runner) | ||
|
|
||
| assert vault.calls[0].env.get("VAULT_ADDR") == "https://203.0.113.10:8200" | ||
| assert "VAULT_CACERT" not in vault.calls[0].env |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add a docstring to the test function.
The test function requires a docstring explaining what behaviour it validates (aborting when unseal keys are missing and not setting VAULT_CACERT when CA path is absent).
As per coding guidelines.
🤖 Prompt for AI Agents
scripts/tests/test_bootstrap_vault_appliance.py around lines 310 to 359: the
test function lacks a docstring; add a short docstring immediately under the def
line that concisely states what the test validates (that bootstrap aborts when
unseal keys are missing and that VAULT_CACERT is not set when no CA path is
provided). Keep it brief (one or two sentences) and follow the project's
docstring style.
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: scripts/tests/test_bootstrap_vault_appliance.py Comment on lines +178 to +307 def test_bootstrap_reuses_existing_configuration() -> None:
registry = CommandRegistry()
doctl = registry.create("doctl")
vault = registry.create("vault")
ssh = registry.create("ssh")
runner = CommandRunner(local_module=registry.local_proxy)
options = replace(make_options(), ca_cert_path="/tmp/vault-ca.pem")
doctl.queue(
"compute",
"droplet",
"list",
"--tag-name",
options.droplet_tag,
"--format",
"PublicIPv4",
"--no-header",
stdout="203.0.113.10\n",
)
ssh.queue(
f"{options.ssh_user}@203.0.113.10",
"sudo",
"systemctl",
"is-active",
"vault",
stdout="active\n",
)
vault.queue(
"status",
"-format=json",
stdout=json.dumps({"initialized": True, "sealed": False}),
)
doctl.queue(
"secrets",
"manager",
"secrets",
"get",
f"{options.secret_prefix}-root-token",
"--output",
"json",
stdout=json.dumps({"secret": {"value": "root-token"}}),
)
vault.queue(
"status",
"-format=json",
stdout=json.dumps({"initialized": True, "sealed": False}),
)
vault.queue(
"secrets",
"list",
"-format=json",
stdout=json.dumps(
{"secret/": {"type": "kv", "options": {"version": "2"}}}
),
)
vault.queue(
"auth",
"list",
"-format=json",
stdout=json.dumps({"approle/": {}}),
)
vault.queue()
vault.queue(
"write",
f"auth/approle/role/{options.approle_name}",
f"token_policies={options.policy_name}",
"secret_id_ttl=24h",
"token_ttl=1h",
"token_max_ttl=4h",
)
vault.queue(
"read",
"-field=role_id",
f"auth/approle/role/{options.approle_name}/role-id",
stdout="role-abc\n",
)
vault.queue(
"write",
"-f",
"-field=secret_id",
f"auth/approle/role/{options.approle_name}/secret-id",
stdout="secret-xyz\n",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-role-id",
"--data",
"role-abc",
exit_code=10,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"update",
f"{options.secret_prefix}-role-id",
"--data",
"role-abc",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-secret-id",
"--data",
"secret-xyz",
exit_code=10,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"update",
f"{options.secret_prefix}-secret-id",
"--data",
"secret-xyz",
)
bootstrap(options, runner=runner)
init_calls = [call for call in vault.calls if call.args[:2] == ("operator", "init")]
assert not init_calls
first_call = vault.calls[0]
assert first_call.env.get("VAULT_ADDR") == options.vault_address
assert first_call.env.get("VAULT_CACERT") == "/tmp/vault-ca.pem"❌ New issue: Large Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: scripts/tests/test_bootstrap_vault_appliance.py Comment on lines +28 to +175 def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None:
registry = CommandRegistry()
doctl = registry.create("doctl")
vault = registry.create("vault")
ssh = registry.create("ssh")
runner = CommandRunner(local_module=registry.local_proxy)
options = make_options()
doctl.queue(
"compute",
"droplet",
"list",
"--tag-name",
options.droplet_tag,
"--format",
"PublicIPv4",
"--no-header",
stdout="203.0.113.10\n",
)
ssh.queue(
f"{options.ssh_user}@203.0.113.10",
"sudo",
"systemctl",
"is-active",
"vault",
stdout="active\n",
)
vault.queue(
"status",
"-format=json",
stdout=json.dumps({"initialized": False, "sealed": True}),
)
init_payload = json.dumps(
{
"unseal_keys_b64": [
"key-1",
"key-2",
"key-3",
"key-4",
"key-5",
],
"root_token": "root-token",
}
)
vault.queue(
"operator",
"init",
"-key-shares",
str(options.key_shares),
"-key-threshold",
str(options.key_threshold),
"-format=json",
stdout=init_payload,
)
for index in range(1, options.key_shares + 1):
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-unseal-{index}",
"--data",
f"key-{index}",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-root-token",
"--data",
"root-token",
)
for index in range(1, options.key_threshold + 1):
vault.queue("operator", "unseal", f"key-{index}")
vault.queue(
"status",
"-format=json",
stdout=json.dumps({"initialized": True, "sealed": False}),
)
vault.queue(
"secrets",
"list",
"-format=json",
stdout=json.dumps({}),
)
vault.queue(
"secrets",
"enable",
"-path",
options.mount_path,
"kv-v2",
)
vault.queue(
"auth",
"list",
"-format=json",
stdout=json.dumps({}),
)
vault.queue("auth", "enable", "approle")
vault.queue()
vault.queue(
"write",
f"auth/approle/role/{options.approle_name}",
f"token_policies={options.policy_name}",
"secret_id_ttl=24h",
"token_ttl=1h",
"token_max_ttl=4h",
)
vault.queue(
"read",
"-field=role_id",
f"auth/approle/role/{options.approle_name}/role-id",
stdout="role-123\n",
)
vault.queue(
"write",
"-f",
"-field=secret_id",
f"auth/approle/role/{options.approle_name}/secret-id",
stdout="secret-456\n",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-role-id",
"--data",
"role-123",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-secret-id",
"--data",
"secret-456",
)
bootstrap(options, runner=runner)
policy_calls = [call for call in vault.calls if call.args[:2] == ("policy", "write")]
assert len(policy_calls) == 1
assert policy_calls[0].env.get("VAULT_TOKEN") == "root-token"
assert policy_calls[0].env.get("VAULT_ADDR") == options.vault_address❌ New issue: Large Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: scripts/bootstrap_vault_appliance.py Comment on lines +123 to +199 def parse_args(argv: Sequence[str] | None = None) -> BootstrapOptions:
parser = argparse.ArgumentParser(description="Bootstrap the Vault appliance")
parser.add_argument(
"--environment",
required=True,
help="Logical environment identifier (for example 'dev').",
)
parser.add_argument(
"--droplet-tag",
required=True,
help="DigitalOcean tag used to discover the Vault droplet.",
)
parser.add_argument(
"--ssh-user",
default="root",
help="SSH user with access to the appliance (default: root).",
)
parser.add_argument(
"--mount-path",
default="secret",
help="KV v2 mount point used for application secrets (default: secret).",
)
parser.add_argument(
"--approle-name",
default="doks-deployer",
help="Name of the AppRole consumed by the DOKS workflow.",
)
parser.add_argument(
"--policy-name",
default="doks-deployer",
help="Name of the policy bound to the DOKS AppRole.",
)
parser.add_argument(
"--secret-prefix",
required=True,
help="Prefix for DigitalOcean Secrets that store unseal material.",
)
parser.add_argument(
"--key-shares",
type=int,
default=5,
help="Number of unseal key shares to generate when initialising Vault.",
)
parser.add_argument(
"--key-threshold",
type=int,
default=3,
help="Number of unseal shares required to unseal Vault.",
)
parser.add_argument(
"--vault-address",
help=(
"Override the Vault API address. Defaults to the discovered droplet"
" IP on port 8200 over HTTPS."
),
)
parser.add_argument(
"--ca-cert-path",
help=(
"Optional path to the Vault certificate authority bundle. When set,"
" exported as VAULT_CACERT for CLI calls."
),
)
args = parser.parse_args(argv)
return BootstrapOptions(
environment=args.environment,
droplet_tag=args.droplet_tag,
ssh_user=args.ssh_user,
mount_path=args.mount_path.rstrip("/"),
approle_name=args.approle_name,
policy_name=args.policy_name,
secret_prefix=args.secret_prefix,
key_shares=args.key_shares,
key_threshold=args.key_threshold,
vault_address=args.vault_address.rstrip("/") if args.vault_address else None,
ca_cert_path=args.ca_cert_path,
)❌ New issue: Large Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: scripts/bootstrap_vault_appliance.py Comment on file mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env))
mount_path = f"{options.mount_path}/"
current = mounts.get(mount_path)
if current and current.get("type") == "kv" and current.get("options", {}).get("version") == "2":❌ New issue: Complex Conditional |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 8885 @leynos, I'll suggest a fix for both large test methods by extracting repetitive command queuing into helper functions. This will reduce duplication and bring both tests under the 70-line threshold. Suggested FixExtract common setup and command queuing patterns into helper functions: from __future__ import annotations
import json
from dataclasses import replace
import pytest
from cmd_mox import CommandRegistry, MockCommand
from scripts.bootstrap_vault_appliance import BootstrapOptions, CommandRunner, bootstrap
def make_options() -> BootstrapOptions:
return BootstrapOptions(
environment="dev",
droplet_tag="vault-dev",
ssh_user="root",
mount_path="secret",
approle_name="doks",
policy_name="doks",
secret_prefix="dev-vault",
key_shares=5,
key_threshold=3,
vault_address="https://vault.dev.example:8200",
)
def setup_test_environment() -> tuple[CommandRegistry, MockCommand, MockCommand, MockCommand, CommandRunner]:
"""Set up test environment with mocked commands."""
registry = CommandRegistry()
doctl = registry.create("doctl")
vault = registry.create("vault")
ssh = registry.create("ssh")
runner = CommandRunner(local_module=registry.local_proxy)
return registry, doctl, vault, ssh, runner
def queue_droplet_discovery(doctl: MockCommand, options: BootstrapOptions, ip: str = "203.0.113.10") -> None:
"""Queue droplet discovery command."""
doctl.queue(
"compute",
"droplet",
"list",
"--tag-name",
options.droplet_tag,
"--format",
"PublicIPv4",
"--no-header",
stdout=f"{ip}\n",
)
def queue_ssh_verification(ssh: MockCommand, options: BootstrapOptions, ip: str = "203.0.113.10") -> None:
"""Queue SSH service verification command."""
ssh.queue(
f"{options.ssh_user}@{ip}",
"sudo",
"systemctl",
"is-active",
"vault",
stdout="active\n",
)
def queue_vault_status(vault: MockCommand, initialized: bool, sealed: bool) -> None:
"""Queue vault status check command."""
vault.queue(
"status",
"-format=json",
stdout=json.dumps({"initialized": initialized, "sealed": sealed}),
)
def queue_vault_initialization(vault: MockCommand, doctl: MockCommand, options: BootstrapOptions) -> None:
"""Queue commands for vault initialisation and secret storage."""
init_payload = json.dumps(
{
"unseal_keys_b64": [f"key-{i}" for i in range(1, options.key_shares + 1)],
"root_token": "root-token",
}
)
vault.queue(
"operator",
"init",
"-key-shares",
str(options.key_shares),
"-key-threshold",
str(options.key_threshold),
"-format=json",
stdout=init_payload,
)
for index in range(1, options.key_shares + 1):
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-unseal-{index}",
"--data",
f"key-{index}",
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-root-token",
"--data",
"root-token",
)
def queue_vault_unsealing(vault: MockCommand, options: BootstrapOptions) -> None:
"""Queue vault unsealing commands."""
for index in range(1, options.key_threshold + 1):
vault.queue("operator", "unseal", f"key-{index}")
queue_vault_status(vault, initialized=True, sealed=False)
def queue_kv_mount_setup(vault: MockCommand, options: BootstrapOptions, exists: bool = False) -> None:
"""Queue KV mount setup commands."""
if exists:
vault.queue(
"secrets",
"list",
"-format=json",
stdout=json.dumps(
{"secret/": {"type": "kv", "options": {"version": "2"}}}
),
)
else:
vault.queue(
"secrets",
"list",
"-format=json",
stdout=json.dumps({}),
)
vault.queue(
"secrets",
"enable",
"-path",
options.mount_path,
"kv-v2",
)
def queue_approle_setup(
vault: MockCommand,
doctl: MockCommand,
options: BootstrapOptions,
role_id: str,
secret_id: str,
exists: bool = False,
update_mode: bool = False,
) -> None:
"""Queue AppRole setup commands."""
if exists:
vault.queue(
"auth",
"list",
"-format=json",
stdout=json.dumps({"approle/": {}}),
)
else:
vault.queue(
"auth",
"list",
"-format=json",
stdout=json.dumps({}),
)
vault.queue("auth", "enable", "approle")
vault.queue() # Policy write placeholder
vault.queue(
"write",
f"auth/approle/role/{options.approle_name}",
f"token_policies={options.policy_name}",
"secret_id_ttl=24h",
"token_ttl=1h",
"token_max_ttl=4h",
)
vault.queue(
"read",
"-field=role_id",
f"auth/approle/role/{options.approle_name}/role-id",
stdout=f"{role_id}\n",
)
vault.queue(
"write",
"-f",
"-field=secret_id",
f"auth/approle/role/{options.approle_name}/secret-id",
stdout=f"{secret_id}\n",
)
if update_mode:
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-role-id",
"--data",
role_id,
exit_code=10,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"update",
f"{options.secret_prefix}-role-id",
"--data",
role_id,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-secret-id",
"--data",
secret_id,
exit_code=10,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"update",
f"{options.secret_prefix}-secret-id",
"--data",
secret_id,
)
else:
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-role-id",
"--data",
role_id,
)
doctl.queue(
"secrets",
"manager",
"secrets",
"create",
f"{options.secret_prefix}-secret-id",
"--data",
secret_id,
)
def queue_root_token_retrieval(doctl: MockCommand, options: BootstrapOptions, token: str = "root-token") -> None:
"""Queue root token retrieval command."""
doctl.queue(
"secrets",
"manager",
"secrets",
"get",
f"{options.secret_prefix}-root-token",
"--output",
"json",
stdout=json.dumps({"secret": {"value": token}}),
)
def test_bootstrap_initialises_and_configures_vault(tmp_path) -> None:
registry, doctl, vault, ssh, runner = setup_test_environment()
options = make_options()
queue_droplet_discovery(doctl, options)
queue_ssh_verification(ssh, options)
queue_vault_status(vault, initialized=False, sealed=True)
queue_vault_initialization(vault, doctl, options)
queue_vault_unsealing(vault, options)
queue_kv_mount_setup(vault, options, exists=False)
queue_approle_setup(vault, doctl, options, "role-123", "secret-456", exists=False)
bootstrap(options, runner=runner)
policy_calls = [call for call in vault.calls if call.args[:2] == ("policy", "write")]
assert len(policy_calls) == 1
assert policy_calls[0].env.get("VAULT_TOKEN") == "root-token"
assert policy_calls[0].env.get("VAULT_ADDR") == options.vault_address
def test_bootstrap_reuses_existing_configuration() -> None:
registry, doctl, vault, ssh, runner = setup_test_environment()
options = replace(make_options(), ca_cert_path="/tmp/vault-ca.pem")
queue_droplet_discovery(doctl, options)
queue_ssh_verification(ssh, options)
queue_vault_status(vault, initialized=True, sealed=False)
queue_root_token_retrieval(doctl, options)
queue_vault_status(vault, initialized=True, sealed=False)
queue_kv_mount_setup(vault, options, exists=True)
queue_approle_setup(vault, doctl, options, "role-abc", "secret-xyz", exists=True, update_mode=True)
bootstrap(options, runner=runner)
init_calls = [call for call in vault.calls if call.args[:2] == ("operator", "init")]
assert not init_calls
first_call = vault.calls[0]
assert first_call.env.get("VAULT_ADDR") == options.vault_address
assert first_call.env.get("VAULT_CACERT") == "/tmp/vault-ca.pem"
def test_bootstrap_aborts_when_unseal_keys_missing() -> None:
registry, doctl, vault, ssh, runner = setup_test_environment()
options = replace(make_options(), vault_address=None)
queue_droplet_discovery(doctl, options)
queue_ssh_verification(ssh, options)
queue_vault_status(vault, initialized=True, sealed=True)
for index in range(1, options.key_shares + 1):
doctl.queue(
"secrets",
"manager",
"secrets",
"get",
f"{options.secret_prefix}-unseal-{index}",
"--output",
"json",
exit_code=1,
)
with pytest.raises(RuntimeError, match="Insufficient unseal keys"):
bootstrap(options, runner=runner)
assert vault.calls[0].env.get("VAULT_ADDR") == "https://203.0.113.10:8200"
assert "VAULT_CACERT" not in vault.calls[0].envAI Coding Agent Prompt🐰 |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: scripts/bootstrap_vault_appliance.py Comment on lines +372 to +405 def bootstrap(options: BootstrapOptions, runner: CommandRunner | None = None) -> None:
command_runner = runner or CommandRunner()
secrets = SecretStore(command_runner, options.secret_prefix)
ip_address = discover_droplet_ip(options, command_runner)
verify_vault_service(ip_address, options, command_runner)
address = options.vault_address or f"https://{ip_address}:8200"
vault_env: dict[str, str] = {"VAULT_ADDR": address}
if options.ca_cert_path:
vault_env["VAULT_CACERT"] = options.ca_cert_path
status = read_vault_status(command_runner, vault_env)
if not status.get("initialized", False):
init_data = initialise_vault(options, command_runner, vault_env, secrets)
unseal_vault(init_data["unseal_keys"], options, command_runner, vault_env)
vault_env["VAULT_TOKEN"] = init_data["root_token"]
else:
if status.get("sealed", False):
unseal_keys = load_unseal_keys(options, secrets)
unseal_vault(unseal_keys, options, command_runner, vault_env)
root_token = secrets.get("root-token")
if not root_token:
raise RuntimeError(
"Vault is initialised but no root token is stored; cannot proceed."
)
vault_env["VAULT_TOKEN"] = root_token
post_status = read_vault_status(command_runner, vault_env)
if post_status.get("sealed", False):
raise RuntimeError("Vault remains sealed after attempting to unseal it.")
ensure_kv_mount(options, command_runner, vault_env)
ensure_approle(options, command_runner, vault_env, secrets)❌ New issue: Complex Method |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 0 🏁 Script executed: Length of output: 643 @leynos: The complex conditional can be simplified by extracting the validation logic into a helper function or using intermediate variables to improve readability. Suggested fix: def _is_kv_v2_mount(mount_info: dict | None) -> bool:
"""Check if a mount is a KV version 2 secrets engine."""
if not mount_info:
return False
if mount_info.get("type") != "kv":
return False
options = mount_info.get("options", {})
return options.get("version") == "2"
def ensure_kv_mount(options: BootstrapOptions, runner: CommandRunner, env: dict[str, str]) -> None:
mounts = json.loads(runner.run("vault", "secrets", "list", "-format=json", env=env))
mount_path = f"{options.mount_path}/"
current = mounts.get(mount_path)
if _is_kv_v2_mount(current):
return
runner.run(
"vault",
"secrets",
"enable",
"-path",
options.mount_path,
"kv-v2",
env=env,
)AI coding agent prompt: |
|
Tip For best results, initiate chat on the files or code changes. @leynos: The Suggested fixAdd these helper functions before def _build_vault_environment(options: BootstrapOptions, ip_address: str) -> dict[str, str]:
"""Build the base Vault environment with address and optional CA certificate."""
address = options.vault_address or f"https://{ip_address}:8200"
vault_env: dict[str, str] = {"VAULT_ADDR": address}
if options.ca_cert_path:
vault_env["VAULT_CACERT"] = options.ca_cert_path
return vault_env
def _ensure_vault_unsealed_and_authenticated(
options: BootstrapOptions,
runner: CommandRunner,
vault_env: dict[str, str],
secrets: SecretStore,
status: dict,
) -> None:
"""Ensure Vault is unsealed and vault_env contains VAULT_TOKEN."""
if not status.get("initialized", False):
init_data = initialise_vault(options, runner, vault_env, secrets)
unseal_vault(init_data["unseal_keys"], options, runner, vault_env)
vault_env["VAULT_TOKEN"] = init_data["root_token"]
else:
if status.get("sealed", False):
unseal_keys = load_unseal_keys(options, secrets)
unseal_vault(unseal_keys, options, runner, vault_env)
root_token = secrets.get("root-token")
if not root_token:
raise RuntimeError(
"Vault is initialised but no root token is stored; cannot proceed."
)
vault_env["VAULT_TOKEN"] = root_token
def _verify_vault_unsealed(runner: CommandRunner, vault_env: dict[str, str]) -> None:
"""Verify that Vault is unsealed after unsealing attempts."""
post_status = read_vault_status(runner, vault_env)
if post_status.get("sealed", False):
raise RuntimeError("Vault remains sealed after attempting to unseal it.")Then replace the def bootstrap(options: BootstrapOptions, runner: CommandRunner | None = None) -> None:
command_runner = runner or CommandRunner()
secrets = SecretStore(command_runner, options.secret_prefix)
ip_address = discover_droplet_ip(options, command_runner)
verify_vault_service(ip_address, options, command_runner)
vault_env = _build_vault_environment(options, ip_address)
status = read_vault_status(command_runner, vault_env)
_ensure_vault_unsealed_and_authenticated(
options, command_runner, vault_env, secrets, status
)
_verify_vault_unsealed(command_runner, vault_env)
ensure_kv_mount(options, command_runner, vault_env)
ensure_approle(options, command_runner, vault_env, secrets)This refactoring:
AI coding agent prompt🐰 |
There was a problem hiding this comment.
Gates Failed
New code is healthy
(1 new file with code health below 9.00)
Enforce advisory code health rules
(2 files with Complex Method, Complex Conditional, Large Method)
Gates Passed
4 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| New code is healthy | Violations | Code Health Impact | |
|---|---|---|---|
| bootstrap_vault_appliance.py | 3 rules | 8.99 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| bootstrap_vault_appliance.py | 3 advisory rules | 8.99 | Suppress |
| test_bootstrap_vault_appliance.py | 1 advisory rule | 9.31 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
Summary
bootstrap_vault_appliance.pydocs/vault-applience-bootstrap-process.mdand note the options in the module designTesting
https://chatgpt.com/codex/tasks/task_e_68dea7372f3c8322b43098f906045da7
Summary by Sourcery
Introduce a new idempotent Python helper script for bootstrapping a DigitalOcean-hosted Vault appliance with configurable Vault endpoint and custom CA bundle, update module design documentation, add user guide, and extend the test suite to cover address/CA propagation and error scenarios.
New Features:
Documentation:
Tests: