diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index 963a88f8b23..46a1d27d2c4 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -80,6 +80,30 @@ jobs: - name: Run backend tests run: cd core/amber && sbt -v -J-Xmx2G test + micro-services: + strategy: + matrix: + os: [ ubuntu-latest ] + java-version: [ 11 ] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout Texera + uses: actions/checkout@v2 + - name: Setup Java + uses: actions/setup-java@v2 + with: + distribution: 'temurin' + java-version: ${{ matrix.java-version }} + - uses: coursier/cache-action@v6 + with: + extraSbtFiles: '["core/micro-services/*.sbt", "core/micro-services/project/**.{scala,sbt}", "core/micro-services/project/build.properties" ]' + - name: Lint with scalafmt + run: cd core/micro-services && sbt scalafmtCheckAll + - name: Compile with sbt + run: cd core/micro-services && sbt clean package + - name: Run backend tests + run: cd core/micro-services && sbt test + python_udf: runs-on: ${{ matrix.os }} strategy: diff --git a/core/micro-services/.scalafix.conf b/core/micro-services/.scalafix.conf new file mode 100644 index 00000000000..c79b7c47340 --- /dev/null +++ b/core/micro-services/.scalafix.conf @@ -0,0 +1,9 @@ +rules = [ + ProcedureSyntax, + RemoveUnused, +] +RemoveUnused.imports = true +RemoveUnused.privates = true +RemoveUnused.locals = false +RemoveUnused.patternvars = false +RemoveUnused.params = false \ No newline at end of file diff --git a/core/micro-services/.scalafmt.conf b/core/micro-services/.scalafmt.conf new file mode 100644 index 00000000000..ffe5deb092b --- /dev/null +++ b/core/micro-services/.scalafmt.conf @@ -0,0 +1,2 @@ +version=2.6.4 +maxColumn = 100 diff --git a/core/micro-services/build.sbt b/core/micro-services/build.sbt index 3f4dc349c75..99095c9d0f5 100644 --- a/core/micro-services/build.sbt +++ b/core/micro-services/build.sbt @@ -1,21 +1,12 @@ +lazy val WorkflowCore = project in file("workflow-core") + // root project definition lazy val MicroServices = (project in file(".")) + .aggregate(WorkflowCore) .settings( name := "micro-services", - version := "0.1.0" + version := "0.1.0", + organization := "edu.uci.ics", + scalaVersion := "2.13.12", + publishMavenStyle := true ) - -// The template of the subproject: WorkflowCompilingService(as an example) -// lazy val WorkflowCompilingService = (project in file("workflow-compiling-service")) -// .settings( -// name := "WorkflowCompilingService", -// version := "0.1.0" -// libraryDependencies ++= Seq( -// "io.dropwizard" % "dropwizard-core" % "4.0.7", -// "com.typesafe" % "config" % "1.4.1", -// "com.fasterxml.jackson.core" % "jackson-databind" % "2.17.2", // Jackson Databind for JSON processing -// "com.fasterxml.jackson.core" % "jackson-annotations" % "2.17.2", // Jackson Annotations for JSON properties -// "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.2" // Jackson Scala module -// ) -// ) -// once this subproject is defined, aggregate it to the MicroServices definition \ No newline at end of file diff --git a/core/micro-services/project/build.properties b/core/micro-services/project/build.properties new file mode 100644 index 00000000000..4d5f78cc412 --- /dev/null +++ b/core/micro-services/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.9 \ No newline at end of file diff --git a/core/micro-services/project/plugins.sbt b/core/micro-services/project/plugins.sbt new file mode 100644 index 00000000000..9d62dd6d010 --- 
/dev/null +++ b/core/micro-services/project/plugins.sbt @@ -0,0 +1,7 @@ +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.0") + +// for scalapb code gen +addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.1" +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16") \ No newline at end of file diff --git a/core/micro-services/workflow-core/build.sbt b/core/micro-services/workflow-core/build.sbt new file mode 100644 index 00000000000..e96c6a03776 --- /dev/null +++ b/core/micro-services/workflow-core/build.sbt @@ -0,0 +1,113 @@ +///////////////////////////////////////////////////////////////////////////// +// Project Settings +///////////////////////////////////////////////////////////////////////////// + +name := "workflow-core" +organization := "edu.uci.ics" +version := "0.1.0" +scalaVersion := "2.13.12" + +enablePlugins(JavaAppPackaging) + +// Enable semanticdb for Scalafix +ThisBuild / semanticdbEnabled := true +ThisBuild / semanticdbVersion := scalafixSemanticdb.revision + +// Manage dependency conflicts by always using the latest revision +ThisBuild / conflictManager := ConflictManager.latestRevision + +// Restrict parallel execution of tests to avoid conflicts +Global / concurrentRestrictions += Tags.limit(Tags.Test, 1) + + +///////////////////////////////////////////////////////////////////////////// +// Compiler Options +///////////////////////////////////////////////////////////////////////////// + +// Scala compiler options +Compile / scalacOptions ++= Seq( + "-Xelide-below", "WARNING", // Turn on optimizations with "WARNING" as the threshold + "-feature", // Check feature warnings + "-deprecation", // Check deprecation warnings + "-Ywarn-unused:imports" // Check for unused imports +) + + +///////////////////////////////////////////////////////////////////////////// +// ScalaPB Configuration +///////////////////////////////////////////////////////////////////////////// + +// Exclude some proto files +PB.generate / excludeFilter := "scalapb.proto" + +// Set the protoc version for ScalaPB +ThisBuild / PB.protocVersion := "3.19.4" + +// ScalaPB code generation for .proto files +Compile / PB.targets := Seq( + scalapb.gen(singleLineToProtoString = true) -> (Compile / sourceManaged).value +) + +// Mark the ScalaPB-generated directory as a generated source root +Compile / managedSourceDirectories += (Compile / sourceManaged).value + +// ScalaPB library dependencies +libraryDependencies ++= Seq( + "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf", + "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0" // For ScalaPB 0.11.x +) + +// Enable protobuf compilation in Test +Test / PB.protoSources += PB.externalSourcePath.value + + +///////////////////////////////////////////////////////////////////////////// +// Test-related Dependencies +///////////////////////////////////////////////////////////////////////////// + +libraryDependencies ++= Seq( + "org.scalamock" %% "scalamock" % "5.2.0" % Test, // ScalaMock + "org.scalatest" %% "scalatest" % "3.2.15" % Test, // ScalaTest + "junit" % "junit" % "4.13.2" % Test, // JUnit + "com.novocode" % "junit-interface" % "0.11" % Test // SBT interface for JUnit +) + + +///////////////////////////////////////////////////////////////////////////// +// Jackson-related Dependencies +///////////////////////////////////////////////////////////////////////////// + +val 
jacksonVersion = "2.15.1" +libraryDependencies ++= Seq( + "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, // Jackson Databind + "com.fasterxml.jackson.module" % "jackson-module-kotlin" % jacksonVersion % Test, // Jackson Kotlin Module + "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % jacksonVersion % Test, // Jackson JDK8 Datatypes + "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion % Test, // Jackson JSR310 + "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % jacksonVersion % Test, // Jackson Joda + "com.fasterxml.jackson.module" % "jackson-module-jsonSchema" % jacksonVersion, // JSON Schema Module + "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, // Scala Module + "com.fasterxml.jackson.module" % "jackson-module-no-ctor-deser" % jacksonVersion // No Constructor Deserializer +) + + +///////////////////////////////////////////////////////////////////////////// +// MongoDB-related Dependencies +///////////////////////////////////////////////////////////////////////////// + +libraryDependencies ++= Seq( + "org.mongodb" % "mongodb-driver-sync" % "5.0.0", // MongoDB driver + "org.apache.commons" % "commons-jcs3-core" % "3.2" // Apache Commons JCS +) + + +///////////////////////////////////////////////////////////////////////////// +// Additional Dependencies +///////////////////////////////////////////////////////////////////////////// + +libraryDependencies ++= Seq( + "com.github.sisyphsu" % "dateparser" % "1.0.11", // DateParser + "com.google.guava" % "guava" % "31.1-jre", // Guava + "org.ehcache" % "sizeof" % "0.4.3", // Ehcache SizeOf + "org.jgrapht" % "jgrapht-core" % "1.4.0", // JGraphT Core + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5" // Scala Logging +) \ No newline at end of file diff --git a/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/virtualidentity.proto b/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/virtualidentity.proto new file mode 100644 index 00000000000..272e94954a1 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/virtualidentity.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package edu.uci.ics.amber; + +import "scalapb/scalapb.proto"; + +option (scalapb.options) = { + scope: FILE, + preserve_unknown_fields: false + no_default_values_in_constructor: true +}; + +message WorkflowIdentity { + int64 id = 1; +} + +message ExecutionIdentity { + int64 id = 1; +} + +message ActorVirtualIdentity { + string name = 1; +} + +message ChannelIdentity { + ActorVirtualIdentity fromWorkerId = 1 [(scalapb.field).no_box = true]; + ActorVirtualIdentity toWorkerId = 2 [(scalapb.field).no_box = true]; + bool isControl = 3; +} + +message OperatorIdentity { + string id = 1; +} + +message PhysicalOpIdentity{ + OperatorIdentity logicalOpId = 1 [(scalapb.field).no_box = true]; + string layerName = 2; +} + +message ChannelMarkerIdentity{ + string id = 1; +} + + + diff --git a/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto b/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto new file mode 100644 index 00000000000..5d279c4ad58 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/protobuf/edu/uci/ics/amber/workflow.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package edu.uci.ics.amber; + +import "edu/uci/ics/amber/virtualidentity.proto"; +import "scalapb/scalapb.proto"; + +option (scalapb.options) = { + scope: FILE, + 
preserve_unknown_fields: false, + no_default_values_in_constructor: false +}; + +message PortIdentity { + int32 id = 1; + bool internal = 2; +} + +message InputPort { + PortIdentity id = 1 [(scalapb.field).no_box = true]; + string displayName = 2; + bool allowMultiLinks = 3; + repeated PortIdentity dependencies = 4; +} + +message OutputPort { + PortIdentity id = 1 [(scalapb.field).no_box = true]; + string displayName = 2; + bool blocking = 3; +} + + +message PhysicalLink { + PhysicalOpIdentity fromOpId = 1 [(scalapb.field).no_box = true]; + PortIdentity fromPortId = 2 [(scalapb.field).no_box = true]; + PhysicalOpIdentity toOpId = 3 [(scalapb.field).no_box = true]; + PortIdentity toPortId = 4 [(scalapb.field).no_box = true]; +} diff --git a/core/micro-services/workflow-core/src/main/protobuf/scalapb/scalapb.proto b/core/micro-services/workflow-core/src/main/protobuf/scalapb/scalapb.proto new file mode 100644 index 00000000000..bf58fe15204 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/protobuf/scalapb/scalapb.proto @@ -0,0 +1,363 @@ +syntax = "proto2"; + +package scalapb; + +option java_package = "scalapb.options"; + +option (options) = { + package_name: "scalapb.options" + flat_package: true +}; + +import "google/protobuf/descriptor.proto"; + +message ScalaPbOptions { + // If set then it overrides the java_package and package. + optional string package_name = 1; + + // If true, the compiler does not append the proto base file name + // into the generated package name. If false (the default), the + // generated scala package name is the package_name.basename where + // basename is the proto file name without the .proto extension. + optional bool flat_package = 2; + + // Adds the following imports at the top of the file (this is meant + // to provide implicit TypeMappers) + repeated string import = 3; + + // Text to add to the generated scala file. This can be used only + // when single_file is true. + repeated string preamble = 4; + + // If true, all messages and enums (but not services) will be written + // to a single Scala file. + optional bool single_file = 5; + + // By default, wrappers defined at + // https://github.com/google/protobuf/blob/master/src/google/protobuf/wrappers.proto, + // are mapped to an Option[T] where T is a primitive type. When this field + // is set to true, we do not perform this transformation. + optional bool no_primitive_wrappers = 7; + + // DEPRECATED. In ScalaPB <= 0.5.47, it was necessary to explicitly enable + // primitive_wrappers. This field remains here for backwards compatibility, + // but it has no effect on generated code. It is an error to set both + // `primitive_wrappers` and `no_primitive_wrappers`. + optional bool primitive_wrappers = 6; + + // Scala type to be used for repeated fields. If unspecified, + // `scala.collection.Seq` will be used. + optional string collection_type = 8; + + // If set to true, all generated messages in this file will preserve unknown + // fields. + optional bool preserve_unknown_fields = 9 [default=true]; + + // If defined, sets the name of the file-level object that would be generated. This + // object extends `GeneratedFileObject` and contains descriptors, and list of message + // and enum companions. + optional string object_name = 10; + + // Whether to apply the options only to this file, or for the entire package (and its subpackages) + enum OptionsScope { + // Apply the options for this file only (default) + FILE = 0; + + // Apply the options for the entire package and its subpackages. 
+ PACKAGE = 1; + } + // Experimental: scope to apply the given options. + optional OptionsScope scope = 11; + + // If true, lenses will be generated. + optional bool lenses = 12 [default=true]; + + // If true, then source-code info information will be included in the + // generated code - normally the source code info is cleared out to reduce + // code size. The source code info is useful for extracting source code + // location from the descriptors as well as comments. + optional bool retain_source_code_info = 13; + + // Scala type to be used for maps. If unspecified, + // `scala.collection.immutable.Map` will be used. + optional string map_type = 14; + + // If true, no default values will be generated in message constructors. + optional bool no_default_values_in_constructor = 15; + + /* Naming convention for generated enum values */ + enum EnumValueNaming { + AS_IN_PROTO = 0; // Enum value names in Scala use the same name as in the proto + CAMEL_CASE = 1; // Convert enum values to CamelCase in Scala. + } + optional EnumValueNaming enum_value_naming = 16; + + // Indicate if prefix (enum name + optional underscore) should be removed in scala code + // Strip is applied before enum value naming changes. + optional bool enum_strip_prefix = 17 [default=false]; + + // Scala type to use for bytes fields. + optional string bytes_type = 21; + + // Enable java conversions for this file. + optional bool java_conversions = 23; + + // AuxMessageOptions enables you to set message-level options through package-scoped options. + // This is useful when you can't add a dependency on scalapb.proto from the proto file that + // defines the message. + message AuxMessageOptions { + // The fully-qualified name of the message in the proto name space. + optional string target = 1; + + // Options to apply to the message. If there are any options defined on the target message + // they take precedence over the options. + optional MessageOptions options = 2; + } + + // AuxFieldOptions enables you to set field-level options through package-scoped options. + // This is useful when you can't add a dependency on scalapb.proto from the proto file that + // defines the field. + message AuxFieldOptions { + // The fully-qualified name of the field in the proto name space. + optional string target = 1; + + // Options to apply to the field. If there are any options defined on the target message + // they take precedence over the options. + optional FieldOptions options = 2; + } + + // AuxEnumOptions enables you to set enum-level options through package-scoped options. + // This is useful when you can't add a dependency on scalapb.proto from the proto file that + // defines the enum. + message AuxEnumOptions { + // The fully-qualified name of the enum in the proto name space. + optional string target = 1; + + // Options to apply to the enum. If there are any options defined on the target enum + // they take precedence over the options. + optional EnumOptions options = 2; + } + + // AuxEnumValueOptions enables you to set enum value level options through package-scoped + // options. This is useful when you can't add a dependency on scalapb.proto from the proto + // file that defines the enum. + message AuxEnumValueOptions { + // The fully-qualified name of the enum value in the proto name space. + optional string target = 1; + + // Options to apply to the enum value. If there are any options defined on + // the target enum value they take precedence over the options. 
+ optional EnumValueOptions options = 2; + } + + // List of message options to apply to some messages. + repeated AuxMessageOptions aux_message_options = 18; + + // List of message options to apply to some fields. + repeated AuxFieldOptions aux_field_options = 19; + + // List of message options to apply to some enums. + repeated AuxEnumOptions aux_enum_options = 20; + + // List of enum value options to apply to some enum values. + repeated AuxEnumValueOptions aux_enum_value_options = 22; + + // List of preprocessors to apply. + repeated string preprocessors = 24; + + repeated FieldTransformation field_transformations = 25; + + // Ignores all transformations for this file. This is meant to allow specific files to + // opt out from transformations inherited through package-scoped options. + optional bool ignore_all_transformations = 26; + + // If true, getters will be generated. + optional bool getters = 27 [default=true]; + + // For use in tests only. Inhibit Java conversions even when when generator parameters + // request for it. + optional bool test_only_no_java_conversions = 999; + + extensions 1000 to max; +} + +extend google.protobuf.FileOptions { + // File-level optionals for ScalaPB. + // Extension number officially assigned by protobuf-global-extension-registry@google.com + optional ScalaPbOptions options = 1020; +} + +message MessageOptions { + // Additional classes and traits to mix in to the case class. + repeated string extends = 1; + + // Additional classes and traits to mix in to the companion object. + repeated string companion_extends = 2; + + // Custom annotations to add to the generated case class. + repeated string annotations = 3; + + // All instances of this message will be converted to this type. An implicit TypeMapper + // must be present. + optional string type = 4; + + // Custom annotations to add to the companion object of the generated class. + repeated string companion_annotations = 5; + + // Additional classes and traits to mix in to generated sealed_oneof base trait. + repeated string sealed_oneof_extends = 6; + + // If true, when this message is used as an optional field, do not wrap it in an `Option`. + // This is equivalent of setting `(field).no_box` to true on each field with the message type. + optional bool no_box = 7; + + // Custom annotations to add to the generated `unknownFields` case class field. + repeated string unknown_fields_annotations = 8; + + extensions 1000 to max; +} + +extend google.protobuf.MessageOptions { + // Message-level optionals for ScalaPB. + // Extension number officially assigned by protobuf-global-extension-registry@google.com + optional MessageOptions message = 1020; +} + +// Represents a custom Collection type in Scala. This allows ScalaPB to integrate with +// collection types that are different enough from the ones in the standard library. +message Collection { + // Type of the collection + optional string type = 1; + + // Set to true if this collection type is not allowed to be empty, for example + // cats.data.NonEmptyList. When true, ScalaPB will not generate `clearX` for the repeated + // field and not provide a default argument in the constructor. + optional bool non_empty = 2; + + // An Adapter is a Scala object available at runtime that provides certain static methods + // that can operate on this collection type. + optional string adapter = 3; +} + +message FieldOptions { + optional string type = 1; + + optional string scala_name = 2; + + // Can be specified only if this field is repeated. 
If unspecified, + // it falls back to the file option named `collection_type`, which defaults + // to `scala.collection.Seq`. + optional string collection_type = 3; + + optional Collection collection = 8; + + // If the field is a map, you can specify custom Scala types for the key + // or value. + optional string key_type = 4; + optional string value_type = 5; + + // Custom annotations to add to the field. + repeated string annotations = 6; + + // Can be specified only if this field is a map. If unspecified, + // it falls back to the file option named `map_type` which defaults to + // `scala.collection.immutable.Map` + optional string map_type = 7; + + // Do not box this value in Option[T]. If set, this overrides MessageOptions.no_box + optional bool no_box = 30; + + // Like no_box it does not box a value in Option[T], but also fails parsing when a value + // is not provided. This enables to emulate required fields in proto3. + optional bool required = 31; + + extensions 1000 to max; +} + +extend google.protobuf.FieldOptions { + // Field-level optionals for ScalaPB. + // Extension number officially assigned by protobuf-global-extension-registry@google.com + optional FieldOptions field = 1020; +} + +message EnumOptions { + // Additional classes and traits to mix in to the base trait + repeated string extends = 1; + + // Additional classes and traits to mix in to the companion object. + repeated string companion_extends = 2; + + // All instances of this enum will be converted to this type. An implicit TypeMapper + // must be present. + optional string type = 3; + + // Custom annotations to add to the generated enum's base class. + repeated string base_annotations = 4; + + // Custom annotations to add to the generated trait. + repeated string recognized_annotations = 5; + + // Custom annotations to add to the generated Unrecognized case class. + repeated string unrecognized_annotations = 6; + + extensions 1000 to max; +} + +extend google.protobuf.EnumOptions { + // Enum-level optionals for ScalaPB. + // Extension number officially assigned by protobuf-global-extension-registry@google.com + // + // The field is called enum_options and not enum since enum is not allowed in Java. + optional EnumOptions enum_options = 1020; +} + +message EnumValueOptions { + // Additional classes and traits to mix in to an individual enum value. + repeated string extends = 1; + + // Name in Scala to use for this enum value. + optional string scala_name = 2; + + // Custom annotations to add to the generated case object for this enum value. + repeated string annotations = 3; + + extensions 1000 to max; +} + +extend google.protobuf.EnumValueOptions { + // Enum-level optionals for ScalaPB. + // Extension number officially assigned by protobuf-global-extension-registry@google.com + optional EnumValueOptions enum_value = 1020; +} + +message OneofOptions { + // Additional traits to mix in to a oneof. + repeated string extends = 1; + + // Name in Scala to use for this oneof field. + optional string scala_name = 2; + + extensions 1000 to max; +} + +extend google.protobuf.OneofOptions { + // Enum-level optionals for ScalaPB. 
+ // Extension number officially assigned by protobuf-global-extension-registry@google.com + optional OneofOptions oneof = 1020; +} + +enum MatchType { + CONTAINS = 0; + EXACT = 1; + PRESENCE = 2; +} + +message FieldTransformation { + optional google.protobuf.FieldDescriptorProto when = 1; + optional MatchType match_type = 2 [default=CONTAINS]; + optional google.protobuf.FieldOptions set = 3; +} + +message PreprocessorOutput { + map options_by_file = 1; +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/JavaRuntimeCompilation.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/JavaRuntimeCompilation.scala new file mode 100644 index 00000000000..eb904e691bd --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/JavaRuntimeCompilation.scala @@ -0,0 +1,87 @@ +package edu.uci.ics.amber.core.executor + +import java.io.ByteArrayOutputStream +import java.net.URI +import java.util +import javax.tools._ + +object JavaRuntimeCompilation { + val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler + + def compileCode(code: String): Class[_] = { + val packageName = "edu.uci.ics.texera.workflow.operators.udf.java" + + //to hide it from user we will append the package in the udf code. + val codeToCompile = s"package $packageName;\n$code" + val defaultClassName = s"$packageName.JavaUDFOpExec" + + val fileManager: CustomJavaFileManager = new CustomJavaFileManager( + compiler.getStandardFileManager(null, null, null) + ) + + // Diagnostic collector is to capture compilation diagnostics (errors, warnings, etc.) + val diagnosticCollector = new DiagnosticCollector[JavaFileObject] + + /* Compiles the provided source code using the Java Compiler API, utilizing a custom file manager, + Collecting compilation diagnostics, and storing the result in 'compilationResult'. 
+ */ + val compilationResult = compiler + .getTask( + null, + fileManager, + diagnosticCollector, + null, + null, + util.Arrays.asList(new StringJavaFileObject(defaultClassName, codeToCompile)) + ) + .call() + + // Checking if compilation was successful + if (!compilationResult) { + // Getting the compilation diagnostics (errors and warnings) + val diagnostics = diagnosticCollector.getDiagnostics + val errorMessageBuilder = new StringBuilder() + diagnostics.forEach { diagnostic => + errorMessageBuilder.append( + s"Error at line ${diagnostic.getLineNumber}: ${diagnostic.getMessage(null)}\n" + ) + } + throw new RuntimeException(errorMessageBuilder.toString()) + } + new CustomClassLoader().loadClass(defaultClassName, fileManager.getCompiledBytes) + } + + private class CustomJavaFileManager(fileManager: JavaFileManager) + extends ForwardingJavaFileManager[JavaFileManager](fileManager) { + private val outputBuffer: ByteArrayOutputStream = new ByteArrayOutputStream() + + def getCompiledBytes: Array[Byte] = outputBuffer.toByteArray + + override def getJavaFileForOutput( + location: JavaFileManager.Location, + className: String, + kind: JavaFileObject.Kind, + sibling: FileObject + ): JavaFileObject = { + new SimpleJavaFileObject(URI.create(s"string:///$className${kind.extension}"), kind) { + override def openOutputStream(): ByteArrayOutputStream = outputBuffer + } + } + } + + private class StringJavaFileObject(className: String, code: String) + extends SimpleJavaFileObject( + URI.create( + "string:///" + className.replace('.', '/') + JavaFileObject.Kind.SOURCE.extension + ), + JavaFileObject.Kind.SOURCE + ) { + override def getCharContent(ignoreEncodingErrors: Boolean): CharSequence = code + } + + private class CustomClassLoader extends ClassLoader { + + def loadClass(name: String, classBytes: Array[Byte]): Class[_] = + defineClass(name, classBytes, 0, classBytes.length) + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OpExecInitInfo.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OpExecInitInfo.scala new file mode 100644 index 00000000000..2e315e6296a --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OpExecInitInfo.scala @@ -0,0 +1,54 @@ +package edu.uci.ics.amber.core.executor + +object OpExecInitInfo { + + type OpExecFunc = (Int, Int) => OperatorExecutor + type JavaOpExecFunc = + java.util.function.Function[(Int, Int), OperatorExecutor] with java.io.Serializable + + def generateJavaOpExec( + opExecInitInfo: OpExecInitInfo, + workerIdx: Int, + numWorkers: Int + ): OperatorExecutor = { + opExecInitInfo match { + case OpExecInitInfoWithCode(codeGen) => + val (code, _) = + codeGen(workerIdx, numWorkers) + JavaRuntimeCompilation + .compileCode(code) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[OperatorExecutor] + case OpExecInitInfoWithFunc(opGen) => + opGen( + workerIdx, + numWorkers + ) + } + } + + def apply(code: String, language: String): OpExecInitInfo = + OpExecInitInfoWithCode((_, _) => (code, language)) + def apply(opExecFunc: OpExecFunc): OpExecInitInfo = OpExecInitInfoWithFunc(opExecFunc) + def apply(opExecFunc: JavaOpExecFunc): OpExecInitInfo = + OpExecInitInfoWithFunc((idx, totalWorkerCount) => opExecFunc.apply(idx, totalWorkerCount)) +} + +/** + * Information regarding initializing an operator executor instance + * it could be two cases: + * - OpExecInitInfoWithFunc: + * A function to create an operator executor instance, with 
parameters: + * 1) the worker index, 2) the PhysicalOp; + * - OpExecInitInfoWithCode: + * A function returning the code string that to be compiled in a virtual machine. + */ +sealed trait OpExecInitInfo + +final case class OpExecInitInfoWithCode( + codeGen: (Int, Int) => (String, String) +) extends OpExecInitInfo +final case class OpExecInitInfoWithFunc( + opGen: (Int, Int) => OperatorExecutor +) extends OpExecInitInfo diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala new file mode 100644 index 00000000000..f286d7e6aca --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala @@ -0,0 +1,40 @@ +package edu.uci.ics.amber.core.executor + +import edu.uci.ics.amber.core.marker.State +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} +import edu.uci.ics.amber.workflow.PortIdentity + +trait OperatorExecutor { + + def open(): Unit = {} + + def produceStateOnStart(port: Int): Option[State] = None + + def processState(state: State, port: Int): Option[State] = { + if (state.isPassToAllDownstream) { + Some(state) + } else { + None + } + } + + def processTupleMultiPort( + tuple: Tuple, + port: Int + ): Iterator[(TupleLike, Option[PortIdentity])] = { + processTuple(tuple, port).map(t => (t, None)) + } + + def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] + + def produceStateOnFinish(port: Int): Option[State] = None + + def onFinishMultiPort(port: Int): Iterator[(TupleLike, Option[PortIdentity])] = { + onFinish(port).map(t => (t, None)) + } + + def onFinish(port: Int): Iterator[TupleLike] = Iterator.empty + + def close(): Unit = {} + +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/marker/Marker.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/marker/Marker.scala new file mode 100644 index 00000000000..3c201bf51d0 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/marker/Marker.scala @@ -0,0 +1,47 @@ +package edu.uci.ics.amber.core.marker + +import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} + +import scala.collection.mutable + +sealed trait Marker + +final case class StartOfInputChannel() extends Marker +final case class EndOfInputChannel() extends Marker + +final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean = false) + extends Marker { + val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap() + add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN) + if (tuple.isDefined) { + tuple.get.getSchema.getAttributes.foreach { attribute => + add(attribute.getName, tuple.get.getField(attribute.getName), attribute.getType) + } + } + + def add(key: String, value: Any, valueType: AttributeType): Unit = + data.put(key, (valueType, value)) + + def get(key: String): Any = data(key)._2 + + def isPassToAllDownstream: Boolean = get("passToAllDownstream").asInstanceOf[Boolean] + + def apply(key: String): Any = get(key) + + def toTuple: Tuple = + Tuple + .builder( + Schema + .builder() + .add(data.map { + case (name, (attrType, _)) => + new Attribute(name, attrType) + }) + .build() + ) + .addSequentially(data.values.map(_._2).toArray) + .build() + + override def toString: String = + data.map { case (key, (_, value)) => s"$key: $value" }.mkString(", ") +} diff --git 
a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Attribute.java b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Attribute.java new file mode 100644 index 00000000000..9c7661d514e --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Attribute.java @@ -0,0 +1,72 @@ +package edu.uci.ics.amber.core.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * An attribute describes the name and the type of a column. + */ +public class Attribute implements Serializable { + + private final String attributeName; + private final AttributeType attributeType; + + @JsonCreator + public Attribute( + @JsonProperty(value = "attributeName", required = true) String attributeName, + @JsonProperty(value = "attributeType", required = true) AttributeType attributeType + ) { + checkNotNull(attributeName); + checkNotNull(attributeType); + this.attributeName = attributeName; + this.attributeType = attributeType; + } + + @JsonProperty(value = "attributeName") + public String getName() { + return attributeName; + } + + @JsonProperty(value = "attributeType") + public AttributeType getType() { + return attributeType; + } + + @Override + public String toString() { + return "edu.ics.uci.amber.model.tuple.model.Attribute[name=" + attributeName + ", type=" + attributeType + "]"; + } + + @Override + public boolean equals(Object toCompare) { + if (this == toCompare) { + return true; + } + if (toCompare == null) { + return false; + } + if (this.getClass() != toCompare.getClass()) { + return false; + } + + Attribute that = (Attribute) toCompare; + + if (this.attributeName == null) { + return that.attributeName == null; + } + if (this.attributeType == null) { + return that.attributeType == null; + } + + return this.attributeName.equalsIgnoreCase(that.attributeName) && this.attributeType.equals(that.attributeType); + } + + @Override + public int hashCode() { + return this.attributeName.hashCode() + this.attributeType.toString().hashCode(); + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeType.java b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeType.java new file mode 100644 index 00000000000..cf5233343ff --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeType.java @@ -0,0 +1,102 @@ +package edu.uci.ics.amber.core.tuple; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.io.Serializable; +import java.sql.Timestamp; + +public enum AttributeType implements Serializable { + + /** + * To add a new edu.ics.uci.amber.model.tuple.model.AttributeType, update the following files to handle the new type: + * 1. AttributeTypeUtils + * src/main/scala/edu/uci/ics/texera/workflow/common/tuple/schema/AttributeTypeUtils.scala + * Provide parsing, inferring, and casting logic between other AttributeTypes. + *
+ * 2. SQLSourceOpDesc + * src/main/scala/edu/uci/ics/texera/workflow/operators/source/sql/SQLSourceOpDesc + * In particular, SQL sources need to map the input schema to Texera.Schema. edu.ics.uci.amber.model.tuple.model.AttributeType + * needs to be converted from the original source types accordingly. + *
+ * 3. FilterPredicate + * src/main/scala/edu/uci/ics/texera/workflow/operators/filter/FilterPredicate.java + * FilterPredicate takes in AttributeTypes and converts them into a comparable type, then does + * the comparison. New AttributeTypes need to be mapped to a comparable type there. + *
+ * 4. SpecializedAverageOpDesc.getNumericalValue + * src/main/scala/edu/uci/ics/texera/workflow/operators/aggregate/SpecializedAverageOpDesc.scala + * New AttributeTypes might need to be converted into a numerical value in order to perform + * aggregations. + *
+ * 5. SchemaPropagationService.SchemaAttribute + * src/app/workspace/service/dynamic-schema/schema-propagation/schema-propagation.service.ts + * Declare the frontend SchemaAttribute for the new edu.ics.uci.amber.model.tuple.model.AttributeType. + *
+ * 6. ArrowUtils (Java) + * src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/ArrowUtils.scala + * Provide java-side conversion between ArrowType and edu.ics.uci.amber.model.tuple.model.AttributeType. + *
+ * 7. ArrowUtils (Python) + * src/main/python/core/util/arrow_utils.py + * Provide python-side conversion between ArrowType and edu.ics.uci.amber.model.tuple.model.AttributeType. + */ + + + // A field that is indexed but not tokenized: the entire String + // value is indexed as a single token + STRING("string", String.class), + INTEGER("integer", Integer.class), + LONG("long", Long.class), + DOUBLE("double", Double.class), + BOOLEAN("boolean", Boolean.class), + TIMESTAMP("timestamp", Timestamp.class), + BINARY("binary", byte[].class), + ANY("ANY", Object.class); + + private final String name; + private final Class fieldClass; + + AttributeType(String name, Class fieldClass) { + this.name = name; + this.fieldClass = fieldClass; + } + + @JsonValue + public String getName() { + if (this.name.equals(ANY.name)) { + // exclude this enum type in JSON schema + // JSON schema generator will ignore empty enum type + return ""; + } + return this.name; + } + + public Class getFieldClass() { + return this.fieldClass; + } + + public static AttributeType getAttributeType(Class fieldClass) { + if (fieldClass.equals(String.class)) { + return STRING; + } else if (fieldClass.equals(Integer.class)) { + return INTEGER; + } else if (fieldClass.equals(Long.class)) { + return LONG; + } else if (fieldClass.equals(Double.class)) { + return DOUBLE; + } else if (fieldClass.equals(Boolean.class)) { + return BOOLEAN; + } else if (fieldClass.equals(Timestamp.class)) { + return TIMESTAMP; + } else if (fieldClass.equals(byte[].class)) { + return BINARY; + } else { + return ANY; + } + } + + @Override + public String toString() { + return this.getName(); + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtils.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtils.scala new file mode 100644 index 00000000000..8cb7934011a --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtils.scala @@ -0,0 +1,310 @@ +package edu.uci.ics.amber.core.tuple + +import com.github.sisyphsu.dateparser.DateParserUtils + +import java.sql.Timestamp +import scala.util.Try +import scala.util.control.Exception.allCatch + +object AttributeTypeUtils extends Serializable { + + /** + * this loop check whether the current attribute in the array is the attribute for casting, + * if it is, change it to result type + * if it's not, remain the same type + * we need this loop to keep the order the same as the original + * @param schema schema of data + * @param attribute selected attribute + * @param resultType casting type + * @return schema of data + */ + def SchemaCasting( + schema: Schema, + attribute: String, + resultType: AttributeType + ): Schema = { + // need a builder to maintain the order of original schema + val builder = Schema.builder() + val attributes: List[Attribute] = schema.getAttributes + // change the schema when meet selected attribute else remain the same + for (i <- attributes.indices) { + if (attributes.apply(i).getName.equals(attribute)) { + resultType match { + case AttributeType.STRING | AttributeType.INTEGER | AttributeType.DOUBLE | + AttributeType.LONG | AttributeType.BOOLEAN | AttributeType.TIMESTAMP | + AttributeType.BINARY => + builder.add(attribute, resultType) + case AttributeType.ANY | _ => + builder.add(attribute, attributes.apply(i).getType) + } + } else { + builder.add(attributes.apply(i).getName, attributes.apply(i).getType) + } + } + builder.build() + } + 
+ /** + * Casts the fields of a tuple to new types according to a list of type casting units, + * producing a new tuple that conforms to the specified type changes. + * Each type casting unit specifies the attribute name and the target type to cast to. + * If an attribute name in the tuple does not have a corresponding type casting unit, + * its value is included in the result tuple without type conversion. + * + * @param tuple The source tuple whose fields are to be casted. + * @param targetTypes A mapping of attribute names to their target types, which specifies how to cast each field. + * If an attribute is not present in the map, no casting is applied to it. + * @return A new instance of TupleLike with fields casted to the target types + * as specified by the typeCastingUnits. + */ + def tupleCasting( + tuple: Tuple, + targetTypes: Map[String, AttributeType] + ): TupleLike = + TupleLike( + tuple.getSchema.getAttributes.map { attr => + val targetType = targetTypes.getOrElse(attr.getName, attr.getType) + parseField(tuple.getField(attr.getName), targetType) + } + ) + + def parseFields(fields: Array[Any], schema: Schema): Array[Any] = { + parseFields(fields, schema.getAttributes.map(attr => attr.getType).toArray) + } + + /** + * parse Fields to corresponding Java objects base on the given Schema AttributeTypes + * @param attributeTypes Schema AttributeTypeList + * @param fields fields value + * @return parsedFields in the target AttributeTypes + */ + @throws[AttributeTypeException] + def parseFields( + fields: Array[Any], + attributeTypes: Array[AttributeType] + ): Array[Any] = { + fields.indices.map(i => parseField(fields(i), attributeTypes(i))).toArray + } + + /** + * parse Field to a corresponding Java object base on the given Schema AttributeType + * @param field fields value + * @param attributeType target AttributeType + * + * @return parsedField in the target AttributeType + */ + @throws[AttributeTypeException] + def parseField( + field: Any, + attributeType: AttributeType + ): Any = { + if (field == null) return null + attributeType match { + case AttributeType.INTEGER => parseInteger(field) + case AttributeType.LONG => parseLong(field) + case AttributeType.DOUBLE => parseDouble(field) + case AttributeType.BOOLEAN => parseBoolean(field) + case AttributeType.TIMESTAMP => parseTimestamp(field) + case AttributeType.STRING => field.toString + case AttributeType.BINARY => field + case AttributeType.ANY | _ => field + } + } + + @throws[AttributeTypeException] + private def parseInteger(fieldValue: Any): Integer = { + fieldValue match { + case str: String => str.trim.toInt + case int: Integer => int + case long: java.lang.Long => long.toInt + case double: java.lang.Double => double.toInt + case boolean: java.lang.Boolean => if (boolean) 1 else 0 + // Timestamp and Binary are considered to be illegal here. + case _ => + throw new AttributeTypeException( + s"not able to parse type ${fieldValue.getClass} to Integer: ${fieldValue.toString}" + ) + } + } + + @throws[AttributeTypeException] + private def parseLong(fieldValue: Any): java.lang.Long = { + fieldValue match { + case str: String => str.trim.toLong + case int: Integer => int.toLong + case long: java.lang.Long => long + case double: java.lang.Double => double.toLong + case boolean: java.lang.Boolean => if (boolean) 1L else 0L + case timestamp: Timestamp => timestamp.toInstant.toEpochMilli + // Binary is considered to be illegal here. 
+ case _ => + throw new AttributeTypeException( + s"not able to parse type ${fieldValue.getClass} to Long: ${fieldValue.toString}" + ) + } + } + + @throws[AttributeTypeException] + def parseTimestamp(fieldValue: Any): Timestamp = { + val parseError = new AttributeTypeException( + s"not able to parse type ${fieldValue.getClass} to Timestamp: ${fieldValue.toString}" + ) + fieldValue match { + case str: String => new Timestamp(DateParserUtils.parseDate(str.trim).getTime) + case long: java.lang.Long => new Timestamp(long) + case timestamp: Timestamp => timestamp + case date: java.util.Date => new Timestamp(date.getTime) + // Integer, Double, Boolean, Binary are considered to be illegal here. + case _ => + throw parseError + } + } + + @throws[AttributeTypeException] + def parseDouble(fieldValue: Any): java.lang.Double = { + fieldValue match { + case str: String => str.trim.toDouble + case int: Integer => int.toDouble + case long: java.lang.Long => long.toDouble + case double: java.lang.Double => double + case boolean: java.lang.Boolean => if (boolean) 1 else 0 + // Timestamp and Binary are considered to be illegal here. + case _ => + throw new AttributeTypeException( + s"not able to parse type ${fieldValue.getClass} to Double: ${fieldValue.toString}" + ) + } + } + + @throws[AttributeTypeException] + private def parseBoolean(fieldValue: Any): java.lang.Boolean = { + val parseError = new AttributeTypeException( + s"not able to parse type ${fieldValue.getClass} to Boolean: ${fieldValue.toString}" + ) + fieldValue match { + case str: String => + (Try(str.trim.toBoolean) orElse Try(str.trim.toInt == 1)) + .getOrElse(throw parseError) + case int: Integer => int != 0 + case long: java.lang.Long => long != 0 + case double: java.lang.Double => double != 0 + case boolean: java.lang.Boolean => boolean + // Timestamp and Binary are considered to be illegal here. + case _ => + throw parseError + } + } + + /** + * Infers field types of a given row of data. The given attributeTypes will be updated + * through each iteration of row inference, to contain the most accurate inference. + * @param attributeTypes AttributeTypes that being passed to each iteration. + * @param fields data fields to be parsed + * @return + */ + private def inferRow( + attributeTypes: Array[AttributeType], + fields: Array[Any] + ): Unit = { + for (i <- fields.indices) { + attributeTypes.update(i, inferField(attributeTypes.apply(i), fields.apply(i))) + } + } + + /** + * Infers field types of a given row of data. + * @param fieldsIterator iterator of field arrays to be parsed. + * each field array should have exact same order and length. 
+ * @return AttributeType array + */ + def inferSchemaFromRows(fieldsIterator: Iterator[Array[Any]]): Array[AttributeType] = { + var attributeTypes: Array[AttributeType] = Array() + + for (fields <- fieldsIterator) { + if (attributeTypes.isEmpty) { + attributeTypes = Array.fill[AttributeType](fields.length)(AttributeType.INTEGER) + } + inferRow(attributeTypes, fields) + } + attributeTypes + } + + /** + * infer filed type with only data field + * @param fieldValue data field to be parsed, original as String field + * @return inferred AttributeType + */ + def inferField(fieldValue: Any): AttributeType = { + tryParseInteger(fieldValue) + } + + private def tryParseInteger(fieldValue: Any): AttributeType = { + if (fieldValue == null) + return AttributeType.INTEGER + allCatch opt parseInteger(fieldValue) match { + case Some(_) => AttributeType.INTEGER + case None => tryParseLong(fieldValue) + } + } + + private def tryParseLong(fieldValue: Any): AttributeType = { + if (fieldValue == null) + return AttributeType.LONG + allCatch opt parseLong(fieldValue) match { + case Some(_) => AttributeType.LONG + case None => tryParseTimestamp(fieldValue) + } + } + + private def tryParseTimestamp(fieldValue: Any): AttributeType = { + if (fieldValue == null) + return AttributeType.TIMESTAMP + allCatch opt parseTimestamp(fieldValue) match { + case Some(_) => AttributeType.TIMESTAMP + case None => tryParseDouble(fieldValue) + } + } + + private def tryParseDouble(fieldValue: Any): AttributeType = { + if (fieldValue == null) + return AttributeType.DOUBLE + allCatch opt parseDouble(fieldValue) match { + case Some(_) => AttributeType.DOUBLE + case None => tryParseBoolean(fieldValue) + } + } + + private def tryParseBoolean(fieldValue: Any): AttributeType = { + if (fieldValue == null) + return AttributeType.BOOLEAN + allCatch opt parseBoolean(fieldValue) match { + case Some(_) => AttributeType.BOOLEAN + case None => tryParseString() + } + } + + private def tryParseString(): AttributeType = { + AttributeType.STRING + } + + /** + * InferField when get both typeSofar and tuple string + * @param attributeType typeSofar + * @param fieldValue data field to be parsed, original as String field + * @return inferred AttributeType + */ + def inferField(attributeType: AttributeType, fieldValue: Any): AttributeType = { + attributeType match { + case AttributeType.STRING => tryParseString() + case AttributeType.BOOLEAN => tryParseBoolean(fieldValue) + case AttributeType.DOUBLE => tryParseDouble(fieldValue) + case AttributeType.LONG => tryParseLong(fieldValue) + case AttributeType.INTEGER => tryParseInteger(fieldValue) + case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue) + case AttributeType.BINARY => tryParseString() + case _ => tryParseString() + } + } + + class AttributeTypeException(msg: String) extends IllegalArgumentException(msg) {} +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Schema.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Schema.scala new file mode 100644 index 00000000000..c6932b8d8f1 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Schema.scala @@ -0,0 +1,202 @@ +package edu.uci.ics.amber.core.tuple + +import com.fasterxml.jackson.annotation.{JsonCreator, JsonIgnore, JsonProperty} +import com.google.common.base.Preconditions.checkNotNull + +import scala.collection.immutable.ListMap +import scala.collection.mutable + +case class Schema @JsonCreator() ( + @JsonProperty(value 
= "attributes", required = true) attributes: List[Attribute] +) extends Serializable { + + checkNotNull(attributes) + + val attributeIndex: Map[String, Int] = + attributes.view.map(_.getName.toLowerCase).zipWithIndex.toMap + + def this(attrs: Attribute*) = { + this(attrs.toList) + } + + @JsonProperty(value = "attributes") + def getAttributes: List[Attribute] = attributes + + @JsonIgnore + def getAttributeNames: List[String] = attributes.map(_.getName) + + def getIndex(attributeName: String): Int = { + if (!containsAttribute(attributeName)) { + throw new RuntimeException(s"$attributeName is not contained in the schema") + } + attributeIndex(attributeName.toLowerCase) + } + + def getAttribute(attributeName: String): Attribute = attributes(getIndex(attributeName)) + + @JsonIgnore + def containsAttribute(attributeName: String): Boolean = + attributeIndex.contains(attributeName.toLowerCase) + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + result = prime * result + (if (attributes == null) 0 else attributes.hashCode) + result = prime * result + (if (attributeIndex == null) 0 else attributeIndex.hashCode) + result + } + + override def equals(obj: Any): Boolean = + obj match { + case that: Schema => + this.attributes == that.attributes && this.attributeIndex == that.attributeIndex + case _ => false + } + + override def toString: String = s"Schema[$attributes]" + + def getPartialSchema(attributeNames: List[String]): Schema = { + Schema(attributeNames.map(name => getAttribute(name))) + } + + /** + * This method converts to a Schema into a raw format, where each pair of attribute name and attribute type + * are represented as string. This is for serialization between languages. + */ + def toRawSchema: Map[String, String] = + getAttributes.foldLeft(ListMap[String, String]())((list, attr) => + list + (attr.getName -> attr.getType.toString) + ) +} + +object Schema { + def builder(): Builder = Builder() + + case class Builder(private var attributes: List[Attribute] = List.empty) { + private val attributeNames: mutable.Set[String] = mutable.Set.empty + + def add(attribute: Attribute): Builder = { + require(attribute != null, "edu.ics.uci.amber.model.tuple.model.Attribute cannot be null") + checkAttributeNotExists(attribute.getName) + attributes ::= attribute + attributeNames += attribute.getName.toLowerCase + this + } + + def add(attributeName: String, attributeType: AttributeType): Builder = { + add(new Attribute(attributeName, attributeType)) + this + } + + def add(attributes: Iterable[Attribute]): Builder = { + attributes.foreach(add) + this + } + + def add(attributes: Attribute*): Builder = { + attributes.foreach(add) + this + } + + def add(schema: Schema): Builder = { + checkNotNull(schema) + add(schema.getAttributes) + this + } + + def build(): Schema = Schema(attributes.reverse) + + /** + * Removes an attribute from the schema builder if it exists. + * + * @param attribute , the name of the attribute + * @return this Builder object + */ + def removeIfExists(attribute: String): Builder = { + checkNotNull(attribute) + attributes = attributes.filter((attr: Attribute) => !attr.getName.equalsIgnoreCase(attribute)) + attributeNames.remove(attribute.toLowerCase) + this + } + + /** + * Removes the attributes from the schema builder if they exist. 
+ * + * @param attributes , the names of the attributes + * @return this Builder object + */ + def removeIfExists(attributes: Iterable[String]): Builder = { + checkNotNull(attributes) + attributes.foreach((attr: String) => checkNotNull(attr)) + attributes.foreach((attr: String) => this.removeIfExists(attr)) + this + } + + /** + * Removes the attributes from the schema builder if they exist. + * + * @param attributes , the names of the attributes + * @return this Builder object + */ + def removeIfExists(attributes: String*): Builder = { + checkNotNull(attributes) + this.removeIfExists(attributes) + this + } + + /** + * Removes an attribute from the schema builder. + * Fails if the attribute does not exist. + * + * @param attribute , the name of the attribute + * @return this Builder object + */ + def remove(attribute: String): Builder = { + checkNotNull(attribute) + checkAttributeExists(attribute) + removeIfExists(attribute) + this + } + + /** + * Removes the attributes from the schema builder. + * Fails if an attributes does not exist. + */ + def remove(attributes: Iterable[String]): Builder = { + checkNotNull(attributes) + attributes.foreach(attrName => checkNotNull(attrName)) + attributes.foreach(this.checkAttributeExists) + this.removeIfExists(attributes) + this + } + + /** + * Removes the attributes from the schema builder. + * Fails if an attributes does not exist. + * + * @param attributes + * @return the builder itself + */ + def remove(attributes: String*): Builder = { + checkNotNull(attributes) + this.remove(attributes) + this + } + + private def checkAttributeNotExists(attributeName: String): Unit = { + if (attributeNames.contains(attributeName.toLowerCase)) { + throw new RuntimeException( + s"edu.ics.uci.amber.model.tuple.model.Attribute $attributeName already exists in the schema" + ) + } + } + + private def checkAttributeExists(attributeName: String): Unit = { + if (!attributeNames.contains(attributeName.toLowerCase)) { + throw new RuntimeException( + s"edu.ics.uci.amber.model.tuple.model.Attribute $attributeName does not exist in the schema" + ) + } + } + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala new file mode 100644 index 00000000000..dea62ba6c00 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala @@ -0,0 +1,248 @@ +package edu.uci.ics.amber.core.tuple + +import Tuple.checkSchemaMatchesFields +import TupleUtils.document2Tuple +import com.fasterxml.jackson.annotation.{JsonCreator, JsonIgnore, JsonProperty} +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.google.common.base.Preconditions.checkNotNull +import edu.uci.ics.amber.util.JSONUtils +import org.ehcache.sizeof.SizeOf +import org.bson.Document + +import java.util +import scala.collection.mutable + +class TupleBuildingException(errorMessage: String) extends RuntimeException(errorMessage) {} + +/** + * Represents a tuple in a data processing workflow, encapsulating a schema and corresponding field values. + * + * A Tuple is a fundamental data structure that holds an ordered collection of elements. Each element can be of any type. + * The schema defines the structure of the Tuple, including the names and types of fields that the Tuple can hold. + * + * @constructor Create a new Tuple with a specified schema and field values. 
+ * @param schema The schema associated with this tuple, defining the structure and types of fields in the tuple. + * @param fieldVals A list of values corresponding to the fields defined in the schema. Each value in this list + * is mapped to a field in the schema, in the same order as the fields are defined. + * + * @throws IllegalArgumentException if either schema or fieldVals is null, ensuring that every Tuple has a well-defined structure. + */ +case class Tuple @JsonCreator() ( + @JsonProperty(value = "schema", required = true) schema: Schema, + @JsonProperty(value = "fields", required = true) fieldVals: Array[Any] +) extends SeqTupleLike + with Serializable { + + checkNotNull(schema) + checkNotNull(fieldVals) + checkSchemaMatchesFields(schema.getAttributes, fieldVals) + + override val inMemSize: Long = SizeOf.newInstance().deepSizeOf(this) + + @JsonIgnore def length: Int = fieldVals.length + + @JsonIgnore def getSchema: Schema = schema + + def getField[T](index: Int): T = { + fieldVals(index).asInstanceOf[T] + } + + def getField[T](attributeName: String): T = { + if (!schema.containsAttribute(attributeName)) { + throw new RuntimeException(s"$attributeName is not in the tuple") + } + getField(schema.getIndex(attributeName)) + } + + def getField[T](attribute: Attribute): T = getField(attribute.getName) + + override def getFields: Array[Any] = fieldVals + + override def enforceSchema(schema: Schema): Tuple = { + assert( + getSchema == schema, + s"output tuple schema does not match the expected schema! " + + s"output schema: $getSchema, " + + s"expected schema: $schema" + ) + this + } + + override def hashCode: Int = util.Arrays.deepHashCode(getFields.map(_.asInstanceOf[AnyRef])) + + override def equals(obj: Any): Boolean = + obj match { + case that: Tuple => (this.getFields sameElements that.getFields) && this.schema == that.schema + case _ => false + } + + def getPartialTuple(attributeNames: List[String]): Tuple = { + val partialSchema = schema.getPartialSchema(attributeNames) + val builder = Tuple.Builder(partialSchema) + val partialArray = attributeNames.map(getField[Any]).toArray + builder.addSequentially(partialArray) + builder.build() + } + + override def toString: String = + s"Tuple [schema=$schema, fields=${fieldVals.mkString("[", ", ", "]")}]" + + def asKeyValuePairJson(): ObjectNode = { + val objectNode = JSONUtils.objectMapper.createObjectNode() + this.schema.getAttributeNames.foreach { attrName => + val valueNode = + JSONUtils.objectMapper.convertValue(this.getField(attrName), classOf[JsonNode]) + objectNode.set[ObjectNode](attrName, valueNode) + } + objectNode + } + + def asDocument(): Document = { + val doc = new Document() + this.schema.getAttributeNames.foreach { attrName => + doc.put(attrName, this.getField(attrName)) + } + doc + } +} + +object Tuple { + val toDocument: Tuple => Document = (tuple: Tuple) => { + val doc = new Document() + tuple.schema.getAttributeNames.foreach { attrName => + doc.put(attrName, tuple.getField(attrName)) + } + doc + } + + val fromDocument: Schema => Document => Tuple = (schema: Schema) => + (doc: Document) => { + document2Tuple(doc, schema) + } + + /** + * Validates that the provided attributes match the provided fields in type and order. + * + * @param attributes An iterable of Attributes to be validated against the fields. + * @param fields An iterable of field values to be validated against the attributes. + * @throws RuntimeException if the sizes of attributes and fields do not match, or if their types are incompatible. 
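 + *
 + * Illustrative note (not from the original doc): an attribute declared as AttributeType.INTEGER
 + * paired with the string value "abc" fails this check, because the field's runtime class does not
 + * match the attribute type's field class.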
+ */ + private def checkSchemaMatchesFields( + attributes: Iterable[Attribute], + fields: Iterable[Any] + ): Unit = { + val attributeList = attributes.toList + val fieldList = fields.toList + + if (attributeList.size != fieldList.size) { + throw new RuntimeException( + s"Schema size (${attributeList.size}) and field size (${fieldList.size}) are different" + ) + } + + (attributeList zip fieldList).foreach { + case (attribute, field) => + checkAttributeMatchesField(attribute, field) + } + } + + /** + * Validates that a single field matches its corresponding attribute in type. + * + * @param attribute The attribute to be matched. + * @param field The field value to be checked. + * @throws RuntimeException if the field's type does not match the attribute's defined type. + */ + private def checkAttributeMatchesField(attribute: Attribute, field: Any): Unit = { + if ( + field != null && attribute.getType != AttributeType.ANY && !field.getClass.equals( + attribute.getType.getFieldClass + ) + ) { + throw new RuntimeException( + s"edu.ics.uci.amber.model.tuple.model.Attribute ${attribute.getName}'s type (${attribute.getType}) is different from field's type (${AttributeType + .getAttributeType(field.getClass)})" + ) + } + } + + /** + * Creates a new Tuple builder for a specified schema. + * + * @param schema The schema for which the Tuple builder will create Tuples. + * @return A new instance of Tuple.Builder configured with the specified schema. + */ + def builder(schema: Schema): Builder = { + Tuple.Builder(schema) + } + + /** + * Builder class for constructing Tuple instances in a flexible and controlled manner. + */ + case class Builder(schema: Schema) { + private val fieldNameMap = mutable.Map.empty[String, Any] + + def add(tuple: Tuple, isStrictSchemaMatch: Boolean = true): Builder = { + require(tuple != null, "Tuple cannot be null") + + tuple.getFields.zipWithIndex.foreach { + case (field, i) => + val attribute = tuple.schema.getAttributes(i) + if (!isStrictSchemaMatch && !schema.containsAttribute(attribute.getName)) { + // Skip if not matching in non-strict mode + } else { + add(attribute, tuple.getFields(i)) + } + } + this + } + + def add(attribute: Attribute, field: Any): Builder = { + require(attribute != null, "edu.ics.uci.amber.model.tuple.model.Attribute cannot be null") + checkAttributeMatchesField(attribute, field) + + if (!schema.containsAttribute(attribute.getName)) { + throw new TupleBuildingException( + s"${attribute.getName} doesn't exist in the expected schema." + ) + } + + fieldNameMap.put(attribute.getName.toLowerCase, field) + this + } + + def add(attributeName: String, attributeType: AttributeType, field: Any): Builder = { + require( + attributeName != null && attributeType != null, + "edu.ics.uci.amber.model.tuple.model.Attribute name and type cannot be null" + ) + this.add(new Attribute(attributeName, attributeType), field) + this + } + + def addSequentially(fields: Array[Any]): Builder = { + require(fields != null, "Fields cannot be null") + checkSchemaMatchesFields(schema.getAttributes, fields) + schema.getAttributes.zip(fields).foreach { + case (attribute, field) => + this.add(attribute, field) + } + this + } + + def build(): Tuple = { + val missingAttributes = + schema.getAttributes.filterNot(attr => fieldNameMap.contains(attr.getName.toLowerCase)) + if (missingAttributes.nonEmpty) { + throw new TupleBuildingException( + s"Tuple does not have the same number of attributes as schema. 
Missing attributes are $missingAttributes" + ) + } + + val fields = + schema.getAttributes.map(attr => fieldNameMap(attr.getName.toLowerCase)).toArray + new Tuple(schema, fields) + } + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala new file mode 100644 index 00000000000..16eddf5a8f9 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala @@ -0,0 +1,139 @@ +package edu.uci.ics.amber.core.tuple + +import edu.uci.ics.amber.workflow.PortIdentity + +import scala.jdk.CollectionConverters.CollectionHasAsScala + +sealed trait FieldArray { + def getFields: Array[Any] +} + +sealed trait TupleLike extends FieldArray { + def inMemSize: Long = 0L +} + +trait SchemaEnforceable { + def enforceSchema(schema: Schema): Tuple +} + +trait InternalMarker extends TupleLike { + override def getFields: Array[Any] = Array.empty +} + +final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker +final case class FinalizeExecutor() extends InternalMarker + +trait SeqTupleLike extends TupleLike with SchemaEnforceable { + override def inMemSize: Long = ??? + + /** + * Constructs a Tuple object from a sequence of field values + * according to the specified schema. It asserts that the number + * of provided fields matches the schema's requirement, every + * field must also satisfy the field type. + * + * @param schema Schema for Tuple construction. + * @return Tuple constructed according to the schema. + */ + override def enforceSchema(schema: Schema): Tuple = { + val attributes = schema.getAttributes + val builder = Tuple.builder(schema) + getFields.zipWithIndex.foreach { + case (value, i) => + builder.add(attributes(i), value) + } + builder.build() + } + +} + +trait MapTupleLike extends SeqTupleLike with SchemaEnforceable { + override def inMemSize: Long = ??? + def fieldMappings: Map[String, Any] + override def getFields: Array[Any] = fieldMappings.values.toArray + + /** + * Constructs a `Tuple` based on the provided schema and `tupleLike` object. + * + * For each attribute in the schema, the function attempts to find a corresponding value + * in the tuple-like object's field mappings. If a mapping is found, that value is used; + * otherwise, `null` is used as the attribute value in the built tuple. + * + * @param schema The schema defining the attributes and their types for the tuple. + * @return A new `Tuple` instance built according to the schema and the data provided + * by the `tupleLike` object. 
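 + *
 + * Illustrative example (attribute names are assumed, not part of the source): with a schema of
 + * ("name": STRING, "age": INTEGER) and fieldMappings Map("name" -> "alice"), the resulting tuple
 + * holds ("alice", null), since "age" has no mapping.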
+ */ + override def enforceSchema(schema: Schema): Tuple = { + val builder = Tuple.builder(schema) + schema.getAttributes.foreach { attribute => + val value = fieldMappings.getOrElse(attribute.getName, null) + builder.add(attribute, value) + } + builder.build() + } +} + +object TupleLike { + + // Implicit evidence markers for different types + trait NotAnIterable[A] + + // Provide a low-priority implicit evidence for all types that are not Iterable + trait LowPriorityNotAnIterableImplicits { + implicit def defaultNotAnIterable[A]: NotAnIterable[A] = new NotAnIterable[A] {} + } + + // Object to hold the implicits + object NotAnIterable extends LowPriorityNotAnIterableImplicits { + // Prioritize this implicit for Strings, allowing them explicitly + implicit object StringIsNotAnIterable extends NotAnIterable[String] + + // Ensure Iterable types do not have an implicit NotAnIterable available + // This is a way to "exclude" Iterable types by not providing an implicit instance for them + implicit def iterableIsNotAnIterable[C[_] <: Iterable[A], A]: NotAnIterable[C[A]] = + throw new RuntimeException("Iterable types are not allowed") + } + + def apply(mappings: Map[String, Any]): MapTupleLike = { + new MapTupleLike { + override val fieldMappings: Map[String, Any] = mappings + } + } + + def apply(mappings: Iterable[(String, Any)]): MapTupleLike = { + new MapTupleLike { + override val fieldMappings: Map[String, Any] = mappings.toMap + } + } + + def apply(mappings: (String, Any)*): MapTupleLike = { + new MapTupleLike { + override val fieldMappings: Map[String, Any] = mappings.toMap + } + } + + def apply(fieldList: java.util.List[Any]): SeqTupleLike = { + new SeqTupleLike { + override val getFields: Array[Any] = fieldList.asScala.toArray + } + } + + def apply[T: NotAnIterable](fieldSeq: T*)(implicit ev: NotAnIterable[_] = null): SeqTupleLike = { + new SeqTupleLike { + override val getFields: Array[Any] = fieldSeq.toArray + } + } + + def apply[T <: Any](fieldIter: Iterable[T]): SeqTupleLike = { + new SeqTupleLike { + override val getFields: Array[Any] = fieldIter.toArray + } + } + + def apply(array: Array[Any]): SeqTupleLike = { + new SeqTupleLike { + override val getFields: Array[Any] = array + } + } + +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleUtils.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleUtils.scala new file mode 100644 index 00000000000..c8735782748 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleUtils.scala @@ -0,0 +1,83 @@ +package edu.uci.ics.amber.core.tuple + +import com.fasterxml.jackson.databind.JsonNode +import edu.uci.ics.amber.util.JSONUtils.JSONToMap +import edu.uci.ics.amber.util.JSONUtils.objectMapper +import AttributeTypeUtils.{inferSchemaFromRows, parseField} +import AttributeType.BINARY +import org.bson.Document +import org.bson.types.Binary + +import scala.collection.mutable.ArrayBuffer + +object TupleUtils { + + def tuple2json(tuple: Tuple): String = { + tuple.asKeyValuePairJson().toString + } + + def json2tuple(json: String): Tuple = { + var fieldNames = Set[String]() + + val allFields: ArrayBuffer[Map[String, String]] = ArrayBuffer() + + val root: JsonNode = objectMapper.readTree(json) + if (root.isObject) { + val fields: Map[String, String] = JSONToMap(root) + fieldNames = fieldNames.++(fields.keySet) + allFields += fields + } + + val sortedFieldNames = fieldNames.toList + + val attributeTypes = 
inferSchemaFromRows(allFields.iterator.map(fields => { + val result = ArrayBuffer[Object]() + for (fieldName <- sortedFieldNames) { + if (fields.contains(fieldName)) { + result += fields(fieldName) + } else { + result += null + } + } + result.toArray + })) + + val schema = Schema + .builder() + .add( + sortedFieldNames.indices + .map(i => new Attribute(sortedFieldNames(i), attributeTypes(i))) + ) + .build() + + try { + val fields = scala.collection.mutable.ArrayBuffer.empty[Any] + val data = JSONToMap(objectMapper.readTree(json)) + + for (fieldName <- schema.getAttributeNames) { + if (data.contains(fieldName)) { + fields += parseField(data(fieldName), schema.getAttribute(fieldName).getType) + } else { + fields += null + } + } + Tuple.builder(schema).addSequentially(fields.toArray).build() + } catch { + case e: Exception => throw e + } + } + + def document2Tuple(doc: Document, schema: Schema): Tuple = { + val builder = Tuple.builder(schema) + schema.getAttributes.foreach(attr => + if (attr.getType == BINARY) { + // special care for converting MongoDB's binary type to byte[] in our schema + builder.add(attr, doc.get(attr.getName).asInstanceOf[Binary].getData) + } else { + builder.add(attr, parseField(doc.get(attr.getName), attr.getType)) + } + ) + builder.build() + } + +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala new file mode 100644 index 00000000000..40372b93dd6 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/LocationPreference.scala @@ -0,0 +1,14 @@ +package edu.uci.ics.amber.core.workflow + +// LocationPreference defines where operators should run. +sealed trait LocationPreference extends Serializable + +// PreferController: Run on the controller node. +// Example: For scan operators reading files or sink operators writing results on the controller. +object PreferController extends LocationPreference + +// RoundRobinPreference: Distribute across worker nodes, per operator. +// Example: +// - Operator A: Worker 1 -> Node 1, Worker 2 -> Node 2, Worker 3 -> Node 3 +// - Operator B: Worker 1 -> Node 1, Worker 2 -> Node 2 +object RoundRobinPreference extends LocationPreference diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PartitionInfo.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PartitionInfo.scala new file mode 100644 index 00000000000..dddd2b6c0cc --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PartitionInfo.scala @@ -0,0 +1,88 @@ +package edu.uci.ics.amber.core.workflow + +import com.fasterxml.jackson.annotation.JsonSubTypes.Type +import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} + +/** + * The base interface of partition information in the compiler layer. 
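 + *
 + * For example (illustrative only): HashPartition(List("id")) satisfies itself and UnknownPartition(),
 + * while merging it with SinglePartition() yields UnknownPartition() because the two partitions differ.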
+ */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes( + Array( + new Type(value = classOf[HashPartition], name = "hash"), + new Type(value = classOf[RangePartition], name = "range"), + new Type(value = classOf[SinglePartition], name = "single"), + new Type(value = classOf[BroadcastPartition], name = "broadcast"), + new Type(value = classOf[UnknownPartition], name = "none") + ) +) +sealed abstract class PartitionInfo { + + // whether this partition satisfies the other partition. + // in the default implementation, a partition only satisfies itself; + // a partition also always satisfies UnknownPartition (indicating no partition requirement) + def satisfies(other: PartitionInfo): Boolean = { + this == other || other == UnknownPartition() + } + + // after a stream with this partition merges with another stream with the other partition, + // returns the result partition after the merge + def merge(other: PartitionInfo): PartitionInfo = { + // if merged with the same partition, the result is the same + // if merged with a different partition, the result is unknown + if (this == other) this else UnknownPartition() + } + +} + +/** + * Defines a partitioning strategy where an input stream is distributed across + * multiple nodes based on a hash function applied to specified attribute names. + * If the list of attribute names is empty, hashing is applied to all attributes. + */ +final case class HashPartition(hashAttributeNames: List[String] = List.empty) extends PartitionInfo +object RangePartition { + + def apply(rangeAttributeNames: List[String], rangeMin: Long, rangeMax: Long): PartitionInfo = { + if (rangeAttributeNames.nonEmpty) + new RangePartition(rangeAttributeNames, rangeMin, rangeMax) + else + UnknownPartition() + } + +} + +/** + * Represents an input stream that is partitioned across multiple nodes, + * where each node contains data that fits in a specific range. + * The data within each node is also sorted. + */ +final case class RangePartition(rangeAttributeNames: List[String], rangeMin: Long, rangeMax: Long) + extends PartitionInfo { + + // if two streams of input with the same range partition are merged (without another sort), + // we cannot ensure that the output stream follows the same sorting order. + override def merge(other: PartitionInfo): PartitionInfo = { + UnknownPartition() + } +} + +/** + * Represents an input stream that is not partitioned, with all data residing on a single node. + */ +final case class SinglePartition() extends PartitionInfo {} + +/** + * Represents an input stream that is partitioned one-to-one between nodes, where each node processes a unique subset of the data. + */ +final case class OneToOnePartition() extends PartitionInfo {} + +/** + * Represents an input stream that needs to be sent to every node. + */ +final case class BroadcastPartition() extends PartitionInfo {} + +/** + * Represents that there is no specific partitioning scheme for the input stream.
+ */ +final case class UnknownPartition() extends PartitionInfo {} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala new file mode 100644 index 00000000000..066ea82f753 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala @@ -0,0 +1,508 @@ +package edu.uci.ics.amber.core.workflow + +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties} +import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.core.executor.{OpExecInitInfo, OpExecInitInfoWithCode} +import edu.uci.ics.amber.core.tuple.Schema +import edu.uci.ics.amber.virtualidentity.{ + ExecutionIdentity, + OperatorIdentity, + PhysicalOpIdentity, + WorkflowIdentity +} +import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PhysicalLink, PortIdentity} +import org.jgrapht.graph.{DefaultEdge, DirectedAcyclicGraph} +import org.jgrapht.traverse.TopologicalOrderIterator + +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +case object SchemaPropagationFunc { + private type JavaSchemaPropagationFunc = + java.util.function.Function[Map[PortIdentity, Schema], Map[PortIdentity, Schema]] + with java.io.Serializable + def apply(javaFunc: JavaSchemaPropagationFunc): SchemaPropagationFunc = + SchemaPropagationFunc(inputSchemas => javaFunc.apply(inputSchemas)) + +} + +case class SchemaPropagationFunc(func: Map[PortIdentity, Schema] => Map[PortIdentity, Schema]) + +class SchemaNotAvailableException(message: String) extends Exception(message) + +object PhysicalOp { + + /** all source operators should use sourcePhysicalOp to give the following configs: + * 1) it initializes at the controller jvm. + * 2) it only has 1 worker actor. + * 3) it has no input ports. 
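 + *
 + * A minimal usage sketch (identifier values are hypothetical):
 + * {{{
 + * val scanOp = PhysicalOp.sourcePhysicalOp(workflowId, executionId, OperatorIdentity("scan"), opExecInitInfo)
 + * }}}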
+ */ + def sourcePhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + logicalOpId: OperatorIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + sourcePhysicalOp( + PhysicalOpIdentity(logicalOpId, "main"), + workflowId, + executionId, + opExecInitInfo + ) + + def sourcePhysicalOp( + physicalOpId: PhysicalOpIdentity, + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + PhysicalOp( + physicalOpId, + workflowId, + executionId, + opExecInitInfo, + parallelizable = false, + locationPreference = Some(PreferController) + ) + + def oneToOnePhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + logicalOpId: OperatorIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + oneToOnePhysicalOp( + PhysicalOpIdentity(logicalOpId, "main"), + workflowId, + executionId, + opExecInitInfo + ) + + def oneToOnePhysicalOp( + physicalOpId: PhysicalOpIdentity, + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + PhysicalOp(physicalOpId, workflowId, executionId, opExecInitInfo = opExecInitInfo) + + def manyToOnePhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + logicalOpId: OperatorIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + manyToOnePhysicalOp( + PhysicalOpIdentity(logicalOpId, "main"), + workflowId, + executionId, + opExecInitInfo + ) + + def manyToOnePhysicalOp( + physicalOpId: PhysicalOpIdentity, + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = { + PhysicalOp( + physicalOpId, + workflowId, + executionId, + opExecInitInfo, + parallelizable = false, + partitionRequirement = List(Option(SinglePartition())), + derivePartition = _ => SinglePartition() + ) + } + + def localPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + logicalOpId: OperatorIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = + localPhysicalOp( + PhysicalOpIdentity(logicalOpId, "main"), + workflowId, + executionId, + opExecInitInfo + ) + + def localPhysicalOp( + physicalOpId: PhysicalOpIdentity, + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + opExecInitInfo: OpExecInitInfo + ): PhysicalOp = { + manyToOnePhysicalOp(physicalOpId, workflowId, executionId, opExecInitInfo) + .withLocationPreference(Some(PreferController)) + } +} + +// @JsonIgnore is not working when directly annotated to fields of a case class +// https://stackoverflow.com/questions/40482904/jsonignore-doesnt-work-in-scala-case-class +@JsonIgnoreProperties( + Array( + "opExecInitInfo", // function type, ignore it + "derivePartition", // function type, ignore it + "inputPorts", // may contain very long stacktrace, ignore it + "outputPorts", // same reason with above + "propagateSchema" // function type, so ignore it + ) +) +case class PhysicalOp( + // the identifier of this PhysicalOp + id: PhysicalOpIdentity, + // the workflow id number + workflowId: WorkflowIdentity, + // the execution id number + executionId: ExecutionIdentity, + // information regarding initializing an operator executor instance + opExecInitInfo: OpExecInitInfo, + // preference of parallelism + parallelizable: Boolean = true, + // preference of worker placement + locationPreference: Option[LocationPreference] = None, + // requirement of partition policy (hash/range/single/none) on inputs + partitionRequirement: List[Option[PartitionInfo]] = List(), + 
// derive the output partition info given the input partitions + // if not specified, by default the output partition is the same as input partition + derivePartition: List[PartitionInfo] => PartitionInfo = inputParts => inputParts.head, + // input/output ports of the physical operator + // for operators with multiple input/output ports: must set these variables properly + inputPorts: Map[PortIdentity, (InputPort, List[PhysicalLink], Either[Throwable, Schema])] = + Map.empty, + outputPorts: Map[PortIdentity, (OutputPort, List[PhysicalLink], Either[Throwable, Schema])] = + Map.empty, + // schema propagation function + propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), + isOneToManyOp: Boolean = false, + // hint for number of workers + suggestedWorkerNum: Option[Int] = None +) extends LazyLogging { + + // all the "dependee" links are also blocking + private lazy val dependeeInputs: List[PortIdentity] = + inputPorts.values + .flatMap({ + case (port, _, _) => port.dependencies + }) + .toList + .distinct + + private lazy val isInitWithCode: Boolean = opExecInitInfo.isInstanceOf[OpExecInitInfoWithCode] + + /** + * Helper functions related to compile-time operations + */ + def isSourceOperator: Boolean = { + inputPorts.isEmpty + } + + /** + * Helper function used to determine whether the input link is a materialized link. + */ + def isSinkOperator: Boolean = { + outputPorts.forall(port => port._2._2.isEmpty) + } + + def isPythonBased: Boolean = { + opExecInitInfo match { + case opExecInfo: OpExecInitInfoWithCode => + val (_, language) = opExecInfo.codeGen(0, 0) + language == "python" || language == "r-tuple" || language == "r-table" + case _ => false + } + } + + @JsonIgnore // this is needed to prevent the serialization issue + def getPythonCode: String = { + val (code, _) = + opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0) + code + } + + /** + * creates a copy with the location preference information + */ + def withLocationPreference(preference: Option[LocationPreference]): PhysicalOp = { + this.copy(locationPreference = preference) + } + + /** + * Creates a copy of the PhysicalOp with the specified input ports. Each input port is associated + * with an empty list of links and a None schema, reflecting the absence of predefined connections + * and schema information. + * + * @param inputs A list of InputPort instances to set as the new input ports. + * @return A new instance of PhysicalOp with the input ports updated. + */ + def withInputPorts(inputs: List[InputPort]): PhysicalOp = { + this.copy(inputPorts = + inputs + .map(input => + input.id -> (input, List + .empty[PhysicalLink], Left(new SchemaNotAvailableException("schema is not available"))) + ) + .toMap + ) + } + + /** + * Creates a copy of the PhysicalOp with the specified output ports. Each output port is + * initialized with an empty list of links and a None schema, indicating + * the absence of outbound connections and schema details at this stage. + * + * @param outputs A list of OutputPort instances to set as the new output ports. + * @return A new instance of PhysicalOp with the output ports updated. + */ + def withOutputPorts(outputs: List[OutputPort]): PhysicalOp = { + this.copy(outputPorts = + outputs + .map(output => + output.id -> (output, List + .empty[PhysicalLink], Left(new SchemaNotAvailableException("schema is not available"))) + ) + .toMap + ) + } + + /** + * creates a copy with suggested worker number. This is only to be used by Python UDF operators. 
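 + * For example (hypothetical usage): a Python UDF operator that must run single-threaded could call
 + * withSuggestedWorkerNum(1) to hint that only one worker should be allocated.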
+ */ + def withSuggestedWorkerNum(workerNum: Int): PhysicalOp = { + this.copy(suggestedWorkerNum = Some(workerNum)) + } + + /** + * creates a copy with the partition requirements + */ + def withPartitionRequirement(partitionRequirements: List[Option[PartitionInfo]]): PhysicalOp = { + this.copy(partitionRequirement = partitionRequirements) + } + + /** + * creates a copy with the partition info derive function + */ + def withDerivePartition(derivePartition: List[PartitionInfo] => PartitionInfo): PhysicalOp = { + this.copy(derivePartition = derivePartition) + } + + /** + * creates a copy with the parallelizable specified + */ + def withParallelizable(parallelizable: Boolean): PhysicalOp = + this.copy(parallelizable = parallelizable) + + /** + * creates a copy with the specified property that whether this operator is one-to-many + */ + def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp = + this.copy(isOneToManyOp = isOneToManyOp) + + /** + * Creates a copy of the PhysicalOp with the schema of a specified input port updated. + * The schema can either be a successful schema definition or an error represented as a Throwable. + * + * @param portId The identity of the port to update. + * @param schema The new schema, or error, to be associated with the port, encapsulated within an Either. + * A Right value represents a successful schema, while a Left value represents an error (Throwable). + * @return A new instance of PhysicalOp with the updated input port schema or error information. + */ + private def withInputSchema( + portId: PortIdentity, + schema: Either[Throwable, Schema] + ): PhysicalOp = { + this.copy(inputPorts = inputPorts.updatedWith(portId) { + case Some((port, links, _)) => Some((port, links, schema)) + case None => None + }) + } + + /** + * Creates a copy of the PhysicalOp with the schema of a specified output port updated. + * Similar to `withInputSchema`, the schema can either represent a successful schema definition + * or an error, encapsulated as an Either type. + * + * @param portId The identity of the port to update. + * @param schema The new schema, or error, to be associated with the port, encapsulated within an Either. + * A Right value indicates a successful schema, while a Left value indicates an error (Throwable). + * @return A new instance of PhysicalOp with the updated output port schema or error information. + */ + private def withOutputSchema( + portId: PortIdentity, + schema: Either[Throwable, Schema] + ): PhysicalOp = { + this.copy(outputPorts = outputPorts.updatedWith(portId) { + case Some((port, links, _)) => Some((port, links, schema)) + case None => None + }) + } + + /** + * creates a copy with the schema propagation function. 
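 + * A minimal sketch (assuming a single input and a single output port with the default PortIdentity()):
 + * an operator that simply passes its input schema through could call
 + * {{{
 + * withPropagateSchema(SchemaPropagationFunc(inputs => Map(PortIdentity() -> inputs(PortIdentity()))))
 + * }}}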
+ */ + def withPropagateSchema(func: SchemaPropagationFunc): PhysicalOp = { + this.copy(propagateSchema = func) + } + + /** + * creates a copy with an additional input link specified on an input port + */ + def addInputLink(link: PhysicalLink): PhysicalOp = { + assert(link.toOpId == id) + assert(inputPorts.contains(link.toPortId)) + val (port, existingLinks, schema) = inputPorts(link.toPortId) + val newLinks = existingLinks :+ link + this.copy( + inputPorts = inputPorts + (link.toPortId -> (port, newLinks, schema)) + ) + } + + /** + * creates a copy with an additional output link specified on an output port + */ + def addOutputLink(link: PhysicalLink): PhysicalOp = { + assert(link.fromOpId == id) + assert(outputPorts.contains(link.fromPortId)) + val (port, existingLinks, schema) = outputPorts(link.fromPortId) + val newLinks = existingLinks :+ link + this.copy( + outputPorts = outputPorts + (link.fromPortId -> (port, newLinks, schema)) + ) + } + + /** + * creates a copy with a removed input link + */ + def removeInputLink(linkToRemove: PhysicalLink): PhysicalOp = { + val portId = linkToRemove.toPortId + val (port, existingLinks, schema) = inputPorts(portId) + this.copy( + inputPorts = + inputPorts + (portId -> (port, existingLinks.filter(link => link != linkToRemove), schema)) + ) + } + + /** + * creates a copy with a removed output link + */ + def removeOutputLink(linkToRemove: PhysicalLink): PhysicalOp = { + val portId = linkToRemove.fromPortId + val (port, existingLinks, schema) = outputPorts(portId) + this.copy( + outputPorts = + outputPorts + (portId -> (port, existingLinks.filter(link => link != linkToRemove), schema)) + ) + } + + /** + * creates a copy with an input schema updated, and if all input schemas are available, propagate + * the schema change to output schemas. + * @param newInputSchema optionally provide a schema for an input port. + */ + def propagateSchema(newInputSchema: Option[(PortIdentity, Schema)] = None): PhysicalOp = { + // Update the input schema if a new one is provided + val updatedOp = newInputSchema.foldLeft(this) { + case (op, (portId, schema)) => op.withInputSchema(portId, Right(schema)) + } + + // Extract input schemas, checking if all are defined + val inputSchemas = updatedOp.inputPorts.collect { + case (portId, (_, _, Right(schema))) => portId -> schema + } + + if (updatedOp.inputPorts.size == inputSchemas.size) { + // All input schemas are available, propagate to output schema + val schemaPropagationResult = Try(propagateSchema.func(inputSchemas)) + schemaPropagationResult match { + case Success(schemaMapping) => + schemaMapping.foldLeft(updatedOp) { + case (op, (portId, schema)) => + op.withOutputSchema(portId, Right(schema)) + } + case Failure(exception) => + // apply the exception to all output ports in case of failure + updatedOp.outputPorts.keys.foldLeft(updatedOp) { (op, portId) => + op.withOutputSchema(portId, Left(exception)) + } + } + } else { + // Not all input schemas are defined, return the updated operation without changes + updatedOp + } + } + + /** + * returns all output links. Optionally, if a specific portId is provided, returns the links connected to that portId. + */ + def getOutputLinks(portId: PortIdentity): List[PhysicalLink] = { + outputPorts.values + .flatMap(_._2) + .filter(link => link.fromPortId == portId) + .toList + } + + /** + * returns all input links. Optionally, if a specific portId is provided, returns the links connected to that portId. 
+ */ + def getInputLinks(portIdOpt: Option[PortIdentity] = None): List[PhysicalLink] = { + inputPorts.values + .flatMap(_._2) + .toList + .filter(link => + portIdOpt match { + case Some(portId) => link.toPortId == portId + case None => true + } + ) + } + + /** + * Tells whether the input port the link connects to is depended by another input . + */ + def isInputLinkDependee(link: PhysicalLink): Boolean = { + dependeeInputs.contains(link.toPortId) + } + + /** + * Tells whether the output on this link is blocking i.e. the operator doesn't output anything till this link + * outputs all its tuples. + */ + def isOutputLinkBlocking(link: PhysicalLink): Boolean = { + this.outputPorts(link.fromPortId)._1.blocking + } + + /** + * Some operators process their inputs in a particular order. Eg: 2 phase hash join first + * processes the build input, then the probe input. + */ + def getInputLinksInProcessingOrder: List[PhysicalLink] = { + val dependencyDag = { + new DirectedAcyclicGraph[PhysicalLink, DefaultEdge](classOf[DefaultEdge]) + } + inputPorts.values + .map(_._1) + .flatMap(port => port.dependencies.map(dependee => port.id -> dependee)) + .foreach({ + case (depender: PortIdentity, dependee: PortIdentity) => + val upstreamLink = getInputLinks(Some(dependee)).head + val downstreamLink = getInputLinks(Some(depender)).head + if (!dependencyDag.containsVertex(upstreamLink)) { + dependencyDag.addVertex(upstreamLink) + } + if (!dependencyDag.containsVertex(downstreamLink)) { + dependencyDag.addVertex(downstreamLink) + } + dependencyDag.addEdge(upstreamLink, downstreamLink) + }) + val topologicalIterator = + new TopologicalOrderIterator[PhysicalLink, DefaultEdge](dependencyDag) + val processingOrder = new ArrayBuffer[PhysicalLink]() + while (topologicalIterator.hasNext) { + processingOrder.append(topologicalIterator.next()) + } + processingOrder.toList + } +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala new file mode 100644 index 00000000000..81ec10324a2 --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalPlan.scala @@ -0,0 +1,281 @@ +package edu.uci.ics.amber.core.workflow + +import com.fasterxml.jackson.annotation.JsonIgnore +import com.typesafe.scalalogging.LazyLogging +import edu.uci.ics.amber.util.VirtualIdentityUtils +import edu.uci.ics.amber.virtualidentity.{ + ActorVirtualIdentity, + OperatorIdentity, + PhysicalOpIdentity +} +import edu.uci.ics.amber.workflow.PhysicalLink +import org.jgrapht.alg.connectivity.BiconnectivityInspector +import org.jgrapht.alg.shortestpath.AllDirectedPaths +import org.jgrapht.graph.DirectedAcyclicGraph +import org.jgrapht.traverse.TopologicalOrderIterator +import org.jgrapht.util.SupplierUtil + +import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala} + +case class PhysicalPlan( + operators: Set[PhysicalOp], + links: Set[PhysicalLink] +) extends LazyLogging { + + @transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] = + operators.map(o => (o.id, o)).toMap + + // the dag will be re-computed again once it reaches the coordinator. 
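 + // note: the DAG is kept @transient and lazy so it is not serialized with the plan; it is rebuilt
 + // from `operators` and `links` on first access after deserialization.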
+ @transient lazy val dag: DirectedAcyclicGraph[PhysicalOpIdentity, PhysicalLink] = { + val jgraphtDag = new DirectedAcyclicGraph[PhysicalOpIdentity, PhysicalLink]( + null, // vertexSupplier + SupplierUtil.createSupplier(classOf[PhysicalLink]), // edgeSupplier + false, // weighted + true // allowMultipleEdges + ) + operatorMap.foreach(op => jgraphtDag.addVertex(op._1)) + links.foreach(l => jgraphtDag.addEdge(l.fromOpId, l.toOpId, l)) + jgraphtDag + } + + @transient lazy val maxChains: Set[Set[PhysicalLink]] = this.getMaxChains + + def getSourceOperatorIds: Set[PhysicalOpIdentity] = + operatorMap.keys.filter(op => dag.inDegreeOf(op) == 0).toSet + + def getPhysicalOpsOfLogicalOp(logicalOpId: OperatorIdentity): List[PhysicalOp] = { + topologicalIterator() + .filter(physicalOpId => physicalOpId.logicalOpId == logicalOpId) + .map(physicalOpId => getOperator(physicalOpId)) + .toList + } + + def getOperator(physicalOpId: PhysicalOpIdentity): PhysicalOp = operatorMap(physicalOpId) + + /** + * returns a sub-plan that contains the specified operators and the links connected within these operators + */ + def getSubPlan(subOperators: Set[PhysicalOpIdentity]): PhysicalPlan = { + val newOps = operators.filter(op => subOperators.contains(op.id)) + val newLinks = + links.filter(link => + subOperators.contains(link.fromOpId) && subOperators.contains(link.toOpId) + ) + PhysicalPlan(newOps, newLinks) + } + + def getUpstreamPhysicalOpIds(physicalOpId: PhysicalOpIdentity): Set[PhysicalOpIdentity] = { + dag.incomingEdgesOf(physicalOpId).asScala.map(e => dag.getEdgeSource(e)).toSet + } + + def getUpstreamPhysicalLinks(physicalOpId: PhysicalOpIdentity): Set[PhysicalLink] = { + links.filter(l => l.toOpId == physicalOpId) + } + + def getDownstreamPhysicalLinks(physicalOpId: PhysicalOpIdentity): Set[PhysicalLink] = { + links.filter(l => l.fromOpId == physicalOpId) + } + + def topologicalIterator(): Iterator[PhysicalOpIdentity] = { + new TopologicalOrderIterator(dag).asScala + } + def addOperator(physicalOp: PhysicalOp): PhysicalPlan = { + this.copy(operators = Set(physicalOp) ++ operators) + } + + def addLink(link: PhysicalLink): PhysicalPlan = { + val fromOp = operatorMap(link.fromOpId) + val (_, _, outputSchema) = fromOp.outputPorts(link.fromPortId) + val newFromOp = fromOp.addOutputLink(link) + val newToOp = getOperator(link.toOpId) + .addInputLink(link) + .propagateSchema(outputSchema.toOption.map(schema => (link.toPortId, schema))) + + val newOperators = operatorMap + + (link.fromOpId -> newFromOp) + + (link.toOpId -> newToOp) + this.copy(newOperators.values.toSet, links ++ Set(link)) + } + + def removeLink( + link: PhysicalLink + ): PhysicalPlan = { + val fromOpId = link.fromOpId + val toOpId = link.toOpId + val newOperators = operatorMap + + (fromOpId -> getOperator(fromOpId).removeOutputLink(link)) + + (toOpId -> getOperator(toOpId).removeInputLink(link)) + this.copy(operators = newOperators.values.toSet, links.filter(l => l != link)) + } + + def setOperator(physicalOp: PhysicalOp): PhysicalPlan = { + this.copy(operators = (operatorMap + (physicalOp.id -> physicalOp)).values.toSet) + } + + def getPhysicalOpByWorkerId(workerId: ActorVirtualIdentity): PhysicalOp = + getOperator(VirtualIdentityUtils.getPhysicalOpId(workerId)) + + def getLinksBetween( + from: PhysicalOpIdentity, + to: PhysicalOpIdentity + ): Set[PhysicalLink] = { + links.filter(link => link.fromOpId == from && link.toOpId == to) + + } + + def getOutputPartitionInfo( + link: PhysicalLink, + upstreamPartitionInfo: PartitionInfo, +
opToWorkerNumberMapping: Map[PhysicalOpIdentity, Int] + ): PartitionInfo = { + val fromPhysicalOp = getOperator(link.fromOpId) + val toPhysicalOp = getOperator(link.toOpId) + + // make sure this input is connected to this port + assert( + toPhysicalOp + .getInputLinks(Some(link.toPortId)) + .map(link => link.fromOpId) + .contains(fromPhysicalOp.id) + ) + + // partition requirement of this PhysicalOp on this input port + val requiredPartitionInfo = + toPhysicalOp.partitionRequirement + .lift(link.toPortId.id) + .flatten + .getOrElse(UnknownPartition()) + + // the upstream partition info satisfies the requirement, and number of worker match + if ( + upstreamPartitionInfo.satisfies(requiredPartitionInfo) && opToWorkerNumberMapping.getOrElse( + fromPhysicalOp.id, + 0 + ) == opToWorkerNumberMapping.getOrElse(toPhysicalOp.id, 0) + ) { + upstreamPartitionInfo + } else { + // we must re-distribute the input partitions + requiredPartitionInfo + + } + } + + private def isMaterializedLink(link: PhysicalLink): Boolean = { + getOperator(link.toOpId).isSinkOperator + } + + def getNonMaterializedBlockingAndDependeeLinks: Set[PhysicalLink] = { + operators + .flatMap { physicalOp => + { + getUpstreamPhysicalOpIds(physicalOp.id) + .flatMap { upstreamPhysicalOpId => + links + .filter(link => + link.fromOpId == upstreamPhysicalOpId && link.toOpId == physicalOp.id + ) + .filter(link => + !isMaterializedLink(link) && (getOperator(physicalOp.id).isInputLinkDependee( + link + ) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link)) + ) + } + } + } + } + + def getDependeeLinks: Set[PhysicalLink] = { + operators + .flatMap { physicalOp => + { + getUpstreamPhysicalOpIds(physicalOp.id) + .flatMap { upstreamPhysicalOpId => + links + .filter(link => + link.fromOpId == upstreamPhysicalOpId && link.toOpId == physicalOp.id + ) + .filter(link => getOperator(physicalOp.id).isInputLinkDependee(link)) + } + } + } + } + + /** + * create a DAG similar to the physical DAG but with all dependee links removed. + */ + @JsonIgnore // this is needed to prevent the serialization issue + def getDependeeLinksRemovedDAG: PhysicalPlan = { + this.copy(operators, links.diff(getDependeeLinks)) + } + + /** + * A link is a bridge if removal of that link would increase the number of (weakly) connected components in the DAG. + * Assuming pipelining a link is more desirable than materializing it, and optimal physical plan always pipelines + * a bridge. We can thus use bridges to optimize the process of searching for an optimal physical plan. + * + * @return All non-blocking links that are not bridges. + */ + def getNonBridgeNonBlockingLinks: Set[PhysicalLink] = { + val bridges = + new BiconnectivityInspector[PhysicalOpIdentity, PhysicalLink](this.dag).getBridges.asScala + .map { edge => + { + val fromOpId = this.dag.getEdgeSource(edge) + val toOpId = this.dag.getEdgeTarget(edge) + links.find(l => l.fromOpId == fromOpId && l.toOpId == toOpId) + } + } + .flatMap(_.toList) + this.links.diff(getNonMaterializedBlockingAndDependeeLinks).diff(bridges.toSet) + } + + /** + * A chain in a physical plan is a path such that each of its operators (except the first and the last operators) + * is connected only to operators on the path. Assuming pipelining a link is more desirable than materializations, + * and optimal physical plan has at most one link on each chain. We can thus use chains to optimize the process of + * searching for an optimal physical plan. A maximal chain is a chain that is not a sub-path of any other chain. 
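 + * For example (illustrative): in a path A -> B -> C where B has no links other than these two,
 + * the two links form a chain; if no longer path contains them, it is a maximal chain.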
+ * A maximal chain can cover the optimizations of all its sub-chains, so finding only maximal chains is adequate for + * optimization purposes. Note the definition of a chain has nothing to do with that of a connected component. + * + * @return All the maximal chains of this physical plan, where each chain is represented as a set of links. + */ + private def getMaxChains: Set[Set[PhysicalLink]] = { + val dijkstra = new AllDirectedPaths[PhysicalOpIdentity, PhysicalLink](this.dag) + val chains = this.dag + .vertexSet() + .asScala + .flatMap { ancestor => + { + this.dag.getDescendants(ancestor).asScala.flatMap { descendant => + { + dijkstra + .getAllPaths(ancestor, descendant, true, Integer.MAX_VALUE) + .asScala + .filter(path => + path.getLength > 1 && + path.getVertexList.asScala + .filter(v => v != path.getStartVertex && v != path.getEndVertex) + .forall(v => this.dag.inDegreeOf(v) == 1 && this.dag.outDegreeOf(v) == 1) + ) + .map(path => + path.getEdgeList.asScala + .map { edge => + { + val fromOpId = this.dag.getEdgeSource(edge) + val toOpId = this.dag.getEdgeTarget(edge) + links.find(l => l.fromOpId == fromOpId && l.toOpId == toOpId) + } + } + .flatMap(_.toList) + .toSet + ) + .toSet + } + } + } + } + chains.filter(s1 => chains.forall(s2 => s1 == s2 || !s1.subsetOf(s2))).toSet + } + +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala new file mode 100644 index 00000000000..abb776b1cad --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowContext.scala @@ -0,0 +1,21 @@ +package edu.uci.ics.amber.core.workflow + +import edu.uci.ics.amber.core.workflow.WorkflowContext.{ + DEFAULT_EXECUTION_ID, + DEFAULT_WORKFLOW_ID, + DEFAULT_WORKFLOW_SETTINGS +} +import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity} + +object WorkflowContext { + val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L) + val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L) + val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings( + 400 // TODO: make this configurable + ) +} +class WorkflowContext( + var workflowId: WorkflowIdentity = DEFAULT_WORKFLOW_ID, + var executionId: ExecutionIdentity = DEFAULT_EXECUTION_ID, + var workflowSettings: WorkflowSettings = DEFAULT_WORKFLOW_SETTINGS +) diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala new file mode 100644 index 00000000000..d8d5d67ee9f --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/WorkflowSettings.scala @@ -0,0 +1,3 @@ +package edu.uci.ics.amber.core.workflow + +case class WorkflowSettings(dataTransferBatchSize: Int) diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/JSONUtils.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/JSONUtils.scala new file mode 100644 index 00000000000..a30f35c562e --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/JSONUtils.scala @@ -0,0 +1,82 @@ +package edu.uci.ics.amber.util + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import 
com.fasterxml.jackson.module.noctordeser.NoCtorDeserModule +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import java.text.SimpleDateFormat +import scala.jdk.CollectionConverters.IteratorHasAsScala + +object JSONUtils { + + /** + * A singleton object for configuring the Jackson `ObjectMapper` to handle JSON serialization and deserialization + * in Scala. This custom `ObjectMapper` is tailored for Scala, ensuring compatibility with Scala types + * and specific serialization/deserialization settings. + * + * - Registers the `DefaultScalaModule` to ensure proper handling of Scala-specific types (e.g., `Option`, `Seq`). + * - Registers the `NoCtorDeserModule` to handle deserialization of Scala classes that lack a default constructor, + * which is common in case classes. + * - Sets the serialization inclusion rules to exclude `null` and `absent` values: + * - `Include.NON_NULL`: Excludes fields with `null` values from the serialized JSON. + * - `Include.NON_ABSENT`: Excludes fields with `Option.empty` (or equivalent absent values) from serialization. + * - Configures the date format for JSON serialization and deserialization: + * - The format is set to `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'`, which follows the ISO-8601 standard for representing date and time, + * commonly used in JSON APIs, including millisecond precision and the UTC 'Z' suffix. + * + * This `ObjectMapper` provides a consistent way to serialize and deserialize JSON while adhering to Scala conventions + * and handling common patterns like `Option` and case classes. + */ + final val objectMapper = new ObjectMapper() + .registerModule(DefaultScalaModule) + .registerModule(new NoCtorDeserModule()) + .setSerializationInclusion(Include.NON_NULL) + .setSerializationInclusion(Include.NON_ABSENT) + .setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) + + /** + * this method helps convert JSON into a key-value Map. By default it will only + * take the first level attributes of the JSON object, and ignore nested objects + * and arrays. For example: + * input JSON {"A" : "a", "B": 1, "C": 2.3, "D" :{"some":"object"}, "E": ["1", "2"]} + * will be converted to Map[String, String]{"A" : "a", "B": "1", "C": "2.3"}. + * + * If flatten mode is enabled, then the nested objects and arrays will be converted + * to map recursively. The key will be the `parentName[index].childName`. For example: + * input JSON {"A" : "a", "B": 1, "C": 2.3, "D" :{"some":"object"}, "E": ["X", "Y"]} + * will be converted to Map[String, String]{"A" : "a", "B": "1", "C": "2.3", + * "D.some":"object", "E1":"X", "E2":"Y"}. + * + * @param node the JSONNode to convert. + * @param flatten a boolean to toggle flatten mode. + * @param parentName the parent's name to pass into children's naming conversion. + * @return a Map[String, String] of all the key value pairs from the given JSONNode. + */ + def JSONToMap( + node: JsonNode, + flatten: Boolean = false, + parentName: String = "" + ): Map[String, String] = { + var result = Map[String, String]() + if (node.isObject) { + for (key <- node.fieldNames().asScala) { + val child: JsonNode = node.get(key) + val absoluteKey = (if (parentName.nonEmpty) parentName + "." 
else "") + key + if (flatten && (child.isObject || child.isArray)) { + result = result ++ JSONToMap(child, flatten, absoluteKey) + } else if (child.isValueNode) { + result = result + (absoluteKey -> child.asText()) + } else { + // do nothing + } + } + } else if (node.isArray) { + for ((child, i) <- node.elements().asScala.zipWithIndex) { + result = result ++ JSONToMap(child, flatten, parentName + (i + 1)) + } + } + result + } + +} diff --git a/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/VirtualIdentityUtils.scala b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/VirtualIdentityUtils.scala new file mode 100644 index 00000000000..f38bb9577bb --- /dev/null +++ b/core/micro-services/workflow-core/src/main/scala/edu/uci/ics/amber/util/VirtualIdentityUtils.scala @@ -0,0 +1,74 @@ +package edu.uci.ics.amber.util + +import edu.uci.ics.amber.virtualidentity.{ + ActorVirtualIdentity, + OperatorIdentity, + PhysicalOpIdentity, + WorkflowIdentity +} + +import scala.util.matching.Regex + +object VirtualIdentityUtils { + + private val workerNamePattern: Regex = raw"Worker:WF(\d+)-(.+)-(\w+)-(\d+)".r + private val operatorUUIDPattern: Regex = raw"(\w+)-(.+)-(\w+)".r + def createWorkerIdentity( + workflowId: WorkflowIdentity, + operator: String, + layerName: String, + workerId: Int + ): ActorVirtualIdentity = { + + ActorVirtualIdentity( + s"Worker:WF${workflowId.id}-$operator-$layerName-$workerId" + ) + } + + def createWorkerIdentity( + workflowId: WorkflowIdentity, + physicalOpId: PhysicalOpIdentity, + workerId: Int + ): ActorVirtualIdentity = { + createWorkerIdentity( + workflowId, + physicalOpId.logicalOpId.id, + physicalOpId.layerName, + workerId + ) + } + + def getPhysicalOpId(workerId: ActorVirtualIdentity): PhysicalOpIdentity = { + workerId.name match { + case workerNamePattern(_, operator, layerName, _) => + PhysicalOpIdentity(OperatorIdentity(operator), layerName) + case other => + // for special actorId such as SELF, CONTROLLER + PhysicalOpIdentity(OperatorIdentity("__DummyOperator"), "__DummyLayer") + } + } + + def getWorkerIndex(workerId: ActorVirtualIdentity): Int = { + workerId.name match { + case workerNamePattern(_, _, _, idx) => + idx.toInt + } + } + + def toShorterString(workerId: ActorVirtualIdentity): String = { + workerId.name match { + case workerNamePattern(workflowId, operatorName, layerName, workerIndex) => + val shorterName = if (operatorName.length > 6) { + operatorName match { + case operatorUUIDPattern(op, _, postfix) => op + "-" + postfix.takeRight(6) + case _ => operatorName.takeRight(6) + } + } else { + operatorName + } + + s"WF$workflowId-$shorterName-$layerName-$workerIndex" + case _ => workerId.name + } + } +} diff --git a/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtilsSpec.scala b/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtilsSpec.scala new file mode 100644 index 00000000000..62607eb4ccb --- /dev/null +++ b/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/AttributeTypeUtilsSpec.scala @@ -0,0 +1,86 @@ +package edu.uci.ics.amber.core.tuple + +import edu.uci.ics.amber.core.tuple.AttributeTypeUtils.{inferField, inferSchemaFromRows} +import edu.uci.ics.amber.core.tuple.AttributeType._ +import org.scalatest.funsuite.AnyFunSuite + +class AttributeTypeUtilsSpec extends AnyFunSuite { + // Unit Test for Infer Schema + + test("type should get inferred correctly individually") { + + assert(inferField(" 1 
\n\n") == INTEGER) + assert(inferField(" 1.1\t") == DOUBLE) + assert(inferField("1,111.1 ") == STRING) + assert(inferField("k2068-10-29T18:43:15.000Z") == STRING) + assert(inferField(" 12321321312321312312321321 ") == DOUBLE) + assert(inferField(" 123,123,123,123,123,123,123.11") == STRING) + assert(inferField(" 00\t") == INTEGER) + assert(inferField("\t-.2 ") == DOUBLE) + assert(inferField("\n False ") == BOOLEAN) + assert(inferField("07/10/96 4:5 PM, PDT") == TIMESTAMP) + assert(inferField("02/2/2020") == TIMESTAMP) + assert(inferField("\n\n02/2/23 ") == TIMESTAMP) + assert(inferField(" 2023年8月7日 ") == TIMESTAMP) + assert( + inferField("2020-12-31T23:25:59.999Z") == TIMESTAMP + ) // ISO format with milliseconds and UTC + assert(inferField("2020-12-31T11:59:59+01:00") == TIMESTAMP) // ISO format with timezone offset + assert( + inferField("2020-12-31T11:59:59") == TIMESTAMP + ) // ISO format without milliseconds and timezone + assert( + inferField("31/12/2020 23:59:59") == TIMESTAMP + ) // European datetime format with slash separators + assert( + inferField("12/31/2020 11:59:59") == TIMESTAMP + ) // US datetime format with slash separators + assert(inferField("2020-12-31") == TIMESTAMP) // Common date format + assert(inferField("31-Dec-2020") == TIMESTAMP) // Date format with three-letter month + assert( + inferField("Wednesday, 31-Dec-20 23:59:59 GMT") == TIMESTAMP + ) // Verbose format with day and timezone + assert( + inferField("1 Jan 2020 05:30:00 GMT") == TIMESTAMP + ) // Another verbose format with timezone + assert(inferField("15-Aug-2020 20:20:20") == TIMESTAMP) // Day-Month-Year format with time + assert(inferField("2020年12月31日 23:59") == TIMESTAMP) // East Asian date format with time + assert(inferField("2020/12/31 23:59") == TIMESTAMP) // Alternate slash format with time + + } + + test("types should get inferred correctly with one row") { + val row: Array[Any] = + Array("string", "1", "2020-01-02T00:05:56.000Z", "1.3", "213214124124124", "true") + val rows: Iterator[Array[Any]] = Iterator(row) + val attributeTypes = inferSchemaFromRows(rows) + assert(attributeTypes(0) == STRING) + assert(attributeTypes(1) == INTEGER) + assert(attributeTypes(2) == TIMESTAMP) + assert(attributeTypes(3) == DOUBLE) + assert(attributeTypes(4) == LONG) + assert(attributeTypes(5) == BOOLEAN) + + } + + test("types should get inferred correctly with multiple rows") { + + val rows: Iterator[Array[Any]] = Iterator( + Array("string", "1 ", "2020-01-02T00:05:56.000Z", "1.3 ", "9223372036854775807", "true"), + Array("1932-09-06", "0 ", "1932-09-06T03:47:19Z", "9223.23", "-1", "false "), + Array("", "-1", "1979-08-12T10:18:49Z", "-.11", "-9223372036854775808 ", "0"), + Array("123,456,789", " -0", " 2023-6-7 8:9:38", " -9.32", "0", "1"), + Array("92233720368547758072", "2147483647", "2023-06-27T08:09:38Z", ".1", "1", " TRUE"), + Array("\n", "-2147483648", "2068-10-29T18:43:15.000Z ", " 100.00 ", "03685477", "FALSE") + ) + val attributeTypes = inferSchemaFromRows(rows) + assert(attributeTypes(0) == STRING) + assert(attributeTypes(1) == INTEGER) + assert(attributeTypes(2) == TIMESTAMP) + assert(attributeTypes(3) == DOUBLE) + assert(attributeTypes(4) == LONG) + assert(attributeTypes(5) == BOOLEAN) + + } + +} diff --git a/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/TupleSpec.scala b/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/TupleSpec.scala new file mode 100644 index 00000000000..f5941c22c85 --- /dev/null +++ 
b/core/micro-services/workflow-core/src/test/scala/edu/uci/ics/amber/core/tuple/TupleSpec.scala @@ -0,0 +1,178 @@ +package edu.uci.ics.amber.core.tuple + +import edu.uci.ics.amber.core.tuple.TupleUtils.{json2tuple, tuple2json} +import org.scalatest.flatspec.AnyFlatSpec + +import java.sql.Timestamp + +class TupleSpec extends AnyFlatSpec { + val stringAttribute = new Attribute("col-string", AttributeType.STRING) + val integerAttribute = new Attribute("col-int", AttributeType.INTEGER) + val boolAttribute = new Attribute("col-bool", AttributeType.BOOLEAN) + val longAttribute = new Attribute("col-long", AttributeType.LONG) + val doubleAttribute = new Attribute("col-double", AttributeType.DOUBLE) + val timestampAttribute = new Attribute("col-timestamp", AttributeType.TIMESTAMP) + val binaryAttribute = new Attribute("col-binary", AttributeType.BINARY) + + val capitalizedStringAttribute = new Attribute("COL-string", AttributeType.STRING) + + it should "create a tuple with capitalized attributeName" in { + + val schema = Schema.builder().add(capitalizedStringAttribute).build() + val tuple = Tuple.builder(schema).add(capitalizedStringAttribute, "string-value").build() + assert(tuple.getField("COL-string").asInstanceOf[String] == "string-value") + + } + + it should "create a tuple with capitalized attributeName, using addSequentially" in { + val schema = Schema.builder().add(capitalizedStringAttribute).build() + val tuple = Tuple.builder(schema).addSequentially(Array("string-value")).build() + assert(tuple.getField("COL-string").asInstanceOf[String] == "string-value") + } + + it should "create a tuple using new builder, based on another tuple using old builder" in { + val schema = Schema.builder().add(stringAttribute).build() + val inputTuple = Tuple.builder(schema).addSequentially(Array("string-value")).build() + val newTuple = Tuple.builder(inputTuple.getSchema).add(inputTuple).build() + + assert(newTuple.length == inputTuple.length) + } + + it should "fail when unknown attribute is added to tuple" in { + val schema = Schema.builder().add(stringAttribute).build() + assertThrows[TupleBuildingException] { + Tuple.builder(schema).add(integerAttribute, 1) + } + } + + it should "fail when tuple does not conform to complete schema" in { + val schema = Schema.builder().add(stringAttribute).add(integerAttribute).build() + assertThrows[TupleBuildingException] { + Tuple.builder(schema).add(integerAttribute, 1).build() + } + } + + it should "fail when entire tuple passed in has extra attributes" in { + val inputSchema = + Schema.builder().add(stringAttribute).add(integerAttribute).add(boolAttribute).build() + val inputTuple = Tuple + .builder(inputSchema) + .add(integerAttribute, 1) + .add(stringAttribute, "string-attr") + .add(boolAttribute, true) + .build() + + val outputSchema = Schema.builder().add(stringAttribute).add(integerAttribute).build() + assertThrows[TupleBuildingException] { + Tuple.builder(outputSchema).add(inputTuple).build() + } + } + + it should "not fail when entire tuple passed in has extra attributes and strictSchemaMatch is false" in { + val inputSchema = + Schema.builder().add(stringAttribute).add(integerAttribute).add(boolAttribute).build() + val inputTuple = Tuple + .builder(inputSchema) + .add(integerAttribute, 1) + .add(stringAttribute, "string-attr") + .add(boolAttribute, true) + .build() + + val outputSchema = Schema.builder().add(stringAttribute).add(integerAttribute).build() + val outputTuple = Tuple.builder(outputSchema).add(inputTuple, false).build() + + // This is the 
important test: the input tuple has 3 attributes but the output tuple has only 2, + // because isStrictSchemaMatch is set to false. + assert(outputTuple.length == 2) + } + + it should "produce identical strings" in { + val inputSchema = + Schema.builder().add(stringAttribute).add(integerAttribute).add(boolAttribute).build() + val inputTuple = Tuple + .builder(inputSchema) + .add(integerAttribute, 1) + .add(stringAttribute, "string-attr") + .add(boolAttribute, true) + .build() + + val line = tuple2json(inputTuple) + val newTuple = json2tuple(line) + assert(line == tuple2json(newTuple)) + + } + + it should "calculate hash" in { + val inputSchema = + Schema + .builder() + .add(integerAttribute) + .add(stringAttribute) + .add(boolAttribute) + .add(longAttribute) + .add(doubleAttribute) + .add(timestampAttribute) + .add(binaryAttribute) + .build() + + val inputTuple = Tuple + .builder(inputSchema) + .add(integerAttribute, 922323) + .add(stringAttribute, "string-attr") + .add(boolAttribute, true) + .add(longAttribute, 1123213213213L) + .add(doubleAttribute, 214214.9969346) + .add(timestampAttribute, new Timestamp(100000000L)) + .add(binaryAttribute, Array[Byte](104, 101, 108, 108, 111)) + .build() + assert(inputTuple.hashCode() == -1335416166) + + val inputTuple2 = Tuple + .builder(inputSchema) + .add(integerAttribute, 0) + .add(stringAttribute, "") + .add(boolAttribute, false) + .add(longAttribute, 0L) + .add(doubleAttribute, 0.0) + .add(timestampAttribute, new Timestamp(0L)) + .add(binaryAttribute, Array[Byte]()) + .build() + assert(inputTuple2.hashCode() == -1409761483) + + val inputTuple3 = Tuple + .builder(inputSchema) + .add(integerAttribute, null) + .add(stringAttribute, null) + .add(boolAttribute, null) + .add(longAttribute, null) + .add(doubleAttribute, null) + .add(timestampAttribute, null) + .add(binaryAttribute, null) + .build() + assert(inputTuple3.hashCode() == 1742810335) + + val inputTuple4 = Tuple + .builder(inputSchema) + .add(integerAttribute, -3245763) + .add(stringAttribute, "\n\r\napple") + .add(boolAttribute, true) + .add(longAttribute, -8965536434247L) + .add(doubleAttribute, 1 / 3.0d) + .add(timestampAttribute, new Timestamp(-1990)) + .add(binaryAttribute, null) + .build() + assert(inputTuple4.hashCode() == -592643630) + + val inputTuple5 = Tuple + .builder(inputSchema) + .add(integerAttribute, Int.MaxValue) + .add(stringAttribute, new String()) + .add(boolAttribute, true) + .add(longAttribute, Long.MaxValue) + .add(doubleAttribute, 7 / 17.0d) + .add(timestampAttribute, new Timestamp(1234567890L)) + .add(binaryAttribute, Array.fill[Byte](4097)('o')) + .build() + assert(inputTuple5.hashCode() == -2099556631) + } +}
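
Usage sketch (illustrative, not part of the diff above): a minimal example of the VirtualIdentityUtils helpers introduced in this patch, assuming the ScalaPB-generated WorkflowIdentity and PhysicalOpIdentity classes from this PR; the workflow id, operator name, and layer name below are hypothetical placeholder values.

import edu.uci.ics.amber.util.VirtualIdentityUtils
import edu.uci.ics.amber.virtualidentity.WorkflowIdentity

object VirtualIdentityUsageSketch extends App {
  // Build the canonical worker name "Worker:WF1-Scan-main-0" for worker 0 of layer "main" of operator "Scan".
  val workerId = VirtualIdentityUtils.createWorkerIdentity(WorkflowIdentity(1), "Scan", "main", 0)

  // Recover the physical operator id (operator "Scan", layer "main") and the worker index (0) from the name.
  val physicalOpId = VirtualIdentityUtils.getPhysicalOpId(workerId)
  val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId)

  // Produce the abbreviated display form, "WF1-Scan-main-0" for this worker.
  println(VirtualIdentityUtils.toShorterString(workerId))
}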