Skip to content

refactor: async methods in components#9887

Merged
edwinjosechittilappilly merged 6 commits into
mainfrom
fix-aysnc-gettools
Sep 16, 2025
Merged

refactor: async methods in components#9887
edwinjosechittilappilly merged 6 commits into
mainfrom
fix-aysnc-gettools

Conversation

@edwinjosechittilappilly
Copy link
Copy Markdown
Collaborator

@edwinjosechittilappilly edwinjosechittilappilly commented Sep 16, 2025

@coderabbitai

Summary by CodeRabbit

  • New Features

    • Starter agents enhanced with structured JSON outputs, optional current-date tool, and memory-aware responses (e.g., Simple Agent, Pokédex, News Aggregator).
    • Chat output now supports a Data Template to convert data into text.
  • Improvements

    • Knowledge Ingestion and Retrieval rebuilt on the new LFx framework for more robust ingestion, retrieval, and metadata handling.
    • Asynchronous tool handling across agents for smoother operations and better compatibility.
  • Bug Fixes

    • Corrected provider mapping in the Social Media Agent to ensure proper model configuration.

…onents

- Updated code hashes in multiple starter project JSON files to ensure consistency.
- Refactored agent components to use async methods for tool retrieval, enhancing performance and compatibility with asynchronous operations.
- Adjusted test cases to accommodate the new async behavior in agent workflows.

Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Sep 16, 2025

Walkthrough

Converts tool-building APIs to async across LFx components, updates multiple starter Agent implementations accordingly, and migrates Knowledge Ingestion/Retrieval components to LFx modules. Adds a ChatOutput data_template input in one starter. Tests are adapted to await async to_toolkit. Several starter project Agent code blocks are replaced with updated implementations.

Changes

Cohort / File(s) Summary
LFx core async toolkit
src/lfx/src/lfx/base/agents/agent.py, src/lfx/src/lfx/base/composio/composio_base.py, src/lfx/src/lfx/custom/custom_component/component.py
Made _get_tools async; made to_toolkit async; added _update_tools_with_metadata; updated internal flows to await tool retrieval and apply metadata.
Starter Agents updated (async tools, structured-output, misc fixes)
src/backend/base/langflow/initial_setup/starter_projects/Market Research.json, .../News Aggregator.json, .../Simple Agent.json, .../Search agent.json, .../Price Deal Finder.json, .../Research Agent.json, .../SaaS Pricing.json, .../Nvidia Remix.json, .../Pokédex Agent.json
Replaced Agent code blocks (code_hash → 52dda82053c0). Common changes: await to_toolkit, async _get_tools, structured-output paths, memory/tool orchestration, and targeted fixes (e.g., provider dict reference).
Knowledge Base migration to LFx
.../Knowledge Ingestion.json, .../Knowledge Retrieval.json
Rewrote components to LFx modules (custom_components.*), updated imports/dependencies, switched to LFx Data/DataFrame, updated UI schema, and preserved core ingestion/retrieval logic with LFx APIs.
Invoice Summarizer prompt refactor
.../Invoice Summarizer.json
Prompt component logic updated: integrates update_template_values, awaits current-date tool; Agent code_hash updated.
Instagram Copywriter UI update
.../Instagram Copywriter.json
Agent code block replaced; added data_template (PromptInput) to ChatOutput template with default "{text}".
YouTube Analysis hashes
.../Youtube Analysis.json
Updated code_hash for YouTubeComments and Agent; no functional code changes observed.
Tests adapted to async to_toolkit
src/backend/tests/data/simple_agent.py, src/backend/tests/unit/test_simple_agent_in_lfx_run.py
Updated tests to await to_toolkit (converted to async tests where needed) and used asyncio.run in a script.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant UI as Starter Flow (Agent)
  participant Agent as AgentComponent
  participant Comp as Component (Tool Provider)
  participant TK as ComponentToolkit
  participant Tool as Tools

  rect rgb(245,248,252)
  note over Agent,Comp: Async tool-building path (new)
  UI->>Agent: get_agent_requirements()
  Agent->>Comp: to_toolkit() (await)
  Comp->>Comp: _get_tools() (await)
  Comp->>TK: build tools
  TK-->>Comp: list[Tool]
  Comp->>TK: update tools with metadata (optional)
  TK-->>Comp: list[Tool]
  Comp-->>Agent: list[Tool]
  Agent-->>UI: llm, tools, memory
  end
Loading
sequenceDiagram
  autonumber
  participant User as User
  participant KI as KnowledgeIngestion (LFx)
  participant DF as DataFrame
  participant VS as Vector Store (Chroma)
  participant Emb as Embeddings

  User->>KI: run(input_value, config)
  KI->>KI: validate inputs, provider, keys
  KI->>Emb: init embeddings (OpenAI/HF/Cohere)
  KI->>DF: convert_to_dataframe(input_value, auto_parse=False)
  KI->>VS: create/load collection
  KI->>VS: add documents (skip duplicates)
  VS-->>KI: upsert result
  KI-->>User: metadata (provider, model, created_at)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Suggested labels

size:XXL

Suggested reviewers

  • ogabrielluiz
  • jordanrfrazier

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 53.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title succinctly and accurately summarizes the primary change in this PR: converting component methods to asynchronous variants (notably _get_tools and to_toolkit) across multiple lfx modules and related starter-project updates. It is concise, on-topic, and communicates the main refactor to a reviewer scanning history.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix-aysnc-gettools

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added refactor Maintenance tasks and housekeeping and removed refactor Maintenance tasks and housekeeping labels Sep 16, 2025
@github-actions github-actions Bot added refactor Maintenance tasks and housekeeping and removed refactor Maintenance tasks and housekeeping labels Sep 16, 2025
@github-actions github-actions Bot added lgtm This PR has been approved by a maintainer refactor Maintenance tasks and housekeeping and removed refactor Maintenance tasks and housekeeping labels Sep 16, 2025
@github-actions github-actions Bot added refactor Maintenance tasks and housekeeping and removed refactor Maintenance tasks and housekeeping labels Sep 16, 2025
@sonarqubecloud
Copy link
Copy Markdown

@codecov
Copy link
Copy Markdown

codecov Bot commented Sep 16, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 22.85%. Comparing base (fbf2a4a) to head (35a1282).
⚠️ Report is 1 commits behind head on main.

❌ Your project status has failed because the head coverage (8.12%) is below the target coverage (10.00%). You can increase the head coverage or adjust the target coverage.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #9887      +/-   ##
==========================================
- Coverage   22.87%   22.85%   -0.03%     
==========================================
  Files        1086     1086              
  Lines       39710    39710              
  Branches     5418     5418              
==========================================
- Hits         9083     9075       -8     
- Misses      30472    30480       +8     
  Partials      155      155              
Flag Coverage Δ
backend 46.43% <ø> (-0.06%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.
see 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions github-actions Bot added refactor Maintenance tasks and housekeeping and removed refactor Maintenance tasks and housekeeping labels Sep 16, 2025
@edwinjosechittilappilly edwinjosechittilappilly added this pull request to the merge queue Sep 16, 2025
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (16)
src/lfx/src/lfx/base/agents/agent.py (1)

257-269: Await async _get_tools and fix un-awaited call sites.

Un-awaited calls found:

  • src/lfx/src/lfx/custom/custom_component/component.py — _get_tools at line 1428 (awaited=False; other calls in same file: 1328 awaited=True, 1426 awaited=True).
  • src/backend/tests/data/simple_agent.py — to_toolkit at line 35 (awaited=False).
  • src/backend/tests/unit/test_simple_agent_in_lfx_run.py — to_toolkit at line 332 (awaited=False).

Change these to await (e.g. await self._get_tools(...)) or revert to a synchronous contract; verify all overrides remain async and update tests accordingly.

src/lfx/src/lfx/base/composio/composio_base.py (1)

1953-1957: Offload blocking composio.tools.get in async _get_tools.

configure_tools calls composio.tools.get synchronously (blocking). Offload the SDK call to a thread to avoid stalling the event loop and ensure any mutations to self are thread-safe.

File: src/lfx/src/lfx/base/composio/composio_base.py (lines ~1953-1957)

@@
-    async def _get_tools(self) -> list[Tool]:
-        """Get tools with cached results and optimized name sanitization."""
-        composio = self._build_wrapper()
-        self.set_default_tools()
-        return self.configure_tools(composio)
+    async def _get_tools(self) -> list[Tool]:
+        """Get tools with cached results and optimized name sanitization."""
+        composio = self._build_wrapper()
+        self.set_default_tools()
+        # Offload potentially blocking SDK call.
+        return await asyncio.to_thread(self.configure_tools, composio)

Add import at top-level (outside this hunk):

import asyncio

If composio.tools.get is guaranteed non‑blocking/purely in‑memory, keep it sync; otherwise apply the change.

src/backend/base/langflow/initial_setup/starter_projects/Market Research.json (1)

2321-2736: Avoid IndexError when CurrentDateComponent returns no tools.

pop(0) on the awaited toolkit can raise IndexError if the list is empty. Guard and use the first item safely.

Apply this diff inside AgentComponent.get_agent_requirements:

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
-            if not isinstance(current_date_tool, StructuredTool):
-                msg = "CurrentDateComponent must be converted to a StructuredTool"
-                raise TypeError(msg)
-            self.tools.append(current_date_tool)
+            tools = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not tools:
+                await logger.aerror("CurrentDateComponent.to_toolkit() returned no tools")
+            else:
+                current_date_tool = tools[0]
+                if not isinstance(current_date_tool, StructuredTool):
+                    msg = "CurrentDateComponent must be converted to a StructuredTool"
+                    raise TypeError(msg)
+                self.tools.append(current_date_tool)
src/backend/tests/unit/test_simple_agent_in_lfx_run.py (1)

330-333: Fix remaining synchronous to_toolkit call.

to_toolkit is now async; this test will fail when not skipped.

Two options; pick one:

  • Convert test to async:
-    def test_complete_workflow_integration(self):
+    @pytest.mark.asyncio
+    async def test_complete_workflow_integration(self):
@@
-        tools = url_component.to_toolkit()
+        tools = await url_component.to_toolkit()
  • Or keep it sync via asyncio.run:
+        import asyncio
@@
-        tools = url_component.to_toolkit()
+        tools = asyncio.run(url_component.to_toolkit())
src/backend/base/langflow/initial_setup/starter_projects/Invoice Summarizer.json (1)

1290-1413: Same pop(0) tool guard needed in Agent.

Mirror the safety fix from Market Research to prevent IndexError when adding CurrentDate tool.

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
+            tools = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not tools:
+                await logger.aerror("CurrentDateComponent.to_toolkit() returned no tools")
+                return llm_model, self.chat_history, self.tools
+            current_date_tool = tools[0]

…and keep the existing type check and append.

src/lfx/src/lfx/components/agents/agent.py (1)

182-187: Guard against empty toolkit before pop(0).

Avoid IndexError if CurrentDateComponent yields no tools.

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
-            if not isinstance(current_date_tool, StructuredTool):
+            tools = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not tools:
+                await logger.aerror("CurrentDateComponent.to_toolkit() returned no tools")
+                return llm_model, self.chat_history, self.tools
+            current_date_tool = tools[0]
+            if not isinstance(current_date_tool, StructuredTool):
                 msg = "CurrentDateComponent must be converted to a StructuredTool"
                 raise TypeError(msg)
             self.tools.append(current_date_tool)
src/backend/base/langflow/initial_setup/starter_projects/Nvidia Remix.json (3)

660-760: Fix invalid isinstance unions and handle Generator safely in ChatOutput.

  • PEP 604 unions (A | B) are invalid in isinstance; use a tuple.
  • If convert_to_string returns a Generator, message_response currently assigns it to Message.text. Convert to str or gate on streaming.
-        if isinstance(self.input_value, list) and not all(
-            isinstance(item, Message | Data | DataFrame | str) for item in self.input_value
-        ):
+        if isinstance(self.input_value, list) and not all(
+            isinstance(item, (Message, Data, DataFrame, str)) for item in self.input_value
+        ):
@@
-        if not isinstance(
-            self.input_value,
-            Message | Data | DataFrame | str | list | Generator | type(None),
-        ):
+        if not isinstance(
+            self.input_value,
+            (Message, Data, DataFrame, str, list, Generator, type(None)),
+        ):
@@
-    def convert_to_string(self) -> str | Generator[Any, None, None]:
+    def convert_to_string(self) -> str | Generator[Any, None, None]:
@@
-        if isinstance(self.input_value, Generator):
-            return self.input_value
-        return safe_convert(self.input_value)
+        if isinstance(self.input_value, Generator):
+            return self.input_value  # streaming path
+        return safe_convert(self.input_value, clean_data=self.clean_data)

And in message_response, ensure text is a string:

-        # Create or use existing Message object
+        # Create or use existing Message object
         if isinstance(self.input_value, Message):
             message = self.input_value
             # Update message properties
-            message.text = text
+            if isinstance(text, Generator):
+                text = "".join(chunk for chunk in text)
+            message.text = text
         else:
-            message = Message(text=text)
+            if isinstance(text, Generator):
+                text = "".join(chunk for chunk in text)
+            message = Message(text=text)

2231-2252: Defaulting “Allow Dangerous Deserialization” to true is unsafe.

Pickle deserialization can lead to code execution. Default should be false; keep it advanced and document clearly.

-        BoolInput(
+        BoolInput(
             name="allow_dangerous_deserialization",
             display_name="Allow Dangerous Deserialization",
@@
-            value=True,
+            value=False,
         ),

Also flip the template value below:

-                "value": true
+                "value": false

And consider gating loads behind a signed artifact policy or path allowlist.


1789-1840: Network call without timeout/retry in RemixDocumentation.

httpx.get without an explicit timeout can hang the worker. Add a bounded timeout and error handling.

-        response = httpx.get(search_index_url, follow_redirects=True)
+        try:
+            response = httpx.get(search_index_url, follow_redirects=True, timeout=10.0)
+        except httpx.RequestError as e:
+            raise ValueError(f"Failed to fetch search index: {e!s}") from e
src/lfx/src/lfx/custom/custom_component/component.py (1)

1314-1336: API change — to_toolkit() is async: await all call sites

  • Fix src/backend/tests/unit/test_simple_agent_in_lfx_run.py:332 — change
    tools = url_component.to_toolkit()
    to
    tools = await url_component.to_toolkit()
  • Review src/backend/tests/data/simple_agent.py:35 — it uses asyncio.run(url_component.to_toolkit()) (acceptable for standalone scripts); change only if you want native async usage.
src/backend/base/langflow/initial_setup/starter_projects/Research Agent.json (2)

2831-3247: Guard against empty toolkit and type-mismatch when adding CurrentDate tool.

to_toolkit() may return an empty list or unexpected tool type; pop(0) will raise IndexError and crash agent setup.

Apply:

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
-            if not isinstance(current_date_tool, StructuredTool):
-                msg = "CurrentDateComponent must be converted to a StructuredTool"
-                raise TypeError(msg)
-            self.tools.append(current_date_tool)
+            toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not toolkit:
+                await logger.awarn("CurrentDateComponent returned no tools; skipping.")
+            else:
+                tool = toolkit[0]
+                if not isinstance(tool, StructuredTool):
+                    msg = "CurrentDateComponent must be converted to a StructuredTool"
+                    raise TypeError(msg)
+                self.tools.append(tool)

2831-3247: Fix schema prompt in json_response (currently asks the model to “extract only the JSON schema”).

The current system text instructs the model to output just the schema itself, not data conforming to the schema. This degrades JSON-output mode.

Use schema as a constraint instead:

-                    schema_info = (
-                        "You are given some text that may include format instructions, "
-                        "explanations, or other content alongside a JSON schema.\n\n"
-                        "Your task:\n"
-                        "- Extract only the JSON schema.\n"
-                        "- Return it as valid JSON.\n"
-                        "- Do not include format instructions, explanations, or extra text.\n\n"
-                        "Input:\n"
-                        f"{json.dumps(schema_dict, indent=2)}\n\n"
-                        "Output (only JSON schema):"
-                    )
+                    schema_info = (
+                        "You must produce JSON that strictly conforms to the following JSON schema.\n"
+                        "Return only JSON (no prose). If multiple records match, return an array of objects.\n"
+                        f"Schema:\n{json.dumps(schema_dict, indent=2)}"
+                    )
src/backend/base/langflow/initial_setup/starter_projects/Knowledge Ingestion.json (3)

868-1122: Uninitialized embedding_model/api_key can crash ingestion.

In build_kb_info, embedding_model and api_key are only set when metadata exists; otherwise they may be undefined before use and before writing metadata when a user supplies a fresh API key.

Apply:

@@
-            metadata_path = kb_path / "embedding_metadata.json"
-
-            # If the API key is not provided, try to read it from the metadata file
-            if metadata_path.exists():
+            metadata_path = kb_path / "embedding_metadata.json"
+            embedding_model = None
+            api_key = None
+            # If metadata exists, prefer it
+            if metadata_path.exists():
                 settings_service = get_settings_service()
                 metadata = json.loads(metadata_path.read_text())
-                embedding_model = metadata.get("embedding_model")
+                embedding_model = metadata.get("embedding_model")
                 try:
                     api_key = decrypt_api_key(metadata["api_key"], settings_service)
                 except (InvalidToken, TypeError, ValueError) as e:
                     logger.error(f"Could not decrypt API key. Please provide it manually. Error: {e}")
 
-            # Check if a custom API key was provided, update metadata if so
-            if self.api_key:
-                api_key = self.api_key
+            # Allow override via input
+            if self.api_key:
+                api_key = self.api_key
                 self._save_embedding_metadata(
                     kb_path=kb_path,
-                    embedding_model=embedding_model,
+                    embedding_model=embedding_model,
                     api_key=api_key,
                 )
+
+            # Final validation
+            if not embedding_model or not api_key:
+                msg = "Embedding model and API key are required. Create the knowledge base via the dialog or provide both."
+                raise ValueError(msg)

868-1122: Timeout exception type is wrong; asyncio.wait_for raises asyncio.TimeoutError.

Catching built-in TimeoutError will miss the timeout.

Apply:

-                except TimeoutError as e:
+                except asyncio.TimeoutError as e:
                     msg = "Embedding validation timed out. Please verify network connectivity and key."
                     raise ValueError(msg) from e

868-1122: Vectorize/Identifier handling leads to duplicate IDs and wrong content.

  • A column marked both vectorize and identifier will not be added to identifier_cols due to elif, making hashes empty and all rows collide.
  • Data objects place text inside metadata instead of Data.text.

Apply:

@@
-        for config in config_list:
+        for config in config_list:
             col_name = config.get("column_name")
             vectorize = config.get("vectorize") == "True" or config.get("vectorize") is True
             identifier = config.get("identifier") == "True" or config.get("identifier") is True
-
-            if vectorize:
-                content_cols.append(col_name)
-            elif identifier:
-                identifier_cols.append(col_name)
+            if vectorize:
+                content_cols.append(col_name)
+            if identifier:
+                identifier_cols.append(col_name)
@@
-        for _, row in df_source.iterrows():
-            # Build content text from identifier columns using list comprehension
-            identifier_parts = [str(row[col]) for col in content_cols if col in row and pd.notna(row[col])]
-
-            # Join all parts into a single string
-            page_content = " ".join(identifier_parts)
-
-            # Build metadata from NON-vectorized columns only (simple key-value pairs)
-            data_dict = {
-                "text": page_content,  # Main content for vectorization
-            }
-
-            # Add identifier columns if they exist
-            if identifier_cols:
-                identifier_parts = [str(row[col]) for col in identifier_cols if col in row and pd.notna(row[col])]
-                page_content = " ".join(identifier_parts)
+        for _, row in df_source.iterrows():
+            # Build main content from vectorized columns
+            content_parts = [str(row[col]) for col in content_cols if col in row and pd.notna(row[col])]
+            page_content = " ".join(content_parts)
+
+            # Build identifier string; if none configured, fall back to content columns
+            id_parts = [str(row[col]) for col in identifier_cols if col in row and pd.notna(row[col])] or content_parts
+            id_string = " ".join(id_parts)
@@
-            for col in df_source.columns:
+            for col in df_source.columns:
                 if col not in content_cols and col in row and pd.notna(row[col]):
                     # Convert to simple types for Chroma metadata
                     value = row[col]
                     data_dict[col] = str(value)  # Convert complex types to string
@@
-            page_content_hash = hashlib.sha256(page_content.encode()).hexdigest()
+            page_content_hash = hashlib.sha256(id_string.encode()).hexdigest()
             data_dict["_id"] = page_content_hash
@@
-            # Create Data object - everything except "text" becomes metadata
-            data_obj = Data(data=data_dict)
+            # Create Data object with proper text
+            data_obj = Data(text=page_content, data=data_dict)
             data_objects.append(data_obj)
src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json (1)

1353-1370: Expose the new structured output handle in node.outputs.

The Agent code now defines an Output named structured_response (json_response), but this node.outputs block exposes only response. Without a second handle, users can’t wire the structured output in the UI.

Apply this diff to add the handle:

         "outputs": [
           {
             "allows_loop": false,
             "cache": true,
             "display_name": "Response",
             "group_outputs": false,
             "method": "message_response",
             "name": "response",
             "options": null,
             "required_inputs": null,
             "selected": "Message",
             "tool_mode": true,
             "types": [
               "Message"
             ],
             "value": "__UNDEFINED__"
           }
+          ,
+          {
+            "allows_loop": false,
+            "cache": true,
+            "display_name": "Structured Response",
+            "group_outputs": false,
+            "method": "json_response",
+            "name": "structured_response",
+            "options": null,
+            "required_inputs": null,
+            "selected": "Data",
+            "tool_mode": false,
+            "types": [
+              "Data"
+            ],
+            "value": "__UNDEFINED__"
+          }
         ],
♻️ Duplicate comments (5)
src/backend/base/langflow/initial_setup/starter_projects/SaaS Pricing.json (1)

1080-1081: Same JSON parsing robustness issue as in YouTube Analysis — apply the refactor here too.

Replicate the non-greedy, fence-aware, list-capable extraction in build_structured_output_base to reduce parsing failures with mixed-content LLM outputs.

Would you like me to open a small follow-up PR applying the same patch across all updated starters that include this Agent component?

src/backend/base/langflow/initial_setup/starter_projects/Social Media Agent.json (2)

1560-1561: Same pop(0) edge case as noted in Search agent.

Please apply the toolkit emptiness guard here too.

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
+            toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not toolkit:
+                raise ValueError("CurrentDateComponent produced no tools")
+            current_date_tool = toolkit[0]

1560-1561: Use self.get_tool_name() instead of "Call_Agent".

Keeps tool naming consistent with base class.

-        tools = component_toolkit(component=self).get_tools(
-            tool_name="Call_Agent", tool_description=description, callbacks=self.get_langchain_callbacks()
-        )
+        tools = component_toolkit(component=self).get_tools(
+            tool_name=self.get_tool_name(),
+            tool_description=description,
+            callbacks=self.get_langchain_callbacks(),
+        )
src/backend/base/langflow/initial_setup/starter_projects/Nvidia Remix.json (1)

700-740: Consistency: pass clean_data flag in safe_convert single-value path.

List path honors clean_data; single-value path didn’t. Fixed in earlier diff, noting here for intent.

src/backend/base/langflow/initial_setup/starter_projects/Simple Agent.json (1)

1202-1203: Apply same AgentComponent fixes as in News Aggregator

Mirror the empty-toolkit guard, inputs None guards, and regex tweak here.

🧹 Nitpick comments (24)
src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json (1)

939-940: Harden JSON extraction in structured output path (non-greedy + code-fence cleanup + list support).

Current json_pattern = r"\{.*\}" is greedy and misses fenced/list JSON embedded in text, yielding false negatives/positives.

Apply inside AgentComponent’s build_structured_output_base:

-    async def build_structured_output_base(self, content: str):
-        """Build structured output with optional BaseModel validation."""
-        json_pattern = r"\{.*\}"
+    async def build_structured_output_base(self, content: str):
+        """Build structured output with optional BaseModel validation."""
+        # Support fenced blocks and both object/list payloads; make non-greedy to avoid spanning across text.
+        import re
+        fenced = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", content.strip(), flags=re.IGNORECASE)
+        json_pattern = r"(\{.*?\}|\[.*?\])"
-        schema_error_msg = "Try setting an output schema"
+        schema_error_msg = "Try setting an output schema"
@@
-        try:
-            json_data = json.loads(content)
+        try:
+            json_data = json.loads(fenced)
         except json.JSONDecodeError:
-            json_match = re.search(json_pattern, content, re.DOTALL)
+            json_match = re.search(json_pattern, fenced, re.DOTALL)
             if json_match:
                 try:
                     json_data = json.loads(json_match.group())
                 except json.JSONDecodeError:
                     return {"content": content, "error": schema_error_msg}
             else:
                 return {"content": content, "error": schema_error_msg}
src/backend/base/langflow/initial_setup/starter_projects/Search agent.json (3)

1203-1204: Guard against empty CurrentDateComponent toolkit before pop(0).

If to_toolkit returns [], pop(0) raises IndexError. Add a check.

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
+            toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not toolkit:
+                raise ValueError("CurrentDateComponent produced no tools")
+            current_date_tool = toolkit[0]

1203-1204: Use the component’s name for the tool instead of hard‑coding "Call_Agent".

Aligns with base class behavior and improves UX.

-        tools = component_toolkit(component=self).get_tools(
-            tool_name="Call_Agent", tool_description=description, callbacks=self.get_langchain_callbacks()
-        )
+        tools = component_toolkit(component=self).get_tools(
+            tool_name=self.get_tool_name(),
+            tool_description=description,
+            callbacks=self.get_langchain_callbacks(),
+        )

1203-1204: Greedy JSON extraction can mis-parse content.

json_pattern = r"{.*}" is greedy; prefer parsing fenced JSON first and use a non-greedy fallback.

-        json_pattern = r"\{.*\}"
+        # Prefer fenced JSON blocks, fallback to first balanced-looking object
+        json_pattern = r"```json\s*(\{[\s\S]*?\})\s*```|(\{[\s\S]*?\})"
@@
-            json_match = re.search(json_pattern, content, re.DOTALL)
-            if json_match:
-                try:
-                    json_data = json.loads(json_match.group())
+            m = re.search(json_pattern, content, re.DOTALL)
+            if m:
+                candidate = m.group(1) or m.group(2)
+                try:
+                    json_data = json.loads(candidate)
src/backend/base/langflow/initial_setup/starter_projects/Price Deal Finder.json (1)

1864-1864: Guard against empty to_toolkit() before pop and confirm no blocking in async _get_tools.

  • pop(0) will raise IndexError if to_toolkit() returns an empty list.
  • Your async _get_tools uses a likely sync get_tools(...); avoid blocking the event loop.
-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
+            tools_list = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not tools_list:
+                raise ValueError("CurrentDateComponent.to_toolkit() returned no tools")
+            current_date_tool = tools_list[0]

If component_toolkit(...).get_tools(...) does IO, wrap it:

 async def _get_tools(self) -> list[Tool]:
     component_toolkit = get_component_toolkit()
@@
-    tools = component_toolkit(component=self).get_tools(
-        tool_name="Call_Agent", tool_description=description, callbacks=self.get_langchain_callbacks()
-    )
+    tools = await asyncio.to_thread(
+        component_toolkit(component=self).get_tools,
+        tool_name="Call_Agent",
+        tool_description=description,
+        callbacks=self.get_langchain_callbacks(),
+    )

(Remember to import asyncio in the embedded code.)

Please confirm whether get_tools is synchronous and can block.

src/backend/tests/unit/test_simple_agent_in_lfx_run.py (3)

142-165: Mark async test with pytest.mark.asyncio.

Per repo test guidelines, decorate async tests.

-    async def test_agent_workflow_direct_execution(self):
+    @pytest.mark.asyncio
+    async def test_agent_workflow_direct_execution(self):

211-225: Mark async test with pytest.mark.asyncio.

Same decorator needed here.

-    async def test_url_component_to_toolkit_functionality(self):
+    @pytest.mark.asyncio
+    async def test_url_component_to_toolkit_functionality(self):

51-56: Keep documentation sample consistent with async API.

The embedded script uses url_component.to_toolkit() without await. Consider switching the sample to asyncio.run(...) and update related assertions.

- tools = url_component.to_toolkit()
+ import asyncio
+ tools = asyncio.run(url_component.to_toolkit())
src/backend/base/langflow/initial_setup/starter_projects/Nvidia Remix.json (2)

1410-1424: System prompt value is truncated (“Before ”). Use empty default or complete it.

This starter wires a Prompt node into system_prompt, so leaving a half sentence is confusing.

-                "value": "You are a helpful assistant that must use tools to answer questions and perform tasks regarding RTX Remix.\n\nBefore "
+                "value": ""

2470-2660: MCP default tool selection may 404 cache; tolerate missing tool.

If the default “remix_lock_layer” isn’t present in cache, build_output will KeyError on _tool_cache[self.tool]. Guard it.

-                exec_tool = self._tool_cache[self.tool]
+                exec_tool = self._tool_cache.get(self.tool)
+                if exec_tool is None:
+                    return DataFrame(data=[{"error": f"Tool '{self.tool}' not available for selected server"}])
src/backend/base/langflow/initial_setup/starter_projects/Instagram Copywriter.json (1)

1205-1226: Wire the new ChatOutput.data_template into conversion path

You added data_template but ChatOutput.convert_to_string/safe_convert aren’t using it here. If safe_convert supports a template parameter, pass it so the input actually affects rendering.

Apply this minimal change inside the ChatOutput code string:

-        if isinstance(self.input_value, list):
-            return "\n".join([safe_convert(item, clean_data=self.clean_data) for item in self.input_value])
+        if isinstance(self.input_value, list):
+            return "\n".join([safe_convert(item, clean_data=self.clean_data, template=self.data_template) for item in self.input_value])
-        if isinstance(self.input_value, Generator):
+        if isinstance(self.input_value, Generator):
             return self.input_value
-        return safe_convert(self.input_value)
+        return safe_convert(self.input_value, clean_data=self.clean_data, template=self.data_template)

If safe_convert doesn’t accept template, either extend it or handle templating here before building Message.

src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json (3)

654-655: Avoid accessing Chroma private attributes for embeddings

Using chroma._client.get_collection(...) is a private API and can break across versions. Prefer public methods or guard with hasattr and fail gracefully if unavailable.

-                collection = chroma._client.get_collection(name=self.knowledge_base)
+                # Avoid private API; fall back only if present
+                if hasattr(chroma, "_client"):
+                    collection = chroma._client.get_collection(name=self.knowledge_base)
+                else:
+                    raise RuntimeError("Chroma client not accessible; cannot fetch embeddings safely")

Consider exposing an official path in a helper instead of relying on internals.


654-655: Handle empty query more robustly

When search_query is empty you call similarity_search("", k=...). Some vector stores reject empty queries. Consider returning top_k most recent/any docs or requiring a non-empty query.

-        else:
-            results = chroma.similarity_search(
-                query=self.search_query or "",
-                k=self.top_k,
-            )
+        else:
+            if not self.search_query:
+                # No query: fallback to IDs or metadata order
+                docs = chroma.get(k=self.top_k) if hasattr(chroma, "get") else []
+                results = [(doc, 0) for doc in docs]
+            else:
+                results = chroma.similarity_search(query=self.search_query, k=self.top_k)
+                results = [(doc, 0) for doc in results]

Adjust to whatever public API your Chroma wrapper exposes.


654-655: Score sign convention

You negate similarity scores when present (_score = -1 * doc[1]). Consider keeping the native score and documenting the meaning (distance vs similarity) to avoid confusion downstream.

-            if self.search_query:
-                kwargs["_score"] = -1 * doc[1]
+            if self.search_query:
+                kwargs["_score"] = doc[1]  # preserve store's native semantics
src/lfx/src/lfx/custom/custom_component/component.py (3)

1337-1347: Docstring/type hints drift after async conversion

_update _get_tools docstring to emphasize it's async; optionally mark return as -> list[Tool] with note it’s awaited by callers.

-    async def _get_tools(self) -> list[Tool]:
-        """Get the list of tools for this component.
+    async def _get_tools(self) -> list[Tool]:
+        """Get the list of tools for this component (async).

1414-1429: Nit: comment typo and legacy branch

“remomved ince” → “removed once”. Also the else branch calling self._get_tools() (sync) is now dead code since _get_tools is async; consider deleting the branch once all subclasses are migrated.

-            # TODO: this check can be remomved ince get tools is async
+            # TODO: this check can be removed once _get_tools is always async
             if asyncio.iscoroutinefunction(self._get_tools):
                 tools = await self._get_tools()
             else:
                 tools = self._get_tools()

1365-1398: Metadata filtering: keep ordering stable

_set conversion to list(dicts) alters ordering when using sets later. If UI relies on ordering, consider stable sort by display_name before returning.

-        return [tool for tool in tools if tool_status.get(tool.name, True)]
+        filtered = [tool for tool in tools if tool_status.get(tool.name, True)]
+        return sorted(filtered, key=lambda t: getattr(t, "metadata", {}).get("display_name", t.name))
src/backend/base/langflow/initial_setup/starter_projects/News Aggregator.json (3)

1612-1613: Guard empty toolkit to avoid IndexError

CurrentDateComponent.to_toolkit() may return an empty list; .pop(0) would raise.

Apply:

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
-            if not isinstance(current_date_tool, StructuredTool):
-                msg = "CurrentDateComponent must be converted to a StructuredTool"
-                raise TypeError(msg)
-            self.tools.append(current_date_tool)
+            toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if toolkit:
+                current_date_tool = toolkit[0]
+                if not isinstance(current_date_tool, StructuredTool):
+                    msg = "CurrentDateComponent must be converted to a StructuredTool"
+                    raise TypeError(msg)
+                self.tools.append(current_date_tool)

1612-1613: Handle optional provider inputs safely

inputs can be None; loop will raise. Also when setting params.

-    def _build_llm_model(self, component, inputs, prefix=""):
-        model_kwargs = {}
-        for input_ in inputs:
+    def _build_llm_model(self, component, inputs, prefix=""):
+        model_kwargs = {}
+        for input_ in (inputs or []):
             if hasattr(self, f"{prefix}{input_.name}"):
                 model_kwargs[input_.name] = getattr(self, f"{prefix}{input_.name}")
         return component.set(**model_kwargs).build_model()

-    def set_component_params(self, component):
+    def set_component_params(self, component):
         provider_info = MODEL_PROVIDERS_DICT.get(self.agent_llm)
         if provider_info:
-            inputs = provider_info.get("inputs")
+            inputs = provider_info.get("inputs") or []
             prefix = provider_info.get("prefix")
             # Filter out json_mode and only use attributes that exist on this component
             model_kwargs = {}
             for input_ in inputs:
                 if hasattr(self, f"{prefix}{input_.name}"):
                     model_kwargs[input_.name] = getattr(self, f"{prefix}{input_.name}")

1612-1613: Non-greedy JSON extraction to reduce over-capture

Regex r"{.*}" is greedy and can swallow too much content; use non-greedy.

-        json_pattern = r"\{.*\}"
+        json_pattern = r"\{.*?\}"
src/backend/base/langflow/initial_setup/starter_projects/Knowledge Ingestion.json (2)

868-1122: Defer settings validation to runtime, not import.

Raising at import time if knowledge_bases_dir is unset will prevent the project from loading.

Apply:

-knowledge_directory = settings.knowledge_bases_dir
-if not knowledge_directory:
-    msg = "Knowledge bases directory is not set in the settings."
-    raise ValueError(msg)
-KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()
+knowledge_directory = settings.knowledge_bases_dir
+KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser() if knowledge_directory else None

And validate where first used:

-        return KNOWLEDGE_BASES_ROOT_PATH
+        if not KNOWLEDGE_BASES_ROOT_PATH:
+            raise ValueError("Knowledge bases directory is not set in the settings.")
+        return KNOWLEDGE_BASES_ROOT_PATH

868-1122: Persist Chroma after additions (defensive).

Optional but safer to flush to disk explicitly.

Apply:

             if documents:
                 chroma.add_documents(documents)
                 self.log(f"Added {len(documents)} documents to vector store '{self.knowledge_base}'")
+                with contextlib.suppress(Exception):
+                    chroma.persist()
src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json (2)

1497-1498: Greedy JSON extraction can over-capture; switch to non-greedy and try-all matches.

Using r"{.*}" will often swallow too much. Prefer non-greedy with finditer and parse-first-valid.

-        json_pattern = r"\{.*\}"
+        # Non-greedy; try each JSON-looking block until one parses
+        json_pattern = r"\{.*?\}"
@@
-        except json.JSONDecodeError:
-            json_match = re.search(json_pattern, content, re.DOTALL)
-            if json_match:
-                try:
-                    json_data = json.loads(json_match.group())
-                except json.JSONDecodeError:
-                    return {"content": content, "error": schema_error_msg}
-            else:
-                return {"content": content, "error": schema_error_msg}
+        except json.JSONDecodeError:
+            for m in re.finditer(json_pattern, content, re.DOTALL):
+                try:
+                    json_data = json.loads(m.group())
+                    break
+                except json.JSONDecodeError:
+                    continue
+            if json_data is None:
+                return {"content": content, "error": schema_error_msg}

1497-1498: Guard against empty toolkits when adding CurrentDate tool.

pop(0) will raise on empty lists. Add a length check.

-            current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
+            toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
+            if not toolkit:
+                raise ValueError("CurrentDateComponent.to_toolkit() returned no tools")
+            current_date_tool = toolkit[0]
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbf2a4a and 35a1282.

📒 Files selected for processing (21)
  • src/backend/base/langflow/initial_setup/starter_projects/Instagram Copywriter.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Invoice Summarizer.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Knowledge Ingestion.json (4 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json (4 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Market Research.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/News Aggregator.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Nvidia Remix.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Price Deal Finder.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Research Agent.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/SaaS Pricing.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Search agent.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Simple Agent.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Social Media Agent.json (2 hunks)
  • src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json (2 hunks)
  • src/backend/tests/data/simple_agent.py (2 hunks)
  • src/backend/tests/unit/test_simple_agent_in_lfx_run.py (4 hunks)
  • src/lfx/src/lfx/base/agents/agent.py (1 hunks)
  • src/lfx/src/lfx/base/composio/composio_base.py (1 hunks)
  • src/lfx/src/lfx/components/agents/agent.py (2 hunks)
  • src/lfx/src/lfx/custom/custom_component/component.py (4 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
{src/backend/**/*.py,tests/**/*.py,Makefile}

📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)

{src/backend/**/*.py,tests/**/*.py,Makefile}: Run make format_backend to format Python code before linting or committing changes
Run make lint to perform linting checks on backend Python code

Files:

  • src/backend/tests/data/simple_agent.py
  • src/backend/tests/unit/test_simple_agent_in_lfx_run.py
src/backend/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/testing.mdc)

src/backend/tests/**/*.py: Unit tests for backend code must be located in the 'src/backend/tests/' directory, with component tests organized by component subdirectory under 'src/backend/tests/unit/components/'.
Test files should use the same filename as the component under test, with an appropriate test prefix or suffix (e.g., 'my_component.py' → 'test_my_component.py').
Use the 'client' fixture (an async httpx.AsyncClient) for API tests in backend Python tests, as defined in 'src/backend/tests/conftest.py'.
When writing component tests, inherit from the appropriate base class in 'src/backend/tests/base.py' (ComponentTestBase, ComponentTestBaseWithClient, or ComponentTestBaseWithoutClient) and provide the required fixtures: 'component_class', 'default_kwargs', and 'file_names_mapping'.
Each test in backend Python test files should have a clear docstring explaining its purpose, and complex setups or mocks should be well-commented.
Test both sync and async code paths in backend Python tests, using '@pytest.mark.asyncio' for async tests.
Mock external dependencies appropriately in backend Python tests to isolate unit tests from external services.
Test error handling and edge cases in backend Python tests, including using 'pytest.raises' and asserting error messages.
Validate input/output behavior and test component initialization and configuration in backend Python tests.
Use the 'no_blockbuster' pytest marker to skip the blockbuster plugin in tests when necessary.
Be aware of ContextVar propagation in async tests; test both direct event loop execution and 'asyncio.to_thread' scenarios to ensure proper context isolation.
Test error handling by mocking internal functions using monkeypatch in backend Python tests.
Test resource cleanup in backend Python tests by using fixtures that ensure proper initialization and cleanup of resources.
Test timeout and performance constraints in backend Python tests using 'asyncio.wait_for' and timing assertions.
Test Langflow's Messag...

Files:

  • src/backend/tests/data/simple_agent.py
  • src/backend/tests/unit/test_simple_agent_in_lfx_run.py
src/backend/tests/unit/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)

Test component integration within flows using create_flow, build_flow, and get_build_events utilities

Files:

  • src/backend/tests/unit/test_simple_agent_in_lfx_run.py
🧠 Learnings (1)
📚 Learning: 2025-07-21T14:16:14.125Z
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-07-21T14:16:14.125Z
Learning: Applies to src/backend/tests/**/*.py : Use 'anyio' and 'aiofiles' for async file operations in backend Python tests that involve file handling.

Applied to files:

  • src/backend/tests/data/simple_agent.py
🧬 Code graph analysis (6)
src/backend/tests/data/simple_agent.py (1)
src/lfx/src/lfx/custom/custom_component/component.py (2)
  • run (419-425)
  • to_toolkit (1314-1335)
src/lfx/src/lfx/base/agents/agent.py (3)
src/lfx/src/lfx/components/agents/agent.py (1)
  • _get_tools (547-558)
src/lfx/src/lfx/base/composio/composio_base.py (1)
  • _get_tools (1953-1957)
src/lfx/src/lfx/custom/custom_component/component.py (1)
  • _get_tools (1337-1347)
src/lfx/src/lfx/base/composio/composio_base.py (3)
src/lfx/src/lfx/components/agents/agent.py (1)
  • _get_tools (547-558)
src/lfx/src/lfx/base/agents/agent.py (1)
  • _get_tools (257-268)
src/lfx/src/lfx/custom/custom_component/component.py (1)
  • _get_tools (1337-1347)
src/backend/tests/unit/test_simple_agent_in_lfx_run.py (1)
src/lfx/src/lfx/custom/custom_component/component.py (1)
  • to_toolkit (1314-1335)
src/lfx/src/lfx/components/agents/agent.py (2)
src/lfx/src/lfx/custom/custom_component/component.py (3)
  • get_base_args (268-281)
  • to_toolkit (1314-1335)
  • _get_tools (1337-1347)
src/lfx/src/lfx/base/agents/agent.py (1)
  • _get_tools (257-268)
src/lfx/src/lfx/custom/custom_component/component.py (3)
src/lfx/src/lfx/components/agents/agent.py (1)
  • _get_tools (547-558)
src/lfx/src/lfx/base/agents/agent.py (1)
  • _get_tools (257-268)
src/lfx/src/lfx/base/composio/composio_base.py (1)
  • _get_tools (1953-1957)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (46)
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 33/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 38/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 31/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 39/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 26/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 30/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 37/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 35/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 36/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 40/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 32/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 27/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 28/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 34/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 21/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 29/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 19/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 24/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 25/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 17/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 20/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 23/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 22/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 18/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 12/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 7/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 16/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 15/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 11/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 14/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 5/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 3/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 1/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 8/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 13/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 2/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 9/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 10/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 6/40
  • GitHub Check: Run Frontend Tests / Playwright Tests - Shard 4/40
  • GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 3
  • GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 2
  • GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 5
  • GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 1
  • GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 4
  • GitHub Check: Test Starter Templates
🔇 Additional comments (24)
src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json (1)

762-762: Confirm Agent code_hash matches bundled code; rebuild starter cache if hashes drive client-side invalidation.

Only metadata changed here. Please verify no other nodes reference the previous hash to avoid stale caching in exported/imported flows.

src/backend/base/langflow/initial_setup/starter_projects/SaaS Pricing.json (1)

914-914: Agent code_hash bump — validate downstream artifacts.

Please confirm the new hash corresponds to the inlined Agent code and that starter export/import tools don’t rely on the old hash.

src/backend/base/langflow/initial_setup/starter_projects/Search agent.json (1)

1203-1204: Async migration in AgentComponent looks consistent.

get_agent_requirements, message_response, json_response, and _get_tools are properly async/awaited. Nice.

src/backend/base/langflow/initial_setup/starter_projects/Social Media Agent.json (1)

1560-1561: Fix in update_build_config using MODEL_PROVIDERS_DICT is correct.

Provider field toggling logic now references the right mapping.

src/backend/base/langflow/initial_setup/starter_projects/Price Deal Finder.json (1)

1698-1698: Code hash bump acknowledged.

No action needed.

src/backend/base/langflow/initial_setup/starter_projects/Market Research.json (1)

2155-2155: Agent code hash update looks consistent with async refactor.

src/backend/tests/unit/test_simple_agent_in_lfx_run.py (2)

162-165: Good: awaiting to_toolkit after async migration.


10-14: Optional: add anyio/pytest-asyncio to test markers globally.

If the project uses auto mode, this is fine; otherwise ensure the plugin is configured so async tests run.

Would you like a follow-up PR config to enforce pytest-asyncio usage across this package?

src/backend/base/langflow/initial_setup/starter_projects/Invoice Summarizer.json (2)

1246-1246: Agent code hash bumped; aligns with async/toolkit migration.


1412-1413: Prompt: Nice addition of update_template_values to preserve user-set values.

src/lfx/src/lfx/components/agents/agent.py (1)

547-559: ```shell
#!/bin/bash
set -euo pipefail

echo "PWD: $(pwd)"
echo "1) find any _get_tools definitions (async or sync)"
rg -n --hidden -uu -P '^\s*(async\s+def|def)\s+_get_tools\s*(' -S || true

echo
echo "2) find all to_toolkit(...) occurrences (with context)"
rg -n --hidden -uu -P '\bto_toolkit\s*(' -S -C3 || true

echo
echo "3) heuristic: to_toolkit call sites without an obvious await"
rg -n --hidden -uu -P '\bto_toolkit\s*(' -S -C3 | rg -nP -v 'await|asyncio.run|anyio.to_thread|create_task' || true

echo
echo "4) references to _get_tools"
rg -n --hidden -uu -P '\b_get_tools\b' -S -C3 || true

echo
echo "5) show agent.py snippet (if present)"
if [ -f src/lfx/src/lfx/components/agents/agent.py ]; then
sed -n '520,580p' src/lfx/src/lfx/components/agents/agent.py || true
else
echo "agent.py not found at src/lfx/src/lfx/components/agents/agent.py"
fi


</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Nvidia Remix.json (1)</summary><blockquote>

`1093-1135`: **Incorrect — get_tools is synchronous; no await needed.**

get_tools is defined as a regular def in src/lfx/src/lfx/base/tools/component_tool.py and is invoked synchronously in run_flow.py, agent.py, components/agents/agent.py, and custom_component/component.py; no change to await is required. 

> Likely an incorrect or invalid review comment.

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Instagram Copywriter.json (1)</summary><blockquote>

`2231-2232`: **Good: agent now awaits async to_toolkit call**

The Agent component correctly awaits CurrentDateComponent(...).to_toolkit(). This aligns with the async tool pipeline introduced in this PR.

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/News Aggregator.json (1)</summary><blockquote>

`1446-1446`: **Metadata hash bump noted**

code_hash updated; no issues. Ensure starter cache is invalidated accordingly in packaging.

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Simple Agent.json (1)</summary><blockquote>

`1025-1025`: **Metadata hash bump noted**

code_hash updated; looks good.

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Research Agent.json (2)</summary><blockquote>

`2665-2665`: **Meta-only change.**

code_hash updated. No action required.

---

`2831-3247`: **Minor: log and rethrow policy looks good.**

Async logging with logger.aerror and specific exception branches are appropriate; keeping the final broad except is acceptable given rethrow.


Please confirm tests cover:
- add_current_date_tool=True/False branches
- json_response with/without output_schema
- empty toolkits for CurrentDateComponent

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Knowledge Ingestion.json (4)</summary><blockquote>

`738-738`: **Meta-only change.**

code_hash updated. No action required.

---

`754-756`: **Dependency metadata switched to lfx.**

Looks fine; ensure the environment images include lfx at runtime.

---

`777-777`: **Module path moved to custom_components.knowledge_ingestion.**

Confirm the module is discoverable in production builds.

---

`868-1122`: **Keep api_key for langchain_openai — do not change to openai_api_key.**

The partner package langchain_openai.OpenAIEmbeddings accepts api_key in its constructor (not openai_api_key). ([api.python.langchain.com](https://api.python.langchain.com/en/latest/embeddings/langchain_openai.embeddings.base.OpenAIEmbeddings.html?utm_source=openai))  
Some other LangChain variants (community/core) historically use openai_api_key — change only if your environment pins one of those packages; verify the pinned package/version (e.g. langchain-openai 0.3.23). ([api.python.langchain.com](https://api.python.langchain.com/en/latest/community/embeddings/langchain_community.embeddings.openai.OpenAIEmbeddings.html?utm_source=openai))

Occurrences in this repo that use api_key and are correct: src/backend/base/langflow/components/knowledge_bases/ingestion.py, src/backend/base/langflow/components/knowledge_bases/retrieval.py. 

> Likely an incorrect or invalid review comment.

</blockquote></details>
<details>
<summary>src/backend/base/langflow/initial_setup/starter_projects/Pokédex Agent.json (3)</summary><blockquote>

`1331-1331`: **Confirm code_hash/source parity.**

code_hash changed. Ensure the embedded Agent code (below) is exactly what shipped in the built component, otherwise cache/key mismatches may prevent runtime hot-reload in the playground.

---

`1497-1498`: **Incorrect — no change required: provider component_class values are instantiated.**

MODEL_PROVIDERS_DICT in src/lfx/src/lfx/base/models/model_input_constants.py sets "component_class" to instantiated objects (e.g., OpenAIModelComponent(), GroqModel(), AnthropicModelComponent()), so calling component.set(...).build_model() is valid; ignore the suggested instantiation diff. 

> Likely an incorrect or invalid review comment.

---

`1497-1498`: **No change required — keep the synchronous call.**
get_component_toolkit(...).get_tools is defined as a regular def (not async) in src/lfx/src/lfx/base/tools/component_tool.py, so do not await it.

</blockquote></details>

</blockquote></details>

</details>

<!-- This is an auto-generated comment by CodeRabbit for review status -->

Comment on lines +654 to 655
"value": "import json\nfrom pathlib import Path\nfrom typing import Any\n\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom lfx.custom import Component\nfrom lfx.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.services.deps import get_settings_service\nfrom pydantic import SecretStr\n\nfrom langflow.base.knowledge_bases import get_knowledge_bases\nfrom langflow.services.auth.utils import decrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import session_scope\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KnowledgeRetrievalComponent(Component):\n display_name = \"Knowledge Retrieval\"\n description = \"Search and retrieve data from knowledge.\"\n icon = \"download\"\n name = \"KnowledgeRetrieval\"\n\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n advanced=True,\n required=False,\n ),\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n info=\"Optional search query to filter knowledge base data.\",\n tool_mode=True,\n ),\n IntInput(\n name=\"top_k\",\n display_name=\"Top K Results\",\n info=\"Number of top results to return from the knowledge base.\",\n value=5,\n advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"include_metadata\",\n display_name=\"Include Metadata\",\n info=\"Whether to include all metadata in the output. If false, only content is returned.\",\n value=True,\n advanced=False,\n ),\n BoolInput(\n name=\"include_embeddings\",\n display_name=\"Include Embeddings\",\n info=\"Whether to include embeddings in the output. Only applicable if 'Include Metadata' is enabled.\",\n value=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n name=\"retrieve_data\",\n display_name=\"Results\",\n method=\"retrieve_data\",\n info=\"Returns the data from the selected knowledge base.\",\n ),\n ]\n\n async def update_build_config(self, build_config, field_value, field_name=None): # noqa: ARG002\n if field_name == \"knowledge_base\":\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id, # Use the user_id from the component context\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n\n def _get_kb_metadata(self, kb_path: Path) -> dict:\n \"\"\"Load and process knowledge base metadata.\"\"\"\n metadata: dict[str, Any] = {}\n metadata_file = kb_path / \"embedding_metadata.json\"\n if not metadata_file.exists():\n logger.warning(f\"Embedding metadata file not found at {metadata_file}\")\n return metadata\n\n try:\n with metadata_file.open(\"r\", encoding=\"utf-8\") as f:\n metadata = json.load(f)\n except json.JSONDecodeError:\n logger.error(f\"Error decoding JSON from {metadata_file}\")\n return {}\n\n # Decrypt API key if it exists\n if \"api_key\" in metadata and metadata.get(\"api_key\"):\n settings_service = get_settings_service()\n try:\n decrypted_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n metadata[\"api_key\"] = decrypted_key\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. Error: {e}\")\n metadata[\"api_key\"] = None\n return metadata\n\n def _build_embeddings(self, metadata: dict):\n \"\"\"Build embedding model from metadata.\"\"\"\n runtime_api_key = self.api_key.get_secret_value() if isinstance(self.api_key, SecretStr) else self.api_key\n provider = metadata.get(\"embedding_provider\")\n model = metadata.get(\"embedding_model\")\n api_key = runtime_api_key or metadata.get(\"api_key\")\n chunk_size = metadata.get(\"chunk_size\")\n\n # Handle various providers\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required. Provide it in the component's advanced settings.\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=model,\n api_key=api_key,\n chunk_size=chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n # Add other providers here if they become supported in ingest\n msg = f\"Embedding provider '{provider}' is not supported for retrieval.\"\n raise NotImplementedError(msg)\n\n async def retrieve_data(self) -> DataFrame:\n \"\"\"Retrieve data from the selected knowledge base by reading the Chroma collection.\n\n Returns:\n A DataFrame containing the data rows from the knowledge base.\n \"\"\"\n # Get the current user\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching Knowledge Base data.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / self.knowledge_base\n\n metadata = self._get_kb_metadata(kb_path)\n if not metadata:\n msg = f\"Metadata not found for knowledge base: {self.knowledge_base}. Ensure it has been indexed.\"\n raise ValueError(msg)\n\n # Build the embedder for the knowledge base\n embedding_function = self._build_embeddings(metadata)\n\n # Load vector store\n chroma = Chroma(\n persist_directory=str(kb_path),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # If a search query is provided, perform a similarity search\n if self.search_query:\n # Use the search query to perform a similarity search\n logger.info(f\"Performing similarity search with query: {self.search_query}\")\n results = chroma.similarity_search_with_score(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n else:\n results = chroma.similarity_search(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n\n # For each result, make it a tuple to match the expected output format\n results = [(doc, 0) for doc in results] # Assign a dummy score of 0\n\n # If include_embeddings is enabled, get embeddings for the results\n id_to_embedding = {}\n if self.include_embeddings and results:\n doc_ids = [doc[0].metadata.get(\"_id\") for doc in results if doc[0].metadata.get(\"_id\")]\n\n # Only proceed if we have valid document IDs\n if doc_ids:\n # Access underlying client to get embeddings\n collection = chroma._client.get_collection(name=self.knowledge_base)\n embeddings_result = collection.get(where={\"_id\": {\"$in\": doc_ids}}, include=[\"metadatas\", \"embeddings\"])\n\n # Create a mapping from document ID to embedding\n for i, metadata in enumerate(embeddings_result.get(\"metadatas\", [])):\n if metadata and \"_id\" in metadata:\n id_to_embedding[metadata[\"_id\"]] = embeddings_result[\"embeddings\"][i]\n\n # Build output data based on include_metadata setting\n data_list = []\n for doc in results:\n kwargs = {\n \"content\": doc[0].page_content,\n }\n if self.search_query:\n kwargs[\"_score\"] = -1 * doc[1]\n if self.include_metadata:\n # Include all metadata, embeddings, and content\n kwargs.update(doc[0].metadata)\n if self.include_embeddings:\n kwargs[\"_embeddings\"] = id_to_embedding.get(doc[0].metadata.get(\"_id\"))\n\n data_list.append(Data(**kwargs))\n\n # Return the DataFrame containing the data\n return DataFrame(data=data_list)\n"
},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Bug: OpenAIEmbeddings uses wrong keyword for API key

langchain_openai.OpenAIEmbeddings expects openai_api_key, not api_key. Current code will raise at runtime.

Apply this fix inside the code string:

-            return OpenAIEmbeddings(
-                model=model,
-                api_key=api_key,
-                chunk_size=chunk_size,
-            )
+            return OpenAIEmbeddings(
+                model=model,
+                openai_api_key=api_key,
+                chunk_size=chunk_size,
+            )

🌐 Web query:

What is the correct keyword argument for passing the API key to `OpenAIEmbeddings` in `langchain_openai`?

💡 Result:

Use the openai_api_key keyword (e.g. OpenAIEmbeddings(openai_api_key="sk-...")). [1][2]

Sources:

  • LangChain discussion showing use of openai_api_key. [1]
  • langchain_openai OpenAIEmbeddings source/docs. [2]

Fix OpenAIEmbeddings API key kwarg — use openai_api_key

OpenAIEmbeddings in langchain_openai expects openai_api_key (not api_key). Change as shown.

File: src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json (around lines 654-655)

-            return OpenAIEmbeddings(
-                model=model,
-                api_key=api_key,
-                chunk_size=chunk_size,
-            )
+            return OpenAIEmbeddings(
+                model=model,
+                openai_api_key=api_key,
+                chunk_size=chunk_size,
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"value": "import json\nfrom pathlib import Path\nfrom typing import Any\n\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom lfx.custom import Component\nfrom lfx.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.services.deps import get_settings_service\nfrom pydantic import SecretStr\n\nfrom langflow.base.knowledge_bases import get_knowledge_bases\nfrom langflow.services.auth.utils import decrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import session_scope\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KnowledgeRetrievalComponent(Component):\n display_name = \"Knowledge Retrieval\"\n description = \"Search and retrieve data from knowledge.\"\n icon = \"download\"\n name = \"KnowledgeRetrieval\"\n\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n advanced=True,\n required=False,\n ),\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n info=\"Optional search query to filter knowledge base data.\",\n tool_mode=True,\n ),\n IntInput(\n name=\"top_k\",\n display_name=\"Top K Results\",\n info=\"Number of top results to return from the knowledge base.\",\n value=5,\n advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"include_metadata\",\n display_name=\"Include Metadata\",\n info=\"Whether to include all metadata in the output. If false, only content is returned.\",\n value=True,\n advanced=False,\n ),\n BoolInput(\n name=\"include_embeddings\",\n display_name=\"Include Embeddings\",\n info=\"Whether to include embeddings in the output. Only applicable if 'Include Metadata' is enabled.\",\n value=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n name=\"retrieve_data\",\n display_name=\"Results\",\n method=\"retrieve_data\",\n info=\"Returns the data from the selected knowledge base.\",\n ),\n ]\n\n async def update_build_config(self, build_config, field_value, field_name=None): # noqa: ARG002\n if field_name == \"knowledge_base\":\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id, # Use the user_id from the component context\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n\n def _get_kb_metadata(self, kb_path: Path) -> dict:\n \"\"\"Load and process knowledge base metadata.\"\"\"\n metadata: dict[str, Any] = {}\n metadata_file = kb_path / \"embedding_metadata.json\"\n if not metadata_file.exists():\n logger.warning(f\"Embedding metadata file not found at {metadata_file}\")\n return metadata\n\n try:\n with metadata_file.open(\"r\", encoding=\"utf-8\") as f:\n metadata = json.load(f)\n except json.JSONDecodeError:\n logger.error(f\"Error decoding JSON from {metadata_file}\")\n return {}\n\n # Decrypt API key if it exists\n if \"api_key\" in metadata and metadata.get(\"api_key\"):\n settings_service = get_settings_service()\n try:\n decrypted_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n metadata[\"api_key\"] = decrypted_key\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. Error: {e}\")\n metadata[\"api_key\"] = None\n return metadata\n\n def _build_embeddings(self, metadata: dict):\n \"\"\"Build embedding model from metadata.\"\"\"\n runtime_api_key = self.api_key.get_secret_value() if isinstance(self.api_key, SecretStr) else self.api_key\n provider = metadata.get(\"embedding_provider\")\n model = metadata.get(\"embedding_model\")\n api_key = runtime_api_key or metadata.get(\"api_key\")\n chunk_size = metadata.get(\"chunk_size\")\n\n # Handle various providers\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required. Provide it in the component's advanced settings.\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=model,\n api_key=api_key,\n chunk_size=chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n # Add other providers here if they become supported in ingest\n msg = f\"Embedding provider '{provider}' is not supported for retrieval.\"\n raise NotImplementedError(msg)\n\n async def retrieve_data(self) -> DataFrame:\n \"\"\"Retrieve data from the selected knowledge base by reading the Chroma collection.\n\n Returns:\n A DataFrame containing the data rows from the knowledge base.\n \"\"\"\n # Get the current user\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching Knowledge Base data.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / self.knowledge_base\n\n metadata = self._get_kb_metadata(kb_path)\n if not metadata:\n msg = f\"Metadata not found for knowledge base: {self.knowledge_base}. Ensure it has been indexed.\"\n raise ValueError(msg)\n\n # Build the embedder for the knowledge base\n embedding_function = self._build_embeddings(metadata)\n\n # Load vector store\n chroma = Chroma(\n persist_directory=str(kb_path),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # If a search query is provided, perform a similarity search\n if self.search_query:\n # Use the search query to perform a similarity search\n logger.info(f\"Performing similarity search with query: {self.search_query}\")\n results = chroma.similarity_search_with_score(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n else:\n results = chroma.similarity_search(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n\n # For each result, make it a tuple to match the expected output format\n results = [(doc, 0) for doc in results] # Assign a dummy score of 0\n\n # If include_embeddings is enabled, get embeddings for the results\n id_to_embedding = {}\n if self.include_embeddings and results:\n doc_ids = [doc[0].metadata.get(\"_id\") for doc in results if doc[0].metadata.get(\"_id\")]\n\n # Only proceed if we have valid document IDs\n if doc_ids:\n # Access underlying client to get embeddings\n collection = chroma._client.get_collection(name=self.knowledge_base)\n embeddings_result = collection.get(where={\"_id\": {\"$in\": doc_ids}}, include=[\"metadatas\", \"embeddings\"])\n\n # Create a mapping from document ID to embedding\n for i, metadata in enumerate(embeddings_result.get(\"metadatas\", [])):\n if metadata and \"_id\" in metadata:\n id_to_embedding[metadata[\"_id\"]] = embeddings_result[\"embeddings\"][i]\n\n # Build output data based on include_metadata setting\n data_list = []\n for doc in results:\n kwargs = {\n \"content\": doc[0].page_content,\n }\n if self.search_query:\n kwargs[\"_score\"] = -1 * doc[1]\n if self.include_metadata:\n # Include all metadata, embeddings, and content\n kwargs.update(doc[0].metadata)\n if self.include_embeddings:\n kwargs[\"_embeddings\"] = id_to_embedding.get(doc[0].metadata.get(\"_id\"))\n\n data_list.append(Data(**kwargs))\n\n # Return the DataFrame containing the data\n return DataFrame(data=data_list)\n"
},
def _build_embeddings(self, metadata: dict):
"""Build embedding model from metadata."""
runtime_api_key = self.api_key.get_secret_value() if isinstance(self.api_key, SecretStr) else self.api_key
provider = metadata.get("embedding_provider")
model = metadata.get("embedding_model")
api_key = runtime_api_key or metadata.get("api_key")
chunk_size = metadata.get("chunk_size")
# Handle various providers
if provider == "OpenAI":
from langchain_openai import OpenAIEmbeddings
if not api_key:
msg = "OpenAI API key is required. Provide it in the component's advanced settings."
raise ValueError(msg)
return OpenAIEmbeddings(
model=model,
openai_api_key=api_key,
chunk_size=chunk_size,
)
if provider == "HuggingFace":
from langchain_huggingface import HuggingFaceEmbeddings
return HuggingFaceEmbeddings(
model=model,
)
if provider == "Cohere":
from langchain_cohere import CohereEmbeddings
if not api_key:
msg = "Cohere API key is required when using Cohere provider"
raise ValueError(msg)
return CohereEmbeddings(
model=model,
cohere_api_key=api_key,
)
if provider == "Custom":
# For custom embedding models, we would need additional configuration
msg = "Custom embedding models not yet supported"
raise NotImplementedError(msg)
# Add other providers here if they become supported in ingest
msg = f"Embedding provider '{provider}' is not supported for retrieval."
raise NotImplementedError(msg)
🤖 Prompt for AI Agents
In src/backend/base/langflow/initial_setup/starter_projects/Knowledge
Retrieval.json around lines 654-655, the OpenAIEmbeddings is being constructed
with the wrong kwarg name (api_key); change the constructor call to pass the API
key as openai_api_key=api_key (keep model and chunk_size args as-is) so it
matches langchain_openai's expected parameter name.

uv run lfx run simple_agent.py "How are you?"
"""

import asyncio
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Avoid asyncio.run at import time; expose an async factory instead.

Import‑time asyncio.run(...) can fail under pytest’s running loop and makes imports side‑effectful.

-import asyncio
+import asyncio
@@
-tools = asyncio.run(url_component.to_toolkit())
+# Prefer lazy/explicit initialization to avoid event-loop issues on import.
+graph = None
+
+async def build_graph() -> Graph:
+    tools = await url_component.to_toolkit()
+    agent.set(
+        model_name="gpt-4o-mini",
+        agent_llm="OpenAI",
+        api_key=os.getenv("OPENAI_API_KEY"),
+        input_value=chat_input.message_response,
+        tools=tools,
+    )
+    chat_output = cp.ChatOutput().set(input_value=agent.message_response)
+    return Graph(chat_input, chat_output, log_config=log_config)
+
+if __name__ == "__main__":
+    # Local manual run: uv run python simple_agent.py
+    import anyio
+    graph = anyio.run(build_graph)

Run to check for other import-time asyncio.run usages and imports of this module:

Also applies to: 35-35


🏁 Script executed:

#!/bin/bash
rg -nP "asyncio\.run\(.+to_toolkit\(" -g 'src/**'
rg -nP 'from\s+src\.backend\.tests\.data\.simple_agent|import\s+simple_agent' -g 'src/**'

Length of output: 89


Avoid asyncio.run at import time; expose an async factory instead.

Import-time asyncio.run(...) will fail under pytest’s running loop and makes imports side‑effectful.

File: src/backend/tests/data/simple_agent.py — import at line 17; asyncio.run usage at line 35.

-import asyncio
+import asyncio
@@
-tools = asyncio.run(url_component.to_toolkit())
+# Prefer lazy/explicit initialization to avoid event-loop issues on import.
+graph = None
+
+async def build_graph() -> Graph:
+    tools = await url_component.to_toolkit()
+    agent.set(
+        model_name="gpt-4o-mini",
+        agent_llm="OpenAI",
+        api_key=os.getenv("OPENAI_API_KEY"),
+        input_value=chat_input.message_response,
+        tools=tools,
+    )
+    chat_output = cp.ChatOutput().set(input_value=agent.message_response)
+    return Graph(chat_input, chat_output, log_config=log_config)
+
+if __name__ == "__main__":
+    # Local manual run: uv run python simple_agent.py
+    import anyio
+    graph = anyio.run(build_graph)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import asyncio
import asyncio
# Prefer lazy/explicit initialization to avoid event-loop issues on import.
graph = None
async def build_graph() -> Graph:
tools = await url_component.to_toolkit()
agent.set(
model_name="gpt-4o-mini",
agent_llm="OpenAI",
api_key=os.getenv("OPENAI_API_KEY"),
input_value=chat_input.message_response,
tools=tools,
)
chat_output = cp.ChatOutput().set(input_value=agent.message_response)
return Graph(chat_input, chat_output, log_config=log_config)
if __name__ == "__main__":
# Local manual run: uv run python simple_agent.py
import anyio
graph = anyio.run(build_graph)
🤖 Prompt for AI Agents
In src/backend/tests/data/simple_agent.py around lines 17 and 35, there is an
asyncio.run(...) executed at import time which causes side effects and breaks
under pytest’s event loop; remove the import-time asyncio.run call and instead
export an async factory (e.g. async def create_simple_agent(...)) that
constructs and returns the agent; update any tests to await that factory (or
provide a tiny synchronous helper used only by non-async callers that runs the
coroutine on a fresh event loop), ensuring no coroutine is executed during
module import.

Merged via the queue into main with commit 9b0ef8b Sep 16, 2025
76 of 78 checks passed
@edwinjosechittilappilly edwinjosechittilappilly deleted the fix-aysnc-gettools branch September 16, 2025 18:38
lucaseduoli pushed a commit that referenced this pull request Sep 17, 2025
* refactor: update code hashes and refactor async methods in agent components

- Updated code hashes in multiple starter project JSON files to ensure consistency.
- Refactored agent components to use async methods for tool retrieval, enhancing performance and compatibility with asynchronous operations.
- Adjusted test cases to accommodate the new async behavior in agent workflows.

Co-authored-by: Edwin Jose <edwin.jose@datastax.com>

* fix async issues

* [autofix.ci] apply automated fixes

* update to templates

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

lgtm This PR has been approved by a maintainer refactor Maintenance tasks and housekeeping

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants