WorkflowResource.scala

@@ -36,7 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{
   WorkflowUserAccessDao
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos._
-import org.apache.texera.service.util.BigObjectManager
+import org.apache.texera.service.util.LargeBinaryManager
 import org.apache.texera.web.resource.dashboard.hub.EntityType
 import org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction
 import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess

@@ -601,7 +601,7 @@ class WorkflowResource extends LazyLogging {
       .asScala
       .toList

-    BigObjectManager.deleteAllObjects()
+    LargeBinaryManager.deleteAllObjects()

     // Collect all URIs related to executions for cleanup
     val uris = eids.flatMap { eid =>

WorkflowService.scala

@@ -49,7 +49,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
   getStackTraceWithAllCauses
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos.User
-import org.apache.texera.service.util.BigObjectManager
+import org.apache.texera.service.util.LargeBinaryManager
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource

@@ -348,7 +348,7 @@ class WorkflowService(
         logger.debug(s"Error processing document at $uri: ${error.getMessage}")
       }
     }
-    // Delete big objects
-    BigObjectManager.deleteAllObjects()
+    // Delete large binaries
+    LargeBinaryManager.deleteAllObjects()
   }
 }

AttributeType.java

@@ -70,7 +70,7 @@ public enum AttributeType implements Serializable {
   BOOLEAN("boolean", Boolean.class),
   TIMESTAMP("timestamp", Timestamp.class),
   BINARY("binary", byte[].class),
-  BIG_OBJECT("big_object", BigObject.class),
+  LARGE_BINARY("large_binary", LargeBinary.class),
   ANY("ANY", Object.class);

   private final String name;

@@ -110,8 +110,8 @@ public static AttributeType getAttributeType(Class<?> fieldClass) {
       return TIMESTAMP;
     } else if (fieldClass.equals(byte[].class)) {
       return BINARY;
-    } else if (fieldClass.equals(BigObject.class)) {
-      return BIG_OBJECT;
+    } else if (fieldClass.equals(LargeBinary.class)) {
+      return LARGE_BINARY;
     } else {
       return ANY;
     }
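
Note: getAttributeType is the only place the renamed constant is resolved from a Java class. A minimal sketch of the expected mapping (using only the classes shown in this diff, in the org.apache.texera.amber.core.tuple package confirmed by the IcebergUtil import below):

    import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary}

    object AttributeTypeMappingSketch extends App {
      // The renamed enum constant is resolved from the renamed class...
      assert(AttributeType.getAttributeType(classOf[LargeBinary]) == AttributeType.LARGE_BINARY)
      // ...while unrelated mappings are untouched: byte[] still maps to BINARY.
      assert(AttributeType.getAttributeType(classOf[Array[Byte]]) == AttributeType.BINARY)
    }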

AttributeTypeUtils.scala

@@ -121,15 +121,15 @@ object AttributeTypeUtils extends Serializable {
   ): Any = {
     if (field == null) return null
     attributeType match {
-      case AttributeType.INTEGER   => parseInteger(field, force)
-      case AttributeType.LONG      => parseLong(field, force)
-      case AttributeType.DOUBLE    => parseDouble(field)
-      case AttributeType.BOOLEAN   => parseBoolean(field)
-      case AttributeType.TIMESTAMP => parseTimestamp(field)
-      case AttributeType.STRING    => field.toString
-      case AttributeType.BINARY    => field
-      case AttributeType.BIG_OBJECT => new BigObject(field.toString)
-      case AttributeType.ANY | _   => field
+      case AttributeType.INTEGER      => parseInteger(field, force)
+      case AttributeType.LONG         => parseLong(field, force)
+      case AttributeType.DOUBLE       => parseDouble(field)
+      case AttributeType.BOOLEAN      => parseBoolean(field)
+      case AttributeType.TIMESTAMP    => parseTimestamp(field)
+      case AttributeType.STRING       => field.toString
+      case AttributeType.BINARY       => field
+      case AttributeType.LARGE_BINARY => new LargeBinary(field.toString)
+      case AttributeType.ANY | _      => field
     }
   }

@@ -384,8 +384,8 @@ object AttributeTypeUtils extends Serializable {
       case AttributeType.INTEGER   => tryParseInteger(fieldValue)
      case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue)
      case AttributeType.BINARY    => tryParseString()
-      case AttributeType.BIG_OBJECT =>
-        AttributeType.BIG_OBJECT // Big objects are never inferred from data
+      case AttributeType.LARGE_BINARY =>
+        AttributeType.LARGE_BINARY // Large binaries are never inferred from data
       case _ => tryParseString()
     }
   }
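
Note: the LARGE_BINARY branch of parseField inherits the contract of the LargeBinary constructor (next file): the field's string form must already be a well-formed s3:// URI. A hedged sketch of that behavior, calling the constructor directly since parseField's full signature is not visible in this diff:

    import org.apache.texera.amber.core.tuple.LargeBinary

    object ParseLargeBinarySketch extends App {
      // parseField(field, AttributeType.LARGE_BINARY) reduces to new LargeBinary(field.toString),
      // so a field whose string form is an S3 URI becomes a reference (hypothetical URI below)...
      val ref = new LargeBinary("s3://texera/objects/example")
      println(ref)
      // ...while anything else fails fast rather than producing a dangling reference:
      // new LargeBinary("/local/path") throws IllegalArgumentException.
    }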

BigObject.java → LargeBinary.java

@@ -23,56 +23,56 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import org.apache.texera.amber.core.executor.OperatorExecutor;
-import org.apache.texera.service.util.BigObjectManager;
+import org.apache.texera.service.util.LargeBinaryManager;

 import java.net.URI;
 import java.util.Objects;

 /**
- * BigObject represents a reference to a large object stored in S3.
+ * LargeBinary represents a reference to a large object stored in S3.
  *
- * Each BigObject is identified by an S3 URI (s3://bucket/path/to/object).
- * BigObjects are automatically tracked and cleaned up when the workflow execution completes.
+ * Each LargeBinary is identified by an S3 URI (s3://bucket/path/to/object).
+ * LargeBinaries are automatically tracked and cleaned up when the workflow execution completes.
  */
-public class BigObject {
+public class LargeBinary {

   private final String uri;

   /**
-   * Creates a BigObject from an existing S3 URI.
+   * Creates a LargeBinary from an existing S3 URI.
    * Used primarily for deserialization from JSON.
    *
    * @param uri S3 URI in the format s3://bucket/path/to/object
    * @throws IllegalArgumentException if URI is null or doesn't start with "s3://"
    */
   @JsonCreator
-  public BigObject(@JsonProperty("uri") String uri) {
+  public LargeBinary(@JsonProperty("uri") String uri) {
     if (uri == null) {
-      throw new IllegalArgumentException("BigObject URI cannot be null");
+      throw new IllegalArgumentException("LargeBinary URI cannot be null");
     }
     if (!uri.startsWith("s3://")) {
       throw new IllegalArgumentException(
-          "BigObject URI must start with 's3://', got: " + uri
+          "LargeBinary URI must start with 's3://', got: " + uri
       );
     }
     this.uri = uri;
   }

   /**
-   * Creates a new BigObject for writing data.
+   * Creates a new LargeBinary for writing data.
    * Generates a unique S3 URI.
    *
    * Usage example:
    *
-   * BigObject bigObject = new BigObject();
-   * try (BigObjectOutputStream out = new BigObjectOutputStream(bigObject)) {
+   * LargeBinary largeBinary = new LargeBinary();
+   * try (LargeBinaryOutputStream out = new LargeBinaryOutputStream(largeBinary)) {
    *   out.write(data);
    * }
-   * // bigObject is now ready to be added to tuples
+   * // largeBinary is now ready to be added to tuples
    *
    */
-  public BigObject() {
-    this(BigObjectManager.create());
+  public LargeBinary() {
+    this(LargeBinaryManager.create());
   }

   @JsonValue

@@ -97,8 +97,8 @@ public String toString() {
   @Override
   public boolean equals(Object obj) {
     if (this == obj) return true;
-    if (!(obj instanceof BigObject)) return false;
-    BigObject that = (BigObject) obj;
+    if (!(obj instanceof LargeBinary)) return false;
+    LargeBinary that = (LargeBinary) obj;
     return Objects.equals(uri, that.uri);
   }
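
Note: a Scala usage sketch mirroring the Javadoc example above. LargeBinaryOutputStream is named only in that Javadoc, so its import path here is an assumption; URI-based equality and getUri are taken from this diff and the IcebergUtil changes below.

    import org.apache.texera.amber.core.tuple.LargeBinary
    import org.apache.texera.service.util.LargeBinaryOutputStream // assumed package; not shown in this diff
    import scala.util.Using

    object LargeBinaryUsageSketch extends App {
      // The no-arg constructor obtains a fresh s3:// URI from LargeBinaryManager.create().
      val largeBinary = new LargeBinary()

      // Write the payload through the stream type named in the Javadoc.
      Using.resource(new LargeBinaryOutputStream(largeBinary)) { out =>
        out.write("payload".getBytes("UTF-8"))
      }

      // equals/hashCode are URI-based, so a reference rebuilt from getUri compares equal.
      assert(new LargeBinary(largeBinary.getUri) == largeBinary)
    }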

IcebergUtil.scala

@@ -20,7 +20,7 @@
 package org.apache.texera.amber.util

 import org.apache.texera.amber.config.StorageConfig
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, BigObject, Schema, Tuple}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple}
 import org.apache.hadoop.conf.Configuration
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.parquet.GenericParquetReaders

@@ -52,8 +52,8 @@ import scala.jdk.CollectionConverters._
  */
 object IcebergUtil {

-  // Unique suffix for BIG_OBJECT field encoding
-  private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr"
+  // Unique suffix for LARGE_BINARY field encoding
+  private val LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr"

   /**
    * Creates and initializes a HadoopCatalog with the given parameters.

@@ -203,15 +203,15 @@

   /**
    * Converts a custom Amber `Schema` to an Iceberg `Schema`.
-   * Field names are encoded to preserve BIG_OBJECT type information.
+   * Field names are encoded to preserve LARGE_BINARY type information.
    *
    * @param amberSchema The custom Amber Schema.
    * @return An Iceberg Schema.
    */
   def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
     val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
       case (attribute, index) =>
-        val encodedName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+        val encodedName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType)
         val icebergType = toIcebergType(attribute.getType)
         Types.NestedField.optional(index + 1, encodedName, icebergType)
     }

@@ -220,7 +220,7 @@

   /**
    * Converts a custom Amber `AttributeType` to an Iceberg `Type`.
-   * Note: BIG_OBJECT is stored as StringType; field name encoding is used to distinguish it.
+   * Note: LARGE_BINARY is stored as StringType; field name encoding is used to distinguish it.
    *
    * @param attributeType The custom Amber AttributeType.
    * @return The corresponding Iceberg Type.

@@ -234,8 +234,8 @@
       case AttributeType.BOOLEAN => Types.BooleanType.get()
       case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone()
       case AttributeType.BINARY => Types.BinaryType.get()
-      case AttributeType.BIG_OBJECT =>
-        Types.StringType.get() // Store BigObjectPointer URI as string
+      case AttributeType.LARGE_BINARY =>
+        Types.StringType.get() // Store LargeBinary URI as string
       case AttributeType.ANY =>
         throw new IllegalArgumentException("ANY type is not supported in Iceberg")
     }

@@ -252,13 +252,13 @@

     tuple.schema.getAttributes.zipWithIndex.foreach {
       case (attribute, index) =>
-        val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+        val fieldName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType)
         val value = tuple.getField[AnyRef](index) match {
-          case null => null
-          case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
-          case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
-          case bigObjPtr: BigObject => bigObjPtr.getUri
-          case other => other
+          case null                        => null
+          case ts: Timestamp               => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
+          case bytes: Array[Byte]          => ByteBuffer.wrap(bytes)
+          case largeBinaryPtr: LargeBinary => largeBinaryPtr.getUri
+          case other                       => other
         }
         record.setField(fieldName, value)
     }

@@ -275,7 +275,7 @@
    */
   def fromRecord(record: Record, amberSchema: Schema): Tuple = {
     val fieldValues = amberSchema.getAttributes.map { attribute =>
-      val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+      val fieldName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType)
       val rawValue = record.getField(fieldName)

       rawValue match {

@@ -285,8 +285,8 @@
           val bytes = new Array[Byte](buffer.remaining())
           buffer.get(bytes)
           bytes
-        case uri: String if attribute.getType == AttributeType.BIG_OBJECT =>
-          new BigObject(uri)
+        case uri: String if attribute.getType == AttributeType.LARGE_BINARY =>
+          new LargeBinary(uri)
         case other => other
       }
     }

@@ -295,16 +295,19 @@
   }

   /**
-   * Encodes a field name for BIG_OBJECT types by adding a unique system suffix.
-   * This ensures BIG_OBJECT fields can be identified when reading from Iceberg.
+   * Encodes a field name for LARGE_BINARY types by adding a unique system suffix.
+   * This ensures LARGE_BINARY fields can be identified when reading from Iceberg.
    *
    * @param fieldName The original field name
    * @param attributeType The attribute type
-   * @return The encoded field name with a unique suffix for BIG_OBJECT types
+   * @return The encoded field name with a unique suffix for LARGE_BINARY types
    */
-  private def encodeBigObjectFieldName(fieldName: String, attributeType: AttributeType): String = {
-    if (attributeType == AttributeType.BIG_OBJECT) {
-      s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}"
+  private def encodeLargeBinaryFieldName(
+      fieldName: String,
+      attributeType: AttributeType
+  ): String = {
+    if (attributeType == AttributeType.LARGE_BINARY) {
+      s"${fieldName}${LARGE_BINARY_FIELD_SUFFIX}"
     } else {
       fieldName
     }

@@ -317,27 +320,27 @@
    * @param fieldName The encoded field name
    * @return The original field name with system suffix removed
    */
-  private def decodeBigObjectFieldName(fieldName: String): String = {
-    if (isBigObjectField(fieldName)) {
-      fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length)
+  private def decodeLargeBinaryFieldName(fieldName: String): String = {
+    if (isLargeBinaryField(fieldName)) {
+      fieldName.substring(0, fieldName.length - LARGE_BINARY_FIELD_SUFFIX.length)
     } else {
       fieldName
     }
   }

   /**
-   * Checks if a field name indicates a BIG_OBJECT type by examining the unique suffix.
+   * Checks if a field name indicates a LARGE_BINARY type by examining the unique suffix.
    *
    * @param fieldName The field name to check
-   * @return true if the field represents a BIG_OBJECT type, false otherwise
+   * @return true if the field represents a LARGE_BINARY type, false otherwise
    */
-  private def isBigObjectField(fieldName: String): Boolean = {
-    fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)
+  private def isLargeBinaryField(fieldName: String): Boolean = {
+    fieldName.endsWith(LARGE_BINARY_FIELD_SUFFIX)
   }

   /**
    * Converts an Iceberg `Schema` to an Amber `Schema`.
-   * Field names are decoded to restore original names and detect BIG_OBJECT types.
+   * Field names are decoded to restore original names and detect LARGE_BINARY types.
    *
    * @param icebergSchema The Iceberg Schema.
    * @return The corresponding Amber Schema.

@@ -349,7 +352,7 @@
       .map { field =>
         val fieldName = field.name()
         val attributeType = fromIcebergType(field.`type`().asPrimitiveType(), fieldName)
-        val originalName = decodeBigObjectFieldName(fieldName)
+        val originalName = decodeLargeBinaryFieldName(fieldName)
         new Attribute(originalName, attributeType)
       }
       .toList

@@ -361,7 +364,7 @@
    * Converts an Iceberg `Type` to an Amber `AttributeType`.
    *
    * @param icebergType The Iceberg Type.
-   * @param fieldName The field name (used to detect BIG_OBJECT by suffix).
+   * @param fieldName The field name (used to detect LARGE_BINARY by suffix).
    * @return The corresponding Amber AttributeType.
    */
   def fromIcebergType(

@@ -370,7 +373,7 @@
   ): AttributeType = {
     icebergType match {
       case _: Types.StringType =>
-        if (isBigObjectField(fieldName)) AttributeType.BIG_OBJECT else AttributeType.STRING
+        if (isLargeBinaryField(fieldName)) AttributeType.LARGE_BINARY else AttributeType.STRING
       case _: Types.IntegerType => AttributeType.INTEGER
       case _: Types.LongType => AttributeType.LONG
       case _: Types.DoubleType => AttributeType.DOUBLE
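
Note: since encodeLargeBinaryFieldName, decodeLargeBinaryFieldName, and isLargeBinaryField are private, here is a self-contained restatement of the suffix scheme (the suffix literal is copied from this diff) showing that the Iceberg round trip preserves both the original column name and the LARGE_BINARY marker:

    object SuffixRoundTripSketch extends App {
      // Copied from IcebergUtil's LARGE_BINARY_FIELD_SUFFIX above.
      val Suffix = "__texera_large_binary_ptr"

      def encode(name: String, isLargeBinary: Boolean): String =
        if (isLargeBinary) name + Suffix else name

      def isLargeBinaryField(name: String): Boolean = name.endsWith(Suffix)

      def decode(name: String): String =
        if (isLargeBinaryField(name)) name.dropRight(Suffix.length) else name

      // A LARGE_BINARY column is stored in Iceberg as a string column with the marker suffix,
      // and the round trip recovers both the original name and the type tag.
      val encoded = encode("payload", isLargeBinary = true)
      assert(encoded == "payload__texera_large_binary_ptr")
      assert(isLargeBinaryField(encoded) && decode(encoded) == "payload")

      // Ordinary columns pass through untouched, so plain strings are never misclassified
      // unless a user column name happens to end with the reserved suffix.
      assert(decode(encode("title", isLargeBinary = false)) == "title")
      assert(!isLargeBinaryField("title"))
    }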