Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions amber/operator-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ pybase64==1.3.2
torch==2.8.0
scikit-learn==1.5.0
transformers==4.57.3
boto3==1.40.53
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame:
return DataFrame(
frame=Table.from_pydict(
{
name: [t[name] for t in tuples]
name: [t.get_serialized_field(name) for t in tuples]
for name in self.get_port().get_schema().get_attr_names()
},
schema=self.get_port().get_schema().as_arrow_schema(),
Expand Down
3 changes: 2 additions & 1 deletion amber/src/main/python/core/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import builtins
from inspect import Traceback
from typing import NamedTuple

Expand All @@ -36,7 +37,7 @@


class ExceptionInfo(NamedTuple):
    """Exception details as a (class, instance, traceback) triple."""

    # Qualified as builtins.type so the annotation is unambiguous even if a
    # local name `type` exists in this module — TODO confirm against module scope.
    exc: builtins.type  # the exception class
    value: Exception  # the raised exception instance
    tb: Traceback  # the associated traceback

Expand Down
2 changes: 2 additions & 0 deletions amber/src/main/python/core/models/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
# under the License.

from .attribute_type import AttributeType
from core.models.type.large_binary import largebinary
from .field import Field
from .schema import Schema


# Public API re-exported by the core.models.schema package.
__all__ = [
    "AttributeType",
    "largebinary",
    "Field",
    "Schema",
]
63 changes: 63 additions & 0 deletions amber/src/main/python/core/models/schema/arrow_schema_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Utilities for converting between Arrow schemas and Amber schemas,
handling LARGE_BINARY metadata preservation.
"""

import pyarrow as pa
from typing import Mapping

from core.models.schema.attribute_type import AttributeType
from core.models.schema.attribute_type_utils import (
detect_attribute_type_from_arrow_field,
create_arrow_field_with_metadata,
)


def arrow_schema_to_attr_types(arrow_schema: pa.Schema) -> dict[str, AttributeType]:
    """
    Converts an Arrow schema to a dictionary of attribute name to AttributeType.
    Handles LARGE_BINARY metadata detection via each field's metadata.

    The result's insertion order follows the field order of ``arrow_schema``.

    :param arrow_schema: PyArrow schema that may contain LARGE_BINARY metadata
    :return: Dictionary mapping attribute names to AttributeTypes
    """
    # Iterate the schema's fields directly instead of looking each one up by
    # name: pa.Schema.field(name) performs a lookup per call and is ambiguous
    # when duplicate field names exist.
    return {
        field.name: detect_attribute_type_from_arrow_field(field)
        for field in arrow_schema
    }


def attr_types_to_arrow_schema(
    attr_types: Mapping[str, AttributeType],
) -> pa.Schema:
    """
    Builds a pyarrow.Schema from a mapping of attribute name to AttributeType.

    LARGE_BINARY attributes receive the marker metadata on their field, and
    the field order mirrors the iteration order of ``attr_types`` (so an
    OrderedDict input keeps its ordering).

    :param attr_types: Mapping of attribute names to AttributeTypes (e.g., OrderedDict)
    :return: PyArrow schema with metadata for LARGE_BINARY types
    """
    arrow_fields = []
    for name, attribute_type in attr_types.items():
        arrow_fields.append(create_arrow_field_with_metadata(name, attribute_type))
    return pa.schema(arrow_fields)
6 changes: 6 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from bidict import bidict
from enum import Enum
from pyarrow import lib
from core.models.type.large_binary import largebinary


class AttributeType(Enum):
Expand All @@ -37,6 +38,7 @@ class AttributeType(Enum):
DOUBLE = 5
TIMESTAMP = 6
BINARY = 7
LARGE_BINARY = 8


RAW_TYPE_MAPPING = bidict(
Expand All @@ -48,6 +50,7 @@ class AttributeType(Enum):
"BOOLEAN": AttributeType.BOOL,
"TIMESTAMP": AttributeType.TIMESTAMP,
"BINARY": AttributeType.BINARY,
"LARGE_BINARY": AttributeType.LARGE_BINARY,
}
)

Expand All @@ -59,6 +62,7 @@ class AttributeType(Enum):
AttributeType.BOOL: pa.bool_(),
AttributeType.BINARY: pa.binary(),
AttributeType.TIMESTAMP: pa.timestamp("us"),
AttributeType.LARGE_BINARY: pa.string(), # Serialized as URI string
}

FROM_ARROW_MAPPING = {
Expand All @@ -83,6 +87,7 @@ class AttributeType(Enum):
AttributeType.BOOL: bool,
AttributeType.BINARY: bytes,
AttributeType.TIMESTAMP: datetime.datetime,
AttributeType.LARGE_BINARY: largebinary,
}

FROM_PYOBJECT_MAPPING = {
Expand All @@ -92,4 +97,5 @@ class AttributeType(Enum):
bool: AttributeType.BOOL,
bytes: AttributeType.BINARY,
datetime.datetime: AttributeType.TIMESTAMP,
largebinary: AttributeType.LARGE_BINARY,
}
72 changes: 72 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Utilities for converting between AttributeTypes and Arrow field types,
handling LARGE_BINARY metadata preservation.
"""

import pyarrow as pa

from core.models.schema.attribute_type import (
AttributeType,
FROM_ARROW_MAPPING,
TO_ARROW_MAPPING,
)

# Metadata key used to mark LARGE_BINARY fields in Arrow schemas
TEXERA_TYPE_METADATA_KEY = b"texera_type"
LARGE_BINARY_METADATA_VALUE = b"LARGE_BINARY"


def detect_attribute_type_from_arrow_field(field: pa.Field) -> AttributeType:
    """
    Detects the AttributeType of an Arrow field.

    A field whose metadata carries the Texera LARGE_BINARY marker (written by
    either the Scala ArrowUtils or the Python side) maps to
    AttributeType.LARGE_BINARY; otherwise the Arrow type id decides.

    :param field: PyArrow field that may contain metadata
    :return: The detected AttributeType
    """
    # field.metadata is None when no metadata is attached; normalize to {}.
    metadata = field.metadata or {}
    if metadata.get(TEXERA_TYPE_METADATA_KEY) == LARGE_BINARY_METADATA_VALUE:
        return AttributeType.LARGE_BINARY
    return FROM_ARROW_MAPPING[field.type.id]


def create_arrow_field_with_metadata(
    attr_name: str, attr_type: AttributeType
) -> pa.Field:
    """
    Creates a PyArrow field for one attribute, attaching the Texera
    LARGE_BINARY marker metadata when the attribute is LARGE_BINARY.

    :param attr_name: Name of the attribute
    :param attr_type: The AttributeType
    :return: PyArrow field (with marker metadata only for LARGE_BINARY)
    """
    if attr_type == AttributeType.LARGE_BINARY:
        field_metadata = {TEXERA_TYPE_METADATA_KEY: LARGE_BINARY_METADATA_VALUE}
    else:
        field_metadata = None
    return pa.field(attr_name, TO_ARROW_MAPPING[attr_type], metadata=field_metadata)
19 changes: 8 additions & 11 deletions amber/src/main/python/core/models/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
from core.models.schema.attribute_type import (
AttributeType,
RAW_TYPE_MAPPING,
FROM_ARROW_MAPPING,
TO_ARROW_MAPPING,
)
from core.models.schema.arrow_schema_utils import (
arrow_schema_to_attr_types,
attr_types_to_arrow_schema,
)


Expand Down Expand Up @@ -85,22 +87,17 @@ def _from_arrow_schema(self, arrow_schema: pa.Schema) -> None:
:return:
"""
self._name_type_mapping = OrderedDict()
attr_types = arrow_schema_to_attr_types(arrow_schema)
# Preserve field order from arrow_schema
for attr_name in arrow_schema.names:
arrow_type = arrow_schema.field(attr_name).type # type: ignore
attr_type = FROM_ARROW_MAPPING[arrow_type.id]
self.add(attr_name, attr_type)
self.add(attr_name, attr_types[attr_name])

def as_arrow_schema(self) -> pa.Schema:
    """
    Creates a new pyarrow.Schema according to the current Schema.

    Delegates to attr_types_to_arrow_schema so that LARGE_BINARY attributes
    get their marker metadata attached to the corresponding Arrow fields.
    :return: pyarrow.Schema
    """
    return attr_types_to_arrow_schema(self._name_type_mapping)

def get_attr_names(self) -> List[str]:
"""
Expand Down
65 changes: 65 additions & 0 deletions amber/src/main/python/core/models/schema/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,68 @@ def test_convert_from_raw_schema(self, raw_schema, schema):
def test_convert_from_arrow_schema(self, arrow_schema, schema):
assert schema == Schema(arrow_schema=arrow_schema)
assert schema.as_arrow_schema() == arrow_schema

def test_large_binary_in_raw_schema(self):
    """A raw schema declaring LARGE_BINARY yields the LARGE_BINARY attribute type."""
    schema = Schema(
        raw_schema={
            "regular_field": "STRING",
            "large_binary_field": "LARGE_BINARY",
        }
    )
    assert schema.get_attr_type("regular_field") == AttributeType.STRING
    assert schema.get_attr_type("large_binary_field") == AttributeType.LARGE_BINARY

def test_large_binary_in_arrow_schema_with_metadata(self):
    """An Arrow field tagged with texera_type=LARGE_BINARY is detected as such."""
    tagged_field = pa.field(
        "large_binary_field",
        pa.string(),
        metadata={b"texera_type": b"LARGE_BINARY"},
    )
    source_schema = pa.schema(
        [pa.field("regular_field", pa.string()), tagged_field]
    )
    schema = Schema(arrow_schema=source_schema)
    assert schema.get_attr_type("regular_field") == AttributeType.STRING
    assert schema.get_attr_type("large_binary_field") == AttributeType.LARGE_BINARY

def test_large_binary_as_arrow_schema_includes_metadata(self):
    """as_arrow_schema() tags LARGE_BINARY fields with texera_type metadata."""
    schema = Schema()
    schema.add("regular_field", AttributeType.STRING)
    schema.add("large_binary_field", AttributeType.LARGE_BINARY)

    exported = schema.as_arrow_schema()

    # The plain STRING field must carry no Texera marker.
    plain = exported.field("regular_field")
    assert plain.metadata is None or b"texera_type" not in plain.metadata

    # The LARGE_BINARY field carries the marker and is represented as an
    # Arrow string (its value is serialized as a URI string).
    marked = exported.field("large_binary_field")
    assert marked.metadata is not None
    assert marked.metadata.get(b"texera_type") == b"LARGE_BINARY"
    assert marked.type == pa.string()

def test_round_trip_large_binary_schema(self):
    """Schema -> Arrow schema -> Schema preserves LARGE_BINARY typing."""
    original = Schema()
    for name, attr_type in (
        ("field1", AttributeType.STRING),
        ("field2", AttributeType.LARGE_BINARY),
        ("field3", AttributeType.INT),
    ):
        original.add(name, attr_type)

    # Round-trip through the Arrow representation.
    restored = Schema(arrow_schema=original.as_arrow_schema())

    assert restored == original
    assert restored.get_attr_type("field1") == AttributeType.STRING
    assert restored.get_attr_type("field2") == AttributeType.LARGE_BINARY
    assert restored.get_attr_type("field3") == AttributeType.INT
Loading
Loading