diff --git a/README.md b/README.md index 5039dcd..a8aef6b 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ CLI and data enrichment utilities for the [Parallel API](https://docs.parallel.a - **Web Search** - AI-powered search with domain filtering and date ranges - **Content Extraction** - Extract clean markdown from any URL - **Data Enrichment** - Enrich CSV, JSON, DuckDB, and BigQuery data with AI +- **Follow-up Context** - Chain research and enrichment tasks using `--previous-interaction-id` - **AI-Assisted Planning** - Use natural language to define what data you want - **Multiple Integrations** - Polars, DuckDB, Snowflake, BigQuery, Spark @@ -208,13 +209,41 @@ echo "What is the latest funding for Anthropic?" | parallel-cli search - --json echo "Research question" | parallel-cli research run - --json # Async: launch then poll separately -parallel-cli research run "question" --no-wait --json # returns run_id +parallel-cli research run "question" --no-wait --json # returns run_id + interaction_id parallel-cli research status trun_xxx --json # check status parallel-cli research poll trun_xxx --json # wait and get result +# Follow-up: reuse context from a previous task +parallel-cli research run "follow-up question" --previous-interaction-id trun_xxx --json +parallel-cli enrich run --data '[...]' --previous-interaction-id trun_xxx --json + # Exit codes: 0=ok, 2=bad input, 3=auth error, 4=api error, 5=timeout ``` +### Follow-up research with context reuse + +Tasks return an `interaction_id` that can be passed as `--previous-interaction-id` on a subsequent research or enrichment run. The new task inherits the context from the prior one, so follow-up questions can reference earlier results without repeating them. + +```bash +# Step 1: Run initial research (interaction_id is in the JSON output) +parallel-cli research run "What are the top 3 AI companies?" --json --processor lite-fast +# → { "run_id": "trun_abc", "interaction_id": "trun_abc", ... 
} + +# Step 2: Follow-up research referencing the first task's context +parallel-cli research run "What products does the #1 company make?" \ + --previous-interaction-id trun_abc --json + +# Step 3: Use research context for enrichment +parallel-cli enrich run \ + --data '[{"company": "Anthropic"}, {"company": "OpenAI"}]' \ + --target enriched.csv \ + --source-columns '[{"name": "company", "description": "Company name"}]' \ + --enriched-columns '[{"name": "products", "description": "Main products"}]' \ + --previous-interaction-id trun_abc --json +``` + +The `interaction_id` is shown in both human-readable and `--json` output for `research run`, `research status`, and `research poll`. + ### More examples ```bash diff --git a/parallel_web_tools/cli/commands.py b/parallel_web_tools/cli/commands.py index 74945dd..261ffc3 100644 --- a/parallel_web_tools/cli/commands.py +++ b/parallel_web_tools/cli/commands.py @@ -960,6 +960,10 @@ def enrich(): @click.option("--dry-run", is_flag=True, help="Show what would be executed without making API calls") @click.option("--json", "output_json", is_flag=True, help="Output results as JSON to stdout") @click.option("-o", "--output", "output_file", type=click.Path(), help="Save results to JSON file") +@click.option( + "--previous-interaction-id", + help="Interaction ID from a previous task to reuse as context", +) def enrich_run( config_file: str | None, source_type: str | None, @@ -974,6 +978,7 @@ def enrich_run( dry_run: bool, output_json: bool, output_file: str | None, + previous_interaction_id: str | None, ): """Run data enrichment from YAML config or CLI arguments. 
@@ -1043,7 +1048,7 @@ def enrich_run( if not output_json: console.print(f"[bold cyan]Running enrichment from {config_file}...[/bold cyan]\n") - result = run_enrichment(config_file, no_wait=no_wait) + result = run_enrichment(config_file, no_wait=no_wait, previous_interaction_id=previous_interaction_id) else: # After validation, these are guaranteed non-None assert source_type is not None @@ -1134,7 +1139,7 @@ def enrich_run( if not output_json: console.print(f"[bold cyan]Running enrichment: {source} -> {target}[/bold cyan]\n") - result = run_enrichment_from_dict(config, no_wait=no_wait) + result = run_enrichment_from_dict(config, no_wait=no_wait, previous_interaction_id=previous_interaction_id) if no_wait and result: if output_json: @@ -1572,6 +1577,10 @@ def research(): "-o", "--output", "output_file", type=click.Path(), help="Save results (creates {name}.json and {name}.md)" ) @click.option("--json", "output_json", is_flag=True, help="Output JSON to stdout") +@click.option( + "--previous-interaction-id", + help="Interaction ID from a previous task to reuse as context", +) def research_run( query: str | None, input_file: str | None, @@ -1582,12 +1591,16 @@ def research_run( dry_run: bool, output_file: str | None, output_json: bool, + previous_interaction_id: str | None, ): """Run deep research on a question or topic. QUERY is the research question (max 15,000 chars). Alternatively, use --input-file or pass "-" as QUERY to read from stdin. + Use --previous-interaction-id to continue research from a prior task's context. + The interaction ID is shown in the output of every research run. + Examples: parallel-cli research run "What are the latest developments in quantum computing?" @@ -1595,6 +1608,9 @@ def research_run( parallel-cli research run -f question.txt --processor ultra -o report echo "My research question" | parallel-cli research run - --json + + # Follow-up research using context from a previous task: + parallel-cli research run "What are the implications?" 
--previous-interaction-id trun_abc123 """ # Read from stdin if "-" is passed if query == "-": @@ -1634,13 +1650,18 @@ def research_run( # Create task and return immediately if not output_json: console.print(f"[dim]Creating research task with processor: {processor}...[/dim]") - result = create_research_task(query, processor=processor, source="cli") + result = create_research_task( + query, processor=processor, source="cli", previous_interaction_id=previous_interaction_id + ) if not output_json: console.print(f"\n[bold green]Task created: {result['run_id']}[/bold green]") + if result.get("interaction_id"): + console.print(f"Interaction ID: {result['interaction_id']}") console.print(f"Track progress: {result['result_url']}") console.print("\n[dim]Use 'parallel-cli research status ' to check status[/dim]") console.print("[dim]Use 'parallel-cli research poll ' to wait for results[/dim]") + console.print("[dim]Use '--previous-interaction-id' on a new run to continue this research[/dim]") if output_json: print(json.dumps(result, indent=2)) @@ -1673,6 +1694,7 @@ def on_status(status: str, run_id: str): poll_interval=poll_interval, on_status=on_status, source="cli", + previous_interaction_id=previous_interaction_id, ) _output_research_result(result, output_file, output_json) @@ -1715,11 +1737,13 @@ def research_status(run_id: str, output_json: bool): }.get(status, "white") console.print(f"[bold]Task:[/bold] {run_id}") + console.print(f"[bold]Interaction ID:[/bold] {result.get('interaction_id', run_id)}") console.print(f"[bold]Status:[/bold] [{status_color}]{status}[/{status_color}]") console.print(f"[bold]URL:[/bold] {result['result_url']}") if status == "completed": console.print("\n[dim]Use 'parallel-cli research poll ' to retrieve results[/dim]") + console.print("[dim]Use '--previous-interaction-id' on a new run to continue this research[/dim]") except Exception as e: _handle_error(e, output_json=output_json) @@ -1915,6 +1939,7 @@ def _output_research_result( output = 
result.get("output", {}) output_data = { "run_id": result.get("run_id"), + "interaction_id": result.get("interaction_id"), "result_url": result.get("result_url"), "status": result.get("status"), "output": output.copy() if isinstance(output, dict) else output, @@ -1957,6 +1982,7 @@ def _output_research_result( else: console.print("\n[bold green]Research Complete![/bold green]") console.print(f"[dim]Task: {result.get('run_id')}[/dim]") + console.print(f"[dim]Interaction ID: {result.get('interaction_id') or result.get('run_id')}[/dim]") console.print(f"[dim]URL: {result.get('result_url')}[/dim]\n") # Show executive summary if available @@ -1973,6 +1999,9 @@ def _output_research_result( if not output_file: console.print("[dim]Use --output to save full results to a file, or --json to print to stdout[/dim]") + interaction_id = result.get("interaction_id") + if interaction_id: + console.print(f"[dim]Use '--previous-interaction-id {interaction_id}' to continue this research[/dim]") # ============================================================================= diff --git a/parallel_web_tools/core/batch.py b/parallel_web_tools/core/batch.py index e71fa80..bd1d569 100644 --- a/parallel_web_tools/core/batch.py +++ b/parallel_web_tools/core/batch.py @@ -99,6 +99,7 @@ def enrich_batch( poll_interval: int = 5, include_basis: bool = True, source: ClientSource = "python", + previous_interaction_id: str | None = None, ) -> list[dict[str, Any]]: """Enrich multiple inputs using the Parallel Task Group API. @@ -111,6 +112,7 @@ def enrich_batch( poll_interval: Seconds between status polls include_basis: Whether to include citations source: Client source identifier for User-Agent (default: python) + previous_interaction_id: Interaction ID from a previous task to reuse as context. Returns: List of result dictionaries in same order as inputs.
@@ -131,7 +133,13 @@ def enrich_batch( taskgroup_id = task_group.task_group_id # Add runs - use SDK type for proper typing - run_inputs: list[BetaRunInputParam] = [{"input": inp, "processor": processor} for inp in inputs] + def _make_run_input(inp: dict[str, Any]) -> BetaRunInputParam: + entry: BetaRunInputParam = {"input": inp, "processor": processor} + if previous_interaction_id: + entry["previous_interaction_id"] = previous_interaction_id + return entry + + run_inputs: list[BetaRunInputParam] = [_make_run_input(inp) for inp in inputs] response = client.beta.task_group.add_runs( taskgroup_id, default_task_spec=task_spec, @@ -187,6 +195,7 @@ def enrich_single( timeout: int = 300, include_basis: bool = True, source: ClientSource = "python", + previous_interaction_id: str | None = None, ) -> dict[str, Any]: """Enrich a single input using the Parallel API.""" results = enrich_batch( @@ -197,6 +206,7 @@ def enrich_single( timeout=timeout, include_basis=include_basis, source=source, + previous_interaction_id=previous_interaction_id, ) return results[0] if results else {"error": "No result"} @@ -207,6 +217,7 @@ def create_task_group( OutputModel, processor: str = "core-fast", source: ClientSource = "python", + previous_interaction_id: str | None = None, ) -> dict[str, Any]: """Create a task group and add runs without waiting for completion. @@ -216,6 +227,7 @@ def create_task_group( OutputModel: Pydantic model for output schema. processor: Parallel processor (default: core-fast). source: Client source identifier for User-Agent. + previous_interaction_id: Interaction ID from a previous task to reuse as context. Returns: Dict with taskgroup_id, url, and num_runs. 
@@ -238,12 +250,19 @@ def create_task_group( taskgroup_id = task_group.task_group_id logger.info(f"Created taskgroup id {taskgroup_id}") + # Build run input helper + def _make_run_input(row: dict[str, Any]) -> BetaRunInputParam: + entry: BetaRunInputParam = {"input": row, "processor": processor} + if previous_interaction_id: + entry["previous_interaction_id"] = previous_interaction_id + return entry + # Add runs in batches batch_size = 100 total_created = 0 for i in range(0, len(input_data), batch_size): batch = input_data[i : i + batch_size] - run_inputs: list[BetaRunInputParam] = [{"input": row, "processor": processor} for row in batch] + run_inputs: list[BetaRunInputParam] = [_make_run_input(row) for row in batch] response = client.beta.task_group.add_runs( taskgroup_id, default_task_spec=task_spec, @@ -361,6 +380,7 @@ def run_tasks( processor: str = "core-fast", source: ClientSource = "python", timeout: int = 3600, + previous_interaction_id: str | None = None, ) -> list[Any]: """Run batch tasks using Pydantic models for schema. @@ -368,6 +388,7 @@ def run_tasks( Args: timeout: Max seconds to wait for completion (default: 3600 = 1 hour). + previous_interaction_id: Interaction ID from a previous task to reuse as context. 
""" logger = logging.getLogger(__name__) @@ -375,7 +396,9 @@ def run_tasks( logger.info(f"Generated batch_id: {batch_id}") # Create task group and add runs - tg_info = create_task_group(input_data, InputModel, OutputModel, processor, source) + tg_info = create_task_group( + input_data, InputModel, OutputModel, processor, source, previous_interaction_id=previous_interaction_id + ) taskgroup_id = tg_info["taskgroup_id"] # Wait for completion diff --git a/parallel_web_tools/core/research.py b/parallel_web_tools/core/research.py index 5ff78e3..7cc6474 100644 --- a/parallel_web_tools/core/research.py +++ b/parallel_web_tools/core/research.py @@ -74,6 +74,7 @@ def create_research_task( processor: str = "pro-fast", api_key: str | None = None, source: ClientSource = "python", + previous_interaction_id: str | None = None, ) -> dict[str, Any]: """Create a deep research task without waiting for results. @@ -82,19 +83,25 @@ def create_research_task( processor: Processor tier (see RESEARCH_PROCESSORS). api_key: Optional API key. source: Client source identifier for User-Agent. + previous_interaction_id: Interaction ID from a previous task to reuse as context. Returns: - Dict with run_id, result_url, and other task metadata. + Dict with run_id, interaction_id, result_url, and other task metadata. 
""" client = create_client(api_key, source) - task = client.task_run.create( - input=query[:15000], - processor=processor, - ) + create_kwargs: dict[str, Any] = { + "input": query[:15000], + "processor": processor, + } + if previous_interaction_id: + create_kwargs["previous_interaction_id"] = previous_interaction_id + + task = client.task_run.create(**create_kwargs) return { "run_id": task.run_id, + "interaction_id": getattr(task, "interaction_id", None) or task.run_id, "result_url": f"{PLATFORM_BASE}/play/deep-research/{task.run_id}", "processor": processor, "status": getattr(task, "status", "pending"), @@ -114,13 +121,14 @@ def get_research_status( source: Client source identifier for User-Agent. Returns: - Dict with status and other task info. + Dict with status, interaction_id, and other task info. """ client = create_client(api_key, source) status = client.task_run.retrieve(run_id=run_id) return { "run_id": run_id, + "interaction_id": getattr(status, "interaction_id", None) or run_id, "status": status.status, "result_url": f"{PLATFORM_BASE}/play/deep-research/{run_id}", } @@ -162,6 +170,7 @@ def _poll_until_complete( timeout: int, poll_interval: int, on_status: Callable[[str, str], None] | None, + interaction_id: str | None = None, ) -> dict[str, Any]: """Poll a research task until completion and return the result. @@ -172,6 +181,7 @@ def _poll_until_complete( timeout: Maximum wait time in seconds. poll_interval: Seconds between status checks. on_status: Optional callback called with (status, run_id) on each poll. + interaction_id: Known interaction ID (updated from poll responses). Returns: Dict with content and metadata. @@ -180,9 +190,15 @@ def _poll_until_complete( TimeoutError: If the task doesn't complete within timeout. RuntimeError: If the task fails or is cancelled.
""" + # Track interaction_id from poll responses + poll_state = {"interaction_id": interaction_id} def retrieve(): - return client.task_run.retrieve(run_id=run_id) + response = client.task_run.retrieve(run_id=run_id) + # Capture interaction_id from the latest response + if hasattr(response, "interaction_id") and response.interaction_id: + poll_state["interaction_id"] = response.interaction_id + return response def extract_status(response): return response.status @@ -193,6 +209,7 @@ def fetch_result(): output_data = _serialize_output(output) return { "run_id": run_id, + "interaction_id": poll_state["interaction_id"] or run_id, "result_url": result_url, "status": "completed", "output": output_data, @@ -226,6 +243,7 @@ def run_research( poll_interval: int = 45, on_status: Callable[[str, str], None] | None = None, source: ClientSource = "python", + previous_interaction_id: str | None = None, ) -> dict[str, Any]: """Run deep research and wait for results. @@ -240,6 +258,7 @@ def run_research( poll_interval: Seconds between status checks (default: 45). on_status: Optional callback called with (status, run_id) on each poll. source: Client source identifier for User-Agent. + previous_interaction_id: Interaction ID from a previous task to reuse as context. Returns: Dict with content and metadata. 
@@ -250,17 +269,24 @@ def run_research( """ client = create_client(api_key, source) - task = client.task_run.create( - input=query[:15000], - processor=processor, - ) + create_kwargs: dict[str, Any] = { + "input": query[:15000], + "processor": processor, + } + if previous_interaction_id: + create_kwargs["previous_interaction_id"] = previous_interaction_id + + task = client.task_run.create(**create_kwargs) run_id = task.run_id + interaction_id = getattr(task, "interaction_id", run_id) result_url = f"{PLATFORM_BASE}/play/deep-research/{run_id}" if on_status: on_status("created", run_id) - return _poll_until_complete(client, run_id, result_url, timeout, poll_interval, on_status) + return _poll_until_complete( + client, run_id, result_url, timeout, poll_interval, on_status, interaction_id=interaction_id + ) def poll_research( @@ -284,7 +310,7 @@ def poll_research( source: Client source identifier for User-Agent. Returns: - Dict with content and metadata. + Dict with content and metadata including interaction_id. 
""" client = create_client(api_key, source) result_url = f"{PLATFORM_BASE}/play/deep-research/{run_id}" diff --git a/parallel_web_tools/core/runner.py b/parallel_web_tools/core/runner.py index c7de4b8..80a6fb4 100644 --- a/parallel_web_tools/core/runner.py +++ b/parallel_web_tools/core/runner.py @@ -8,35 +8,40 @@ logger = logging.getLogger(__name__) -def _run_processor(parsed_schema: InputSchema, no_wait: bool = False) -> dict | None: +def _run_processor( + parsed_schema: InputSchema, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict | None: """Run the appropriate processor for the given schema.""" match parsed_schema.source_type: case SourceType.CSV: from parallel_web_tools.processors.csv import process_csv - return process_csv(parsed_schema, no_wait=no_wait) + return process_csv(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) case SourceType.JSON: from parallel_web_tools.processors.json import process_json - return process_json(parsed_schema, no_wait=no_wait) + return process_json(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) case SourceType.DUCKDB: from parallel_web_tools.processors.duckdb import process_duckdb - return process_duckdb(parsed_schema, no_wait=no_wait) + return process_duckdb(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) case SourceType.BIGQUERY: from parallel_web_tools.processors.bigquery import process_bigquery - return process_bigquery(parsed_schema, no_wait=no_wait) + return process_bigquery(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) case _: raise NotImplementedError(f"{parsed_schema.source_type} is not supported") -def run_enrichment(config_file: str | Path, no_wait: bool = False) -> dict | None: +def run_enrichment( + config_file: str | Path, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict | None: """Run data enrichment using a YAML config file. 
Args: config_file: Path to YAML configuration file no_wait: If True, return taskgroup info without waiting for completion. + previous_interaction_id: Interaction ID from a previous task to reuse as context. Example: >>> from parallel_web_tools import run_enrichment @@ -52,7 +57,7 @@ def run_enrichment(config_file: str | Path, no_wait: bool = False) -> dict | Non parsed_schema = parse_schema(schema) logger.info(f"Running enrichment: {parsed_schema.source} -> {parsed_schema.target}") - result = _run_processor(parsed_schema, no_wait=no_wait) + result = _run_processor(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) if no_wait: return result @@ -61,12 +66,15 @@ def run_enrichment(config_file: str | Path, no_wait: bool = False) -> dict | Non return None -def run_enrichment_from_dict(config: dict, no_wait: bool = False) -> dict | None: +def run_enrichment_from_dict( + config: dict, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict | None: """Run data enrichment using a configuration dictionary. Args: config: Configuration dictionary matching YAML schema no_wait: If True, return taskgroup info without waiting for completion. + previous_interaction_id: Interaction ID from a previous task to reuse as context. 
Example: >>> config = { @@ -82,7 +90,7 @@ def run_enrichment_from_dict(config: dict, no_wait: bool = False) -> dict | None parsed_schema = parse_schema(config) logger.info(f"Running enrichment: {parsed_schema.source} -> {parsed_schema.target}") - result = _run_processor(parsed_schema, no_wait=no_wait) + result = _run_processor(parsed_schema, no_wait=no_wait, previous_interaction_id=previous_interaction_id) if no_wait: return result diff --git a/parallel_web_tools/processors/bigquery.py b/parallel_web_tools/processors/bigquery.py index a63f4dd..0650ad0 100644 --- a/parallel_web_tools/processors/bigquery.py +++ b/parallel_web_tools/processors/bigquery.py @@ -46,7 +46,9 @@ def fetch_all(conn: Connection, table: str) -> list[dict[str, Any]]: return [dict(row) for row in rows] -def process_bigquery(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | None: +def process_bigquery( + schema: InputSchema, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict[str, Any] | None: """Process BigQuery table and enrich data.""" InputModel, OutputModel = parse_input_and_output_models(schema) @@ -57,9 +59,13 @@ def process_bigquery(schema: InputSchema, no_wait: bool = False) -> dict[str, An data = fetch_all(conn, schema.source) if no_wait: - return create_task_group(data, InputModel, OutputModel, schema.processor) + return create_task_group( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) - output_rows = run_tasks(data, InputModel, OutputModel, schema.processor) + output_rows = run_tasks( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) df = pl.DataFrame(output_rows) _project, dataset, table = split_bq_name(schema.target) diff --git a/parallel_web_tools/processors/csv.py b/parallel_web_tools/processors/csv.py index 3d4fd94..a010452 100644 --- a/parallel_web_tools/processors/csv.py +++ b/parallel_web_tools/processors/csv.py @@ -10,7 +10,9 @@ 
logger = logging.getLogger(__name__) -def process_csv(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | None: +def process_csv( + schema: InputSchema, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict[str, Any] | None: """Process CSV file and enrich data.""" logger.info("Processing CSV file: %s", schema.source) @@ -24,10 +26,14 @@ def process_csv(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | data.append(dict(row)) if no_wait: - return create_task_group(data, InputModel, OutputModel, schema.processor) + return create_task_group( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) # Process all rows in batch - output_rows = run_tasks(data, InputModel, OutputModel, schema.processor) + output_rows = run_tasks( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) # Write results to target CSV with open(schema.target, "w", newline="") as f: diff --git a/parallel_web_tools/processors/duckdb.py b/parallel_web_tools/processors/duckdb.py index bfccaed..6f7079c 100644 --- a/parallel_web_tools/processors/duckdb.py +++ b/parallel_web_tools/processors/duckdb.py @@ -11,7 +11,9 @@ from parallel_web_tools.core.sql_utils import quote_identifier -def process_duckdb(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | None: +def process_duckdb( + schema: InputSchema, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict[str, Any] | None: """Process DuckDB table and enrich data.""" InputModel, OutputModel = parse_input_and_output_models(schema) duckdb_file = os.getenv("DUCKDB_FILE") @@ -25,9 +27,13 @@ def process_duckdb(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] data = con.sql(f"SELECT * from {source_quoted}").pl().to_dicts() if no_wait: - return create_task_group(data, InputModel, OutputModel, schema.processor) + return create_task_group( + data, InputModel, OutputModel, 
schema.processor, previous_interaction_id=previous_interaction_id + ) - output_rows = run_tasks(data, InputModel, OutputModel, schema.processor) + output_rows = run_tasks( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) # Write output_rows to the target table df = pl.DataFrame(output_rows) # noqa: F841 diff --git a/parallel_web_tools/processors/json.py b/parallel_web_tools/processors/json.py index 014de0d..42769a8 100644 --- a/parallel_web_tools/processors/json.py +++ b/parallel_web_tools/processors/json.py @@ -10,7 +10,9 @@ logger = logging.getLogger(__name__) -def process_json(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | None: +def process_json( + schema: InputSchema, no_wait: bool = False, previous_interaction_id: str | None = None +) -> dict[str, Any] | None: """Process JSON file and enrich data.""" logger.info("Processing JSON file: %s", schema.source) @@ -21,10 +23,14 @@ def process_json(schema: InputSchema, no_wait: bool = False) -> dict[str, Any] | data = json.load(f) if no_wait: - return create_task_group(data, InputModel, OutputModel, schema.processor) + return create_task_group( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) # Process all rows in batch - output_rows = run_tasks(data, InputModel, OutputModel, schema.processor) + output_rows = run_tasks( + data, InputModel, OutputModel, schema.processor, previous_interaction_id=previous_interaction_id + ) # Write results to target JSON with open(schema.target, "w") as f: diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py index 7c87a0b..0c9e4a2 100644 --- a/tests/test_enrichment.py +++ b/tests/test_enrichment.py @@ -731,6 +731,7 @@ def test_delegates_to_enrich_batch(self): timeout=300, include_basis=True, source="python", + previous_interaction_id=None, ) assert result == {"ceo_name": "Test CEO"} diff --git a/tests/test_research.py b/tests/test_research.py index 
065cbeb..f5848eb 100644 --- a/tests/test_research.py +++ b/tests/test_research.py @@ -294,6 +294,7 @@ def test_research_run_with_input_file(self, runner, tmp_path): with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: mock_create.return_value = { "run_id": "trun_123", + "interaction_id": "trun_123", "result_url": "https://platform.parallel.ai/play/deep-research/trun_123", "status": "pending", } @@ -310,6 +311,7 @@ def test_research_run_no_wait(self, runner): with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: mock_create.return_value = { "run_id": "trun_123", + "interaction_id": "trun_123", "result_url": "https://platform.parallel.ai/play/deep-research/trun_123", "status": "pending", } @@ -325,6 +327,7 @@ def test_research_run_json_output(self, runner): with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: mock_create.return_value = { "run_id": "trun_123", + "interaction_id": "trun_123", "result_url": "https://platform.parallel.ai/play/deep-research/trun_123", "status": "pending", } @@ -362,6 +365,104 @@ def test_research_run_with_wait(self, runner): assert "Research Complete" in result.output mock_run.assert_called_once() + def test_research_run_with_previous_interaction_id_no_wait(self, runner): + """Should pass previous_interaction_id to create_research_task.""" + with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: + mock_create.return_value = { + "run_id": "trun_456", + "interaction_id": "trun_456", + "result_url": "https://platform.parallel.ai/play/deep-research/trun_456", + "status": "pending", + } + + result = runner.invoke( + main, + [ + "research", + "run", + "Follow-up question?", + "--no-wait", + "--previous-interaction-id", + "trun_123", + ], + ) + + assert result.exit_code == 0 + mock_create.assert_called_once() + call_kwargs = mock_create.call_args + assert call_kwargs.kwargs.get("previous_interaction_id") == 
"trun_123" + + def test_research_run_with_previous_interaction_id_wait(self, runner): + """Should pass previous_interaction_id to run_research.""" + with mock.patch("parallel_web_tools.cli.commands.run_research") as mock_run: + mock_run.return_value = { + "run_id": "trun_456", + "interaction_id": "trun_456", + "result_url": "https://platform.parallel.ai/play/deep-research/trun_456", + "status": "completed", + "output": {"content": {"text": "Follow-up findings"}}, + } + + result = runner.invoke( + main, + [ + "research", + "run", + "Follow-up question?", + "--poll-interval", + "1", + "--previous-interaction-id", + "trun_123", + ], + ) + + assert result.exit_code == 0 + mock_run.assert_called_once() + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs.get("previous_interaction_id") == "trun_123" + + def test_research_run_shows_interaction_id(self, runner): + """Should display interaction_id in no-wait output.""" + with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: + mock_create.return_value = { + "run_id": "trun_123", + "interaction_id": "trun_int_abc", + "result_url": "https://platform.parallel.ai/play/deep-research/trun_123", + "status": "pending", + } + + result = runner.invoke(main, ["research", "run", "What is AI?", "--no-wait"]) + + assert result.exit_code == 0 + assert "trun_int_abc" in result.output + assert "previous-interaction-id" in result.output + + def test_research_run_json_includes_interaction_id(self, runner): + """Should include interaction_id in JSON output.""" + with mock.patch("parallel_web_tools.cli.commands.create_research_task") as mock_create: + mock_create.return_value = { + "run_id": "trun_123", + "interaction_id": "trun_int_abc", + "result_url": "https://platform.parallel.ai/play/deep-research/trun_123", + "status": "pending", + } + + result = runner.invoke(main, ["research", "run", "What is AI?", "--no-wait", "--json"]) + + assert result.exit_code == 0 + lines = 
result.output.strip().split("\n") + json_lines = [] + in_json = False + for line in lines: + if line.strip().startswith("{"): + in_json = True + if in_json: + json_lines.append(line) + if in_json and line.strip().startswith("}"): + break + output = json.loads("\n".join(json_lines)) + assert output["interaction_id"] == "trun_int_abc" + class TestResearchStatusCommand: """Tests for the research status command."""