From 4ad420b1fa32ef5ed4c5600db4741a0f875f021a Mon Sep 17 00:00:00 2001 From: cristhianzl Date: Thu, 5 Feb 2026 11:16:57 -0300 Subject: [PATCH 1/6] add concatenate and merge operations on tables --- .../processing/test_dataframe_operations.py | 230 ++++++++++++++++++ .../processing/dataframe_operations.py | 96 +++++++- 2 files changed, 324 insertions(+), 2 deletions(-) diff --git a/src/backend/tests/unit/components/processing/test_dataframe_operations.py b/src/backend/tests/unit/components/processing/test_dataframe_operations.py index 3c45fc7a764f..375403be65e8 100644 --- a/src/backend/tests/unit/components/processing/test_dataframe_operations.py +++ b/src/backend/tests/unit/components/processing/test_dataframe_operations.py @@ -280,6 +280,8 @@ def test_filter_fields_show(self, component): "num_rows": {"show": False}, "replace_value": {"show": False}, "replacement_value": {"show": False}, + "merge_on_column": {"show": False}, + "merge_how": {"show": False}, } # Select Filter operation @@ -305,6 +307,8 @@ def test_sort_fields_show(self, component): "num_rows": {"show": False}, "replace_value": {"show": False}, "replacement_value": {"show": False}, + "merge_on_column": {"show": False}, + "merge_how": {"show": False}, } # Select Sort operation @@ -330,6 +334,8 @@ def test_empty_selection_hides_fields(self, component): "num_rows": {"show": True}, "replace_value": {"show": True}, "replacement_value": {"show": True}, + "merge_on_column": {"show": True}, + "merge_how": {"show": True}, } # Deselect operation (empty list) @@ -350,6 +356,8 @@ def test_empty_selection_hides_fields(self, component): assert updated_config["num_rows"]["show"] is False assert updated_config["replace_value"]["show"] is False assert updated_config["replacement_value"]["show"] is False + assert updated_config["merge_on_column"]["show"] is False + assert updated_config["merge_how"]["show"] is False class TestDataTypes: @@ -385,6 +393,228 @@ def test_mixed_data_types(self, component): assert len(result) == 2 # "text" and "more_text" +class TestConcatenateOperation: + """Test concatenate operation for combining multiple DataFrames.""" + + def test_concatenate_two_dataframes(self, component): + """Test concatenating two DataFrames vertically.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + df2 = DataFrame(pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Diana"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Concatenate", "icon": "combine"}] + + result = component.perform_operation() + + assert len(result) == 4 + assert list(result["id"]) == [1, 2, 3, 4] + assert list(result["name"]) == ["Alice", "Bob", "Charlie", "Diana"] + + def test_concatenate_single_dataframe(self, component): + """Test concatenate with only one DataFrame returns it unchanged.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + + component.df = [df1] + component.operation = [{"name": "Concatenate", "icon": "combine"}] + + result = component.perform_operation() + + assert len(result) == 2 + assert list(result["id"]) == [1, 2] + + def test_concatenate_different_row_counts(self, component): + """Test concatenating DataFrames with different row counts.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})) + df2 = DataFrame(pd.DataFrame({"id": [4, 5], "value": ["d", "e"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Concatenate", "icon": "combine"}] + + result = component.perform_operation() + + assert len(result) == 5 + + +class TestMergeOperation: + """Test merge operation for joining DataFrames.""" + + def test_merge_inner_join(self, component): + """Test inner merge on common column.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 3, 4], "city": ["NYC", "LA", "Chicago"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "inner" + + result = component.perform_operation() + + assert len(result) == 2 # Only ids 2 and 3 exist in both + assert "name" in result.columns + assert "city" in result.columns + + def test_merge_outer_join(self, component): + """Test outer merge includes all records.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 3], "city": ["NYC", "LA"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "outer" + + result = component.perform_operation() + + assert len(result) == 3 # ids 1, 2, 3 + + def test_merge_left_join(self, component): + """Test left merge keeps all records from first DataFrame.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 4], "city": ["NYC", "Chicago"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "left" + + result = component.perform_operation() + + assert len(result) == 3 # All from df1 + + def test_merge_right_join(self, component): + """Test right merge keeps all records from second DataFrame.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 3, 4], "city": ["NYC", "LA", "Chicago"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "right" + + result = component.perform_operation() + + assert len(result) == 3 # All from df2 + + def test_merge_single_dataframe_returns_original(self, component): + """Test merge with single DataFrame returns it unchanged.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + + component.df = [df1] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "inner" + + result = component.perform_operation() + + assert len(result) == 2 + + def test_merge_invalid_column_raises_error(self, component): + """Test merge with non-existent column raises ValueError.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 3], "city": ["NYC", "LA"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "non_existent" + component.merge_how = "inner" + + with pytest.raises(ValueError, match="not found in first DataFrame"): + component.perform_operation() + + def test_merge_same_columns_coalesces_values(self, component): + """Test merge with same columns uses coalesce (df1 value or df2 value).""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})) + df2 = DataFrame(pd.DataFrame({"id": [2, 3], "value": ["x", "y"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "outer" + + result = component.perform_operation() + + assert len(result) == 3 + # Check no duplicate columns with _df2 suffix + assert "value_df2" not in result.columns + # id=1 should have value "a" (from df1) + # id=2 should have value "b" (from df1, coalesced) + # id=3 should have value "y" (from df2) + + +class TestListInputHandling: + """Test that component handles list inputs correctly.""" + + def test_operations_use_first_dataframe_from_list(self, component): + """Test that non-merge operations use only the first DataFrame.""" + df1 = DataFrame(pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})) + df2 = DataFrame(pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Diana"]})) + + component.df = [df1, df2] + component.operation = [{"name": "Head", "icon": "arrow-up"}] + component.num_rows = 1 + + result = component.perform_operation() + + assert len(result) == 1 + assert result.iloc[0]["name"] == "Alice" # From first DataFrame + + +class TestMergeDynamicUI: + """Test dynamic UI for Merge and Concatenate operations.""" + + def test_merge_fields_show(self, component): + """Test that merge fields show when Merge is selected.""" + build_config = { + "column_name": {"show": False}, + "filter_value": {"show": False}, + "filter_operator": {"show": False}, + "ascending": {"show": False}, + "new_column_name": {"show": False}, + "new_column_value": {"show": False}, + "columns_to_select": {"show": False}, + "num_rows": {"show": False}, + "replace_value": {"show": False}, + "replacement_value": {"show": False}, + "merge_on_column": {"show": False}, + "merge_how": {"show": False}, + } + + updated_config = component.update_build_config( + build_config, [{"name": "Merge", "icon": "merge"}], "operation" + ) + + assert updated_config["merge_on_column"]["show"] is True + assert updated_config["merge_how"]["show"] is True + assert updated_config["column_name"]["show"] is False + + def test_concatenate_hides_all_extra_fields(self, component): + """Test that Concatenate operation hides all extra fields.""" + build_config = { + "column_name": {"show": True}, + "filter_value": {"show": True}, + "filter_operator": {"show": True}, + "ascending": {"show": True}, + "new_column_name": {"show": True}, + "new_column_value": {"show": True}, + "columns_to_select": {"show": True}, + "num_rows": {"show": True}, + "replace_value": {"show": True}, + "replacement_value": {"show": True}, + "merge_on_column": {"show": True}, + "merge_how": {"show": True}, + } + + updated_config = component.update_build_config( + build_config, [{"name": "Concatenate", "icon": "combine"}], "operation" + ) + + # Concatenate doesn't need any extra fields + assert updated_config["column_name"]["show"] is False + assert updated_config["merge_on_column"]["show"] is False + assert updated_config["merge_how"]["show"] is False + + # Integration test to verify all operators work together def test_all_filter_operators_comprehensive(): """Comprehensive test of all filter operators on the same dataset.""" diff --git a/src/lfx/src/lfx/components/processing/dataframe_operations.py b/src/lfx/src/lfx/components/processing/dataframe_operations.py index fcda0c640c06..2b29d5115732 100644 --- a/src/lfx/src/lfx/components/processing/dataframe_operations.py +++ b/src/lfx/src/lfx/components/processing/dataframe_operations.py @@ -16,9 +16,11 @@ class DataFrameOperationsComponent(Component): OPERATION_CHOICES = [ "Add Column", + "Concatenate", "Drop Column", "Filter", "Head", + "Merge", "Rename Column", "Replace Value", "Select Columns", @@ -31,8 +33,9 @@ class DataFrameOperationsComponent(Component): DataFrameInput( name="df", display_name="DataFrame", - info="The input DataFrame to operate on.", + info="The input DataFrame to operate on. Connect multiple DataFrames for merge operations.", required=True, + is_list=True, ), SortableListInput( name="operation", @@ -41,9 +44,11 @@ class DataFrameOperationsComponent(Component): info="Select the DataFrame operation to perform.", options=[ {"name": "Add Column", "icon": "plus"}, + {"name": "Concatenate", "icon": "combine"}, {"name": "Drop Column", "icon": "minus"}, {"name": "Filter", "icon": "filter"}, {"name": "Head", "icon": "arrow-up"}, + {"name": "Merge", "icon": "merge"}, {"name": "Rename Column", "icon": "pencil"}, {"name": "Replace Value", "icon": "replace"}, {"name": "Select Columns", "icon": "columns"}, @@ -138,6 +143,22 @@ class DataFrameOperationsComponent(Component): dynamic=True, show=False, ), + StrInput( + name="merge_on_column", + display_name="Merge On Column", + info="The column name to merge DataFrames on. Must exist in both DataFrames.", + dynamic=True, + show=False, + ), + DropdownInput( + name="merge_how", + display_name="Merge Type", + options=["inner", "outer", "left", "right"], + value="inner", + info="Type of merge: inner (intersection), outer (union), left, or right.", + dynamic=True, + show=False, + ), ] outputs = [ @@ -161,6 +182,8 @@ def update_build_config(self, build_config, field_value, field_name=None): "num_rows", "replace_value", "replacement_value", + "merge_on_column", + "merge_how", ] for field in dynamic_fields: build_config[field]["show"] = False @@ -201,11 +224,20 @@ def update_build_config(self, build_config, field_value, field_name=None): build_config["replacement_value"]["show"] = True elif operation_name == "Drop Duplicates": build_config["column_name"]["show"] = True + elif operation_name == "Merge": + build_config["merge_on_column"]["show"] = True + build_config["merge_how"]["show"] = True return build_config + def _get_primary_dataframe(self) -> DataFrame: + """Get the first DataFrame from input (handles both single and list inputs).""" + if isinstance(self.df, list): + return self.df[0].copy() if self.df else DataFrame() + return self.df.copy() + def perform_operation(self) -> DataFrame: - df_copy = self.df.copy() + df_copy = self._get_primary_dataframe() # Handle SortableListInput format for operation operation_input = getattr(self, "operation", []) @@ -238,6 +270,10 @@ def perform_operation(self) -> DataFrame: return self.replace_values(df_copy) if op == "Drop Duplicates": return self.drop_duplicates(df_copy) + if op == "Concatenate": + return self.concatenate_dataframes() + if op == "Merge": + return self.merge_dataframes() msg = f"Unsupported operation: {op}" logger.error(msg) raise ValueError(msg) @@ -311,3 +347,59 @@ def replace_values(self, df: DataFrame) -> DataFrame: def drop_duplicates(self, df: DataFrame) -> DataFrame: return DataFrame(df.drop_duplicates(subset=self.column_name)) + + def concatenate_dataframes(self) -> DataFrame: + """Concatenate multiple DataFrames vertically (stack rows).""" + if not isinstance(self.df, list) or len(self.df) == 0: + return self.df.copy() if self.df is not None else DataFrame() + + # If only one DataFrame, return it + if len(self.df) == 1: + return self.df[0].copy() + + # Concatenate all DataFrames vertically + concatenated = pd.concat(self.df, ignore_index=True) + return DataFrame(concatenated) + + def merge_dataframes(self) -> DataFrame: + """Merge two DataFrames based on a common column (join operation).""" + if not isinstance(self.df, list) or len(self.df) == 0: + return self.df.copy() if self.df is not None else DataFrame() + + # If only one DataFrame, return it + if len(self.df) == 1: + return self.df[0].copy() + + df1 = self.df[0].copy() + df2 = self.df[1].copy() + + merge_on = getattr(self, "merge_on_column", None) + merge_how = getattr(self, "merge_how", "inner") + + # If merge column specified, validate it exists in both DataFrames + if merge_on: + if merge_on not in df1.columns: + msg = f"Column '{merge_on}' not found in first DataFrame. Available: {list(df1.columns)}" + raise ValueError(msg) + if merge_on not in df2.columns: + msg = f"Column '{merge_on}' not found in second DataFrame. Available: {list(df2.columns)}" + raise ValueError(msg) + + merged = df1.merge(df2, on=merge_on, how=merge_how, suffixes=("", "_df2")) + else: + merged = df1.merge(df2, left_index=True, right_index=True, how=merge_how, suffixes=("", "_df2")) + + # Combine duplicate columns: use df1 value if exists, otherwise df2 value + cols_to_drop = [] + for col in merged.columns: + if col.endswith("_df2"): + original_col = col[:-4] # Remove "_df2" suffix + if original_col in merged.columns: + # Coalesce: use original if not null, otherwise use _df2 + merged[original_col] = merged[original_col].combine_first(merged[col]) + cols_to_drop.append(col) + + if cols_to_drop: + merged = merged.drop(columns=cols_to_drop) + + return DataFrame(merged) From 9336e3208862fcf110ca2e493932dc1591fa76fb Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 14:21:51 +0000 Subject: [PATCH 2/6] [autofix.ci] apply automated fixes --- .../unit/components/processing/test_dataframe_operations.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/backend/tests/unit/components/processing/test_dataframe_operations.py b/src/backend/tests/unit/components/processing/test_dataframe_operations.py index 375403be65e8..8c9529a096f0 100644 --- a/src/backend/tests/unit/components/processing/test_dataframe_operations.py +++ b/src/backend/tests/unit/components/processing/test_dataframe_operations.py @@ -580,9 +580,7 @@ def test_merge_fields_show(self, component): "merge_how": {"show": False}, } - updated_config = component.update_build_config( - build_config, [{"name": "Merge", "icon": "merge"}], "operation" - ) + updated_config = component.update_build_config(build_config, [{"name": "Merge", "icon": "merge"}], "operation") assert updated_config["merge_on_column"]["show"] is True assert updated_config["merge_how"]["show"] is True From 2801e70516ff35af683262ca3d8bd0eda3f5ee95 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 14:24:19 +0000 Subject: [PATCH 3/6] [autofix.ci] apply automated fixes (attempt 2/3) --- src/lfx/src/lfx/_assets/component_index.json | 72 +++++++++++++++++-- .../src/lfx/_assets/stable_hash_history.json | 2 +- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/src/lfx/src/lfx/_assets/component_index.json b/src/lfx/src/lfx/_assets/component_index.json index 5b665a26326a..f79a32f4db00 100644 --- a/src/lfx/src/lfx/_assets/component_index.json +++ b/src/lfx/src/lfx/_assets/component_index.json @@ -97941,13 +97941,15 @@ "columns_to_select", "num_rows", "replace_value", - "replacement_value" + "replacement_value", + "merge_on_column", + "merge_how" ], "frozen": false, "icon": "table", "legacy": false, "metadata": { - "code_hash": "904f4eaebccd", + "code_hash": "4e4612f8cf41", "dependencies": { "dependencies": [ { @@ -98020,7 +98022,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import pandas as pd\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs import SortableListInput\nfrom lfx.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\n\n\nclass DataFrameOperationsComponent(Component):\n display_name = \"DataFrame Operations\"\n description = \"Perform various operations on a DataFrame.\"\n documentation: str = \"https://docs.langflow.org/dataframe-operations\"\n icon = \"table\"\n name = \"DataFrameOperations\"\n\n OPERATION_CHOICES = [\n \"Add Column\",\n \"Drop Column\",\n \"Filter\",\n \"Head\",\n \"Rename Column\",\n \"Replace Value\",\n \"Select Columns\",\n \"Sort\",\n \"Tail\",\n \"Drop Duplicates\",\n ]\n\n inputs = [\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The input DataFrame to operate on.\",\n required=True,\n ),\n SortableListInput(\n name=\"operation\",\n display_name=\"Operation\",\n placeholder=\"Select Operation\",\n info=\"Select the DataFrame operation to perform.\",\n options=[\n {\"name\": \"Add Column\", \"icon\": \"plus\"},\n {\"name\": \"Drop Column\", \"icon\": \"minus\"},\n {\"name\": \"Filter\", \"icon\": \"filter\"},\n {\"name\": \"Head\", \"icon\": \"arrow-up\"},\n {\"name\": \"Rename Column\", \"icon\": \"pencil\"},\n {\"name\": \"Replace Value\", \"icon\": \"replace\"},\n {\"name\": \"Select Columns\", \"icon\": \"columns\"},\n {\"name\": \"Sort\", \"icon\": \"arrow-up-down\"},\n {\"name\": \"Tail\", \"icon\": \"arrow-down\"},\n {\"name\": \"Drop Duplicates\", \"icon\": \"copy-x\"},\n ],\n real_time_refresh=True,\n limit=1,\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The column name to use for the operation.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"filter_value\",\n display_name=\"Filter Value\",\n info=\"The value to filter rows by.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"filter_operator\",\n display_name=\"Filter Operator\",\n options=[\n \"equals\",\n \"not equals\",\n \"contains\",\n \"not contains\",\n \"starts with\",\n \"ends with\",\n \"greater than\",\n \"less than\",\n ],\n value=\"equals\",\n info=\"The operator to apply for filtering rows.\",\n advanced=False,\n dynamic=True,\n show=False,\n ),\n BoolInput(\n name=\"ascending\",\n display_name=\"Sort Ascending\",\n info=\"Whether to sort in ascending order.\",\n dynamic=True,\n show=False,\n value=True,\n ),\n StrInput(\n name=\"new_column_name\",\n display_name=\"New Column Name\",\n info=\"The new column name when renaming or adding a column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"new_column_value\",\n display_name=\"New Column Value\",\n info=\"The value to populate the new column with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"columns_to_select\",\n display_name=\"Columns to Select\",\n dynamic=True,\n is_list=True,\n show=False,\n ),\n IntInput(\n name=\"num_rows\",\n display_name=\"Number of Rows\",\n info=\"Number of rows to return (for head/tail).\",\n dynamic=True,\n show=False,\n value=5,\n ),\n MessageTextInput(\n name=\"replace_value\",\n display_name=\"Value to Replace\",\n info=\"The value to replace in the column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"replacement_value\",\n display_name=\"Replacement Value\",\n info=\"The value to replace with.\",\n dynamic=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"output\",\n method=\"perform_operation\",\n info=\"The resulting DataFrame after the operation.\",\n )\n ]\n\n def update_build_config(self, build_config, field_value, field_name=None):\n dynamic_fields = [\n \"column_name\",\n \"filter_value\",\n \"filter_operator\",\n \"ascending\",\n \"new_column_name\",\n \"new_column_value\",\n \"columns_to_select\",\n \"num_rows\",\n \"replace_value\",\n \"replacement_value\",\n ]\n for field in dynamic_fields:\n build_config[field][\"show\"] = False\n\n if field_name == \"operation\":\n # Handle SortableListInput format\n if isinstance(field_value, list):\n operation_name = field_value[0].get(\"name\", \"\") if field_value else \"\"\n else:\n operation_name = field_value or \"\"\n\n # If no operation selected, all dynamic fields stay hidden (already set to False above)\n if not operation_name:\n return build_config\n\n if operation_name == \"Filter\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"filter_value\"][\"show\"] = True\n build_config[\"filter_operator\"][\"show\"] = True\n elif operation_name == \"Sort\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"ascending\"][\"show\"] = True\n elif operation_name == \"Drop Column\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Rename Column\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"new_column_name\"][\"show\"] = True\n elif operation_name == \"Add Column\":\n build_config[\"new_column_name\"][\"show\"] = True\n build_config[\"new_column_value\"][\"show\"] = True\n elif operation_name == \"Select Columns\":\n build_config[\"columns_to_select\"][\"show\"] = True\n elif operation_name in {\"Head\", \"Tail\"}:\n build_config[\"num_rows\"][\"show\"] = True\n elif operation_name == \"Replace Value\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"replace_value\"][\"show\"] = True\n build_config[\"replacement_value\"][\"show\"] = True\n elif operation_name == \"Drop Duplicates\":\n build_config[\"column_name\"][\"show\"] = True\n\n return build_config\n\n def perform_operation(self) -> DataFrame:\n df_copy = self.df.copy()\n\n # Handle SortableListInput format for operation\n operation_input = getattr(self, \"operation\", [])\n if isinstance(operation_input, list) and len(operation_input) > 0:\n op = operation_input[0].get(\"name\", \"\")\n else:\n op = \"\"\n\n # If no operation selected, return original DataFrame\n if not op:\n return df_copy\n\n if op == \"Filter\":\n return self.filter_rows_by_value(df_copy)\n if op == \"Sort\":\n return self.sort_by_column(df_copy)\n if op == \"Drop Column\":\n return self.drop_column(df_copy)\n if op == \"Rename Column\":\n return self.rename_column(df_copy)\n if op == \"Add Column\":\n return self.add_column(df_copy)\n if op == \"Select Columns\":\n return self.select_columns(df_copy)\n if op == \"Head\":\n return self.head(df_copy)\n if op == \"Tail\":\n return self.tail(df_copy)\n if op == \"Replace Value\":\n return self.replace_values(df_copy)\n if op == \"Drop Duplicates\":\n return self.drop_duplicates(df_copy)\n msg = f\"Unsupported operation: {op}\"\n logger.error(msg)\n raise ValueError(msg)\n\n def filter_rows_by_value(self, df: DataFrame) -> DataFrame:\n column = df[self.column_name]\n filter_value = self.filter_value\n\n # Handle regular DropdownInput format (just a string value)\n operator = getattr(self, \"filter_operator\", \"equals\") # Default to equals for backward compatibility\n\n if operator == \"equals\":\n mask = column == filter_value\n elif operator == \"not equals\":\n mask = column != filter_value\n elif operator == \"contains\":\n mask = column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"not contains\":\n mask = ~column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"starts with\":\n mask = column.astype(str).str.startswith(str(filter_value), na=False)\n elif operator == \"ends with\":\n mask = column.astype(str).str.endswith(str(filter_value), na=False)\n elif operator == \"greater than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column > numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) > str(filter_value)\n elif operator == \"less than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column < numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) < str(filter_value)\n else:\n mask = column == filter_value # Fallback to equals\n\n return DataFrame(df[mask])\n\n def sort_by_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))\n\n def drop_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop(columns=[self.column_name]))\n\n def rename_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))\n\n def add_column(self, df: DataFrame) -> DataFrame:\n df[self.new_column_name] = [self.new_column_value] * len(df)\n return DataFrame(df)\n\n def select_columns(self, df: DataFrame) -> DataFrame:\n columns = [col.strip() for col in self.columns_to_select]\n return DataFrame(df[columns])\n\n def head(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.head(self.num_rows))\n\n def tail(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.tail(self.num_rows))\n\n def replace_values(self, df: DataFrame) -> DataFrame:\n df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)\n return DataFrame(df)\n\n def drop_duplicates(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop_duplicates(subset=self.column_name))\n" + "value": "import pandas as pd\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs import SortableListInput\nfrom lfx.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\n\n\nclass DataFrameOperationsComponent(Component):\n display_name = \"DataFrame Operations\"\n description = \"Perform various operations on a DataFrame.\"\n documentation: str = \"https://docs.langflow.org/dataframe-operations\"\n icon = \"table\"\n name = \"DataFrameOperations\"\n\n OPERATION_CHOICES = [\n \"Add Column\",\n \"Concatenate\",\n \"Drop Column\",\n \"Filter\",\n \"Head\",\n \"Merge\",\n \"Rename Column\",\n \"Replace Value\",\n \"Select Columns\",\n \"Sort\",\n \"Tail\",\n \"Drop Duplicates\",\n ]\n\n inputs = [\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The input DataFrame to operate on. Connect multiple DataFrames for merge operations.\",\n required=True,\n is_list=True,\n ),\n SortableListInput(\n name=\"operation\",\n display_name=\"Operation\",\n placeholder=\"Select Operation\",\n info=\"Select the DataFrame operation to perform.\",\n options=[\n {\"name\": \"Add Column\", \"icon\": \"plus\"},\n {\"name\": \"Concatenate\", \"icon\": \"combine\"},\n {\"name\": \"Drop Column\", \"icon\": \"minus\"},\n {\"name\": \"Filter\", \"icon\": \"filter\"},\n {\"name\": \"Head\", \"icon\": \"arrow-up\"},\n {\"name\": \"Merge\", \"icon\": \"merge\"},\n {\"name\": \"Rename Column\", \"icon\": \"pencil\"},\n {\"name\": \"Replace Value\", \"icon\": \"replace\"},\n {\"name\": \"Select Columns\", \"icon\": \"columns\"},\n {\"name\": \"Sort\", \"icon\": \"arrow-up-down\"},\n {\"name\": \"Tail\", \"icon\": \"arrow-down\"},\n {\"name\": \"Drop Duplicates\", \"icon\": \"copy-x\"},\n ],\n real_time_refresh=True,\n limit=1,\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The column name to use for the operation.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"filter_value\",\n display_name=\"Filter Value\",\n info=\"The value to filter rows by.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"filter_operator\",\n display_name=\"Filter Operator\",\n options=[\n \"equals\",\n \"not equals\",\n \"contains\",\n \"not contains\",\n \"starts with\",\n \"ends with\",\n \"greater than\",\n \"less than\",\n ],\n value=\"equals\",\n info=\"The operator to apply for filtering rows.\",\n advanced=False,\n dynamic=True,\n show=False,\n ),\n BoolInput(\n name=\"ascending\",\n display_name=\"Sort Ascending\",\n info=\"Whether to sort in ascending order.\",\n dynamic=True,\n show=False,\n value=True,\n ),\n StrInput(\n name=\"new_column_name\",\n display_name=\"New Column Name\",\n info=\"The new column name when renaming or adding a column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"new_column_value\",\n display_name=\"New Column Value\",\n info=\"The value to populate the new column with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"columns_to_select\",\n display_name=\"Columns to Select\",\n dynamic=True,\n is_list=True,\n show=False,\n ),\n IntInput(\n name=\"num_rows\",\n display_name=\"Number of Rows\",\n info=\"Number of rows to return (for head/tail).\",\n dynamic=True,\n show=False,\n value=5,\n ),\n MessageTextInput(\n name=\"replace_value\",\n display_name=\"Value to Replace\",\n info=\"The value to replace in the column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"replacement_value\",\n display_name=\"Replacement Value\",\n info=\"The value to replace with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"merge_on_column\",\n display_name=\"Merge On Column\",\n info=\"The column name to merge DataFrames on. Must exist in both DataFrames.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"merge_how\",\n display_name=\"Merge Type\",\n options=[\"inner\", \"outer\", \"left\", \"right\"],\n value=\"inner\",\n info=\"Type of merge: inner (intersection), outer (union), left, or right.\",\n dynamic=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"output\",\n method=\"perform_operation\",\n info=\"The resulting DataFrame after the operation.\",\n )\n ]\n\n def update_build_config(self, build_config, field_value, field_name=None):\n dynamic_fields = [\n \"column_name\",\n \"filter_value\",\n \"filter_operator\",\n \"ascending\",\n \"new_column_name\",\n \"new_column_value\",\n \"columns_to_select\",\n \"num_rows\",\n \"replace_value\",\n \"replacement_value\",\n \"merge_on_column\",\n \"merge_how\",\n ]\n for field in dynamic_fields:\n build_config[field][\"show\"] = False\n\n if field_name == \"operation\":\n # Handle SortableListInput format\n if isinstance(field_value, list):\n operation_name = field_value[0].get(\"name\", \"\") if field_value else \"\"\n else:\n operation_name = field_value or \"\"\n\n # If no operation selected, all dynamic fields stay hidden (already set to False above)\n if not operation_name:\n return build_config\n\n if operation_name == \"Filter\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"filter_value\"][\"show\"] = True\n build_config[\"filter_operator\"][\"show\"] = True\n elif operation_name == \"Sort\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"ascending\"][\"show\"] = True\n elif operation_name == \"Drop Column\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Rename Column\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"new_column_name\"][\"show\"] = True\n elif operation_name == \"Add Column\":\n build_config[\"new_column_name\"][\"show\"] = True\n build_config[\"new_column_value\"][\"show\"] = True\n elif operation_name == \"Select Columns\":\n build_config[\"columns_to_select\"][\"show\"] = True\n elif operation_name in {\"Head\", \"Tail\"}:\n build_config[\"num_rows\"][\"show\"] = True\n elif operation_name == \"Replace Value\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"replace_value\"][\"show\"] = True\n build_config[\"replacement_value\"][\"show\"] = True\n elif operation_name == \"Drop Duplicates\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Merge\":\n build_config[\"merge_on_column\"][\"show\"] = True\n build_config[\"merge_how\"][\"show\"] = True\n\n return build_config\n\n def _get_primary_dataframe(self) -> DataFrame:\n \"\"\"Get the first DataFrame from input (handles both single and list inputs).\"\"\"\n if isinstance(self.df, list):\n return self.df[0].copy() if self.df else DataFrame()\n return self.df.copy()\n\n def perform_operation(self) -> DataFrame:\n df_copy = self._get_primary_dataframe()\n\n # Handle SortableListInput format for operation\n operation_input = getattr(self, \"operation\", [])\n if isinstance(operation_input, list) and len(operation_input) > 0:\n op = operation_input[0].get(\"name\", \"\")\n else:\n op = \"\"\n\n # If no operation selected, return original DataFrame\n if not op:\n return df_copy\n\n if op == \"Filter\":\n return self.filter_rows_by_value(df_copy)\n if op == \"Sort\":\n return self.sort_by_column(df_copy)\n if op == \"Drop Column\":\n return self.drop_column(df_copy)\n if op == \"Rename Column\":\n return self.rename_column(df_copy)\n if op == \"Add Column\":\n return self.add_column(df_copy)\n if op == \"Select Columns\":\n return self.select_columns(df_copy)\n if op == \"Head\":\n return self.head(df_copy)\n if op == \"Tail\":\n return self.tail(df_copy)\n if op == \"Replace Value\":\n return self.replace_values(df_copy)\n if op == \"Drop Duplicates\":\n return self.drop_duplicates(df_copy)\n if op == \"Concatenate\":\n return self.concatenate_dataframes()\n if op == \"Merge\":\n return self.merge_dataframes()\n msg = f\"Unsupported operation: {op}\"\n logger.error(msg)\n raise ValueError(msg)\n\n def filter_rows_by_value(self, df: DataFrame) -> DataFrame:\n column = df[self.column_name]\n filter_value = self.filter_value\n\n # Handle regular DropdownInput format (just a string value)\n operator = getattr(self, \"filter_operator\", \"equals\") # Default to equals for backward compatibility\n\n if operator == \"equals\":\n mask = column == filter_value\n elif operator == \"not equals\":\n mask = column != filter_value\n elif operator == \"contains\":\n mask = column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"not contains\":\n mask = ~column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"starts with\":\n mask = column.astype(str).str.startswith(str(filter_value), na=False)\n elif operator == \"ends with\":\n mask = column.astype(str).str.endswith(str(filter_value), na=False)\n elif operator == \"greater than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column > numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) > str(filter_value)\n elif operator == \"less than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column < numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) < str(filter_value)\n else:\n mask = column == filter_value # Fallback to equals\n\n return DataFrame(df[mask])\n\n def sort_by_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))\n\n def drop_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop(columns=[self.column_name]))\n\n def rename_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))\n\n def add_column(self, df: DataFrame) -> DataFrame:\n df[self.new_column_name] = [self.new_column_value] * len(df)\n return DataFrame(df)\n\n def select_columns(self, df: DataFrame) -> DataFrame:\n columns = [col.strip() for col in self.columns_to_select]\n return DataFrame(df[columns])\n\n def head(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.head(self.num_rows))\n\n def tail(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.tail(self.num_rows))\n\n def replace_values(self, df: DataFrame) -> DataFrame:\n df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)\n return DataFrame(df)\n\n def drop_duplicates(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop_duplicates(subset=self.column_name))\n\n def concatenate_dataframes(self) -> DataFrame:\n \"\"\"Concatenate multiple DataFrames vertically (stack rows).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n # Concatenate all DataFrames vertically\n concatenated = pd.concat(self.df, ignore_index=True)\n return DataFrame(concatenated)\n\n def merge_dataframes(self) -> DataFrame:\n \"\"\"Merge two DataFrames based on a common column (join operation).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n df1 = self.df[0].copy()\n df2 = self.df[1].copy()\n\n merge_on = getattr(self, \"merge_on_column\", None)\n merge_how = getattr(self, \"merge_how\", \"inner\")\n\n # If merge column specified, validate it exists in both DataFrames\n if merge_on:\n if merge_on not in df1.columns:\n msg = f\"Column '{merge_on}' not found in first DataFrame. Available: {list(df1.columns)}\"\n raise ValueError(msg)\n if merge_on not in df2.columns:\n msg = f\"Column '{merge_on}' not found in second DataFrame. Available: {list(df2.columns)}\"\n raise ValueError(msg)\n\n merged = df1.merge(df2, on=merge_on, how=merge_how, suffixes=(\"\", \"_df2\"))\n else:\n merged = df1.merge(df2, left_index=True, right_index=True, how=merge_how, suffixes=(\"\", \"_df2\"))\n\n # Combine duplicate columns: use df1 value if exists, otherwise df2 value\n cols_to_drop = []\n for col in merged.columns:\n if col.endswith(\"_df2\"):\n original_col = col[:-4] # Remove \"_df2\" suffix\n if original_col in merged.columns:\n # Coalesce: use original if not null, otherwise use _df2\n merged[original_col] = merged[original_col].combine_first(merged[col])\n cols_to_drop.append(col)\n\n if cols_to_drop:\n merged = merged.drop(columns=cols_to_drop)\n\n return DataFrame(merged)\n" }, "column_name": { "_input_type": "StrInput", @@ -98069,11 +98071,11 @@ "advanced": false, "display_name": "DataFrame", "dynamic": false, - "info": "The input DataFrame to operate on.", + "info": "The input DataFrame to operate on. Connect multiple DataFrames for merge operations.", "input_types": [ "DataFrame" ], - "list": false, + "list": true, "list_add_label": "Add More", "name": "df", "override_skip": false, @@ -98146,6 +98148,56 @@ "type": "str", "value": "" }, + "merge_how": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Merge Type", + "dynamic": true, + "external_options": {}, + "info": "Type of merge: inner (intersection), outer (union), left, or right.", + "name": "merge_how", + "options": [ + "inner", + "outer", + "left", + "right" + ], + "options_metadata": [], + "override_skip": false, + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": true, + "type": "str", + "value": "inner" + }, + "merge_on_column": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "Merge On Column", + "dynamic": true, + "info": "The column name to merge DataFrames on. Must exist in both DataFrames.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "merge_on_column", + "override_skip": false, + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": false, + "type": "str", + "value": "" + }, "new_column_name": { "_input_type": "StrInput", "advanced": false, @@ -98225,6 +98277,10 @@ "icon": "plus", "name": "Add Column" }, + { + "icon": "combine", + "name": "Concatenate" + }, { "icon": "minus", "name": "Drop Column" @@ -98237,6 +98293,10 @@ "icon": "arrow-up", "name": "Head" }, + { + "icon": "merge", + "name": "Merge" + }, { "icon": "pencil", "name": "Rename Column" @@ -116041,6 +116101,6 @@ "num_components": 355, "num_modules": 95 }, - "sha256": "e8a24b8395468db83eee41b97c16d8b42dd9fcded087cae67aa44eaacd3aa3d5", + "sha256": "e76091e51bfc1a1675415f87b3fa9fe5609ac5788442ee8e9ae1e62727a48461", "version": "0.3.0" } \ No newline at end of file diff --git a/src/lfx/src/lfx/_assets/stable_hash_history.json b/src/lfx/src/lfx/_assets/stable_hash_history.json index 53b18200326a..e7c35ab4bd52 100644 --- a/src/lfx/src/lfx/_assets/stable_hash_history.json +++ b/src/lfx/src/lfx/_assets/stable_hash_history.json @@ -1381,7 +1381,7 @@ }, "DataFrameOperations": { "versions": { - "0.3.0": "904f4eaebccd" + "0.3.0": "4e4612f8cf41" } }, "DynamicCreateData": { From 3037dc78fc441d037a343ff635431b668cd1aaf1 Mon Sep 17 00:00:00 2001 From: cristhianzl Date: Thu, 5 Feb 2026 11:39:02 -0300 Subject: [PATCH 4/6] add code rabbit suggestions --- .../processing/test_dataframe_operations.py | 33 ++++++++++++------- .../processing/dataframe_operations.py | 16 ++++++--- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/backend/tests/unit/components/processing/test_dataframe_operations.py b/src/backend/tests/unit/components/processing/test_dataframe_operations.py index 375403be65e8..4ea3dde5c566 100644 --- a/src/backend/tests/unit/components/processing/test_dataframe_operations.py +++ b/src/backend/tests/unit/components/processing/test_dataframe_operations.py @@ -222,14 +222,12 @@ def test_empty_selection(self, component, sample_dataframe): assert list(result.columns) == list(sample_dataframe.columns) def test_invalid_operation_format(self, component, sample_dataframe): - """Test with invalid operation format.""" + """Test with invalid operation format raises error.""" component.df = sample_dataframe component.operation = "Invalid String" # Not list format - result = component.perform_operation() - - # Should return original DataFrame - assert len(result) == len(sample_dataframe) + with pytest.raises(ValueError, match="Unsupported operation"): + component.perform_operation() def test_empty_dataframe(self, component): """Test operations on empty DataFrame.""" @@ -537,9 +535,24 @@ def test_merge_same_columns_coalesces_values(self, component): assert len(result) == 3 # Check no duplicate columns with _df2 suffix assert "value_df2" not in result.columns - # id=1 should have value "a" (from df1) - # id=2 should have value "b" (from df1, coalesced) - # id=3 should have value "y" (from df2) + # Verify coalesced values + assert result.loc[result["id"] == 1, "value"].iloc[0] == "a" # from df1 + assert result.loc[result["id"] == 2, "value"].iloc[0] == "b" # from df1 (coalesced) + assert result.loc[result["id"] == 3, "value"].iloc[0] == "y" # from df2 + + def test_merge_more_than_two_dataframes_raises_error(self, component): + """Test merge with more than 2 DataFrames raises ValueError.""" + df1 = DataFrame(pd.DataFrame({"id": [1], "name": ["A"]})) + df2 = DataFrame(pd.DataFrame({"id": [2], "name": ["B"]})) + df3 = DataFrame(pd.DataFrame({"id": [3], "name": ["C"]})) + + component.df = [df1, df2, df3] + component.operation = [{"name": "Merge", "icon": "merge"}] + component.merge_on_column = "id" + component.merge_how = "inner" + + with pytest.raises(ValueError, match="Merge requires exactly"): + component.perform_operation() class TestListInputHandling: @@ -580,9 +593,7 @@ def test_merge_fields_show(self, component): "merge_how": {"show": False}, } - updated_config = component.update_build_config( - build_config, [{"name": "Merge", "icon": "merge"}], "operation" - ) + updated_config = component.update_build_config(build_config, [{"name": "Merge", "icon": "merge"}], "operation") assert updated_config["merge_on_column"]["show"] is True assert updated_config["merge_how"]["show"] is True diff --git a/src/lfx/src/lfx/components/processing/dataframe_operations.py b/src/lfx/src/lfx/components/processing/dataframe_operations.py index 2b29d5115732..747f2acd9e86 100644 --- a/src/lfx/src/lfx/components/processing/dataframe_operations.py +++ b/src/lfx/src/lfx/components/processing/dataframe_operations.py @@ -33,7 +33,7 @@ class DataFrameOperationsComponent(Component): DataFrameInput( name="df", display_name="DataFrame", - info="The input DataFrame to operate on. Connect multiple DataFrames for merge operations.", + info="The input DataFrame to operate on. Connect multiple DataFrames for merge or concatenate operations.", required=True, is_list=True, ), @@ -239,12 +239,12 @@ def _get_primary_dataframe(self) -> DataFrame: def perform_operation(self) -> DataFrame: df_copy = self._get_primary_dataframe() - # Handle SortableListInput format for operation + # Handle SortableListInput format for operation (also supports legacy string format) operation_input = getattr(self, "operation", []) - if isinstance(operation_input, list) and len(operation_input) > 0: - op = operation_input[0].get("name", "") + if isinstance(operation_input, list): + op = operation_input[0].get("name", "") if operation_input else "" else: - op = "" + op = operation_input or "" # If no operation selected, return original DataFrame if not op: @@ -370,6 +370,12 @@ def merge_dataframes(self) -> DataFrame: if len(self.df) == 1: return self.df[0].copy() + # Merge requires exactly two DataFrames + max_merge_inputs = 2 + if len(self.df) > max_merge_inputs: + msg = f"Merge requires exactly {max_merge_inputs} DataFrames, got {len(self.df)}" + raise ValueError(msg) + df1 = self.df[0].copy() df2 = self.df[1].copy() From f1d0b562d5b1cacb0e6383573aa384a879d73c23 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 14:42:17 +0000 Subject: [PATCH 5/6] [autofix.ci] apply automated fixes --- src/lfx/src/lfx/_assets/component_index.json | 8 ++++---- src/lfx/src/lfx/_assets/stable_hash_history.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lfx/src/lfx/_assets/component_index.json b/src/lfx/src/lfx/_assets/component_index.json index f79a32f4db00..121c1c5fd339 100644 --- a/src/lfx/src/lfx/_assets/component_index.json +++ b/src/lfx/src/lfx/_assets/component_index.json @@ -97949,7 +97949,7 @@ "icon": "table", "legacy": false, "metadata": { - "code_hash": "4e4612f8cf41", + "code_hash": "e2b4323d4ed5", "dependencies": { "dependencies": [ { @@ -98022,7 +98022,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import pandas as pd\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs import SortableListInput\nfrom lfx.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\n\n\nclass DataFrameOperationsComponent(Component):\n display_name = \"DataFrame Operations\"\n description = \"Perform various operations on a DataFrame.\"\n documentation: str = \"https://docs.langflow.org/dataframe-operations\"\n icon = \"table\"\n name = \"DataFrameOperations\"\n\n OPERATION_CHOICES = [\n \"Add Column\",\n \"Concatenate\",\n \"Drop Column\",\n \"Filter\",\n \"Head\",\n \"Merge\",\n \"Rename Column\",\n \"Replace Value\",\n \"Select Columns\",\n \"Sort\",\n \"Tail\",\n \"Drop Duplicates\",\n ]\n\n inputs = [\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The input DataFrame to operate on. Connect multiple DataFrames for merge operations.\",\n required=True,\n is_list=True,\n ),\n SortableListInput(\n name=\"operation\",\n display_name=\"Operation\",\n placeholder=\"Select Operation\",\n info=\"Select the DataFrame operation to perform.\",\n options=[\n {\"name\": \"Add Column\", \"icon\": \"plus\"},\n {\"name\": \"Concatenate\", \"icon\": \"combine\"},\n {\"name\": \"Drop Column\", \"icon\": \"minus\"},\n {\"name\": \"Filter\", \"icon\": \"filter\"},\n {\"name\": \"Head\", \"icon\": \"arrow-up\"},\n {\"name\": \"Merge\", \"icon\": \"merge\"},\n {\"name\": \"Rename Column\", \"icon\": \"pencil\"},\n {\"name\": \"Replace Value\", \"icon\": \"replace\"},\n {\"name\": \"Select Columns\", \"icon\": \"columns\"},\n {\"name\": \"Sort\", \"icon\": \"arrow-up-down\"},\n {\"name\": \"Tail\", \"icon\": \"arrow-down\"},\n {\"name\": \"Drop Duplicates\", \"icon\": \"copy-x\"},\n ],\n real_time_refresh=True,\n limit=1,\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The column name to use for the operation.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"filter_value\",\n display_name=\"Filter Value\",\n info=\"The value to filter rows by.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"filter_operator\",\n display_name=\"Filter Operator\",\n options=[\n \"equals\",\n \"not equals\",\n \"contains\",\n \"not contains\",\n \"starts with\",\n \"ends with\",\n \"greater than\",\n \"less than\",\n ],\n value=\"equals\",\n info=\"The operator to apply for filtering rows.\",\n advanced=False,\n dynamic=True,\n show=False,\n ),\n BoolInput(\n name=\"ascending\",\n display_name=\"Sort Ascending\",\n info=\"Whether to sort in ascending order.\",\n dynamic=True,\n show=False,\n value=True,\n ),\n StrInput(\n name=\"new_column_name\",\n display_name=\"New Column Name\",\n info=\"The new column name when renaming or adding a column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"new_column_value\",\n display_name=\"New Column Value\",\n info=\"The value to populate the new column with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"columns_to_select\",\n display_name=\"Columns to Select\",\n dynamic=True,\n is_list=True,\n show=False,\n ),\n IntInput(\n name=\"num_rows\",\n display_name=\"Number of Rows\",\n info=\"Number of rows to return (for head/tail).\",\n dynamic=True,\n show=False,\n value=5,\n ),\n MessageTextInput(\n name=\"replace_value\",\n display_name=\"Value to Replace\",\n info=\"The value to replace in the column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"replacement_value\",\n display_name=\"Replacement Value\",\n info=\"The value to replace with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"merge_on_column\",\n display_name=\"Merge On Column\",\n info=\"The column name to merge DataFrames on. Must exist in both DataFrames.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"merge_how\",\n display_name=\"Merge Type\",\n options=[\"inner\", \"outer\", \"left\", \"right\"],\n value=\"inner\",\n info=\"Type of merge: inner (intersection), outer (union), left, or right.\",\n dynamic=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"output\",\n method=\"perform_operation\",\n info=\"The resulting DataFrame after the operation.\",\n )\n ]\n\n def update_build_config(self, build_config, field_value, field_name=None):\n dynamic_fields = [\n \"column_name\",\n \"filter_value\",\n \"filter_operator\",\n \"ascending\",\n \"new_column_name\",\n \"new_column_value\",\n \"columns_to_select\",\n \"num_rows\",\n \"replace_value\",\n \"replacement_value\",\n \"merge_on_column\",\n \"merge_how\",\n ]\n for field in dynamic_fields:\n build_config[field][\"show\"] = False\n\n if field_name == \"operation\":\n # Handle SortableListInput format\n if isinstance(field_value, list):\n operation_name = field_value[0].get(\"name\", \"\") if field_value else \"\"\n else:\n operation_name = field_value or \"\"\n\n # If no operation selected, all dynamic fields stay hidden (already set to False above)\n if not operation_name:\n return build_config\n\n if operation_name == \"Filter\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"filter_value\"][\"show\"] = True\n build_config[\"filter_operator\"][\"show\"] = True\n elif operation_name == \"Sort\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"ascending\"][\"show\"] = True\n elif operation_name == \"Drop Column\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Rename Column\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"new_column_name\"][\"show\"] = True\n elif operation_name == \"Add Column\":\n build_config[\"new_column_name\"][\"show\"] = True\n build_config[\"new_column_value\"][\"show\"] = True\n elif operation_name == \"Select Columns\":\n build_config[\"columns_to_select\"][\"show\"] = True\n elif operation_name in {\"Head\", \"Tail\"}:\n build_config[\"num_rows\"][\"show\"] = True\n elif operation_name == \"Replace Value\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"replace_value\"][\"show\"] = True\n build_config[\"replacement_value\"][\"show\"] = True\n elif operation_name == \"Drop Duplicates\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Merge\":\n build_config[\"merge_on_column\"][\"show\"] = True\n build_config[\"merge_how\"][\"show\"] = True\n\n return build_config\n\n def _get_primary_dataframe(self) -> DataFrame:\n \"\"\"Get the first DataFrame from input (handles both single and list inputs).\"\"\"\n if isinstance(self.df, list):\n return self.df[0].copy() if self.df else DataFrame()\n return self.df.copy()\n\n def perform_operation(self) -> DataFrame:\n df_copy = self._get_primary_dataframe()\n\n # Handle SortableListInput format for operation\n operation_input = getattr(self, \"operation\", [])\n if isinstance(operation_input, list) and len(operation_input) > 0:\n op = operation_input[0].get(\"name\", \"\")\n else:\n op = \"\"\n\n # If no operation selected, return original DataFrame\n if not op:\n return df_copy\n\n if op == \"Filter\":\n return self.filter_rows_by_value(df_copy)\n if op == \"Sort\":\n return self.sort_by_column(df_copy)\n if op == \"Drop Column\":\n return self.drop_column(df_copy)\n if op == \"Rename Column\":\n return self.rename_column(df_copy)\n if op == \"Add Column\":\n return self.add_column(df_copy)\n if op == \"Select Columns\":\n return self.select_columns(df_copy)\n if op == \"Head\":\n return self.head(df_copy)\n if op == \"Tail\":\n return self.tail(df_copy)\n if op == \"Replace Value\":\n return self.replace_values(df_copy)\n if op == \"Drop Duplicates\":\n return self.drop_duplicates(df_copy)\n if op == \"Concatenate\":\n return self.concatenate_dataframes()\n if op == \"Merge\":\n return self.merge_dataframes()\n msg = f\"Unsupported operation: {op}\"\n logger.error(msg)\n raise ValueError(msg)\n\n def filter_rows_by_value(self, df: DataFrame) -> DataFrame:\n column = df[self.column_name]\n filter_value = self.filter_value\n\n # Handle regular DropdownInput format (just a string value)\n operator = getattr(self, \"filter_operator\", \"equals\") # Default to equals for backward compatibility\n\n if operator == \"equals\":\n mask = column == filter_value\n elif operator == \"not equals\":\n mask = column != filter_value\n elif operator == \"contains\":\n mask = column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"not contains\":\n mask = ~column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"starts with\":\n mask = column.astype(str).str.startswith(str(filter_value), na=False)\n elif operator == \"ends with\":\n mask = column.astype(str).str.endswith(str(filter_value), na=False)\n elif operator == \"greater than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column > numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) > str(filter_value)\n elif operator == \"less than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column < numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) < str(filter_value)\n else:\n mask = column == filter_value # Fallback to equals\n\n return DataFrame(df[mask])\n\n def sort_by_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))\n\n def drop_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop(columns=[self.column_name]))\n\n def rename_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))\n\n def add_column(self, df: DataFrame) -> DataFrame:\n df[self.new_column_name] = [self.new_column_value] * len(df)\n return DataFrame(df)\n\n def select_columns(self, df: DataFrame) -> DataFrame:\n columns = [col.strip() for col in self.columns_to_select]\n return DataFrame(df[columns])\n\n def head(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.head(self.num_rows))\n\n def tail(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.tail(self.num_rows))\n\n def replace_values(self, df: DataFrame) -> DataFrame:\n df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)\n return DataFrame(df)\n\n def drop_duplicates(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop_duplicates(subset=self.column_name))\n\n def concatenate_dataframes(self) -> DataFrame:\n \"\"\"Concatenate multiple DataFrames vertically (stack rows).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n # Concatenate all DataFrames vertically\n concatenated = pd.concat(self.df, ignore_index=True)\n return DataFrame(concatenated)\n\n def merge_dataframes(self) -> DataFrame:\n \"\"\"Merge two DataFrames based on a common column (join operation).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n df1 = self.df[0].copy()\n df2 = self.df[1].copy()\n\n merge_on = getattr(self, \"merge_on_column\", None)\n merge_how = getattr(self, \"merge_how\", \"inner\")\n\n # If merge column specified, validate it exists in both DataFrames\n if merge_on:\n if merge_on not in df1.columns:\n msg = f\"Column '{merge_on}' not found in first DataFrame. Available: {list(df1.columns)}\"\n raise ValueError(msg)\n if merge_on not in df2.columns:\n msg = f\"Column '{merge_on}' not found in second DataFrame. Available: {list(df2.columns)}\"\n raise ValueError(msg)\n\n merged = df1.merge(df2, on=merge_on, how=merge_how, suffixes=(\"\", \"_df2\"))\n else:\n merged = df1.merge(df2, left_index=True, right_index=True, how=merge_how, suffixes=(\"\", \"_df2\"))\n\n # Combine duplicate columns: use df1 value if exists, otherwise df2 value\n cols_to_drop = []\n for col in merged.columns:\n if col.endswith(\"_df2\"):\n original_col = col[:-4] # Remove \"_df2\" suffix\n if original_col in merged.columns:\n # Coalesce: use original if not null, otherwise use _df2\n merged[original_col] = merged[original_col].combine_first(merged[col])\n cols_to_drop.append(col)\n\n if cols_to_drop:\n merged = merged.drop(columns=cols_to_drop)\n\n return DataFrame(merged)\n" + "value": "import pandas as pd\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs import SortableListInput\nfrom lfx.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\n\n\nclass DataFrameOperationsComponent(Component):\n display_name = \"DataFrame Operations\"\n description = \"Perform various operations on a DataFrame.\"\n documentation: str = \"https://docs.langflow.org/dataframe-operations\"\n icon = \"table\"\n name = \"DataFrameOperations\"\n\n OPERATION_CHOICES = [\n \"Add Column\",\n \"Concatenate\",\n \"Drop Column\",\n \"Filter\",\n \"Head\",\n \"Merge\",\n \"Rename Column\",\n \"Replace Value\",\n \"Select Columns\",\n \"Sort\",\n \"Tail\",\n \"Drop Duplicates\",\n ]\n\n inputs = [\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The input DataFrame to operate on. Connect multiple DataFrames for merge or concatenate operations.\",\n required=True,\n is_list=True,\n ),\n SortableListInput(\n name=\"operation\",\n display_name=\"Operation\",\n placeholder=\"Select Operation\",\n info=\"Select the DataFrame operation to perform.\",\n options=[\n {\"name\": \"Add Column\", \"icon\": \"plus\"},\n {\"name\": \"Concatenate\", \"icon\": \"combine\"},\n {\"name\": \"Drop Column\", \"icon\": \"minus\"},\n {\"name\": \"Filter\", \"icon\": \"filter\"},\n {\"name\": \"Head\", \"icon\": \"arrow-up\"},\n {\"name\": \"Merge\", \"icon\": \"merge\"},\n {\"name\": \"Rename Column\", \"icon\": \"pencil\"},\n {\"name\": \"Replace Value\", \"icon\": \"replace\"},\n {\"name\": \"Select Columns\", \"icon\": \"columns\"},\n {\"name\": \"Sort\", \"icon\": \"arrow-up-down\"},\n {\"name\": \"Tail\", \"icon\": \"arrow-down\"},\n {\"name\": \"Drop Duplicates\", \"icon\": \"copy-x\"},\n ],\n real_time_refresh=True,\n limit=1,\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The column name to use for the operation.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"filter_value\",\n display_name=\"Filter Value\",\n info=\"The value to filter rows by.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"filter_operator\",\n display_name=\"Filter Operator\",\n options=[\n \"equals\",\n \"not equals\",\n \"contains\",\n \"not contains\",\n \"starts with\",\n \"ends with\",\n \"greater than\",\n \"less than\",\n ],\n value=\"equals\",\n info=\"The operator to apply for filtering rows.\",\n advanced=False,\n dynamic=True,\n show=False,\n ),\n BoolInput(\n name=\"ascending\",\n display_name=\"Sort Ascending\",\n info=\"Whether to sort in ascending order.\",\n dynamic=True,\n show=False,\n value=True,\n ),\n StrInput(\n name=\"new_column_name\",\n display_name=\"New Column Name\",\n info=\"The new column name when renaming or adding a column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"new_column_value\",\n display_name=\"New Column Value\",\n info=\"The value to populate the new column with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"columns_to_select\",\n display_name=\"Columns to Select\",\n dynamic=True,\n is_list=True,\n show=False,\n ),\n IntInput(\n name=\"num_rows\",\n display_name=\"Number of Rows\",\n info=\"Number of rows to return (for head/tail).\",\n dynamic=True,\n show=False,\n value=5,\n ),\n MessageTextInput(\n name=\"replace_value\",\n display_name=\"Value to Replace\",\n info=\"The value to replace in the column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"replacement_value\",\n display_name=\"Replacement Value\",\n info=\"The value to replace with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"merge_on_column\",\n display_name=\"Merge On Column\",\n info=\"The column name to merge DataFrames on. Must exist in both DataFrames.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"merge_how\",\n display_name=\"Merge Type\",\n options=[\"inner\", \"outer\", \"left\", \"right\"],\n value=\"inner\",\n info=\"Type of merge: inner (intersection), outer (union), left, or right.\",\n dynamic=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"output\",\n method=\"perform_operation\",\n info=\"The resulting DataFrame after the operation.\",\n )\n ]\n\n def update_build_config(self, build_config, field_value, field_name=None):\n dynamic_fields = [\n \"column_name\",\n \"filter_value\",\n \"filter_operator\",\n \"ascending\",\n \"new_column_name\",\n \"new_column_value\",\n \"columns_to_select\",\n \"num_rows\",\n \"replace_value\",\n \"replacement_value\",\n \"merge_on_column\",\n \"merge_how\",\n ]\n for field in dynamic_fields:\n build_config[field][\"show\"] = False\n\n if field_name == \"operation\":\n # Handle SortableListInput format\n if isinstance(field_value, list):\n operation_name = field_value[0].get(\"name\", \"\") if field_value else \"\"\n else:\n operation_name = field_value or \"\"\n\n # If no operation selected, all dynamic fields stay hidden (already set to False above)\n if not operation_name:\n return build_config\n\n if operation_name == \"Filter\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"filter_value\"][\"show\"] = True\n build_config[\"filter_operator\"][\"show\"] = True\n elif operation_name == \"Sort\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"ascending\"][\"show\"] = True\n elif operation_name == \"Drop Column\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Rename Column\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"new_column_name\"][\"show\"] = True\n elif operation_name == \"Add Column\":\n build_config[\"new_column_name\"][\"show\"] = True\n build_config[\"new_column_value\"][\"show\"] = True\n elif operation_name == \"Select Columns\":\n build_config[\"columns_to_select\"][\"show\"] = True\n elif operation_name in {\"Head\", \"Tail\"}:\n build_config[\"num_rows\"][\"show\"] = True\n elif operation_name == \"Replace Value\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"replace_value\"][\"show\"] = True\n build_config[\"replacement_value\"][\"show\"] = True\n elif operation_name == \"Drop Duplicates\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Merge\":\n build_config[\"merge_on_column\"][\"show\"] = True\n build_config[\"merge_how\"][\"show\"] = True\n\n return build_config\n\n def _get_primary_dataframe(self) -> DataFrame:\n \"\"\"Get the first DataFrame from input (handles both single and list inputs).\"\"\"\n if isinstance(self.df, list):\n return self.df[0].copy() if self.df else DataFrame()\n return self.df.copy()\n\n def perform_operation(self) -> DataFrame:\n df_copy = self._get_primary_dataframe()\n\n # Handle SortableListInput format for operation (also supports legacy string format)\n operation_input = getattr(self, \"operation\", [])\n if isinstance(operation_input, list):\n op = operation_input[0].get(\"name\", \"\") if operation_input else \"\"\n else:\n op = operation_input or \"\"\n\n # If no operation selected, return original DataFrame\n if not op:\n return df_copy\n\n if op == \"Filter\":\n return self.filter_rows_by_value(df_copy)\n if op == \"Sort\":\n return self.sort_by_column(df_copy)\n if op == \"Drop Column\":\n return self.drop_column(df_copy)\n if op == \"Rename Column\":\n return self.rename_column(df_copy)\n if op == \"Add Column\":\n return self.add_column(df_copy)\n if op == \"Select Columns\":\n return self.select_columns(df_copy)\n if op == \"Head\":\n return self.head(df_copy)\n if op == \"Tail\":\n return self.tail(df_copy)\n if op == \"Replace Value\":\n return self.replace_values(df_copy)\n if op == \"Drop Duplicates\":\n return self.drop_duplicates(df_copy)\n if op == \"Concatenate\":\n return self.concatenate_dataframes()\n if op == \"Merge\":\n return self.merge_dataframes()\n msg = f\"Unsupported operation: {op}\"\n logger.error(msg)\n raise ValueError(msg)\n\n def filter_rows_by_value(self, df: DataFrame) -> DataFrame:\n column = df[self.column_name]\n filter_value = self.filter_value\n\n # Handle regular DropdownInput format (just a string value)\n operator = getattr(self, \"filter_operator\", \"equals\") # Default to equals for backward compatibility\n\n if operator == \"equals\":\n mask = column == filter_value\n elif operator == \"not equals\":\n mask = column != filter_value\n elif operator == \"contains\":\n mask = column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"not contains\":\n mask = ~column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"starts with\":\n mask = column.astype(str).str.startswith(str(filter_value), na=False)\n elif operator == \"ends with\":\n mask = column.astype(str).str.endswith(str(filter_value), na=False)\n elif operator == \"greater than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column > numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) > str(filter_value)\n elif operator == \"less than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column < numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) < str(filter_value)\n else:\n mask = column == filter_value # Fallback to equals\n\n return DataFrame(df[mask])\n\n def sort_by_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))\n\n def drop_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop(columns=[self.column_name]))\n\n def rename_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))\n\n def add_column(self, df: DataFrame) -> DataFrame:\n df[self.new_column_name] = [self.new_column_value] * len(df)\n return DataFrame(df)\n\n def select_columns(self, df: DataFrame) -> DataFrame:\n columns = [col.strip() for col in self.columns_to_select]\n return DataFrame(df[columns])\n\n def head(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.head(self.num_rows))\n\n def tail(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.tail(self.num_rows))\n\n def replace_values(self, df: DataFrame) -> DataFrame:\n df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)\n return DataFrame(df)\n\n def drop_duplicates(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop_duplicates(subset=self.column_name))\n\n def concatenate_dataframes(self) -> DataFrame:\n \"\"\"Concatenate multiple DataFrames vertically (stack rows).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n # Concatenate all DataFrames vertically\n concatenated = pd.concat(self.df, ignore_index=True)\n return DataFrame(concatenated)\n\n def merge_dataframes(self) -> DataFrame:\n \"\"\"Merge two DataFrames based on a common column (join operation).\"\"\"\n if not isinstance(self.df, list) or len(self.df) == 0:\n return self.df.copy() if self.df is not None else DataFrame()\n\n # If only one DataFrame, return it\n if len(self.df) == 1:\n return self.df[0].copy()\n\n # Merge requires exactly two DataFrames\n max_merge_inputs = 2\n if len(self.df) > max_merge_inputs:\n msg = f\"Merge requires exactly {max_merge_inputs} DataFrames, got {len(self.df)}\"\n raise ValueError(msg)\n\n df1 = self.df[0].copy()\n df2 = self.df[1].copy()\n\n merge_on = getattr(self, \"merge_on_column\", None)\n merge_how = getattr(self, \"merge_how\", \"inner\")\n\n # If merge column specified, validate it exists in both DataFrames\n if merge_on:\n if merge_on not in df1.columns:\n msg = f\"Column '{merge_on}' not found in first DataFrame. Available: {list(df1.columns)}\"\n raise ValueError(msg)\n if merge_on not in df2.columns:\n msg = f\"Column '{merge_on}' not found in second DataFrame. Available: {list(df2.columns)}\"\n raise ValueError(msg)\n\n merged = df1.merge(df2, on=merge_on, how=merge_how, suffixes=(\"\", \"_df2\"))\n else:\n merged = df1.merge(df2, left_index=True, right_index=True, how=merge_how, suffixes=(\"\", \"_df2\"))\n\n # Combine duplicate columns: use df1 value if exists, otherwise df2 value\n cols_to_drop = []\n for col in merged.columns:\n if col.endswith(\"_df2\"):\n original_col = col[:-4] # Remove \"_df2\" suffix\n if original_col in merged.columns:\n # Coalesce: use original if not null, otherwise use _df2\n merged[original_col] = merged[original_col].combine_first(merged[col])\n cols_to_drop.append(col)\n\n if cols_to_drop:\n merged = merged.drop(columns=cols_to_drop)\n\n return DataFrame(merged)\n" }, "column_name": { "_input_type": "StrInput", @@ -98071,7 +98071,7 @@ "advanced": false, "display_name": "DataFrame", "dynamic": false, - "info": "The input DataFrame to operate on. Connect multiple DataFrames for merge operations.", + "info": "The input DataFrame to operate on. Connect multiple DataFrames for merge or concatenate operations.", "input_types": [ "DataFrame" ], @@ -116101,6 +116101,6 @@ "num_components": 355, "num_modules": 95 }, - "sha256": "e76091e51bfc1a1675415f87b3fa9fe5609ac5788442ee8e9ae1e62727a48461", + "sha256": "3674c6534b51db6a061a03e1f4a867ed39b3f02dcce705bcee4523dc45acf883", "version": "0.3.0" } \ No newline at end of file diff --git a/src/lfx/src/lfx/_assets/stable_hash_history.json b/src/lfx/src/lfx/_assets/stable_hash_history.json index e7c35ab4bd52..d19b8989086b 100644 --- a/src/lfx/src/lfx/_assets/stable_hash_history.json +++ b/src/lfx/src/lfx/_assets/stable_hash_history.json @@ -1381,7 +1381,7 @@ }, "DataFrameOperations": { "versions": { - "0.3.0": "4e4612f8cf41" + "0.3.0": "e2b4323d4ed5" } }, "DynamicCreateData": { From 4a106419a3008de5d0466ad4a9dbd5825cf24051 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 2 Mar 2026 20:08:47 +0000 Subject: [PATCH 6/6] [autofix.ci] apply automated fixes --- src/lfx/src/lfx/_assets/component_index.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lfx/src/lfx/_assets/component_index.json b/src/lfx/src/lfx/_assets/component_index.json index 27885a027905..4730b681d171 100644 --- a/src/lfx/src/lfx/_assets/component_index.json +++ b/src/lfx/src/lfx/_assets/component_index.json @@ -117438,6 +117438,6 @@ "num_components": 357, "num_modules": 96 }, - "sha256": "e76a4e55f9f5949d6aad19ca795fd26d8a87f720724e22315a17a016654c1f71", + "sha256": "030eeb78403165aee7c8acea835027ceb6a34b9bdcd34af23007eb0b157a1a49", "version": "0.3.0" } \ No newline at end of file