Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 121 additions & 39 deletions parallel_web_tools/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1555,9 +1555,19 @@ def research():
@click.option("--no-wait", is_flag=True, help="Return immediately after creating task (don't poll)")
@click.option("--dry-run", is_flag=True, help="Show what would be executed without making API calls")
@click.option(
"-o", "--output", "output_file", type=click.Path(), help="Save results (creates {name}.json and {name}.md)"
"--text",
"use_text",
is_flag=True,
help="Use text schema — returns markdown report instead of structured JSON",
)
@click.option(
"-o",
"--output",
"output_path",
type=click.Path(),
default=None,
help="Base path for output files (default: auto-generated from run ID)",
)
@click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout")
def research_run(
query: str | None,
input_file: str | None,
Expand All @@ -1566,22 +1576,30 @@ def research_run(
poll_interval: int,
no_wait: bool,
dry_run: bool,
output_file: str | None,
output_json: bool,
use_text: bool,
output_path: str | None,
):
"""Run deep research on a question or topic.

QUERY is the research question (max 15,000 chars). Alternatively, use --input-file
or pass "-" as QUERY to read from stdin.

By default, results use auto schema (structured JSON) and save to a .json file.
Use --text for a markdown report (saves both .json and .md). Use -o to set a custom
base filename; otherwise the run ID is used.

Examples:

parallel-cli research run "What are the latest developments in quantum computing?"

parallel-cli research run -f question.txt --processor ultra -o report
parallel-cli research run --text "Market analysis of HVAC industry" -o report

parallel-cli research run -f question.txt --processor ultra --text

echo "My research question" | parallel-cli research run - --json
parallel-cli research run "My question" -o my-report
"""
output_schema = "text" if use_text else "auto"

# Read from stdin if "-" is passed
if query == "-":
query = click.get_text_stream("stdin").read().strip()
Expand All @@ -1603,44 +1621,35 @@ def research_run(
"query": query[:200] + "..." if len(query) > 200 else query,
"query_length": len(query),
"processor": processor,
"output_schema": output_schema,
"expected_latency": RESEARCH_PROCESSORS[processor],
}
if output_json:
print(json.dumps(dry_run_data, indent=2))
else:
console.print("[bold]Dry run — no API calls will be made[/bold]\n")
console.print(f" [bold]Query:[/bold] {dry_run_data['query']}")
console.print(f" [bold]Length:[/bold] {len(query)} chars")
console.print(f" [bold]Processor:[/bold] {processor}")
console.print(f" [bold]Latency:[/bold] {RESEARCH_PROCESSORS[processor]}")
console.print("[bold]Dry run — no API calls will be made[/bold]\n")
console.print(f" [bold]Query:[/bold] {dry_run_data['query']}")
console.print(f" [bold]Length:[/bold] {len(query)} chars")
console.print(f" [bold]Processor:[/bold] {processor}")
console.print(f" [bold]Schema:[/bold] {output_schema}")
console.print(f" [bold]Latency:[/bold] {RESEARCH_PROCESSORS[processor]}")
return

try:
if no_wait:
# Create task and return immediately
if not output_json:
console.print(f"[dim]Creating research task with processor: {processor}...[/dim]")
result = create_research_task(query, processor=processor, source="cli")

if not output_json:
console.print(f"\n[bold green]Task created: {result['run_id']}[/bold green]")
console.print(f"Track progress: {result['result_url']}")
console.print("\n[dim]Use 'parallel-cli research status <run_id>' to check status[/dim]")
console.print("[dim]Use 'parallel-cli research poll <run_id>' to wait for results[/dim]")
console.print(f"[dim]Creating research task with processor: {processor}...[/dim]")
result = create_research_task(query, processor=processor, source="cli", output_schema=output_schema)

if output_json:
print(json.dumps(result, indent=2))
console.print(f"\n[bold green]Task created: {result['run_id']}[/bold green]")
console.print(f"Track progress: {result['result_url']}")
console.print("\n[dim]Use 'parallel-cli research status <run_id>' to check status[/dim]")
console.print("[dim]Use 'parallel-cli research poll <run_id>' to wait for results[/dim]")
else:
# Run and wait for results
if not output_json:
console.print(f"[bold cyan]Starting deep research with processor: {processor}[/bold cyan]")
console.print(f"[dim]This may take {RESEARCH_PROCESSORS[processor]}[/dim]\n")
console.print(f"[bold cyan]Starting deep research with processor: {processor}[/bold cyan]")
console.print(f"[dim]This may take {RESEARCH_PROCESSORS[processor]}[/dim]\n")

start_time = time.time()

def on_status(status: str, run_id: str):
if output_json:
return
elapsed = time.time() - start_time
mins, secs = divmod(int(elapsed), 60)
elapsed_str = f"{mins}m{secs:02d}s" if mins else f"{secs}s"
Expand All @@ -1659,22 +1668,19 @@ def on_status(status: str, run_id: str):
poll_interval=poll_interval,
on_status=on_status,
source="cli",
output_schema=output_schema,
)

_output_research_result(result, output_file, output_json)
_save_and_display_research(result, output_schema, output_path)

except TimeoutError as e:
if output_json:
error_data = {"error": {"message": str(e), "type": "TimeoutError"}}
print(json.dumps(error_data, indent=2))
else:
console.print(f"[bold yellow]Timeout: {e}[/bold yellow]")
console.print("[dim]The task is still running. Use 'parallel-cli research poll <run_id>' to resume.[/dim]")
console.print(f"[bold yellow]Timeout: {e}[/bold yellow]")
console.print("[dim]The task is still running. Use 'parallel-cli research poll <run_id>' to resume.[/dim]")
sys.exit(EXIT_TIMEOUT)
except RuntimeError as e:
_handle_error(e, output_json=output_json)
_handle_error(e)
except Exception as e:
_handle_error(e, output_json=output_json)
_handle_error(e)


@research.command(name="status")
Expand Down Expand Up @@ -1783,6 +1789,82 @@ def research_processors(output_json: bool):
console.print("\n[dim]Use --processor/-p to select a processor[/dim]")


def _save_and_display_research(
    result: dict,
    output_schema: str,
    output_path: str | None,
):
    """Save research result to file(s) and display the executive summary.

    For auto schema (default): saves {base}.json with full response.
    For text schema (--text): saves {base}.json with full response AND
    {base}.md with just the markdown content.

    Args:
        result: The research result dict from the API.
        output_schema: "auto" or "text" — determines file format.
        output_path: Base path for output files. None means auto-generate from run_id.
    """
    from pathlib import Path

    output = result.get("output", {})
    run_id = result.get("run_id", "research")

    # Resolve base path (strip any extension so .json/.md suffixes compose cleanly)
    if output_path is None:
        base_path = Path(run_id)
    else:
        base_path = Path(output_path)
        if base_path.suffix:
            base_path = base_path.with_suffix("")

    # Always write the full response as JSON
    json_path = base_path.with_suffix(".json")
    output_data = {
        "run_id": run_id,
        "result_url": result.get("result_url"),
        "status": result.get("status"),
        "output": output,
    }

    # For text schema, also write markdown content to .md
    if output_schema == "text":
        md_path = base_path.with_suffix(".md")
        content = output.get("content") if isinstance(output, dict) else output
        content_text = _content_to_markdown(content) if content else ""

        if content_text:
            # Explicit UTF-8: research reports routinely contain non-ASCII
            # (em dashes, curly quotes), which would fail or be mangled under
            # a legacy platform default encoding (e.g. cp1252 on Windows).
            with open(md_path, "w", encoding="utf-8") as f:
                f.write(content_text)
            console.print(f"[green]Content saved to:[/green] {md_path}")

        # Replace content in JSON with a reference to the .md file so the
        # report text is not duplicated in both files. Copy `output` rather
        # than mutating the caller's dict.
        output_data["output"] = output.copy() if isinstance(output, dict) else {"raw": output}
        output_data["output"]["content_file"] = md_path.name
        output_data["output"].pop("content", None)

    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(output_data, f, indent=2, default=str)
    console.print(f"[green]Metadata saved to:[/green] {json_path}")

    # Always display summary to console
    console.print("\n[bold green]Research Complete![/bold green]")
    console.print(f"[dim]Task: {run_id}[/dim]")
    console.print(f"[dim]URL: {result.get('result_url')}[/dim]\n")

    # Show executive summary
    content = output.get("content") if isinstance(output, dict) else None
    summary = _extract_executive_summary(content) if content else None

    if summary:
        from rich.markdown import Markdown
        from rich.panel import Panel

        console.print(Panel(Markdown(summary), title="Executive Summary", border_style="cyan"))
        console.print()


def _extract_executive_summary(content: Any) -> str | None:
"""Extract the executive summary from research content.

Expand Down
2 changes: 2 additions & 0 deletions parallel_web_tools/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from parallel_web_tools.core.research import (
RESEARCH_PROCESSORS,
OutputSchemaType,
create_research_task,
get_research_result,
get_research_status,
Expand Down Expand Up @@ -123,6 +124,7 @@
"run_enrichment_from_dict",
# Research
"RESEARCH_PROCESSORS",
"OutputSchemaType",
"create_research_task",
"get_research_result",
"get_research_status",
Expand Down
47 changes: 38 additions & 9 deletions parallel_web_tools/core/research.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
from __future__ import annotations

from collections.abc import Callable
from typing import Any
from typing import Any, Literal

from parallel_web_tools.core.auth import create_client
from parallel_web_tools.core.polling import poll_until
from parallel_web_tools.core.user_agent import ClientSource

# Output schema types supported for deep research
OutputSchemaType = Literal["auto", "text"]

# Base URL for viewing results
PLATFORM_BASE = "https://platform.parallel.ai"

Expand Down Expand Up @@ -69,11 +72,24 @@ def _serialize_output(output: Any) -> dict[str, Any]:
return {"raw": str(output)}


def _build_task_spec(output_schema: OutputSchemaType) -> Any:
"""Build task_spec kwargs for the SDK based on output schema type.

Returns None for auto schema (SDK default), or a TaskSpecParam for text.
"""
if output_schema == "text":
from parallel.types import TaskSpecParam, TextSchemaParam

return TaskSpecParam(output_schema=TextSchemaParam(type="text"))
return None


def create_research_task(
query: str,
processor: str = "pro-fast",
api_key: str | None = None,
source: ClientSource = "python",
output_schema: OutputSchemaType = "auto",
) -> dict[str, Any]:
"""Create a deep research task without waiting for results.

Expand All @@ -82,16 +98,22 @@ def create_research_task(
processor: Processor tier (see RESEARCH_PROCESSORS).
api_key: Optional API key.
source: Client source identifier for User-Agent.
output_schema: Output schema type - "auto" for structured JSON, "text" for markdown.

Returns:
Dict with run_id, result_url, and other task metadata.
"""
client = create_client(api_key, source)

task = client.task_run.create(
input=query[:15000],
processor=processor,
)
create_kwargs: dict[str, Any] = {
"input": query[:15000],
"processor": processor,
}
task_spec = _build_task_spec(output_schema)
if task_spec is not None:
create_kwargs["task_spec"] = task_spec

task = client.task_run.create(**create_kwargs)

return {
"run_id": task.run_id,
Expand Down Expand Up @@ -226,6 +248,7 @@ def run_research(
poll_interval: int = 45,
on_status: Callable[[str, str], None] | None = None,
source: ClientSource = "python",
output_schema: OutputSchemaType = "auto",
) -> dict[str, Any]:
"""Run deep research and wait for results.

Expand All @@ -240,6 +263,7 @@ def run_research(
poll_interval: Seconds between status checks (default: 45).
on_status: Optional callback called with (status, run_id) on each poll.
source: Client source identifier for User-Agent.
output_schema: Output schema type - "auto" for structured JSON, "text" for markdown.

Returns:
Dict with content and metadata.
Expand All @@ -250,10 +274,15 @@ def run_research(
"""
client = create_client(api_key, source)

task = client.task_run.create(
input=query[:15000],
processor=processor,
)
create_kwargs: dict[str, Any] = {
"input": query[:15000],
"processor": processor,
}
task_spec = _build_task_spec(output_schema)
if task_spec is not None:
create_kwargs["task_spec"] = task_spec

task = client.task_run.create(**create_kwargs)
run_id = task.run_id
result_url = f"{PLATFORM_BASE}/play/deep-research/{run_id}"

Expand Down
Loading
Loading