From 0fd1030521cd8936b7c1407ea1efc0cf6cfe4d45 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 15 Jan 2026 21:30:59 -0800 Subject: [PATCH 1/3] Enable R UDF operator --- .../architecture/managers/executor_manager.py | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py index 53e5a8903da..bf0bbe4a837 100644 --- a/amber/src/main/python/core/architecture/managers/executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/executor_manager.py @@ -132,13 +132,29 @@ class declaration. :param language: The language of the operator code. :return: """ - assert language not in [ - "r-tuple", - "r-table", - ], "R language is not supported by default. Please consult third party plugin." - executor: type(Operator) = self.load_executor_definition(code) - self.executor = executor() - self.executor.is_source = is_source + if language in ("r-tuple", "r-table"): + # R support is provided by an optional plugin (texera-rudf) + executor_type = "Tuple" if language == "r-tuple" else "Table" + try: + import texera_r + + source_executor_class = getattr( + texera_r, f"R{executor_type}SourceExecutor" + ) + executor_class = getattr(texera_r, f"R{executor_type}Executor") + except ImportError as e: + raise RuntimeError( + "R operators require the texera-rudf package.\n" + "Install with: pip install git+https://github.com/Texera/texera-rudf.git\n" + f"Import error: {e}" + ) + self.executor = ( + source_executor_class(code) if is_source else executor_class(code) + ) + else: + executor: type(Operator) = self.load_executor_definition(code) + self.executor = executor() + self.executor.is_source = is_source assert isinstance(self.executor, SourceOperator) == self.executor.is_source, ( "Please use SourceOperator API for source operators." ) From 7c9af44e03da6ac3b64fd91e4cdd50a5142696f1 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 15 Jan 2026 22:19:31 -0800 Subject: [PATCH 2/3] Fix pytest --- .../managers/test_executor_manager.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py index 7728c509b4d..e32d6bcd2a4 100644 --- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py @@ -39,7 +39,7 @@ def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: class TestExecutorManager: - """Test suite for ExecutorManager, focusing on R UDF support removal.""" + """Test suite for ExecutorManager, focusing on R UDF plugin support.""" @pytest.fixture def executor_manager(self): @@ -57,26 +57,26 @@ def test_initialization(self, executor_manager): assert executor_manager.executor_version == 0 def test_reject_r_tuple_language(self, executor_manager): - """Test that 'r-tuple' language is rejected with AssertionError.""" - with pytest.raises(AssertionError) as exc_info: + """Test that 'r-tuple' language is rejected with RuntimeError when plugin is not available.""" + with pytest.raises(RuntimeError) as exc_info: executor_manager.initialize_executor( code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple" ) - # Verify the error message mentions R UDF support has been dropped - assert "not supported" in str(exc_info.value) or "dropped" in str( + # Verify the error message mentions R operators require the texera-rudf package + assert "texera-rudf" in str(exc_info.value) or "R operators require" in str( exc_info.value ) def test_reject_r_table_language(self, executor_manager): - """Test that 'r-table' language is rejected with AssertionError.""" - with pytest.raises(AssertionError) as exc_info: + """Test that 'r-table' language is rejected with RuntimeError when plugin is not available.""" + with pytest.raises(RuntimeError) as exc_info: executor_manager.initialize_executor( code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table" ) - # Verify the error message mentions R UDF support has been dropped - assert "not supported" in str(exc_info.value) or "dropped" in str( + # Verify the error message mentions R operators require the texera-rudf package + assert "texera-rudf" in str(exc_info.value) or "R operators require" in str( exc_info.value ) From 4751df2a6441010b355eca9f7cc28ad895409fb1 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 19 Jan 2026 13:10:06 -0800 Subject: [PATCH 3/3] Address comments --- .../architecture/managers/executor_manager.py | 12 +-- .../managers/test_executor_manager.py | 82 ++++++++++++++++++- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py index bf0bbe4a837..eb1363d0a68 100644 --- a/amber/src/main/python/core/architecture/managers/executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/executor_manager.py @@ -138,19 +138,15 @@ class declaration. try: import texera_r - source_executor_class = getattr( - texera_r, f"R{executor_type}SourceExecutor" - ) - executor_class = getattr(texera_r, f"R{executor_type}Executor") + class_suffix = "SourceExecutor" if is_source else "Executor" + executor_class = getattr(texera_r, f"R{executor_type}{class_suffix}") except ImportError as e: - raise RuntimeError( + raise ImportError( "R operators require the texera-rudf package.\n" "Install with: pip install git+https://github.com/Texera/texera-rudf.git\n" f"Import error: {e}" ) - self.executor = ( - source_executor_class(code) if is_source else executor_class(code) - ) + self.executor = executor_class(code) else: executor: type(Operator) = self.load_executor_definition(code) self.executor = executor() diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py index e32d6bcd2a4..901f768a216 100644 --- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py @@ -15,7 +15,9 @@ # specific language governing permissions and limitations # under the License. +import sys import pytest +from unittest.mock import MagicMock from core.architecture.managers.executor_manager import ExecutorManager @@ -50,6 +52,34 @@ def executor_manager(self): if hasattr(manager, "_fs"): manager.close() + def _mock_r_plugin(self, executor_class_name, is_source): + """ + Helper to mock the texera_r plugin module. + + :param executor_class_name: Name of the executor class (e.g., 'RTupleExecutor') + :param is_source: Whether the executor is a source operator + :return: Tuple of (mock_texera_r, mock_executor_instance) + """ + from core.models import SourceOperator, Operator + + mock_texera_r = MagicMock() + mock_executor_class = MagicMock() + setattr(mock_texera_r, executor_class_name, mock_executor_class) + + # Use appropriate spec based on operator type + spec_class = SourceOperator if is_source else Operator + mock_executor_instance = MagicMock(spec=spec_class) + mock_executor_instance.is_source = is_source + mock_executor_class.return_value = mock_executor_instance + + sys.modules["texera_r"] = mock_texera_r + return mock_texera_r, mock_executor_instance + + def _cleanup_r_plugin(self): + """Remove the mocked texera_r module from sys.modules.""" + if "texera_r" in sys.modules: + del sys.modules["texera_r"] + def test_initialization(self, executor_manager): """Test that ExecutorManager initializes correctly.""" assert executor_manager.executor is None @@ -57,8 +87,8 @@ def test_initialization(self, executor_manager): assert executor_manager.executor_version == 0 def test_reject_r_tuple_language(self, executor_manager): - """Test that 'r-tuple' language is rejected with RuntimeError when plugin is not available.""" - with pytest.raises(RuntimeError) as exc_info: + """Test that 'r-tuple' language is rejected with ImportError when plugin is not available.""" + with pytest.raises(ImportError) as exc_info: executor_manager.initialize_executor( code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple" ) @@ -69,8 +99,8 @@ def test_reject_r_tuple_language(self, executor_manager): ) def test_reject_r_table_language(self, executor_manager): - """Test that 'r-table' language is rejected with RuntimeError when plugin is not available.""" - with pytest.raises(RuntimeError) as exc_info: + """Test that 'r-table' language is rejected with ImportError when plugin is not available.""" + with pytest.raises(ImportError) as exc_info: executor_manager.initialize_executor( code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table" ) @@ -80,6 +110,50 @@ def test_reject_r_table_language(self, executor_manager): exc_info.value ) + def test_accept_r_tuple_language_with_plugin(self, executor_manager): + """Test that 'r-tuple' language is accepted when plugin is available.""" + _, mock_executor = self._mock_r_plugin("RTupleExecutor", is_source=False) + try: + executor_manager.initialize_executor( + code="# R code", is_source=False, language="r-tuple" + ) + assert executor_manager.executor == mock_executor + finally: + self._cleanup_r_plugin() + + def test_accept_r_table_language_with_plugin(self, executor_manager): + """Test that 'r-table' language is accepted when plugin is available.""" + _, mock_executor = self._mock_r_plugin("RTableExecutor", is_source=False) + try: + executor_manager.initialize_executor( + code="# R code", is_source=False, language="r-table" + ) + assert executor_manager.executor == mock_executor + finally: + self._cleanup_r_plugin() + + def test_accept_r_tuple_source_with_plugin(self, executor_manager): + """Test that 'r-tuple' source operators work when plugin is available.""" + _, mock_executor = self._mock_r_plugin("RTupleSourceExecutor", is_source=True) + try: + executor_manager.initialize_executor( + code="# R code", is_source=True, language="r-tuple" + ) + assert executor_manager.executor == mock_executor + finally: + self._cleanup_r_plugin() + + def test_accept_r_table_source_with_plugin(self, executor_manager): + """Test that 'r-table' source operators work when plugin is available.""" + _, mock_executor = self._mock_r_plugin("RTableSourceExecutor", is_source=True) + try: + executor_manager.initialize_executor( + code="# R code", is_source=True, language="r-table" + ) + assert executor_manager.executor == mock_executor + finally: + self._cleanup_r_plugin() + def test_accept_python_language_regular_operator(self, executor_manager): """Test that 'python' language is accepted for regular operators.""" # This should not raise any assertion error