diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index 0f66388c852..4248d06bdd8 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -36,7 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{ WorkflowUserAccessDao } import org.apache.texera.dao.jooq.generated.tables.pojos._ -import org.apache.texera.service.util.BigObjectManager +import org.apache.texera.service.util.LargeBinaryManager import org.apache.texera.web.resource.dashboard.hub.EntityType import org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess @@ -601,7 +601,7 @@ class WorkflowResource extends LazyLogging { .asScala .toList - BigObjectManager.deleteAllObjects() + LargeBinaryManager.deleteAllObjects() // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index ae67407a811..ee7cbe55443 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -49,7 +49,7 @@ import org.apache.texera.amber.error.ErrorUtils.{ getStackTraceWithAllCauses } import org.apache.texera.dao.jooq.generated.tables.pojos.User -import org.apache.texera.service.util.BigObjectManager +import org.apache.texera.service.util.LargeBinaryManager import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource @@ -348,7 +348,7 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } - // Delete big objects - BigObjectManager.deleteAllObjects() + // Delete large binaries + LargeBinaryManager.deleteAllObjects() } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java index fa8f93ea9e8..61abd741e87 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeType.java @@ -70,7 +70,7 @@ public enum AttributeType implements Serializable { BOOLEAN("boolean", Boolean.class), TIMESTAMP("timestamp", Timestamp.class), BINARY("binary", byte[].class), - BIG_OBJECT("big_object", BigObject.class), + LARGE_BINARY("large_binary", LargeBinary.class), ANY("ANY", Object.class); private final String name; @@ -110,8 +110,8 @@ public static AttributeType getAttributeType(Class fieldClass) { return TIMESTAMP; } else if (fieldClass.equals(byte[].class)) { return BINARY; - } else if (fieldClass.equals(BigObject.class)) { - return BIG_OBJECT; + } else if (fieldClass.equals(LargeBinary.class)) { + return LARGE_BINARY; } else { return ANY; } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala index 41c04c41e85..efb119e6640 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala @@ -121,15 +121,15 @@ object AttributeTypeUtils extends Serializable { ): Any = { if (field == null) return null attributeType match { - case AttributeType.INTEGER => parseInteger(field, force) - case AttributeType.LONG => parseLong(field, force) - case AttributeType.DOUBLE => parseDouble(field) - case AttributeType.BOOLEAN => parseBoolean(field) - case AttributeType.TIMESTAMP => parseTimestamp(field) - case AttributeType.STRING => field.toString - case AttributeType.BINARY => field - case AttributeType.BIG_OBJECT => new BigObject(field.toString) - case AttributeType.ANY | _ => field + case AttributeType.INTEGER => parseInteger(field, force) + case AttributeType.LONG => parseLong(field, force) + case AttributeType.DOUBLE => parseDouble(field) + case AttributeType.BOOLEAN => parseBoolean(field) + case AttributeType.TIMESTAMP => parseTimestamp(field) + case AttributeType.STRING => field.toString + case AttributeType.BINARY => field + case AttributeType.LARGE_BINARY => new LargeBinary(field.toString) + case AttributeType.ANY | _ => field } } @@ -384,8 +384,8 @@ object AttributeTypeUtils extends Serializable { case AttributeType.INTEGER => tryParseInteger(fieldValue) case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue) case AttributeType.BINARY => tryParseString() - case AttributeType.BIG_OBJECT => - AttributeType.BIG_OBJECT // Big objects are never inferred from data + case AttributeType.LARGE_BINARY => + AttributeType.LARGE_BINARY // Large binaries are never inferred from data case _ => tryParseString() } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java similarity index 70% rename from common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java rename to common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java index 4feb5673ab6..9e8c4db870c 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/BigObject.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/LargeBinary.java @@ -23,56 +23,56 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import org.apache.texera.amber.core.executor.OperatorExecutor; -import org.apache.texera.service.util.BigObjectManager; +import org.apache.texera.service.util.LargeBinaryManager; import java.net.URI; import java.util.Objects; /** - * BigObject represents a reference to a large object stored in S3. + * LargeBinary represents a reference to a large object stored in S3. * - * Each BigObject is identified by an S3 URI (s3://bucket/path/to/object). - * BigObjects are automatically tracked and cleaned up when the workflow execution completes. + * Each LargeBinary is identified by an S3 URI (s3://bucket/path/to/object). + * LargeBinaries are automatically tracked and cleaned up when the workflow execution completes. */ -public class BigObject { +public class LargeBinary { private final String uri; /** - * Creates a BigObject from an existing S3 URI. + * Creates a LargeBinary from an existing S3 URI. 
* Used primarily for deserialization from JSON. * * @param uri S3 URI in the format s3://bucket/path/to/object * @throws IllegalArgumentException if URI is null or doesn't start with "s3://" */ @JsonCreator - public BigObject(@JsonProperty("uri") String uri) { + public LargeBinary(@JsonProperty("uri") String uri) { if (uri == null) { - throw new IllegalArgumentException("BigObject URI cannot be null"); + throw new IllegalArgumentException("LargeBinary URI cannot be null"); } if (!uri.startsWith("s3://")) { throw new IllegalArgumentException( - "BigObject URI must start with 's3://', got: " + uri + "LargeBinary URI must start with 's3://', got: " + uri ); } this.uri = uri; } /** - * Creates a new BigObject for writing data. + * Creates a new LargeBinary for writing data. * Generates a unique S3 URI. * * Usage example: * - * BigObject bigObject = new BigObject(); - * try (BigObjectOutputStream out = new BigObjectOutputStream(bigObject)) { + * LargeBinary largeBinary = new LargeBinary(); + * try (LargeBinaryOutputStream out = new LargeBinaryOutputStream(largeBinary)) { * out.write(data); * } - * // bigObject is now ready to be added to tuples + * // largeBinary is now ready to be added to tuples * */ - public BigObject() { - this(BigObjectManager.create()); + public LargeBinary() { + this(LargeBinaryManager.create()); } @JsonValue @@ -97,8 +97,8 @@ public String toString() { @Override public boolean equals(Object obj) { if (this == obj) return true; - if (!(obj instanceof BigObject)) return false; - BigObject that = (BigObject) obj; + if (!(obj instanceof LargeBinary)) return false; + LargeBinary that = (LargeBinary) obj; return Objects.equals(uri, that.uri); } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala index 19ed81efa48..ad6ac07c1ff 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala @@ -20,7 +20,7 @@ package org.apache.texera.amber.util import org.apache.texera.amber.config.StorageConfig -import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, BigObject, Schema, Tuple} +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple} import org.apache.hadoop.conf.Configuration import org.apache.iceberg.catalog.{Catalog, TableIdentifier} import org.apache.iceberg.data.parquet.GenericParquetReaders @@ -52,8 +52,8 @@ import scala.jdk.CollectionConverters._ */ object IcebergUtil { - // Unique suffix for BIG_OBJECT field encoding - private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr" + // Unique suffix for LARGE_BINARY field encoding + private val LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr" /** * Creates and initializes a HadoopCatalog with the given parameters. @@ -203,7 +203,7 @@ object IcebergUtil { /** * Converts a custom Amber `Schema` to an Iceberg `Schema`. - * Field names are encoded to preserve BIG_OBJECT type information. + * Field names are encoded to preserve LARGE_BINARY type information. * * @param amberSchema The custom Amber Schema. * @return An Iceberg Schema. 
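For orientation only (not part of this patch): a minimal Scala sketch of the suffix-encoding round trip introduced here, mirroring the renamed IcebergUtil helpers and the IcebergUtilSpec tests later in this diff. The schema-building calls are the ones exercised by those tests; treat it as an illustrative aside rather than project code.

    // A LARGE_BINARY attribute is stored as an Iceberg string column whose
    // name carries the "__texera_large_binary_ptr" suffix.
    val amberSchema = Schema()
      .add("id", AttributeType.INTEGER)
      .add("large_data", AttributeType.LARGE_BINARY)

    val icebergSchema = IcebergUtil.toIcebergSchema(amberSchema)
    // icebergSchema now has a StringType field named "large_data__texera_large_binary_ptr"

    // Decoding strips the suffix and restores the LARGE_BINARY type.
    val roundTrip = IcebergUtil.fromIcebergSchema(icebergSchema)
    assert(roundTrip.getAttribute("large_data").getType == AttributeType.LARGE_BINARY)

Encoding the type into the column name is what lets fromIcebergSchema tell a LARGE_BINARY pointer apart from an ordinary STRING column, since both are persisted as Iceberg strings.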
@@ -211,7 +211,7 @@ object IcebergUtil { def toIcebergSchema(amberSchema: Schema): IcebergSchema = { val icebergFields = amberSchema.getAttributes.zipWithIndex.map { case (attribute, index) => - val encodedName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val encodedName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType) val icebergType = toIcebergType(attribute.getType) Types.NestedField.optional(index + 1, encodedName, icebergType) } @@ -220,7 +220,7 @@ object IcebergUtil { /** * Converts a custom Amber `AttributeType` to an Iceberg `Type`. - * Note: BIG_OBJECT is stored as StringType; field name encoding is used to distinguish it. + * Note: LARGE_BINARY is stored as StringType; field name encoding is used to distinguish it. * * @param attributeType The custom Amber AttributeType. * @return The corresponding Iceberg Type. @@ -234,8 +234,8 @@ object IcebergUtil { case AttributeType.BOOLEAN => Types.BooleanType.get() case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone() case AttributeType.BINARY => Types.BinaryType.get() - case AttributeType.BIG_OBJECT => - Types.StringType.get() // Store BigObjectPointer URI as string + case AttributeType.LARGE_BINARY => + Types.StringType.get() // Store LargeBinary URI as string case AttributeType.ANY => throw new IllegalArgumentException("ANY type is not supported in Iceberg") } @@ -252,13 +252,13 @@ object IcebergUtil { tuple.schema.getAttributes.zipWithIndex.foreach { case (attribute, index) => - val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val fieldName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType) val value = tuple.getField[AnyRef](index) match { - case null => null - case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime - case bytes: Array[Byte] => ByteBuffer.wrap(bytes) - case bigObjPtr: BigObject => bigObjPtr.getUri - case other => other + case null => null + case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + case largeBinaryPtr: LargeBinary => largeBinaryPtr.getUri + case other => other } record.setField(fieldName, value) } @@ -275,7 +275,7 @@ object IcebergUtil { */ def fromRecord(record: Record, amberSchema: Schema): Tuple = { val fieldValues = amberSchema.getAttributes.map { attribute => - val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val fieldName = encodeLargeBinaryFieldName(attribute.getName, attribute.getType) val rawValue = record.getField(fieldName) rawValue match { @@ -285,8 +285,8 @@ object IcebergUtil { val bytes = new Array[Byte](buffer.remaining()) buffer.get(bytes) bytes - case uri: String if attribute.getType == AttributeType.BIG_OBJECT => - new BigObject(uri) + case uri: String if attribute.getType == AttributeType.LARGE_BINARY => + new LargeBinary(uri) case other => other } } @@ -295,16 +295,19 @@ object IcebergUtil { } /** - * Encodes a field name for BIG_OBJECT types by adding a unique system suffix. - * This ensures BIG_OBJECT fields can be identified when reading from Iceberg. + * Encodes a field name for LARGE_BINARY types by adding a unique system suffix. + * This ensures LARGE_BINARY fields can be identified when reading from Iceberg. 
* * @param fieldName The original field name * @param attributeType The attribute type - * @return The encoded field name with a unique suffix for BIG_OBJECT types + * @return The encoded field name with a unique suffix for LARGE_BINARY types */ - private def encodeBigObjectFieldName(fieldName: String, attributeType: AttributeType): String = { - if (attributeType == AttributeType.BIG_OBJECT) { - s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}" + private def encodeLargeBinaryFieldName( + fieldName: String, + attributeType: AttributeType + ): String = { + if (attributeType == AttributeType.LARGE_BINARY) { + s"${fieldName}${LARGE_BINARY_FIELD_SUFFIX}" } else { fieldName } @@ -317,27 +320,27 @@ object IcebergUtil { * @param fieldName The encoded field name * @return The original field name with system suffix removed */ - private def decodeBigObjectFieldName(fieldName: String): String = { - if (isBigObjectField(fieldName)) { - fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length) + private def decodeLargeBinaryFieldName(fieldName: String): String = { + if (isLargeBinaryField(fieldName)) { + fieldName.substring(0, fieldName.length - LARGE_BINARY_FIELD_SUFFIX.length) } else { fieldName } } /** - * Checks if a field name indicates a BIG_OBJECT type by examining the unique suffix. + * Checks if a field name indicates a LARGE_BINARY type by examining the unique suffix. * * @param fieldName The field name to check - * @return true if the field represents a BIG_OBJECT type, false otherwise + * @return true if the field represents a LARGE_BINARY type, false otherwise */ - private def isBigObjectField(fieldName: String): Boolean = { - fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX) + private def isLargeBinaryField(fieldName: String): Boolean = { + fieldName.endsWith(LARGE_BINARY_FIELD_SUFFIX) } /** * Converts an Iceberg `Schema` to an Amber `Schema`. - * Field names are decoded to restore original names and detect BIG_OBJECT types. + * Field names are decoded to restore original names and detect LARGE_BINARY types. * * @param icebergSchema The Iceberg Schema. * @return The corresponding Amber Schema. @@ -349,7 +352,7 @@ object IcebergUtil { .map { field => val fieldName = field.name() val attributeType = fromIcebergType(field.`type`().asPrimitiveType(), fieldName) - val originalName = decodeBigObjectFieldName(fieldName) + val originalName = decodeLargeBinaryFieldName(fieldName) new Attribute(originalName, attributeType) } .toList @@ -361,7 +364,7 @@ object IcebergUtil { * Converts an Iceberg `Type` to an Amber `AttributeType`. * * @param icebergType The Iceberg Type. - * @param fieldName The field name (used to detect BIG_OBJECT by suffix). + * @param fieldName The field name (used to detect LARGE_BINARY by suffix). * @return The corresponding Amber AttributeType. 
*/ def fromIcebergType( @@ -370,7 +373,7 @@ object IcebergUtil { ): AttributeType = { icebergType match { case _: Types.StringType => - if (isBigObjectField(fieldName)) AttributeType.BIG_OBJECT else AttributeType.STRING + if (isLargeBinaryField(fieldName)) AttributeType.LARGE_BINARY else AttributeType.STRING case _: Types.IntegerType => AttributeType.INTEGER case _: Types.LongType => AttributeType.LONG case _: Types.DoubleType => AttributeType.DOUBLE diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala similarity index 83% rename from common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala index 8a3da1d6b78..e2af371e777 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryInputStream.scala @@ -19,31 +19,31 @@ package org.apache.texera.service.util -import org.apache.texera.amber.core.tuple.BigObject +import org.apache.texera.amber.core.tuple.LargeBinary import java.io.InputStream /** - * InputStream for reading BigObject data from S3. + * InputStream for reading LargeBinary data from S3. * * The underlying S3 download is lazily initialized on first read. * The stream will fail if the S3 object doesn't exist when read is attempted. * * Usage: * {{{ - * val bigObject: BigObject = ... - * try (val in = new BigObjectInputStream(bigObject)) { + * val largeBinary: LargeBinary = ... + * try (val in = new LargeBinaryInputStream(largeBinary)) { * val bytes = in.readAllBytes() * } * }}} */ -class BigObjectInputStream(bigObject: BigObject) extends InputStream { +class LargeBinaryInputStream(largeBinary: LargeBinary) extends InputStream { - require(bigObject != null, "BigObject cannot be null") + require(largeBinary != null, "LargeBinary cannot be null") // Lazy initialization - downloads only when first read() is called private lazy val underlying: InputStream = - S3StorageClient.downloadObject(bigObject.getBucketName, bigObject.getObjectKey) + S3StorageClient.downloadObject(largeBinary.getBucketName, largeBinary.getObjectKey) @volatile private var closed = false diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala similarity index 72% rename from common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index a6a273eb304..211d7d3b757 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -24,19 +24,19 @@ import com.typesafe.scalalogging.LazyLogging import java.util.UUID /** - * Manages the lifecycle of BigObjects stored in S3. + * Manages the lifecycle of LargeBinaries stored in S3. * * Handles creation and deletion of large objects that exceed * normal tuple size limits. 
*/ -object BigObjectManager extends LazyLogging { - private val DEFAULT_BUCKET = "texera-big-objects" +object LargeBinaryManager extends LazyLogging { + private val DEFAULT_BUCKET = "texera-large-binaries" /** - * Creates a new BigObject reference. - * The actual data upload happens separately via BigObjectOutputStream. + * Creates a new LargeBinary reference. + * The actual data upload happens separately via LargeBinaryOutputStream. * - * @return S3 URI string for the new BigObject (format: s3://bucket/key) + * @return S3 URI string for the new LargeBinary (format: s3://bucket/key) */ def create(): String = { S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) @@ -48,7 +48,7 @@ object BigObjectManager extends LazyLogging { } /** - * Deletes all big objects from the bucket. + * Deletes all large binaries from the bucket. * * @throws Exception if the deletion fails * @return Unit @@ -56,10 +56,10 @@ object BigObjectManager extends LazyLogging { def deleteAllObjects(): Unit = { try { S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects") - logger.info(s"Successfully deleted all big objects from bucket: $DEFAULT_BUCKET") + logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET") } catch { case e: Exception => - logger.warn(s"Failed to delete big objects from bucket: $DEFAULT_BUCKET", e) + logger.warn(s"Failed to delete large binaries from bucket: $DEFAULT_BUCKET", e) } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala similarity index 82% rename from common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala index e0526107fac..ac6025146cb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryOutputStream.scala @@ -20,7 +20,7 @@ package org.apache.texera.service.util import com.typesafe.scalalogging.LazyLogging -import org.apache.texera.amber.core.tuple.BigObject +import org.apache.texera.amber.core.tuple.LargeBinary import java.io.{IOException, OutputStream, PipedInputStream, PipedOutputStream} import java.util.concurrent.atomic.AtomicReference @@ -28,32 +28,32 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.Duration /** - * OutputStream for streaming BigObject data to S3. + * OutputStream for streaming LargeBinary data to S3. * * Data is uploaded in the background using multipart upload as you write. * Call close() to complete the upload and ensure all data is persisted. * * Usage: * {{{ - * val bigObject = new BigObject() - * try (val out = new BigObjectOutputStream(bigObject)) { + * val largeBinary = new LargeBinary() + * try (val out = new LargeBinaryOutputStream(largeBinary)) { * out.write(myBytes) * } - * // bigObject is now ready to use + * // largeBinary is now ready to use * }}} * * Note: Not thread-safe. Do not access from multiple threads concurrently. 
* - * @param bigObject The BigObject reference to write to + * @param largeBinary The LargeBinary reference to write to */ -class BigObjectOutputStream(bigObject: BigObject) extends OutputStream with LazyLogging { +class LargeBinaryOutputStream(largeBinary: LargeBinary) extends OutputStream with LazyLogging { private val PIPE_BUFFER_SIZE = 64 * 1024 // 64KB - require(bigObject != null, "BigObject cannot be null") + require(largeBinary != null, "LargeBinary cannot be null") - private val bucketName: String = bigObject.getBucketName - private val objectKey: String = bigObject.getObjectKey + private val bucketName: String = largeBinary.getBucketName + private val objectKey: String = largeBinary.getObjectKey private implicit val ec: ExecutionContext = ExecutionContext.global // Pipe: we write to pipedOut, and S3 reads from pipedIn @@ -68,11 +68,11 @@ class BigObjectOutputStream(bigObject: BigObject) extends OutputStream with Lazy try { S3StorageClient.createBucketIfNotExist(bucketName) S3StorageClient.uploadObject(bucketName, objectKey, pipedIn) - logger.debug(s"Upload completed: ${bigObject.getUri}") + logger.debug(s"Upload completed: ${largeBinary.getUri}") } catch { case e: Exception => uploadException.set(Some(e)) - logger.error(s"Upload failed: ${bigObject.getUri}", e) + logger.error(s"Upload failed: ${largeBinary.getUri}", e) } finally { pipedIn.close() } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala index e99c54d5d69..08b9774607e 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala @@ -196,24 +196,24 @@ class AttributeTypeUtilsSpec extends AnyFunSuite { assert(parseField("anything", AttributeType.ANY) == "anything") } - test("parseField correctly parses to BIG_OBJECT") { - // Valid S3 URI strings are converted to BigObject - val pointer1 = parseField("s3://bucket/path/to/object", AttributeType.BIG_OBJECT) - .asInstanceOf[BigObject] + test("parseField correctly parses to LARGE_BINARY") { + // Valid S3 URI strings are converted to LargeBinary + val pointer1 = parseField("s3://bucket/path/to/object", AttributeType.LARGE_BINARY) + .asInstanceOf[LargeBinary] assert(pointer1.getUri == "s3://bucket/path/to/object") assert(pointer1.getBucketName == "bucket") assert(pointer1.getObjectKey == "path/to/object") // Null input returns null - assert(parseField(null, AttributeType.BIG_OBJECT) == null) + assert(parseField(null, AttributeType.LARGE_BINARY) == null) } - test("BIG_OBJECT type is preserved but never inferred from data") { - // BIG_OBJECT remains BIG_OBJECT when passed as typeSoFar - assert(inferField(AttributeType.BIG_OBJECT, "any-value") == AttributeType.BIG_OBJECT) - assert(inferField(AttributeType.BIG_OBJECT, null) == AttributeType.BIG_OBJECT) + test("LARGE_BINARY type is preserved but never inferred from data") { + // LARGE_BINARY remains LARGE_BINARY when passed as typeSoFar + assert(inferField(AttributeType.LARGE_BINARY, "any-value") == AttributeType.LARGE_BINARY) + assert(inferField(AttributeType.LARGE_BINARY, null) == AttributeType.LARGE_BINARY) - // String data is inferred as STRING, never BIG_OBJECT + // String data is inferred as STRING, never LARGE_BINARY assert(inferField("s3://bucket/path") == AttributeType.STRING) } diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala index 59a04ad6680..da15a8060d3 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/IcebergUtilSpec.scala @@ -19,7 +19,7 @@ package org.apache.texera.amber.util -import org.apache.texera.amber.core.tuple.{AttributeType, BigObject, Schema, Tuple} +import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, Tuple} import org.apache.texera.amber.util.IcebergUtil.toIcebergSchema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Types @@ -200,90 +200,91 @@ class IcebergUtilSpec extends AnyFlatSpec { assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 2, 3, 4)) } - // BIG_OBJECT type tests + // LARGE_BINARY type tests - it should "convert BIG_OBJECT type correctly between Texera and Iceberg" in { - // BIG_OBJECT stored as StringType with field name suffix - assert(IcebergUtil.toIcebergType(AttributeType.BIG_OBJECT) == Types.StringType.get()) + it should "convert LARGE_BINARY type correctly between Texera and Iceberg" in { + // LARGE_BINARY stored as StringType with field name suffix + assert(IcebergUtil.toIcebergType(AttributeType.LARGE_BINARY) == Types.StringType.get()) assert(IcebergUtil.fromIcebergType(Types.StringType.get(), "field") == AttributeType.STRING) assert( IcebergUtil.fromIcebergType( Types.StringType.get(), - "field__texera_big_obj_ptr" - ) == AttributeType.BIG_OBJECT + "field__texera_large_binary_ptr" + ) == AttributeType.LARGE_BINARY ) } - it should "convert schemas with BIG_OBJECT fields correctly" in { + it should "convert schemas with LARGE_BINARY fields correctly" in { val texeraSchema = Schema() .add("id", AttributeType.INTEGER) - .add("large_data", AttributeType.BIG_OBJECT) + .add("large_data", AttributeType.LARGE_BINARY) val icebergSchema = IcebergUtil.toIcebergSchema(texeraSchema) - // BIG_OBJECT field gets encoded name with suffix - assert(icebergSchema.findField("large_data__texera_big_obj_ptr") != null) + // LARGE_BINARY field gets encoded name with suffix + assert(icebergSchema.findField("large_data__texera_large_binary_ptr") != null) assert( - icebergSchema.findField("large_data__texera_big_obj_ptr").`type`() == Types.StringType.get() + icebergSchema.findField("large_data__texera_large_binary_ptr").`type`() == Types.StringType + .get() ) // Round-trip preserves schema val roundTripSchema = IcebergUtil.fromIcebergSchema(icebergSchema) - assert(roundTripSchema.getAttribute("large_data").getType == AttributeType.BIG_OBJECT) + assert(roundTripSchema.getAttribute("large_data").getType == AttributeType.LARGE_BINARY) } - it should "convert tuples with BIG_OBJECT to records and back correctly" in { + it should "convert tuples with LARGE_BINARY to records and back correctly" in { val schema = Schema() .add("id", AttributeType.INTEGER) - .add("large_data", AttributeType.BIG_OBJECT) + .add("large_data", AttributeType.LARGE_BINARY) val tuple = Tuple .builder(schema) - .addSequentially(Array(Int.box(42), new BigObject("s3://bucket/object/key.data"))) + .addSequentially(Array(Int.box(42), new LargeBinary("s3://bucket/object/key.data"))) .build() val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple) - // BIG_OBJECT stored as URI string with encoded field name + // LARGE_BINARY stored as URI string with 
encoded field name assert(record.getField("id") == 42) - assert(record.getField("large_data__texera_big_obj_ptr") == "s3://bucket/object/key.data") + assert(record.getField("large_data__texera_large_binary_ptr") == "s3://bucket/object/key.data") // Round-trip preserves data val roundTripTuple = IcebergUtil.fromRecord(record, schema) assert(roundTripTuple == tuple) - // BigObject properties are accessible - val bigObj = roundTripTuple.getField[BigObject]("large_data") - assert(bigObj.getUri == "s3://bucket/object/key.data") - assert(bigObj.getBucketName == "bucket") - assert(bigObj.getObjectKey == "object/key.data") + // LargeBinary properties are accessible + val largeBinary = roundTripTuple.getField[LargeBinary]("large_data") + assert(largeBinary.getUri == "s3://bucket/object/key.data") + assert(largeBinary.getBucketName == "bucket") + assert(largeBinary.getObjectKey == "object/key.data") } - it should "handle null BIG_OBJECT values correctly" in { - val schema = Schema().add("data", AttributeType.BIG_OBJECT) + it should "handle null LARGE_BINARY values correctly" in { + val schema = Schema().add("data", AttributeType.LARGE_BINARY) val tupleWithNull = Tuple.builder(schema).addSequentially(Array(null)).build() val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tupleWithNull) - assert(record.getField("data__texera_big_obj_ptr") == null) + assert(record.getField("data__texera_large_binary_ptr") == null) assert(IcebergUtil.fromRecord(record, schema) == tupleWithNull) } - it should "handle multiple BIG_OBJECT fields and mixed types correctly" in { + it should "handle multiple LARGE_BINARY fields and mixed types correctly" in { val schema = Schema() .add("int_field", AttributeType.INTEGER) - .add("big_obj_1", AttributeType.BIG_OBJECT) + .add("large_binary_1", AttributeType.LARGE_BINARY) .add("string_field", AttributeType.STRING) - .add("big_obj_2", AttributeType.BIG_OBJECT) + .add("large_binary_2", AttributeType.LARGE_BINARY) val tuple = Tuple .builder(schema) .addSequentially( Array( Int.box(123), - new BigObject("s3://bucket1/file1.dat"), + new LargeBinary("s3://bucket1/file1.dat"), "normal string", - null // null BIG_OBJECT + null // null LARGE_BINARY ) ) .build() @@ -291,9 +292,9 @@ class IcebergUtilSpec extends AnyFlatSpec { val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple) assert(record.getField("int_field") == 123) - assert(record.getField("big_obj_1__texera_big_obj_ptr") == "s3://bucket1/file1.dat") + assert(record.getField("large_binary_1__texera_large_binary_ptr") == "s3://bucket1/file1.dat") assert(record.getField("string_field") == "normal string") - assert(record.getField("big_obj_2__texera_big_obj_ptr") == null) + assert(record.getField("large_binary_2__texera_large_binary_ptr") == null) assert(IcebergUtil.fromRecord(record, schema) == tuple) } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala deleted file mode 100644 index c40e87a4a84..00000000000 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala +++ /dev/null @@ -1,471 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.service.util - -import org.apache.texera.amber.core.tuple.BigObject -import org.scalatest.funsuite.AnyFunSuite - -class BigObjectManagerSpec extends AnyFunSuite with S3StorageTestBase { - - /** Creates a big object from string data and returns it. */ - private def createBigObject(data: String): BigObject = { - val bigObject = new BigObject() - val out = new BigObjectOutputStream(bigObject) - try { - out.write(data.getBytes) - } finally { - out.close() - } - bigObject - } - - /** Verifies standard bucket name. */ - private def assertStandardBucket(pointer: BigObject): Unit = { - assert(pointer.getBucketName == "texera-big-objects") - assert(pointer.getUri.startsWith("s3://texera-big-objects/")) - } - - // ======================================== - // BigObjectInputStream Tests (Standard Java InputStream) - // ======================================== - - test("BigObjectInputStream should read all bytes from stream") { - val data = "Hello, World! This is a test." - val bigObject = createBigObject(data) - - val stream = new BigObjectInputStream(bigObject) - assert(stream.readAllBytes().sameElements(data.getBytes)) - stream.close() - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should read exact number of bytes") { - val bigObject = createBigObject("0123456789ABCDEF") - - val stream = new BigObjectInputStream(bigObject) - val result = stream.readNBytes(10) - - assert(result.length == 10) - assert(result.sameElements("0123456789".getBytes)) - stream.close() - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should handle reading more bytes than available") { - val data = "Short" - val bigObject = createBigObject(data) - - val stream = new BigObjectInputStream(bigObject) - val result = stream.readNBytes(100) - - assert(result.length == data.length) - assert(result.sameElements(data.getBytes)) - stream.close() - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should support standard single-byte read") { - val bigObject = createBigObject("ABC") - - val stream = new BigObjectInputStream(bigObject) - assert(stream.read() == 65) // 'A' - assert(stream.read() == 66) // 'B' - assert(stream.read() == 67) // 'C' - assert(stream.read() == -1) // EOF - stream.close() - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should return -1 at EOF") { - val bigObject = createBigObject("EOF") - - val stream = new BigObjectInputStream(bigObject) - stream.readAllBytes() // Read all data - assert(stream.read() == -1) - stream.close() - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should throw exception when reading from closed stream") { - val bigObject = createBigObject("test") - - val stream = new BigObjectInputStream(bigObject) - stream.close() - - assertThrows[java.io.IOException](stream.read()) - assertThrows[java.io.IOException](stream.readAllBytes()) - - 
BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should handle multiple close calls") { - val bigObject = createBigObject("test") - - val stream = new BigObjectInputStream(bigObject) - stream.close() - stream.close() // Should not throw - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should read large data correctly") { - val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 128).toByte) - val bigObject = new BigObject() - val out = new BigObjectOutputStream(bigObject) - try { - out.write(largeData) - } finally { - out.close() - } - - val stream = new BigObjectInputStream(bigObject) - val result = stream.readAllBytes() - assert(result.sameElements(largeData)) - stream.close() - - BigObjectManager.deleteAllObjects() - } - - // ======================================== - // BigObjectManager Tests - // ======================================== - - test("BigObjectManager should create a big object") { - val pointer = createBigObject("Test big object data") - - assertStandardBucket(pointer) - } - - test("BigObjectInputStream should open and read a big object") { - val data = "Hello from big object!" - val pointer = createBigObject(data) - - val stream = new BigObjectInputStream(pointer) - val readData = stream.readAllBytes() - stream.close() - - assert(readData.sameElements(data.getBytes)) - } - - test("BigObjectInputStream should fail to open non-existent big object") { - val fakeBigObject = new BigObject("s3://texera-big-objects/nonexistent/file") - val stream = new BigObjectInputStream(fakeBigObject) - - try { - intercept[Exception] { - stream.read() - } - } finally { - try { stream.close() } - catch { case _: Exception => } - } - } - - test("BigObjectManager should delete all big objects") { - val pointer1 = new BigObject() - val out1 = new BigObjectOutputStream(pointer1) - try { - out1.write("Object 1".getBytes) - } finally { - out1.close() - } - - val pointer2 = new BigObject() - val out2 = new BigObjectOutputStream(pointer2) - try { - out2.write("Object 2".getBytes) - } finally { - out2.close() - } - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectManager should handle delete with no objects gracefully") { - BigObjectManager.deleteAllObjects() // Should not throw exception - } - - test("BigObjectManager should delete all objects") { - val pointer1 = createBigObject("Test data") - val pointer2 = createBigObject("Test data") - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectManager should create bucket if it doesn't exist") { - val pointer = createBigObject("Test bucket creation") - - assertStandardBucket(pointer) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectManager should handle large objects correctly") { - val largeData = Array.fill[Byte](6 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) - val pointer = new BigObject() - val out = new BigObjectOutputStream(pointer) - try { - out.write(largeData) - } finally { - out.close() - } - - val stream = new BigObjectInputStream(pointer) - val readData = stream.readAllBytes() - stream.close() - - assert(readData.sameElements(largeData)) - BigObjectManager.deleteAllObjects() - } - - test("BigObjectManager should generate unique URIs for different objects") { - val testData = "Unique URI test".getBytes - val pointer1 = new BigObject() - val out1 = new BigObjectOutputStream(pointer1) - try { - out1.write(testData) - } finally { - out1.close() - } - - val pointer2 = new BigObject() - val out2 = new 
BigObjectOutputStream(pointer2) - try { - out2.write(testData) - } finally { - out2.close() - } - - assert(pointer1.getUri != pointer2.getUri) - assert(pointer1.getObjectKey != pointer2.getObjectKey) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream should handle multiple reads from the same big object") { - val data = "Multiple reads test data" - val pointer = createBigObject(data) - - val stream1 = new BigObjectInputStream(pointer) - val readData1 = stream1.readAllBytes() - stream1.close() - - val stream2 = new BigObjectInputStream(pointer) - val readData2 = stream2.readAllBytes() - stream2.close() - - assert(readData1.sameElements(data.getBytes)) - assert(readData2.sameElements(data.getBytes)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectManager should properly parse bucket name and object key from big object") { - val bigObject = createBigObject("URI parsing test") - - assertStandardBucket(bigObject) - assert(bigObject.getObjectKey.nonEmpty) - assert(!bigObject.getObjectKey.startsWith("/")) - - BigObjectManager.deleteAllObjects() - } - - // ======================================== - // Object-Oriented API Tests - // ======================================== - - test("BigObject with BigObjectOutputStream should create a big object") { - val data = "Test data for BigObject with BigObjectOutputStream" - - val bigObject = new BigObject() - val out = new BigObjectOutputStream(bigObject) - try { - out.write(data.getBytes) - } finally { - out.close() - } - - assertStandardBucket(bigObject) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectInputStream constructor should read big object contents") { - val data = "Test data for BigObjectInputStream constructor" - val bigObject = createBigObject(data) - - val stream = new BigObjectInputStream(bigObject) - val readData = stream.readAllBytes() - stream.close() - - assert(readData.sameElements(data.getBytes)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectOutputStream and BigObjectInputStream should work together end-to-end") { - val data = "End-to-end test data" - - // Create using streaming API - val bigObject = new BigObject() - val out = new BigObjectOutputStream(bigObject) - try { - out.write(data.getBytes) - } finally { - out.close() - } - - // Read using standard constructor - val stream = new BigObjectInputStream(bigObject) - val readData = stream.readAllBytes() - stream.close() - - assert(readData.sameElements(data.getBytes)) - - BigObjectManager.deleteAllObjects() - } - - // ======================================== - // BigObjectOutputStream Tests (New Symmetric API) - // ======================================== - - test("BigObjectOutputStream should write and upload data to S3") { - val data = "Test data for BigObjectOutputStream" - - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write(data.getBytes) - outStream.close() - - assertStandardBucket(bigObject) - - // Verify data can be read back - val inStream = new BigObjectInputStream(bigObject) - val readData = inStream.readAllBytes() - inStream.close() - - assert(readData.sameElements(data.getBytes)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectOutputStream should create big object") { - val data = "Database registration test" - - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write(data.getBytes) - outStream.close() - - assertStandardBucket(bigObject) - - BigObjectManager.deleteAllObjects() - } - - 
test("BigObjectOutputStream should handle large data correctly") { - val largeData = Array.fill[Byte](8 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) - - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write(largeData) - outStream.close() - - // Verify data integrity - val inStream = new BigObjectInputStream(bigObject) - val readData = inStream.readAllBytes() - inStream.close() - - assert(readData.sameElements(largeData)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectOutputStream should handle multiple writes") { - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write("Hello ".getBytes) - outStream.write("World".getBytes) - outStream.write("!".getBytes) - outStream.close() - - val inStream = new BigObjectInputStream(bigObject) - val readData = inStream.readAllBytes() - inStream.close() - - assert(readData.sameElements("Hello World!".getBytes)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectOutputStream should throw exception when writing to closed stream") { - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write("test".getBytes) - outStream.close() - - assertThrows[java.io.IOException](outStream.write("more".getBytes)) - - BigObjectManager.deleteAllObjects() - } - - test("BigObjectOutputStream should handle close() being called multiple times") { - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write("test".getBytes) - outStream.close() - outStream.close() // Should not throw - - BigObjectManager.deleteAllObjects() - } - - test("New BigObject() constructor should create unique URIs") { - val bigObject1 = new BigObject() - val bigObject2 = new BigObject() - - assert(bigObject1.getUri != bigObject2.getUri) - assert(bigObject1.getObjectKey != bigObject2.getObjectKey) - - BigObjectManager.deleteAllObjects() - } - - test("BigObject() and BigObjectOutputStream API should be symmetric with input") { - val data = "Symmetric API test" - - // Write using new symmetric API - val bigObject = new BigObject() - val outStream = new BigObjectOutputStream(bigObject) - outStream.write(data.getBytes) - outStream.close() - - // Read using symmetric API - val inStream = new BigObjectInputStream(bigObject) - val readData = inStream.readAllBytes() - inStream.close() - - assert(readData.sameElements(data.getBytes)) - - BigObjectManager.deleteAllObjects() - } -} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala similarity index 68% rename from common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala rename to common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala index e964d0a1cc9..0e307d8e42c 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryInputStreamSpec.scala @@ -19,20 +19,20 @@ package org.apache.texera.service.util -import org.apache.texera.amber.core.tuple.BigObject +import org.apache.texera.amber.core.tuple.LargeBinary import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite import java.io.{ByteArrayInputStream, 
IOException} import scala.util.Random -class BigObjectInputStreamSpec +class LargeBinaryInputStreamSpec extends AnyFunSuite with S3StorageTestBase with BeforeAndAfterAll with BeforeAndAfterEach { - private val testBucketName = "test-big-object-input-stream" + private val testBucketName = "test-large-binary-input-stream" override def beforeAll(): Unit = { super.beforeAll() @@ -49,19 +49,19 @@ class BigObjectInputStreamSpec } // Helper methods - private def createTestObject(key: String, data: Array[Byte]): BigObject = { + private def createTestObject(key: String, data: Array[Byte]): LargeBinary = { S3StorageClient.uploadObject(testBucketName, key, new ByteArrayInputStream(data)) - new BigObject(s"s3://$testBucketName/$key") + new LargeBinary(s"s3://$testBucketName/$key") } - private def createTestObject(key: String, data: String): BigObject = + private def createTestObject(key: String, data: String): LargeBinary = createTestObject(key, data.getBytes) private def generateRandomData(size: Int): Array[Byte] = Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte) - private def withStream[T](bigObject: BigObject)(f: BigObjectInputStream => T): T = { - val stream = new BigObjectInputStream(bigObject) + private def withStream[T](largeBinary: LargeBinary)(f: LargeBinaryInputStream => T): T = { + val stream = new LargeBinaryInputStream(largeBinary) try { f(stream) } finally { @@ -69,31 +69,31 @@ class BigObjectInputStreamSpec } } - private def assertThrowsIOExceptionWhenClosed(operation: BigObjectInputStream => Unit): Unit = { - val bigObject = createTestObject(s"test/closed-${Random.nextInt()}.txt", "data") - val stream = new BigObjectInputStream(bigObject) + private def assertThrowsIOExceptionWhenClosed(operation: LargeBinaryInputStream => Unit): Unit = { + val largeBinary = createTestObject(s"test/closed-${Random.nextInt()}.txt", "data") + val stream = new LargeBinaryInputStream(largeBinary) stream.close() val exception = intercept[IOException](operation(stream)) assert(exception.getMessage.contains("Stream is closed")) } // Constructor Tests - test("constructor should reject null BigObject") { + test("constructor should reject null LargeBinary") { val exception = intercept[IllegalArgumentException] { - new BigObjectInputStream(null) + new LargeBinaryInputStream(null) } - assert(exception.getMessage.contains("BigObject cannot be null")) + assert(exception.getMessage.contains("LargeBinary cannot be null")) } - test("constructor should accept valid BigObject") { - val bigObject = createTestObject("test/valid.txt", "test data") - withStream(bigObject) { _ => } + test("constructor should accept valid LargeBinary") { + val largeBinary = createTestObject("test/valid.txt", "test data") + withStream(largeBinary) { _ => } } // read() Tests test("read() should read single bytes correctly") { - val bigObject = createTestObject("test/single-byte.txt", "Hello") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/single-byte.txt", "Hello") + withStream(largeBinary) { stream => assert(stream.read() == 'H'.toByte) assert(stream.read() == 'e'.toByte) assert(stream.read() == 'l'.toByte) @@ -104,8 +104,8 @@ class BigObjectInputStreamSpec } test("read() should return -1 for empty object") { - val bigObject = createTestObject("test/empty.txt", "") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/empty.txt", "") + withStream(largeBinary) { stream => assert(stream.read() == -1) } } @@ -113,8 +113,8 @@ class BigObjectInputStreamSpec // read(byte[], int, int) 
Tests test("read(byte[], int, int) should read data into buffer") { val testData = "Hello, World!" - val bigObject = createTestObject("test/buffer-read.txt", testData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/buffer-read.txt", testData) + withStream(largeBinary) { stream => val buffer = new Array[Byte](testData.length) val bytesRead = stream.read(buffer, 0, buffer.length) assert(bytesRead == testData.length) @@ -124,8 +124,8 @@ class BigObjectInputStreamSpec test("read(byte[], int, int) should handle partial reads and offsets") { val testData = "Hello, World!" - val bigObject = createTestObject("test/partial.txt", testData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/partial.txt", testData) + withStream(largeBinary) { stream => // Test partial read val buffer1 = new Array[Byte](5) assert(stream.read(buffer1, 0, 5) == 5) @@ -133,7 +133,7 @@ class BigObjectInputStreamSpec } // Test offset - withStream(bigObject) { stream => + withStream(largeBinary) { stream => val buffer2 = new Array[Byte](20) assert(stream.read(buffer2, 5, 10) == 10) assert(new String(buffer2, 5, 10) == "Hello, Wor") @@ -141,8 +141,8 @@ class BigObjectInputStreamSpec } test("read(byte[], int, int) should return -1 at EOF") { - val bigObject = createTestObject("test/eof.txt", "test") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/eof.txt", "test") + withStream(largeBinary) { stream => val buffer = new Array[Byte](10) stream.read(buffer, 0, 10) assert(stream.read(buffer, 0, 10) == -1) @@ -152,16 +152,16 @@ class BigObjectInputStreamSpec // readAllBytes() Tests test("readAllBytes() should read entire object") { val testData = "Hello, World! This is a test." - val bigObject = createTestObject("test/read-all.txt", testData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/read-all.txt", testData) + withStream(largeBinary) { stream => assert(new String(stream.readAllBytes()) == testData) } } test("readAllBytes() should handle large objects") { val largeData = generateRandomData(1024 * 1024) // 1MB - val bigObject = createTestObject("test/large.bin", largeData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/large.bin", largeData) + withStream(largeBinary) { stream => val bytes = stream.readAllBytes() assert(bytes.length == largeData.length) assert(bytes.sameElements(largeData)) @@ -169,8 +169,8 @@ class BigObjectInputStreamSpec } test("readAllBytes() should return empty array for empty object") { - val bigObject = createTestObject("test/empty-all.txt", "") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/empty-all.txt", "") + withStream(largeBinary) { stream => assert(stream.readAllBytes().length == 0) } } @@ -178,8 +178,8 @@ class BigObjectInputStreamSpec // readNBytes() Tests test("readNBytes() should read exactly N bytes") { val testData = "Hello, World! This is a test." 
- val bigObject = createTestObject("test/read-n.txt", testData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/read-n.txt", testData) + withStream(largeBinary) { stream => val bytes = stream.readNBytes(5) assert(bytes.length == 5) assert(new String(bytes) == "Hello") @@ -187,8 +187,8 @@ class BigObjectInputStreamSpec } test("readNBytes() should handle EOF and zero") { - val bigObject = createTestObject("test/read-n-eof.txt", "Hello") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/read-n-eof.txt", "Hello") + withStream(largeBinary) { stream => // Request more than available val bytes = stream.readNBytes(100) assert(bytes.length == 5) @@ -196,53 +196,53 @@ class BigObjectInputStreamSpec } // Test n=0 - withStream(bigObject) { stream => + withStream(largeBinary) { stream => assert(stream.readNBytes(0).length == 0) } } // skip() Tests test("skip() should skip bytes correctly") { - val bigObject = createTestObject("test/skip.txt", "Hello, World!") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/skip.txt", "Hello, World!") + withStream(largeBinary) { stream => assert(stream.skip(7) == 7) assert(stream.read() == 'W'.toByte) } } test("skip() should handle EOF and zero") { - val bigObject = createTestObject("test/skip-eof.txt", "Hello") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/skip-eof.txt", "Hello") + withStream(largeBinary) { stream => assert(stream.skip(100) == 5) assert(stream.read() == -1) } // Test n=0 - withStream(bigObject) { stream => + withStream(largeBinary) { stream => assert(stream.skip(0) == 0) } } // available() Tests test("available() should return non-negative value") { - val bigObject = createTestObject("test/available.txt", "Hello, World!") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/available.txt", "Hello, World!") + withStream(largeBinary) { stream => assert(stream.available() >= 0) } } // close() Tests test("close() should be idempotent") { - val bigObject = createTestObject("test/close-idempotent.txt", "data") - val stream = new BigObjectInputStream(bigObject) + val largeBinary = createTestObject("test/close-idempotent.txt", "data") + val stream = new LargeBinaryInputStream(largeBinary) stream.close() stream.close() // Should not throw stream.close() // Should not throw } test("close() should prevent further operations") { - val bigObject = createTestObject("test/close-prevents.txt", "data") - val stream = new BigObjectInputStream(bigObject) + val largeBinary = createTestObject("test/close-prevents.txt", "data") + val stream = new LargeBinaryInputStream(largeBinary) stream.close() intercept[IOException] { stream.read() } @@ -253,8 +253,8 @@ class BigObjectInputStreamSpec } test("close() should work without reading (lazy initialization)") { - val bigObject = createTestObject("test/close-lazy.txt", "data") - val stream = new BigObjectInputStream(bigObject) + val largeBinary = createTestObject("test/close-lazy.txt", "data") + val stream = new LargeBinaryInputStream(largeBinary) stream.close() // Should not throw } @@ -272,16 +272,16 @@ class BigObjectInputStreamSpec // mark/reset Tests test("markSupported() should delegate to underlying stream") { - val bigObject = createTestObject("test/mark.txt", "data") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/mark.txt", "data") + withStream(largeBinary) { stream => val supported = stream.markSupported() assert(!supported || supported) // 
Just verify it's callable } } test("mark() and reset() should delegate to underlying stream") { - val bigObject = createTestObject("test/mark-reset.txt", "data") - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/mark-reset.txt", "data") + withStream(largeBinary) { stream => if (stream.markSupported()) { stream.mark(100) stream.read() @@ -293,8 +293,8 @@ class BigObjectInputStreamSpec // Lazy initialization Tests test("lazy initialization should not download until first read") { - val bigObject = createTestObject("test/lazy-init.txt", "data") - val stream = new BigObjectInputStream(bigObject) + val largeBinary = createTestObject("test/lazy-init.txt", "data") + val stream = new LargeBinaryInputStream(largeBinary) // Creating the stream should not trigger download // Reading should trigger download try { @@ -307,8 +307,8 @@ class BigObjectInputStreamSpec // Integration Tests test("should handle chunked reading of large objects") { val largeData = generateRandomData(10 * 1024) // 10KB - val bigObject = createTestObject("test/chunked.bin", largeData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/chunked.bin", largeData) + withStream(largeBinary) { stream => val buffer = new Array[Byte](1024) val output = new java.io.ByteArrayOutputStream() var bytesRead = 0 @@ -328,10 +328,10 @@ class BigObjectInputStreamSpec test("should handle multiple streams reading same object") { val testData = "Shared data" - val bigObject = createTestObject("test/shared.txt", testData) + val largeBinary = createTestObject("test/shared.txt", testData) - val stream1 = new BigObjectInputStream(bigObject) - val stream2 = new BigObjectInputStream(bigObject) + val stream1 = new LargeBinaryInputStream(largeBinary) + val stream2 = new LargeBinaryInputStream(largeBinary) try { assert(new String(stream1.readAllBytes()) == testData) @@ -344,8 +344,8 @@ class BigObjectInputStreamSpec test("should preserve binary data integrity") { val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 50, 100) - val bigObject = createTestObject("test/binary.bin", binaryData) - withStream(bigObject) { stream => + val largeBinary = createTestObject("test/binary.bin", binaryData) + withStream(largeBinary) { stream => assert(stream.readAllBytes().sameElements(binaryData)) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala new file mode 100644 index 00000000000..77d142efeeb --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import org.apache.texera.amber.core.tuple.LargeBinary +import org.scalatest.funsuite.AnyFunSuite + +class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { + + /** Creates a large binary from string data and returns it. */ + private def createLargeBinary(data: String): LargeBinary = { + val largeBinary = new LargeBinary() + val out = new LargeBinaryOutputStream(largeBinary) + try { + out.write(data.getBytes) + } finally { + out.close() + } + largeBinary + } + + /** Verifies standard bucket name. */ + private def assertStandardBucket(pointer: LargeBinary): Unit = { + assert(pointer.getBucketName == "texera-large-binaries") + assert(pointer.getUri.startsWith("s3://texera-large-binaries/")) + } + + // ======================================== + // LargeBinaryInputStream Tests (Standard Java InputStream) + // ======================================== + + test("LargeBinaryInputStream should read all bytes from stream") { + val data = "Hello, World! This is a test." + val largeBinary = createLargeBinary(data) + + val stream = new LargeBinaryInputStream(largeBinary) + assert(stream.readAllBytes().sameElements(data.getBytes)) + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should read exact number of bytes") { + val largeBinary = createLargeBinary("0123456789ABCDEF") + + val stream = new LargeBinaryInputStream(largeBinary) + val result = stream.readNBytes(10) + + assert(result.length == 10) + assert(result.sameElements("0123456789".getBytes)) + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should handle reading more bytes than available") { + val data = "Short" + val largeBinary = createLargeBinary(data) + + val stream = new LargeBinaryInputStream(largeBinary) + val result = stream.readNBytes(100) + + assert(result.length == data.length) + assert(result.sameElements(data.getBytes)) + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should support standard single-byte read") { + val largeBinary = createLargeBinary("ABC") + + val stream = new LargeBinaryInputStream(largeBinary) + assert(stream.read() == 65) // 'A' + assert(stream.read() == 66) // 'B' + assert(stream.read() == 67) // 'C' + assert(stream.read() == -1) // EOF + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should return -1 at EOF") { + val largeBinary = createLargeBinary("EOF") + + val stream = new LargeBinaryInputStream(largeBinary) + stream.readAllBytes() // Read all data + assert(stream.read() == -1) + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should throw exception when reading from closed stream") { + val largeBinary = createLargeBinary("test") + + val stream = new LargeBinaryInputStream(largeBinary) + stream.close() + + assertThrows[java.io.IOException](stream.read()) + assertThrows[java.io.IOException](stream.readAllBytes()) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should handle multiple close calls") { + val largeBinary = createLargeBinary("test") + + val stream = new LargeBinaryInputStream(largeBinary) + stream.close() + stream.close() // Should not throw + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should read large data correctly") { + val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 128).toByte) + val largeBinary = new 
LargeBinary() + val out = new LargeBinaryOutputStream(largeBinary) + try { + out.write(largeData) + } finally { + out.close() + } + + val stream = new LargeBinaryInputStream(largeBinary) + val result = stream.readAllBytes() + assert(result.sameElements(largeData)) + stream.close() + + LargeBinaryManager.deleteAllObjects() + } + + // ======================================== + // LargeBinaryManager Tests + // ======================================== + + test("LargeBinaryManager should create a large binary") { + val pointer = createLargeBinary("Test large binary data") + + assertStandardBucket(pointer) + } + + test("LargeBinaryInputStream should open and read a large binary") { + val data = "Hello from large binary!" + val pointer = createLargeBinary(data) + + val stream = new LargeBinaryInputStream(pointer) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + } + + test("LargeBinaryInputStream should fail to open non-existent large binary") { + val fakeLargeBinary = new LargeBinary("s3://texera-large-binaries/nonexistent/file") + val stream = new LargeBinaryInputStream(fakeLargeBinary) + + try { + intercept[Exception] { + stream.read() + } + } finally { + try { stream.close() } + catch { case _: Exception => } + } + } + + test("LargeBinaryManager should delete all large binaries") { + val pointer1 = new LargeBinary() + val out1 = new LargeBinaryOutputStream(pointer1) + try { + out1.write("Object 1".getBytes) + } finally { + out1.close() + } + + val pointer2 = new LargeBinary() + val out2 = new LargeBinaryOutputStream(pointer2) + try { + out2.write("Object 2".getBytes) + } finally { + out2.close() + } + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryManager should handle delete with no objects gracefully") { + LargeBinaryManager.deleteAllObjects() // Should not throw exception + } + + test("LargeBinaryManager should delete all objects") { + val pointer1 = createLargeBinary("Test data") + val pointer2 = createLargeBinary("Test data") + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryManager should create bucket if it doesn't exist") { + val pointer = createLargeBinary("Test bucket creation") + + assertStandardBucket(pointer) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryManager should handle large objects correctly") { + val largeData = Array.fill[Byte](6 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) + val pointer = new LargeBinary() + val out = new LargeBinaryOutputStream(pointer) + try { + out.write(largeData) + } finally { + out.close() + } + + val stream = new LargeBinaryInputStream(pointer) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(largeData)) + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryManager should generate unique URIs for different objects") { + val testData = "Unique URI test".getBytes + val pointer1 = new LargeBinary() + val out1 = new LargeBinaryOutputStream(pointer1) + try { + out1.write(testData) + } finally { + out1.close() + } + + val pointer2 = new LargeBinary() + val out2 = new LargeBinaryOutputStream(pointer2) + try { + out2.write(testData) + } finally { + out2.close() + } + + assert(pointer1.getUri != pointer2.getUri) + assert(pointer1.getObjectKey != pointer2.getObjectKey) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream should handle multiple reads from the same large binary") { + val data = "Multiple reads test data" + val pointer = 
createLargeBinary(data) + + val stream1 = new LargeBinaryInputStream(pointer) + val readData1 = stream1.readAllBytes() + stream1.close() + + val stream2 = new LargeBinaryInputStream(pointer) + val readData2 = stream2.readAllBytes() + stream2.close() + + assert(readData1.sameElements(data.getBytes)) + assert(readData2.sameElements(data.getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryManager should properly parse bucket name and object key from large binary") { + val largeBinary = createLargeBinary("URI parsing test") + + assertStandardBucket(largeBinary) + assert(largeBinary.getObjectKey.nonEmpty) + assert(!largeBinary.getObjectKey.startsWith("/")) + + LargeBinaryManager.deleteAllObjects() + } + + // ======================================== + // Object-Oriented API Tests + // ======================================== + + test("LargeBinary with LargeBinaryOutputStream should create a large binary") { + val data = "Test data for LargeBinary with LargeBinaryOutputStream" + + val largeBinary = new LargeBinary() + val out = new LargeBinaryOutputStream(largeBinary) + try { + out.write(data.getBytes) + } finally { + out.close() + } + + assertStandardBucket(largeBinary) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryInputStream constructor should read large binary contents") { + val data = "Test data for LargeBinaryInputStream constructor" + val largeBinary = createLargeBinary(data) + + val stream = new LargeBinaryInputStream(largeBinary) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream and LargeBinaryInputStream should work together end-to-end") { + val data = "End-to-end test data" + + // Create using streaming API + val largeBinary = new LargeBinary() + val out = new LargeBinaryOutputStream(largeBinary) + try { + out.write(data.getBytes) + } finally { + out.close() + } + + // Read using standard constructor + val stream = new LargeBinaryInputStream(largeBinary) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + // ======================================== + // LargeBinaryOutputStream Tests (New Symmetric API) + // ======================================== + + test("LargeBinaryOutputStream should write and upload data to S3") { + val data = "Test data for LargeBinaryOutputStream" + + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write(data.getBytes) + outStream.close() + + assertStandardBucket(largeBinary) + + // Verify data can be read back + val inStream = new LargeBinaryInputStream(largeBinary) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(data.getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream should create large binary") { + val data = "Database registration test" + + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write(data.getBytes) + outStream.close() + + assertStandardBucket(largeBinary) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream should handle large data correctly") { + val largeData = Array.fill[Byte](8 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) + + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + 
outStream.write(largeData) + outStream.close() + + // Verify data integrity + val inStream = new LargeBinaryInputStream(largeBinary) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(largeData)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream should handle multiple writes") { + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write("Hello ".getBytes) + outStream.write("World".getBytes) + outStream.write("!".getBytes) + outStream.close() + + val inStream = new LargeBinaryInputStream(largeBinary) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements("Hello World!".getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream should throw exception when writing to closed stream") { + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write("test".getBytes) + outStream.close() + + assertThrows[java.io.IOException](outStream.write("more".getBytes)) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinaryOutputStream should handle close() being called multiple times") { + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write("test".getBytes) + outStream.close() + outStream.close() // Should not throw + + LargeBinaryManager.deleteAllObjects() + } + + test("New LargeBinary() constructor should create unique URIs") { + val largeBinary1 = new LargeBinary() + val largeBinary2 = new LargeBinary() + + assert(largeBinary1.getUri != largeBinary2.getUri) + assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey) + + LargeBinaryManager.deleteAllObjects() + } + + test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with input") { + val data = "Symmetric API test" + + // Write using new symmetric API + val largeBinary = new LargeBinary() + val outStream = new LargeBinaryOutputStream(largeBinary) + outStream.write(data.getBytes) + outStream.close() + + // Read using symmetric API + val inStream = new LargeBinaryInputStream(largeBinary) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(data.getBytes)) + + LargeBinaryManager.deleteAllObjects() + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala similarity index 63% rename from common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala rename to common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala index b4c106d6197..ed2a2fbace7 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryOutputStreamSpec.scala @@ -19,20 +19,20 @@ package org.apache.texera.service.util -import org.apache.texera.amber.core.tuple.BigObject +import org.apache.texera.amber.core.tuple.LargeBinary import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite import java.io.IOException import scala.util.Random -class BigObjectOutputStreamSpec +class LargeBinaryOutputStreamSpec extends AnyFunSuite with S3StorageTestBase with BeforeAndAfterAll with BeforeAndAfterEach { - 
private val testBucketName = "test-big-object-output-stream" + private val testBucketName = "test-large-binary-output-stream" override def beforeAll(): Unit = { super.beforeAll() @@ -49,43 +49,43 @@ class BigObjectOutputStreamSpec } // Helper methods - private def createBigObject(key: String): BigObject = - new BigObject(s"s3://$testBucketName/$key") + private def createLargeBinary(key: String): LargeBinary = + new LargeBinary(s"s3://$testBucketName/$key") private def generateRandomData(size: Int): Array[Byte] = Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte) - private def withStream[T](bigObject: BigObject)(f: BigObjectOutputStream => T): T = { - val stream = new BigObjectOutputStream(bigObject) + private def withStream[T](largeBinary: LargeBinary)(f: LargeBinaryOutputStream => T): T = { + val stream = new LargeBinaryOutputStream(largeBinary) try f(stream) finally stream.close() } - private def readBack(bigObject: BigObject): Array[Byte] = { - val inputStream = new BigObjectInputStream(bigObject) + private def readBack(largeBinary: LargeBinary): Array[Byte] = { + val inputStream = new LargeBinaryInputStream(largeBinary) try inputStream.readAllBytes() finally inputStream.close() } private def writeAndVerify(key: String, data: Array[Byte]): Unit = { - val bigObject = createBigObject(key) - withStream(bigObject)(_.write(data, 0, data.length)) - assert(readBack(bigObject).sameElements(data)) + val largeBinary = createLargeBinary(key) + withStream(largeBinary)(_.write(data, 0, data.length)) + assert(readBack(largeBinary).sameElements(data)) } // === Constructor Tests === - test("should reject null BigObject") { - val exception = intercept[IllegalArgumentException](new BigObjectOutputStream(null)) - assert(exception.getMessage.contains("BigObject cannot be null")) + test("should reject null LargeBinary") { + val exception = intercept[IllegalArgumentException](new LargeBinaryOutputStream(null)) + assert(exception.getMessage.contains("LargeBinary cannot be null")) } // === Basic Write Tests === test("should write single bytes correctly") { - val bigObject = createBigObject("test/single-bytes.txt") - withStream(bigObject) { stream => + val largeBinary = createLargeBinary("test/single-bytes.txt") + withStream(largeBinary) { stream => "Hello".foreach(c => stream.write(c.toByte)) } - assert(new String(readBack(bigObject)) == "Hello") + assert(new String(readBack(largeBinary)) == "Hello") } test("should write byte arrays correctly") { @@ -95,58 +95,58 @@ class BigObjectOutputStreamSpec test("should handle partial writes with offset and length") { val testData = "Hello, World!".getBytes - val bigObject = createBigObject("test/partial-write.txt") + val largeBinary = createLargeBinary("test/partial-write.txt") - withStream(bigObject) { stream => + withStream(largeBinary) { stream => stream.write(testData, 0, 5) // "Hello" stream.write(testData, 7, 5) // "World" } - assert(new String(readBack(bigObject)) == "HelloWorld") + assert(new String(readBack(largeBinary)) == "HelloWorld") } test("should handle multiple consecutive writes") { - val bigObject = createBigObject("test/multiple-writes.txt") - withStream(bigObject) { stream => + val largeBinary = createLargeBinary("test/multiple-writes.txt") + withStream(largeBinary) { stream => stream.write("Hello".getBytes) stream.write(", ".getBytes) stream.write("World!".getBytes) } - assert(new String(readBack(bigObject)) == "Hello, World!") + assert(new String(readBack(largeBinary)) == "Hello, World!") } // === Stream Lifecycle Tests === test("flush 
should not throw") { - val bigObject = createBigObject("test/flush.txt") - withStream(bigObject) { stream => + val largeBinary = createLargeBinary("test/flush.txt") + withStream(largeBinary) { stream => stream.write("test".getBytes) stream.flush() stream.write(" data".getBytes) } - assert(new String(readBack(bigObject)) == "test data") + assert(new String(readBack(largeBinary)) == "test data") } test("close should be idempotent") { - val bigObject = createBigObject("test/close-idempotent.txt") - val stream = new BigObjectOutputStream(bigObject) + val largeBinary = createLargeBinary("test/close-idempotent.txt") + val stream = new LargeBinaryOutputStream(largeBinary) stream.write("data".getBytes) stream.close() stream.close() // Should not throw stream.flush() // Should not throw after close - assert(new String(readBack(bigObject)) == "data") + assert(new String(readBack(largeBinary)) == "data") } test("close should handle empty stream") { - val bigObject = createBigObject("test/empty-stream.txt") - val stream = new BigObjectOutputStream(bigObject) + val largeBinary = createLargeBinary("test/empty-stream.txt") + val stream = new LargeBinaryOutputStream(largeBinary) stream.close() - assert(readBack(bigObject).length == 0) + assert(readBack(largeBinary).length == 0) } // === Error Handling === test("write operations should throw IOException when stream is closed") { - val bigObject = createBigObject("test/closed-stream.txt") - val stream = new BigObjectOutputStream(bigObject) + val largeBinary = createLargeBinary("test/closed-stream.txt") + val stream = new LargeBinaryOutputStream(largeBinary) stream.close() val ex1 = intercept[IOException](stream.write('A'.toByte)) @@ -171,13 +171,13 @@ class BigObjectOutputStreamSpec val totalSize = 1024 * 1024 // 1MB val chunkSize = 8 * 1024 // 8KB val data = generateRandomData(totalSize) - val bigObject = createBigObject("test/chunked.bin") + val largeBinary = createLargeBinary("test/chunked.bin") - withStream(bigObject) { stream => + withStream(largeBinary) { stream => data.grouped(chunkSize).foreach(chunk => stream.write(chunk)) } - assert(readBack(bigObject).sameElements(data)) + assert(readBack(largeBinary).sameElements(data)) } // === Binary Data Tests === @@ -189,8 +189,8 @@ class BigObjectOutputStreamSpec // === Integration Tests === test("should handle concurrent writes to different objects") { val streams = (1 to 3).map { i => - val obj = createBigObject(s"test/concurrent-$i.txt") - val stream = new BigObjectOutputStream(obj) + val obj = createLargeBinary(s"test/concurrent-$i.txt") + val stream = new LargeBinaryOutputStream(obj) (obj, stream, s"Data $i") } @@ -207,32 +207,32 @@ class BigObjectOutputStreamSpec } test("should overwrite existing object") { - val bigObject = createBigObject("test/overwrite.txt") - withStream(bigObject)(_.write("original data".getBytes)) - withStream(bigObject)(_.write("new data".getBytes)) - assert(new String(readBack(bigObject)) == "new data") + val largeBinary = createLargeBinary("test/overwrite.txt") + withStream(largeBinary)(_.write("original data".getBytes)) + withStream(largeBinary)(_.write("new data".getBytes)) + assert(new String(readBack(largeBinary)) == "new data") } test("should handle mixed write operations") { - val bigObject = createBigObject("test/mixed-writes.txt") - withStream(bigObject) { stream => + val largeBinary = createLargeBinary("test/mixed-writes.txt") + withStream(largeBinary) { stream => stream.write('A'.toByte) stream.write(" test ".getBytes) stream.write('B'.toByte) val data = "Hello, 
World!".getBytes stream.write(data, 7, 6) // "World!" } - assert(new String(readBack(bigObject)) == "A test BWorld!") + assert(new String(readBack(largeBinary)) == "A test BWorld!") } // === Edge Cases === test("should create bucket automatically") { val newBucketName = s"new-bucket-${Random.nextInt(10000)}" - val bigObject = new BigObject(s"s3://$newBucketName/test/auto-create.txt") + val largeBinary = new LargeBinary(s"s3://$newBucketName/test/auto-create.txt") try { - withStream(bigObject)(_.write("test".getBytes)) - assert(new String(readBack(bigObject)) == "test") + withStream(largeBinary)(_.write("test".getBytes)) + assert(new String(readBack(largeBinary)) == "test") } finally { try S3StorageClient.deleteDirectory(newBucketName, "") catch { case _: Exception => /* ignore */ } @@ -241,11 +241,11 @@ class BigObjectOutputStreamSpec test("should handle rapid open/close cycles") { (1 to 10).foreach { i => - withStream(createBigObject(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes)) + withStream(createLargeBinary(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes)) } (1 to 10).foreach { i => - val result = readBack(createBigObject(s"test/rapid-$i.txt")) + val result = readBack(createLargeBinary(s"test/rapid-$i.txt")) assert(new String(result) == s"data-$i") } } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java index 9e1dfc2b8c3..87d6c38c64d 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileAttributeType.java @@ -31,7 +31,7 @@ public enum FileAttributeType { BOOLEAN("boolean", AttributeType.BOOLEAN), TIMESTAMP("timestamp", AttributeType.TIMESTAMP), BINARY("binary", AttributeType.BINARY), - BIG_OBJECT("big object", AttributeType.BIG_OBJECT); + LARGE_BINARY("large binary", AttributeType.LARGE_BINARY); private final String name; @@ -57,6 +57,6 @@ public String toString() { } public boolean isSingle() { - return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT; + return this == SINGLE_STRING || this == BINARY || this == LARGE_BINARY; } } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala index 11bfd60cf06..91c817240cc 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala @@ -22,9 +22,9 @@ package org.apache.texera.amber.operator.source.scan import org.apache.texera.amber.core.executor.SourceOperatorExecutor import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField -import org.apache.texera.amber.core.tuple.{BigObject, TupleLike} +import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike} import org.apache.texera.amber.util.JSONUtils.objectMapper -import org.apache.texera.service.util.BigObjectOutputStream +import org.apache.texera.service.util.LargeBinaryOutputStream import org.apache.commons.compress.archivers.ArchiveStreamFactory import 
org.apache.commons.compress.archivers.zip.ZipArchiveInputStream import org.apache.commons.io.IOUtils.toByteArray @@ -85,10 +85,10 @@ class FileScanSourceOpExec private[scan] ( fields.addOne(desc.attributeType match { case FileAttributeType.SINGLE_STRING => new String(toByteArray(entry), desc.fileEncoding.getCharset) - case FileAttributeType.BIG_OBJECT => - // For big objects, create reference and upload via streaming - val bigObject = new BigObject() - val out = new BigObjectOutputStream(bigObject) + case FileAttributeType.LARGE_BINARY => + // For large binaries, create reference and upload via streaming + val largeBinary = new LargeBinary() + val out = new LargeBinaryOutputStream(largeBinary) try { val buffer = new Array[Byte](8192) var bytesRead = entry.read(buffer) @@ -99,7 +99,7 @@ class FileScanSourceOpExec private[scan] ( } finally { out.close() } - bigObject + largeBinary case _ => parseField(toByteArray(entry), desc.attributeType.getType) }) TupleLike(fields.toSeq: _*) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala index 9a23b3f1d61..ab3383ca0ad 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala @@ -19,7 +19,7 @@ package org.apache.texera.amber.operator.source.scan -import org.apache.texera.amber.core.tuple.{AttributeType, BigObject, Schema, SchemaEnforceable} +import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, SchemaEnforceable} import org.apache.texera.amber.util.JSONUtils.objectMapper import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec @@ -30,8 +30,8 @@ import java.nio.file.{Files, Path} import java.util.zip.{ZipEntry, ZipOutputStream} /** - * Unit tests for BIG_OBJECT logic in FileScanSourceOpExec. - * Full integration tests with S3 and database are in BigObjectManagerSpec. + * Unit tests for LARGE_BINARY logic in FileScanSourceOpExec. + * Full integration tests with S3 and database are in LargeBinaryManagerSpec. 
*/ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { @@ -40,8 +40,8 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { .resolve("common/workflow-operator/src/test/resources") .toRealPath() - private val testFile = testDir.resolve("test_big_object.txt") - private val testZip = testDir.resolve("test_big_object.zip") + private val testFile = testDir.resolve("test_large_binary.txt") + private val testZip = testDir.resolve("test_large_binary.zip") override def beforeAll(): Unit = { super.beforeAll() @@ -75,7 +75,7 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { ): FileScanSourceOpDesc = { val desc = new FileScanSourceOpDesc() desc.fileName = Some(file.toString) - desc.attributeType = FileAttributeType.BIG_OBJECT + desc.attributeType = FileAttributeType.LARGE_BINARY desc.attributeName = attributeName desc.fileEncoding = FileDecodingMethod.UTF_8 desc @@ -83,26 +83,26 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { private def assertSchema(schema: Schema, attributeName: String): Unit = { assert(schema.getAttributes.length == 1) - assert(schema.getAttribute(attributeName).getType == AttributeType.BIG_OBJECT) + assert(schema.getAttribute(attributeName).getType == AttributeType.LARGE_BINARY) } // Schema Tests - it should "infer BIG_OBJECT schema with default attribute name" in { + it should "infer LARGE_BINARY schema with default attribute name" in { assertSchema(createDescriptor().sourceSchema(), "line") } - it should "infer BIG_OBJECT schema with custom attribute name" in { + it should "infer LARGE_BINARY schema with custom attribute name" in { assertSchema(createDescriptor(attributeName = "custom_field").sourceSchema(), "custom_field") } - it should "map BIG_OBJECT to correct AttributeType" in { - assert(FileAttributeType.BIG_OBJECT.getType == AttributeType.BIG_OBJECT) + it should "map LARGE_BINARY to correct AttributeType" in { + assert(FileAttributeType.LARGE_BINARY.getType == AttributeType.LARGE_BINARY) } // Type Classification Tests - it should "correctly classify BIG_OBJECT as isSingle type" in { + it should "correctly classify LARGE_BINARY as isSingle type" in { val isSingleTypes = List( - FileAttributeType.BIG_OBJECT, + FileAttributeType.LARGE_BINARY, FileAttributeType.SINGLE_STRING, FileAttributeType.BINARY ) @@ -120,7 +120,7 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { } // Execution Tests - it should "create BigObject when reading file with BIG_OBJECT type" in { + it should "create LargeBinary when reading file with LARGE_BINARY type" in { val desc = createDescriptor() desc.setResolvedFileName(URI.create(testFile.toUri.toString)) @@ -137,26 +137,26 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { .enforceSchema(desc.sourceSchema()) .getField[Any]("line") - assert(field.isInstanceOf[BigObject]) - assert(field.asInstanceOf[BigObject].getUri.startsWith("s3://")) + assert(field.isInstanceOf[LargeBinary]) + assert(field.asInstanceOf[LargeBinary].getUri.startsWith("s3://")) } catch { case e: Exception => info(s"S3 not configured: ${e.getMessage}") } } - // BigObject Tests - it should "create valid BigObject with correct URI parsing" in { - val pointer = new BigObject("s3://bucket/path/to/object") + // LargeBinary Tests + it should "create valid LargeBinary with correct URI parsing" in { + val pointer = new LargeBinary("s3://bucket/path/to/object") assert(pointer.getUri == "s3://bucket/path/to/object") 
assert(pointer.getBucketName == "bucket") assert(pointer.getObjectKey == "path/to/object") } - it should "reject invalid BigObject URIs" in { - assertThrows[IllegalArgumentException](new BigObject("http://invalid")) - assertThrows[IllegalArgumentException](new BigObject("not-a-uri")) - assertThrows[IllegalArgumentException](new BigObject(null.asInstanceOf[String])) + it should "reject invalid LargeBinary URIs" in { + assertThrows[IllegalArgumentException](new LargeBinary("http://invalid")) + assertThrows[IllegalArgumentException](new LargeBinary("not-a-uri")) + assertThrows[IllegalArgumentException](new LargeBinary(null.asInstanceOf[String])) } }
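For reference, the renamed specs above all exercise the same write/read/cleanup cycle. The sketch below is illustrative only and not part of the patch: a minimal round trip assuming S3 storage is configured the way these specs expect, using only class and method names that appear in this diff (LargeBinary, LargeBinaryOutputStream, LargeBinaryInputStream, LargeBinaryManager).

import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.service.util.{LargeBinaryInputStream, LargeBinaryManager, LargeBinaryOutputStream}

object LargeBinaryRoundTrip {
  def main(args: Array[String]): Unit = {
    // Write: new LargeBinary() allocates a unique s3://texera-large-binaries/... reference,
    // and LargeBinaryOutputStream uploads the bytes written to it; the data is readable once closed.
    val largeBinary = new LargeBinary()
    val out = new LargeBinaryOutputStream(largeBinary)
    try out.write("Hello, World!".getBytes)
    finally out.close()

    // Read: LargeBinaryInputStream is lazily initialized and downloads on the first read.
    val in = new LargeBinaryInputStream(largeBinary)
    try assert(new String(in.readAllBytes()) == "Hello, World!")
    finally in.close()

    // Cleanup: delete every large binary created during the run, as the specs do after each test.
    LargeBinaryManager.deleteAllObjects()
  }
}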