Merged
Commits
54 commits
1a705ae
add initial lake fs based implementation
bobbai00 Jan 15, 2025
978c528
move lakefs logic to workflow core
bobbai00 Jan 16, 2025
cfd84b3
add uri related and lake fs document
bobbai00 Jan 16, 2025
ec3baf6
fix bugs
bobbai00 Jan 16, 2025
779f1df
a compilable version
bobbai00 Feb 16, 2025
a8bd16e
a runnable version
bobbai00 Feb 16, 2025
8ef7927
finish jwt auth
bobbai00 Feb 21, 2025
583ce58
make the backend work
bobbai00 Feb 22, 2025
3e1d0d6
keep refactoring the dataset resource
bobbai00 Feb 23, 2025
c690e8c
succinct the config parsing
bobbai00 Feb 23, 2025
2c93de6
test more APIs and closing to finish
bobbai00 Feb 24, 2025
dafa6e0
fix dataset creation and version creation
bobbai00 Feb 25, 2025
8ebccfb
fix the presigned get
bobbai00 Feb 25, 2025
7e3ad39
closing to finish the upload
bobbai00 Feb 25, 2025
b980628
refactor dataset frontend
bobbai00 Feb 26, 2025
01221f3
finish upload
bobbai00 Feb 26, 2025
1b46692
closing to finish the gui
bobbai00 Feb 26, 2025
7df961e
delete the lakefs test as the test environment don't have it
bobbai00 Feb 27, 2025
1fd8718
keep improving the backend and frontend
bobbai00 Feb 27, 2025
7416e1d
make the workflow be able to read from the dataset
bobbai00 Feb 27, 2025
e7a3b5c
adding python side dataset reader
bobbai00 Feb 28, 2025
1303092
keep improving the frontend
bobbai00 Feb 28, 2025
e4b7649
clean up the frontend
bobbai00 Feb 28, 2025
cf28dee
finish the export
bobbai00 Mar 1, 2025
280aad0
finalize the sharing feature
bobbai00 Mar 2, 2025
d53fd75
fix the delete
bobbai00 Mar 2, 2025
76b7e7e
recover the frontend change
bobbai00 Mar 3, 2025
ed44b77
fix test
bobbai00 Mar 3, 2025
b9cdfde
fix backend dependency and fix frontend
bobbai00 Mar 3, 2025
07464a9
cleanup the storage config
bobbai00 Mar 3, 2025
602033a
add more comments
bobbai00 Mar 3, 2025
074ff46
save the multipart chunk change on frontend
bobbai00 Mar 3, 2025
ae78df1
recover gui changes
bobbai00 Mar 3, 2025
4f83db0
do the rebase
bobbai00 Mar 4, 2025
166b1f2
add the flag for controlling whether to select files from dataset
bobbai00 Mar 4, 2025
ba9dfcf
add default values for lakeFS+S3
bobbai00 Mar 4, 2025
b356e76
fmt
bobbai00 Mar 4, 2025
0c26e27
add file service to part of the scripts
bobbai00 Mar 4, 2025
1d71e74
resolve comments and fix the py udf document
bobbai00 Mar 6, 2025
ee60cf9
fmt python
bobbai00 Mar 6, 2025
de73637
fmt and fix the version of docker compose
bobbai00 Mar 6, 2025
2e0ab31
try to fix the cors issue
bobbai00 Mar 7, 2025
f9bc34e
fmt py file
bobbai00 Mar 7, 2025
0c55cff
add header for put
bobbai00 Mar 7, 2025
fe03345
fmt UDF
bobbai00 Mar 7, 2025
efa49fe
keep refining
bobbai00 Mar 9, 2025
77467a5
update the docker compose
bobbai00 Mar 10, 2025
cf8d460
remove the header in the dataset.service.ts fetch
bobbai00 Mar 10, 2025
b470e18
improve the upload
bobbai00 Mar 11, 2025
5620944
add the concurrency in the config
bobbai00 Mar 11, 2025
ab37cf0
add more comments on the env
bobbai00 Mar 11, 2025
ef43ca9
add cancel feature
bobbai00 Mar 11, 2025
e294ac7
add the bold
bobbai00 Mar 11, 2025
bebc938
fmt
bobbai00 Mar 11, 2025
2 changes: 2 additions & 0 deletions core/amber/src/main/python/pytexera/__init__.py
@@ -3,6 +3,7 @@
from typing import Iterator, Optional, Union

from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
from .udf.udf_operator import (
    UDFOperatorV2,
    UDFTableOperator,
@@ -22,6 +23,7 @@
"UDFTableOperator",
"UDFBatchOperator",
"UDFSourceOperator",
"DatasetFileDocument",
# export external tools to be used
"overrides",
"logger",
3 changes: 3 additions & 0 deletions core/amber/src/main/python/pytexera/storage/__init__.py
@@ -0,0 +1,3 @@
from .dataset_file_document import DatasetFileDocument

__all__ = ["DatasetFileDocument"]
@@ -0,0 +1,81 @@
import os
import io
import requests
import urllib.parse


class DatasetFileDocument:
    def __init__(self, file_path: str):
        """
        Parses the file path into dataset metadata.

        :param file_path:
            Expected format - "/ownerEmail/datasetName/versionName/fileRelativePath"
            Example: "/bob@texera.com/twitterDataset/v1/california/irvine/tw1.csv"
        """
        parts = file_path.strip("/").split("/")
        if len(parts) < 4:
            raise ValueError(
                "Invalid file path format. "
                "Expected: /ownerEmail/datasetName/versionName/fileRelativePath"
            )

        self.owner_email = parts[0]
        self.dataset_name = parts[1]
        self.version_name = parts[2]
        self.file_relative_path = "/".join(parts[3:])

        self.jwt_token = os.getenv("USER_JWT_TOKEN")
        self.presign_endpoint = os.getenv("PRESIGN_API_ENDPOINT")

        if not self.jwt_token:
            raise ValueError(
                "JWT token is required but not set in environment variables."
            )
        if not self.presign_endpoint:
            self.presign_endpoint = "http://localhost:9092/api/dataset/presign-download"

    def get_presigned_url(self) -> str:
        """
        Requests a presigned URL from the API.

        :return: The presigned URL as a string.
        :raises: RuntimeError if the request fails.
        """
        headers = {"Authorization": f"Bearer {self.jwt_token}"}
        encoded_file_path = urllib.parse.quote(
            f"/{self.owner_email}"
            f"/{self.dataset_name}"
            f"/{self.version_name}"
            f"/{self.file_relative_path}"
        )

        params = {"filePath": encoded_file_path}

        response = requests.get(self.presign_endpoint, headers=headers, params=params)

        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to get presigned URL: "
                f"{response.status_code} {response.text}"
            )

        return response.json().get("presignedUrl")

    def read_file(self) -> io.BytesIO:
        """
        Reads the file content from the presigned URL.

        :return: A file-like object.
        :raises: RuntimeError if the retrieval fails.
        """
        presigned_url = self.get_presigned_url()
        response = requests.get(presigned_url)

        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to retrieve file content: "
                f"{response.status_code} {response.text}"
            )

        return io.BytesIO(response.content)
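
For context, a minimal usage sketch of the new reader (illustrative, not part of this diff): it assumes the runtime has populated USER_JWT_TOKEN and PRESIGN_API_ENDPOINT, uses the docstring's example path, and pulls in pandas purely for demonstration.

# Illustrative sketch: read a dataset file via DatasetFileDocument and parse it.
# The path is the docstring's example; USER_JWT_TOKEN and PRESIGN_API_ENDPOINT
# are assumed to be set by the runtime.
import pandas as pd

from pytexera import DatasetFileDocument

doc = DatasetFileDocument("/bob@texera.com/twitterDataset/v1/california/irvine/tw1.csv")
file_obj = doc.read_file()   # io.BytesIO holding the file's bytes
df = pd.read_csv(file_obj)   # parse as CSV, for example
print(df.head())
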
@@ -1,6 +1,5 @@
package edu.uci.ics.texera.web

import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.dirkraft.dropwizard.fileassets.FileAssetsBundle
import com.typesafe.scalalogging.LazyLogging
@@ -17,14 +16,7 @@ import edu.uci.ics.texera.web.resource.dashboard.DashboardResource
import edu.uci.ics.texera.web.resource.dashboard.admin.execution.AdminExecutionResource
import edu.uci.ics.texera.web.resource.dashboard.admin.user.AdminUserResource
import edu.uci.ics.texera.web.resource.dashboard.hub.HubResource
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.{
DatasetFileNode,
DatasetFileNodeSerializer
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.{
DatasetAccessResource,
DatasetResource
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource
import edu.uci.ics.texera.web.resource.dashboard.user.project.{
ProjectAccessResource,
ProjectResource,
@@ -89,11 +81,6 @@ class TexeraWebApplication
bootstrap.addBundle(new WebsocketBundle(classOf[CollaborationResource]))
// register scala module to dropwizard default object mapper
bootstrap.getObjectMapper.registerModule(DefaultScalaModule)

// register a new custom module and add the custom serializer into it
val customSerializerModule = new SimpleModule("CustomSerializers")
customSerializerModule.addSerializer(classOf[DatasetFileNode], new DatasetFileNodeSerializer())
bootstrap.getObjectMapper.registerModule(customSerializerModule)
}

override def run(configuration: TexeraWebConfiguration, environment: Environment): Unit = {
@@ -146,7 +133,6 @@ class TexeraWebApplication
environment.jersey.register(classOf[ResultResource])
environment.jersey.register(classOf[HubResource])
environment.jersey.register(classOf[WorkflowVersionResource])
environment.jersey.register(classOf[DatasetResource])
environment.jersey.register(classOf[DatasetAccessResource])
environment.jersey.register(classOf[ProjectResource])
environment.jersey.register(classOf[ProjectAccessResource])
@@ -22,8 +22,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
Dataset,
DatasetUserAccess,
DatasetVersion,
User
DatasetVersion
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _}
@@ -200,28 +199,6 @@ object DatasetResource {
DatasetOperation(filesToAdd.toMap, filesToRemove.toList)
}

/**
* Create a new dataset version by adding new files
* @param did the target dataset id
* @param user the user submitting the request
* @param filesToAdd the map containing the files to add
* @return the created dataset version
*/
def createNewDatasetVersionByAddingFiles(
did: Integer,
user: User,
filesToAdd: Map[java.nio.file.Path, InputStream]
): Option[DashboardDatasetVersion] = {
applyDatasetOperationToCreateNewVersion(
context,
did,
user.getUid,
user.getEmail,
"",
DatasetOperation(filesToAdd, List())
)
}

// apply the dataset operation to create a new dataset version
// it returns the created dataset version if creation succeed, else return None
// concurrency control is performed here: the thread has to have the lock in order to create the new version
@@ -6,18 +6,17 @@ import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.{ArrowUtils, PathUtils}
import edu.uci.ics.amber.util.ArrowUtils
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.request.ResultExportRequest
import edu.uci.ics.texera.web.model.websocket.response.ResultExportResponse
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.createNewDatasetVersionByAddingFiles
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.{
WorkflowExecutionsResource,
WorkflowVersionResource
}
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId

import java.io.{FilterOutputStream, IOException, OutputStream, PipedInputStream, PipedOutputStream}
import java.io.{FilterOutputStream, IOException, OutputStream}
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.time.LocalDateTime
@@ -33,6 +32,10 @@ import org.apache.arrow.vector.ipc.ArrowFileWriter
import org.apache.commons.lang3.StringUtils
import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.StreamingOutput
import edu.uci.ics.texera.web.auth.JwtAuth
import edu.uci.ics.texera.web.auth.JwtAuth.{TOKEN_EXPIRE_TIME_IN_DAYS, dayToMin, jwtClaims}

import java.net.{HttpURLConnection, URL, URLEncoder}

/**
* A simple wrapper that ignores 'close()' calls on the underlying stream.
@@ -52,6 +55,14 @@ object ResultExportService {
// Matches the remote's approach for a thread pool
final private val pool: ThreadPoolExecutor =
Executors.newFixedThreadPool(3).asInstanceOf[ThreadPoolExecutor]

lazy val fileServiceUploadOneFileToDatasetEndpoint: String =
sys.env
.getOrElse(
"FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT",
"http://localhost:9092/api/dataset/did/upload"
)
.trim
}

class ResultExportService(workflowIdentity: WorkflowIdentity) {
@@ -156,23 +167,22 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
results: Iterable[Tuple],
headers: List[String]
): (Option[String], Option[String]) = {
val fileName = generateFileName(request, operatorId, "csv")
try {
val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)

pool.submit(new Runnable {
override def run(): Unit = {
val writer = CSVWriter.open(pipedOutputStream)
saveToDatasets(
request,
user,
outputStream => {
val writer = CSVWriter.open(outputStream)
writer.writeRow(headers)
results.foreach { tuple =>
writer.writeRow(tuple.getFields.toIndexedSeq)
}
writer.close()
}
})

val fileName = generateFileName(request, operatorId, "csv")
saveToDatasets(request, user, pipedInputStream, fileName)
},
fileName
)
(Some(s"CSV export done for operator $operatorId -> file: $fileName"), None)
} catch {
case ex: Exception =>
@@ -202,17 +212,15 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
val field = selectedRow.getField(columnIndex)
val dataBytes: Array[Byte] = convertFieldToBytes(field)

val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)

pool.submit(new Runnable {
override def run(): Unit = {
pipedOutputStream.write(dataBytes)
pipedOutputStream.close()
}
})

saveToDatasets(request, user, pipedInputStream, fileName)
saveToDatasets(
request,
user,
outputStream => {
outputStream.write(dataBytes)
outputStream.close()
},
fileName
)
(Some(s"Data export done for operator $operatorId -> file: $fileName"), None)
} catch {
case ex: Exception =>
@@ -242,24 +250,24 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
}

try {
val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)
val allocator = new RootAllocator()

pool.submit(() => {
Using.Manager { use =>
val (writer, root) = createArrowWriter(results, allocator, pipedOutputStream)
use(writer)
use(root)
use(allocator)
use(pipedOutputStream)

writeArrowData(writer, root, results)
}
})

val fileName = generateFileName(request, operatorId, "arrow")
saveToDatasets(request, user, pipedInputStream, fileName)

saveToDatasets(
request,
user,
outputStream => {
val allocator = new RootAllocator()
Using.Manager { use =>
val (writer, root) = createArrowWriter(results, allocator, outputStream)
use(writer)
use(root)
use(allocator)

writeArrowData(writer, root, results)
}
},
fileName
)

(Some(s"Arrow file export done for operator $operatorId -> file: $fileName"), None)
} catch {
@@ -333,17 +341,47 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
private def saveToDatasets(
request: ResultExportRequest,
user: User,
pipedInputStream: PipedInputStream,
fileWriter: OutputStream => Unit, // Pass function that writes data
fileName: String
): Unit = {
request.datasetIds.foreach { did =>
val datasetPath = PathUtils.getDatasetPath(did)
val filePath = datasetPath.resolve(fileName)
createNewDatasetVersionByAddingFiles(
did,
user,
Map(filePath -> pipedInputStream)
val encodedFilePath = URLEncoder.encode(fileName, StandardCharsets.UTF_8.name())
val message = URLEncoder.encode(
s"Export from workflow ${request.workflowName}",
StandardCharsets.UTF_8.name()
)

val uploadUrl = s"$fileServiceUploadOneFileToDatasetEndpoint"
.replace("did", did.toString) + s"?filePath=$encodedFilePath&message=$message"

var connection: HttpURLConnection = null
try {
val url = new URL(uploadUrl)
connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setDoOutput(true)
connection.setRequestMethod("POST")
connection.setRequestProperty("Content-Type", "application/octet-stream")
connection.setRequestProperty(
"Authorization",
s"Bearer ${JwtAuth.jwtToken(jwtClaims(user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS)))}"
)

// Get output stream from connection
val outputStream = connection.getOutputStream
fileWriter(outputStream) // Write directly to HTTP request output stream
outputStream.close()

// Check response
val responseCode = connection.getResponseCode
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new RuntimeException(s"Failed to upload file. Server responded with: $responseCode")
}
} catch {
case e: Exception =>
throw new RuntimeException(s"Error uploading file to dataset $did: ${e.getMessage}", e)
} finally {
if (connection != null) connection.disconnect()
}
}
}
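
As a note for reviewers, the new saveToDatasets path boils down to a single authenticated POST against the file service's upload endpoint. A rough Python sketch of that contract follows (all concrete values are placeholders; the endpoint shape, query parameters, and headers mirror the Scala code above, which streams the export bytes through the fileWriter callback rather than sending a fixed payload):

# Rough, illustrative sketch of the upload request built in saveToDatasets.
import requests
from urllib.parse import quote

did = 42                             # placeholder dataset id
jwt_token = "<user-scoped JWT>"      # minted via JwtAuth in the Scala code
file_name = "export.csv"
message = "Export from workflow my-workflow"

url = (
    f"http://localhost:9092/api/dataset/{did}/upload"
    f"?filePath={quote(file_name)}&message={quote(message)}"
)
response = requests.post(
    url,
    headers={
        "Authorization": f"Bearer {jwt_token}",
        "Content-Type": "application/octet-stream",
    },
    data=b"header1,header2\n1,2\n",  # bytes the export writer would produce
)
response.raise_for_status()
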
