Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,25 @@ class declaration.
:param language: The language of the operator code.
:return:
"""
assert language not in [
"r-tuple",
"r-table",
], "R language is not supported by default. Please consult third party plugin."
executor: type(Operator) = self.load_executor_definition(code)
self.executor = executor()
self.executor.is_source = is_source
if language in ("r-tuple", "r-table"):
# R support is provided by an optional plugin (texera-rudf)
executor_type = "Tuple" if language == "r-tuple" else "Table"
try:
import texera_r

class_suffix = "SourceExecutor" if is_source else "Executor"
executor_class = getattr(texera_r, f"R{executor_type}{class_suffix}")
except ImportError as e:
raise ImportError(
"R operators require the texera-rudf package.\n"
"Install with: pip install git+https://github.com/Texera/texera-rudf.git\n"
f"Import error: {e}"
)
self.executor = executor_class(code)
else:
executor: type(Operator) = self.load_executor_definition(code)
self.executor = executor()
self.executor.is_source = is_source
assert isinstance(self.executor, SourceOperator) == self.executor.is_source, (
"Please use SourceOperator API for source operators."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.

import sys
import pytest
from unittest.mock import MagicMock

from core.architecture.managers.executor_manager import ExecutorManager

Expand All @@ -39,7 +41,7 @@ def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:


class TestExecutorManager:
"""Test suite for ExecutorManager, focusing on R UDF support removal."""
"""Test suite for ExecutorManager, focusing on R UDF plugin support."""

@pytest.fixture
def executor_manager(self):
Expand All @@ -50,36 +52,108 @@ def executor_manager(self):
if hasattr(manager, "_fs"):
manager.close()

def _mock_r_plugin(self, executor_class_name, is_source):
"""
Helper to mock the texera_r plugin module.

:param executor_class_name: Name of the executor class (e.g., 'RTupleExecutor')
:param is_source: Whether the executor is a source operator
:return: Tuple of (mock_texera_r, mock_executor_instance)
"""
from core.models import SourceOperator, Operator

mock_texera_r = MagicMock()
mock_executor_class = MagicMock()
setattr(mock_texera_r, executor_class_name, mock_executor_class)

# Use appropriate spec based on operator type
spec_class = SourceOperator if is_source else Operator
mock_executor_instance = MagicMock(spec=spec_class)
mock_executor_instance.is_source = is_source
mock_executor_class.return_value = mock_executor_instance

sys.modules["texera_r"] = mock_texera_r
return mock_texera_r, mock_executor_instance

def _cleanup_r_plugin(self):
"""Remove the mocked texera_r module from sys.modules."""
if "texera_r" in sys.modules:
del sys.modules["texera_r"]

def test_initialization(self, executor_manager):
"""Test that ExecutorManager initializes correctly."""
assert executor_manager.executor is None
assert executor_manager.operator_module_name is None
assert executor_manager.executor_version == 0

def test_reject_r_tuple_language(self, executor_manager):
"""Test that 'r-tuple' language is rejected with AssertionError."""
with pytest.raises(AssertionError) as exc_info:
"""Test that 'r-tuple' language is rejected with ImportError when plugin is not available."""
with pytest.raises(ImportError) as exc_info:
executor_manager.initialize_executor(
code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple"
)

# Verify the error message mentions R UDF support has been dropped
assert "not supported" in str(exc_info.value) or "dropped" in str(
# Verify the error message mentions R operators require the texera-rudf package
assert "texera-rudf" in str(exc_info.value) or "R operators require" in str(
exc_info.value
)

def test_reject_r_table_language(self, executor_manager):
"""Test that 'r-table' language is rejected with AssertionError."""
with pytest.raises(AssertionError) as exc_info:
"""Test that 'r-table' language is rejected with ImportError when plugin is not available."""
with pytest.raises(ImportError) as exc_info:
executor_manager.initialize_executor(
code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table"
)

# Verify the error message mentions R UDF support has been dropped
assert "not supported" in str(exc_info.value) or "dropped" in str(
# Verify the error message mentions R operators require the texera-rudf package
assert "texera-rudf" in str(exc_info.value) or "R operators require" in str(
exc_info.value
)

def test_accept_r_tuple_language_with_plugin(self, executor_manager):
"""Test that 'r-tuple' language is accepted when plugin is available."""
_, mock_executor = self._mock_r_plugin("RTupleExecutor", is_source=False)
try:
executor_manager.initialize_executor(
code="# R code", is_source=False, language="r-tuple"
)
assert executor_manager.executor == mock_executor
finally:
self._cleanup_r_plugin()

def test_accept_r_table_language_with_plugin(self, executor_manager):
"""Test that 'r-table' language is accepted when plugin is available."""
_, mock_executor = self._mock_r_plugin("RTableExecutor", is_source=False)
try:
executor_manager.initialize_executor(
code="# R code", is_source=False, language="r-table"
)
assert executor_manager.executor == mock_executor
finally:
self._cleanup_r_plugin()

def test_accept_r_tuple_source_with_plugin(self, executor_manager):
"""Test that 'r-tuple' source operators work when plugin is available."""
_, mock_executor = self._mock_r_plugin("RTupleSourceExecutor", is_source=True)
try:
executor_manager.initialize_executor(
code="# R code", is_source=True, language="r-tuple"
)
assert executor_manager.executor == mock_executor
finally:
self._cleanup_r_plugin()

def test_accept_r_table_source_with_plugin(self, executor_manager):
"""Test that 'r-table' source operators work when plugin is available."""
_, mock_executor = self._mock_r_plugin("RTableSourceExecutor", is_source=True)
try:
executor_manager.initialize_executor(
code="# R code", is_source=True, language="r-table"
)
assert executor_manager.executor == mock_executor
finally:
self._cleanup_r_plugin()

def test_accept_python_language_regular_operator(self, executor_manager):
"""Test that 'python' language is accepted for regular operators."""
# This should not raise any assertion error
Expand Down
Loading