Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v4
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,30 @@ Temporal.io Worker Interceptor for Predicate Authority Zero-Trust authorization.

This package provides a pre-execution security gate for all Temporal Activities, enforcing cryptographic authorization mandates before any activity code runs.

## Prerequisites

This package requires the **Predicate Authority Sidecar** daemon to be running. The sidecar is a lightweight Rust binary that handles policy evaluation and mandate signing.

| Resource | Link |
|----------|------|
| Sidecar Repository | [github.com/PredicateSystems/predicate-authority-sidecar](https://github.com/PredicateSystems/predicate-authority-sidecar) |
| Download Binaries | [Latest Releases](https://github.com/PredicateSystems/predicate-authority-sidecar/releases) |
| License | MIT / Apache 2.0 |

### Quick Sidecar Setup

```bash
# Download the latest release for your platform
# Linux x64, macOS x64/ARM64, Windows x64 available

# Extract and run
tar -xzf predicate-authorityd-*.tar.gz
chmod +x predicate-authorityd

# Start with a policy file
./predicate-authorityd --port 8787 --policy-file policy.json
```

## Installation

```bash
Expand Down
73 changes: 73 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Predicate Temporal Python Examples

This directory contains examples demonstrating how to use `predicate-temporal` to secure Temporal activities.

## Prerequisites

1. Install dependencies:
```bash
pip install temporalio predicate-authority predicate-temporal
```

2. Start the Predicate Authority daemon:
```bash
# Download from https://github.com/PredicateSystems/predicate-authority-sidecar/releases
./predicate-authorityd --port 8787 --policy-file policy.json
```

3. Start a local Temporal server:
```bash
temporal server start-dev
```

## Examples

### Basic Example (`basic_worker.py`)

A minimal example showing:
- Setting up activities with Predicate interceptor
- Running a workflow that executes secured activities
- Handling authorization denials

Run with:
```bash
python basic_worker.py
```

### E-commerce Example (`ecommerce_worker.py`)

A realistic e-commerce scenario with:
- Order processing activities
- Payment handling
- Inventory management
- Policy-based access control

Run with:
```bash
python ecommerce_worker.py
```

## Policy File

The `policy.json` file defines which activities are allowed. Example:

```json
{
"rules": [
{
"name": "allow-order-processing",
"effect": "allow",
"principals": ["temporal-worker"],
"actions": ["process_order", "check_inventory", "send_confirmation"],
"resources": ["*"]
},
{
"name": "deny-admin-actions",
"effect": "deny",
"principals": ["*"],
"actions": ["delete_*", "admin_*"],
"resources": ["*"]
}
]
}
```
198 changes: 198 additions & 0 deletions examples/basic_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""
Basic example demonstrating Predicate Temporal interceptor.

This example shows:
1. Setting up a Temporal worker with Predicate authorization
2. Defining activities that will be secured
3. Running a workflow that executes those activities

Prerequisites:
- Temporal server running locally (temporal server start-dev)
- Predicate Authority daemon running (./predicate-authorityd --port 8787 --policy-file policy.json)
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

from predicate_authority import AuthorityClient
from predicate_temporal import PredicateInterceptor


# ============================================================================
# Activities - These will be secured by Predicate Authority
# ============================================================================


@activity.defn
async def greet(name: str) -> str:
"""Simple greeting activity - allowed by policy."""
return f"Hello, {name}!"


@activity.defn
async def fetch_data(data_id: str) -> dict:
"""Fetch data activity - allowed by policy."""
# Simulate fetching data from a database
return {"id": data_id, "value": "sample_data", "status": "active"}


@activity.defn
async def process_data(data: dict) -> dict:
"""Process data activity - allowed by policy."""
# Simulate processing
return {**data, "processed": True, "processed_by": "temporal-worker"}


@activity.defn
async def delete_all_records() -> str:
"""
Dangerous activity - DENIED by policy.

This activity matches the "deny-dangerous-operations" rule
and will be blocked before execution.
"""
# This code will NEVER run due to Predicate authorization
return "All records deleted!"


# ============================================================================
# Workflow
# ============================================================================


@dataclass
class WorkflowInput:
name: str
data_id: str


@dataclass
class WorkflowResult:
greeting: str
processed_data: dict


@workflow.defn
class BasicWorkflow:
"""Basic workflow demonstrating secured activities."""

@workflow.run
async def run(self, input: WorkflowInput) -> WorkflowResult:
# This activity will be allowed
greeting = await workflow.execute_activity(
greet,
input.name,
start_to_close_timeout=timedelta(seconds=10),
)

# This activity will be allowed
data = await workflow.execute_activity(
fetch_data,
input.data_id,
start_to_close_timeout=timedelta(seconds=10),
)

# This activity will be allowed
processed = await workflow.execute_activity(
process_data,
data,
start_to_close_timeout=timedelta(seconds=10),
)

return WorkflowResult(greeting=greeting, processed_data=processed)


@workflow.defn
class DangerousWorkflow:
"""Workflow that attempts a dangerous operation - will be blocked."""

@workflow.run
async def run(self) -> str:
# This will raise PermissionError due to Predicate denial
return await workflow.execute_activity(
delete_all_records,
start_to_close_timeout=timedelta(seconds=10),
)


# ============================================================================
# Main
# ============================================================================


async def main():
"""Run the example worker and execute workflows."""

# Connect to Temporal
client = await Client.connect("localhost:7233")

# Initialize Predicate Authority client
# This connects to the predicate-authorityd daemon
authority_ctx = AuthorityClient.from_policy_file(
policy_file="policy.json",
secret_key="demo-secret-key-for-signing",
ttl_seconds=300,
)

# Create the Predicate interceptor
interceptor = PredicateInterceptor(
authority_client=authority_ctx.client,
principal="temporal-worker",
)

# Create worker with the interceptor
worker = Worker(
client,
task_queue="predicate-demo-queue",
workflows=[BasicWorkflow, DangerousWorkflow],
activities=[greet, fetch_data, process_data, delete_all_records],
interceptors=[interceptor],
)

print("Starting worker with Predicate authorization...")
print("=" * 60)

# Run worker in background
async with worker:
# Execute the basic workflow - should succeed
print("\n[1] Running BasicWorkflow (should succeed)...")
try:
result = await client.execute_workflow(
BasicWorkflow.run,
WorkflowInput(name="Alice", data_id="data-123"),
id="basic-workflow-1",
task_queue="predicate-demo-queue",
)
print(f" Greeting: {result.greeting}")
print(f" Processed data: {result.processed_data}")
print(" Status: SUCCESS")
except Exception as e:
print(f" Status: FAILED - {e}")

# Execute the dangerous workflow - should be blocked
print("\n[2] Running DangerousWorkflow (should be blocked)...")
try:
result = await client.execute_workflow(
DangerousWorkflow.run,
id="dangerous-workflow-1",
task_queue="predicate-demo-queue",
)
print(f" Result: {result}")
print(" Status: SUCCESS (unexpected!)")
except Exception as e:
print(f" Status: BLOCKED by Predicate Authority")
print(f" Error: {type(e).__name__}: {e}")

print("\n" + "=" * 60)
print("Demo complete!")


if __name__ == "__main__":
asyncio.run(main())
Loading