Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package edu.uci.ics.amber.engine.common.executor

import edu.uci.ics.amber.engine.common.model.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument
import edu.uci.ics.amber.engine.common.workflow.PortIdentity

import java.io.{FileInputStream, InputStream}

trait SourceOperatorExecutor extends OperatorExecutor {
override def open(): Unit = {}

Expand All @@ -25,21 +22,4 @@ trait SourceOperatorExecutor extends OperatorExecutor {
// We should move this to onFinishAllPorts later.
produceTuple().map(t => (t, Option.empty))
}

// this function create the input stream accordingly:
// - if filePath is set, create the stream from the file
// - if fileDesc is set, create the stream via JGit call
def createInputStream(filePath: String, datasetFileDocument: DatasetFileDocument): InputStream = {
if (filePath != null && datasetFileDocument != null) {
throw new RuntimeException(
"File Path and Dataset File Descriptor cannot present at the same time."
)
}
if (filePath != null) {
new FileInputStream(filePath)
} else {
// create stream from dataset file desc
datasetFileDocument.asInputStream()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,46 @@
package edu.uci.ics.amber.engine.common.storage

import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.service.GitVersionControlLocalFileStorage
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.utils.PathUtils
import org.jooq.types.UInteger

import java.io.{File, InputStream, FileOutputStream}
import java.net.URI
import java.nio.file.{Files, Path}
import java.io.{File, FileOutputStream, InputStream}
import java.net.{URI, URLDecoder}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters.IteratorHasAsScala

class DatasetFileDocument(fileFullPath: Path) extends VirtualDocument[Nothing] {
class DatasetFileDocument(uri: URI) extends VirtualDocument[Nothing] {
// Utility function to parse and decode URI segments into individual components
private def parseUri(uri: URI): (Int, String, Path) = {
val segments = Paths.get(uri.getPath).iterator().asScala.map(_.toString).toArray
if (segments.length < 3)
throw new IllegalArgumentException("URI format is incorrect")

private val (_, dataset, datasetVersion, fileRelativePath) =
DatasetResource.resolvePath(fileFullPath, shouldContainFile = true)
val did = segments(0).toInt
val datasetVersionHash = URLDecoder.decode(segments(1), StandardCharsets.UTF_8)
val decodedRelativeSegments =
segments.drop(2).map(part => URLDecoder.decode(part, StandardCharsets.UTF_8))
val fileRelativePath = Paths.get(decodedRelativeSegments.head, decodedRelativeSegments.tail: _*)

(did, datasetVersionHash, fileRelativePath)
}

// Extract components from URI using the utility function
private val (did, datasetVersionHash, fileRelativePath) = parseUri(uri)

private var tempFile: Option[File] = None

override def getURI: URI =
throw new UnsupportedOperationException(
"The URI cannot be acquired because the file is not physically located"
)
override def getURI: URI = uri

override def asInputStream(): InputStream = {
fileRelativePath match {
case Some(path) =>
DatasetResource.getDatasetFile(dataset.getDid, datasetVersion.getDvid, path)
case None =>
throw new IllegalArgumentException("File relative path is missing.")
}
val datasetAbsolutePath = PathUtils.getDatasetPath(UInteger.valueOf(did))
GitVersionControlLocalFileStorage
.retrieveFileContentOfVersionAsInputStream(
datasetAbsolutePath,
datasetVersionHash,
datasetAbsolutePath.resolve(fileRelativePath)
)
}

override def asFile(): File = {
Expand Down Expand Up @@ -53,9 +69,15 @@ class DatasetFileDocument(fileFullPath: Path) extends VirtualDocument[Nothing] {
}

override def remove(): Unit = {
// first remove the temporary file
tempFile match {
case Some(file) => Files.delete(file.toPath)
case None => // Do nothing
}
// then remove the dataset file
GitVersionControlLocalFileStorage.removeFileFromRepo(
PathUtils.getDatasetPath(UInteger.valueOf(did)),
PathUtils.getDatasetPath(UInteger.valueOf(did)).resolve(fileRelativePath)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package edu.uci.ics.amber.engine.common.storage

import edu.uci.ics.texera.workflow.common.storage.FileResolver.DATASET_FILE_URI_SCHEME

import java.net.URI

object DocumentFactory {
def newReadonlyDocument(fileUri: URI): ReadonlyVirtualDocument[_] = {
fileUri.getScheme match {
case DATASET_FILE_URI_SCHEME =>
new DatasetFileDocument(fileUri)

case "file" =>
// For local files, create a ReadonlyLocalFileDocument
new ReadonlyLocalFileDocument(fileUri)

case _ =>
throw new UnsupportedOperationException(s"Unsupported URI scheme: ${fileUri.getScheme}")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class SchemaPropagationResource extends LazyLogging {
)

val logicalPlan = LogicalPlan(logicalPlanPojo)
logicalPlan.resolveScanSourceOpFileName(None)

// the PhysicalPlan with topology expanded.
val physicalPlan = PhysicalPlan(context, logicalPlan)
Expand Down
Loading