diff --git a/examples/enc/incorrect_private_ed25519.pem b/examples/enc/incorrect_private_ed25519.pem new file mode 100644 index 0000000..79e6d89 --- /dev/null +++ b/examples/enc/incorrect_private_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIA0Uz2TA66FhUY6XXFtUm1gitpACP7Fe30g1UbZGubAR +-----END PRIVATE KEY----- diff --git a/examples/enc/incorrect_public_ed25519.pem b/examples/enc/incorrect_public_ed25519.pem new file mode 100644 index 0000000..f8f9243 --- /dev/null +++ b/examples/enc/incorrect_public_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEA5pIucZl12nyISlFbKdWVvG5w66MSfXbshpkh0ehQAIE= +-----END PUBLIC KEY----- diff --git a/examples/mvp/extracted/manifest.json b/examples/mvp/extracted/manifest.json new file mode 100644 index 0000000..95902ac --- /dev/null +++ b/examples/mvp/extracted/manifest.json @@ -0,0 +1,17 @@ +{ + "created_at": "2025-10-23T13:09:51.866481+00:00", + "envelope": { + "alg": "AES-256-GCM", + "filename": "mvpEX.stl", + "kdf": "HKDF-SHA256", + "kx": "X25519", + "payload_sha256": "409d11741321d4940018f304a2df598b4c175cb2f2933985ea99f91ff69f2078", + "sender_pub_ed25519": "2NWHb34FFmSUkc77VKhwk2rq9Nj8yhp95xaI9Uvl6GM=" + }, + "payload": { + "filename": "mvpEX.stl.psenc", + "sha256": "25f2eedf62aeebd95c525de870932e38c76e4c0909f636bebfd4f7338042949e" + }, + "policy_id": "nist-800-171", + "v": 1 +} \ No newline at end of file diff --git a/examples/mvp/extracted/mvpEX.stl.psenc b/examples/mvp/extracted/mvpEX.stl.psenc new file mode 100644 index 0000000..d9f4316 Binary files /dev/null and b/examples/mvp/extracted/mvpEX.stl.psenc differ diff --git a/examples/mvp/incorrect_private_ed25519.pem b/examples/mvp/incorrect_private_ed25519.pem new file mode 100644 index 0000000..79e6d89 --- /dev/null +++ b/examples/mvp/incorrect_private_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIA0Uz2TA66FhUY6XXFtUm1gitpACP7Fe30g1UbZGubAR +-----END PRIVATE KEY----- diff --git a/examples/mvp/incorrect_public_ed25519.pem b/examples/mvp/incorrect_public_ed25519.pem new file mode 100644 index 0000000..f8f9243 --- /dev/null +++ b/examples/mvp/incorrect_public_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEA5pIucZl12nyISlFbKdWVvG5w66MSfXbshpkh0ehQAIE= +-----END PUBLIC KEY----- diff --git a/examples/mvp/mvpEX.stl b/examples/mvp/mvpEX.stl new file mode 100644 index 0000000..ee25e4b Binary files /dev/null and b/examples/mvp/mvpEX.stl differ diff --git a/examples/mvp/mvpEX.stl.psenc b/examples/mvp/mvpEX.stl.psenc new file mode 100644 index 0000000..d9f4316 Binary files /dev/null and b/examples/mvp/mvpEX.stl.psenc differ diff --git a/examples/mvp/mvpEX.stl.psenc.pshieldpkg b/examples/mvp/mvpEX.stl.psenc.pshieldpkg new file mode 100644 index 0000000..6fcd571 Binary files /dev/null and b/examples/mvp/mvpEX.stl.psenc.pshieldpkg differ diff --git a/examples/mvp/sample_private_ed25519.pem b/examples/mvp/sample_private_ed25519.pem new file mode 100644 index 0000000..49ff226 --- /dev/null +++ b/examples/mvp/sample_private_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEICat24dZ+8D7aInN2TpwHIOKYGJUVNVTdBA6zGB6Nfzz +-----END PRIVATE KEY----- diff --git a/examples/mvp/sample_private_x25519.pem b/examples/mvp/sample_private_x25519.pem new file mode 100644 index 0000000..a829e74 --- /dev/null +++ b/examples/mvp/sample_private_x25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VuBCIEIGBTwyL4k3HiuW9P6PrHpAYqvVZ+lpfXmPs/k1ZzUCRD +-----END PRIVATE KEY----- diff --git a/examples/mvp/sample_public_ed25519.pem b/examples/mvp/sample_public_ed25519.pem new file mode 100644 index 0000000..47a6dcc --- /dev/null +++ b/examples/mvp/sample_public_ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEA2NWHb34FFmSUkc77VKhwk2rq9Nj8yhp95xaI9Uvl6GM= +-----END PUBLIC KEY----- diff --git a/examples/mvp/sample_public_x25519.pem b/examples/mvp/sample_public_x25519.pem new file mode 100644 index 0000000..786136d --- /dev/null +++ b/examples/mvp/sample_public_x25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VuAyEAGJ2TDSnFELT8DVy+WKn8rLefA6Z0X0mS/SBl7TYaXjs= +-----END PUBLIC KEY----- diff --git a/src/printshield/cli.py b/src/printshield/cli.py index e2d4adf..bb34536 100644 --- a/src/printshield/cli.py +++ b/src/printshield/cli.py @@ -20,6 +20,10 @@ from .core.audit.chain import record_event, verify_log from .core.crypto.encrypt import encrypt_file, decrypt_file from .core.transfer.bundle import create_bundle, extract_bundle, BUNDLE_EXT +from .core.provenance.manifest import ( + build_provenance, sign_provenance, verify_provenance, + default_manifest_path, save_manifest, +) @dataclass class Context: @@ -58,6 +62,9 @@ def main_callback( bundle_app = typer.Typer(help="Create/extract .pshieldpkg job bundles.") app.add_typer(bundle_app, name="bundle") +prov_app = typer.Typer(help="Provenance sidecar tools for STL.") +app.add_typer(prov_app, name="provenance") + # --- Command stubs --- @app.command(help="Interactive bootstrap: keys, policy, audit path, endpoints, hardening.") @@ -104,7 +111,6 @@ def decrypt( expected_sender_pub: Path = typer.Option(None, "--expected-sender-pub", "-E", help="Require this Ed25519 public key (PEM)."), output: Path = typer.Option(None, "--output", "-o", help="Decrypted file path (defaults to strip .psenc)."), ): - from typing import cast c = cast(Context, ctx.obj) loaded = load_config(c.config_path) out_p = decrypt_file(path, private_key, out_path=output, expected_sender_pub_pem=expected_sender_pub) @@ -200,9 +206,70 @@ def verify_cmd( raise typer.Exit(code=0 if ok else 1) -@app.command(help="Create/read provenance manifests; embed/extract when supported.") -def provenance() -> None: - typer.echo("provenance: not implemented yet.") + +# ============================ PROVENANCE ================================ +@prov_app.command("create", help="Create a (signed) provenance sidecar JSON for an STL file.") +def provenance_create( + ctx: typer.Context, + file: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True), + signer_key: Path = typer.Option(None, "--signer-key", "-S", help="Ed25519 private key (PEM) to sign the manifest."), + manifest: Path = typer.Option(None, "--manifest", "-m", help="Output manifest path (defaults to .provenance.json)"), + project: str = typer.Option(None, "--project"), + part_id: str = typer.Option(None, "--part-id"), + revision: str = typer.Option(None, "--revision"), + owner: str = typer.Option(None, "--owner"), + slicer_name: str = typer.Option(None, "--slicer-name"), + slicer_version: str = typer.Option(None, "--slicer-version"), + slicer_profile_hash: str = typer.Option(None, "--slicer-profile-hash"), +): + c = cast(Context, ctx.obj) + loaded = load_config(c.config_path) + + prov = build_provenance( + file, loaded.config.policy_id, + project=project, part_id=part_id, revision=revision, owner=owner, + slicer_name=slicer_name, slicer_version=slicer_version, slicer_profile_hash=slicer_profile_hash, + ) + + if signer_key is not None: + prov = sign_provenance(prov, signer_private_key_pem=signer_key) + + out_path = manifest or default_manifest_path(file) + save_manifest(prov, out_path) + + # Audit + fields = {"file": str(file), "manifest": str(out_path), "signed": signer_key is not None} + record_event(loaded.config.audit.path, "provenance.create", fields) + + if c.output_format == "json": + typer.echo(json.dumps({"ok": True, "manifest": str(out_path), "signed": signer_key is not None}, indent=2)) + else: + mode = "signed" if signer_key is not None else "unsigned" + typer.echo(f"Provenance ({mode}) → {out_path}") + +@prov_app.command("verify", help="Verify a provenance sidecar against an STL file.") +def provenance_verify( + ctx: typer.Context, + file: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True), + manifest: Path = typer.Option(None, "--manifest", "-m", help="Manifest path (defaults to .provenance.json)"), + expected_signer_pub: Path = typer.Option(None, "--expected-signer-pub", "-E", help="Require this Ed25519 public key (PEM)."), +): + c = cast(Context, ctx.obj) + loaded = load_config(c.config_path) + man_path = manifest or default_manifest_path(file) + + res = verify_provenance(file, man_path, expected_signer_pub_pem=expected_signer_pub) + # Audit + record_event(loaded.config.audit.path, "provenance.verify", {"file": str(file), "manifest": str(man_path), "ok": res.ok}) + + if c.output_format == "json": + typer.echo(json.dumps({"ok": res.ok, "reason": res.reason}, indent=2)) + else: + if res.ok: + typer.echo("Provenance OK") + else: + typer.echo(f"Provenance FAIL — {res.reason}") + raise typer.Exit(code=0 if res.ok else 1) @app.command(help="Securely transfer to printer/server (SFTP/HTTPS/OctoPrint); integrity gate on receive.") def transfer() -> None: @@ -225,7 +292,6 @@ def monitor() -> None: # ============================ AUDIT-VERIFY ================================ @audit_app.command("verify", help="Verify the tamper-evident audit log chain.") def audit_verify(ctx: typer.Context) -> None: - from typing import cast c = cast(Context, ctx.obj) loaded = load_config(c.config_path) ok, count, bad = verify_log(loaded.config.audit.path) diff --git a/src/printshield/core/provenance/manifest.py b/src/printshield/core/provenance/manifest.py index e69de29..8c125b6 100644 --- a/src/printshield/core/provenance/manifest.py +++ b/src/printshield/core/provenance/manifest.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +import json +from base64 import b64encode, b64decode +from dataclasses import dataclass +from datetime import datetime, timezone +from hashlib import sha256 +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel, Field +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +# ---------- Canonical JSON (JCS-style subset) ---------- +def _canon(obj: dict) -> bytes: + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + +# ---------- Models ---------- +class Artifact(BaseModel): + filename: str + sha256: str + size: int = Field(ge=0) + format: str = "stl" # STL-only for now + +class SlicerInfo(BaseModel): + name: Optional[str] = None + version: Optional[str] = None + profile_hash: Optional[str] = None + +class Provenance(BaseModel): + v: int = 1 + created_at: str + policy_id: str + artifact: Artifact + project: Optional[str] = None + part_id: Optional[str] = None + revision: Optional[str] = None + owner: Optional[str] = None + slicer: Optional[SlicerInfo] = None + signer_pub_ed25519: Optional[str] = None # base64 (raw 32 bytes) + signature: Optional[str] = None # base64 (Ed25519 sig over canonicalized object WITHOUT signature) + +# ---------- Hash helpers ---------- +def file_sha256(path: str | Path) -> str: + p = Path(path) + h = sha256() + with p.open("rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + +# ---------- Build / Sign / Verify ---------- +def build_provenance( + file_path: str | Path, + policy_id: str, + *, + project: str | None = None, + part_id: str | None = None, + revision: str | None = None, + owner: str | None = None, + slicer_name: str | None = None, + slicer_version: str | None = None, + slicer_profile_hash: str | None = None, +) -> Provenance: + p = Path(file_path) + prov = Provenance( + created_at=datetime.now(timezone.utc).isoformat(), + policy_id=policy_id, + artifact=Artifact( + filename=p.name, + sha256=file_sha256(p), + size=p.stat().st_size, + format="stl", + ), + project=project, + part_id=part_id, + revision=revision, + owner=owner, + slicer=SlicerInfo(name=slicer_name, version=slicer_version, profile_hash=slicer_profile_hash) if any([slicer_name, slicer_version, slicer_profile_hash]) else None, + ) + return prov + +def _ed25519_pub_bytes(pk: Ed25519PublicKey) -> bytes: + return pk.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) + +def sign_provenance(prov: Provenance, signer_private_key_pem: str | Path) -> Provenance: + # produce canonical dict WITHOUT signature + obj = prov.model_dump(exclude={"signature"}) + # ensure signer_pub is included + sk = serialization.load_pem_private_key(Path(signer_private_key_pem).read_bytes(), password=None) + if not isinstance(sk, Ed25519PrivateKey): + raise TypeError("Signer key must be Ed25519 private key") + signer_pub_raw = _ed25519_pub_bytes(sk.public_key()) + obj["signer_pub_ed25519"] = b64encode(signer_pub_raw).decode("ascii") + + sig = sk.sign(_canon(obj)) + obj["signature"] = b64encode(sig).decode("ascii") + # validate with Pydantic and return + return Provenance.model_validate(obj) + +@dataclass +class VerifyResult: + ok: bool + reason: Optional[str] = None + +def verify_provenance( + file_path: str | Path, + manifest_path: str | Path, + expected_signer_pub_pem: Optional[str | Path] = None, +) -> VerifyResult: + # read and parse + mp = Path(manifest_path) + try: + raw = json.loads(mp.read_text(encoding="utf-8")) + prov = Provenance.model_validate(raw) + except Exception as e: + return VerifyResult(ok=False, reason=f"Invalid manifest JSON/schema: {e}") + + # verify artifact hash + actual = file_sha256(file_path) + if prov.artifact.sha256 != actual: + return VerifyResult(ok=False, reason="Artifact hash mismatch") + + # if there is a signature, verify it + has_sig = prov.signature is not None and prov.signer_pub_ed25519 is not None + if has_sig: + signer_pub_raw = b64decode(prov.signer_pub_ed25519) + pk = Ed25519PublicKey.from_public_bytes(signer_pub_raw) + # rebuild canonical object without signature + obj_wo_sig = prov.model_dump(exclude={"signature"}) + sig = b64decode(prov.signature) + try: + pk.verify(sig, _canon(obj_wo_sig)) + except Exception as e: + return VerifyResult(ok=False, reason=f"Signature invalid: {e}") + + # optional pinning + if expected_signer_pub_pem is not None: + exp_pk = serialization.load_pem_public_key(Path(expected_signer_pub_pem).read_bytes()) + if not isinstance(exp_pk, Ed25519PublicKey): + return VerifyResult(ok=False, reason="expected_signer_pub must be Ed25519 public key") + if _ed25519_pub_bytes(exp_pk) != signer_pub_raw: + return VerifyResult(ok=False, reason="Signer pubkey does not match expected") + + return VerifyResult(ok=True) + + # unsigned manifest -> succeed only if user didn’t require pinning + if expected_signer_pub_pem is not None: + return VerifyResult(ok=False, reason="Manifest is unsigned but an expected signer was provided") + return VerifyResult(ok=True) + +# ---------- I/O ---------- +def default_manifest_path(file_path: str | Path) -> Path: + p = Path(file_path) + return p.with_suffix(p.suffix + ".provenance.json") + +def save_manifest(prov: Provenance, path: str | Path) -> Path: + pp = Path(path) + pp.write_text(json.dumps(prov.model_dump(), indent=2), encoding="utf-8") + return pp \ No newline at end of file diff --git a/tests/unit/test_provenance_sidecar.py b/tests/unit/test_provenance_sidecar.py new file mode 100644 index 0000000..896e3ea --- /dev/null +++ b/tests/unit/test_provenance_sidecar.py @@ -0,0 +1,75 @@ +from __future__ import annotations +from pathlib import Path + +import json +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from typer.testing import CliRunner +from printshield.cli import app + +runner = CliRunner() + +def _gen_ed25519(tmp: Path): + sk = Ed25519PrivateKey.generate() + pk = sk.public_key() + sk_p = tmp / "sk.pem"; pk_p = tmp / "pk.pem" + sk_p.write_bytes(sk.private_bytes( + serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.NoEncryption())) + pk_p.write_bytes(pk.public_bytes( + serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo)) + return sk_p, pk_p + +def _write_cfg(tmp: Path) -> Path: + cfg = tmp / "ps.yaml" + cfg.write_text(f"policy_id: thesis\n" + f"audit:\n path: {tmp.as_posix()}/audit\n", encoding="utf-8") + return cfg + +def test_provenance_create_and_verify_signed(tmp_path: Path): + cfg = _write_cfg(tmp_path) + sk, pk = _gen_ed25519(tmp_path) + + f = tmp_path / "p.stl" + f.write_text("solid p\nendsolid p\n", encoding="utf-8") + man = tmp_path / "p.stl.provenance.json" + + r1 = runner.invoke(app, ["--config", str(cfg), "provenance", "create", str(f), "--signer-key", str(sk), "--manifest", str(man)]) + assert r1.exit_code == 0, r1.output + assert man.exists() + + r2 = runner.invoke(app, ["--config", str(cfg), "provenance", "verify", str(f), "--manifest", str(man), "--expected-signer-pub", str(pk)]) + assert r2.exit_code == 0, r2.output + assert "OK" in r2.stdout or '"ok": true' in r2.stdout + +def test_provenance_detects_file_tamper(tmp_path: Path): + cfg = _write_cfg(tmp_path) + sk, pk = _gen_ed25519(tmp_path) + + f = tmp_path / "t.stl" + f.write_text("solid t\nendsolid t\n", encoding="utf-8") + man = tmp_path / "t.stl.provenance.json" + + runner.invoke(app, ["--config", str(cfg), "provenance", "create", str(f), "--signer-key", str(sk), "--manifest", str(man)]) + # tamper file + f.write_text("solid t\n facet normal 0 0 1\nendsolid t\n", encoding="utf-8") + + r = runner.invoke(app, ["--config", str(cfg), "provenance", "verify", str(f), "--manifest", str(man)]) + assert r.exit_code != 0 + assert "FAIL" in r.stdout or '"ok": false' in r.stdout + +def test_provenance_detects_manifest_tamper(tmp_path: Path): + cfg = _write_cfg(tmp_path) + sk, pk = _gen_ed25519(tmp_path) + + f = tmp_path / "m.stl" + f.write_text("solid m\nendsolid m\n", encoding="utf-8") + man = tmp_path / "m.stl.provenance.json" + + runner.invoke(app, ["--config", str(cfg), "provenance", "create", str(f), "--signer-key", str(sk), "--manifest", str(man)]) + # tamper manifest (flip one character) + data = json.loads(man.read_text(encoding="utf-8")) + data["policy_id"] = "tampered" + man.write_text(json.dumps(data, indent=2), encoding="utf-8") + + r = runner.invoke(app, ["--config", str(cfg), "provenance", "verify", str(f), "--manifest", str(man), "--expected-signer-pub", str(pk)]) + assert r.exit_code != 0