Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/stage/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"created_at": "2025-10-23T13:09:51.866481+00:00",
"envelope": {
"alg": "AES-256-GCM",
"filename": "mvpEX.stl",
"kdf": "HKDF-SHA256",
"kx": "X25519",
"payload_sha256": "409d11741321d4940018f304a2df598b4c175cb2f2933985ea99f91ff69f2078",
"sender_pub_ed25519": "2NWHb34FFmSUkc77VKhwk2rq9Nj8yhp95xaI9Uvl6GM="
},
"payload": {
"filename": "mvpEX.stl.psenc",
"sha256": "25f2eedf62aeebd95c525de870932e38c76e4c0909f636bebfd4f7338042949e"
},
"policy_id": "nist-800-171",
"v": 1
}
Binary file added examples/stage/mvpEX.stl
Binary file not shown.
Binary file added examples/stage/mvpEX.stl.psenc
Binary file not shown.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies = [
"rich>=13.7,<15",
"pydantic>=2.11,<2.12",
"cryptography>=42,<46",
"paramiko>=3.4,<3.7",
"paramiko>=3.2",
"requests>=2.31,<3.0",
"watchdog>=4.0,<5.0",
"docker>=6.1,<8.0",
Expand Down
202 changes: 196 additions & 6 deletions src/printshield/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
build_provenance, sign_provenance, verify_provenance,
default_manifest_path, save_manifest,
)
from .core.transfer.receive import receive_gate
from .core.transfer.sftp import sftp_upload_verify
from .core.transfer.https import https_upload_verify
from .core.transfer.octoprint import octoprint_upload


@dataclass
class Context:
Expand Down Expand Up @@ -65,6 +70,9 @@ def main_callback(
prov_app = typer.Typer(help="Provenance sidecar tools for STL.")
app.add_typer(prov_app, name="provenance")

transfer_app = typer.Typer(help="Secure file transfer to printers/servers.")
app.add_typer(transfer_app, name="transfer")

# --- Command stubs ---

@app.command(help="Interactive bootstrap: keys, policy, audit path, endpoints, hardening.")
Expand Down Expand Up @@ -271,13 +279,195 @@ def provenance_verify(
typer.echo(f"Provenance FAIL — {res.reason}")
raise typer.Exit(code=0 if res.ok else 1)

@app.command(help="Securely transfer to printer/server (SFTP/HTTPS/OctoPrint); integrity gate on receive.")
def transfer() -> None:
typer.echo("transfer: not implemented yet.")
# ============================ TRANSFER / RECIEVE / MONITOR ================================
@transfer_app.command("sftp", help="Upload a bundle or envelope over SFTP with host-key check and SHA-256 verification.")
def transfer_sftp(
ctx: typer.Context,
input: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True),
host: str = typer.Option(..., "--host", "-H"),
user: str = typer.Option(..., "--user", "-u"),
dest_dir: str = typer.Option(..., "--dest-dir", "-d", help="Remote directory (POSIX)"),
dest_name: str = typer.Option(None, "--dest-name", help="Remote filename (defaults to source basename)"),
port: int = typer.Option(22, "--port"),
key_file: Path = typer.Option(None, "--key-file", help="Private key for auth (PEM)"),
password: str = typer.Option(None, "--password", help="Password (discouraged; use keys)"),
known_hosts: Path = typer.Option(None, "--known-hosts", help="known_hosts file to trust (merged with system)"),
hostkey_fingerprint: str = typer.Option(None, "--hostkey-fingerprint", help="Pin server key (OpenSSH SHA256:... form)"),
insecure_no_hostkey_check: bool = typer.Option(False, "--insecure-no-hostkey-check", help="Accept new/unknown host keys (NOT recommended)"),
):
c = cast(Context, ctx.obj)
loaded = load_config(c.config_path)
res = sftp_upload_verify(
input,
host=host,
username=user,
dest_dir=dest_dir,
dest_name=dest_name,
port=port,
key_filename=str(key_file) if key_file else None,
password=password,
known_hosts=str(known_hosts) if known_hosts else None,
hostkey_fingerprint=hostkey_fingerprint,
allow_unknown_hostkey=bool(insecure_no_hostkey_check),
)
record_event(loaded.config.audit.path, "transfer.sftp", {
"input": str(input),
"host": host,
"remote_path": res.remote_path,
"size": res.size,
"sha256": res.local_sha256,
"hostkey_pinned": hostkey_fingerprint is not None,
"insecure_no_hostkey_check": bool(insecure_no_hostkey_check),
})
if c.output_format == "json":
import json
typer.echo(json.dumps(res.__dict__, indent=2))
else:
typer.echo(f"Remote: {res.remote_path}")
typer.echo(f"Size: {res.size} bytes")
typer.echo(f"SHA-256: {res.local_sha256}")

@transfer_app.command("https", help="Upload a bundle/envelope over HTTPS with TLS + SHA-256 verification.")
def transfer_https(
ctx: typer.Context,
input: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True),
url: str = typer.Option(..., "--url", "-U", help="HTTPS endpoint to POST to."),
token: str = typer.Option(None, "--token", help="Bearer token for Authorization header."),
ca_cert: Path = typer.Option(None, "--ca-cert", help="Custom CA cert bundle (PEM)."),
insecure_no_verify_tls: bool = typer.Option(False, "--insecure-no-verify-tls", help="Disable TLS verification (NOT recommended)."),
timeout: float = typer.Option(30.0, "--timeout", help="Request timeout in seconds."),
):
c = cast(Context, ctx.obj)
loaded = load_config(c.config_path)

res = https_upload_verify(
input,
url=url,
token=token,
ca_cert=str(ca_cert) if ca_cert is not None else None,
insecure_no_verify_tls=bool(insecure_no_verify_tls),
timeout=timeout,
)

record_event(
loaded.config.audit.path,
"transfer.https",
{
"input": str(input),
"url": res.url,
"sha256": res.sha256,
"stored_path": res.stored_path,
"insecure_no_verify_tls": bool(insecure_no_verify_tls),
"has_token": token is not None,
},
)

if c.output_format == "json":
typer.echo(json.dumps(res.__dict__, indent=2))
else:
typer.echo(f"Uploaded to: {res.url}")
typer.echo(f"SHA-256: {res.sha256}")
if res.stored_path:
typer.echo(f"Stored as: {res.stored_path}")

@transfer_app.command("octoprint", help="Upload a bundle/envelope to OctoPrint via its REST API.")
def transfer_octoprint(
ctx: typer.Context,
input: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True),
base_url: str = typer.Option(..., "--url", "-U", help="Base URL of OctoPrint, e.g. https://octopi.local"),
api_key: str = typer.Option(..., "--api-key", "-K", help="OctoPrint API key (X-Api-Key)."),
location: str = typer.Option("local", "--location", "-L", help="Target location: local|sdcard"),
subpath: str = typer.Option(None, "--path", help="Remote subfolder within location."),
select: bool = typer.Option(False, "--select", help="Select file after upload."),
print_after: bool = typer.Option(False, "--print", help="Start printing after upload (implies select)."),
ca_cert: Path = typer.Option(None, "--ca-cert", help="Custom CA bundle (PEM)."),
insecure_no_verify_tls: bool = typer.Option(False, "--insecure-no-verify-tls", help="Disable TLS verification (NOT recommended)."),
timeout: float = typer.Option(30.0, "--timeout", help="Request timeout in seconds."),
):
c = cast(Context, ctx.obj)
loaded = load_config(c.config_path)

res = octoprint_upload(
input,
base_url=base_url,
api_key=api_key,
location=location,
select=select or print_after,
print_after_select=print_after,
subpath=subpath,
ca_cert=str(ca_cert) if ca_cert is not None else None,
insecure_no_verify_tls=bool(insecure_no_verify_tls),
timeout=timeout,
)

record_event(
loaded.config.audit.path,
"transfer.octoprint",
{
"input": str(input),
"url": res.url,
"location": res.location,
"path": res.path,
"origin": res.origin,
"sha256": res.sha256,
"select": select or print_after,
"print": print_after,
"insecure_no_verify_tls": bool(insecure_no_verify_tls),
},
)

if c.output_format == "json":
typer.echo(json.dumps(res.__dict__, indent=2))
else:
typer.echo(f"Uploaded to OctoPrint {res.origin}:{res.path}")
typer.echo(f"SHA-256: {res.sha256}")


@app.command(help="Server/printer-side staging: verify bundle, signatures, and policy before queueing.")
def receive() -> None:
typer.echo("receive: not implemented yet.")
@app.command(help="Server-side gate: verify policy, decrypt envelope, and stage plaintext for printing.")
def receive(
ctx: typer.Context,
input: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True,
help="Path to .pshieldpkg or .psenc"),
private_key: Path = typer.Option(..., "--private-key", "-k", help="Recipient X25519 private key (PEM)."),
expected_sender_pub: Path = typer.Option(None, "--expected-sender-pub", "-E", help="Require this ed25519 public key (PEM)."),
out_dir: Path = typer.Option(None, "--out-dir", "-d", help="Staging directory (created if missing). Defaults to <input>.stage"),
require_policy: str = typer.Option(None, "--require-policy", help="If set, reject bundles whose manifest.policy_id != this"),
):
c = cast(Context, ctx.obj)
loaded = load_config(c.config_path)

result = receive_gate(
input_path=input,
out_dir=out_dir,
recipient_private_key_pem=private_key,
expected_sender_pub_pem=expected_sender_pub,
require_policy_id=require_policy,
)

record_event(
loaded.config.audit.path, "receive",
{
"input": str(input),
"staged_dir": str(result.staged_dir),
"envelope": str(result.envelope_path),
"decrypted": str(result.decrypted_path),
"manifest": str(result.manifest_path) if result.manifest_path else None,
},
)

if c.output_format == "json":
typer.echo(json.dumps({
"staged_dir": str(result.staged_dir),
"envelope": str(result.envelope_path),
"decrypted": str(result.decrypted_path),
"manifest": str(result.manifest_path) if result.manifest_path else None,
}, indent=2))
else:
typer.echo(f"Staged dir: {result.staged_dir}")
if result.manifest_path:
typer.echo(f"Manifest: {result.manifest_path}")
typer.echo(f"Envelope: {result.envelope_path}")
typer.echo(f"Decrypted: {result.decrypted_path}")

@app.command(help="Watch directories/printers for changes and integrity drift; emit audit events.")
def monitor() -> None:
Expand Down
85 changes: 85 additions & 0 deletions src/printshield/core/transfer/https.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, Any

@dataclass
class HttpUploadResult:
url: str
sha256: str
stored_path: Optional[str]

def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()

def https_upload_verify(
local_path: str | Path,
*,
url: str,
token: Optional[str] = None,
ca_cert: Optional[str | Path] = None,
insecure_no_verify_tls: bool = False,
timeout: float = 30.0,
extra_meta: Optional[Dict[str, Any]] = None,
session: Any = None, # for tests; if None, create a new requests.Session()
) -> HttpUploadResult:
"""
Upload local_path to HTTPS endpoint via POST multipart/form-data,
include SHA-256 in header/body, and require server to echo its computed SHA-256.
Integrity passes only if server's sha256 == local sha256.
"""
lp = Path(local_path)
local_hash = _sha256_file(lp)

import requests # type: ignore

verify: bool | str
if insecure_no_verify_tls:
verify = False
elif ca_cert is not None:
verify = str(ca_cert)
else:
verify = True # system CA bundle

headers: Dict[str, str] = {
"X-PrintShield-SHA256": local_hash,
}
if token:
headers["Authorization"] = f"Bearer {token}"

meta = dict(extra_meta or {})
meta["sha256"] = local_hash

sess = session or requests.Session()
with lp.open("rb") as f:
files = {"file": (lp.name, f)}
resp = sess.post(
url,
headers=headers,
files=files,
data=meta,
timeout=timeout,
verify=verify,
)

if not (200 <= resp.status_code < 300):
raise ValueError(f"HTTP upload failed: {resp.status_code}")

# Expect JSON body with 'sha256' and optional 'stored_path'
try:
data = resp.json()
except Exception as e:
raise ValueError(f"Server response is not valid JSON: {e}") from e

server_hash = data.get("sha256")
if server_hash != local_hash:
raise ValueError("Integrity check failed: server sha256 != local sha256")

stored = data.get("stored_path")
return HttpUploadResult(url=url, sha256=local_hash, stored_path=stored)
Loading