diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/executor/SourceOperatorExecutor.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/executor/SourceOperatorExecutor.scala index baff229db0b..453a41c5ac6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/executor/SourceOperatorExecutor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/executor/SourceOperatorExecutor.scala @@ -1,11 +1,8 @@ package edu.uci.ics.amber.engine.common.executor import edu.uci.ics.amber.engine.common.model.tuple.{Tuple, TupleLike} -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument import edu.uci.ics.amber.engine.common.workflow.PortIdentity -import java.io.{FileInputStream, InputStream} - trait SourceOperatorExecutor extends OperatorExecutor { override def open(): Unit = {} @@ -25,21 +22,4 @@ trait SourceOperatorExecutor extends OperatorExecutor { // We should move this to onFinishAllPorts later. produceTuple().map(t => (t, Option.empty)) } - - // this function create the input stream accordingly: - // - if filePath is set, create the stream from the file - // - if fileDesc is set, create the stream via JGit call - def createInputStream(filePath: String, datasetFileDocument: DatasetFileDocument): InputStream = { - if (filePath != null && datasetFileDocument != null) { - throw new RuntimeException( - "File Path and Dataset File Descriptor cannot present at the same time." 
- ) - } - if (filePath != null) { - new FileInputStream(filePath) - } else { - // create stream from dataset file desc - datasetFileDocument.asInputStream() - } - } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DatasetFileDocument.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DatasetFileDocument.scala index bb5ef6b8e2b..fa4d740b5c6 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DatasetFileDocument.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DatasetFileDocument.scala @@ -1,30 +1,46 @@ package edu.uci.ics.amber.engine.common.storage -import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.service.GitVersionControlLocalFileStorage +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.utils.PathUtils +import org.jooq.types.UInteger -import java.io.{File, InputStream, FileOutputStream} -import java.net.URI -import java.nio.file.{Files, Path} +import java.io.{File, FileOutputStream, InputStream} +import java.net.{URI, URLDecoder} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import scala.jdk.CollectionConverters.IteratorHasAsScala -class DatasetFileDocument(fileFullPath: Path) extends VirtualDocument[Nothing] { +class DatasetFileDocument(uri: URI) extends VirtualDocument[Nothing] { + // Utility function to parse and decode URI segments into individual components + private def parseUri(uri: URI): (Int, String, Path) = { + val segments = Paths.get(uri.getPath).iterator().asScala.map(_.toString).toArray + if (segments.length < 3) + throw new IllegalArgumentException("URI format is incorrect") - private val (_, dataset, datasetVersion, fileRelativePath) = - DatasetResource.resolvePath(fileFullPath, shouldContainFile = true) + val did = segments(0).toInt + val datasetVersionHash = URLDecoder.decode(segments(1), 
StandardCharsets.UTF_8) + val decodedRelativeSegments = + segments.drop(2).map(part => URLDecoder.decode(part, StandardCharsets.UTF_8)) + val fileRelativePath = Paths.get(decodedRelativeSegments.head, decodedRelativeSegments.tail: _*) + + (did, datasetVersionHash, fileRelativePath) + } + + // Extract components from URI using the utility function + private val (did, datasetVersionHash, fileRelativePath) = parseUri(uri) private var tempFile: Option[File] = None - override def getURI: URI = - throw new UnsupportedOperationException( - "The URI cannot be acquired because the file is not physically located" - ) + override def getURI: URI = uri override def asInputStream(): InputStream = { - fileRelativePath match { - case Some(path) => - DatasetResource.getDatasetFile(dataset.getDid, datasetVersion.getDvid, path) - case None => - throw new IllegalArgumentException("File relative path is missing.") - } + val datasetAbsolutePath = PathUtils.getDatasetPath(UInteger.valueOf(did)) + GitVersionControlLocalFileStorage + .retrieveFileContentOfVersionAsInputStream( + datasetAbsolutePath, + datasetVersionHash, + datasetAbsolutePath.resolve(fileRelativePath) + ) } override def asFile(): File = { @@ -53,9 +69,15 @@ class DatasetFileDocument(fileFullPath: Path) extends VirtualDocument[Nothing] { } override def remove(): Unit = { + // first remove the temporary file tempFile match { case Some(file) => Files.delete(file.toPath) case None => // Do nothing } + // then remove the dataset file + GitVersionControlLocalFileStorage.removeFileFromRepo( + PathUtils.getDatasetPath(UInteger.valueOf(did)), + PathUtils.getDatasetPath(UInteger.valueOf(did)).resolve(fileRelativePath) + ) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DocumentFactory.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DocumentFactory.scala new file mode 100644 index 00000000000..f216504d096 --- /dev/null +++ 
b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/storage/DocumentFactory.scala @@ -0,0 +1,21 @@ +package edu.uci.ics.amber.engine.common.storage + +import edu.uci.ics.texera.workflow.common.storage.FileResolver.DATASET_FILE_URI_SCHEME + +import java.net.URI + +object DocumentFactory { + def newReadonlyDocument(fileUri: URI): ReadonlyVirtualDocument[_] = { + fileUri.getScheme match { + case DATASET_FILE_URI_SCHEME => + new DatasetFileDocument(fileUri) + + case "file" => + // For local files, create a ReadonlyLocalFileDocument + new ReadonlyLocalFileDocument(fileUri) + + case _ => + throw new UnsupportedOperationException(s"Unsupported URI scheme: ${fileUri.getScheme}") + } + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala index 1a48eca0ae9..dfcf38cedf3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala @@ -40,6 +40,7 @@ class SchemaPropagationResource extends LazyLogging { ) val logicalPlan = LogicalPlan(logicalPlanPojo) + logicalPlan.resolveScanSourceOpFileName(None) // the PhysicalPlan with topology expanded. 
val physicalPlan = PhysicalPlan(context, logicalPlan) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala index d5da3dc602b..742545af59e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala @@ -1,9 +1,14 @@ package edu.uci.ics.texera.web.resource.dashboard.user.dataset import edu.uci.ics.amber.engine.common.Utils.withTransaction +import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument import edu.uci.ics.texera.web.SqlServer import edu.uci.ics.texera.web.auth.SessionUser import edu.uci.ics.texera.web.model.jooq.generated.enums.DatasetUserAccessPrivilege +import edu.uci.ics.texera.web.model.jooq.generated.tables.Dataset.DATASET +import edu.uci.ics.texera.web.model.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS +import edu.uci.ics.texera.web.model.jooq.generated.tables.DatasetVersion.DATASET_VERSION +import edu.uci.ics.texera.web.model.jooq.generated.tables.User.USER import edu.uci.ics.texera.web.model.jooq.generated.tables.daos.{ DatasetDao, DatasetUserAccessDao, @@ -15,90 +20,39 @@ import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.{ DatasetVersion, User } -import edu.uci.ics.texera.web.model.jooq.generated.tables.Dataset.DATASET -import edu.uci.ics.texera.web.model.jooq.generated.tables.User.USER -import edu.uci.ics.texera.web.model.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS -import edu.uci.ics.texera.web.model.jooq.generated.tables.DatasetVersion.DATASET_VERSION -import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource.{ - getDatasetUserAccessPrivilege, - getOwner, - userHasReadAccess, - userHasWriteAccess, - userOwnDataset -} -import 
edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{ - DATASET_IS_PRIVATE, - DATASET_IS_PUBLIC, - DashboardDataset, - DashboardDatasetVersion, - DatasetDescriptionModification, - DatasetIDs, - DatasetNameModification, - DatasetVersionRootFileNodes, - DatasetVersionRootFileNodesResponse, - DatasetVersions, - ERR_DATASET_CREATION_FAILED_MESSAGE, - ERR_DATASET_NAME_ALREADY_EXISTS, - ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE, - ListDatasetsResponse, - calculateDatasetVersionSize, - calculateLatestDatasetVersionSize, - context, - createNewDatasetVersionFromFormData, - getDashboardDataset, - getDatasetByID, - getDatasetVersionByID, - getDatasetVersions, - getFileNodesOfCertainVersion, - getLatestDatasetVersionWithAccessCheck, - getUserDatasets, - resolvePath, - retrievePublicDatasets -} +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._ +import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _} import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.{ DatasetFileNode, PhysicalFileNode } import edu.uci.ics.texera.web.resource.dashboard.user.dataset.service.GitVersionControlLocalFileStorage import edu.uci.ics.texera.web.resource.dashboard.user.dataset.utils.PathUtils +import edu.uci.ics.texera.workflow.common.storage.FileResolver import io.dropwizard.auth.Auth import org.apache.commons.lang3.StringUtils import org.glassfish.jersey.media.multipart.{FormDataMultiPart, FormDataParam} -import org.jooq.{DSLContext, EnumType} import org.jooq.types.UInteger +import org.jooq.{DSLContext, EnumType} import play.api.libs.json.Json import java.io.{IOException, InputStream, OutputStream} -import java.net.URLDecoder +import java.net.{URI, URLDecoder} import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Paths} -import java.util.zip.{ZipEntry, ZipOutputStream} +import java.nio.file.Files import java.util import java.util.Optional import 
java.util.concurrent.locks.ReentrantLock +import java.util.zip.{ZipEntry, ZipOutputStream} import javax.annotation.security.RolesAllowed -import javax.ws.rs.{ - BadRequestException, - Consumes, - ForbiddenException, - GET, - NotFoundException, - POST, - Path, - PathParam, - Produces, - QueryParam, - WebApplicationException -} import javax.ws.rs.core.{MediaType, Response, StreamingOutput} -import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` +import javax.ws.rs._ +import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer -import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ -import scala.util.Using import scala.util.control.NonFatal -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success, Try, Using} object DatasetResource { val DATASET_IS_PUBLIC: Byte = 1; @@ -136,33 +90,6 @@ object DatasetResource { dataset } - private def getDatasetByName( - ctx: DSLContext, - ownerEmail: String, - datasetName: String - ): Dataset = { - ctx - .select(DATASET.fields: _*) - .from(DATASET) - .leftJoin(USER) - .on(USER.UID.eq(DATASET.OWNER_UID)) - .where(USER.EMAIL.eq(ownerEmail)) - .and(DATASET.NAME.eq(datasetName)) - .fetchOneInto(classOf[Dataset]) - } - - private def getDatasetVersionByName( - ctx: DSLContext, - did: UInteger, - versionName: String - ): DatasetVersion = { - ctx - .selectFrom(DATASET_VERSION) - .where(DATASET_VERSION.DID.eq(did)) - .and(DATASET_VERSION.NAME.eq(versionName)) - .fetchOneInto(classOf[DatasetVersion]) - } - // this function retrieve the version hash identified by dvid and did // read access will be checked private def getDatasetVersionByID( @@ -177,56 +104,6 @@ object DatasetResource { version } - // @param shouldContainFile a boolean flag indicating whether the path includes a fileRelativePath - // when shouldContainFile is true, user given path is /ownerEmail/datasetName/versionName/fileRelativePath - // e.g. 
/bob@texera.com/twitterDataset/v1/california/irvine/tw1.csv - // ownerName is bob@texera.com; datasetName is twitterDataset, versionName is v1, fileRelativePath is california/irvine/tw1.csv - // when shouldContainFile is false, user given path is /ownerEmail/datasetName/versionName - // e.g. /bob@texera.com/twitterDataset/v1 - // ownerName is bob@texera.com; datasetName is twitterDataset, versionName is v1 - def resolvePath( - path: java.nio.file.Path, - shouldContainFile: Boolean - ): (String, Dataset, DatasetVersion, Option[java.nio.file.Path]) = { - - val pathSegments = (0 until path.getNameCount).map(path.getName(_).toString).toArray - - // The expected length of the path segments: - // - If shouldContainFile is true, the path should include 4 segments: /ownerEmail/datasetName/versionName/fileRelativePath - // - If shouldContainFile is false, the path should include only 3 segments: /ownerEmail/datasetName/versionName - val expectedLength = if (shouldContainFile) 4 else 3 - - if (pathSegments.length < expectedLength) { - throw new BadRequestException( - s"Invalid path format. 
Expected format: /ownerEmail/datasetName/versionName" + - (if (shouldContainFile) "/fileRelativePath" else "") - ) - } - - val ownerEmail = pathSegments(0) - val datasetName = pathSegments(1) - val versionName = pathSegments(2) - - val fileRelativePath = - if (shouldContainFile) Some(Paths.get(pathSegments.drop(3).mkString("/"))) else None - - withTransaction(context) { ctx => - // Get the dataset by owner email and dataset name - val dataset = getDatasetByName(ctx, ownerEmail, datasetName) - if (dataset == null) { - throw new NotFoundException("Dataset not found") - } - - // Get the dataset version by dataset ID and version name - val datasetVersion = getDatasetVersionByName(ctx, dataset.getDid, versionName) - if (datasetVersion == null) { - throw new NotFoundException("Dataset version not found") - } - - (ownerEmail, dataset, datasetVersion, fileRelativePath) - } - } - // this function retrieve the DashboardDataset(Dataset from DB+more information) identified by did // read access will be checked def getDashboardDataset(ctx: DSLContext, did: UInteger, uid: UInteger): DashboardDataset = { @@ -300,21 +177,6 @@ object DatasetResource { } } - def getDatasetFile( - did: UInteger, - dvid: UInteger, - fileRelativePath: java.nio.file.Path - ): InputStream = { - val versionHash = getDatasetVersionByID(context, dvid).getVersionHash - val datasetPath = PathUtils.getDatasetPath(did) - GitVersionControlLocalFileStorage - .retrieveFileContentOfVersionAsInputStream( - PathUtils.getDatasetPath(did), - versionHash, - datasetPath.resolve(fileRelativePath) - ) - } - private def getFileNodesOfCertainVersion( ownerNode: DatasetFileNode, datasetName: String, @@ -334,7 +196,7 @@ object DatasetResource { // DatasetOperation defines the operations that will be applied when creating a new dataset version private case class DatasetOperation( filesToAdd: Map[java.nio.file.Path, InputStream], - filesToRemove: List[java.nio.file.Path] + filesToRemove: List[URI] ) private def 
parseUserUploadedFormToDatasetOperations( @@ -345,7 +207,7 @@ object DatasetResource { // Mutable collections for constructing DatasetOperation val filesToAdd = mutable.Map[java.nio.file.Path, InputStream]() - val filesToRemove = mutable.ListBuffer[java.nio.file.Path]() + val filesToRemove = mutable.ListBuffer[URI]() val fields = multiPart.getFields.keySet.iterator() // Get all field names @@ -370,17 +232,7 @@ object DatasetResource { .parse(filePathsValue) .as[List[String]] .foreach(pathStr => { - val (_, _, _, fileRelativePath) = - resolvePath(Paths.get(pathStr), shouldContainFile = true) - - fileRelativePath - .map { path => - filesToRemove += datasetPath - .resolve(path) // When path exists, resolve it and add to filesToRemove - } - .getOrElse { - throw new IllegalArgumentException("File relative path is missing") - } + filesToRemove += FileResolver.resolve(pathStr) }) } } @@ -446,7 +298,7 @@ object DatasetResource { .orderBy(DATASET_VERSION.CREATION_TIME.desc()) // or .asc() for ascending .fetchInto(classOf[DatasetVersion]) - result.toList + result.asScala.toList } // apply the dataset operation to create a new dataset version @@ -484,11 +336,8 @@ object DatasetResource { GitVersionControlLocalFileStorage.writeFileToRepo(datasetPath, filePath, fileStream) } - datasetOperation.filesToRemove.foreach { filePath => - GitVersionControlLocalFileStorage.removeFileFromRepo( - datasetPath, - filePath - ) + datasetOperation.filesToRemove.foreach { fileUri => + new DatasetFileDocument(fileUri).remove() } } ) @@ -514,7 +363,7 @@ object DatasetResource { .into(classOf[DatasetVersion]), DatasetFileNode.fromPhysicalFileNodes( Map( - (ownerEmail, datasetName, versionName) -> physicalFileNodes.toList + (ownerEmail, datasetName, versionName) -> physicalFileNodes.asScala.toList ) ) ) @@ -876,6 +725,7 @@ class DatasetResource { size = calculateLatestDatasetVersionSize(dataset.getDid) ) }) + .asScala ) // then we fetch the public datasets and merge it as a part of the result if 
not exist @@ -918,6 +768,7 @@ class DatasetResource { PathUtils.getDatasetPath(did), version.getVersionHash ) + .asScala .toList) } DashboardDatasetVersion( @@ -985,6 +836,7 @@ class DatasetResource { datasetPath, latestVersion.getVersionHash ) + .asScala .toList ) ) @@ -1018,7 +870,7 @@ class DatasetResource { val size = calculateDatasetVersionSize(did, dvid) val ownerFileNode = DatasetFileNode .fromPhysicalFileNodes( - Map((dataset.ownerEmail, datasetName, datasetVersion.getName) -> fileNodes.toList) + Map((dataset.ownerEmail, datasetName, datasetVersion.getName) -> fileNodes.asScala.toList) ) .head @@ -1054,34 +906,21 @@ class DatasetResource { val uid = user.getUid val decodedPathStr = URLDecoder.decode(pathStr, StandardCharsets.UTF_8.name()) - val (_, dataset, dsVersion, fileRelativePath) = - resolvePath(Paths.get(decodedPathStr), shouldContainFile = true) - withTransaction(context)(ctx => { - val did = dataset.getDid - val dvid = dsVersion.getDvid - - if (!userHasReadAccess(ctx, dataset.getDid, uid)) { - throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) - } - - val targetDatasetPath = PathUtils.getDatasetPath(did) - val datasetVersion = getDatasetVersionByID(ctx, dvid) - + val fileUri = FileResolver.resolve(decodedPathStr) val streamingOutput = new StreamingOutput() { override def write(output: OutputStream): Unit = { - fileRelativePath - .map { path => - GitVersionControlLocalFileStorage.retrieveFileContentOfVersion( - targetDatasetPath, - datasetVersion.getVersionHash, - targetDatasetPath.resolve(path), - output - ) - } - .getOrElse { - throw new IllegalArgumentException("File relative path is missing.") + val inputStream = new DatasetFileDocument(fileUri).asInputStream() + try { + val buffer = new Array[Byte](8192) // buffer size + var bytesRead = inputStream.read(buffer) + while (bytesRead != -1) { + output.write(buffer, 0, bytesRead) + bytesRead = inputStream.read(buffer) } + } finally { + inputStream.close() + } } } @@ -1189,18 
+1028,6 @@ class DatasetResource { .build() } - private def resolveAndValidatePath( - pathStr: String, - user: SessionUser - ): (Dataset, DatasetVersion) = { - val decodedPathStr = URLDecoder.decode(pathStr, StandardCharsets.UTF_8.name()) - val (_, dataset, dsVersion, _) = - resolvePath(Paths.get(decodedPathStr), shouldContainFile = false) - - validateUserAccess(dataset.getDid, user.getUid) - (dataset, dsVersion) - } - private def getLatestVersionInfo(did: UInteger, user: SessionUser): (Dataset, DatasetVersion) = { validateUserAccess(did, user.getUid) val dataset = getDatasetByID(context, did) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/FileResolver.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/FileResolver.scala index e1a94c8fc48..af5eb26a47c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/FileResolver.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/storage/FileResolver.scala @@ -1,14 +1,22 @@ package edu.uci.ics.texera.workflow.common.storage -import java.nio.file.{Files, Paths} -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument +import edu.uci.ics.amber.engine.common.Utils.withTransaction +import edu.uci.ics.texera.web.SqlServer +import edu.uci.ics.texera.web.model.jooq.generated.tables.Dataset.DATASET +import edu.uci.ics.texera.web.model.jooq.generated.tables.DatasetVersion.DATASET_VERSION +import edu.uci.ics.texera.web.model.jooq.generated.tables.User.USER +import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.{Dataset, DatasetVersion} import org.apache.commons.vfs2.FileNotFoundException +import java.io.File +import java.net.{URI, URLEncoder} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} +import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.util.{Success, Try} object FileResolver { - - type FileResolverOutput = Either[String, DatasetFileDocument] + val 
DATASET_FILE_URI_SCHEME = "vfs" /** * Attempts to resolve the given fileName using a list of resolver functions. @@ -17,11 +25,11 @@ object FileResolver { * @throws FileNotFoundException if the file cannot be resolved by any resolver * @return Either[String, DatasetFileDocument] - the resolved path as a String or a DatasetFileDocument */ - def resolve(fileName: String): FileResolverOutput = { - val resolvers: List[String => FileResolverOutput] = List(localResolveFunc, datasetResolveFunc) + def resolve(fileName: String): URI = { + val resolvers: Seq[String => URI] = Seq(localResolveFunc, datasetResolveFunc) // Try each resolver function in sequence - resolvers.iterator + resolvers .map(resolver => Try(resolver(fileName))) .collectFirst { case Success(output) => output @@ -34,25 +42,85 @@ object FileResolver { * @throws FileNotFoundException if the local file does not exist * @param fileName the name of the file to check */ - private def localResolveFunc(fileName: String): FileResolverOutput = { + private def localResolveFunc(fileName: String): URI = { val filePath = Paths.get(fileName) - if (Files.exists(filePath)) { - Left(fileName) // File exists locally, return the path as a string in the Left - } else { + if (!Files.exists(filePath)) { throw new FileNotFoundException(s"Local file $fileName does not exist") } + filePath.toUri } /** - * Attempts to resolve a DatasetFileDocument. + * Attempts to resolve a given fileName to a URI. + * + * The fileName format should be: /ownerEmail/datasetName/versionName/fileRelativePath + * e.g. /bob@texera.com/twitterDataset/v1/california/irvine/tw1.csv + * The output dataset URI format is: {DATASET_FILE_URI_SCHEME}:///{did}/{versionHash}/file-path + * e.g. 
vfs:///15/adeq233td/some/dir/file.txt * * @param fileName the name of the file to attempt resolving as a DatasetFileDocument * @return Either[String, DatasetFileDocument] - Right(document) if creation succeeds * @throws FileNotFoundException if the dataset file does not exist or cannot be created */ - private def datasetResolveFunc(fileName: String): FileResolverOutput = { + private def datasetResolveFunc(fileName: String): URI = { val filePath = Paths.get(fileName) - val document = new DatasetFileDocument(filePath) // This will throw if creation fails - Right(document) + val pathSegments = (0 until filePath.getNameCount).map(filePath.getName(_).toString).toArray + + // extract info from the user-given fileName + val ownerEmail = pathSegments(0) + val datasetName = pathSegments(1) + val versionName = pathSegments(2) + val fileRelativePath = Paths.get(pathSegments.drop(3).head, pathSegments.drop(3).tail: _*) + + // fetch the dataset and version from DB to get dataset ID and version hash + val (dataset, datasetVersion) = + withTransaction(SqlServer.createDSLContext()) { ctx => + // fetch the dataset from DB + val dataset = ctx + .select(DATASET.fields: _*) + .from(DATASET) + .leftJoin(USER) + .on(USER.UID.eq(DATASET.OWNER_UID)) + .where(USER.EMAIL.eq(ownerEmail)) + .and(DATASET.NAME.eq(datasetName)) + .fetchOneInto(classOf[Dataset]) + + // fetch the dataset version from DB + val datasetVersion = ctx + .selectFrom(DATASET_VERSION) + .where(DATASET_VERSION.DID.eq(dataset.getDid)) + .and(DATASET_VERSION.NAME.eq(versionName)) + .fetchOneInto(classOf[DatasetVersion]) + + if (dataset == null || datasetVersion == null) { + throw new FileNotFoundException(s"Dataset file $fileName not found.") + } + (dataset, datasetVersion) + } + + // Convert each segment of fileRelativePath to an encoded String + val encodedFileRelativePath = fileRelativePath + .iterator() + .asScala + .map { segment => + URLEncoder.encode(segment.toString, StandardCharsets.UTF_8) + } + .toArray + + // 
Prepend did and versionHash to the encoded path segments + val allPathSegments = Array( + dataset.getDid.intValue().toString, + datasetVersion.getVersionHash + ) ++ encodedFileRelativePath + + // Build the format /{did}/{versionHash}/{fileRelativePath} + val encodedPath = Paths.get(File.separator, allPathSegments: _*) + + try { + new URI(DATASET_FILE_URI_SCHEME, "", encodedPath.toString, null) + } catch { + case e: Exception => + throw new FileNotFoundException(s"Dataset file $fileName not found.") + } } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/LogicalPlan.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/LogicalPlan.scala index 3ea3e24fd06..1574f1cd373 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/LogicalPlan.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/LogicalPlan.scala @@ -8,6 +8,8 @@ import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.workflow.common.operators.LogicalOp import edu.uci.ics.texera.workflow.common.operators.source.SourceOperatorDescriptor +import edu.uci.ics.texera.workflow.common.storage.FileResolver +import edu.uci.ics.texera.workflow.operators.source.scan.ScanSourceOpDesc import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.util.SupplierUtil @@ -144,6 +146,32 @@ case class LogicalPlan( .toMap } + /** + * Resolve all user-given filenames for the scan source operators to URIs, and call op.setFileUri to set the URI + * @param errorList if given, put errors during resolving to it + */ + def resolveScanSourceOpFileName( + errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] + ): Unit = { + operators.foreach { + case operator @ (scanOp: ScanSourceOpDesc) => + Try { + // Resolve file path for ScanSourceOpDesc + val fileName = scanOp.fileName.getOrElse(throw new RuntimeException("no 
input file name")) + val fileUri = FileResolver.resolve(fileName) // Convert to URI + + // Set the URI in the ScanSourceOpDesc + scanOp.setFileUri(fileUri) + } match { + case Success(_) => // Successfully resolved and set the file URI + case Failure(err) => + logger.error("Error resolving file path for ScanSourceOpDesc", err) + errorList.foreach(_.append((operator.operatorIdentifier, err))) + } + case _ => // Skip non-ScanSourceOpDesc operators + } + } + def propagateWorkflowSchema( context: WorkflowContext, errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala index efa4e305275..e40252f5409 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala @@ -51,7 +51,7 @@ class WorkflowCompiler( logicalPlanPojo.opsToViewResult, logicalPlan ) - + logicalPlan.resolveScanSourceOpFileName(Some(errorList)) logicalPlan.propagateWorkflowSchema(context, Some(errorList)) // map compilation errors with op id if (errorList.nonEmpty) { @@ -121,6 +121,7 @@ class WorkflowCompiler( logicalPlan ) + logicalPlan.resolveScanSourceOpFileName(Some(errorList)) logicalPlan.propagateWorkflowSchema(context, Some(errorList)) // report compilation errors diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpDesc.scala index daa4c3864b5..ee1621e762f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpDesc.scala @@ -47,7 +47,6 @@ class 
FileScanSourceOpDesc extends ScanSourceOpDesc with TextSourceOpDesc { workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalOp = { - val (filepath, fileDesc) = determineFilePathOrDatasetFile() PhysicalOp .sourcePhysicalOp( workflowId, @@ -55,8 +54,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with TextSourceOpDesc { operatorIdentifier, OpExecInitInfo((_, _) => new FileScanSourceOpExec( - filepath, - fileDesc, + fileUri.get, attributeType, encoding, extract, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpExec.scala index 0244195d4a9..e7c7be0768a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpExec.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/FileScanSourceOpExec.scala @@ -2,18 +2,18 @@ package edu.uci.ics.texera.workflow.operators.source.scan import edu.uci.ics.amber.engine.common.executor.SourceOperatorExecutor import edu.uci.ics.amber.engine.common.model.tuple.TupleLike -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument import edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.parseField +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import org.apache.commons.compress.archivers.{ArchiveInputStream, ArchiveStreamFactory} import org.apache.commons.io.IOUtils.toByteArray import java.io._ +import java.net.URI import scala.collection.mutable import scala.jdk.CollectionConverters.IteratorHasAsScala class FileScanSourceOpExec private[scan] ( - filePath: String, - datasetFileDesc: DatasetFileDocument, + fileUri: String, fileAttributeType: FileAttributeType, fileEncoding: FileDecodingMethod, extract: Boolean, @@ -26,7 +26,7 @@ class FileScanSourceOpExec private[scan] ( override def produceTuple(): Iterator[TupleLike] = { var filenameIt: 
Iterator[String] = Iterator.empty val fileEntries: Iterator[InputStream] = { - val is = createInputStream(filePath, datasetFileDesc) + val is = DocumentFactory.newReadonlyDocument(new URI(fileUri)).asInputStream() if (extract) { val inputStream: ArchiveInputStream = new ArchiveStreamFactory().createArchiveInputStream( new BufferedInputStream(is) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/ScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/ScanSourceOpDesc.scala index 01d03538513..46ff9196ef0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/ScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/ScanSourceOpDesc.scala @@ -5,13 +5,13 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.engine.common.model.WorkflowContext import edu.uci.ics.amber.engine.common.model.tuple.Schema -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument import edu.uci.ics.amber.engine.common.workflow.OutputPort import edu.uci.ics.texera.workflow.common.metadata.{OperatorGroupConstants, OperatorInfo} import edu.uci.ics.texera.workflow.common.operators.source.SourceOperatorDescriptor -import edu.uci.ics.texera.workflow.common.storage.FileResolver import org.apache.commons.lang3.builder.EqualsBuilder +import java.net.URI + abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { /** in the case we do not want to read the entire large file, but only @@ -30,9 +30,9 @@ abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { @JsonPropertyDescription("decoding charset to use on input") var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8 - // Unified file handle, can be either a local path (String) or DatasetFileDocument + // uri of the file @JsonIgnore - var 
fileHandle: FileResolver.FileResolverOutput = _ + var fileUri: Option[String] = None @JsonIgnore var fileTypeName: Option[String] = None @@ -50,19 +50,12 @@ abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { var offset: Option[Int] = None override def sourceSchema(): Schema = { - if (fileHandle == null) return null + if (fileUri.isEmpty) return null inferSchema() } override def setContext(workflowContext: WorkflowContext): Unit = { super.setContext(workflowContext) - - if (fileName.isEmpty) { - throw new RuntimeException("no input file name") - } - - // Resolve the file and assign the result to fileHandle - fileHandle = FileResolver.resolve(fileName.get) } override def operatorInfo: OperatorInfo = { @@ -77,12 +70,8 @@ abstract class ScanSourceOpDesc extends SourceOperatorDescriptor { def inferSchema(): Schema - // Get the source file descriptor from the fileHandle - def determineFilePathOrDatasetFile(): (String, DatasetFileDocument) = { - fileHandle match { - case Left(path) => (path, null) // File path is a local path as String - case Right(document) => (null, document) // File is a DatasetFileDocument - } + def setFileUri(uri: URI): Unit = { + fileUri = Some(uri.toASCIIString) } override def equals(that: Any): Boolean = diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDesc.scala index e9336f2acec..5e5ccdfd26b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDesc.scala @@ -9,9 +9,11 @@ import edu.uci.ics.amber.engine.common.model.{PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import 
edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.inferSchemaFromRows import edu.uci.ics.amber.engine.common.model.tuple.{Attribute, AttributeType, Schema} +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.ScanSourceOpDesc -import java.io.{File, FileInputStream, IOException, InputStreamReader} +import java.io.{IOException, InputStreamReader} +import java.net.URI class CSVScanSourceOpDesc extends ScanSourceOpDesc { @@ -37,7 +39,6 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { if (customDelimiter.isEmpty || customDelimiter.get.isEmpty) customDelimiter = Option(",") - val (filepath, fileDesc) = determineFilePathOrDatasetFile() PhysicalOp .sourcePhysicalOp( workflowId, @@ -45,8 +46,7 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { operatorIdentifier, OpExecInitInfo((_, _) => new CSVScanSourceOpExec( - filepath, - fileDesc, + fileUri.get, fileEncoding, limit, offset, @@ -70,17 +70,11 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc { */ @Override def inferSchema(): Schema = { - if (customDelimiter.isEmpty) { + if (customDelimiter.isEmpty || fileUri.isEmpty) { return null } - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val stream = - if (filepath != null) { - new FileInputStream(new File(filepath)) - } else { - fileDesc.asInputStream() - } + val stream = DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asInputStream() val inputReader = new InputStreamReader(stream, fileEncoding.getCharset) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpExec.scala index fe5a9a61051..245e67e1dfe 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpExec.scala +++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpExec.scala @@ -5,15 +5,15 @@ import edu.uci.ics.amber.engine.common.executor.SourceOperatorExecutor import edu.uci.ics.amber.engine.common.workflow.PortIdentity import edu.uci.ics.amber.engine.common.{CheckpointState, CheckpointSupport} import edu.uci.ics.amber.engine.common.model.tuple.{AttributeTypeUtils, Schema, TupleLike} -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.FileDecodingMethod import java.io.InputStreamReader +import java.net.URI import scala.collection.immutable.ArraySeq class CSVScanSourceOpExec private[csv] ( - filePath: String, - datasetFileDesc: DatasetFileDocument, + fileUri: String, fileEncoding: FileDecodingMethod, limit: Option[Int], offset: Option[Int], @@ -69,7 +69,7 @@ class CSVScanSourceOpExec private[csv] ( override def open(): Unit = { inputReader = new InputStreamReader( - createInputStream(filePath, datasetFileDesc), + DocumentFactory.newReadonlyDocument(new URI(fileUri)).asInputStream(), fileEncoding.getCharset ) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/ParallelCSVScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/ParallelCSVScanSourceOpDesc.scala index 567fa40ed34..2978cb56992 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/ParallelCSVScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/ParallelCSVScanSourceOpDesc.scala @@ -9,9 +9,11 @@ import edu.uci.ics.amber.engine.common.model.{PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.inferSchemaFromRows 
import edu.uci.ics.amber.engine.common.model.tuple.{Attribute, AttributeType, Schema} +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.ScanSourceOpDesc -import java.io.{File, IOException} +import java.io.IOException +import java.net.URI class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc { @@ -39,13 +41,7 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc { // here, the stream requires to be seekable, so datasetFileDesc creates a temp file here // TODO: consider a better way - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val file = - if (filepath == null) { - fileDesc.asFile() - } else { - new File(filepath) - } + val file = DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asFile() val totalBytes: Long = file.length() PhysicalOp @@ -83,16 +79,10 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc { */ @Override def inferSchema(): Schema = { - if (customDelimiter.isEmpty) { + if (customDelimiter.isEmpty || fileUri.isEmpty) { return null } - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val file = - if (filepath == null) { - fileDesc.asFile() - } else { - new File(filepath) - } + val file = DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asFile() implicit object CustomFormat extends DefaultCSVFormat { override val delimiter: Char = customDelimiter.get.charAt(0) @@ -102,7 +92,7 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc { reader.close() // reopen the file to read from the beginning - reader = CSVReader.open(filepath)(CustomFormat) + reader = CSVReader.open(file.toPath.toString)(CustomFormat) if (hasHeader) reader.readNext() diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpDesc.scala index ca17df52a26..38625b62b0f 100644 --- 
a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpDesc.scala @@ -9,9 +9,11 @@ import edu.uci.ics.amber.engine.common.model.{PhysicalOp, SchemaPropagationFunc} import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.inferSchemaFromRows import edu.uci.ics.amber.engine.common.model.tuple.{Attribute, AttributeType, Schema} +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.ScanSourceOpDesc -import java.io.{File, IOException} +import java.io.IOException +import java.net.URI class CSVOldScanSourceOpDesc extends ScanSourceOpDesc { @@ -36,17 +38,6 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc { // fill in default values if (customDelimiter.get.isEmpty) customDelimiter = Option(",") - - val (filepath, datasetFileDocument) = determineFilePathOrDatasetFile() - // for CSVOldScanSourceOpDesc, it requires the full File presence when execute, so use temp file here - // TODO: figure out a better way - val path = - if (filepath == null) { - datasetFileDocument.asFile().toPath.toString - } else { - filepath - } - PhysicalOp .sourcePhysicalOp( workflowId, @@ -54,7 +45,7 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc { operatorIdentifier, OpExecInitInfo((_, _) => new CSVOldScanSourceOpExec( - path, + fileUri.get, fileEncoding, limit, offset, @@ -78,16 +69,10 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc { */ @Override def inferSchema(): Schema = { - if (customDelimiter.isEmpty) { + if (customDelimiter.isEmpty || fileUri.isEmpty) { return null } - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val file = - if (filepath != null) { - new File(filepath) - } else { - fileDesc.asFile() - } + val file = 
DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asFile() implicit object CustomFormat extends DefaultCSVFormat { override val delimiter: Char = customDelimiter.get.charAt(0) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpExec.scala index 60dc8d81d02..f6b2bd96c16 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpExec.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/csvOld/CSVOldScanSourceOpExec.scala @@ -8,12 +8,14 @@ import edu.uci.ics.amber.engine.common.model.tuple.{ Schema, TupleLike } +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.FileDecodingMethod +import java.net.URI import scala.collection.compat.immutable.ArraySeq class CSVOldScanSourceOpExec private[csvOld] ( - filePath: String, + fileUri: String, fileEncoding: FileDecodingMethod, limit: Option[Int], offset: Option[Int], @@ -51,7 +53,8 @@ class CSVOldScanSourceOpExec private[csvOld] ( implicit object CustomFormat extends DefaultCSVFormat { override val delimiter: Char = customDelimiter.get.charAt(0) } - reader = CSVReader.open(filePath, fileEncoding.getCharset.name())(CustomFormat) + val filePath = DocumentFactory.newReadonlyDocument(new URI(fileUri)).asFile().toPath + reader = CSVReader.open(filePath.toString, fileEncoding.getCharset.name())(CustomFormat) // skip line if this worker reads the start of a file, and the file has a header line val startOffset = offset.getOrElse(0) + (if (hasHeader) 1 else 0) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpDesc.scala index 
e3c3bcd6027..c622a6e8853 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpDesc.scala @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import com.fasterxml.jackson.databind.JsonNode import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo import edu.uci.ics.amber.engine.common.model.{PhysicalOp, SchemaPropagationFunc} -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument +import edu.uci.ics.amber.engine.common.storage.{DatasetFileDocument, DocumentFactory} import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import edu.uci.ics.amber.engine.common.Utils.objectMapper import edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.inferSchemaFromRows @@ -13,6 +13,7 @@ import edu.uci.ics.texera.workflow.operators.source.scan.ScanSourceOpDesc import edu.uci.ics.texera.workflow.operators.source.scan.json.JSONUtil.JSONToMap import java.io.{BufferedReader, FileInputStream, IOException, InputStream, InputStreamReader} +import java.net.URI import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -37,8 +38,7 @@ class JSONLScanSourceOpDesc extends ScanSourceOpDesc { workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalOp = { - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val stream = createInputStream(filepath, fileDesc) + val stream = DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asInputStream() // count lines and partition the task to each worker val reader = new BufferedReader( new InputStreamReader(stream, fileEncoding.getCharset) @@ -60,8 +60,7 @@ class JSONLScanSourceOpDesc extends ScanSourceOpDesc { offsetValue + (if (idx != workerCount - 1) count / workerCount * (idx + 1) 
else count) new JSONLScanSourceOpExec( - filepath, - fileDesc, + fileUri.get, fileEncoding, startOffset, endOffset, @@ -85,8 +84,10 @@ class JSONLScanSourceOpDesc extends ScanSourceOpDesc { */ @Override def inferSchema(): Schema = { - val (filepath, fileDesc) = determineFilePathOrDatasetFile() - val stream = createInputStream(filepath, fileDesc) + if (fileUri.isEmpty) { + return null + } + val stream = DocumentFactory.newReadonlyDocument(new URI(fileUri.get)).asInputStream() val reader = new BufferedReader(new InputStreamReader(stream, fileEncoding.getCharset)) var fieldNames = Set[String]() diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpExec.scala index e57f93021ec..f58f9adcb8d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpExec.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/source/scan/json/JSONLScanSourceOpExec.scala @@ -1,20 +1,20 @@ package edu.uci.ics.texera.workflow.operators.source.scan.json import edu.uci.ics.amber.engine.common.executor.SourceOperatorExecutor -import edu.uci.ics.amber.engine.common.storage.DatasetFileDocument import edu.uci.ics.amber.engine.common.Utils.objectMapper import edu.uci.ics.amber.engine.common.model.tuple.AttributeTypeUtils.parseField import edu.uci.ics.amber.engine.common.model.tuple.{Schema, TupleLike} +import edu.uci.ics.amber.engine.common.storage.DocumentFactory import edu.uci.ics.texera.workflow.operators.source.scan.FileDecodingMethod import edu.uci.ics.texera.workflow.operators.source.scan.json.JSONUtil.JSONToMap import java.io.{BufferedReader, InputStreamReader} +import java.net.URI import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.util.{Failure, Success, Try} class JSONLScanSourceOpExec private[json] ( - filePath: String, - 
datasetFileDesc: DatasetFileDocument, + fileUri: String, fileEncoding: FileDecodingMethod, startOffset: Int, endOffset: Int, @@ -42,7 +42,10 @@ class JSONLScanSourceOpExec private[json] ( override def open(): Unit = { schema = schemaFunc() reader = new BufferedReader( - new InputStreamReader(createInputStream(filePath, datasetFileDesc), fileEncoding.getCharset) + new InputStreamReader( + DocumentFactory.newReadonlyDocument(new URI(fileUri)).asInputStream(), + fileEncoding.getCharset + ) ) rows = reader.lines().iterator().asScala.slice(startOffset, endOffset) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestOperators.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestOperators.scala index 2dda4568e49..0b891472dcb 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestOperators.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestOperators.scala @@ -1,5 +1,6 @@ package edu.uci.ics.amber.engine.e2e +import edu.uci.ics.texera.workflow.common.storage.FileResolver import edu.uci.ics.texera.workflow.operators.aggregate.{ AggregateOpDesc, AggregationFunction, @@ -48,6 +49,7 @@ object TestOperators { csvHeaderlessOp.fileName = Some(fileName) csvHeaderlessOp.customDelimiter = Some(",") csvHeaderlessOp.hasHeader = header + csvHeaderlessOp.setFileUri(FileResolver.resolve(fileName)) csvHeaderlessOp } @@ -56,6 +58,7 @@ object TestOperators { val jsonlOp = new JSONLScanSourceOpDesc jsonlOp.fileName = Some(fileName) jsonlOp.flatten = flatten + jsonlOp.setFileUri(FileResolver.resolve(fileName)) jsonlOp } diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDescSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDescSpec.scala index 21ce3de60a0..858b2188309 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDescSpec.scala +++ 
b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/csv/CSVScanSourceOpDescSpec.scala @@ -4,6 +4,7 @@ import edu.uci.ics.amber.engine.common.model.WorkflowContext import edu.uci.ics.amber.engine.common.model.tuple.{AttributeType, Schema} import edu.uci.ics.amber.engine.common.workflow.PortIdentity import WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} +import edu.uci.ics.texera.workflow.common.storage.FileResolver import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec @@ -27,6 +28,9 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = true parallelCsvScanSourceOpDesc.setContext(workflowContext) + parallelCsvScanSourceOpDesc.setFileUri( + FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) + ) val inferredSchema: Schema = parallelCsvScanSourceOpDesc.inferSchema() assert(inferredSchema.getAttributes.length == 14) @@ -42,6 +46,9 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { parallelCsvScanSourceOpDesc.customDelimiter = Some(",") parallelCsvScanSourceOpDesc.hasHeader = false parallelCsvScanSourceOpDesc.setContext(workflowContext) + parallelCsvScanSourceOpDesc.setFileUri( + FileResolver.resolve(parallelCsvScanSourceOpDesc.fileName.get) + ) val inferredSchema: Schema = parallelCsvScanSourceOpDesc.inferSchema() @@ -56,6 +63,7 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = true csvScanSourceOpDesc.setContext(workflowContext) + csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -70,6 +78,7 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.customDelimiter = Some(",") csvScanSourceOpDesc.hasHeader = false 
csvScanSourceOpDesc.setContext(workflowContext) + csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -85,6 +94,7 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false csvScanSourceOpDesc.setContext(workflowContext) + csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) val inferredSchema: Schema = csvScanSourceOpDesc.inferSchema() @@ -100,6 +110,7 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { csvScanSourceOpDesc.customDelimiter = Some(";") csvScanSourceOpDesc.hasHeader = false csvScanSourceOpDesc.setContext(workflowContext) + csvScanSourceOpDesc.setFileUri(FileResolver.resolve(csvScanSourceOpDesc.fileName.get)) assert( !csvScanSourceOpDesc diff --git a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/text/FileScanSourceOpDescSpec.scala b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/text/FileScanSourceOpDescSpec.scala index a766be3606d..de1d60c51b8 100644 --- a/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/text/FileScanSourceOpDescSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/texera/workflow/operators/source/scan/text/FileScanSourceOpDescSpec.scala @@ -1,6 +1,7 @@ package edu.uci.ics.texera.workflow.operators.source.scan.text import edu.uci.ics.amber.engine.common.model.tuple.{AttributeType, Schema, SchemaEnforceable, Tuple} +import edu.uci.ics.texera.workflow.common.storage.FileResolver import edu.uci.ics.texera.workflow.operators.source.scan.{ FileAttributeType, FileDecodingMethod, @@ -18,7 +19,7 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { before { fileScanSourceOpDesc = new FileScanSourceOpDesc() - fileScanSourceOpDesc.fileHandle = Left(TestTextFilePath) + 
fileScanSourceOpDesc.fileUri = Some(FileResolver.resolve(TestTextFilePath).toASCIIString) fileScanSourceOpDesc.fileEncoding = FileDecodingMethod.UTF_8 } @@ -61,8 +62,7 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { fileScanSourceOpDesc.fileScanLimit = Option(5) val FileScanSourceOpExec = new FileScanSourceOpExec( - fileScanSourceOpDesc.fileHandle.left.getOrElse(""), - null, + fileScanSourceOpDesc.fileUri.getOrElse(""), fileScanSourceOpDesc.attributeType, fileScanSourceOpDesc.fileEncoding, fileScanSourceOpDesc.extract, @@ -87,13 +87,12 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { } it should "read first 5 lines of the input text file with CRLF separators into corresponding output tuples" in { - fileScanSourceOpDesc.fileHandle = Left(TestCRLFTextFilePath) + fileScanSourceOpDesc.fileUri = Some(FileResolver.resolve(TestCRLFTextFilePath).toASCIIString) fileScanSourceOpDesc.attributeType = FileAttributeType.STRING fileScanSourceOpDesc.fileScanLimit = Option(5) val FileScanSourceOpExec = new FileScanSourceOpExec( - fileScanSourceOpDesc.fileHandle.left.getOrElse(""), - null, + fileScanSourceOpDesc.fileUri.getOrElse(""), fileScanSourceOpDesc.attributeType, fileScanSourceOpDesc.fileEncoding, fileScanSourceOpDesc.extract, @@ -121,8 +120,7 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { fileScanSourceOpDesc.attributeType = FileAttributeType.SINGLE_STRING val FileScanSourceOpExec = new FileScanSourceOpExec( - fileScanSourceOpDesc.fileHandle.left.getOrElse(""), - null, + fileScanSourceOpDesc.fileUri.getOrElse(""), fileScanSourceOpDesc.attributeType, fileScanSourceOpDesc.fileEncoding, fileScanSourceOpDesc.extract, @@ -148,12 +146,11 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { } it should "read first 5 lines of the input text into corresponding output INTEGER tuples" in { - fileScanSourceOpDesc.fileHandle = Left(TestNumbersFilePath) + fileScanSourceOpDesc.fileUri 
= Some(FileResolver.resolve(TestNumbersFilePath).toASCIIString) fileScanSourceOpDesc.attributeType = FileAttributeType.INTEGER fileScanSourceOpDesc.fileScanLimit = Option(5) val FileScanSourceOpExec = new FileScanSourceOpExec( - fileScanSourceOpDesc.fileHandle.left.getOrElse(""), - null, + fileScanSourceOpDesc.fileUri.getOrElse(""), fileScanSourceOpDesc.attributeType, fileScanSourceOpDesc.fileEncoding, fileScanSourceOpDesc.extract, @@ -178,14 +175,13 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter { } it should "read first 5 lines of the input text file with US_ASCII encoding" in { - fileScanSourceOpDesc.fileHandle = Left(TestCRLFTextFilePath) + fileScanSourceOpDesc.fileUri = Some(FileResolver.resolve(TestCRLFTextFilePath).toASCIIString) fileScanSourceOpDesc.fileEncoding = FileDecodingMethod.ASCII fileScanSourceOpDesc.attributeType = FileAttributeType.STRING fileScanSourceOpDesc.fileScanLimit = Option(5) val FileScanSourceOpExec = new FileScanSourceOpExec( - fileScanSourceOpDesc.fileHandle.left.getOrElse(""), - null, + fileScanSourceOpDesc.fileUri.getOrElse(""), fileScanSourceOpDesc.attributeType, fileScanSourceOpDesc.fileEncoding, fileScanSourceOpDesc.extract,