Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CLI and data enrichment utilities for the [Parallel API](https://docs.parallel.a
- **Web Search** - AI-powered search with domain filtering and date ranges
- **Content Extraction** - Extract clean markdown from any URL
- **Data Enrichment** - Enrich CSV, JSON, DuckDB, and BigQuery data with AI
- **Follow-up Context** - Chain research and enrichment tasks using `--previous-interaction-id`
- **AI-Assisted Planning** - Use natural language to define what data you want
- **Multiple Integrations** - Polars, DuckDB, Snowflake, BigQuery, Spark

Expand Down Expand Up @@ -208,13 +209,41 @@ echo "What is the latest funding for Anthropic?" | parallel-cli search - --json
echo "Research question" | parallel-cli research run - --json

# Async: launch then poll separately
parallel-cli research run "question" --no-wait --json # returns run_id
parallel-cli research run "question" --no-wait --json # returns run_id + interaction_id
parallel-cli research status trun_xxx --json # check status
parallel-cli research poll trun_xxx --json # wait and get result

# Follow-up: reuse context from a previous task
parallel-cli research run "follow-up question" --previous-interaction-id trun_xxx --json
parallel-cli enrich run --data '[...]' --previous-interaction-id trun_xxx --json

# Exit codes: 0=ok, 2=bad input, 3=auth error, 4=api error, 5=timeout
```

### Follow-up research with context reuse

Tasks return an `interaction_id` that can be passed as `--previous-interaction-id` to a subsequent research or enrichment run. The new task inherits the context of the prior one, so follow-up questions can reference earlier results without restating them.

```bash
# Step 1: Run initial research (interaction_id is in the JSON output)
parallel-cli research run "What are the top 3 AI companies?" --json --processor lite-fast
# → { "run_id": "trun_abc", "interaction_id": "trun_abc", ... }

# Step 2: Follow-up research referencing the first task's context
parallel-cli research run "What products does the #1 company make?" \
--previous-interaction-id trun_abc --json

# Step 3: Use research context for enrichment
parallel-cli enrich run \
--data '[{"company": "Anthropic"}, {"company": "OpenAI"}]' \
--target enriched.csv \
--source-columns '[{"name": "company", "description": "Company name"}]' \
--enriched-columns '[{"name": "products", "description": "Main products"}]' \
--previous-interaction-id trun_abc --json
```

The `interaction_id` is shown in both the human-readable and `--json` output of `research run`, `research status`, and `research poll`.

### More examples

```bash
Expand Down
35 changes: 32 additions & 3 deletions parallel_web_tools/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,10 @@ def enrich():
@click.option("--dry-run", is_flag=True, help="Show what would be executed without making API calls")
@click.option("--json", "output_json", is_flag=True, help="Output results as JSON to stdout")
@click.option("-o", "--output", "output_file", type=click.Path(), help="Save results to JSON file")
@click.option(
"--previous-interaction-id",
help="Interaction ID from a previous task to reuse as context",
)
def enrich_run(
config_file: str | None,
source_type: str | None,
Expand All @@ -974,6 +978,7 @@ def enrich_run(
dry_run: bool,
output_json: bool,
output_file: str | None,
previous_interaction_id: str | None,
):
"""Run data enrichment from YAML config or CLI arguments.

Expand Down Expand Up @@ -1043,7 +1048,7 @@ def enrich_run(

if not output_json:
console.print(f"[bold cyan]Running enrichment from {config_file}...[/bold cyan]\n")
result = run_enrichment(config_file, no_wait=no_wait)
result = run_enrichment(config_file, no_wait=no_wait, previous_interaction_id=previous_interaction_id)
else:
# After validation, these are guaranteed non-None
assert source_type is not None
Expand Down Expand Up @@ -1134,7 +1139,7 @@ def enrich_run(

if not output_json:
console.print(f"[bold cyan]Running enrichment: {source} -> {target}[/bold cyan]\n")
result = run_enrichment_from_dict(config, no_wait=no_wait)
result = run_enrichment_from_dict(config, no_wait=no_wait, previous_interaction_id=previous_interaction_id)

if no_wait and result:
if output_json:
Expand Down Expand Up @@ -1572,6 +1577,10 @@ def research():
"-o", "--output", "output_file", type=click.Path(), help="Save results (creates {name}.json and {name}.md)"
)
@click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout")
@click.option(
"--previous-interaction-id",
help="Interaction ID from a previous task to reuse as context",
)
def research_run(
query: str | None,
input_file: str | None,
Expand All @@ -1582,19 +1591,26 @@ def research_run(
dry_run: bool,
output_file: str | None,
output_json: bool,
previous_interaction_id: str | None,
):
"""Run deep research on a question or topic.

QUERY is the research question (max 15,000 chars). Alternatively, use --input-file
or pass "-" as QUERY to read from stdin.

Use --previous-interaction-id to continue research from a prior task's context.
The interaction ID is shown in the output of every research run.

Examples:

parallel-cli research run "What are the latest developments in quantum computing?"

parallel-cli research run -f question.txt --processor ultra -o report

echo "My research question" | parallel-cli research run - --json

# Follow-up research using context from a previous task:
parallel-cli research run "What are the implications?" --previous-interaction-id trun_abc123
"""
# Read from stdin if "-" is passed
if query == "-":
Expand Down Expand Up @@ -1634,13 +1650,18 @@ def research_run(
# Create task and return immediately
if not output_json:
console.print(f"[dim]Creating research task with processor: {processor}...[/dim]")
result = create_research_task(query, processor=processor, source="cli")
result = create_research_task(
query, processor=processor, source="cli", previous_interaction_id=previous_interaction_id
)

if not output_json:
console.print(f"\n[bold green]Task created: {result['run_id']}[/bold green]")
if result.get("interaction_id"):
console.print(f"Interaction ID: {result['interaction_id']}")
console.print(f"Track progress: {result['result_url']}")
console.print("\n[dim]Use 'parallel-cli research status <run_id>' to check status[/dim]")
console.print("[dim]Use 'parallel-cli research poll <run_id>' to wait for results[/dim]")
console.print("[dim]Use '--previous-interaction-id' on a new run to continue this research[/dim]")

if output_json:
print(json.dumps(result, indent=2))
Expand Down Expand Up @@ -1673,6 +1694,7 @@ def on_status(status: str, run_id: str):
poll_interval=poll_interval,
on_status=on_status,
source="cli",
previous_interaction_id=previous_interaction_id,
)

_output_research_result(result, output_file, output_json)
Expand Down Expand Up @@ -1715,11 +1737,13 @@ def research_status(run_id: str, output_json: bool):
}.get(status, "white")

console.print(f"[bold]Task:[/bold] {run_id}")
console.print(f"[bold]Interaction ID:[/bold] {result.get('interaction_id', run_id)}")
console.print(f"[bold]Status:[/bold] [{status_color}]{status}[/{status_color}]")
console.print(f"[bold]URL:[/bold] {result['result_url']}")

if status == "completed":
console.print("\n[dim]Use 'parallel-cli research poll <run_id>' to retrieve results[/dim]")
console.print("[dim]Use '--previous-interaction-id' on a new run to continue this research[/dim]")

except Exception as e:
_handle_error(e, output_json=output_json)
Expand Down Expand Up @@ -1915,6 +1939,7 @@ def _output_research_result(
output = result.get("output", {})
output_data = {
"run_id": result.get("run_id"),
"interaction_id": result.get("interaction_id"),
"result_url": result.get("result_url"),
"status": result.get("status"),
"output": output.copy() if isinstance(output, dict) else output,
Expand Down Expand Up @@ -1957,6 +1982,7 @@ def _output_research_result(
else:
console.print("\n[bold green]Research Complete![/bold green]")
console.print(f"[dim]Task: {result.get('run_id')}[/dim]")
console.print(f"[dim]Interaction ID: {result.get('interaction_id')}[/dim]")
console.print(f"[dim]URL: {result.get('result_url')}[/dim]\n")

# Show executive summary if available
Expand All @@ -1973,6 +1999,9 @@ def _output_research_result(

if not output_file:
console.print("[dim]Use --output to save full results to a file, or --json to print to stdout[/dim]")
interaction_id = result.get("interaction_id")
if interaction_id:
console.print(f"[dim]Use '--previous-interaction-id {interaction_id}' to continue this research[/dim]")


# =============================================================================
Expand Down
29 changes: 26 additions & 3 deletions parallel_web_tools/core/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def enrich_batch(
poll_interval: int = 5,
include_basis: bool = True,
source: ClientSource = "python",
previous_interaction_id: str | None = None,
) -> list[dict[str, Any]]:
"""Enrich multiple inputs using the Parallel Task Group API.

Expand All @@ -111,6 +112,7 @@ def enrich_batch(
poll_interval: Seconds between status polls
include_basis: Whether to include citations
source: Client source identifier for User-Agent (default: python)
previous_interaction_id: Interaction ID from a previous task to reuse as context.

Returns:
List of result dictionaries in same order as inputs.
Expand All @@ -131,7 +133,13 @@ def enrich_batch(
taskgroup_id = task_group.task_group_id

# Add runs - use SDK type for proper typing
run_inputs: list[BetaRunInputParam] = [{"input": inp, "processor": processor} for inp in inputs]
def _make_run_input(inp: dict[str, Any]) -> BetaRunInputParam:
entry: BetaRunInputParam = {"input": inp, "processor": processor}
if previous_interaction_id:
entry["previous_interaction_id"] = previous_interaction_id
return entry

run_inputs: list[BetaRunInputParam] = [_make_run_input(inp) for inp in inputs]
response = client.beta.task_group.add_runs(
taskgroup_id,
default_task_spec=task_spec,
Expand Down Expand Up @@ -187,6 +195,7 @@ def enrich_single(
timeout: int = 300,
include_basis: bool = True,
source: ClientSource = "python",
previous_interaction_id: str | None = None,
) -> dict[str, Any]:
"""Enrich a single input using the Parallel API."""
results = enrich_batch(
Expand All @@ -197,6 +206,7 @@ def enrich_single(
timeout=timeout,
include_basis=include_basis,
source=source,
previous_interaction_id=previous_interaction_id,
)
return results[0] if results else {"error": "No result"}

Expand All @@ -207,6 +217,7 @@ def create_task_group(
OutputModel,
processor: str = "core-fast",
source: ClientSource = "python",
previous_interaction_id: str | None = None,
) -> dict[str, Any]:
"""Create a task group and add runs without waiting for completion.

Expand All @@ -216,6 +227,7 @@ def create_task_group(
OutputModel: Pydantic model for output schema.
processor: Parallel processor (default: core-fast).
source: Client source identifier for User-Agent.
previous_interaction_id: Interaction ID from a previous task to reuse as context.

Returns:
Dict with taskgroup_id, url, and num_runs.
Expand All @@ -238,12 +250,19 @@ def create_task_group(
taskgroup_id = task_group.task_group_id
logger.info(f"Created taskgroup id {taskgroup_id}")

# Build run input helper
def _make_run_input(row: dict[str, Any]) -> BetaRunInputParam:
entry: BetaRunInputParam = {"input": row, "processor": processor}
if previous_interaction_id:
entry["previous_interaction_id"] = previous_interaction_id
return entry

# Add runs in batches
batch_size = 100
total_created = 0
for i in range(0, len(input_data), batch_size):
batch = input_data[i : i + batch_size]
run_inputs: list[BetaRunInputParam] = [{"input": row, "processor": processor} for row in batch]
run_inputs: list[BetaRunInputParam] = [_make_run_input(row) for row in batch]
response = client.beta.task_group.add_runs(
taskgroup_id,
default_task_spec=task_spec,
Expand Down Expand Up @@ -361,21 +380,25 @@ def run_tasks(
processor: str = "core-fast",
source: ClientSource = "python",
timeout: int = 3600,
previous_interaction_id: str | None = None,
) -> list[Any]:
"""Run batch tasks using Pydantic models for schema.

Uses the Parallel SDK's task group API with proper SSE handling.

Args:
timeout: Max seconds to wait for completion (default: 3600 = 1 hour).
previous_interaction_id: Interaction ID from a previous task to reuse as context.
"""
logger = logging.getLogger(__name__)

batch_id = str(uuid.uuid4())
logger.info(f"Generated batch_id: {batch_id}")

# Create task group and add runs
tg_info = create_task_group(input_data, InputModel, OutputModel, processor, source)
tg_info = create_task_group(
input_data, InputModel, OutputModel, processor, source, previous_interaction_id=previous_interaction_id
)
taskgroup_id = tg_info["taskgroup_id"]

# Wait for completion
Expand Down
Loading
Loading