diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index 01ae898a66a..c6917d9390a 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -36,6 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{ WorkflowUserAccessDao } import org.apache.texera.dao.jooq.generated.tables.pojos._ +import org.apache.texera.service.util.BigObjectManager import org.apache.texera.web.resource.dashboard.hub.EntityType import org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess @@ -600,6 +601,8 @@ class WorkflowResource extends LazyLogging { .asScala .toList + BigObjectManager.deleteAllObjects() + // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => val executionId = ExecutionIdentity(eid.longValue()) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 01c66fb4589..c9f1bee3463 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -46,6 +46,7 @@ import org.apache.amber.engine.architecture.worker.WorkflowWorker.{ } import org.apache.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, getStackTraceWithAllCauses} import org.apache.texera.dao.jooq.generated.tables.pojos.User +import org.apache.texera.service.util.BigObjectManager import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource @@ -307,6 +308,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics + * 5. 
Deletes big objects from MinIO * * @param eid The execution identity to clean up resources for */ @@ -343,6 +345,7 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } + // Delete big objects + BigObjectManager.deleteAllObjects() } - } diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index 82a79e8e04b..ab6b8f27c65 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -83,11 +83,15 @@ Test / PB.protoSources += PB.externalSourcePath.value // Test-related Dependencies ///////////////////////////////////////////////////////////////////////////// +val testcontainersVersion = "0.43.0" + libraryDependencies ++= Seq( "org.scalamock" %% "scalamock" % "5.2.0" % Test, // ScalaMock "org.scalatest" %% "scalatest" % "3.2.15" % Test, // ScalaTest "junit" % "junit" % "4.13.2" % Test, // JUnit - "com.novocode" % "junit-interface" % "0.11" % Test // SBT interface for JUnit + "com.novocode" % "junit-interface" % "0.11" % Test, // SBT interface for JUnit + "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersVersion % Test, // Testcontainers ScalaTest integration + "com.dimafeng" %% "testcontainers-scala-minio" % testcontainersVersion % Test // MinIO Testcontainer Scala integration ) @@ -183,5 +187,10 @@ libraryDependencies ++= Seq( "org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions "io.lakefs" % "sdk" % "1.51.0", // for lakeFS api calls "com.typesafe" % "config" % "1.4.3", // config reader - "org.apache.commons" % "commons-jcs3-core" % "3.2" // Apache Commons JCS + "org.apache.commons" % "commons-jcs3-core" % "3.2", // Apache Commons JCS + "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll( + ExclusionRule(organization = "io.netty") + ), + "software.amazon.awssdk" % "auth" % "2.29.51", + "software.amazon.awssdk" % "regions" % "2.29.51", ) \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java index 472679f5275..64fa921d461 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java @@ -70,6 +70,7 @@ public enum AttributeType implements Serializable { BOOLEAN("boolean", Boolean.class), TIMESTAMP("timestamp", Timestamp.class), BINARY("binary", byte[].class), + BIG_OBJECT("big_object", BigObject.class), ANY("ANY", Object.class); private final String name; @@ -109,6 +110,8 @@ public static AttributeType getAttributeType(Class fieldClass) { return TIMESTAMP; } else if (fieldClass.equals(byte[].class)) { return BINARY; + } else if (fieldClass.equals(BigObject.class)) { + return BIG_OBJECT; } else { return ANY; } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala index 7cbfb271796..b959c38981b 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala @@ -121,14 +121,15 @@ object AttributeTypeUtils extends Serializable { ): Any = { if (field == null) return null attributeType match { - case AttributeType.INTEGER => parseInteger(field, force) - case AttributeType.LONG => parseLong(field, force) - 
case AttributeType.DOUBLE => parseDouble(field) - case AttributeType.BOOLEAN => parseBoolean(field) - case AttributeType.TIMESTAMP => parseTimestamp(field) - case AttributeType.STRING => field.toString - case AttributeType.BINARY => field - case AttributeType.ANY | _ => field + case AttributeType.INTEGER => parseInteger(field, force) + case AttributeType.LONG => parseLong(field, force) + case AttributeType.DOUBLE => parseDouble(field) + case AttributeType.BOOLEAN => parseBoolean(field) + case AttributeType.TIMESTAMP => parseTimestamp(field) + case AttributeType.STRING => field.toString + case AttributeType.BINARY => field + case AttributeType.BIG_OBJECT => new BigObject(field.toString) + case AttributeType.ANY | _ => field } } @@ -383,7 +384,9 @@ object AttributeTypeUtils extends Serializable { case AttributeType.INTEGER => tryParseInteger(fieldValue) case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue) case AttributeType.BINARY => tryParseString() - case _ => tryParseString() + case AttributeType.BIG_OBJECT => + AttributeType.BIG_OBJECT // Big objects are never inferred from data + case _ => tryParseString() } } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java new file mode 100644 index 00000000000..2be14dc167c --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObject.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.core.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.amber.core.executor.OperatorExecutor; +import org.apache.texera.service.util.BigObjectManager; + +import java.net.URI; +import java.util.Objects; + +/** + * BigObject represents a reference to a large object stored in S3. + * + * Each BigObject is identified by an S3 URI (s3://bucket/path/to/object). + * BigObjects are automatically tracked and cleaned up when the workflow execution completes. + */ +public class BigObject { + + private final String uri; + + /** + * Creates a BigObject from an existing S3 URI. + * Used primarily for deserialization from JSON. 
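As a quick illustration of the new parsing rules (not part of the patch; the URI is a made-up placeholder): a declared BIG_OBJECT column is parsed into a BigObject reference, while type inference never promotes string data to BIG_OBJECT. The sketch assumes the parseField/inferField overloads exercised in AttributeTypeUtilsSpec below.

import org.apache.amber.core.tuple.{AttributeType, AttributeTypeUtils, BigObject}

object BigObjectParsingSketch extends App {
  // Declared BIG_OBJECT columns are parsed into BigObject references.
  val parsed = AttributeTypeUtils
    .parseField("s3://texera-big-objects/objects/42/blob", AttributeType.BIG_OBJECT)
    .asInstanceOf[BigObject]
  assert(parsed.getBucketName == "texera-big-objects")
  assert(parsed.getObjectKey == "objects/42/blob")

  // Strings that merely look like S3 URIs are still inferred as STRING,
  // so schema inference can never silently produce a BIG_OBJECT column.
  assert(AttributeTypeUtils.inferField("s3://some-bucket/some/path") == AttributeType.STRING)
}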
+ * + * @param uri S3 URI in the format s3://bucket/path/to/object + * @throws IllegalArgumentException if URI is null or doesn't start with "s3://" + */ + @JsonCreator + public BigObject(@JsonProperty("uri") String uri) { + if (uri == null) { + throw new IllegalArgumentException("BigObject URI cannot be null"); + } + if (!uri.startsWith("s3://")) { + throw new IllegalArgumentException( + "BigObject URI must start with 's3://', got: " + uri + ); + } + this.uri = uri; + } + + /** + * Creates a new BigObject for writing data. + * Generates a unique S3 URI. + * + * Usage example: + * + * BigObject bigObject = new BigObject(); + * try (BigObjectOutputStream out = new BigObjectOutputStream(bigObject)) { + * out.write(data); + * } + * // bigObject is now ready to be added to tuples + * + */ + public BigObject() { + this(BigObjectManager.create()); + } + + @JsonValue + public String getUri() { + return uri; + } + + public String getBucketName() { + return URI.create(uri).getHost(); + } + + public String getObjectKey() { + String path = URI.create(uri).getPath(); + return path.startsWith("/") ? path.substring(1) : path; + } + + @Override + public String toString() { + return uri; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof BigObject)) return false; + BigObject that = (BigObject) obj; + return Objects.equals(uri, that.uri); + } + + @Override + public int hashCode() { + return Objects.hash(uri); + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala index bc171396418..7216530a91a 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala @@ -20,7 +20,7 @@ package org.apache.amber.util import org.apache.amber.config.StorageConfig -import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.amber.core.tuple.{Attribute, AttributeType, BigObject, Schema, Tuple} import org.apache.hadoop.conf.Configuration import org.apache.iceberg.catalog.{Catalog, TableIdentifier} import org.apache.iceberg.data.parquet.GenericParquetReaders @@ -52,6 +52,9 @@ import scala.jdk.CollectionConverters._ */ object IcebergUtil { + // Unique suffix for BIG_OBJECT field encoding + private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr" + /** * Creates and initializes a HadoopCatalog with the given parameters. * - Uses an empty Hadoop `Configuration`, meaning the local file system (or `file:/`) will be used by default @@ -200,6 +203,7 @@ object IcebergUtil { /** * Converts a custom Amber `Schema` to an Iceberg `Schema`. + * Field names are encoded to preserve BIG_OBJECT type information. * * @param amberSchema The custom Amber Schema. * @return An Iceberg Schema. @@ -207,13 +211,16 @@ object IcebergUtil { def toIcebergSchema(amberSchema: Schema): IcebergSchema = { val icebergFields = amberSchema.getAttributes.zipWithIndex.map { case (attribute, index) => - Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType)) + val encodedName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val icebergType = toIcebergType(attribute.getType) + Types.NestedField.optional(index + 1, encodedName, icebergType) } new IcebergSchema(icebergFields.asJava) } /** * Converts a custom Amber `AttributeType` to an Iceberg `Type`. 
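Since BigObject is only a validated wrapper around an s3:// URI, its accessors can be exercised without any storage backend. A minimal sketch of the constructor contract documented above (bucket and key are placeholders):

import org.apache.amber.core.tuple.BigObject

object BigObjectUriSketch extends App {
  val ref = new BigObject("s3://texera-big-objects/objects/1700000000000/ab12cd34")
  assert(ref.getBucketName == "texera-big-objects")
  assert(ref.getObjectKey == "objects/1700000000000/ab12cd34") // leading '/' is stripped
  assert(ref.toString == ref.getUri)                           // equals/hashCode also follow the URI

  // Anything that is not an s3:// URI is rejected up front.
  try new BigObject("https://example.com/file")
  catch { case e: IllegalArgumentException => println(e.getMessage) }
}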
+ * Note: BIG_OBJECT is stored as StringType; field name encoding is used to distinguish it. * * @param attributeType The custom Amber AttributeType. * @return The corresponding Iceberg Type. @@ -227,6 +234,8 @@ object IcebergUtil { case AttributeType.BOOLEAN => Types.BooleanType.get() case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone() case AttributeType.BINARY => Types.BinaryType.get() + case AttributeType.BIG_OBJECT => + Types.StringType.get() // Store BigObjectPointer URI as string case AttributeType.ANY => throw new IllegalArgumentException("ANY type is not supported in Iceberg") } @@ -243,13 +252,15 @@ object IcebergUtil { tuple.schema.getAttributes.zipWithIndex.foreach { case (attribute, index) => + val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) val value = tuple.getField[AnyRef](index) match { - case null => null - case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime - case bytes: Array[Byte] => ByteBuffer.wrap(bytes) - case other => other + case null => null + case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + case bigObjPtr: BigObject => bigObjPtr.getUri + case other => other } - record.setField(attribute.getName, value) + record.setField(fieldName, value) } record @@ -264,23 +275,69 @@ object IcebergUtil { */ def fromRecord(record: Record, amberSchema: Schema): Tuple = { val fieldValues = amberSchema.getAttributes.map { attribute => - val value = record.getField(attribute.getName) match { + val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val rawValue = record.getField(fieldName) + + rawValue match { case null => null case ldt: LocalDateTime => Timestamp.valueOf(ldt) case buffer: ByteBuffer => val bytes = new Array[Byte](buffer.remaining()) buffer.get(bytes) bytes + case uri: String if attribute.getType == AttributeType.BIG_OBJECT => + new BigObject(uri) case other => other } - value } Tuple(amberSchema, fieldValues.toArray) } + /** + * Encodes a field name for BIG_OBJECT types by adding a unique system suffix. + * This ensures BIG_OBJECT fields can be identified when reading from Iceberg. + * + * @param fieldName The original field name + * @param attributeType The attribute type + * @return The encoded field name with a unique suffix for BIG_OBJECT types + */ + private def encodeBigObjectFieldName(fieldName: String, attributeType: AttributeType): String = { + if (attributeType == AttributeType.BIG_OBJECT) { + s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}" + } else { + fieldName + } + } + + /** + * Decodes a field name by removing the unique system suffix if present. + * This restores the original user-defined field name. + * + * @param fieldName The encoded field name + * @return The original field name with system suffix removed + */ + private def decodeBigObjectFieldName(fieldName: String): String = { + if (isBigObjectField(fieldName)) { + fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length) + } else { + fieldName + } + } + + /** + * Checks if a field name indicates a BIG_OBJECT type by examining the unique suffix. + * + * @param fieldName The field name to check + * @return true if the field represents a BIG_OBJECT type, false otherwise + */ + private def isBigObjectField(fieldName: String): Boolean = { + fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX) + } + /** * Converts an Iceberg `Schema` to an Amber `Schema`. 
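Because Iceberg stores a BIG_OBJECT column as a plain string, the `__texera_big_obj_ptr` suffix is the only marker left on the way back. The encode/decode helpers above are private to IcebergUtil, so this standalone sketch mirrors the convention purely for illustration:

object BigObjectFieldNamingSketch extends App {
  val Suffix = "__texera_big_obj_ptr" // must match BIG_OBJECT_FIELD_SUFFIX in IcebergUtil

  def encode(name: String, isBigObject: Boolean): String =
    if (isBigObject) s"$name$Suffix" else name

  def isBigObjectField(name: String): Boolean = name.endsWith(Suffix)

  def decode(name: String): String =
    if (isBigObjectField(name)) name.dropRight(Suffix.length) else name

  val stored = encode("large_data", isBigObject = true)
  assert(stored == "large_data__texera_big_obj_ptr")
  assert(isBigObjectField(stored) && decode(stored) == "large_data")

  // Every other attribute type keeps its user-visible name untouched.
  assert(decode(encode("id", isBigObject = false)) == "id")
}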
+ * Field names are decoded to restore original names and detect BIG_OBJECT types. * * @param icebergSchema The Iceberg Schema. * @return The corresponding Amber Schema. @@ -290,7 +347,10 @@ object IcebergUtil { .columns() .asScala .map { field => - new Attribute(field.name(), fromIcebergType(field.`type`().asPrimitiveType())) + val fieldName = field.name() + val attributeType = fromIcebergType(field.`type`().asPrimitiveType(), fieldName) + val originalName = decodeBigObjectFieldName(fieldName) + new Attribute(originalName, attributeType) } .toList @@ -301,11 +361,16 @@ object IcebergUtil { * Converts an Iceberg `Type` to an Amber `AttributeType`. * * @param icebergType The Iceberg Type. + * @param fieldName The field name (used to detect BIG_OBJECT by suffix). * @return The corresponding Amber AttributeType. */ - def fromIcebergType(icebergType: PrimitiveType): AttributeType = { + def fromIcebergType( + icebergType: PrimitiveType, + fieldName: String = "" + ): AttributeType = { icebergType match { - case _: Types.StringType => AttributeType.STRING + case _: Types.StringType => + if (isBigObjectField(fieldName)) AttributeType.BIG_OBJECT else AttributeType.STRING case _: Types.IntegerType => AttributeType.INTEGER case _: Types.LongType => AttributeType.LONG case _: Types.DoubleType => AttributeType.DOUBLE diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala new file mode 100644 index 00000000000..cdc7e5b77af --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import org.apache.amber.core.tuple.BigObject + +import java.io.InputStream + +/** + * InputStream for reading BigObject data from S3. + * + * The underlying S3 download is lazily initialized on first read. + * The stream will fail if the S3 object doesn't exist when read is attempted. + * + * Usage: + * {{{ + * val bigObject: BigObject = ... 
+ * try (val in = new BigObjectInputStream(bigObject)) { + * val bytes = in.readAllBytes() + * } + * }}} + */ +class BigObjectInputStream(bigObject: BigObject) extends InputStream { + + require(bigObject != null, "BigObject cannot be null") + + // Lazy initialization - downloads only when first read() is called + private lazy val underlying: InputStream = + S3StorageClient.downloadObject(bigObject.getBucketName, bigObject.getObjectKey) + + @volatile private var closed = false + + override def read(): Int = whenOpen(underlying.read()) + + override def read(b: Array[Byte], off: Int, len: Int): Int = + whenOpen(underlying.read(b, off, len)) + + override def readAllBytes(): Array[Byte] = whenOpen(underlying.readAllBytes()) + + override def readNBytes(n: Int): Array[Byte] = whenOpen(underlying.readNBytes(n)) + + override def skip(n: Long): Long = whenOpen(underlying.skip(n)) + + override def available(): Int = whenOpen(underlying.available()) + + override def close(): Unit = { + if (!closed) { + closed = true + if (underlying != null) { // Only close if initialized + underlying.close() + } + } + } + + override def markSupported(): Boolean = whenOpen(underlying.markSupported()) + + override def mark(readlimit: Int): Unit = whenOpen(underlying.mark(readlimit)) + + override def reset(): Unit = whenOpen(underlying.reset()) + + private def whenOpen[T](f: => T): T = { + if (closed) throw new java.io.IOException("Stream is closed") + f + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala new file mode 100644 index 00000000000..a6a273eb304 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging + +import java.util.UUID + +/** + * Manages the lifecycle of BigObjects stored in S3. + * + * Handles creation and deletion of large objects that exceed + * normal tuple size limits. + */ +object BigObjectManager extends LazyLogging { + private val DEFAULT_BUCKET = "texera-big-objects" + + /** + * Creates a new BigObject reference. + * The actual data upload happens separately via BigObjectOutputStream. + * + * @return S3 URI string for the new BigObject (format: s3://bucket/key) + */ + def create(): String = { + S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) + + val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}" + val uri = s"s3://$DEFAULT_BUCKET/$objectKey" + + uri + } + + /** + * Deletes all big objects from the bucket. 
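The scaladoc above shows the read path with Java-style try-with-resources, which is not valid Scala; scala.util.Using (Scala 2.13) is the idiomatic equivalent and keeps the lazy-download behaviour visible. A hedged usage sketch, assuming the referenced object already exists in S3:

import org.apache.amber.core.tuple.BigObject
import org.apache.texera.service.util.BigObjectInputStream
import scala.util.Using

object BigObjectReadSketch {
  // No network call happens when the stream is constructed; the S3 download
  // is started lazily by the first read inside the block.
  def readAll(ref: BigObject): Array[Byte] =
    Using.resource(new BigObjectInputStream(ref)) { in =>
      in.readAllBytes()
    }
}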
+ * + * @throws Exception if the deletion fails + * @return Unit + */ + def deleteAllObjects(): Unit = { + try { + S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects") + logger.info(s"Successfully deleted all big objects from bucket: $DEFAULT_BUCKET") + } catch { + case e: Exception => + logger.warn(s"Failed to delete big objects from bucket: $DEFAULT_BUCKET", e) + } + } + +} diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala new file mode 100644 index 00000000000..80214a973f9 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectOutputStream.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging +import org.apache.amber.core.tuple.BigObject + +import java.io.{IOException, OutputStream, PipedInputStream, PipedOutputStream} +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.Duration + +/** + * OutputStream for streaming BigObject data to S3. + * + * Data is uploaded in the background using multipart upload as you write. + * Call close() to complete the upload and ensure all data is persisted. + * + * Usage: + * {{{ + * val bigObject = new BigObject() + * try (val out = new BigObjectOutputStream(bigObject)) { + * out.write(myBytes) + * } + * // bigObject is now ready to use + * }}} + * + * Note: Not thread-safe. Do not access from multiple threads concurrently. 
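On the write side, close() is where the background upload is awaited, so a BigObject should only be handed off after the stream has been closed. A minimal sketch, assuming S3StorageClient is configured with a reachable S3/MinIO endpoint:

import org.apache.amber.core.tuple.BigObject
import org.apache.texera.service.util.BigObjectOutputStream
import scala.util.Using

object BigObjectWriteSketch {
  def store(payload: Array[Byte]): BigObject = {
    val ref = new BigObject() // allocates a fresh s3://texera-big-objects/objects/... URI
    Using.resource(new BigObjectOutputStream(ref)) { out =>
      out.write(payload) // bytes are streamed to S3 while we write
    }
    ref // the multipart upload is complete once Using has closed the stream
  }
}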
+ * + * @param bigObject The BigObject reference to write to + */ +class BigObjectOutputStream(bigObject: BigObject) extends OutputStream with LazyLogging { + + private val PIPE_BUFFER_SIZE = 64 * 1024 // 64KB + + require(bigObject != null, "BigObject cannot be null") + + private val bucketName: String = bigObject.getBucketName + private val objectKey: String = bigObject.getObjectKey + private implicit val ec: ExecutionContext = ExecutionContext.global + + // Pipe: we write to pipedOut, and S3 reads from pipedIn + private val pipedIn = new PipedInputStream(PIPE_BUFFER_SIZE) + private val pipedOut = new PipedOutputStream(pipedIn) + + @volatile private var closed = false + private val uploadException = new AtomicReference[Option[Throwable]](None) + + // Start background upload immediately + private val uploadFuture: Future[Unit] = Future { + try { + S3StorageClient.createBucketIfNotExist(bucketName) + S3StorageClient.uploadObject(bucketName, objectKey, pipedIn) + logger.debug(s"Upload completed: ${bigObject.getUri}") + } catch { + case e: Exception => + uploadException.set(Some(e)) + logger.error(s"Upload failed: ${bigObject.getUri}", e) + } finally { + pipedIn.close() + } + } + + override def write(b: Int): Unit = whenOpen(pipedOut.write(b)) + + override def write(b: Array[Byte], off: Int, len: Int): Unit = + whenOpen(pipedOut.write(b, off, len)) + + override def flush(): Unit = { + if (!closed) pipedOut.flush() + } + + /** + * Closes the stream and completes the S3 upload. + * Blocks until upload is complete. Throws IOException if upload failed. + */ + override def close(): Unit = { + if (closed) return + + closed = true + try { + pipedOut.close() + Await.result(uploadFuture, Duration.Inf) + checkUploadSuccess() + } catch { + case e: IOException => throw e + case e: Exception => + S3StorageClient.deleteObject(bucketName, objectKey) + throw new IOException(s"Failed to complete upload: ${e.getMessage}", e) + } + } + + private def whenOpen[T](f: => T): T = { + if (closed) throw new IOException("Stream is closed") + checkUploadSuccess() + f + } + + private def checkUploadSuccess(): Unit = { + uploadException.get().foreach { ex => + throw new IOException("Background upload failed", ex) + } + } +} diff --git a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala similarity index 59% rename from file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index 7c157cc0aeb..8c3bc2f5f33 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -24,7 +24,9 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model._ import software.amazon.awssdk.services.s3.{S3Client, S3Configuration} +import software.amazon.awssdk.core.sync.RequestBody +import java.io.InputStream import java.security.MessageDigest import scala.jdk.CollectionConverters._ @@ -139,4 +141,122 @@ object S3StorageClient { s3Client.deleteObjects(deleteObjectsRequest) } } + + /** + * Uploads an object to S3 using multipart upload. + * Handles streams of any size without loading into memory. 
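BigObjectOutputStream is built on the classic piped-stream hand-off: the caller writes into a PipedOutputStream while a background Future drains the connected PipedInputStream into the uploader. This self-contained sketch shows the same pattern with the S3 upload swapped for a simple byte counter, so it runs without any storage:

import java.io.{PipedInputStream, PipedOutputStream}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object PipedUploadSketch extends App {
  val pipedIn  = new PipedInputStream(64 * 1024)
  val pipedOut = new PipedOutputStream(pipedIn)

  // Background "uploader": here it just counts the bytes it receives.
  val consumer: Future[Long] = Future {
    val buf = new Array[Byte](8 * 1024)
    Iterator.continually(pipedIn.read(buf)).takeWhile(_ != -1).map(_.toLong).sum
  }

  (1 to 100).foreach(_ => pipedOut.write(Array.fill[Byte](1024)(1)))
  pipedOut.close()                              // signals EOF to the consumer
  println(Await.result(consumer, Duration.Inf)) // prints 102400
}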
+ */ + def uploadObject(bucketName: String, objectKey: String, inputStream: InputStream): String = { + val buffer = new Array[Byte](MINIMUM_NUM_OF_MULTIPART_S3_PART.toInt) + + // Helper to read a full buffer from input stream + def readChunk(): Int = { + var offset = 0 + var read = 0 + while ( + offset < buffer.length && { + read = inputStream.read(buffer, offset, buffer.length - offset); read > 0 + } + ) { + offset += read + } + offset + } + + // Read first chunk to check if stream is empty + val firstChunkSize = readChunk() + if (firstChunkSize == 0) { + return s3Client + .putObject( + PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + RequestBody.fromBytes(Array.empty[Byte]) + ) + .eTag() + } + + val uploadId = s3Client + .createMultipartUpload( + CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build() + ) + .uploadId() + + var uploadSuccess = false + try { + // Upload all parts using an iterator + val allParts = Iterator + .iterate((1, firstChunkSize)) { case (partNum, _) => (partNum + 1, readChunk()) } + .takeWhile { case (_, size) => size > 0 } + .map { + case (partNumber, chunkSize) => + val eTag = s3Client + .uploadPart( + UploadPartRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .partNumber(partNumber) + .build(), + RequestBody.fromBytes(buffer.take(chunkSize)) + ) + .eTag() + CompletedPart.builder().partNumber(partNumber).eTag(eTag).build() + } + .toList + + val result = s3Client + .completeMultipartUpload( + CompleteMultipartUploadRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(allParts.asJava).build()) + .build() + ) + .eTag() + + uploadSuccess = true + result + + } finally { + if (!uploadSuccess) { + try { + s3Client.abortMultipartUpload( + AbortMultipartUploadRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .build() + ) + } catch { case _: Exception => } + } + } + } + + /** + * Downloads an object from S3 as an InputStream. + * + * @param bucketName The S3 bucket name. + * @param objectKey The object key (path) in S3. + * @return An InputStream containing the object data. + */ + def downloadObject(bucketName: String, objectKey: String): InputStream = { + s3Client.getObject( + GetObjectRequest.builder().bucket(bucketName).key(objectKey).build() + ) + } + + /** + * Deletes a single object from S3. + * + * @param bucketName The S3 bucket name. + * @param objectKey The object key (path) in S3. 
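uploadObject only hands S3 full-sized parts (except the last), so its readChunk helper keeps filling one buffer until it is full or the stream ends instead of trusting a single read() call. A standalone sketch of that loop, using a small hypothetical buffer size:

import java.io.{ByteArrayInputStream, InputStream}

object ReadFullChunkSketch extends App {
  // Mirrors the readChunk helper above: a single InputStream.read may return
  // fewer bytes than requested, so loop until the buffer is full or EOF.
  def readFullChunk(in: InputStream, buffer: Array[Byte]): Int = {
    var offset = 0
    var read = 0
    while (
      offset < buffer.length && {
        read = in.read(buffer, offset, buffer.length - offset); read > 0
      }
    ) offset += read
    offset // valid bytes in buffer; 0 means the stream was already exhausted
  }

  val in  = new ByteArrayInputStream(Array.fill[Byte](10)(7))
  val buf = new Array[Byte](4)
  assert(readFullChunk(in, buf) == 4)
  assert(readFullChunk(in, buf) == 4)
  assert(readFullChunk(in, buf) == 2) // short final part
  assert(readFullChunk(in, buf) == 0) // EOF
}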
+ */ + def deleteObject(bucketName: String, objectKey: String): Unit = { + s3Client.deleteObject( + DeleteObjectRequest.builder().bucket(bucketName).key(objectKey).build() + ) + } } diff --git a/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala index 53e5f68430f..492be4a3882 100644 --- a/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/amber/core/tuple/AttributeTypeUtilsSpec.scala @@ -196,6 +196,27 @@ class AttributeTypeUtilsSpec extends AnyFunSuite { assert(parseField("anything", AttributeType.ANY) == "anything") } + test("parseField correctly parses to BIG_OBJECT") { + // Valid S3 URI strings are converted to BigObject + val pointer1 = parseField("s3://bucket/path/to/object", AttributeType.BIG_OBJECT) + .asInstanceOf[BigObject] + assert(pointer1.getUri == "s3://bucket/path/to/object") + assert(pointer1.getBucketName == "bucket") + assert(pointer1.getObjectKey == "path/to/object") + + // Null input returns null + assert(parseField(null, AttributeType.BIG_OBJECT) == null) + } + + test("BIG_OBJECT type is preserved but never inferred from data") { + // BIG_OBJECT remains BIG_OBJECT when passed as typeSoFar + assert(inferField(AttributeType.BIG_OBJECT, "any-value") == AttributeType.BIG_OBJECT) + assert(inferField(AttributeType.BIG_OBJECT, null) == AttributeType.BIG_OBJECT) + + // String data is inferred as STRING, never BIG_OBJECT + assert(inferField("s3://bucket/path") == AttributeType.STRING) + } + test("compare correctly handles null values for different attribute types") { assert(compare(null, null, INTEGER) == 0) assert(compare(null, 10, INTEGER) < 0) diff --git a/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala b/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala index ee7744e8154..0b20d5cd970 100644 --- a/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilSpec.scala @@ -19,7 +19,7 @@ package org.apache.amber.util -import org.apache.amber.core.tuple.{AttributeType, Schema, Tuple} +import org.apache.amber.core.tuple.{AttributeType, BigObject, Schema, Tuple} import org.apache.amber.util.IcebergUtil.toIcebergSchema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Types @@ -199,4 +199,102 @@ class IcebergUtilSpec extends AnyFlatSpec { assert(tuple.getField[String]("test-6") == "hello world") assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 2, 3, 4)) } + + // BIG_OBJECT type tests + + it should "convert BIG_OBJECT type correctly between Texera and Iceberg" in { + // BIG_OBJECT stored as StringType with field name suffix + assert(IcebergUtil.toIcebergType(AttributeType.BIG_OBJECT) == Types.StringType.get()) + assert(IcebergUtil.fromIcebergType(Types.StringType.get(), "field") == AttributeType.STRING) + assert( + IcebergUtil.fromIcebergType( + Types.StringType.get(), + "field__texera_big_obj_ptr" + ) == AttributeType.BIG_OBJECT + ) + } + + it should "convert schemas with BIG_OBJECT fields correctly" in { + val texeraSchema = Schema() + .add("id", AttributeType.INTEGER) + .add("large_data", AttributeType.BIG_OBJECT) + + val icebergSchema = IcebergUtil.toIcebergSchema(texeraSchema) + + // BIG_OBJECT field gets encoded name with suffix + 
assert(icebergSchema.findField("large_data__texera_big_obj_ptr") != null) + assert( + icebergSchema.findField("large_data__texera_big_obj_ptr").`type`() == Types.StringType.get() + ) + + // Round-trip preserves schema + val roundTripSchema = IcebergUtil.fromIcebergSchema(icebergSchema) + assert(roundTripSchema.getAttribute("large_data").getType == AttributeType.BIG_OBJECT) + } + + it should "convert tuples with BIG_OBJECT to records and back correctly" in { + val schema = Schema() + .add("id", AttributeType.INTEGER) + .add("large_data", AttributeType.BIG_OBJECT) + + val tuple = Tuple + .builder(schema) + .addSequentially(Array(Int.box(42), new BigObject("s3://bucket/object/key.data"))) + .build() + + val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple) + + // BIG_OBJECT stored as URI string with encoded field name + assert(record.getField("id") == 42) + assert(record.getField("large_data__texera_big_obj_ptr") == "s3://bucket/object/key.data") + + // Round-trip preserves data + val roundTripTuple = IcebergUtil.fromRecord(record, schema) + assert(roundTripTuple == tuple) + + // BigObject properties are accessible + val bigObj = roundTripTuple.getField[BigObject]("large_data") + assert(bigObj.getUri == "s3://bucket/object/key.data") + assert(bigObj.getBucketName == "bucket") + assert(bigObj.getObjectKey == "object/key.data") + } + + it should "handle null BIG_OBJECT values correctly" in { + val schema = Schema().add("data", AttributeType.BIG_OBJECT) + + val tupleWithNull = Tuple.builder(schema).addSequentially(Array(null)).build() + val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tupleWithNull) + + assert(record.getField("data__texera_big_obj_ptr") == null) + assert(IcebergUtil.fromRecord(record, schema) == tupleWithNull) + } + + it should "handle multiple BIG_OBJECT fields and mixed types correctly" in { + val schema = Schema() + .add("int_field", AttributeType.INTEGER) + .add("big_obj_1", AttributeType.BIG_OBJECT) + .add("string_field", AttributeType.STRING) + .add("big_obj_2", AttributeType.BIG_OBJECT) + + val tuple = Tuple + .builder(schema) + .addSequentially( + Array( + Int.box(123), + new BigObject("s3://bucket1/file1.dat"), + "normal string", + null // null BIG_OBJECT + ) + ) + .build() + + val record = IcebergUtil.toGenericRecord(toIcebergSchema(schema), tuple) + + assert(record.getField("int_field") == 123) + assert(record.getField("big_obj_1__texera_big_obj_ptr") == "s3://bucket1/file1.dat") + assert(record.getField("string_field") == "normal string") + assert(record.getField("big_obj_2__texera_big_obj_ptr") == null) + + assert(IcebergUtil.fromRecord(record, schema) == tuple) + } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala new file mode 100644 index 00000000000..a163326b9d8 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectInputStreamSpec.scala @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import org.apache.amber.core.tuple.BigObject +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite + +import java.io.{ByteArrayInputStream, IOException} +import scala.util.Random + +class BigObjectInputStreamSpec + extends AnyFunSuite + with S3StorageTestBase + with BeforeAndAfterAll + with BeforeAndAfterEach { + + private val testBucketName = "test-big-object-input-stream" + + override def beforeAll(): Unit = { + super.beforeAll() + S3StorageClient.createBucketIfNotExist(testBucketName) + } + + override def afterAll(): Unit = { + try { + S3StorageClient.deleteDirectory(testBucketName, "") + } catch { + case _: Exception => // Ignore cleanup errors + } + super.afterAll() + } + + // Helper methods + private def createTestObject(key: String, data: Array[Byte]): BigObject = { + S3StorageClient.uploadObject(testBucketName, key, new ByteArrayInputStream(data)) + new BigObject(s"s3://$testBucketName/$key") + } + + private def createTestObject(key: String, data: String): BigObject = + createTestObject(key, data.getBytes) + + private def generateRandomData(size: Int): Array[Byte] = + Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte) + + private def withStream[T](bigObject: BigObject)(f: BigObjectInputStream => T): T = { + val stream = new BigObjectInputStream(bigObject) + try { + f(stream) + } finally { + stream.close() + } + } + + private def assertThrowsIOExceptionWhenClosed(operation: BigObjectInputStream => Unit): Unit = { + val bigObject = createTestObject(s"test/closed-${Random.nextInt()}.txt", "data") + val stream = new BigObjectInputStream(bigObject) + stream.close() + val exception = intercept[IOException](operation(stream)) + assert(exception.getMessage.contains("Stream is closed")) + } + + // Constructor Tests + test("constructor should reject null BigObject") { + val exception = intercept[IllegalArgumentException] { + new BigObjectInputStream(null) + } + assert(exception.getMessage.contains("BigObject cannot be null")) + } + + test("constructor should accept valid BigObject") { + val bigObject = createTestObject("test/valid.txt", "test data") + withStream(bigObject) { _ => } + } + + // read() Tests + test("read() should read single bytes correctly") { + val bigObject = createTestObject("test/single-byte.txt", "Hello") + withStream(bigObject) { stream => + assert(stream.read() == 'H'.toByte) + assert(stream.read() == 'e'.toByte) + assert(stream.read() == 'l'.toByte) + assert(stream.read() == 'l'.toByte) + assert(stream.read() == 'o'.toByte) + assert(stream.read() == -1) // EOF + } + } + + test("read() should return -1 for empty object") { + val bigObject = createTestObject("test/empty.txt", "") + withStream(bigObject) { stream => + assert(stream.read() == -1) + } + } + + // read(byte[], int, int) Tests + test("read(byte[], int, int) should read data into buffer") { + val testData = "Hello, World!" 
+ val bigObject = createTestObject("test/buffer-read.txt", testData) + withStream(bigObject) { stream => + val buffer = new Array[Byte](testData.length) + val bytesRead = stream.read(buffer, 0, buffer.length) + assert(bytesRead == testData.length) + assert(new String(buffer) == testData) + } + } + + test("read(byte[], int, int) should handle partial reads and offsets") { + val testData = "Hello, World!" + val bigObject = createTestObject("test/partial.txt", testData) + withStream(bigObject) { stream => + // Test partial read + val buffer1 = new Array[Byte](5) + assert(stream.read(buffer1, 0, 5) == 5) + assert(new String(buffer1) == "Hello") + } + + // Test offset + withStream(bigObject) { stream => + val buffer2 = new Array[Byte](20) + assert(stream.read(buffer2, 5, 10) == 10) + assert(new String(buffer2, 5, 10) == "Hello, Wor") + } + } + + test("read(byte[], int, int) should return -1 at EOF") { + val bigObject = createTestObject("test/eof.txt", "test") + withStream(bigObject) { stream => + val buffer = new Array[Byte](10) + stream.read(buffer, 0, 10) + assert(stream.read(buffer, 0, 10) == -1) + } + } + + // readAllBytes() Tests + test("readAllBytes() should read entire object") { + val testData = "Hello, World! This is a test." + val bigObject = createTestObject("test/read-all.txt", testData) + withStream(bigObject) { stream => + assert(new String(stream.readAllBytes()) == testData) + } + } + + test("readAllBytes() should handle large objects") { + val largeData = generateRandomData(1024 * 1024) // 1MB + val bigObject = createTestObject("test/large.bin", largeData) + withStream(bigObject) { stream => + val bytes = stream.readAllBytes() + assert(bytes.length == largeData.length) + assert(bytes.sameElements(largeData)) + } + } + + test("readAllBytes() should return empty array for empty object") { + val bigObject = createTestObject("test/empty-all.txt", "") + withStream(bigObject) { stream => + assert(stream.readAllBytes().length == 0) + } + } + + // readNBytes() Tests + test("readNBytes() should read exactly N bytes") { + val testData = "Hello, World! This is a test." 
+ val bigObject = createTestObject("test/read-n.txt", testData) + withStream(bigObject) { stream => + val bytes = stream.readNBytes(5) + assert(bytes.length == 5) + assert(new String(bytes) == "Hello") + } + } + + test("readNBytes() should handle EOF and zero") { + val bigObject = createTestObject("test/read-n-eof.txt", "Hello") + withStream(bigObject) { stream => + // Request more than available + val bytes = stream.readNBytes(100) + assert(bytes.length == 5) + assert(new String(bytes) == "Hello") + } + + // Test n=0 + withStream(bigObject) { stream => + assert(stream.readNBytes(0).length == 0) + } + } + + // skip() Tests + test("skip() should skip bytes correctly") { + val bigObject = createTestObject("test/skip.txt", "Hello, World!") + withStream(bigObject) { stream => + assert(stream.skip(7) == 7) + assert(stream.read() == 'W'.toByte) + } + } + + test("skip() should handle EOF and zero") { + val bigObject = createTestObject("test/skip-eof.txt", "Hello") + withStream(bigObject) { stream => + assert(stream.skip(100) == 5) + assert(stream.read() == -1) + } + + // Test n=0 + withStream(bigObject) { stream => + assert(stream.skip(0) == 0) + } + } + + // available() Tests + test("available() should return non-negative value") { + val bigObject = createTestObject("test/available.txt", "Hello, World!") + withStream(bigObject) { stream => + assert(stream.available() >= 0) + } + } + + // close() Tests + test("close() should be idempotent") { + val bigObject = createTestObject("test/close-idempotent.txt", "data") + val stream = new BigObjectInputStream(bigObject) + stream.close() + stream.close() // Should not throw + stream.close() // Should not throw + } + + test("close() should prevent further operations") { + val bigObject = createTestObject("test/close-prevents.txt", "data") + val stream = new BigObjectInputStream(bigObject) + stream.close() + + intercept[IOException] { stream.read() } + intercept[IOException] { stream.readAllBytes() } + intercept[IOException] { stream.readNBytes(10) } + intercept[IOException] { stream.skip(10) } + intercept[IOException] { stream.available() } + } + + test("close() should work without reading (lazy initialization)") { + val bigObject = createTestObject("test/close-lazy.txt", "data") + val stream = new BigObjectInputStream(bigObject) + stream.close() // Should not throw + } + + // Closed stream tests - consolidated + test("operations should throw IOException when stream is closed") { + assertThrowsIOExceptionWhenClosed(_.read()) + assertThrowsIOExceptionWhenClosed(_.read(new Array[Byte](10), 0, 10)) + assertThrowsIOExceptionWhenClosed(_.readAllBytes()) + assertThrowsIOExceptionWhenClosed(_.readNBytes(10)) + assertThrowsIOExceptionWhenClosed(_.skip(10)) + assertThrowsIOExceptionWhenClosed(_.available()) + assertThrowsIOExceptionWhenClosed(_.mark(100)) + assertThrowsIOExceptionWhenClosed(_.reset()) + } + + // mark/reset Tests + test("markSupported() should delegate to underlying stream") { + val bigObject = createTestObject("test/mark.txt", "data") + withStream(bigObject) { stream => + val supported = stream.markSupported() + assert(!supported || supported) // Just verify it's callable + } + } + + test("mark() and reset() should delegate to underlying stream") { + val bigObject = createTestObject("test/mark-reset.txt", "data") + withStream(bigObject) { stream => + if (stream.markSupported()) { + stream.mark(100) + stream.read() + stream.reset() + } + // If not supported, methods should still be callable + } + } + + // Lazy initialization Tests + test("lazy 
initialization should not download until first read") { + val bigObject = createTestObject("test/lazy-init.txt", "data") + val stream = new BigObjectInputStream(bigObject) + // Creating the stream should not trigger download + // Reading should trigger download + try { + assert(stream.read() == 'd'.toByte) + } finally { + stream.close() + } + } + + // Integration Tests + test("should handle chunked reading of large objects") { + val largeData = generateRandomData(10 * 1024) // 10KB + val bigObject = createTestObject("test/chunked.bin", largeData) + withStream(bigObject) { stream => + val buffer = new Array[Byte](1024) + val output = new java.io.ByteArrayOutputStream() + var bytesRead = 0 + + while ({ + bytesRead = stream.read(buffer, 0, buffer.length) + bytesRead != -1 + }) { + output.write(buffer, 0, bytesRead) + } + + val result = output.toByteArray + assert(result.length == largeData.length) + assert(result.sameElements(largeData)) + } + } + + test("should handle multiple streams reading same object") { + val testData = "Shared data" + val bigObject = createTestObject("test/shared.txt", testData) + + val stream1 = new BigObjectInputStream(bigObject) + val stream2 = new BigObjectInputStream(bigObject) + + try { + assert(new String(stream1.readAllBytes()) == testData) + assert(new String(stream2.readAllBytes()) == testData) + } finally { + stream1.close() + stream2.close() + } + } + + test("should preserve binary data integrity") { + val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 50, 100) + val bigObject = createTestObject("test/binary.bin", binaryData) + withStream(bigObject) { stream => + assert(stream.readAllBytes().sameElements(binaryData)) + } + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala new file mode 100644 index 00000000000..ce1d4f4e691 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectManagerSpec.scala @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import org.apache.amber.core.tuple.BigObject +import org.scalatest.funsuite.AnyFunSuite + +class BigObjectManagerSpec extends AnyFunSuite with S3StorageTestBase { + + /** Creates a big object from string data and returns it. */ + private def createBigObject(data: String): BigObject = { + val bigObject = new BigObject() + val out = new BigObjectOutputStream(bigObject) + try { + out.write(data.getBytes) + } finally { + out.close() + } + bigObject + } + + /** Verifies standard bucket name. 
*/ + private def assertStandardBucket(pointer: BigObject): Unit = { + assert(pointer.getBucketName == "texera-big-objects") + assert(pointer.getUri.startsWith("s3://texera-big-objects/")) + } + + // ======================================== + // BigObjectInputStream Tests (Standard Java InputStream) + // ======================================== + + test("BigObjectInputStream should read all bytes from stream") { + val data = "Hello, World! This is a test." + val bigObject = createBigObject(data) + + val stream = new BigObjectInputStream(bigObject) + assert(stream.readAllBytes().sameElements(data.getBytes)) + stream.close() + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should read exact number of bytes") { + val bigObject = createBigObject("0123456789ABCDEF") + + val stream = new BigObjectInputStream(bigObject) + val result = stream.readNBytes(10) + + assert(result.length == 10) + assert(result.sameElements("0123456789".getBytes)) + stream.close() + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should handle reading more bytes than available") { + val data = "Short" + val bigObject = createBigObject(data) + + val stream = new BigObjectInputStream(bigObject) + val result = stream.readNBytes(100) + + assert(result.length == data.length) + assert(result.sameElements(data.getBytes)) + stream.close() + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should support standard single-byte read") { + val bigObject = createBigObject("ABC") + + val stream = new BigObjectInputStream(bigObject) + assert(stream.read() == 65) // 'A' + assert(stream.read() == 66) // 'B' + assert(stream.read() == 67) // 'C' + assert(stream.read() == -1) // EOF + stream.close() + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should return -1 at EOF") { + val bigObject = createBigObject("EOF") + + val stream = new BigObjectInputStream(bigObject) + stream.readAllBytes() // Read all data + assert(stream.read() == -1) + stream.close() + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should throw exception when reading from closed stream") { + val bigObject = createBigObject("test") + + val stream = new BigObjectInputStream(bigObject) + stream.close() + + assertThrows[java.io.IOException](stream.read()) + assertThrows[java.io.IOException](stream.readAllBytes()) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should handle multiple close calls") { + val bigObject = createBigObject("test") + + val stream = new BigObjectInputStream(bigObject) + stream.close() + stream.close() // Should not throw + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should read large data correctly") { + val largeData = Array.fill[Byte](20000)((scala.util.Random.nextInt(256) - 128).toByte) + val bigObject = new BigObject() + val out = new BigObjectOutputStream(bigObject) + try { + out.write(largeData) + } finally { + out.close() + } + + val stream = new BigObjectInputStream(bigObject) + val result = stream.readAllBytes() + assert(result.sameElements(largeData)) + stream.close() + + BigObjectManager.deleteAllObjects() + } + + // ======================================== + // BigObjectManager Tests + // ======================================== + + test("BigObjectManager should create a big object") { + val pointer = createBigObject("Test big object data") + + assertStandardBucket(pointer) + } + + test("BigObjectInputStream should open and read a big object") { + val data = 
"Hello from big object!" + val pointer = createBigObject(data) + + val stream = new BigObjectInputStream(pointer) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + } + + test("BigObjectInputStream should fail to open non-existent big object") { + val fakeBigObject = new BigObject("s3://texera-big-objects/nonexistent/file") + val stream = new BigObjectInputStream(fakeBigObject) + + try { + intercept[Exception] { + stream.read() + } + } finally { + try { stream.close() } + catch { case _: Exception => } + } + } + + test("BigObjectManager should delete all big objects") { + val pointer1 = new BigObject() + val out1 = new BigObjectOutputStream(pointer1) + try { + out1.write("Object 1".getBytes) + } finally { + out1.close() + } + + val pointer2 = new BigObject() + val out2 = new BigObjectOutputStream(pointer2) + try { + out2.write("Object 2".getBytes) + } finally { + out2.close() + } + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectManager should handle delete with no objects gracefully") { + BigObjectManager.deleteAllObjects() // Should not throw exception + } + + test("BigObjectManager should delete all objects") { + val pointer1 = createBigObject("Test data") + val pointer2 = createBigObject("Test data") + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectManager should create bucket if it doesn't exist") { + val pointer = createBigObject("Test bucket creation") + + assertStandardBucket(pointer) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectManager should handle large objects correctly") { + val largeData = Array.fill[Byte](6 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) + val pointer = new BigObject() + val out = new BigObjectOutputStream(pointer) + try { + out.write(largeData) + } finally { + out.close() + } + + val stream = new BigObjectInputStream(pointer) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(largeData)) + BigObjectManager.deleteAllObjects() + } + + test("BigObjectManager should generate unique URIs for different objects") { + val testData = "Unique URI test".getBytes + val pointer1 = new BigObject() + val out1 = new BigObjectOutputStream(pointer1) + try { + out1.write(testData) + } finally { + out1.close() + } + + val pointer2 = new BigObject() + val out2 = new BigObjectOutputStream(pointer2) + try { + out2.write(testData) + } finally { + out2.close() + } + + assert(pointer1.getUri != pointer2.getUri) + assert(pointer1.getObjectKey != pointer2.getObjectKey) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream should handle multiple reads from the same big object") { + val data = "Multiple reads test data" + val pointer = createBigObject(data) + + val stream1 = new BigObjectInputStream(pointer) + val readData1 = stream1.readAllBytes() + stream1.close() + + val stream2 = new BigObjectInputStream(pointer) + val readData2 = stream2.readAllBytes() + stream2.close() + + assert(readData1.sameElements(data.getBytes)) + assert(readData2.sameElements(data.getBytes)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectManager should properly parse bucket name and object key from big object") { + val bigObject = createBigObject("URI parsing test") + + assertStandardBucket(bigObject) + assert(bigObject.getObjectKey.nonEmpty) + assert(!bigObject.getObjectKey.startsWith("/")) + + BigObjectManager.deleteAllObjects() + } + + // ======================================== + // Object-Oriented API Tests + // 
======================================== + + test("BigObject with BigObjectOutputStream should create a big object") { + val data = "Test data for BigObject with BigObjectOutputStream" + + val bigObject = new BigObject() + val out = new BigObjectOutputStream(bigObject) + try { + out.write(data.getBytes) + } finally { + out.close() + } + + assertStandardBucket(bigObject) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectInputStream constructor should read big object contents") { + val data = "Test data for BigObjectInputStream constructor" + val bigObject = createBigObject(data) + + val stream = new BigObjectInputStream(bigObject) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream and BigObjectInputStream should work together end-to-end") { + val data = "End-to-end test data" + + // Create using streaming API + val bigObject = new BigObject() + val out = new BigObjectOutputStream(bigObject) + try { + out.write(data.getBytes) + } finally { + out.close() + } + + // Read using standard constructor + val stream = new BigObjectInputStream(bigObject) + val readData = stream.readAllBytes() + stream.close() + + assert(readData.sameElements(data.getBytes)) + + BigObjectManager.deleteAllObjects() + } + + // ======================================== + // BigObjectOutputStream Tests (New Symmetric API) + // ======================================== + + test("BigObjectOutputStream should write and upload data to S3") { + val data = "Test data for BigObjectOutputStream" + + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write(data.getBytes) + outStream.close() + + assertStandardBucket(bigObject) + + // Verify data can be read back + val inStream = new BigObjectInputStream(bigObject) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(data.getBytes)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream should create big object") { + val data = "Database registration test" + + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write(data.getBytes) + outStream.close() + + assertStandardBucket(bigObject) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream should handle large data correctly") { + val largeData = Array.fill[Byte](8 * 1024 * 1024)((scala.util.Random.nextInt(256) - 128).toByte) + + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write(largeData) + outStream.close() + + // Verify data integrity + val inStream = new BigObjectInputStream(bigObject) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(largeData)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream should handle multiple writes") { + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write("Hello ".getBytes) + outStream.write("World".getBytes) + outStream.write("!".getBytes) + outStream.close() + + val inStream = new BigObjectInputStream(bigObject) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements("Hello World!".getBytes)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream should throw exception when writing to closed stream") { + val bigObject = new BigObject() + val outStream = new 
BigObjectOutputStream(bigObject) + outStream.write("test".getBytes) + outStream.close() + + assertThrows[java.io.IOException](outStream.write("more".getBytes)) + + BigObjectManager.deleteAllObjects() + } + + test("BigObjectOutputStream should handle close() being called multiple times") { + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write("test".getBytes) + outStream.close() + outStream.close() // Should not throw + + BigObjectManager.deleteAllObjects() + } + + test("New BigObject() constructor should create unique URIs") { + val bigObject1 = new BigObject() + val bigObject2 = new BigObject() + + assert(bigObject1.getUri != bigObject2.getUri) + assert(bigObject1.getObjectKey != bigObject2.getObjectKey) + + BigObjectManager.deleteAllObjects() + } + + test("BigObject() and BigObjectOutputStream API should be symmetric with input") { + val data = "Symmetric API test" + + // Write using new symmetric API + val bigObject = new BigObject() + val outStream = new BigObjectOutputStream(bigObject) + outStream.write(data.getBytes) + outStream.close() + + // Read using symmetric API + val inStream = new BigObjectInputStream(bigObject) + val readData = inStream.readAllBytes() + inStream.close() + + assert(readData.sameElements(data.getBytes)) + + BigObjectManager.deleteAllObjects() + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala new file mode 100644 index 00000000000..14fdfa1ddb0 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/BigObjectOutputStreamSpec.scala @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import org.apache.amber.core.tuple.BigObject +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite + +import java.io.IOException +import scala.util.Random + +class BigObjectOutputStreamSpec + extends AnyFunSuite + with S3StorageTestBase + with BeforeAndAfterAll + with BeforeAndAfterEach { + + private val testBucketName = "test-big-object-output-stream" + + override def beforeAll(): Unit = { + super.beforeAll() + S3StorageClient.createBucketIfNotExist(testBucketName) + } + + override def afterAll(): Unit = { + try { + S3StorageClient.deleteDirectory(testBucketName, "") + } catch { + case _: Exception => // Ignore cleanup errors + } + super.afterAll() + } + + // Helper methods + private def createBigObject(key: String): BigObject = + new BigObject(s"s3://$testBucketName/$key") + + private def generateRandomData(size: Int): Array[Byte] = + Array.fill[Byte](size)((Random.nextInt(256) - 128).toByte) + + private def withStream[T](bigObject: BigObject)(f: BigObjectOutputStream => T): T = { + val stream = new BigObjectOutputStream(bigObject) + try f(stream) + finally stream.close() + } + + private def readBack(bigObject: BigObject): Array[Byte] = { + val inputStream = new BigObjectInputStream(bigObject) + try inputStream.readAllBytes() + finally inputStream.close() + } + + private def writeAndVerify(key: String, data: Array[Byte]): Unit = { + val bigObject = createBigObject(key) + withStream(bigObject)(_.write(data, 0, data.length)) + assert(readBack(bigObject).sameElements(data)) + } + + // === Constructor Tests === + test("should reject null BigObject") { + val exception = intercept[IllegalArgumentException](new BigObjectOutputStream(null)) + assert(exception.getMessage.contains("BigObject cannot be null")) + } + + // === Basic Write Tests === + test("should write single bytes correctly") { + val bigObject = createBigObject("test/single-bytes.txt") + withStream(bigObject) { stream => + "Hello".foreach(c => stream.write(c.toByte)) + } + assert(new String(readBack(bigObject)) == "Hello") + } + + test("should write byte arrays correctly") { + val testData = "Hello, World!".getBytes + writeAndVerify("test/array-write.txt", testData) + } + + test("should handle partial writes with offset and length") { + val testData = "Hello, World!".getBytes + val bigObject = createBigObject("test/partial-write.txt") + + withStream(bigObject) { stream => + stream.write(testData, 0, 5) // "Hello" + stream.write(testData, 7, 5) // "World" + } + + assert(new String(readBack(bigObject)) == "HelloWorld") + } + + test("should handle multiple consecutive writes") { + val bigObject = createBigObject("test/multiple-writes.txt") + withStream(bigObject) { stream => + stream.write("Hello".getBytes) + stream.write(", ".getBytes) + stream.write("World!".getBytes) + } + assert(new String(readBack(bigObject)) == "Hello, World!") + } + + // === Stream Lifecycle Tests === + test("flush should not throw") { + val bigObject = createBigObject("test/flush.txt") + withStream(bigObject) { stream => + stream.write("test".getBytes) + stream.flush() + stream.write(" data".getBytes) + } + assert(new String(readBack(bigObject)) == "test data") + } + + test("close should be idempotent") { + val bigObject = createBigObject("test/close-idempotent.txt") + val stream = new BigObjectOutputStream(bigObject) + stream.write("data".getBytes) + stream.close() + stream.close() // Should not throw + stream.flush() // Should not throw after close + 
assert(new String(readBack(bigObject)) == "data") + } + + test("close should handle empty stream") { + val bigObject = createBigObject("test/empty-stream.txt") + val stream = new BigObjectOutputStream(bigObject) + stream.close() + assert(readBack(bigObject).length == 0) + } + + // === Error Handling === + test("write operations should throw IOException when stream is closed") { + val bigObject = createBigObject("test/closed-stream.txt") + val stream = new BigObjectOutputStream(bigObject) + stream.close() + + val ex1 = intercept[IOException](stream.write('A'.toByte)) + assert(ex1.getMessage.contains("Stream is closed")) + + val ex2 = intercept[IOException](stream.write("test".getBytes)) + assert(ex2.getMessage.contains("Stream is closed")) + } + + // === Large Data Tests === + test("should handle large data (1MB)") { + val largeData = generateRandomData(1024 * 1024) + writeAndVerify("test/large-1mb.bin", largeData) + } + + test("should handle very large data (10MB)") { + val veryLargeData = generateRandomData(10 * 1024 * 1024) + writeAndVerify("test/large-10mb.bin", veryLargeData) + } + + test("should handle chunked writes") { + val totalSize = 1024 * 1024 // 1MB + val chunkSize = 8 * 1024 // 8KB + val data = generateRandomData(totalSize) + val bigObject = createBigObject("test/chunked.bin") + + withStream(bigObject) { stream => + data.grouped(chunkSize).foreach(chunk => stream.write(chunk)) + } + + assert(readBack(bigObject).sameElements(data)) + } + + // === Binary Data Tests === + test("should preserve all byte values (0-255)") { + val allBytes = (0 until 256).map(_.toByte).toArray + writeAndVerify("test/all-bytes.bin", allBytes) + } + + // === Integration Tests === + test("should handle concurrent writes to different objects") { + val streams = (1 to 3).map { i => + val obj = createBigObject(s"test/concurrent-$i.txt") + val stream = new BigObjectOutputStream(obj) + (obj, stream, s"Data $i") + } + + try { + streams.foreach { case (_, stream, data) => stream.write(data.getBytes) } + } finally { + streams.foreach(_._2.close()) + } + + streams.foreach { + case (obj, _, expected) => + assert(new String(readBack(obj)) == expected) + } + } + + test("should overwrite existing object") { + val bigObject = createBigObject("test/overwrite.txt") + withStream(bigObject)(_.write("original data".getBytes)) + withStream(bigObject)(_.write("new data".getBytes)) + assert(new String(readBack(bigObject)) == "new data") + } + + test("should handle mixed write operations") { + val bigObject = createBigObject("test/mixed-writes.txt") + withStream(bigObject) { stream => + stream.write('A'.toByte) + stream.write(" test ".getBytes) + stream.write('B'.toByte) + val data = "Hello, World!".getBytes + stream.write(data, 7, 6) // "World!" 
+ } + assert(new String(readBack(bigObject)) == "A test BWorld!") + } + + // === Edge Cases === + test("should create bucket automatically") { + val newBucketName = s"new-bucket-${Random.nextInt(10000)}" + val bigObject = new BigObject(s"s3://$newBucketName/test/auto-create.txt") + + try { + withStream(bigObject)(_.write("test".getBytes)) + assert(new String(readBack(bigObject)) == "test") + } finally { + try S3StorageClient.deleteDirectory(newBucketName, "") + catch { case _: Exception => /* ignore */ } + } + } + + test("should handle rapid open/close cycles") { + (1 to 10).foreach { i => + withStream(createBigObject(s"test/rapid-$i.txt"))(_.write(s"data-$i".getBytes)) + } + + (1 to 10).foreach { i => + val result = readBack(createBigObject(s"test/rapid-$i.txt")) + assert(new String(result) == s"data-$i") + } + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala new file mode 100644 index 00000000000..a1662cf8c3f --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite + +import java.io.ByteArrayInputStream +import scala.util.Random + +class S3StorageClientSpec + extends AnyFunSuite + with S3StorageTestBase + with BeforeAndAfterAll + with BeforeAndAfterEach { + + private val testBucketName = "test-s3-storage-client" + + override def beforeAll(): Unit = { + super.beforeAll() + S3StorageClient.createBucketIfNotExist(testBucketName) + } + + override def afterAll(): Unit = { + // Clean up test bucket + try { + S3StorageClient.deleteDirectory(testBucketName, "") + } catch { + case _: Exception => // Ignore cleanup errors + } + super.afterAll() + } + + // Helper methods + private def createInputStream(data: String): ByteArrayInputStream = { + new ByteArrayInputStream(data.getBytes) + } + + private def createInputStream(data: Array[Byte]): ByteArrayInputStream = { + new ByteArrayInputStream(data) + } + + private def readInputStream(inputStream: java.io.InputStream): Array[Byte] = { + val buffer = new Array[Byte](8192) + val outputStream = new java.io.ByteArrayOutputStream() + var bytesRead = 0 + while ({ + bytesRead = inputStream.read(buffer); bytesRead != -1 + }) { + outputStream.write(buffer, 0, bytesRead) + } + outputStream.toByteArray + } + + // ======================================== + // uploadObject Tests + // ======================================== + + test("uploadObject should upload a small object successfully") { + val testData = "Hello, World! This is a small test object." + val objectKey = "test/small-object.txt" + + val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(testData)) + + assert(eTag != null) + assert(eTag.nonEmpty) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("uploadObject should upload an empty object") { + val objectKey = "test/empty-object.txt" + + val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream("")) + + assert(eTag != null) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("uploadObject should upload a large object using multipart upload") { + // Create data larger than MINIMUM_NUM_OF_MULTIPART_S3_PART (5MB) + val largeData = Array.fill[Byte](6 * 1024 * 1024)((Random.nextInt(256) - 128).toByte) + val objectKey = "test/large-object.bin" + + val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(largeData)) + + assert(eTag != null) + assert(eTag.nonEmpty) + + // Verify the uploaded content + val downloadedStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = readInputStream(downloadedStream) + downloadedStream.close() + + assert(downloadedData.length == largeData.length) + assert(downloadedData.sameElements(largeData)) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("uploadObject should handle objects with special characters in key") { + val testData = "Testing special characters" + val objectKey = "test/special-chars/file with spaces & symbols!@#.txt" + + val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(testData)) + + assert(eTag != null) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("uploadObject should overwrite existing object") { + val objectKey = "test/overwrite-test.txt" + val data1 = "Original data" + val data2 = "Updated data" + + 
S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(data1)) + val eTag2 = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(data2)) + + assert(eTag2 != null) + + val downloadedStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = new String(readInputStream(downloadedStream)) + downloadedStream.close() + + assert(downloadedData == data2) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + // ======================================== + // downloadObject Tests + // ======================================== + + test("downloadObject should download an object successfully") { + val testData = "This is test data for download." + val objectKey = "test/download-test.txt" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(testData)) + + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = new String(readInputStream(inputStream)) + inputStream.close() + + assert(downloadedData == testData) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("downloadObject should download large objects correctly") { + val largeData = Array.fill[Byte](10 * 1024 * 1024)((Random.nextInt(256) - 128).toByte) + val objectKey = "test/large-download-test.bin" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(largeData)) + + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = readInputStream(inputStream) + inputStream.close() + + assert(downloadedData.length == largeData.length) + assert(downloadedData.sameElements(largeData)) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("downloadObject should download empty objects") { + val objectKey = "test/empty-download-test.txt" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream("")) + + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = readInputStream(inputStream) + inputStream.close() + + assert(downloadedData.isEmpty) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("downloadObject should throw exception for non-existent object") { + val nonExistentKey = "test/non-existent-object.txt" + + assertThrows[Exception] { + S3StorageClient.downloadObject(testBucketName, nonExistentKey) + } + } + + test("downloadObject should handle binary data correctly") { + val binaryData = Array[Byte](0, 1, 2, 127, -128, -1, 64, 32, 16, 8, 4, 2, 1) + val objectKey = "test/binary-data.bin" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(binaryData)) + + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = readInputStream(inputStream) + inputStream.close() + + assert(downloadedData.sameElements(binaryData)) + + // Clean up + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + // ======================================== + // deleteObject Tests + // ======================================== + + test("deleteObject should delete an existing object") { + val objectKey = "test/delete-test.txt" + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream("delete me")) + + S3StorageClient.deleteObject(testBucketName, objectKey) + + // Verify deletion by attempting to download + assertThrows[Exception] { + S3StorageClient.downloadObject(testBucketName, objectKey) + } + } + + 
test("deleteObject should not throw exception for non-existent object") { + val nonExistentKey = "test/already-deleted.txt" + + // Should not throw exception + S3StorageClient.deleteObject(testBucketName, nonExistentKey) + } + + test("deleteObject should delete large objects") { + val largeData = Array.fill[Byte](7 * 1024 * 1024)((Random.nextInt(256) - 128).toByte) + val objectKey = "test/large-delete-test.bin" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(largeData)) + + S3StorageClient.deleteObject(testBucketName, objectKey) + + // Verify deletion by attempting to download + assertThrows[Exception] { + S3StorageClient.downloadObject(testBucketName, objectKey) + } + } + + test("deleteObject should handle multiple deletions of the same object") { + val objectKey = "test/multi-delete-test.txt" + S3StorageClient.uploadObject( + testBucketName, + objectKey, + createInputStream("delete multiple times") + ) + + S3StorageClient.deleteObject(testBucketName, objectKey) + + // Second delete should not throw exception + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + // ======================================== + // Integration Tests (combining methods) + // ======================================== + + test("upload, download, and delete workflow should work correctly") { + val testData = "Complete workflow test data" + val objectKey = "test/workflow-test.txt" + + // Upload + val eTag = S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(testData)) + assert(eTag != null) + + // Download + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = new String(readInputStream(inputStream)) + inputStream.close() + assert(downloadedData == testData) + + // Delete + S3StorageClient.deleteObject(testBucketName, objectKey) + } + + test("multiple objects can be managed independently") { + val objects = Map( + "test/object1.txt" -> "Data for object 1", + "test/object2.txt" -> "Data for object 2", + "test/object3.txt" -> "Data for object 3" + ) + + // Upload all objects + objects.foreach { + case (key, data) => + S3StorageClient.uploadObject(testBucketName, key, createInputStream(data)) + } + + // Delete one object + S3StorageClient.deleteObject(testBucketName, "test/object2.txt") + + // Clean up remaining objects + S3StorageClient.deleteObject(testBucketName, "test/object1.txt") + S3StorageClient.deleteObject(testBucketName, "test/object3.txt") + } + + test("objects with nested paths should be handled correctly") { + val objectKey = "test/deeply/nested/path/to/object.txt" + val testData = "Nested path test" + + S3StorageClient.uploadObject(testBucketName, objectKey, createInputStream(testData)) + + val inputStream = S3StorageClient.downloadObject(testBucketName, objectKey) + val downloadedData = new String(readInputStream(inputStream)) + inputStream.close() + assert(downloadedData == testData) + + S3StorageClient.deleteObject(testBucketName, objectKey) + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala new file mode 100644 index 00000000000..ad80e6c40eb --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageTestBase.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.dimafeng.testcontainers.MinIOContainer +import org.apache.amber.config.StorageConfig +import org.scalatest.{BeforeAndAfterAll, Suite} +import org.testcontainers.utility.DockerImageName + +/** + * Base trait for tests requiring S3 storage (MinIO). + * Provides access to a single shared MinIO container across all test suites. + * + * Usage: Mix this trait into any test suite that needs S3 storage. + */ +trait S3StorageTestBase extends BeforeAndAfterAll { this: Suite => + + override def beforeAll(): Unit = { + super.beforeAll() + // Trigger lazy initialization of shared container + S3StorageTestBase.ensureContainerStarted() + } +} + +object S3StorageTestBase { + private lazy val container: MinIOContainer = { + val c = MinIOContainer( + dockerImageName = DockerImageName.parse("minio/minio:RELEASE.2025-02-28T09-55-16Z"), + userName = "texera_minio", + password = "password" + ) + c.start() + + val endpoint = s"http://${c.host}:${c.mappedPort(9000)}" + StorageConfig.s3Endpoint = endpoint + + println(s"[S3Storage] Started shared MinIO at $endpoint") + + sys.addShutdownHook { + println("[S3Storage] Stopping shared MinIO...") + c.stop() + } + + c + } + + /** Ensures the container is started (triggers lazy initialization). 
*/ + def ensureContainerStarted(): Unit = { + container // Access lazy val to trigger initialization + () + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java index 84e3de95a61..aa198a9d2d6 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java @@ -30,7 +30,8 @@ public enum FileAttributeType { DOUBLE("double", AttributeType.DOUBLE), BOOLEAN("boolean", AttributeType.BOOLEAN), TIMESTAMP("timestamp", AttributeType.TIMESTAMP), - BINARY("binary", AttributeType.BINARY); + BINARY("binary", AttributeType.BINARY), + BIG_OBJECT("big object", AttributeType.BIG_OBJECT); private final String name; @@ -56,6 +57,6 @@ public String toString() { } public boolean isSingle() { - return this == SINGLE_STRING || this == BINARY; + return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT; } } diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala index 2124c9da433..c039b6e2d8a 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala @@ -22,8 +22,9 @@ package org.apache.amber.operator.source.scan import org.apache.amber.core.executor.SourceOperatorExecutor import org.apache.amber.core.storage.DocumentFactory import org.apache.amber.core.tuple.AttributeTypeUtils.parseField -import org.apache.amber.core.tuple.TupleLike +import org.apache.amber.core.tuple.{BigObject, TupleLike} import org.apache.amber.util.JSONUtils.objectMapper +import org.apache.texera.service.util.BigObjectOutputStream import org.apache.commons.compress.archivers.ArchiveStreamFactory import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream import org.apache.commons.io.IOUtils.toByteArray @@ -84,6 +85,21 @@ class FileScanSourceOpExec private[scan] ( fields.addOne(desc.attributeType match { case FileAttributeType.SINGLE_STRING => new String(toByteArray(entry), desc.fileEncoding.getCharset) + case FileAttributeType.BIG_OBJECT => + // For big objects, create reference and upload via streaming + val bigObject = new BigObject() + val out = new BigObjectOutputStream(bigObject) + try { + val buffer = new Array[Byte](8192) + var bytesRead = entry.read(buffer) + while (bytesRead != -1) { + out.write(buffer, 0, bytesRead) + bytesRead = entry.read(buffer) + } + } finally { + out.close() + } + bigObject case _ => parseField(toByteArray(entry), desc.attributeType.getType) }) TupleLike(fields.toSeq: _*) diff --git a/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala new file mode 100644 index 00000000000..07b09f0a268 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExecSpec.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.source.scan + +import org.apache.amber.core.tuple.{AttributeType, BigObject, Schema, SchemaEnforceable} +import org.apache.amber.util.JSONUtils.objectMapper +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.io.{BufferedOutputStream, FileOutputStream} +import java.net.URI +import java.nio.file.{Files, Path} +import java.util.zip.{ZipEntry, ZipOutputStream} + +/** + * Unit tests for BIG_OBJECT logic in FileScanSourceOpExec. + * Full integration tests with S3 and database are in BigObjectManagerSpec. + */ +class FileScanSourceOpExecSpec extends AnyFlatSpec with BeforeAndAfterAll { + + private val testDir = Path + .of(sys.env.getOrElse("TEXERA_HOME", ".")) + .resolve("common/workflow-operator/src/test/resources") + .toRealPath() + + private val testFile = testDir.resolve("test_big_object.txt") + private val testZip = testDir.resolve("test_big_object.zip") + + override def beforeAll(): Unit = { + super.beforeAll() + Files.write(testFile, "Test content\nLine 2\nLine 3".getBytes) + createZipFile(testZip, Map("file1.txt" -> "Content 1", "file2.txt" -> "Content 2")) + } + + override def afterAll(): Unit = { + Files.deleteIfExists(testFile) + Files.deleteIfExists(testZip) + super.afterAll() + } + + private def createZipFile(path: Path, entries: Map[String, String]): Unit = { + val zipOut = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(path.toFile))) + try { + entries.foreach { + case (name, content) => + zipOut.putNextEntry(new ZipEntry(name)) + zipOut.write(content.getBytes) + zipOut.closeEntry() + } + } finally { + zipOut.close() + } + } + + private def createDescriptor( + file: Path = testFile, + attributeName: String = "line" + ): FileScanSourceOpDesc = { + val desc = new FileScanSourceOpDesc() + desc.fileName = Some(file.toString) + desc.attributeType = FileAttributeType.BIG_OBJECT + desc.attributeName = attributeName + desc.fileEncoding = FileDecodingMethod.UTF_8 + desc + } + + private def assertSchema(schema: Schema, attributeName: String): Unit = { + assert(schema.getAttributes.length == 1) + assert(schema.getAttribute(attributeName).getType == AttributeType.BIG_OBJECT) + } + + // Schema Tests + it should "infer BIG_OBJECT schema with default attribute name" in { + assertSchema(createDescriptor().sourceSchema(), "line") + } + + it should "infer BIG_OBJECT schema with custom attribute name" in { + assertSchema(createDescriptor(attributeName = "custom_field").sourceSchema(), "custom_field") + } + + it should "map BIG_OBJECT to correct AttributeType" in { + assert(FileAttributeType.BIG_OBJECT.getType == AttributeType.BIG_OBJECT) + } + + // Type Classification Tests + it should "correctly classify BIG_OBJECT as isSingle type" in { + val isSingleTypes = List( + FileAttributeType.BIG_OBJECT, + 
FileAttributeType.SINGLE_STRING, + FileAttributeType.BINARY + ) + val multiLineTypes = List( + FileAttributeType.STRING, + FileAttributeType.INTEGER, + FileAttributeType.LONG, + FileAttributeType.DOUBLE, + FileAttributeType.BOOLEAN, + FileAttributeType.TIMESTAMP + ) + + isSingleTypes.foreach(t => assert(t.isSingle, s"$t should be isSingle")) + multiLineTypes.foreach(t => assert(!t.isSingle, s"$t should not be isSingle")) + } + + // Execution Tests + it should "create BigObject when reading file with BIG_OBJECT type" in { + val desc = createDescriptor() + desc.setResolvedFileName(URI.create(testFile.toUri.toString)) + + val executor = new FileScanSourceOpExec(objectMapper.writeValueAsString(desc)) + + try { + executor.open() + val tuples = executor.produceTuple().toSeq + executor.close() + + assert(tuples.size == 1) + val field = tuples.head + .asInstanceOf[SchemaEnforceable] + .enforceSchema(desc.sourceSchema()) + .getField[Any]("line") + + assert(field.isInstanceOf[BigObject]) + assert(field.asInstanceOf[BigObject].getUri.startsWith("s3://")) + } catch { + case e: Exception => + info(s"S3 not configured: ${e.getMessage}") + } + } + + // BigObject Tests + it should "create valid BigObject with correct URI parsing" in { + val pointer = new BigObject("s3://bucket/path/to/object") + + assert(pointer.getUri == "s3://bucket/path/to/object") + assert(pointer.getBucketName == "bucket") + assert(pointer.getObjectKey == "path/to/object") + } + + it should "reject invalid BigObject URIs" in { + assertThrows[IllegalArgumentException](new BigObject("http://invalid")) + assertThrows[IllegalArgumentException](new BigObject("not-a-uri")) + assertThrows[IllegalArgumentException](new BigObject(null.asInstanceOf[String])) + } +} diff --git a/file-service/build.sbt b/file-service/build.sbt index 68ac82e6b3b..34b30472e0c 100644 --- a/file-service/build.sbt +++ b/file-service/build.sbt @@ -84,7 +84,4 @@ libraryDependencies ++= Seq( "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.1.0", // Ensure Jakarta JAX-RS API is available "org.bitbucket.b_c" % "jose4j" % "0.9.6", "org.playframework" %% "play-json" % "3.1.0-M1", - "software.amazon.awssdk" % "s3" % "2.29.51", - "software.amazon.awssdk" % "auth" % "2.29.51", - "software.amazon.awssdk" % "regions" % "2.29.51", )
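
For reference, below is a minimal round-trip sketch of the BigObject API that this diff introduces and that the new specs exercise. It is illustrative only, not part of the change: BigObject comes from org.apache.amber.core.tuple as shown in the FileScanSourceOpExec import, while the package of BigObjectInputStream and BigObjectManager is assumed to be org.apache.texera.service.util, matching the test file paths above.

// Illustrative sketch only; not part of this diff.
// Assumes BigObjectInputStream and BigObjectManager live in
// org.apache.texera.service.util, alongside BigObjectOutputStream.
import org.apache.amber.core.tuple.BigObject
import org.apache.texera.service.util.{BigObjectInputStream, BigObjectManager, BigObjectOutputStream}

object BigObjectRoundTripExample {
  def main(args: Array[String]): Unit = {
    // Writing: new BigObject() mints a unique s3://texera-big-objects/... URI;
    // the output stream uploads the buffered bytes when close() is called.
    val bigObject = new BigObject()
    val out = new BigObjectOutputStream(bigObject)
    try out.write("example payload".getBytes)
    finally out.close()

    // The pointer, not the payload, is what flows through tuples
    // as an AttributeType.BIG_OBJECT field.
    println(bigObject.getUri)

    // Reading: BigObjectInputStream is a standard java.io.InputStream.
    val in = new BigObjectInputStream(bigObject)
    val bytes =
      try in.readAllBytes()
      finally in.close()
    assert(bytes.sameElements("example payload".getBytes))

    // Cleanup, as wired into WorkflowService and WorkflowResource in this diff.
    BigObjectManager.deleteAllObjects()
  }
}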