Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.lakefs.clients.sdk.model._
import org.apache.texera.amber.config.StorageConfig

import java.io.{File, FileOutputStream, InputStream}
import java.net.URI
import java.nio.file.Files
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -358,4 +359,47 @@ object LakeFSStorageClient {

branchesApi.resetBranch(repoName, branchName, resetCreation).execute()
}

/**
  * Parse a physical address URI of the form "<scheme>://<bucket>/<key...>" into (bucket, key).
  *
  * Expected examples:
  *   - "s3://my-bucket/path/to/file.csv"
  *   - "gs://my_bucket/some/prefix/data.json"
  *
  * The bucket is taken from the URI host. `java.net.URI` only reports a host when the
  * authority is a syntactically valid server name; for bucket names that are not valid
  * hostnames (for example names containing an underscore) the host is `null`. In that case
  * the raw authority is used instead, provided it is a bare name (no user-info `@` and no
  * port `:` separator).
  *
  * The key is the URI path with one leading "/" removed and surrounding whitespace trimmed.
  *
  * @param address URI string in the form "<scheme>://<bucket>/<key...>"; `null` is treated as empty
  * @return (bucket, key), both non-empty
  * @throws IllegalArgumentException
  *   if the address is empty, not a valid URI, missing bucket/host, or missing key/path
  */
def parsePhysicalAddress(address: String): (String, String) = {
  val raw = Option(address).getOrElse("").trim
  if (raw.isEmpty)
    throw new IllegalArgumentException("Address is empty (expected '<scheme>://<bucket>/<key>')")

  val uri =
    try new URI(raw)
    catch {
      // new URI(String) only throws URISyntaxException for a non-null argument; anything
      // else is unexpected and should propagate unchanged.
      case e: java.net.URISyntaxException =>
        throw new IllegalArgumentException(
          s"Invalid address URI: '$raw' (expected '<scheme>://<bucket>/<key>')",
          e
        )
    }

  // getHost is null for registry-based authorities (e.g. "my_bucket"); fall back to the
  // authority itself, but only when it is a plain name so malformed "user@host:port"-style
  // authorities are still rejected.
  val bucket = Option(uri.getHost)
    .orElse(
      Option(uri.getAuthority).filterNot(_.exists(c => c == '@' || c == ':'))
    )
    .getOrElse("")
    .trim
  if (bucket.isEmpty)
    throw new IllegalArgumentException(
      s"Invalid address: missing host/bucket in '$raw' (expected '<scheme>://<bucket>/<key>')"
    )

  val key = Option(uri.getPath).getOrElse("").stripPrefix("/").trim
  if (key.isEmpty)
    throw new IllegalArgumentException(
      s"Invalid address: missing key/path in '$raw' (expected '<scheme>://<bucket>/<key>')"
    )

  (bucket, key)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,59 @@ object S3StorageClient {
DeleteObjectRequest.builder().bucket(bucketName).key(objectKey).build()
)
}

/**
  * Sends one part of an in-progress S3 multipart upload.
  *
  * An {@link software.amazon.awssdk.services.s3.model.UploadPartRequest} is built from the
  * bucket, key, upload id and part number, and handed to {@code s3Client.uploadPart} together
  * with a {@link software.amazon.awssdk.core.sync.RequestBody} carrying the part's bytes.
  *
  * How the body is produced depends on {@code contentLength}:
  *   - {@code Some(len)}: the body wraps {@code inputStream} via
  *     {@code RequestBody.fromInputStream(inputStream, len)}.
  *   - {@code None}: the whole stream is first drained with {@code readAllBytes} and the
  *     resulting array is wrapped via {@code RequestBody.fromBytes}. The full part is held in
  *     memory in this case, so supplying a length is preferable for large parts.
  *
  * This method never closes {@code inputStream}; that remains the caller's job.
  *
  * @param bucket        S3 bucket name.
  * @param key           Object key being uploaded.
  * @param uploadId      Identifier of the multipart upload this part belongs to.
  * @param partNumber    Part number for this chunk (S3 numbers parts starting at 1).
  * @param inputStream   Source of the bytes for this part.
  * @param contentLength Size of the part in bytes, if known; {@code None} buffers the stream in memory.
  * @return The {@link software.amazon.awssdk.services.s3.model.UploadPartResponse} returned by
  *         {@code s3Client.uploadPart}.
  */
def uploadPartWithRequest(
    bucket: String,
    key: String,
    uploadId: String,
    partNumber: Int,
    inputStream: InputStream,
    contentLength: Option[Long]
): UploadPartResponse = {
  // The "empty" branch of fold is by-name, so the stream is only drained when no length is given.
  val payload = contentLength.fold[RequestBody](
    RequestBody.fromBytes(inputStream.readAllBytes())
  )(knownLength => RequestBody.fromInputStream(inputStream, knownLength))

  val partRequest =
    UploadPartRequest.builder().bucket(bucket).key(key).uploadId(uploadId).partNumber(partNumber).build()

  s3Client.uploadPart(partRequest, payload)
}

}
Loading
Loading