Merged
Changes from all commits
73 commits
1fe9f17
add itemized file document and partition document
bobbai00 Dec 10, 2024
219b82d
add unit test for PartitionDocument
bobbai00 Dec 10, 2024
e446e9c
add more to unit tests
bobbai00 Dec 10, 2024
9627b25
make PartitionDocument return T
bobbai00 Dec 11, 2024
b85fd45
fix partition document test
bobbai00 Dec 11, 2024
8e6fec3
refining the documents
bobbai00 Dec 12, 2024
288aea4
add type R to PartitionedItemizedFileDocument
bobbai00 Dec 12, 2024
c3a1d00
do a rename
bobbai00 Dec 12, 2024
97c601e
adding the arrow file document, TODO: fix the test
bobbai00 Dec 12, 2024
e2c5515
pass the compilation
bobbai00 Dec 12, 2024
c17a54e
finish arrow document
bobbai00 Dec 14, 2024
bc38cc4
start to add some iceberg related
bobbai00 Dec 16, 2024
51dd7cf
finish initial iceberg writer
bobbai00 Dec 17, 2024
481c437
finish initial version of iceberg
bobbai00 Dec 19, 2024
0274f66
refactor test parts
bobbai00 Dec 19, 2024
4663fef
finish 1st viable version
bobbai00 Dec 19, 2024
9607f98
fix the append read
bobbai00 Dec 19, 2024
d2d0ed7
finish append read
bobbai00 Dec 20, 2024
f4ea0e3
finish concurrent write test
bobbai00 Dec 20, 2024
e46311c
resolve the binary type issue
bobbai00 Dec 20, 2024
5467b6f
refactor the util and config
bobbai00 Dec 20, 2024
d92a5ad
add a simple implementation for getRange and getAfter
bobbai00 Dec 21, 2024
15508b9
try to add iceberg as new type of result storage
bobbai00 Dec 21, 2024
c234dfd
closing to fix the dependency
bobbai00 Dec 21, 2024
4213a5b
fix the websocket connection
bobbai00 Dec 21, 2024
11ce821
add create override
bobbai00 Dec 22, 2024
129453e
add more comments and adjust the dependency
bobbai00 Dec 22, 2024
d553e32
add worker id when creating the writer
bobbai00 Dec 30, 2024
78df063
drop the write lock for iceberg table writer
bobbai00 Dec 30, 2024
92e2caf
clean up the build sbt
bobbai00 Dec 30, 2024
bb6961a
fix py result storage issue
bobbai00 Dec 30, 2024
1be10bf
clean up the iceberg document
bobbai00 Dec 30, 2024
7adfda4
clean up the iceberg writer
bobbai00 Dec 30, 2024
4617564
add more comments on the iceberg util
bobbai00 Dec 30, 2024
13731cb
add more comments
bobbai00 Dec 30, 2024
2baa661
refactor local file IO
bobbai00 Dec 30, 2024
8639579
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Dec 30, 2024
e105913
merge master
bobbai00 Dec 30, 2024
4cf144b
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Dec 30, 2024
9b69f59
cleanup the config
bobbai00 Dec 30, 2024
60445e6
Merge remote-tracking branch 'origin/jiadong-add-file-result-storage'…
bobbai00 Dec 30, 2024
9a482b1
cleanup the clear logic
bobbai00 Dec 30, 2024
decab8d
fmt
bobbai00 Dec 30, 2024
9cb2674
refactor the test to use the test db
bobbai00 Dec 30, 2024
51d8a1e
make the test harder
bobbai00 Dec 30, 2024
39b0448
make the test more clean
bobbai00 Dec 30, 2024
2655dae
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Jan 1, 2025
73106dd
incorporate worker idx to sink
bobbai00 Jan 1, 2025
a2e53b5
add format version and row lineage to the iceberg table
bobbai00 Jan 1, 2025
cffafe0
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Jan 1, 2025
f54e38c
Revert "add format version and row lineage to the iceberg table"
bobbai00 Jan 1, 2025
7176864
fix iceberg util spec
bobbai00 Jan 1, 2025
76dd31c
try to add the record id
bobbai00 Jan 2, 2025
31070be
try debugging the test
bobbai00 Jan 2, 2025
1156db4
half way to have a consistent order
bobbai00 Jan 2, 2025
c712c1d
fix the get range
bobbai00 Jan 3, 2025
a16ff80
fix the get's refresh
bobbai00 Jan 3, 2025
d2e710f
add getAfter test
bobbai00 Jan 3, 2025
a8bb3db
remove redundant dependency
bobbai00 Jan 3, 2025
6a69575
remove redundant file document
bobbai00 Jan 5, 2025
0e28267
add hadoop catalog
bobbai00 Jan 5, 2025
0506e4c
remove local file io by using hadoop file io
bobbai00 Jan 5, 2025
bfe0696
clear up the catalog, just keep hadoop catalog
bobbai00 Jan 6, 2025
9861246
add more comments
bobbai00 Jan 6, 2025
fe543f7
do more cleanup
bobbai00 Jan 6, 2025
afd5d78
clean up the dependencies
bobbai00 Jan 6, 2025
3282aa3
clean up the dependencies
bobbai00 Jan 6, 2025
9238d27
Merge branch 'master' into jiadong-add-file-result-storage
shengquan-ni Jan 6, 2025
cfcd7f9
fix the count
bobbai00 Jan 6, 2025
bddf022
Merge remote-tracking branch 'origin/jiadong-add-file-result-storage'…
bobbai00 Jan 6, 2025
4b56645
fix the dependency
bobbai00 Jan 6, 2025
75e554d
trigger the CI
bobbai00 Jan 7, 2025
ecaeca5
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Jan 8, 2025

@@ -28,6 +28,7 @@ trait InitializeExecutorHandler {
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
new ProgressiveSinkOpExec(
workerIdx,
outputMode,
storageKey,
workflowIdentity

@@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
val storageType = collection.get("storageType").asText()
val collectionName = collection.get("storageKey").asText()
storageType match {
case OpResultStorage.MEMORY =>
case OpResultStorage.MEMORY | OpResultStorage.ICEBERG =>
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)

@@ -80,9 +80,9 @@ class WorkflowCompiler(
val storageKey =
OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId)

// Determine the storage type, defaulting to memory for large HTML visualizations
// Determine the storage type, defaulting to iceberg for large HTML visualizations
val storageType =
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG
else OpResultStorage.defaultStorageMode

if (!storage.contains(storageKey)) {

23 changes: 21 additions & 2 deletions core/build.sbt
@@ -3,18 +3,37 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency


50 changes: 49 additions & 1 deletion core/workflow-core/build.sbt
@@ -111,6 +111,54 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

/////////////////////////////////////////////////////////////////////////////
// Iceberg-related Dependencies
/////////////////////////////////////////////////////////////////////////////
val excludeJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeGlassfishJersey = ExclusionRule(organization = "org.glassfish.jersey")
val excludeSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeJsp = ExclusionRule(organization = "javax.servlet.jsp")
val excludeXmlBind = ExclusionRule(organization = "javax.xml.bind")
val excludeJackson = ExclusionRule(organization = "com.fasterxml.jackson.core")
val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module")

libraryDependencies ++= Seq(
"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-parquet" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-core" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-data" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
excludeJersey,
excludeSlf4j,
excludeJetty,
excludeJsp,
excludeJackson,
excludeJacksonModule
),
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
@@ -123,5 +171,5 @@
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions
)

14 changes: 13 additions & 1 deletion core/workflow-core/src/main/resources/storage-config.yaml
@@ -1,9 +1,21 @@
storage:
result-storage-mode: memory
result-storage-mode: iceberg
mongodb:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
table:
namespace: "operator-result"
commit:
batch-size: 4096 # decide the buffer size of our IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""
@@ -0,0 +1,43 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.catalog.Catalog

/**
* IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
* - Provides a single shared catalog for all Iceberg table-related operations in the Texera application.
* - Lazily initializes the catalog on first access.
* - Supports replacing the catalog instance primarily for testing or reconfiguration.
*/
object IcebergCatalogInstance {

private var instance: Option[Catalog] = None

/**
* Retrieves the singleton Iceberg catalog instance.
* - If the catalog is not initialized, it is lazily created using the configured properties.
* @return the Iceberg catalog instance.
*/
def getInstance(): Catalog = {
instance match {
case Some(catalog) => catalog
case None =>
val hadoopCatalog = IcebergUtil.createHadoopCatalog(
"texera_iceberg",
StorageConfig.fileStorageDirectoryPath
)
instance = Some(hadoopCatalog)
hadoopCatalog
}
}

/**
* Replaces the existing Iceberg catalog instance.
* - This method is useful for testing or dynamically updating the catalog.
*
* @param catalog the new Iceberg catalog instance to replace the current one.
*/
def replaceInstance(catalog: Catalog): Unit = {
instance = Some(catalog)
}
}
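
The scaladoc above covers lazy initialization and test-time replacement. A minimal usage sketch (the table name, test catalog name, and temporary directory are assumptions; getInstance, replaceInstance, createHadoopCatalog, and the StorageConfig fields come from this PR):

import org.apache.iceberg.catalog.{Namespace, TableIdentifier}

// Look up an operator-result table through the shared catalog.
val catalog = IcebergCatalogInstance.getInstance()
val tableId = TableIdentifier.of(Namespace.of(StorageConfig.icebergTableNamespace), "some_execution_result")
if (catalog.tableExists(tableId)) {
  val table = catalog.loadTable(tableId) // an org.apache.iceberg.Table handle
}

// In tests, swap in an isolated catalog rooted at a temporary directory.
IcebergCatalogInstance.replaceInstance(
  IcebergUtil.createHadoopCatalog("texera_iceberg_test", java.nio.file.Paths.get("/tmp/iceberg-test"))
)
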
@@ -1,7 +1,9 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.nio.file.Path
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

@@ -13,34 +15,94 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

val icebergTableNamespace: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("namespace")
.asInstanceOf[String]

val icebergTableCommitBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("batch-size")
.asInstanceOf[Int]

val icebergTableCommitNumRetries: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("num-retries")
.asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("min-wait-ms")
.asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("max-wait-ms")
.asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
@@ -50,4 +112,8 @@
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// File storage configurations
val fileStorageDirectoryPath: Path =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results")
}

@@ -65,9 +65,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {

/**
* return a writer that buffers the items and performs the flush operation at close time
* @param writerIdentifier the id of the writer, maybe required by some implementations
* @return a buffered item writer
*/
def writer(): BufferedItemWriter[T] =
def writer(writerIdentifier: String): BufferedItemWriter[T] =
throw new NotImplementedError("write method is not implemented")

/**
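
The new writerIdentifier parameter lets an implementation distinguish concurrent writers, e.g. one per sink worker (see the workerIdx threaded into ProgressiveSinkOpExec earlier in this diff). A minimal calling sketch, assuming the BufferedItemWriter type from the surrounding storage model package; only writer(String) is taken from this PR, and the "worker_<idx>" format is an assumption:

// Sketch: obtain a per-worker buffered writer for a result document.
def openWriterFor[T](doc: VirtualDocument[T], workerIdx: Int): BufferedItemWriter[T] =
  doc.writer(s"worker_$workerIdx")
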

@@ -69,7 +69,7 @@ class MemoryDocument[T >: Null <: AnyRef](key: String)
results += item
}

override def writer(): BufferedItemWriter[T] = this
override def writer(writerIdentifier: String): BufferedItemWriter[T] = this

/**
* The size of the buffer for the buffered writer. This number is not used currently

@@ -56,7 +56,7 @@ class MongoDocument[T >: Null <: AnyRef](
* Return a buffered item writer for the MongoDB collection.
* @return a new instance of MongoDBBufferedItemWriter.
*/
override def writer(): BufferedItemWriter[T] = {
override def writer(writerIdentifier: String): BufferedItemWriter[T] = {
new MongoDBBufferedItemWriter[T](
commitBatchSize,
id,

@@ -3,9 +3,13 @@ package edu.uci.ics.amber.core.storage.result
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.IteratorHasAsScala
@@ -18,6 +22,7 @@ object OpResultStorage {
val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase
val MEMORY: String = "memory"
val MONGODB: String = "mongodb"
val ICEBERG = "iceberg"

/**
* Creates a unique storage key by combining operator and port identities.
@@ -112,7 +117,7 @@ class OpResultStorage extends Serializable with LazyLogging {
val storage: VirtualDocument[Tuple] =
if (mode == OpResultStorage.MEMORY) {
new MemoryDocument[Tuple](key)
} else {
} else if (mode == OpResultStorage.MONGODB) {
try {
new MongoDocument[Tuple](
executionId + key,
Expand All @@ -125,6 +130,19 @@ class OpResultStorage extends Serializable with LazyLogging {
logger.info(s"Falling back to memory storage for $key")
new MemoryDocument[Tuple](key)
}
} else {
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord
val deserde: (IcebergSchema, Record) => Tuple = (_, record) =>
IcebergUtil.fromRecord(record, schema)

new IcebergDocument[Tuple](
StorageConfig.icebergTableNamespace,
executionId + key,
icebergSchema,
serde,
deserde
)
}
cache.put(key, (storage, schema))
storage
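
The serde/deserde pair wired above converts between Amber tuples and Iceberg generic records. A round-trip sketch using the helper signatures shown in this diff (the schema and tuple construction are placeholders):

import edu.uci.ics.amber.core.tuple.{Schema, Tuple}
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}

val amberSchema: Schema = ??? // the operator's output schema
val tuple: Tuple = ???        // a tuple conforming to amberSchema

val icebergSchema: IcebergSchema = IcebergUtil.toIcebergSchema(amberSchema)
val record: Record = IcebergUtil.toGenericRecord(icebergSchema, tuple) // Tuple -> Record
val restored: Tuple = IcebergUtil.fromRecord(record, amberSchema)      // Record -> Tuple
// restored is expected to equal the original tuple for all supported field types.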