diff --git a/amber/operator-requirements.txt b/amber/operator-requirements.txt index ce114ea8415..3328d152f08 100644 --- a/amber/operator-requirements.txt +++ b/amber/operator-requirements.txt @@ -23,3 +23,4 @@ pybase64==1.3.2 torch==2.8.0 scikit-learn==1.5.0 transformers==4.57.3 +boto3==1.40.53 diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index 4003429baf6..bf4afbf396f 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -262,7 +262,7 @@ def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame: return DataFrame( frame=Table.from_pydict( { - name: [t[name] for t in tuples] + name: [t.get_serialized_field(name) for t in tuples] for name in self.get_port().get_schema().get_attr_names() }, schema=self.get_port().get_schema().as_arrow_schema(), diff --git a/amber/src/main/python/core/models/__init__.py b/amber/src/main/python/core/models/__init__.py index 9011c1db887..d24fe0a277d 100644 --- a/amber/src/main/python/core/models/__init__.py +++ b/amber/src/main/python/core/models/__init__.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import builtins from inspect import Traceback from typing import NamedTuple @@ -36,7 +37,7 @@ class ExceptionInfo(NamedTuple): - exc: type + exc: builtins.type value: Exception tb: Traceback diff --git a/amber/src/main/python/core/models/schema/__init__.py b/amber/src/main/python/core/models/schema/__init__.py index 7c8b57bb316..306fe4e1d4e 100644 --- a/amber/src/main/python/core/models/schema/__init__.py +++ b/amber/src/main/python/core/models/schema/__init__.py @@ -16,12 +16,14 @@ # under the License. from .attribute_type import AttributeType +from core.models.type.large_binary import largebinary from .field import Field from .schema import Schema __all__ = [ "AttributeType", + "largebinary", "Field", "Schema", ] diff --git a/amber/src/main/python/core/models/schema/arrow_schema_utils.py b/amber/src/main/python/core/models/schema/arrow_schema_utils.py new file mode 100644 index 00000000000..527e0095e65 --- /dev/null +++ b/amber/src/main/python/core/models/schema/arrow_schema_utils.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Utilities for converting between Arrow schemas and Amber schemas, +handling LARGE_BINARY metadata preservation. 
+""" + +import pyarrow as pa +from typing import Mapping + +from core.models.schema.attribute_type import AttributeType +from core.models.schema.attribute_type_utils import ( + detect_attribute_type_from_arrow_field, + create_arrow_field_with_metadata, +) + + +def arrow_schema_to_attr_types(arrow_schema: pa.Schema) -> dict[str, AttributeType]: + """ + Converts an Arrow schema to a dictionary of attribute name to AttributeType. + Handles LARGE_BINARY metadata detection. + + :param arrow_schema: PyArrow schema that may contain LARGE_BINARY metadata + :return: Dictionary mapping attribute names to AttributeTypes + """ + attr_types = {} + for attr_name in arrow_schema.names: + field = arrow_schema.field(attr_name) + attr_types[attr_name] = detect_attribute_type_from_arrow_field(field) + return attr_types + + +def attr_types_to_arrow_schema( + attr_types: Mapping[str, AttributeType], +) -> pa.Schema: + """ + Converts a mapping of attribute name to AttributeType into an Arrow schema. + Adds metadata for LARGE_BINARY types. + Preserves the order of attributes from the input mapping. + + :param attr_types: Mapping of attribute names to AttributeTypes (e.g., OrderedDict) + :return: PyArrow schema with metadata for LARGE_BINARY types + """ + fields = [ + create_arrow_field_with_metadata(attr_name, attr_type) + for attr_name, attr_type in attr_types.items() + ] + return pa.schema(fields) diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 72799b015cc..24d0745f41e 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -20,6 +20,7 @@ from bidict import bidict from enum import Enum from pyarrow import lib +from core.models.type.large_binary import largebinary class AttributeType(Enum): @@ -37,6 +38,7 @@ class AttributeType(Enum): DOUBLE = 5 TIMESTAMP = 6 BINARY = 7 + LARGE_BINARY = 8 RAW_TYPE_MAPPING = bidict( @@ -48,6 +50,7 @@ class AttributeType(Enum): "BOOLEAN": AttributeType.BOOL, "TIMESTAMP": AttributeType.TIMESTAMP, "BINARY": AttributeType.BINARY, + "LARGE_BINARY": AttributeType.LARGE_BINARY, } ) @@ -59,6 +62,7 @@ class AttributeType(Enum): AttributeType.BOOL: pa.bool_(), AttributeType.BINARY: pa.binary(), AttributeType.TIMESTAMP: pa.timestamp("us"), + AttributeType.LARGE_BINARY: pa.string(), # Serialized as URI string } FROM_ARROW_MAPPING = { @@ -83,6 +87,7 @@ class AttributeType(Enum): AttributeType.BOOL: bool, AttributeType.BINARY: bytes, AttributeType.TIMESTAMP: datetime.datetime, + AttributeType.LARGE_BINARY: largebinary, } FROM_PYOBJECT_MAPPING = { @@ -92,4 +97,5 @@ class AttributeType(Enum): bool: AttributeType.BOOL, bytes: AttributeType.BINARY, datetime.datetime: AttributeType.TIMESTAMP, + largebinary: AttributeType.LARGE_BINARY, } diff --git a/amber/src/main/python/core/models/schema/attribute_type_utils.py b/amber/src/main/python/core/models/schema/attribute_type_utils.py new file mode 100644 index 00000000000..3918fdfc342 --- /dev/null +++ b/amber/src/main/python/core/models/schema/attribute_type_utils.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Utilities for converting between AttributeTypes and Arrow field types, +handling LARGE_BINARY metadata preservation. +""" + +import pyarrow as pa + +from core.models.schema.attribute_type import ( + AttributeType, + FROM_ARROW_MAPPING, + TO_ARROW_MAPPING, +) + +# Metadata key used to mark LARGE_BINARY fields in Arrow schemas +TEXERA_TYPE_METADATA_KEY = b"texera_type" +LARGE_BINARY_METADATA_VALUE = b"LARGE_BINARY" + + +def detect_attribute_type_from_arrow_field(field: pa.Field) -> AttributeType: + """ + Detects the AttributeType from an Arrow field, checking metadata for LARGE_BINARY. + + :param field: PyArrow field that may contain metadata + :return: The detected AttributeType + """ + # Check metadata for LARGE_BINARY type + # (can be stored by either Scala ArrowUtils or Python) + is_large_binary = ( + field.metadata + and field.metadata.get(TEXERA_TYPE_METADATA_KEY) == LARGE_BINARY_METADATA_VALUE + ) + + if is_large_binary: + return AttributeType.LARGE_BINARY + else: + return FROM_ARROW_MAPPING[field.type.id] + + +def create_arrow_field_with_metadata( + attr_name: str, attr_type: AttributeType +) -> pa.Field: + """ + Creates a PyArrow field with appropriate metadata for the given AttributeType. + + :param attr_name: Name of the attribute + :param attr_type: The AttributeType + :return: PyArrow field with metadata if needed + """ + metadata = ( + {TEXERA_TYPE_METADATA_KEY: LARGE_BINARY_METADATA_VALUE} + if attr_type == AttributeType.LARGE_BINARY + else None + ) + + return pa.field(attr_name, TO_ARROW_MAPPING[attr_type], metadata=metadata) diff --git a/amber/src/main/python/core/models/schema/schema.py b/amber/src/main/python/core/models/schema/schema.py index 132ca23884c..d349807ab79 100644 --- a/amber/src/main/python/core/models/schema/schema.py +++ b/amber/src/main/python/core/models/schema/schema.py @@ -22,8 +22,10 @@ from core.models.schema.attribute_type import ( AttributeType, RAW_TYPE_MAPPING, - FROM_ARROW_MAPPING, - TO_ARROW_MAPPING, +) +from core.models.schema.arrow_schema_utils import ( + arrow_schema_to_attr_types, + attr_types_to_arrow_schema, ) @@ -85,22 +87,17 @@ def _from_arrow_schema(self, arrow_schema: pa.Schema) -> None: :return: """ self._name_type_mapping = OrderedDict() + attr_types = arrow_schema_to_attr_types(arrow_schema) + # Preserve field order from arrow_schema for attr_name in arrow_schema.names: - arrow_type = arrow_schema.field(attr_name).type # type: ignore - attr_type = FROM_ARROW_MAPPING[arrow_type.id] - self.add(attr_name, attr_type) + self.add(attr_name, attr_types[attr_name]) def as_arrow_schema(self) -> pa.Schema: """ Creates a new pyarrow.Schema according to the current Schema. 
:return: pyarrow.Schema """ - return pa.schema( - [ - pa.field(attr_name, TO_ARROW_MAPPING[attr_type]) - for attr_name, attr_type in self._name_type_mapping.items() - ] - ) + return attr_types_to_arrow_schema(self._name_type_mapping) def get_attr_names(self) -> List[str]: """ diff --git a/amber/src/main/python/core/models/schema/test_schema.py b/amber/src/main/python/core/models/schema/test_schema.py index 2da4e70655f..60e4c848a54 100644 --- a/amber/src/main/python/core/models/schema/test_schema.py +++ b/amber/src/main/python/core/models/schema/test_schema.py @@ -89,3 +89,68 @@ def test_convert_from_raw_schema(self, raw_schema, schema): def test_convert_from_arrow_schema(self, arrow_schema, schema): assert schema == Schema(arrow_schema=arrow_schema) assert schema.as_arrow_schema() == arrow_schema + + def test_large_binary_in_raw_schema(self): + """Test creating schema with LARGE_BINARY from raw schema.""" + raw_schema = { + "regular_field": "STRING", + "large_binary_field": "LARGE_BINARY", + } + schema = Schema(raw_schema=raw_schema) + assert schema.get_attr_type("regular_field") == AttributeType.STRING + assert schema.get_attr_type("large_binary_field") == AttributeType.LARGE_BINARY + + def test_large_binary_in_arrow_schema_with_metadata(self): + """Test creating schema with LARGE_BINARY from Arrow schema with metadata.""" + arrow_schema = pa.schema( + [ + pa.field("regular_field", pa.string()), + pa.field( + "large_binary_field", + pa.string(), + metadata={b"texera_type": b"LARGE_BINARY"}, + ), + ] + ) + schema = Schema(arrow_schema=arrow_schema) + assert schema.get_attr_type("regular_field") == AttributeType.STRING + assert schema.get_attr_type("large_binary_field") == AttributeType.LARGE_BINARY + + def test_large_binary_as_arrow_schema_includes_metadata(self): + """Test that LARGE_BINARY fields include metadata in Arrow schema.""" + schema = Schema() + schema.add("regular_field", AttributeType.STRING) + schema.add("large_binary_field", AttributeType.LARGE_BINARY) + + arrow_schema = schema.as_arrow_schema() + + # Regular field should have no metadata + regular_field = arrow_schema.field("regular_field") + assert ( + regular_field.metadata is None + or b"texera_type" not in regular_field.metadata + ) + + # LARGE_BINARY field should have metadata + large_binary_field = arrow_schema.field("large_binary_field") + assert large_binary_field.metadata is not None + assert large_binary_field.metadata.get(b"texera_type") == b"LARGE_BINARY" + assert ( + large_binary_field.type == pa.string() + ) # LARGE_BINARY is stored as string + + def test_round_trip_large_binary_schema(self): + """Test round-trip conversion of schema with LARGE_BINARY.""" + original_schema = Schema() + original_schema.add("field1", AttributeType.STRING) + original_schema.add("field2", AttributeType.LARGE_BINARY) + original_schema.add("field3", AttributeType.INT) + + # Convert to Arrow and back + arrow_schema = original_schema.as_arrow_schema() + round_trip_schema = Schema(arrow_schema=arrow_schema) + + assert round_trip_schema == original_schema + assert round_trip_schema.get_attr_type("field1") == AttributeType.STRING + assert round_trip_schema.get_attr_type("field2") == AttributeType.LARGE_BINARY + assert round_trip_schema.get_attr_type("field3") == AttributeType.INT diff --git a/amber/src/main/python/core/models/test_tuple.py b/amber/src/main/python/core/models/test_tuple.py index bfce7bb94fe..efb4fdf5c71 100644 --- a/amber/src/main/python/core/models/test_tuple.py +++ b/amber/src/main/python/core/models/test_tuple.py @@ 
-221,3 +221,101 @@ def test_hash(self): schema, ) assert hash(tuple5) == -2099556631 # calculated with Java + + def test_tuple_with_large_binary(self): + """Test tuple with largebinary field.""" + from core.models.type.large_binary import largebinary + + schema = Schema( + raw_schema={ + "regular_field": "STRING", + "large_binary_field": "LARGE_BINARY", + } + ) + + large_binary = largebinary("s3://test-bucket/path/to/object") + tuple_ = Tuple( + { + "regular_field": "test string", + "large_binary_field": large_binary, + }, + schema=schema, + ) + + assert tuple_["regular_field"] == "test string" + assert tuple_["large_binary_field"] == large_binary + assert isinstance(tuple_["large_binary_field"], largebinary) + assert tuple_["large_binary_field"].uri == "s3://test-bucket/path/to/object" + + def test_tuple_from_arrow_with_large_binary(self): + """Test creating tuple from Arrow table with LARGE_BINARY metadata.""" + import pyarrow as pa + from core.models.type.large_binary import largebinary + + # Create Arrow schema with LARGE_BINARY metadata + arrow_schema = pa.schema( + [ + pa.field("regular_field", pa.string()), + pa.field( + "large_binary_field", + pa.string(), + metadata={b"texera_type": b"LARGE_BINARY"}, + ), + ] + ) + + # Create Arrow table with URI string for large_binary_field + arrow_table = pa.Table.from_pydict( + { + "regular_field": ["test"], + "large_binary_field": ["s3://test-bucket/path/to/object"], + }, + schema=arrow_schema, + ) + + # Create tuple from Arrow table + tuple_provider = ArrowTableTupleProvider(arrow_table) + tuples = [ + Tuple({name: field_accessor for name in arrow_table.column_names}) + for field_accessor in tuple_provider + ] + + assert len(tuples) == 1 + tuple_ = tuples[0] + assert tuple_["regular_field"] == "test" + assert isinstance(tuple_["large_binary_field"], largebinary) + assert tuple_["large_binary_field"].uri == "s3://test-bucket/path/to/object" + + def test_tuple_with_null_large_binary(self): + """Test tuple with null largebinary field.""" + import pyarrow as pa + + # Create Arrow schema with LARGE_BINARY metadata + arrow_schema = pa.schema( + [ + pa.field( + "large_binary_field", + pa.string(), + metadata={b"texera_type": b"LARGE_BINARY"}, + ), + ] + ) + + # Create Arrow table with null value + arrow_table = pa.Table.from_pydict( + { + "large_binary_field": [None], + }, + schema=arrow_schema, + ) + + # Create tuple from Arrow table + tuple_provider = ArrowTableTupleProvider(arrow_table) + tuples = [ + Tuple({name: field_accessor for name in arrow_table.column_names}) + for field_accessor in tuple_provider + ] + + assert len(tuples) == 1 + tuple_ = tuples[0] + assert tuple_["large_binary_field"] is None diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py index d6ae12862b8..916301406f5 100644 --- a/amber/src/main/python/core/models/tuple.py +++ b/amber/src/main/python/core/models/tuple.py @@ -29,6 +29,7 @@ from typing import Any, List, Iterator, Callable from typing_extensions import Protocol, runtime_checkable +from core.models.type.large_binary import largebinary from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType from .schema.field import Field from .schema.schema import Schema @@ -86,6 +87,7 @@ def field_accessor(field_name: str) -> Field: """ value = self._table.column(field_name).chunks[chunk_idx][tuple_idx].as_py() field_type = self._table.schema.field(field_name).type + field_metadata = self._table.schema.field(field_name).metadata # for binary types, convert pickled 
objects back. if ( @@ -94,6 +96,16 @@ def field_accessor(field_name: str) -> Field: and value[:6] == b"pickle" ): value = pickle.loads(value[10:]) + + # Convert URI string to largebinary for LARGE_BINARY types + # Metadata is set by Scala ArrowUtils or Python iceberg_utils + elif ( + value is not None + and field_metadata + and field_metadata.get(b"texera_type") == b"LARGE_BINARY" + ): + value = largebinary(value) + return value self._current_idx += 1 @@ -234,6 +246,27 @@ def as_dict(self) -> "OrderedDict[str, Field]": def as_key_value_pairs(self) -> List[typing.Tuple[str, Field]]: return [(k, v) for k, v in self.as_dict().items()] + def get_serialized_field(self, field_name: str) -> Field: + """ + Get a field value serialized for Arrow table conversion. + For LARGE_BINARY fields, converts largebinary instances to URI strings. + For other fields, returns the value as-is. + + :param field_name: field name + :return: field value (URI string for LARGE_BINARY fields with largebinary values) + """ + value = self[field_name] + + # Convert largebinary to URI string for LARGE_BINARY fields when schema available + if ( + self._schema is not None + and self._schema.get_attr_type(field_name) == AttributeType.LARGE_BINARY + and isinstance(value, largebinary) + ): + return value.uri + + return value + def get_field_names(self) -> typing.Tuple[str]: return tuple(map(str, self._field_data.keys())) diff --git a/amber/src/main/python/core/models/type/__init__.py b/amber/src/main/python/core/models/type/__init__.py new file mode 100644 index 00000000000..41344433aab --- /dev/null +++ b/amber/src/main/python/core/models/type/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from .large_binary import largebinary + +__all__ = ["largebinary"] diff --git a/amber/src/main/python/core/models/type/large_binary.py b/amber/src/main/python/core/models/type/large_binary.py new file mode 100644 index 00000000000..581a688912b --- /dev/null +++ b/amber/src/main/python/core/models/type/large_binary.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +largebinary represents a reference to a large object stored externally (e.g., S3). +This is a schema type class used throughout the system for handling +LARGE_BINARY attribute types. +""" + +from typing import Optional +from urllib.parse import urlparse + + +class largebinary: + """ + largebinary represents a reference to a large object stored in S3. + + Each largebinary is identified by an S3 URI (s3://bucket/path/to/object). + largebinary objects are automatically tracked and cleaned up when the workflow + execution completes. + + Usage: + from pytexera import largebinary, LargeBinaryInputStream, LargeBinaryOutputStream + + # Create a new largebinary for writing + large_binary = largebinary() + with LargeBinaryOutputStream(large_binary) as out: + out.write(b"data") + # large_binary is now ready to be added to tuples + + # Read from an existing largebinary + with LargeBinaryInputStream(large_binary) as stream: + content = stream.read() + + # Create from existing URI (e.g., from deserialization) + large_binary = largebinary("s3://bucket/path/to/object") + """ + + def __init__(self, uri: Optional[str] = None): + """ + Create a largebinary. + + Args: + uri: Optional S3 URI in the format s3://bucket/path/to/object. + If None, creates a new largebinary with a unique S3 URI. + + Raises: + ValueError: If URI is provided but doesn't start with "s3://" + """ + if uri is None: + # Lazy import to avoid circular dependencies + from pytexera.storage import large_binary_manager + + uri = large_binary_manager.create() + + if not uri.startswith("s3://"): + raise ValueError(f"largebinary URI must start with 's3://', got: {uri}") + + self._uri = uri + + @property + def uri(self) -> str: + """Get the S3 URI of this largebinary.""" + return self._uri + + def get_bucket_name(self) -> str: + """Get the S3 bucket name from the URI.""" + return urlparse(self._uri).netloc + + def get_object_key(self) -> str: + """Get the S3 object key (path) from the URI, without leading slash.""" + return urlparse(self._uri).path.lstrip("/") + + def __str__(self) -> str: + return self._uri + + def __repr__(self) -> str: + return f"largebinary('{self._uri}')" + + def __eq__(self, other) -> bool: + return isinstance(other, largebinary) and self._uri == other._uri + + def __hash__(self) -> int: + return hash(self._uri) diff --git a/amber/src/main/python/core/models/type/test_large_binary.py b/amber/src/main/python/core/models/type/test_large_binary.py new file mode 100644 index 00000000000..36310e1dd53 --- /dev/null +++ b/amber/src/main/python/core/models/type/test_large_binary.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pytest +from unittest.mock import patch +from core.models.type.large_binary import largebinary + + +class TestLargeBinary: + def test_create_with_uri(self): + """Test creating largebinary with a valid S3 URI.""" + uri = "s3://test-bucket/path/to/object" + large_binary = largebinary(uri) + assert large_binary.uri == uri + assert str(large_binary) == uri + assert repr(large_binary) == f"largebinary('{uri}')" + + def test_create_without_uri(self): + """Test creating largebinary without URI (calls large_binary_manager.create).""" + with patch("pytexera.storage.large_binary_manager.create") as mock_create: + mock_create.return_value = "s3://bucket/objects/123/uuid" + large_binary = largebinary() + assert large_binary.uri == "s3://bucket/objects/123/uuid" + mock_create.assert_called_once() + + def test_invalid_uri_raises_value_error(self): + """Test that invalid URI (not starting with s3://) raises ValueError.""" + with pytest.raises(ValueError, match="largebinary URI must start with 's3://'"): + largebinary("http://invalid-uri") + + with pytest.raises(ValueError, match="largebinary URI must start with 's3://'"): + largebinary("invalid-uri") + + def test_get_bucket_name(self): + """Test extracting bucket name from URI.""" + large_binary = largebinary("s3://my-bucket/path/to/object") + assert large_binary.get_bucket_name() == "my-bucket" + + def test_get_object_key(self): + """Test extracting object key from URI.""" + large_binary = largebinary("s3://my-bucket/path/to/object") + assert large_binary.get_object_key() == "path/to/object" + + def test_get_object_key_with_leading_slash(self): + """Test extracting object key when URI has leading slash.""" + large_binary = largebinary("s3://my-bucket/path/to/object") + # urlparse includes leading slash, but get_object_key removes it + assert large_binary.get_object_key() == "path/to/object" + + def test_equality(self): + """Test largebinary equality comparison.""" + uri = "s3://bucket/path" + obj1 = largebinary(uri) + obj2 = largebinary(uri) + obj3 = largebinary("s3://bucket/different") + + assert obj1 == obj2 + assert obj1 != obj3 + assert obj1 != "not a largebinary" + + def test_hash(self): + """Test largebinary hashing.""" + uri = "s3://bucket/path" + obj1 = largebinary(uri) + obj2 = largebinary(uri) + + assert hash(obj1) == hash(obj2) + assert hash(obj1) == hash(uri) + + def test_uri_property(self): + """Test URI property access.""" + uri = "s3://test-bucket/test/path" + large_binary = largebinary(uri) + assert large_binary.uri == uri diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index ba15069817e..9b686ab66b6 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -25,6 +25,7 @@ from core.storage.iceberg.iceberg_document import IcebergDocument from core.storage.iceberg.iceberg_utils import ( create_table, + amber_schema_to_iceberg_schema, amber_tuples_to_arrow_table, arrow_table_to_amber_tuples, load_table_metadata, @@ -63,7 +64,9 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument: if resource_type in {VFSResourceType.RESULT}: storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - iceberg_schema = Schema.as_arrow_schema(schema) + # Convert Amber Schema to Iceberg Schema with LARGE_BINARY + # field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) create_table( IcebergCatalogInstance.get_instance(), diff --git 
a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 12c841ed71f..9e17b2e0e82 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -25,9 +25,103 @@ from pyiceberg.schema import Schema from pyiceberg.table import Table from typing import Optional, Iterable +from pyiceberg import types as iceberg_types import core +import core.models from core.models import ArrowTableTupleProvider, Tuple +from core.models.schema.attribute_type import AttributeType, TO_ARROW_MAPPING + +# Suffix used to encode LARGE_BINARY fields in Iceberg (must match Scala IcebergUtil) +LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr" + +# Type mappings +_ICEBERG_TO_AMBER_TYPE_MAPPING = { + "string": "STRING", + "int": "INT", + "integer": "INT", + "long": "LONG", + "double": "DOUBLE", + "float": "DOUBLE", + "boolean": "BOOL", + "timestamp": "TIMESTAMP", + "binary": "BINARY", +} + +_AMBER_TO_ICEBERG_TYPE_MAPPING = { + AttributeType.STRING: iceberg_types.StringType(), + AttributeType.INT: iceberg_types.IntegerType(), + AttributeType.LONG: iceberg_types.LongType(), + AttributeType.DOUBLE: iceberg_types.DoubleType(), + AttributeType.BOOL: iceberg_types.BooleanType(), + AttributeType.TIMESTAMP: iceberg_types.TimestampType(), + AttributeType.BINARY: iceberg_types.BinaryType(), + AttributeType.LARGE_BINARY: iceberg_types.StringType(), +} + + +def encode_large_binary_field_name(field_name: str, attr_type) -> str: + """Encodes LARGE_BINARY field names with suffix for Iceberg storage.""" + if attr_type == AttributeType.LARGE_BINARY: + return f"{field_name}{LARGE_BINARY_FIELD_SUFFIX}" + return field_name + + +def decode_large_binary_field_name(field_name: str) -> str: + """Decodes field names by removing LARGE_BINARY suffix if present.""" + if field_name.endswith(LARGE_BINARY_FIELD_SUFFIX): + return field_name[: -len(LARGE_BINARY_FIELD_SUFFIX)] + return field_name + + +def iceberg_schema_to_amber_schema(iceberg_schema: Schema): + """ + Converts PyIceberg Schema to Amber Schema. + Decodes LARGE_BINARY field names and adds Arrow metadata. + """ + arrow_fields = [] + for field in iceberg_schema.fields: + decoded_name = decode_large_binary_field_name(field.name) + is_large_binary = field.name != decoded_name + + if is_large_binary: + attr_type = AttributeType.LARGE_BINARY + else: + iceberg_type_str = str(field.field_type).lower() + attr_type_name = _ICEBERG_TO_AMBER_TYPE_MAPPING.get( + iceberg_type_str, "STRING" + ) + attr_type = getattr(AttributeType, attr_type_name) + + arrow_fields.append( + pa.field( + decoded_name, + TO_ARROW_MAPPING[attr_type], + metadata={b"texera_type": b"LARGE_BINARY"} if is_large_binary else None, + ) + ) + + return core.models.Schema(pa.schema(arrow_fields)) + + +def amber_schema_to_iceberg_schema(amber_schema) -> Schema: + """ + Converts Amber Schema to PyIceberg Schema. + Encodes LARGE_BINARY field names with suffix. + """ + fields = [ + iceberg_types.NestedField( + field_id=idx, + name=encode_large_binary_field_name(field_name, attr_type), + field_type=_AMBER_TO_ICEBERG_TYPE_MAPPING[attr_type], + required=False, + ) + for idx, (field_name, attr_type) in enumerate( + amber_schema._name_type_mapping.items(), start=1 + ) + ] + + return Schema(*fields) def create_postgres_catalog( @@ -135,27 +229,47 @@ def amber_tuples_to_arrow_table( ) -> pa.Table: """ Converts a list of amber tuples to a pyarrow table for serialization. 
+ Handles LARGE_BINARY field name encoding and serialization. """ - return pa.Table.from_pydict( - { - name: [t[name] for t in tuple_list] - for name in iceberg_schema.as_arrow().names - }, - schema=iceberg_schema.as_arrow(), - ) + from core.models.type.large_binary import largebinary + + tuple_list = list(tuple_list) # Convert to list to allow multiple iterations + data_dict = {} + for encoded_name in iceberg_schema.as_arrow().names: + decoded_name = decode_large_binary_field_name(encoded_name) + data_dict[encoded_name] = [ + ( + t[decoded_name].uri + if isinstance(t[decoded_name], largebinary) + else t[decoded_name] + ) + for t in tuple_list + ] + + return pa.Table.from_pydict(data_dict, schema=iceberg_schema.as_arrow()) def arrow_table_to_amber_tuples( iceberg_schema: Schema, arrow_table: pa.Table ) -> Iterable[Tuple]: """ - Converts an arrow table to a list of amber tuples for deserialization. + Converts an arrow table read from Iceberg to Amber tuples. + Properly handles LARGE_BINARY field name decoding and type detection. """ - tuple_provider = ArrowTableTupleProvider(arrow_table) + amber_schema = iceberg_schema_to_amber_schema(iceberg_schema) + arrow_table_with_metadata = pa.Table.from_arrays( + [arrow_table.column(name) for name in arrow_table.column_names], + schema=amber_schema.as_arrow_schema(), + ) + + tuple_provider = ArrowTableTupleProvider(arrow_table_with_metadata) return ( Tuple( - {name: field_accessor for name in arrow_table.column_names}, - schema=core.models.Schema(iceberg_schema.as_arrow()), + { + decode_large_binary_field_name(name): field_accessor + for name in arrow_table.column_names + }, + schema=amber_schema, ) for field_accessor in tuple_provider ) diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index ad3f49067e6..34711beb652 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -43,6 +43,10 @@ table_result_namespace="operator-port-result", directory_path="../../../../../../amber/user-resources/workflow-results", commit_batch_size=4096, + s3_endpoint="http://localhost:9000", + s3_region="us-east-1", + s3_auth_username="minioadmin", + s3_auth_password="minioadmin", ) diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py new file mode 100644 index 00000000000..c601d4f7190 --- /dev/null +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py @@ -0,0 +1,230 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +from pyiceberg import types as iceberg_types +from pyiceberg.schema import Schema as IcebergSchema +from core.models import Schema, Tuple +from core.models.schema.attribute_type import AttributeType +from core.models.type.large_binary import largebinary +from core.storage.iceberg.iceberg_utils import ( + encode_large_binary_field_name, + decode_large_binary_field_name, + iceberg_schema_to_amber_schema, + amber_schema_to_iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, +) + + +class TestIcebergUtilsLargeBinary: + def test_encode_large_binary_field_name(self): + """Test encoding LARGE_BINARY field names with suffix.""" + assert ( + encode_large_binary_field_name("my_field", AttributeType.LARGE_BINARY) + == "my_field__texera_large_binary_ptr" + ) + assert ( + encode_large_binary_field_name("my_field", AttributeType.STRING) + == "my_field" + ) + + def test_decode_large_binary_field_name(self): + """Test decoding LARGE_BINARY field names by removing suffix.""" + assert ( + decode_large_binary_field_name("my_field__texera_large_binary_ptr") + == "my_field" + ) + assert decode_large_binary_field_name("my_field") == "my_field" + assert decode_large_binary_field_name("regular_field") == "regular_field" + + def test_amber_schema_to_iceberg_schema_with_large_binary(self): + """Test converting Amber schema with LARGE_BINARY to Iceberg schema.""" + amber_schema = Schema() + amber_schema.add("regular_field", AttributeType.STRING) + amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY) + amber_schema.add("int_field", AttributeType.INT) + + iceberg_schema = amber_schema_to_iceberg_schema(amber_schema) + + # Check field names are encoded + field_names = [field.name for field in iceberg_schema.fields] + assert "regular_field" in field_names + assert "large_binary_field__texera_large_binary_ptr" in field_names + assert "int_field" in field_names + + # Check types + large_binary_field = next( + f for f in iceberg_schema.fields if "large_binary" in f.name + ) + assert isinstance(large_binary_field.field_type, iceberg_types.StringType) + + def test_iceberg_schema_to_amber_schema_with_large_binary(self): + """Test converting Iceberg schema with LARGE_BINARY to Amber schema.""" + iceberg_schema = IcebergSchema( + iceberg_types.NestedField( + 1, "regular_field", iceberg_types.StringType(), required=False + ), + iceberg_types.NestedField( + 2, + "large_binary_field__texera_large_binary_ptr", + iceberg_types.StringType(), + required=False, + ), + iceberg_types.NestedField( + 3, "int_field", iceberg_types.IntegerType(), required=False + ), + ) + + amber_schema = iceberg_schema_to_amber_schema(iceberg_schema) + + assert amber_schema.get_attr_type("regular_field") == AttributeType.STRING + assert ( + amber_schema.get_attr_type("large_binary_field") + == AttributeType.LARGE_BINARY + ) + assert amber_schema.get_attr_type("int_field") == AttributeType.INT + + # Check Arrow schema has metadata for LARGE_BINARY + arrow_schema = amber_schema.as_arrow_schema() + large_binary_field = arrow_schema.field("large_binary_field") + assert large_binary_field.metadata is not None + assert large_binary_field.metadata.get(b"texera_type") == b"LARGE_BINARY" + + def test_amber_tuples_to_arrow_table_with_large_binary(self): + """Test converting Amber tuples with largebinary to Arrow table.""" + amber_schema = Schema() + amber_schema.add("regular_field", AttributeType.STRING) + amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY) + + large_binary1 = 
largebinary("s3://bucket/path1") + large_binary2 = largebinary("s3://bucket/path2") + + tuples = [ + Tuple( + {"regular_field": "value1", "large_binary_field": large_binary1}, + schema=amber_schema, + ), + Tuple( + {"regular_field": "value2", "large_binary_field": large_binary2}, + schema=amber_schema, + ), + ] + + iceberg_schema = amber_schema_to_iceberg_schema(amber_schema) + arrow_table = amber_tuples_to_arrow_table(iceberg_schema, tuples) + + # Check that largebinary values are converted to URI strings + regular_values = arrow_table.column("regular_field").to_pylist() + large_binary_values = arrow_table.column( + "large_binary_field__texera_large_binary_ptr" + ).to_pylist() + + assert regular_values == ["value1", "value2"] + assert large_binary_values == ["s3://bucket/path1", "s3://bucket/path2"] + + def test_arrow_table_to_amber_tuples_with_large_binary(self): + """Test converting Arrow table with LARGE_BINARY to Amber tuples.""" + # Create Iceberg schema with encoded field name + iceberg_schema = IcebergSchema( + iceberg_types.NestedField( + 1, "regular_field", iceberg_types.StringType(), required=False + ), + iceberg_types.NestedField( + 2, + "large_binary_field__texera_large_binary_ptr", + iceberg_types.StringType(), + required=False, + ), + ) + + # Create Arrow table with URI strings + arrow_table = pa.Table.from_pydict( + { + "regular_field": ["value1", "value2"], + "large_binary_field__texera_large_binary_ptr": [ + "s3://bucket/path1", + "s3://bucket/path2", + ], + } + ) + + tuples = list(arrow_table_to_amber_tuples(iceberg_schema, arrow_table)) + + assert len(tuples) == 2 + assert tuples[0]["regular_field"] == "value1" + assert isinstance(tuples[0]["large_binary_field"], largebinary) + assert tuples[0]["large_binary_field"].uri == "s3://bucket/path1" + + assert tuples[1]["regular_field"] == "value2" + assert isinstance(tuples[1]["large_binary_field"], largebinary) + assert tuples[1]["large_binary_field"].uri == "s3://bucket/path2" + + def test_round_trip_large_binary_tuples(self): + """Test round-trip conversion of tuples with largebinary.""" + amber_schema = Schema() + amber_schema.add("regular_field", AttributeType.STRING) + amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY) + + large_binary = largebinary("s3://bucket/path/to/object") + original_tuples = [ + Tuple( + {"regular_field": "value1", "large_binary_field": large_binary}, + schema=amber_schema, + ), + ] + + # Convert to Iceberg and Arrow + iceberg_schema = amber_schema_to_iceberg_schema(amber_schema) + arrow_table = amber_tuples_to_arrow_table(iceberg_schema, original_tuples) + + # Convert back to Amber tuples + retrieved_tuples = list( + arrow_table_to_amber_tuples(iceberg_schema, arrow_table) + ) + + assert len(retrieved_tuples) == 1 + assert retrieved_tuples[0]["regular_field"] == "value1" + assert isinstance(retrieved_tuples[0]["large_binary_field"], largebinary) + assert retrieved_tuples[0]["large_binary_field"].uri == large_binary.uri + + def test_arrow_table_to_amber_tuples_with_null_large_binary(self): + """Test converting Arrow table with null largebinary values.""" + iceberg_schema = IcebergSchema( + iceberg_types.NestedField( + 1, "regular_field", iceberg_types.StringType(), required=False + ), + iceberg_types.NestedField( + 2, + "large_binary_field__texera_large_binary_ptr", + iceberg_types.StringType(), + required=False, + ), + ) + + arrow_table = pa.Table.from_pydict( + { + "regular_field": ["value1"], + "large_binary_field__texera_large_binary_ptr": [None], + } + ) + + tuples = 
list(arrow_table_to_amber_tuples(iceberg_schema, arrow_table)) + + assert len(tuples) == 1 + assert tuples[0]["regular_field"] == "value1" + assert tuples[0]["large_binary_field"] is None diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index 9a03aa713f8..c55495ea14c 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -32,6 +32,12 @@ class StorageConfig: ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None ICEBERG_TABLE_COMMIT_BATCH_SIZE = None + # S3 configs (for large_binary_manager module) + S3_ENDPOINT = None + S3_REGION = None + S3_AUTH_USERNAME = None + S3_AUTH_PASSWORD = None + @classmethod def initialize( cls, @@ -41,10 +47,14 @@ def initialize( table_result_namespace, directory_path, commit_batch_size, + s3_endpoint, + s3_region, + s3_auth_username, + s3_auth_password, ): if cls._initialized: raise RuntimeError( - "Storage config has already been initializedand cannot be modified." + "Storage config has already been initialized and cannot be modified." ) cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme @@ -53,6 +63,13 @@ def initialize( cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size) + + # S3 configs + cls.S3_ENDPOINT = s3_endpoint + cls.S3_REGION = s3_region + cls.S3_AUTH_USERNAME = s3_auth_username + cls.S3_AUTH_PASSWORD = s3_auth_password + cls._initialized = True def __new__(cls, *args, **kwargs): diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index db78319f490..e40d1a43fe0 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -21,12 +21,15 @@ from pyamber import * from .storage.dataset_file_document import DatasetFileDocument +from .storage.large_binary_input_stream import LargeBinaryInputStream +from .storage.large_binary_output_stream import LargeBinaryOutputStream from .udf.udf_operator import ( UDFOperatorV2, UDFTableOperator, UDFBatchOperator, UDFSourceOperator, ) +from core.models.type.large_binary import largebinary __all__ = [ "State", @@ -41,6 +44,9 @@ "UDFBatchOperator", "UDFSourceOperator", "DatasetFileDocument", + "largebinary", + "LargeBinaryInputStream", + "LargeBinaryOutputStream", # export external tools to be used "overrides", "logger", diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py new file mode 100644 index 00000000000..8e7d8640403 --- /dev/null +++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +LargeBinaryInputStream for reading largebinary data from S3. + +Usage: + with LargeBinaryInputStream(large_binary) as stream: + content = stream.read() +""" + +from typing import BinaryIO, Optional +from functools import wraps +from io import IOBase +from core.models.type.large_binary import largebinary + + +def _require_open(func): + """Decorator to ensure stream is open before reading operations.""" + + @wraps(func) + def wrapper(self, *args, **kwargs): + if self._closed: + raise ValueError("I/O operation on closed stream") + if self._underlying is None: + self._lazy_init() + return func(self, *args, **kwargs) + + return wrapper + + +class LargeBinaryInputStream(IOBase): + """ + InputStream for reading largebinary data from S3. + + Lazily downloads from S3 on first read. Supports context manager and iteration. + """ + + def __init__(self, large_binary: largebinary): + """Initialize stream for reading the given largebinary.""" + super().__init__() + if large_binary is None: + raise ValueError("largebinary cannot be None") + self._large_binary = large_binary + self._underlying: Optional[BinaryIO] = None + self._closed = False + + def _lazy_init(self): + """Download from S3 on first read operation.""" + from pytexera.storage import large_binary_manager + + s3 = large_binary_manager._get_s3_client() + response = s3.get_object( + Bucket=self._large_binary.get_bucket_name(), + Key=self._large_binary.get_object_key(), + ) + self._underlying = response["Body"] + + @_require_open + def read(self, n: int = -1) -> bytes: + """Read and return up to n bytes (-1 reads all).""" + return self._underlying.read(n) + + @_require_open + def readline(self, size: int = -1) -> bytes: + """Read and return one line from the stream.""" + return self._underlying.readline(size) + + @_require_open + def readlines(self, hint: int = -1) -> list[bytes]: + """Read and return a list of lines from the stream.""" + return self._underlying.readlines(hint) + + def readable(self) -> bool: + """Return True if the stream can be read from.""" + return not self._closed + + def seekable(self) -> bool: + """Return False - this stream does not support seeking.""" + return False + + @property + def closed(self) -> bool: + """Return True if the stream is closed.""" + return self._closed + + def close(self) -> None: + """Close the stream and release resources.""" + if not self._closed: + self._closed = True + if self._underlying is not None: + self._underlying.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def __iter__(self): + return self + + def __next__(self) -> bytes: + line = self.readline() + if not line: + raise StopIteration + return line diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py new file mode 100644 index 00000000000..e061eac6228 --- /dev/null +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Internal largebinary manager for S3 operations. + +Users should not interact with this module directly. Use largebinary() constructor +and LargeBinaryInputStream/LargeBinaryOutputStream instead. +""" + +import time +import uuid +from loguru import logger +from core.storage.storage_config import StorageConfig + +# Module-level state +_s3_client = None +DEFAULT_BUCKET = "texera-large-binaries" + + +def _get_s3_client(): + """Get or initialize S3 client (lazy initialization, cached).""" + global _s3_client + if _s3_client is None: + try: + import boto3 + from botocore.config import Config + except ImportError as e: + raise RuntimeError("boto3 required. Install with: pip install boto3") from e + + _s3_client = boto3.client( + "s3", + endpoint_url=StorageConfig.S3_ENDPOINT, + aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, + aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, + region_name=StorageConfig.S3_REGION, + config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), + ) + return _s3_client + + +def _ensure_bucket_exists(bucket: str): + """Ensure S3 bucket exists, creating it if necessary.""" + s3 = _get_s3_client() + try: + s3.head_bucket(Bucket=bucket) + except s3.exceptions.NoSuchBucket: + logger.debug(f"Bucket {bucket} not found, creating it") + s3.create_bucket(Bucket=bucket) + logger.info(f"Created bucket: {bucket}") + + +def create() -> str: + """ + Creates a new largebinary reference with a unique S3 URI. + + Returns: + S3 URI string (format: s3://bucket/key) + """ + _ensure_bucket_exists(DEFAULT_BUCKET) + timestamp_ms = int(time.time() * 1000) + unique_id = uuid.uuid4() + object_key = f"objects/{timestamp_ms}/{unique_id}" + return f"s3://{DEFAULT_BUCKET}/{object_key}" diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py new file mode 100644 index 00000000000..af4f1a275c2 --- /dev/null +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -0,0 +1,244 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +LargeBinaryOutputStream for streaming largebinary data to S3. 
+ +Usage: + from pytexera import largebinary, LargeBinaryOutputStream + + large_binary = largebinary() + with LargeBinaryOutputStream(large_binary) as out: + out.write(b"data") +""" + +from typing import Optional, Union +from io import IOBase +from core.models.type.large_binary import largebinary +from pytexera.storage import large_binary_manager +import threading +import queue + +# Constants +_CHUNK_SIZE = 64 * 1024 # 64KB +_QUEUE_TIMEOUT = 0.1 + + +class _QueueReader: + """File-like object that reads from a queue.""" + + def __init__(self, q: queue.Queue): + self._queue = q + self._buffer = b"" + self._eof = False + + def read(self, size=-1): + """Read bytes from the queue.""" + if self._eof and not self._buffer: + return b"" + + # Collect chunks until we have enough data or reach EOF + chunks = [self._buffer] if self._buffer else [] + total_size = len(self._buffer) + self._buffer = b"" + needed = size if size != -1 else None + + while not self._eof and (needed is None or total_size < needed): + try: + chunk = self._queue.get(timeout=_QUEUE_TIMEOUT) + if chunk is None: # EOF marker + self._eof = True + break + chunks.append(chunk) + total_size += len(chunk) + except queue.Empty: + continue + + result = b"".join(chunks) + + # If size was specified, split and buffer remainder + if needed is not None and len(result) > needed: + self._buffer = result[needed:] + result = result[:needed] + + return result + + +class LargeBinaryOutputStream(IOBase): + """ + OutputStream for streaming largebinary data to S3. + + Data is uploaded in the background using multipart upload as you write. + Call close() to complete the upload and ensure all data is persisted. + + This class follows Python's standard I/O interface (io.IOBase). + + Usage: + from pytexera import largebinary, LargeBinaryOutputStream + + # Create a new largebinary and write to it + large_binary = largebinary() + with LargeBinaryOutputStream(large_binary) as out: + out.write(b"Hello, World!") + out.write(b"More data") + # large_binary is now ready to be added to tuples + + Note: Not thread-safe. Do not access from multiple threads concurrently. + """ + + def __init__(self, large_binary: largebinary): + """ + Initialize a LargeBinaryOutputStream. + + Args: + large_binary: The largebinary reference to write to + + Raises: + ValueError: If large_binary is None + """ + super().__init__() + if large_binary is None: + raise ValueError("largebinary cannot be None") + + self._large_binary = large_binary + self._bucket_name = large_binary.get_bucket_name() + self._object_key = large_binary.get_object_key() + self._closed = False + + # Background upload thread state + self._queue: queue.Queue = queue.Queue(maxsize=_CHUNK_SIZE) + self._upload_exception: Optional[Exception] = None + self._upload_complete = threading.Event() + self._upload_thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + + def write(self, b: Union[bytes, bytearray]) -> int: + """ + Write bytes to the stream. 
+ + Args: + b: Bytes to write + + Returns: + Number of bytes written + + Raises: + ValueError: If stream is closed + IOError: If previous upload failed + """ + if self._closed: + raise ValueError("I/O operation on closed stream") + + # Check if upload has failed + with self._lock: + if self._upload_exception is not None: + raise IOError( + f"Background upload failed: {self._upload_exception}" + ) from self._upload_exception + + # Start upload thread on first write + if self._upload_thread is None: + + def upload_worker(): + try: + large_binary_manager._ensure_bucket_exists(self._bucket_name) + s3 = large_binary_manager._get_s3_client() + reader = _QueueReader(self._queue) + s3.upload_fileobj(reader, self._bucket_name, self._object_key) + except Exception as e: + with self._lock: + self._upload_exception = e + finally: + self._upload_complete.set() + + self._upload_thread = threading.Thread(target=upload_worker, daemon=True) + self._upload_thread.start() + + # Write data in chunks + data = bytes(b) + for offset in range(0, len(data), _CHUNK_SIZE): + self._queue.put(data[offset : offset + _CHUNK_SIZE], block=True) + + return len(data) + + def writable(self) -> bool: + """Return True if the stream can be written to.""" + return not self._closed + + def seekable(self) -> bool: + """Return False - this stream does not support seeking.""" + return False + + @property + def closed(self) -> bool: + """Return True if the stream is closed.""" + return self._closed + + def flush(self) -> None: + """ + Flush the write buffer. + + Note: This doesn't guarantee data is uploaded to S3 yet. + Call close() to ensure upload completion. + """ + # No-op: data is already being consumed by the upload thread + pass + + def close(self) -> None: + """ + Close the stream and complete the S3 upload. + Blocks until upload is complete. Raises IOError if upload failed. + + Raises: + IOError: If upload failed + """ + if self._closed: + return + + self._closed = True + + # Signal EOF to upload thread and wait for completion + if self._upload_thread is not None: + self._queue.put(None, block=True) # EOF marker + self._upload_thread.join() + self._upload_complete.wait() + + # Check for errors and cleanup if needed + with self._lock: + exception = self._upload_exception + + if exception is not None: + self._cleanup_failed_upload() + raise IOError(f"Failed to complete upload: {exception}") from exception + + def _cleanup_failed_upload(self): + """Clean up a failed upload by deleting the S3 object.""" + try: + s3 = large_binary_manager._get_s3_client() + s3.delete_object(Bucket=self._bucket_name, Key=self._object_key) + except Exception: + # Ignore cleanup errors - we're already handling an upload failure + pass + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - automatically cleanup.""" + self.close() + return False diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py new file mode 100644 index 00000000000..85bdbd13fa1 --- /dev/null +++ b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py @@ -0,0 +1,222 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +from unittest.mock import patch, MagicMock +from io import BytesIO +from core.models.type.large_binary import largebinary +from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream +from pytexera.storage import large_binary_manager + + +class TestLargeBinaryInputStream: + @pytest.fixture + def large_binary(self): + """Create a test largebinary.""" + return largebinary("s3://test-bucket/path/to/object") + + @pytest.fixture + def mock_s3_response(self): + """Create a mock S3 response with a BytesIO body.""" + return {"Body": BytesIO(b"test data content")} + + def test_init_with_valid_large_binary(self, large_binary): + """Test initialization with a valid largebinary.""" + stream = LargeBinaryInputStream(large_binary) + try: + assert stream._large_binary == large_binary + assert stream._underlying is None + assert not stream._closed + finally: + stream.close() + + def test_init_with_none_raises_error(self): + """Test that initializing with None raises ValueError.""" + with pytest.raises(ValueError, match="largebinary cannot be None"): + LargeBinaryInputStream(None) + + def test_lazy_init_downloads_from_s3(self, large_binary, mock_s3_response): + """Test that _lazy_init downloads from S3 on first read.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + assert stream._underlying is None # Not initialized yet + + # Trigger lazy init by reading + data = stream.read() + assert data == b"test data content" + assert stream._underlying is not None + + # Verify S3 was called correctly + mock_s3_client.get_object.assert_called_once_with( + Bucket="test-bucket", Key="path/to/object" + ) + finally: + stream.close() + + def test_read_all(self, large_binary, mock_s3_response): + """Test reading all data.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + data = stream.read() + assert data == b"test data content" + finally: + stream.close() + + def test_read_partial(self, large_binary, mock_s3_response): + """Test reading partial data.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + data = stream.read(4) + assert data == b"test" + finally: + stream.close() + + def test_readline(self, large_binary): + """Test reading a line.""" + with patch.object(large_binary_manager, "_get_s3_client") as 
mock_get_s3_client: + response = {"Body": BytesIO(b"line1\nline2\nline3")} + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + line = stream.readline() + assert line == b"line1\n" + finally: + stream.close() + + def test_readlines(self, large_binary): + """Test reading all lines.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + response = {"Body": BytesIO(b"line1\nline2\nline3")} + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + lines = stream.readlines() + assert lines == [b"line1\n", b"line2\n", b"line3"] + finally: + stream.close() + + def test_readable(self, large_binary): + """Test readable() method.""" + stream = LargeBinaryInputStream(large_binary) + try: + assert stream.readable() is True + + stream.close() + assert stream.readable() is False + finally: + if not stream._closed: + stream.close() + + def test_seekable(self, large_binary): + """Test seekable() method (should always return False).""" + stream = LargeBinaryInputStream(large_binary) + try: + assert stream.seekable() is False + finally: + stream.close() + + def test_closed_property(self, large_binary): + """Test closed property.""" + stream = LargeBinaryInputStream(large_binary) + try: + assert stream.closed is False + + stream.close() + assert stream.closed is True + finally: + if not stream._closed: + stream.close() + + def test_close(self, large_binary, mock_s3_response): + """Test closing the stream.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + stream.read(1) # Trigger lazy init + assert stream._underlying is not None + + stream.close() + assert stream._closed is True + assert stream._underlying.closed + + def test_context_manager(self, large_binary, mock_s3_response): + """Test using as context manager.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + mock_get_s3_client.return_value = mock_s3_client + + with LargeBinaryInputStream(large_binary) as stream: + data = stream.read() + assert data == b"test data content" + assert not stream._closed + + # Stream should be closed after context exit + assert stream._closed + + def test_iteration(self, large_binary): + """Test iteration over lines.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + response = {"Body": BytesIO(b"line1\nline2\nline3")} + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = response + mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + try: + lines = list(stream) + assert lines == [b"line1\n", b"line2\n", b"line3"] + finally: + stream.close() + + def test_read_after_close_raises_error(self, large_binary, mock_s3_response): + """Test that reading after close raises ValueError.""" + with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client: + mock_s3_client = MagicMock() + mock_s3_client.get_object.return_value = mock_s3_response + 
mock_get_s3_client.return_value = mock_s3_client + + stream = LargeBinaryInputStream(large_binary) + stream.close() + + with pytest.raises(ValueError, match="I/O operation on closed stream"): + stream.read() + # Stream is already closed, no need to close again diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py new file mode 100644 index 00000000000..a657f244f38 --- /dev/null +++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +from unittest.mock import patch, MagicMock +from pytexera.storage import large_binary_manager +from core.storage.storage_config import StorageConfig + + +class TestLargeBinaryManager: + @pytest.fixture(autouse=True) + def setup_storage_config(self): + """Initialize StorageConfig for tests.""" + if not StorageConfig._initialized: + StorageConfig.initialize( + postgres_uri_without_scheme="localhost:5432/test", + postgres_username="test", + postgres_password="test", + table_result_namespace="test", + directory_path="/tmp/test", + commit_batch_size=1000, + s3_endpoint="http://localhost:9000", + s3_region="us-east-1", + s3_auth_username="minioadmin", + s3_auth_password="minioadmin", + ) + + def test_get_s3_client_initializes_once(self): + """Test that S3 client is initialized and cached.""" + # Reset the client + large_binary_manager._s3_client = None + + with patch("boto3.client") as mock_boto3_client: + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + + # First call should create client + client1 = large_binary_manager._get_s3_client() + assert client1 == mock_client + assert mock_boto3_client.call_count == 1 + + # Second call should return cached client + client2 = large_binary_manager._get_s3_client() + assert client2 == mock_client + assert mock_boto3_client.call_count == 1 # Still 1, not 2 + + def test_get_s3_client_without_boto3_raises_error(self): + """Test that missing boto3 raises RuntimeError.""" + large_binary_manager._s3_client = None + + import sys + + # Temporarily remove boto3 from sys.modules to simulate it not being installed + boto3_backup = sys.modules.pop("boto3", None) + try: + # Mock the import to raise ImportError + original_import = __import__ + + def mock_import(name, *args, **kwargs): + if name == "boto3": + raise ImportError("No module named boto3") + return original_import(name, *args, **kwargs) + + with patch("builtins.__import__", side_effect=mock_import): + with pytest.raises(RuntimeError, match="boto3 required"): + large_binary_manager._get_s3_client() + finally: + # Restore boto3 if it was there + if boto3_backup is not None: + sys.modules["boto3"] = boto3_backup + + def 
test_ensure_bucket_exists_when_bucket_exists(self): + """Test that existing bucket doesn't trigger creation.""" + large_binary_manager._s3_client = None + + with patch("boto3.client") as mock_boto3_client: + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + # head_bucket doesn't raise exception (bucket exists) + mock_client.head_bucket.return_value = None + mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", (Exception,), {}) + + large_binary_manager._ensure_bucket_exists("test-bucket") + mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket") + mock_client.create_bucket.assert_not_called() + + def test_ensure_bucket_exists_creates_bucket_when_missing(self): + """Test that missing bucket triggers creation.""" + large_binary_manager._s3_client = None + + with patch("boto3.client") as mock_boto3_client: + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + # head_bucket raises NoSuchBucket exception + no_such_bucket = type("NoSuchBucket", (Exception,), {}) + mock_client.exceptions.NoSuchBucket = no_such_bucket + mock_client.head_bucket.side_effect = no_such_bucket() + + large_binary_manager._ensure_bucket_exists("test-bucket") + mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket") + mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket") + + def test_create_generates_unique_uri(self): + """Test that create() generates a unique S3 URI.""" + large_binary_manager._s3_client = None + + with patch("boto3.client") as mock_boto3_client: + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + mock_client.head_bucket.return_value = None + mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", (Exception,), {}) + + uri = large_binary_manager.create() + + # Check URI format + assert uri.startswith("s3://") + assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") + assert "objects/" in uri + + # Verify bucket was checked/created + mock_client.head_bucket.assert_called_once_with( + Bucket=large_binary_manager.DEFAULT_BUCKET + ) + + def test_create_uses_default_bucket(self): + """Test that create() uses the default bucket.""" + large_binary_manager._s3_client = None + + with patch("boto3.client") as mock_boto3_client: + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + mock_client.head_bucket.return_value = None + mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", (Exception,), {}) + + uri = large_binary_manager.create() + assert large_binary_manager.DEFAULT_BUCKET in uri diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py new file mode 100644 index 00000000000..7ebcc9b4cfd --- /dev/null +++ b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py @@ -0,0 +1,238 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import time +from unittest.mock import patch, MagicMock +from core.models.type.large_binary import largebinary +from pytexera.storage.large_binary_output_stream import LargeBinaryOutputStream +from pytexera.storage import large_binary_manager + + +class TestLargeBinaryOutputStream: + @pytest.fixture + def large_binary(self): + """Create a test largebinary.""" + return largebinary("s3://test-bucket/path/to/object") + + def test_init_with_valid_large_binary(self, large_binary): + """Test initialization with a valid largebinary.""" + stream = LargeBinaryOutputStream(large_binary) + assert stream._large_binary == large_binary + assert stream._bucket_name == "test-bucket" + assert stream._object_key == "path/to/object" + assert not stream._closed + assert stream._upload_thread is None + + def test_init_with_none_raises_error(self): + """Test that initializing with None raises ValueError.""" + with pytest.raises(ValueError, match="largebinary cannot be None"): + LargeBinaryOutputStream(None) + + def test_write_starts_upload_thread(self, large_binary): + """Test that write() starts the upload thread.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + stream = LargeBinaryOutputStream(large_binary) + assert stream._upload_thread is None + + stream.write(b"test data") + assert stream._upload_thread is not None + # Thread may have already completed, so just check it was created + assert stream._upload_thread is not None + + # Wait for thread to finish + stream.close() + + def test_write_data(self, large_binary): + """Test writing data to the stream.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + stream = LargeBinaryOutputStream(large_binary) + bytes_written = stream.write(b"test data") + assert bytes_written == len(b"test data") + + stream.close() + + def test_write_multiple_chunks(self, large_binary): + """Test writing multiple chunks of data.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + stream = LargeBinaryOutputStream(large_binary) + stream.write(b"chunk1") + stream.write(b"chunk2") + stream.write(b"chunk3") + + stream.close() + + def test_writable(self, large_binary): + """Test writable() method.""" + stream = LargeBinaryOutputStream(large_binary) + assert stream.writable() is True + + stream.close() + assert stream.writable() is False + + def test_seekable(self, large_binary): + """Test seekable() method (should always return 
False).""" + stream = LargeBinaryOutputStream(large_binary) + assert stream.seekable() is False + + def test_closed_property(self, large_binary): + """Test closed property.""" + stream = LargeBinaryOutputStream(large_binary) + assert stream.closed is False + + stream.close() + assert stream.closed is True + + def test_flush(self, large_binary): + """Test flush() method (should be a no-op).""" + stream = LargeBinaryOutputStream(large_binary) + # Should not raise any exception + stream.flush() + + def test_close_completes_upload(self, large_binary): + """Test that close() completes the upload.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + stream = LargeBinaryOutputStream(large_binary) + stream.write(b"test data") + + # Close should wait for upload to complete + stream.close() + + # Verify upload_fileobj was called + assert mock_s3.upload_fileobj.called + + def test_context_manager(self, large_binary): + """Test using as context manager.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + with LargeBinaryOutputStream(large_binary) as stream: + stream.write(b"test data") + assert not stream._closed + + # Stream should be closed after context exit + assert stream._closed + + def test_write_after_close_raises_error(self, large_binary): + """Test that writing after close raises ValueError.""" + stream = LargeBinaryOutputStream(large_binary) + stream.close() + + with pytest.raises(ValueError, match="I/O operation on closed stream"): + stream.write(b"data") + + def test_close_handles_upload_error(self, large_binary): + """Test that close() raises IOError if upload fails.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + mock_s3.upload_fileobj.side_effect = Exception("Upload failed") + + stream = LargeBinaryOutputStream(large_binary) + stream.write(b"test data") + + with pytest.raises(IOError, match="Failed to complete upload"): + stream.close() + + def test_write_after_upload_error_raises_error(self, large_binary): + """Test that writing after upload error raises IOError.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + mock_s3.upload_fileobj.side_effect = Exception("Upload failed") + + stream = LargeBinaryOutputStream(large_binary) + stream.write(b"test data") + + # Wait a bit for the error to be set + time.sleep(0.1) + + with pytest.raises(IOError, match="Background upload failed"): + stream.write(b"more data") + + def test_multiple_close_calls(self, large_binary): + """Test that multiple close() calls are safe.""" + with ( + patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client, + patch.object( + 
large_binary_manager, "_ensure_bucket_exists" + ) as mock_ensure_bucket, + ): + mock_s3 = MagicMock() + mock_get_s3_client.return_value = mock_s3 + mock_ensure_bucket.return_value = None + + stream = LargeBinaryOutputStream(large_binary) + stream.write(b"test data") + stream.close() + # Second close should not raise error + stream.close() diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index c9594cc2187..3ebf81c201f 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ -51,6 +51,10 @@ def init_loguru_logger(stream_log_level) -> None: iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, + s3_endpoint, + s3_region, + s3_auth_username, + s3_auth_password, ) = sys.argv init_loguru_logger(logger_level) StorageConfig.initialize( @@ -60,6 +64,10 @@ def init_loguru_logger(stream_log_level) -> None: iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, + s3_endpoint, + s3_region, + s3_auth_username, + s3_auth_password, ) # Setting R_HOME environment variable for R-UDF usage diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index f32d227fc70..558b99c9b7b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -183,7 +183,11 @@ class PythonWorkflowWorker( StorageConfig.icebergPostgresCatalogPassword, StorageConfig.icebergTableResultNamespace, StorageConfig.fileStorageDirectoryPath.toString, - StorageConfig.icebergTableCommitBatchSize.toString + StorageConfig.icebergTableCommitBatchSize.toString, + StorageConfig.s3Endpoint, + StorageConfig.s3Region, + StorageConfig.s3Username, + StorageConfig.s3Password ) ).run(BasicIO.standard(false)) } diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index ee7cbe55443..aa593cdcc65 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -311,7 +311,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics - * 5. Deletes big objects from MinIO + * 5. 
Deletes large binaries from MinIO * * @param eid The execution identity to clean up resources for */ diff --git a/build.sbt b/build.sbt index 68c3ba231ac..027775ff253 100644 --- a/build.sbt +++ b/build.sbt @@ -84,7 +84,19 @@ lazy val WorkflowExecutionService = (project in file("amber")) "org.slf4j" % "slf4j-api" % "1.7.26", "org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813", "org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813", - "org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813" + "org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813", + // Netty dependency overrides to ensure compatibility with Arrow 14.0.1 + // Arrow requires Netty 4.1.96.Final to avoid NoSuchFieldError: chunkSize + "io.netty" % "netty-all" % "4.1.96.Final", + "io.netty" % "netty-buffer" % "4.1.96.Final", + "io.netty" % "netty-codec" % "4.1.96.Final", + "io.netty" % "netty-codec-http" % "4.1.96.Final", + "io.netty" % "netty-codec-http2" % "4.1.96.Final", + "io.netty" % "netty-common" % "4.1.96.Final", + "io.netty" % "netty-handler" % "4.1.96.Final", + "io.netty" % "netty-resolver" % "4.1.96.Final", + "io.netty" % "netty-transport" % "4.1.96.Final", + "io.netty" % "netty-transport-native-unix-common" % "4.1.96.Final" ), libraryDependencies ++= Seq( "com.squareup.okhttp3" % "okhttp" % "4.10.0" force () // Force usage of OkHttp 4.10.0 diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index ab6b8f27c65..99870f03eb4 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -115,6 +115,7 @@ libraryDependencies ++= Seq( ///////////////////////////////////////////////////////////////////////////// // Arrow related val arrowVersion = "14.0.1" +val nettyVersion = "4.1.96.Final" val arrowDependencies = Seq( // https://mvnrepository.com/artifact/org.apache.arrow/flight-grpc "org.apache.arrow" % "flight-grpc" % arrowVersion, @@ -124,6 +125,22 @@ val arrowDependencies = Seq( libraryDependencies ++= arrowDependencies +// Netty dependency overrides to ensure compatibility with Arrow +// Arrow 14.0.1 requires Netty 4.1.96.Final for proper memory allocation +// The chunkSize field issue occurs when Netty versions are mismatched +dependencyOverrides ++= Seq( + "io.netty" % "netty-all" % nettyVersion, + "io.netty" % "netty-buffer" % nettyVersion, + "io.netty" % "netty-codec" % nettyVersion, + "io.netty" % "netty-codec-http" % nettyVersion, + "io.netty" % "netty-codec-http2" % nettyVersion, + "io.netty" % "netty-common" % nettyVersion, + "io.netty" % "netty-handler" % nettyVersion, + "io.netty" % "netty-resolver" % nettyVersion, + "io.netty" % "netty-transport" % nettyVersion, + "io.netty" % "netty-transport-native-unix-common" % nettyVersion +) + ///////////////////////////////////////////////////////////////////////////// // Iceberg-related Dependencies ///////////////////////////////////////////////////////////////////////////// diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala index 8e306c12a56..626047fda21 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala @@ -26,7 +26,7 @@ import org.apache.arrow.memory.{BufferAllocator, RootAllocator} import org.apache.arrow.vector.types.FloatingPointPrecision import org.apache.arrow.vector.types.TimeUnit.MILLISECOND import 
org.apache.arrow.vector.types.pojo.ArrowType.PrimitiveType -import org.apache.arrow.vector.types.pojo.{ArrowType, Field} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} import org.apache.arrow.vector.{ BigIntVector, BitVector, @@ -73,28 +73,28 @@ object ArrowUtils extends LazyLogging { Tuple .builder(schema) .addSequentially( - vectorSchemaRoot.getFieldVectors.asScala - .map((fieldVector: FieldVector) => { + vectorSchemaRoot.getFieldVectors.asScala.zipWithIndex.map { + case (fieldVector: FieldVector, index: Int) => val value: AnyRef = fieldVector.getObject(rowIndex) try { - val arrowType = fieldVector.getField.getFieldType.getType - val attributeType = toAttributeType(arrowType) + // Use the attribute type from the schema (which includes metadata) + // instead of deriving it from the Arrow type + val attributeType = schema.getAttributes(index).getType AttributeTypeUtils.parseField(value, attributeType) - } catch { case e: Exception => logger.warn("Caught error during parsing Arrow value back to Texera value", e) null } - }) - .toArray + }.toArray ) .build() } /** * Converts an Arrow Schema into Texera Schema. + * Checks field metadata to detect LARGE_BINARY types. * * @param arrowSchema The Arrow Schema to be converted. * @return A Texera Schema. @@ -102,7 +102,12 @@ object ArrowUtils extends LazyLogging { def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema = Schema( arrowSchema.getFields.asScala.map { field => - new Attribute(field.getName, toAttributeType(field.getType)) + val isLargeBinary = Option(field.getMetadata) + .exists(m => m.containsKey("texera_type") && m.get("texera_type") == "LARGE_BINARY") + + val attributeType = + if (isLargeBinary) AttributeType.LARGE_BINARY else toAttributeType(field.getType) + new Attribute(field.getName, attributeType) }.toList ) @@ -211,7 +216,7 @@ object ArrowUtils extends LazyLogging { else vector .asInstanceOf[VarCharVector] - .setSafe(index, value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)) + .setSafe(index, value.toString.getBytes(StandardCharsets.UTF_8)) case _: ArrowType.Binary | _: ArrowType.LargeBinary => if (isNull) vector.asInstanceOf[VarBinaryVector].setNull(index) else @@ -227,19 +232,27 @@ object ArrowUtils extends LazyLogging { /** * Converts an Amber schema into Arrow schema. + * Stores AttributeType in field metadata to preserve LARGE_BINARY type information. * * @param schema The Texera Schema. * @return An Arrow Schema. 
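+   * @note LARGE_BINARY attributes are written as nullable Utf8 fields carrying the
+   *       metadata entry "texera_type" -> "LARGE_BINARY", which toTexeraSchema reads
+   *       back to restore the attribute type.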
*/ def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = { - val arrowFields = new util.ArrayList[Field] - - for (amberAttribute <- schema.getAttributes) { - val name = amberAttribute.getName - val field = Field.nullablePrimitive(name, fromAttributeType(amberAttribute.getType)) - arrowFields.add(field) + val arrowFields = schema.getAttributes.map { attribute => + val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) { + val map = new util.HashMap[String, String]() + map.put("texera_type", "LARGE_BINARY") + map + } else null + + new Field( + attribute.getName, + new FieldType(true, fromAttributeType(attribute.getType), null, metadata), + null + ) } - new org.apache.arrow.vector.types.pojo.Schema(arrowFields) + + new org.apache.arrow.vector.types.pojo.Schema(util.Arrays.asList(arrowFields: _*)) } /** @@ -270,7 +283,7 @@ object ArrowUtils extends LazyLogging { case AttributeType.BINARY => new ArrowType.Binary - case AttributeType.STRING | AttributeType.ANY => + case AttributeType.STRING | AttributeType.LARGE_BINARY | AttributeType.ANY => ArrowType.Utf8.INSTANCE case _ => diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index edeaf7d4ae7..6a97df8ca3e 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -20,14 +20,15 @@ package org.apache.texera.amber.util import org.apache.texera.amber.core.tuple.AttributeTypeUtils.AttributeTypeException -import org.apache.texera.amber.core.tuple.{AttributeType, Schema, Tuple} +import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, Tuple} import org.apache.arrow.memory.{BufferAllocator, RootAllocator} import org.apache.arrow.vector.VectorSchemaRoot -import org.apache.arrow.vector.types.pojo.{ArrowType, Field} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit} import org.scalatest.flatspec.AnyFlatSpec import java.sql.Timestamp +import java.util import scala.jdk.CollectionConverters.IterableHasAsJava class ArrowUtilsSpec extends AnyFlatSpec { @@ -99,6 +100,9 @@ class ArrowUtilsSpec extends AnyFlatSpec { // but not the other way around. 
assert(ArrowUtils.fromAttributeType(AttributeType.ANY) == string) + // LARGE_BINARY is converted to ArrowType.Utf8 (same as STRING) + assert(ArrowUtils.fromAttributeType(AttributeType.LARGE_BINARY) == string) + } it should "raise AttributeTypeException when converting unsupported types" in { @@ -239,4 +243,140 @@ class ArrowUtilsSpec extends AnyFlatSpec { } + it should "convert from AttributeType to ArrowType for LARGE_BINARY correctly" in { + // LARGE_BINARY is converted to ArrowType.Utf8 (stored as string) + assert(ArrowUtils.fromAttributeType(AttributeType.LARGE_BINARY) == string) + } + + it should "convert Texera Schema with LARGE_BINARY to Arrow Schema with metadata correctly" in { + val texeraSchemaWithLargeBinary = Schema() + .add("regular_string", AttributeType.STRING) + .add("large_binary_field", AttributeType.LARGE_BINARY) + + val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary) + + // Check that regular string field has no metadata + val regularStringField = arrowSchema.getFields.get(0) + assert(regularStringField.getName == "regular_string") + assert(regularStringField.getType == string) + assert( + regularStringField.getMetadata == null || !regularStringField.getMetadata.containsKey( + "texera_type" + ) + ) + + // Check that LARGE_BINARY field has metadata + val largeBinaryField = arrowSchema.getFields.get(1) + assert(largeBinaryField.getName == "large_binary_field") + assert(largeBinaryField.getType == string) // LARGE_BINARY is stored as Utf8 + assert(largeBinaryField.getMetadata != null) + assert(largeBinaryField.getMetadata.get("texera_type") == "LARGE_BINARY") + } + + it should "convert Arrow Schema with LARGE_BINARY metadata to Texera Schema correctly" in { + // Create Arrow schema with LARGE_BINARY metadata + val largeBinaryMetadata = new util.HashMap[String, String]() + largeBinaryMetadata.put("texera_type", "LARGE_BINARY") + + val arrowSchemaWithLargeBinary = new org.apache.arrow.vector.types.pojo.Schema( + Array( + Field.nullablePrimitive("regular_string", string), + new Field( + "large_binary_field", + new FieldType(true, string, null, largeBinaryMetadata), + null + ) + ).toList.asJava + ) + + val texeraSchema = ArrowUtils.toTexeraSchema(arrowSchemaWithLargeBinary) + + assert(texeraSchema.getAttribute("regular_string").getName == "regular_string") + assert(texeraSchema.getAttribute("regular_string").getType == AttributeType.STRING) + + assert(texeraSchema.getAttribute("large_binary_field").getName == "large_binary_field") + assert(texeraSchema.getAttribute("large_binary_field").getType == AttributeType.LARGE_BINARY) + } + + it should "set and get Texera Tuple with LARGE_BINARY correctly" in { + val texeraSchemaWithLargeBinary = Schema() + .add("large_binary_field", AttributeType.LARGE_BINARY) + .add("regular_string", AttributeType.STRING) + + val largeBinary = new LargeBinary("s3://test-bucket/path/to/object") + val tuple = Tuple + .builder(texeraSchemaWithLargeBinary) + .addSequentially( + Array( + largeBinary, + "regular string value" + ) + ) + .build() + + val allocator: BufferAllocator = new RootAllocator() + val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary) + val vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator) + vectorSchemaRoot.allocateNew() + + // Set Tuple into the Vectors + ArrowUtils.appendTexeraTuple(tuple, vectorSchemaRoot) + + // Verify the LARGE_BINARY is stored as string (URI) in Arrow + val storedValue = vectorSchemaRoot.getVector(0).getObject(0) + assert(storedValue.toString == 
"s3://test-bucket/path/to/object") + + // Get the Tuple from the Vectors + val retrievedTuple = ArrowUtils.getTexeraTuple(0, vectorSchemaRoot) + assert(retrievedTuple.getField[LargeBinary](0) == largeBinary) + assert(retrievedTuple.getField[String](1) == "regular string value") + } + + it should "handle null LARGE_BINARY values correctly" in { + val texeraSchemaWithLargeBinary = Schema() + .add("large_binary_field", AttributeType.LARGE_BINARY) + + val tuple = Tuple + .builder(texeraSchemaWithLargeBinary) + .addSequentially( + Array( + null.asInstanceOf[LargeBinary] + ) + ) + .build() + + val allocator: BufferAllocator = new RootAllocator() + val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary) + val vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator) + vectorSchemaRoot.allocateNew() + + // Set Tuple into the Vectors + ArrowUtils.appendTexeraTuple(tuple, vectorSchemaRoot) + + // Verify null is stored correctly + assert(vectorSchemaRoot.getVector(0).getObject(0) == null) + + // Get the Tuple from the Vectors + val retrievedTuple = ArrowUtils.getTexeraTuple(0, vectorSchemaRoot) + assert(retrievedTuple.getField[LargeBinary](0) == null) + } + + it should "round-trip LARGE_BINARY schema conversion correctly" in { + val originalSchema = Schema() + .add("field1", AttributeType.STRING) + .add("field2", AttributeType.LARGE_BINARY) + .add("field3", AttributeType.INTEGER) + .add("field4", AttributeType.LARGE_BINARY) + + // Convert to Arrow and back + val arrowSchema = ArrowUtils.fromTexeraSchema(originalSchema) + val roundTripSchema = ArrowUtils.toTexeraSchema(arrowSchema) + + assert(roundTripSchema.getAttribute("field1").getType == AttributeType.STRING) + assert(roundTripSchema.getAttribute("field2").getType == AttributeType.LARGE_BINARY) + assert(roundTripSchema.getAttribute("field3").getType == AttributeType.INTEGER) + assert(roundTripSchema.getAttribute("field4").getType == AttributeType.LARGE_BINARY) + assert(roundTripSchema == originalSchema) + } + }