From 3fde920e6e4e1006b99a687804f802d44f9e31aa Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Sat, 17 Oct 2020 17:05:09 -0500 Subject: [PATCH 1/5] add date_add, interval sql still running into issues --- src/enclave/Enclave/ExpressionEvaluation.h | 46 + src/flatbuffers/Expr.fbs | 8 +- .../edu/berkeley/cs/rise/opaque/Utils.scala | 9 + .../cs/rise/opaque/OpaqueOperatorTests.scala | 1299 +++++++++-------- 4 files changed, 729 insertions(+), 633 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index ac41c85f2d..2b40e23de6 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -888,6 +888,52 @@ class FlatbuffersExpressionEvaluator { false); } + // Time expressions + case tuix::ExprUnion_DateAdd: + { + // TODO: handle Contains(str, "") + + auto c = static_cast(expr->expr()); + auto left_offset = eval_helper(row, c->left()); + auto right_offset = eval_helper(row, c->right()); + + // Note: These temporary pointers will be invalidated when we next write to builder + const tuix::Field *left = flatbuffers::GetTemporaryPointer(builder, left_offset); + const tuix::Field *right = flatbuffers::GetTemporaryPointer(builder, right_offset); + + if (left->value_type() != tuix::FieldUnion_DateField + || right->value_type() != tuix::FieldUnion_IntegerField) { + throw std::runtime_error( + std::string("tuix::DateAdd requires date Date, increment Integer, not ") + + std::string("date ") + + std::string(tuix::EnumNameFieldUnion(left->value_type())) + + std::string(", increment ") + + std::string(tuix::EnumNameFieldUnion(right->value_type()))); + } + + bool result_is_null = left->is_null() || right->is_null(); + + if (!result_is_null) { + auto left_field = static_cast(left->value()); + auto right_field = static_cast(right->value()); + + uint32_t result = left_field->value() + right_field->value(); + + return tuix::CreateField( + builder, + tuix::FieldUnion_DateField, + tuix::CreateDateField(builder, result).Union(), + result_is_null); + } else { + uint32_t result = 0; + return tuix::CreateField( + builder, + tuix::FieldUnion_DateField, + tuix::CreateDateField(builder, result).Union(), + result_is_null); + } + } + case tuix::ExprUnion_Year: { auto e = static_cast(expr->expr()); diff --git a/src/flatbuffers/Expr.fbs b/src/flatbuffers/Expr.fbs index be66e9e5a9..b78baa3602 100644 --- a/src/flatbuffers/Expr.fbs +++ b/src/flatbuffers/Expr.fbs @@ -33,7 +33,8 @@ union ExprUnion { Exp, ClosestPoint, CreateArray, - Upper + Upper, + DateAdd } table Expr { @@ -147,6 +148,11 @@ table Year { child:Expr; } +table DateAdd { + left:Expr; + right:Expr; +} + // Math expressions table Exp { child:Expr; diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index ac64ca0e18..c740914585 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.expressions.Contains +import org.apache.spark.sql.catalyst.expressions.DateAdd import org.apache.spark.sql.catalyst.expressions.Descending import org.apache.spark.sql.catalyst.expressions.Divide import org.apache.spark.sql.catalyst.expressions.EqualTo @@ -976,6 +977,7 @@ object Utils extends Logging { 
tuix.Contains.createContains( builder, leftOffset, rightOffset)) + // Time expressions case (Year(child), Seq(childOffset)) => tuix.Expr.createExpr( builder, @@ -983,6 +985,13 @@ object Utils extends Logging { tuix.Year.createYear( builder, childOffset)) + case (DateAdd(left, right), Seq(leftOffset, rightOffset)) => + tuix.Expr.createExpr( + builder, + tuix.ExprUnion.DateAdd, + tuix.DateAdd.createDateAdd( + builder, leftOffset, rightOffset)) + // Math expressions case (Exp(child), Seq(childOffset)) => tuix.Expr.createExpr( diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index b1d4fc5d52..e455af19b4 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -122,633 +122,668 @@ trait OpaqueOperatorTests extends FunSuite with BeforeAndAfterAll { self => } } - testAgainstSpark("create DataFrame from sequence") { securityLevel => - val data = for (i <- 0 until 5) yield ("foo", i) - makeDF(data, securityLevel, "word", "count").collect - } - - testAgainstSpark("create DataFrame with BinaryType + ByteType") { securityLevel => - val data: Seq[(Array[Byte], Byte)] = - Seq((Array[Byte](0.toByte, -128.toByte, 127.toByte), 42.toByte)) - makeDF(data, securityLevel, "BinaryType", "ByteType").collect - } - - testAgainstSpark("create DataFrame with CalendarIntervalType + NullType") { securityLevel => - val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) - val schema = StructType(Seq( - StructField("CalendarIntervalType", CalendarIntervalType), - StructField("NullType", NullType))) - - securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)).collect - } - - testAgainstSpark("create DataFrame with ShortType + TimestampType") { securityLevel => - val data: Seq[(Short, Timestamp)] = Seq((13.toShort, Timestamp.valueOf("2017-12-02 03:04:00"))) - makeDF(data, securityLevel, "ShortType", "TimestampType").collect - } - - testAgainstSpark("create DataFrame with ArrayType") { securityLevel => - val array: Array[Int] = Array(0, -128, 127, 1) - val data = Seq( - (array, "dog"), - (array, "cat"), - (array, "ant")) - val df = makeDF(data, securityLevel, "array", "string") - df.collect - } - - testAgainstSpark("create DataFrame with MapType") { securityLevel => - val map: Map[String, Int] = Map("x" -> 24, "y" -> 25, "z" -> 26) - val data = Seq( - (map, "dog"), - (map, "cat"), - (map, "ant")) - val df = makeDF(data, securityLevel, "map", "string") - df.collect - } - - testAgainstSpark("create DataFrame with nulls for all types") { securityLevel => - val schema = StructType(Seq( - StructField("boolean", BooleanType), - StructField("integer", IntegerType), - StructField("long", LongType), - StructField("float", FloatType), - StructField("double", DoubleType), - StructField("date", DateType), - StructField("binary", BinaryType), - StructField("byte", ByteType), - StructField("calendar_interval", CalendarIntervalType), - StructField("null", NullType), - StructField("short", ShortType), - StructField("timestamp", TimestampType), - StructField("array_of_int", DataTypes.createArrayType(IntegerType)), - StructField("map_int_to_int", DataTypes.createMapType(IntegerType, IntegerType)), - StructField("string", StringType))) - - securityLevel.applyTo( - spark.createDataFrame( - 
spark.sparkContext.makeRDD(Seq(Row.fromSeq(Seq.fill(schema.length) { null })), numPartitions), - schema)).collect - } - - testAgainstSpark("filter") { securityLevel => - val df = makeDF( - (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), - securityLevel, - "a", "b", "c", "d", "x") - df.filter($"x" > lit(10)).collect - } - - testAgainstSpark("filter with NULLs") { securityLevel => - val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { - if (x % 3 == 0) - Tuple1(null.asInstanceOf[Integer]) - else - Tuple1(x.asInstanceOf[Integer]) - }).toSeq) - val df = makeDF(data, securityLevel, "x") - df.filter($"x" > lit(10)).collect.toSet - } - - testAgainstSpark("select") { securityLevel => - val data = for (i <- 0 until 256) yield ("%03d".format(i) * 3, i.toFloat) - val df = makeDF(data, securityLevel, "str", "x") - df.select($"str").collect - } - - testAgainstSpark("select with expressions") { securityLevel => - val df = makeDF( - (1 to 20).map(x => (true, "hello world!", 1.0, 2.0f, x)), - securityLevel, - "a", "b", "c", "d", "x") - df.select( - $"x" + $"x" * $"x" - $"x", - substring($"b", 5, 20), - $"x" > $"x", - $"x" >= $"x", - $"x" <= $"x").collect.toSet - } - - testAgainstSpark("union") { securityLevel => - val df1 = makeDF( - (1 to 20).map(x => (x, x.toString)).reverse, - securityLevel, - "a", "b") - val df2 = makeDF( - (1 to 20).map(x => (x, (x + 1).toString)), - securityLevel, - "a", "b") - df1.union(df2).collect.toSet - } - - testOpaqueOnly("cache") { securityLevel => - def numCached(ds: Dataset[_]): Int = - ds.queryExecution.executedPlan.collect { - case cached: EncryptedBlockRDDScanExec - if cached.rdd.getStorageLevel != StorageLevel.NONE => - cached - }.size - - val data = List((1, 3), (1, 4), (1, 5), (2, 4)) - val df = makeDF(data, securityLevel, "a", "b").cache() - - val agg = df.groupBy($"a").agg(sum("b")) - - assert(numCached(agg) === 1) - - val expected = data.groupBy(_._1).mapValues(_.map(_._2).sum) - assert(agg.collect.toSet === expected.map(Row.fromTuple).toSet) - df.unpersist() - } - - testAgainstSpark("sort") { securityLevel => - val data = Random.shuffle((0 until 256).map(x => (x.toString, x)).toSeq) - val df = makeDF(data, securityLevel, "str", "x") - df.sort($"x").collect - } - - testAgainstSpark("sort zero elements") { securityLevel => - val data = Seq.empty[(String, Int)] - val df = makeDF(data, securityLevel, "str", "x") - df.sort($"x").collect - } - - testAgainstSpark("sort by float") { securityLevel => - val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) - val df = makeDF(data, securityLevel, "str", "x") - df.sort($"x").collect - } - - testAgainstSpark("sort by string") { securityLevel => - val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) - val df = makeDF(data, securityLevel, "str", "x") - df.sort($"str").collect - } - - testAgainstSpark("sort by 2 columns") { securityLevel => - val data = Random.shuffle((0 until 256).map(x => (x / 16, x)).toSeq) - val df = makeDF(data, securityLevel, "x", "y") - df.sort($"x", $"y").collect - } - - testAgainstSpark("sort with null values") { securityLevel => - val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { - if (x % 3 == 0) - Tuple1(null.asInstanceOf[Integer]) - else - Tuple1(x.asInstanceOf[Integer]) - }).toSeq) - val df = makeDF(data, securityLevel, "x") - df.sort($"x").collect - } - - testAgainstSpark("join") { securityLevel => - val p_data = for (i <- 1 to 16) yield (i, i.toString, i * 10) - val f_data = for (i <- 1 to 256 - 
16) yield (i, (i % 16).toString, i * 10) - val p = makeDF(p_data, securityLevel, "id", "pk", "x") - val f = makeDF(f_data, securityLevel, "id", "fk", "x") - p.join(f, $"pk" === $"fk").collect.toSet - } - - testAgainstSpark("join on column 1") { securityLevel => - val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) - val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) - val p = makeDF(p_data, securityLevel, "pk", "x") - val f = makeDF(f_data, securityLevel, "fk", "x", "y") - p.join(f, $"pk" === $"fk").collect.toSet - } - - testAgainstSpark("non-foreign-key join") { securityLevel => - val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10) - val f_data = for (i <- 1 to 256 - 128) yield (i, (i % 16).toString, i * 10) - val p = makeDF(p_data, securityLevel, "id", "join_col_1", "x") - val f = makeDF(f_data, securityLevel, "id", "join_col_2", "x") - p.join(f, $"join_col_1" === $"join_col_2").collect.toSet - } - - def abc(i: Int): String = (i % 3) match { - case 0 => "A" - case 1 => "B" - case 2 => "C" - } - - testAgainstSpark("aggregate average") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), i.toDouble) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(avg("price").as("avgPrice")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate count") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(count("category").as("itemsInCategory")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate first") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(first("category").as("firstInCategory")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate last") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(last("category").as("lastInCategory")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate max") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(max("price").as("maxPrice")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate min") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "category", "price") - - words.groupBy("category").agg(min("price").as("minPrice")) - .collect.sortBy { case Row(category: String, _) => category } - } - - testAgainstSpark("aggregate sum") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "word", "count") - - words.groupBy("word").agg(sum("count").as("totalCount")) - .collect.sortBy { case Row(word: String, _) => word } - } - - testAgainstSpark("aggregate on multiple columns") { securityLevel => - val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f) - val words = makeDF(data, securityLevel, "str", "x", "y") - - 
words.groupBy("str").agg(sum("y").as("totalY"), avg("x").as("avgX")) - .collect.sortBy { case Row(str: String, _, _) => str } - } - - testAgainstSpark("global aggregate") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val words = makeDF(data, securityLevel, "id", "word", "count") - words.agg(sum("count").as("totalCount")).collect - } - - testAgainstSpark("contains") { securityLevel => - val data = for (i <- 0 until 256) yield(i.toString, abc(i)) - val df = makeDF(data, securityLevel, "word", "abc") - df.filter($"word".contains(lit("1"))).collect - } - - testAgainstSpark("between") { securityLevel => - val data = for (i <- 0 until 256) yield(i.toString, i) - val df = makeDF(data, securityLevel, "word", "count") - df.filter($"count".between(50, 150)).collect - } - - testAgainstSpark("year") { securityLevel => - val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) - val df = makeDF(data, securityLevel, "id", "date") - df.select(year($"date")).collect - } - - testAgainstSpark("case when - 1 branch with else (string)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", "hi").otherwise("bye")).collect - } - - testAgainstSpark("case when - 1 branch with else (int)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", 10).otherwise(30)).collect - } - - testAgainstSpark("case when - 1 branch without else (string)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", "hi")).collect - } - - testAgainstSpark("case when - 1 branch without else (int)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", 10)).collect - } - - testAgainstSpark("case when - 2 branch with else (string)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello").otherwise("bye")).collect - } - - testAgainstSpark("case when - 2 branch with else (int)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", 10).when(df("word") === "baz", 20).otherwise(30)).collect - } - - testAgainstSpark("case when - 2 branch without else (string)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello")).collect - } - - testAgainstSpark("case when - 2 branch without else (int)") { securityLevel => - val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - val df = makeDF(data, securityLevel, "word", "count") - df.select(when(df("word") === "foo", 3).when(df("word") === "baz", 2)).collect - } - - testOpaqueOnly("save and load with explicit schema") { securityLevel => 
- val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val df = makeDF(data, securityLevel, "id", "word", "count") - val path = Utils.createTempDir() - path.delete() - df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - try { - val df2 = spark.read - .format("edu.berkeley.cs.rise.opaque.EncryptedSource") - .schema(df.schema) - .load(path.toString) - assert(df.collect.toSet === df2.collect.toSet) - assert(df.groupBy("word").agg(sum("count")).collect.toSet - === df2.groupBy("word").agg(sum("count")).collect.toSet) - } finally { - Utils.deleteRecursively(path) - } - } - - testOpaqueOnly("save and load without schema") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val df = makeDF(data, securityLevel, "id", "word", "count") - val path = Utils.createTempDir() - path.delete() - df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - try { - val df2 = spark.read - .format("edu.berkeley.cs.rise.opaque.EncryptedSource") - .load(path.toString) - assert(df.collect.toSet === df2.collect.toSet) - assert(df.groupBy("word").agg(sum("count")).collect.toSet - === df2.groupBy("word").agg(sum("count")).collect.toSet) - } finally { - Utils.deleteRecursively(path) - } - } - - testOpaqueOnly("load from SQL with explicit schema") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val df = makeDF(data, securityLevel, "id", "word", "count") - val path = Utils.createTempDir() - path.delete() - df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - - try { - spark.sql(s""" - |CREATE TEMPORARY VIEW df2 - |(${df.schema.toDDL}) - |USING edu.berkeley.cs.rise.opaque.EncryptedSource - |OPTIONS ( - | path "${path}" - |)""".stripMargin) - val df2 = spark.sql(s""" - |SELECT * FROM df2 - |""".stripMargin) - - assert(df.collect.toSet === df2.collect.toSet) - } finally { - spark.catalog.dropTempView("df2") - Utils.deleteRecursively(path) - } - } - - testOpaqueOnly("load from SQL without schema") { securityLevel => - val data = for (i <- 0 until 256) yield (i, abc(i), 1) - val df = makeDF(data, securityLevel, "id", "word", "count") - val path = Utils.createTempDir() - path.delete() - df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + testAgainstSpark("Interval SQL Sanity") { securityLevel => + // val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + // val df = makeDF(data, securityLevel, "index", "time") + val df = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))).toDF("index", "time") + df.createTempView("IntervalSanity") try { - spark.sql(s""" - |CREATE TEMPORARY VIEW df2 - |USING edu.berkeley.cs.rise.opaque.EncryptedSource - |OPTIONS ( - | path "${path}" - |)""".stripMargin) - val df2 = spark.sql(s""" - |SELECT * FROM df2 - |""".stripMargin) - - assert(df.collect.toSet === df2.collect.toSet) + spark.sql("SELECT time + INTERVAL 7 DAY FROM IntervalSanity").collect } finally { - spark.catalog.dropTempView("df2") - Utils.deleteRecursively(path) + spark.catalog.dropTempView("IntervalSanity") } } - testAgainstSpark("SQL API") { securityLevel => - val df = makeDF( - (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), - securityLevel, - "a", "b", "c", "d", "x") - df.createTempView("df") + testAgainstSpark("Interval SQL") { securityLevel => + val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + val df = makeDF(data, securityLevel, "index", "time") + df.createTempView("Interval") try { 
- spark.sql("SELECT * FROM df WHERE x > 10").collect + spark.sql("SELECT time + INTERVAL 7 DAY FROM Interval").collect } finally { - spark.catalog.dropTempView("df") + spark.catalog.dropTempView("Interval") } } - testOpaqueOnly("cast error") { securityLevel => - val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) - val schema = StructType(Seq( - StructField("CalendarIntervalType", CalendarIntervalType), - StructField("NullType", NullType))) - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - // Trigger an Opaque exception by attempting an unsupported cast: CalendarIntervalType to - // StringType - val e = intercept[SparkException] { - withLoggingOff { - df.select($"CalendarIntervalType".cast(StringType)).collect - } - } - assert(e.getCause.isInstanceOf[OpaqueException]) - } - - testAgainstSpark("exp") { securityLevel => - val data: Seq[(Double, Double)] = Seq( - (2.0, 3.0)) - val schema = StructType(Seq( - StructField("x", DoubleType), - StructField("y", DoubleType))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - df.select(exp($"y")).collect - } - - testAgainstSpark("vector multiply") { securityLevel => - val data: Seq[(Array[Double], Double)] = Seq( - (Array[Double](1.0, 1.0, 1.0), 3.0)) - val schema = StructType(Seq( - StructField("v", DataTypes.createArrayType(DoubleType)), - StructField("c", DoubleType))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - df.select(vectormultiply($"v", $"c")).collect - } - - testAgainstSpark("dot product") { securityLevel => - val data: Seq[(Array[Double], Array[Double])] = Seq( - (Array[Double](1.0, 1.0, 1.0), Array[Double](1.0, 1.0, 1.0))) - val schema = StructType(Seq( - StructField("v1", DataTypes.createArrayType(DoubleType)), - StructField("v2", DataTypes.createArrayType(DoubleType)))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - df.select(dot($"v1", $"v2")).collect - } - - testAgainstSpark("upper") { securityLevel => - val data = Seq(("lower", "upper"), ("lower2", "upper2")) - val schema = StructType(Seq( - StructField("v1", StringType), - StructField("v2", StringType))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - df.select(upper($"v1")).collect - } - - testAgainstSpark("upper with null") { securityLevel => - val data = Seq(("lower", null.asInstanceOf[String])) - - val df = makeDF(data, securityLevel, "v1", "v2") - - df.select(upper($"v2")).collect - } - - testAgainstSpark("vector sum") { securityLevel => - val data: Seq[(Array[Double], Double)] = Seq( - (Array[Double](1.0, 2.0, 3.0), 4.0), - (Array[Double](5.0, 7.0, 7.0), 8.0)) - val schema = StructType(Seq( - StructField("v", DataTypes.createArrayType(DoubleType)), - StructField("c", DoubleType))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - val vectorsum = new VectorSum - df.groupBy().agg(vectorsum($"v")).collect - } - - testAgainstSpark("create array") { securityLevel => - val data: Seq[(Double, Double)] = Seq( - (1.0, 2.0), - (3.0, 4.0)) - val schema = StructType(Seq( - 
StructField("x1", DoubleType), - StructField("x2", DoubleType))) - - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - - df.select(array($"x1", $"x2").as("x")).collect - } - - testAgainstSpark("limit with fewer returned values") { securityLevel => - val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) - val schema = StructType(Seq( - StructField("id", IntegerType), - StructField("word", StringType))) - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - df.sort($"id").limit(5).collect - } - - testAgainstSpark("limit with more returned values") { securityLevel => - val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) - val schema = StructType(Seq( - StructField("id", IntegerType), - StructField("word", StringType))) - val df = securityLevel.applyTo( - spark.createDataFrame( - spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - schema)) - df.sort($"id").limit(200).collect - } - - testAgainstSpark("least squares") { securityLevel => - LeastSquares.query(spark, securityLevel, "tiny", numPartitions).collect - } - - testAgainstSpark("logistic regression") { securityLevel => - LogisticRegression.train(spark, securityLevel, 1000, numPartitions) - } - - testAgainstSpark("k-means") { securityLevel => - import scala.math.Ordering.Implicits.seqDerivedOrdering - KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted + testAgainstSpark("Date Add Sanity") { securityLevel => + val df = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))).toDF("index", "time") + df.select(date_add($"time", 3)).collect } - testAgainstSpark("pagerank") { securityLevel => - PageRank.run(spark, securityLevel, "256", numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 9") { securityLevel => - TPCH.tpch9(spark.sqlContext, securityLevel, "sf_small", numPartitions).collect.toSet - } - - testAgainstSpark("big data 1") { securityLevel => - BigDataBenchmark.q1(spark, securityLevel, "tiny", numPartitions).collect - } - - testAgainstSpark("big data 2") { securityLevel => - BigDataBenchmark.q2(spark, securityLevel, "tiny", numPartitions).collect - .map { case Row(a: String, b: Double) => (a, b.toFloat) } - .sortBy(_._1) - } - - testAgainstSpark("big data 3") { securityLevel => - BigDataBenchmark.q3(spark, securityLevel, "tiny", numPartitions).collect - } + testAgainstSpark("Date Add") { securityLevel => + val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + val df = makeDF(data, securityLevel, "index", "time") + df.select(date_add($"time", 3)).collect + } + + // testAgainstSpark("create DataFrame from sequence") { securityLevel => + // val data = for (i <- 0 until 5) yield ("foo", i) + // makeDF(data, securityLevel, "word", "count").collect + // } + + // testAgainstSpark("create DataFrame with BinaryType + ByteType") { securityLevel => + // val data: Seq[(Array[Byte], Byte)] = + // Seq((Array[Byte](0.toByte, -128.toByte, 127.toByte), 42.toByte)) + // makeDF(data, securityLevel, "BinaryType", "ByteType").collect + // } + + // testAgainstSpark("create DataFrame with CalendarIntervalType + NullType") { securityLevel => + // val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) + // val schema = StructType(Seq( + // StructField("CalendarIntervalType", CalendarIntervalType), + // StructField("NullType", 
NullType))) + + // securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)).collect + // } + + // testAgainstSpark("create DataFrame with ShortType + TimestampType") { securityLevel => + // val data: Seq[(Short, Timestamp)] = Seq((13.toShort, Timestamp.valueOf("2017-12-02 03:04:00"))) + // makeDF(data, securityLevel, "ShortType", "TimestampType").collect + // } + + // testAgainstSpark("create DataFrame with ArrayType") { securityLevel => + // val array: Array[Int] = Array(0, -128, 127, 1) + // val data = Seq( + // (array, "dog"), + // (array, "cat"), + // (array, "ant")) + // val df = makeDF(data, securityLevel, "array", "string") + // df.collect + // } + + // testAgainstSpark("create DataFrame with MapType") { securityLevel => + // val map: Map[String, Int] = Map("x" -> 24, "y" -> 25, "z" -> 26) + // val data = Seq( + // (map, "dog"), + // (map, "cat"), + // (map, "ant")) + // val df = makeDF(data, securityLevel, "map", "string") + // df.collect + // } + + // testAgainstSpark("create DataFrame with nulls for all types") { securityLevel => + // val schema = StructType(Seq( + // StructField("boolean", BooleanType), + // StructField("integer", IntegerType), + // StructField("long", LongType), + // StructField("float", FloatType), + // StructField("double", DoubleType), + // StructField("date", DateType), + // StructField("binary", BinaryType), + // StructField("byte", ByteType), + // StructField("calendar_interval", CalendarIntervalType), + // StructField("null", NullType), + // StructField("short", ShortType), + // StructField("timestamp", TimestampType), + // StructField("array_of_int", DataTypes.createArrayType(IntegerType)), + // StructField("map_int_to_int", DataTypes.createMapType(IntegerType, IntegerType)), + // StructField("string", StringType))) + + // securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(Seq(Row.fromSeq(Seq.fill(schema.length) { null })), numPartitions), + // schema)).collect + // } + + // testAgainstSpark("filter") { securityLevel => + // val df = makeDF( + // (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), + // securityLevel, + // "a", "b", "c", "d", "x") + // df.filter($"x" > lit(10)).collect + // } + + // testAgainstSpark("filter with NULLs") { securityLevel => + // val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { + // if (x % 3 == 0) + // Tuple1(null.asInstanceOf[Integer]) + // else + // Tuple1(x.asInstanceOf[Integer]) + // }).toSeq) + // val df = makeDF(data, securityLevel, "x") + // df.filter($"x" > lit(10)).collect.toSet + // } + + // testAgainstSpark("select") { securityLevel => + // val data = for (i <- 0 until 256) yield ("%03d".format(i) * 3, i.toFloat) + // val df = makeDF(data, securityLevel, "str", "x") + // df.select($"str").collect + // } + + // testAgainstSpark("select with expressions") { securityLevel => + // val df = makeDF( + // (1 to 20).map(x => (true, "hello world!", 1.0, 2.0f, x)), + // securityLevel, + // "a", "b", "c", "d", "x") + // df.select( + // $"x" + $"x" * $"x" - $"x", + // substring($"b", 5, 20), + // $"x" > $"x", + // $"x" >= $"x", + // $"x" <= $"x").collect.toSet + // } + + // testAgainstSpark("union") { securityLevel => + // val df1 = makeDF( + // (1 to 20).map(x => (x, x.toString)).reverse, + // securityLevel, + // "a", "b") + // val df2 = makeDF( + // (1 to 20).map(x => (x, (x + 1).toString)), + // securityLevel, + // "a", "b") + // df1.union(df2).collect.toSet + // } + + // 
testOpaqueOnly("cache") { securityLevel => + // def numCached(ds: Dataset[_]): Int = + // ds.queryExecution.executedPlan.collect { + // case cached: EncryptedBlockRDDScanExec + // if cached.rdd.getStorageLevel != StorageLevel.NONE => + // cached + // }.size + + // val data = List((1, 3), (1, 4), (1, 5), (2, 4)) + // val df = makeDF(data, securityLevel, "a", "b").cache() + + // val agg = df.groupBy($"a").agg(sum("b")) + + // assert(numCached(agg) === 1) + + // val expected = data.groupBy(_._1).mapValues(_.map(_._2).sum) + // assert(agg.collect.toSet === expected.map(Row.fromTuple).toSet) + // df.unpersist() + // } + + // testAgainstSpark("sort") { securityLevel => + // val data = Random.shuffle((0 until 256).map(x => (x.toString, x)).toSeq) + // val df = makeDF(data, securityLevel, "str", "x") + // df.sort($"x").collect + // } + + // testAgainstSpark("sort zero elements") { securityLevel => + // val data = Seq.empty[(String, Int)] + // val df = makeDF(data, securityLevel, "str", "x") + // df.sort($"x").collect + // } + + // testAgainstSpark("sort by float") { securityLevel => + // val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) + // val df = makeDF(data, securityLevel, "str", "x") + // df.sort($"x").collect + // } + + // testAgainstSpark("sort by string") { securityLevel => + // val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) + // val df = makeDF(data, securityLevel, "str", "x") + // df.sort($"str").collect + // } + + // testAgainstSpark("sort by 2 columns") { securityLevel => + // val data = Random.shuffle((0 until 256).map(x => (x / 16, x)).toSeq) + // val df = makeDF(data, securityLevel, "x", "y") + // df.sort($"x", $"y").collect + // } + + // testAgainstSpark("sort with null values") { securityLevel => + // val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { + // if (x % 3 == 0) + // Tuple1(null.asInstanceOf[Integer]) + // else + // Tuple1(x.asInstanceOf[Integer]) + // }).toSeq) + // val df = makeDF(data, securityLevel, "x") + // df.sort($"x").collect + // } + + // testAgainstSpark("join") { securityLevel => + // val p_data = for (i <- 1 to 16) yield (i, i.toString, i * 10) + // val f_data = for (i <- 1 to 256 - 16) yield (i, (i % 16).toString, i * 10) + // val p = makeDF(p_data, securityLevel, "id", "pk", "x") + // val f = makeDF(f_data, securityLevel, "id", "fk", "x") + // p.join(f, $"pk" === $"fk").collect.toSet + // } + + // testAgainstSpark("join on column 1") { securityLevel => + // val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) + // val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) + // val p = makeDF(p_data, securityLevel, "pk", "x") + // val f = makeDF(f_data, securityLevel, "fk", "x", "y") + // p.join(f, $"pk" === $"fk").collect.toSet + // } + + // testAgainstSpark("non-foreign-key join") { securityLevel => + // val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10) + // val f_data = for (i <- 1 to 256 - 128) yield (i, (i % 16).toString, i * 10) + // val p = makeDF(p_data, securityLevel, "id", "join_col_1", "x") + // val f = makeDF(f_data, securityLevel, "id", "join_col_2", "x") + // p.join(f, $"join_col_1" === $"join_col_2").collect.toSet + // } + + // def abc(i: Int): String = (i % 3) match { + // case 0 => "A" + // case 1 => "B" + // case 2 => "C" + // } + + // testAgainstSpark("aggregate average") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), i.toDouble) + // val words = makeDF(data, 
securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(avg("price").as("avgPrice")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate count") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(count("category").as("itemsInCategory")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate first") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(first("category").as("firstInCategory")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate last") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(last("category").as("lastInCategory")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate max") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(max("price").as("maxPrice")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate min") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "category", "price") + + // words.groupBy("category").agg(min("price").as("minPrice")) + // .collect.sortBy { case Row(category: String, _) => category } + // } + + // testAgainstSpark("aggregate sum") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "word", "count") + + // words.groupBy("word").agg(sum("count").as("totalCount")) + // .collect.sortBy { case Row(word: String, _) => word } + // } + + // testAgainstSpark("aggregate on multiple columns") { securityLevel => + // val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f) + // val words = makeDF(data, securityLevel, "str", "x", "y") + + // words.groupBy("str").agg(sum("y").as("totalY"), avg("x").as("avgX")) + // .collect.sortBy { case Row(str: String, _, _) => str } + // } + + // testAgainstSpark("global aggregate") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val words = makeDF(data, securityLevel, "id", "word", "count") + // words.agg(sum("count").as("totalCount")).collect + // } + + // testAgainstSpark("contains") { securityLevel => + // val data = for (i <- 0 until 256) yield(i.toString, abc(i)) + // val df = makeDF(data, securityLevel, "word", "abc") + // df.filter($"word".contains(lit("1"))).collect + // } + + // testAgainstSpark("between") { securityLevel => + // val data = for (i <- 0 until 256) yield(i.toString, i) + // val df = makeDF(data, securityLevel, "word", "count") + // df.filter($"count".between(50, 150)).collect + // } + + // testAgainstSpark("year") { securityLevel => + // val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + // val df = makeDF(data, securityLevel, "id", "date") + // df.select(year($"date")).collect + // } + + // testAgainstSpark("case when - 1 branch with else 
(string)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", "hi").otherwise("bye")).collect + // } + + // testAgainstSpark("case when - 1 branch with else (int)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", 10).otherwise(30)).collect + // } + + // testAgainstSpark("case when - 1 branch without else (string)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", "hi")).collect + // } + + // testAgainstSpark("case when - 1 branch without else (int)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", 10)).collect + // } + + // testAgainstSpark("case when - 2 branch with else (string)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello").otherwise("bye")).collect + // } + + // testAgainstSpark("case when - 2 branch with else (int)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", 10).when(df("word") === "baz", 20).otherwise(30)).collect + // } + + // testAgainstSpark("case when - 2 branch without else (string)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello")).collect + // } + + // testAgainstSpark("case when - 2 branch without else (int)") { securityLevel => + // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + // val df = makeDF(data, securityLevel, "word", "count") + // df.select(when(df("word") === "foo", 3).when(df("word") === "baz", 2)).collect + // } + + // testOpaqueOnly("save and load with explicit schema") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val df = makeDF(data, securityLevel, "id", "word", "count") + // val path = Utils.createTempDir() + // path.delete() + // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + // try { + // val df2 = spark.read + // .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + // .schema(df.schema) + // .load(path.toString) + // assert(df.collect.toSet === df2.collect.toSet) + // assert(df.groupBy("word").agg(sum("count")).collect.toSet + // === df2.groupBy("word").agg(sum("count")).collect.toSet) + // } finally { + // Utils.deleteRecursively(path) + // } + // } + + // testOpaqueOnly("save and load without schema") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val df = makeDF(data, securityLevel, "id", "word", "count") + // val path = Utils.createTempDir() + // path.delete() + // 
df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + // try { + // val df2 = spark.read + // .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + // .load(path.toString) + // assert(df.collect.toSet === df2.collect.toSet) + // assert(df.groupBy("word").agg(sum("count")).collect.toSet + // === df2.groupBy("word").agg(sum("count")).collect.toSet) + // } finally { + // Utils.deleteRecursively(path) + // } + // } + + // testOpaqueOnly("load from SQL with explicit schema") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val df = makeDF(data, securityLevel, "id", "word", "count") + // val path = Utils.createTempDir() + // path.delete() + // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + + // try { + // spark.sql(s""" + // |CREATE TEMPORARY VIEW df2 + // |(${df.schema.toDDL}) + // |USING edu.berkeley.cs.rise.opaque.EncryptedSource + // |OPTIONS ( + // | path "${path}" + // |)""".stripMargin) + // val df2 = spark.sql(s""" + // |SELECT * FROM df2 + // |""".stripMargin) + + // assert(df.collect.toSet === df2.collect.toSet) + // } finally { + // spark.catalog.dropTempView("df2") + // Utils.deleteRecursively(path) + // } + // } + + // testOpaqueOnly("load from SQL without schema") { securityLevel => + // val data = for (i <- 0 until 256) yield (i, abc(i), 1) + // val df = makeDF(data, securityLevel, "id", "word", "count") + // val path = Utils.createTempDir() + // path.delete() + // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + + // try { + // spark.sql(s""" + // |CREATE TEMPORARY VIEW df2 + // |USING edu.berkeley.cs.rise.opaque.EncryptedSource + // |OPTIONS ( + // | path "${path}" + // |)""".stripMargin) + // val df2 = spark.sql(s""" + // |SELECT * FROM df2 + // |""".stripMargin) + + // assert(df.collect.toSet === df2.collect.toSet) + // } finally { + // spark.catalog.dropTempView("df2") + // Utils.deleteRecursively(path) + // } + // } + + // testAgainstSpark("SQL API") { securityLevel => + // val df = makeDF( + // (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), + // securityLevel, + // "a", "b", "c", "d", "x") + // df.createTempView("df") + // try { + // spark.sql("SELECT * FROM df WHERE x > 10").collect + // } finally { + // spark.catalog.dropTempView("df") + // } + // } + + // testOpaqueOnly("cast error") { securityLevel => + // val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) + // val schema = StructType(Seq( + // StructField("CalendarIntervalType", CalendarIntervalType), + // StructField("NullType", NullType))) + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + // // Trigger an Opaque exception by attempting an unsupported cast: CalendarIntervalType to + // // StringType + // val e = intercept[SparkException] { + // withLoggingOff { + // df.select($"CalendarIntervalType".cast(StringType)).collect + // } + // } + // assert(e.getCause.isInstanceOf[OpaqueException]) + // } + + // testAgainstSpark("exp") { securityLevel => + // val data: Seq[(Double, Double)] = Seq( + // (2.0, 3.0)) + // val schema = StructType(Seq( + // StructField("x", DoubleType), + // StructField("y", DoubleType))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // df.select(exp($"y")).collect + // } + + // 
testAgainstSpark("vector multiply") { securityLevel => + // val data: Seq[(Array[Double], Double)] = Seq( + // (Array[Double](1.0, 1.0, 1.0), 3.0)) + // val schema = StructType(Seq( + // StructField("v", DataTypes.createArrayType(DoubleType)), + // StructField("c", DoubleType))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // df.select(vectormultiply($"v", $"c")).collect + // } + + // testAgainstSpark("dot product") { securityLevel => + // val data: Seq[(Array[Double], Array[Double])] = Seq( + // (Array[Double](1.0, 1.0, 1.0), Array[Double](1.0, 1.0, 1.0))) + // val schema = StructType(Seq( + // StructField("v1", DataTypes.createArrayType(DoubleType)), + // StructField("v2", DataTypes.createArrayType(DoubleType)))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // df.select(dot($"v1", $"v2")).collect + // } + + // testAgainstSpark("upper") { securityLevel => + // val data = Seq(("lower", "upper"), ("lower2", "upper2")) + // val schema = StructType(Seq( + // StructField("v1", StringType), + // StructField("v2", StringType))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // df.select(upper($"v1")).collect + // } + + // testAgainstSpark("upper with null") { securityLevel => + // val data = Seq(("lower", null.asInstanceOf[String])) + + // val df = makeDF(data, securityLevel, "v1", "v2") + + // df.select(upper($"v2")).collect + // } + + // testAgainstSpark("vector sum") { securityLevel => + // val data: Seq[(Array[Double], Double)] = Seq( + // (Array[Double](1.0, 2.0, 3.0), 4.0), + // (Array[Double](5.0, 7.0, 7.0), 8.0)) + // val schema = StructType(Seq( + // StructField("v", DataTypes.createArrayType(DoubleType)), + // StructField("c", DoubleType))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // val vectorsum = new VectorSum + // df.groupBy().agg(vectorsum($"v")).collect + // } + + // testAgainstSpark("create array") { securityLevel => + // val data: Seq[(Double, Double)] = Seq( + // (1.0, 2.0), + // (3.0, 4.0)) + // val schema = StructType(Seq( + // StructField("x1", DoubleType), + // StructField("x2", DoubleType))) + + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + + // df.select(array($"x1", $"x2").as("x")).collect + // } + + // testAgainstSpark("limit with fewer returned values") { securityLevel => + // val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) + // val schema = StructType(Seq( + // StructField("id", IntegerType), + // StructField("word", StringType))) + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + // schema)) + // df.sort($"id").limit(5).collect + // } + + // testAgainstSpark("limit with more returned values") { securityLevel => + // val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) + // val schema = StructType(Seq( + // StructField("id", IntegerType), + // StructField("word", StringType))) + // val df = securityLevel.applyTo( + // spark.createDataFrame( + // spark.sparkContext.makeRDD(data.map(Row.fromTuple), 
numPartitions), + // schema)) + // df.sort($"id").limit(200).collect + // } + + // testAgainstSpark("least squares") { securityLevel => + // LeastSquares.query(spark, securityLevel, "tiny", numPartitions).collect + // } + + // testAgainstSpark("logistic regression") { securityLevel => + // LogisticRegression.train(spark, securityLevel, 1000, numPartitions) + // } + + // testAgainstSpark("k-means") { securityLevel => + // import scala.math.Ordering.Implicits.seqDerivedOrdering + // KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted + // } + + // testAgainstSpark("pagerank") { securityLevel => + // PageRank.run(spark, securityLevel, "256", numPartitions).collect.toSet + // } + + // testAgainstSpark("TPC-H 9") { securityLevel => + // TPCH.tpch9(spark.sqlContext, securityLevel, "sf_small", numPartitions).collect.toSet + // } + + // testAgainstSpark("big data 1") { securityLevel => + // BigDataBenchmark.q1(spark, securityLevel, "tiny", numPartitions).collect + // } + + // testAgainstSpark("big data 2") { securityLevel => + // BigDataBenchmark.q2(spark, securityLevel, "tiny", numPartitions).collect + // .map { case Row(a: String, b: Double) => (a, b.toFloat) } + // .sortBy(_._1) + // } + + // testAgainstSpark("big data 3") { securityLevel => + // BigDataBenchmark.q3(spark, securityLevel, "tiny", numPartitions).collect + // } def makeDF[A <: Product : scala.reflect.ClassTag : scala.reflect.runtime.universe.TypeTag]( data: Seq[A], securityLevel: SecurityLevel, columnNames: String*): DataFrame = @@ -790,23 +825,23 @@ class OpaqueMultiplePartitionSuite extends OpaqueOperatorTests { .toDF(columnNames: _*)) } - testAgainstSpark("join with different numbers of partitions (#34)") { securityLevel => - val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) - val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) - val p = makeDF(p_data, securityLevel, "pk", "x") - val f = makePartitionedDF(f_data, securityLevel, numPartitions + 1, "fk", "x", "y") - p.join(f, $"pk" === $"fk").collect.toSet - } - - testAgainstSpark("non-foreign-key join with high skew") { securityLevel => - // This test is intended to ensure that primary groups are never split across multiple - // partitions, which would break our implementation of non-foreign-key join. - - val p_data = for (i <- 1 to 128) yield (i, 1) - val f_data = for (i <- 1 to 128) yield (i, 1) - val p = makeDF(p_data, securityLevel, "id", "join_col_1") - val f = makeDF(f_data, securityLevel, "id", "join_col_2") - p.join(f, $"join_col_1" === $"join_col_2").collect.toSet - } + // testAgainstSpark("join with different numbers of partitions (#34)") { securityLevel => + // val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) + // val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) + // val p = makeDF(p_data, securityLevel, "pk", "x") + // val f = makePartitionedDF(f_data, securityLevel, numPartitions + 1, "fk", "x", "y") + // p.join(f, $"pk" === $"fk").collect.toSet + // } + + // testAgainstSpark("non-foreign-key join with high skew") { securityLevel => + // // This test is intended to ensure that primary groups are never split across multiple + // // partitions, which would break our implementation of non-foreign-key join. 
+ + // val p_data = for (i <- 1 to 128) yield (i, 1) + // val f_data = for (i <- 1 to 128) yield (i, 1) + // val p = makeDF(p_data, securityLevel, "id", "join_col_1") + // val f = makeDF(f_data, securityLevel, "id", "join_col_2") + // p.join(f, $"join_col_1" === $"join_col_2").collect.toSet + // } } From b9b2422ca94d0a5181310ea6cca0630f5f42dda3 Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Tue, 20 Oct 2020 12:25:27 -0500 Subject: [PATCH 2/5] Add Interval SQL support --- src/enclave/Enclave/ExpressionEvaluation.h | 77 ++++++++++++++++++- src/flatbuffers/Expr.fbs | 8 +- .../edu/berkeley/cs/rise/opaque/Utils.scala | 9 +++ .../cs/rise/opaque/OpaqueOperatorTests.scala | 30 ++++---- 4 files changed, 108 insertions(+), 16 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 2b40e23de6..5c46fa477e 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -3,8 +3,11 @@ #include #include #include +// #include #include +#include + #include "Flatbuffers.h" int printf(const char *fmt, ...); @@ -265,6 +268,25 @@ class FlatbuffersExpressionEvaluator { case tuix::ExprUnion_Literal: { + auto * literal = static_cast(expr->expr()); + const tuix::Field *value = literal->value(); + + // If type is CalendarInterval, manually return a calendar interval field. + // Otherwise 'days' disappears in conversion. + if (value->value_type() == 10) { + + auto *interval = value->value_as_CalendarIntervalField(); + uint32_t months = interval->months(); + uint32_t days = interval->days(); + uint64_t ms = interval->microseconds(); + + return tuix::CreateField( + builder, + tuix::FieldUnion_CalendarIntervalField, + tuix::CreateCalendarIntervalField(builder, months, days, ms).Union(), + false); + } + return flatbuffers_copy( static_cast(expr->expr())->value(), builder); } @@ -403,6 +425,7 @@ class FlatbuffersExpressionEvaluator { auto add = static_cast(expr->expr()); auto left_offset = eval_helper(row, add->left()); auto right_offset = eval_helper(row, add->right()); + return eval_binary_arithmetic_op( builder, flatbuffers::GetTemporaryPointer(builder, left_offset), @@ -891,8 +914,6 @@ class FlatbuffersExpressionEvaluator { // Time expressions case tuix::ExprUnion_DateAdd: { - // TODO: handle Contains(str, "") - auto c = static_cast(expr->expr()); auto left_offset = eval_helper(row, c->left()); auto right_offset = eval_helper(row, c->right()); @@ -934,6 +955,58 @@ class FlatbuffersExpressionEvaluator { } } + case tuix::ExprUnion_DateAddInterval: + { + auto c = static_cast(expr->expr()); + auto left_offset = eval_helper(row, c->left()); + auto right_offset = eval_helper(row, c->right()); + + // Note: These temporary pointers will be invalidated when we next write to builder + const tuix::Field *left = flatbuffers::GetTemporaryPointer(builder, left_offset); + const tuix::Field *right = flatbuffers::GetTemporaryPointer(builder, right_offset); + + if (left->value_type() != tuix::FieldUnion_DateField + || right->value_type() != tuix::FieldUnion_CalendarIntervalField) { + throw std::runtime_error( + std::string("tuix::DateAddInterval requires date Date, interval CalendarIntervalField, not ") + + std::string("date ") + + std::string(tuix::EnumNameFieldUnion(left->value_type())) + + std::string(", interval ") + + std::string(tuix::EnumNameFieldUnion(right->value_type()))); + } + + bool result_is_null = left->is_null() || right->is_null(); + uint32_t result = 0; + + if (!result_is_null) { + + auto left_field = 
static_cast(left->value()); + auto right_field = static_cast(right->value()); + + //This is an approximation + //TODO take into account leap seconds + uint64_t date = 86400L*left_field->value(); + struct tm tm; + secs_to_tm(date, &tm); + tm.tm_mon += right_field->months(); + tm.tm_mday += right_field->days(); + time_t time = std::mktime(&tm); + uint32_t result = (time + (right_field->microseconds() / 1000)) / 86400L; + + return tuix::CreateField( + builder, + tuix::FieldUnion_DateField, + tuix::CreateDateField(builder, result).Union(), + result_is_null); + } else { + return tuix::CreateField( + builder, + tuix::FieldUnion_DateField, + tuix::CreateDateField(builder, result).Union(), + result_is_null); + } + } + case tuix::ExprUnion_Year: { auto e = static_cast(expr->expr()); diff --git a/src/flatbuffers/Expr.fbs b/src/flatbuffers/Expr.fbs index b78baa3602..7587bb3b9c 100644 --- a/src/flatbuffers/Expr.fbs +++ b/src/flatbuffers/Expr.fbs @@ -34,7 +34,8 @@ union ExprUnion { ClosestPoint, CreateArray, Upper, - DateAdd + DateAdd, + DateAddInterval } table Expr { @@ -153,6 +154,11 @@ table DateAdd { right:Expr; } +table DateAddInterval { + left:Expr; + right:Expr; +} + // Math expressions table Exp { child:Expr; diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index c740914585..9fa0c69f17 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.expressions.Contains import org.apache.spark.sql.catalyst.expressions.DateAdd +import org.apache.spark.sql.catalyst.expressions.DateAddInterval import org.apache.spark.sql.catalyst.expressions.Descending import org.apache.spark.sql.catalyst.expressions.Divide import org.apache.spark.sql.catalyst.expressions.EqualTo @@ -67,6 +68,7 @@ import org.apache.spark.sql.catalyst.expressions.Or import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.expressions.Substring import org.apache.spark.sql.catalyst.expressions.Subtract +import org.apache.spark.sql.catalyst.expressions.TimeAdd import org.apache.spark.sql.catalyst.expressions.UnaryMinus import org.apache.spark.sql.catalyst.expressions.Upper import org.apache.spark.sql.catalyst.expressions.Year @@ -992,6 +994,13 @@ object Utils extends Logging { tuix.DateAdd.createDateAdd( builder, leftOffset, rightOffset)) + case (DateAddInterval(left, right, _, _), Seq(leftOffset, rightOffset)) => + tuix.Expr.createExpr( + builder, + tuix.ExprUnion.DateAddInterval, + tuix.DateAddInterval.createDateAddInterval( + builder, leftOffset, rightOffset)) + // Math expressions case (Exp(child), Seq(childOffset)) => tuix.Expr.createExpr( diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index e455af19b4..19c0bacf26 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -122,33 +122,37 @@ trait OpaqueOperatorTests extends FunSuite with BeforeAndAfterAll { self => } } - testAgainstSpark("Interval SQL Sanity") { securityLevel => - // val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) - // val df = makeDF(data, securityLevel, "index", "time") - - val df 
= Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))).toDF("index", "time") - df.createTempView("IntervalSanity") + testAgainstSpark("Interval SQL") { securityLevel => + val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + val df = makeDF(data, securityLevel, "index", "time") + df.createTempView("Interval") try { - spark.sql("SELECT time + INTERVAL 7 DAY FROM IntervalSanity").collect + spark.sql("SELECT time + INTERVAL 7 DAY FROM Interval").collect } finally { - spark.catalog.dropTempView("IntervalSanity") + spark.catalog.dropTempView("Interval") } } - testAgainstSpark("Interval SQL") { securityLevel => + testAgainstSpark("Interval Week SQL") { securityLevel => val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) val df = makeDF(data, securityLevel, "index", "time") df.createTempView("Interval") try { - spark.sql("SELECT time + INTERVAL 7 DAY FROM Interval").collect + spark.sql("SELECT time + INTERVAL 7 WEEK FROM Interval").collect } finally { spark.catalog.dropTempView("Interval") } } - testAgainstSpark("Date Add Sanity") { securityLevel => - val df = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))).toDF("index", "time") - df.select(date_add($"time", 3)).collect + testAgainstSpark("Interval Month SQL") { securityLevel => + val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + val df = makeDF(data, securityLevel, "index", "time") + df.createTempView("Interval") + try { + spark.sql("SELECT time + INTERVAL 6 MONTH FROM Interval").collect + } finally { + spark.catalog.dropTempView("Interval") + } } testAgainstSpark("Date Add") { securityLevel => From ece0dec1530fe7111395f20e073d6be156bc3e24 Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Tue, 20 Oct 2020 12:26:42 -0500 Subject: [PATCH 3/5] uncomment out the other tests --- .../cs/rise/opaque/OpaqueOperatorTests.scala | 1290 ++++++++--------- 1 file changed, 645 insertions(+), 645 deletions(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 19c0bacf26..604a5deac0 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -161,633 +161,633 @@ trait OpaqueOperatorTests extends FunSuite with BeforeAndAfterAll { self => df.select(date_add($"time", 3)).collect } - // testAgainstSpark("create DataFrame from sequence") { securityLevel => - // val data = for (i <- 0 until 5) yield ("foo", i) - // makeDF(data, securityLevel, "word", "count").collect - // } - - // testAgainstSpark("create DataFrame with BinaryType + ByteType") { securityLevel => - // val data: Seq[(Array[Byte], Byte)] = - // Seq((Array[Byte](0.toByte, -128.toByte, 127.toByte), 42.toByte)) - // makeDF(data, securityLevel, "BinaryType", "ByteType").collect - // } - - // testAgainstSpark("create DataFrame with CalendarIntervalType + NullType") { securityLevel => - // val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) - // val schema = StructType(Seq( - // StructField("CalendarIntervalType", CalendarIntervalType), - // StructField("NullType", NullType))) - - // securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)).collect - // } - - // testAgainstSpark("create DataFrame with ShortType + TimestampType") { securityLevel => - // val data: Seq[(Short, 
Timestamp)] = Seq((13.toShort, Timestamp.valueOf("2017-12-02 03:04:00"))) - // makeDF(data, securityLevel, "ShortType", "TimestampType").collect - // } - - // testAgainstSpark("create DataFrame with ArrayType") { securityLevel => - // val array: Array[Int] = Array(0, -128, 127, 1) - // val data = Seq( - // (array, "dog"), - // (array, "cat"), - // (array, "ant")) - // val df = makeDF(data, securityLevel, "array", "string") - // df.collect - // } - - // testAgainstSpark("create DataFrame with MapType") { securityLevel => - // val map: Map[String, Int] = Map("x" -> 24, "y" -> 25, "z" -> 26) - // val data = Seq( - // (map, "dog"), - // (map, "cat"), - // (map, "ant")) - // val df = makeDF(data, securityLevel, "map", "string") - // df.collect - // } - - // testAgainstSpark("create DataFrame with nulls for all types") { securityLevel => - // val schema = StructType(Seq( - // StructField("boolean", BooleanType), - // StructField("integer", IntegerType), - // StructField("long", LongType), - // StructField("float", FloatType), - // StructField("double", DoubleType), - // StructField("date", DateType), - // StructField("binary", BinaryType), - // StructField("byte", ByteType), - // StructField("calendar_interval", CalendarIntervalType), - // StructField("null", NullType), - // StructField("short", ShortType), - // StructField("timestamp", TimestampType), - // StructField("array_of_int", DataTypes.createArrayType(IntegerType)), - // StructField("map_int_to_int", DataTypes.createMapType(IntegerType, IntegerType)), - // StructField("string", StringType))) - - // securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(Seq(Row.fromSeq(Seq.fill(schema.length) { null })), numPartitions), - // schema)).collect - // } - - // testAgainstSpark("filter") { securityLevel => - // val df = makeDF( - // (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), - // securityLevel, - // "a", "b", "c", "d", "x") - // df.filter($"x" > lit(10)).collect - // } - - // testAgainstSpark("filter with NULLs") { securityLevel => - // val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { - // if (x % 3 == 0) - // Tuple1(null.asInstanceOf[Integer]) - // else - // Tuple1(x.asInstanceOf[Integer]) - // }).toSeq) - // val df = makeDF(data, securityLevel, "x") - // df.filter($"x" > lit(10)).collect.toSet - // } - - // testAgainstSpark("select") { securityLevel => - // val data = for (i <- 0 until 256) yield ("%03d".format(i) * 3, i.toFloat) - // val df = makeDF(data, securityLevel, "str", "x") - // df.select($"str").collect - // } - - // testAgainstSpark("select with expressions") { securityLevel => - // val df = makeDF( - // (1 to 20).map(x => (true, "hello world!", 1.0, 2.0f, x)), - // securityLevel, - // "a", "b", "c", "d", "x") - // df.select( - // $"x" + $"x" * $"x" - $"x", - // substring($"b", 5, 20), - // $"x" > $"x", - // $"x" >= $"x", - // $"x" <= $"x").collect.toSet - // } - - // testAgainstSpark("union") { securityLevel => - // val df1 = makeDF( - // (1 to 20).map(x => (x, x.toString)).reverse, - // securityLevel, - // "a", "b") - // val df2 = makeDF( - // (1 to 20).map(x => (x, (x + 1).toString)), - // securityLevel, - // "a", "b") - // df1.union(df2).collect.toSet - // } - - // testOpaqueOnly("cache") { securityLevel => - // def numCached(ds: Dataset[_]): Int = - // ds.queryExecution.executedPlan.collect { - // case cached: EncryptedBlockRDDScanExec - // if cached.rdd.getStorageLevel != StorageLevel.NONE => - // cached - // }.size - - // val data = List((1, 3), (1, 4), (1, 5), (2, 
4)) - // val df = makeDF(data, securityLevel, "a", "b").cache() - - // val agg = df.groupBy($"a").agg(sum("b")) - - // assert(numCached(agg) === 1) - - // val expected = data.groupBy(_._1).mapValues(_.map(_._2).sum) - // assert(agg.collect.toSet === expected.map(Row.fromTuple).toSet) - // df.unpersist() - // } - - // testAgainstSpark("sort") { securityLevel => - // val data = Random.shuffle((0 until 256).map(x => (x.toString, x)).toSeq) - // val df = makeDF(data, securityLevel, "str", "x") - // df.sort($"x").collect - // } - - // testAgainstSpark("sort zero elements") { securityLevel => - // val data = Seq.empty[(String, Int)] - // val df = makeDF(data, securityLevel, "str", "x") - // df.sort($"x").collect - // } - - // testAgainstSpark("sort by float") { securityLevel => - // val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) - // val df = makeDF(data, securityLevel, "str", "x") - // df.sort($"x").collect - // } - - // testAgainstSpark("sort by string") { securityLevel => - // val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) - // val df = makeDF(data, securityLevel, "str", "x") - // df.sort($"str").collect - // } - - // testAgainstSpark("sort by 2 columns") { securityLevel => - // val data = Random.shuffle((0 until 256).map(x => (x / 16, x)).toSeq) - // val df = makeDF(data, securityLevel, "x", "y") - // df.sort($"x", $"y").collect - // } - - // testAgainstSpark("sort with null values") { securityLevel => - // val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { - // if (x % 3 == 0) - // Tuple1(null.asInstanceOf[Integer]) - // else - // Tuple1(x.asInstanceOf[Integer]) - // }).toSeq) - // val df = makeDF(data, securityLevel, "x") - // df.sort($"x").collect - // } - - // testAgainstSpark("join") { securityLevel => - // val p_data = for (i <- 1 to 16) yield (i, i.toString, i * 10) - // val f_data = for (i <- 1 to 256 - 16) yield (i, (i % 16).toString, i * 10) - // val p = makeDF(p_data, securityLevel, "id", "pk", "x") - // val f = makeDF(f_data, securityLevel, "id", "fk", "x") - // p.join(f, $"pk" === $"fk").collect.toSet - // } - - // testAgainstSpark("join on column 1") { securityLevel => - // val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) - // val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) - // val p = makeDF(p_data, securityLevel, "pk", "x") - // val f = makeDF(f_data, securityLevel, "fk", "x", "y") - // p.join(f, $"pk" === $"fk").collect.toSet - // } - - // testAgainstSpark("non-foreign-key join") { securityLevel => - // val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10) - // val f_data = for (i <- 1 to 256 - 128) yield (i, (i % 16).toString, i * 10) - // val p = makeDF(p_data, securityLevel, "id", "join_col_1", "x") - // val f = makeDF(f_data, securityLevel, "id", "join_col_2", "x") - // p.join(f, $"join_col_1" === $"join_col_2").collect.toSet - // } - - // def abc(i: Int): String = (i % 3) match { - // case 0 => "A" - // case 1 => "B" - // case 2 => "C" - // } - - // testAgainstSpark("aggregate average") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), i.toDouble) - // val words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(avg("price").as("avgPrice")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate count") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val 
words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(count("category").as("itemsInCategory")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate first") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(first("category").as("firstInCategory")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate last") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(last("category").as("lastInCategory")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate max") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(max("price").as("maxPrice")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate min") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "category", "price") - - // words.groupBy("category").agg(min("price").as("minPrice")) - // .collect.sortBy { case Row(category: String, _) => category } - // } - - // testAgainstSpark("aggregate sum") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "word", "count") - - // words.groupBy("word").agg(sum("count").as("totalCount")) - // .collect.sortBy { case Row(word: String, _) => word } - // } - - // testAgainstSpark("aggregate on multiple columns") { securityLevel => - // val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f) - // val words = makeDF(data, securityLevel, "str", "x", "y") - - // words.groupBy("str").agg(sum("y").as("totalY"), avg("x").as("avgX")) - // .collect.sortBy { case Row(str: String, _, _) => str } - // } - - // testAgainstSpark("global aggregate") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val words = makeDF(data, securityLevel, "id", "word", "count") - // words.agg(sum("count").as("totalCount")).collect - // } - - // testAgainstSpark("contains") { securityLevel => - // val data = for (i <- 0 until 256) yield(i.toString, abc(i)) - // val df = makeDF(data, securityLevel, "word", "abc") - // df.filter($"word".contains(lit("1"))).collect - // } - - // testAgainstSpark("between") { securityLevel => - // val data = for (i <- 0 until 256) yield(i.toString, i) - // val df = makeDF(data, securityLevel, "word", "count") - // df.filter($"count".between(50, 150)).collect - // } - - // testAgainstSpark("year") { securityLevel => - // val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) - // val df = makeDF(data, securityLevel, "id", "date") - // df.select(year($"date")).collect - // } - - // testAgainstSpark("case when - 1 branch with else (string)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", "hi").otherwise("bye")).collect - // } - - // testAgainstSpark("case when - 1 branch with else 
(int)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", 10).otherwise(30)).collect - // } - - // testAgainstSpark("case when - 1 branch without else (string)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", "hi")).collect - // } - - // testAgainstSpark("case when - 1 branch without else (int)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", 10)).collect - // } - - // testAgainstSpark("case when - 2 branch with else (string)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello").otherwise("bye")).collect - // } - - // testAgainstSpark("case when - 2 branch with else (int)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", 10).when(df("word") === "baz", 20).otherwise(30)).collect - // } - - // testAgainstSpark("case when - 2 branch without else (string)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello")).collect - // } - - // testAgainstSpark("case when - 2 branch without else (int)") { securityLevel => - // val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) - // val df = makeDF(data, securityLevel, "word", "count") - // df.select(when(df("word") === "foo", 3).when(df("word") === "baz", 2)).collect - // } - - // testOpaqueOnly("save and load with explicit schema") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val df = makeDF(data, securityLevel, "id", "word", "count") - // val path = Utils.createTempDir() - // path.delete() - // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - // try { - // val df2 = spark.read - // .format("edu.berkeley.cs.rise.opaque.EncryptedSource") - // .schema(df.schema) - // .load(path.toString) - // assert(df.collect.toSet === df2.collect.toSet) - // assert(df.groupBy("word").agg(sum("count")).collect.toSet - // === df2.groupBy("word").agg(sum("count")).collect.toSet) - // } finally { - // Utils.deleteRecursively(path) - // } - // } - - // testOpaqueOnly("save and load without schema") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val df = makeDF(data, securityLevel, "id", "word", "count") - // val path = Utils.createTempDir() - // path.delete() - // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - // try { - // val df2 = spark.read - // .format("edu.berkeley.cs.rise.opaque.EncryptedSource") - // .load(path.toString) - // assert(df.collect.toSet === df2.collect.toSet) - // assert(df.groupBy("word").agg(sum("count")).collect.toSet - // === 
df2.groupBy("word").agg(sum("count")).collect.toSet) - // } finally { - // Utils.deleteRecursively(path) - // } - // } - - // testOpaqueOnly("load from SQL with explicit schema") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val df = makeDF(data, securityLevel, "id", "word", "count") - // val path = Utils.createTempDir() - // path.delete() - // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - - // try { - // spark.sql(s""" - // |CREATE TEMPORARY VIEW df2 - // |(${df.schema.toDDL}) - // |USING edu.berkeley.cs.rise.opaque.EncryptedSource - // |OPTIONS ( - // | path "${path}" - // |)""".stripMargin) - // val df2 = spark.sql(s""" - // |SELECT * FROM df2 - // |""".stripMargin) - - // assert(df.collect.toSet === df2.collect.toSet) - // } finally { - // spark.catalog.dropTempView("df2") - // Utils.deleteRecursively(path) - // } - // } - - // testOpaqueOnly("load from SQL without schema") { securityLevel => - // val data = for (i <- 0 until 256) yield (i, abc(i), 1) - // val df = makeDF(data, securityLevel, "id", "word", "count") - // val path = Utils.createTempDir() - // path.delete() - // df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) - - // try { - // spark.sql(s""" - // |CREATE TEMPORARY VIEW df2 - // |USING edu.berkeley.cs.rise.opaque.EncryptedSource - // |OPTIONS ( - // | path "${path}" - // |)""".stripMargin) - // val df2 = spark.sql(s""" - // |SELECT * FROM df2 - // |""".stripMargin) - - // assert(df.collect.toSet === df2.collect.toSet) - // } finally { - // spark.catalog.dropTempView("df2") - // Utils.deleteRecursively(path) - // } - // } - - // testAgainstSpark("SQL API") { securityLevel => - // val df = makeDF( - // (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), - // securityLevel, - // "a", "b", "c", "d", "x") - // df.createTempView("df") - // try { - // spark.sql("SELECT * FROM df WHERE x > 10").collect - // } finally { - // spark.catalog.dropTempView("df") - // } - // } - - // testOpaqueOnly("cast error") { securityLevel => - // val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) - // val schema = StructType(Seq( - // StructField("CalendarIntervalType", CalendarIntervalType), - // StructField("NullType", NullType))) - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - // // Trigger an Opaque exception by attempting an unsupported cast: CalendarIntervalType to - // // StringType - // val e = intercept[SparkException] { - // withLoggingOff { - // df.select($"CalendarIntervalType".cast(StringType)).collect - // } - // } - // assert(e.getCause.isInstanceOf[OpaqueException]) - // } - - // testAgainstSpark("exp") { securityLevel => - // val data: Seq[(Double, Double)] = Seq( - // (2.0, 3.0)) - // val schema = StructType(Seq( - // StructField("x", DoubleType), - // StructField("y", DoubleType))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // df.select(exp($"y")).collect - // } - - // testAgainstSpark("vector multiply") { securityLevel => - // val data: Seq[(Array[Double], Double)] = Seq( - // (Array[Double](1.0, 1.0, 1.0), 3.0)) - // val schema = StructType(Seq( - // StructField("v", DataTypes.createArrayType(DoubleType)), - // StructField("c", DoubleType))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - 
// spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // df.select(vectormultiply($"v", $"c")).collect - // } - - // testAgainstSpark("dot product") { securityLevel => - // val data: Seq[(Array[Double], Array[Double])] = Seq( - // (Array[Double](1.0, 1.0, 1.0), Array[Double](1.0, 1.0, 1.0))) - // val schema = StructType(Seq( - // StructField("v1", DataTypes.createArrayType(DoubleType)), - // StructField("v2", DataTypes.createArrayType(DoubleType)))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // df.select(dot($"v1", $"v2")).collect - // } - - // testAgainstSpark("upper") { securityLevel => - // val data = Seq(("lower", "upper"), ("lower2", "upper2")) - // val schema = StructType(Seq( - // StructField("v1", StringType), - // StructField("v2", StringType))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // df.select(upper($"v1")).collect - // } - - // testAgainstSpark("upper with null") { securityLevel => - // val data = Seq(("lower", null.asInstanceOf[String])) - - // val df = makeDF(data, securityLevel, "v1", "v2") - - // df.select(upper($"v2")).collect - // } - - // testAgainstSpark("vector sum") { securityLevel => - // val data: Seq[(Array[Double], Double)] = Seq( - // (Array[Double](1.0, 2.0, 3.0), 4.0), - // (Array[Double](5.0, 7.0, 7.0), 8.0)) - // val schema = StructType(Seq( - // StructField("v", DataTypes.createArrayType(DoubleType)), - // StructField("c", DoubleType))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // val vectorsum = new VectorSum - // df.groupBy().agg(vectorsum($"v")).collect - // } - - // testAgainstSpark("create array") { securityLevel => - // val data: Seq[(Double, Double)] = Seq( - // (1.0, 2.0), - // (3.0, 4.0)) - // val schema = StructType(Seq( - // StructField("x1", DoubleType), - // StructField("x2", DoubleType))) - - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - - // df.select(array($"x1", $"x2").as("x")).collect - // } - - // testAgainstSpark("limit with fewer returned values") { securityLevel => - // val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) - // val schema = StructType(Seq( - // StructField("id", IntegerType), - // StructField("word", StringType))) - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - // df.sort($"id").limit(5).collect - // } - - // testAgainstSpark("limit with more returned values") { securityLevel => - // val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) - // val schema = StructType(Seq( - // StructField("id", IntegerType), - // StructField("word", StringType))) - // val df = securityLevel.applyTo( - // spark.createDataFrame( - // spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), - // schema)) - // df.sort($"id").limit(200).collect - // } - - // testAgainstSpark("least squares") { securityLevel => - // LeastSquares.query(spark, securityLevel, "tiny", numPartitions).collect - // } - - // testAgainstSpark("logistic regression") { securityLevel => - // LogisticRegression.train(spark, securityLevel, 1000, 
numPartitions) - // } - - // testAgainstSpark("k-means") { securityLevel => - // import scala.math.Ordering.Implicits.seqDerivedOrdering - // KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted - // } - - // testAgainstSpark("pagerank") { securityLevel => - // PageRank.run(spark, securityLevel, "256", numPartitions).collect.toSet - // } - - // testAgainstSpark("TPC-H 9") { securityLevel => - // TPCH.tpch9(spark.sqlContext, securityLevel, "sf_small", numPartitions).collect.toSet - // } - - // testAgainstSpark("big data 1") { securityLevel => - // BigDataBenchmark.q1(spark, securityLevel, "tiny", numPartitions).collect - // } - - // testAgainstSpark("big data 2") { securityLevel => - // BigDataBenchmark.q2(spark, securityLevel, "tiny", numPartitions).collect - // .map { case Row(a: String, b: Double) => (a, b.toFloat) } - // .sortBy(_._1) - // } - - // testAgainstSpark("big data 3") { securityLevel => - // BigDataBenchmark.q3(spark, securityLevel, "tiny", numPartitions).collect - // } + testAgainstSpark("create DataFrame from sequence") { securityLevel => + val data = for (i <- 0 until 5) yield ("foo", i) + makeDF(data, securityLevel, "word", "count").collect + } + + testAgainstSpark("create DataFrame with BinaryType + ByteType") { securityLevel => + val data: Seq[(Array[Byte], Byte)] = + Seq((Array[Byte](0.toByte, -128.toByte, 127.toByte), 42.toByte)) + makeDF(data, securityLevel, "BinaryType", "ByteType").collect + } + + testAgainstSpark("create DataFrame with CalendarIntervalType + NullType") { securityLevel => + val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) + val schema = StructType(Seq( + StructField("CalendarIntervalType", CalendarIntervalType), + StructField("NullType", NullType))) + + securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)).collect + } + + testAgainstSpark("create DataFrame with ShortType + TimestampType") { securityLevel => + val data: Seq[(Short, Timestamp)] = Seq((13.toShort, Timestamp.valueOf("2017-12-02 03:04:00"))) + makeDF(data, securityLevel, "ShortType", "TimestampType").collect + } + + testAgainstSpark("create DataFrame with ArrayType") { securityLevel => + val array: Array[Int] = Array(0, -128, 127, 1) + val data = Seq( + (array, "dog"), + (array, "cat"), + (array, "ant")) + val df = makeDF(data, securityLevel, "array", "string") + df.collect + } + + testAgainstSpark("create DataFrame with MapType") { securityLevel => + val map: Map[String, Int] = Map("x" -> 24, "y" -> 25, "z" -> 26) + val data = Seq( + (map, "dog"), + (map, "cat"), + (map, "ant")) + val df = makeDF(data, securityLevel, "map", "string") + df.collect + } + + testAgainstSpark("create DataFrame with nulls for all types") { securityLevel => + val schema = StructType(Seq( + StructField("boolean", BooleanType), + StructField("integer", IntegerType), + StructField("long", LongType), + StructField("float", FloatType), + StructField("double", DoubleType), + StructField("date", DateType), + StructField("binary", BinaryType), + StructField("byte", ByteType), + StructField("calendar_interval", CalendarIntervalType), + StructField("null", NullType), + StructField("short", ShortType), + StructField("timestamp", TimestampType), + StructField("array_of_int", DataTypes.createArrayType(IntegerType)), + StructField("map_int_to_int", DataTypes.createMapType(IntegerType, IntegerType)), + StructField("string", StringType))) + + securityLevel.applyTo( + 
spark.createDataFrame( + spark.sparkContext.makeRDD(Seq(Row.fromSeq(Seq.fill(schema.length) { null })), numPartitions), + schema)).collect + } + + testAgainstSpark("filter") { securityLevel => + val df = makeDF( + (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), + securityLevel, + "a", "b", "c", "d", "x") + df.filter($"x" > lit(10)).collect + } + + testAgainstSpark("filter with NULLs") { securityLevel => + val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { + if (x % 3 == 0) + Tuple1(null.asInstanceOf[Integer]) + else + Tuple1(x.asInstanceOf[Integer]) + }).toSeq) + val df = makeDF(data, securityLevel, "x") + df.filter($"x" > lit(10)).collect.toSet + } + + testAgainstSpark("select") { securityLevel => + val data = for (i <- 0 until 256) yield ("%03d".format(i) * 3, i.toFloat) + val df = makeDF(data, securityLevel, "str", "x") + df.select($"str").collect + } + + testAgainstSpark("select with expressions") { securityLevel => + val df = makeDF( + (1 to 20).map(x => (true, "hello world!", 1.0, 2.0f, x)), + securityLevel, + "a", "b", "c", "d", "x") + df.select( + $"x" + $"x" * $"x" - $"x", + substring($"b", 5, 20), + $"x" > $"x", + $"x" >= $"x", + $"x" <= $"x").collect.toSet + } + + testAgainstSpark("union") { securityLevel => + val df1 = makeDF( + (1 to 20).map(x => (x, x.toString)).reverse, + securityLevel, + "a", "b") + val df2 = makeDF( + (1 to 20).map(x => (x, (x + 1).toString)), + securityLevel, + "a", "b") + df1.union(df2).collect.toSet + } + + testOpaqueOnly("cache") { securityLevel => + def numCached(ds: Dataset[_]): Int = + ds.queryExecution.executedPlan.collect { + case cached: EncryptedBlockRDDScanExec + if cached.rdd.getStorageLevel != StorageLevel.NONE => + cached + }.size + + val data = List((1, 3), (1, 4), (1, 5), (2, 4)) + val df = makeDF(data, securityLevel, "a", "b").cache() + + val agg = df.groupBy($"a").agg(sum("b")) + + assert(numCached(agg) === 1) + + val expected = data.groupBy(_._1).mapValues(_.map(_._2).sum) + assert(agg.collect.toSet === expected.map(Row.fromTuple).toSet) + df.unpersist() + } + + testAgainstSpark("sort") { securityLevel => + val data = Random.shuffle((0 until 256).map(x => (x.toString, x)).toSeq) + val df = makeDF(data, securityLevel, "str", "x") + df.sort($"x").collect + } + + testAgainstSpark("sort zero elements") { securityLevel => + val data = Seq.empty[(String, Int)] + val df = makeDF(data, securityLevel, "str", "x") + df.sort($"x").collect + } + + testAgainstSpark("sort by float") { securityLevel => + val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) + val df = makeDF(data, securityLevel, "str", "x") + df.sort($"x").collect + } + + testAgainstSpark("sort by string") { securityLevel => + val data = Random.shuffle((0 until 256).map(x => (x.toString, x.toFloat)).toSeq) + val df = makeDF(data, securityLevel, "str", "x") + df.sort($"str").collect + } + + testAgainstSpark("sort by 2 columns") { securityLevel => + val data = Random.shuffle((0 until 256).map(x => (x / 16, x)).toSeq) + val df = makeDF(data, securityLevel, "x", "y") + df.sort($"x", $"y").collect + } + + testAgainstSpark("sort with null values") { securityLevel => + val data: Seq[Tuple1[Integer]] = Random.shuffle((0 until 256).map(x => { + if (x % 3 == 0) + Tuple1(null.asInstanceOf[Integer]) + else + Tuple1(x.asInstanceOf[Integer]) + }).toSeq) + val df = makeDF(data, securityLevel, "x") + df.sort($"x").collect + } + + testAgainstSpark("join") { securityLevel => + val p_data = for (i <- 1 to 16) yield (i, i.toString, i * 10) + val f_data 
= for (i <- 1 to 256 - 16) yield (i, (i % 16).toString, i * 10) + val p = makeDF(p_data, securityLevel, "id", "pk", "x") + val f = makeDF(f_data, securityLevel, "id", "fk", "x") + p.join(f, $"pk" === $"fk").collect.toSet + } + + testAgainstSpark("join on column 1") { securityLevel => + val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) + val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) + val p = makeDF(p_data, securityLevel, "pk", "x") + val f = makeDF(f_data, securityLevel, "fk", "x", "y") + p.join(f, $"pk" === $"fk").collect.toSet + } + + testAgainstSpark("non-foreign-key join") { securityLevel => + val p_data = for (i <- 1 to 128) yield (i, (i % 16).toString, i * 10) + val f_data = for (i <- 1 to 256 - 128) yield (i, (i % 16).toString, i * 10) + val p = makeDF(p_data, securityLevel, "id", "join_col_1", "x") + val f = makeDF(f_data, securityLevel, "id", "join_col_2", "x") + p.join(f, $"join_col_1" === $"join_col_2").collect.toSet + } + + def abc(i: Int): String = (i % 3) match { + case 0 => "A" + case 1 => "B" + case 2 => "C" + } + + testAgainstSpark("aggregate average") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), i.toDouble) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(avg("price").as("avgPrice")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate count") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(count("category").as("itemsInCategory")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate first") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(first("category").as("firstInCategory")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate last") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(last("category").as("lastInCategory")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate max") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(max("price").as("maxPrice")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate min") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "category", "price") + + words.groupBy("category").agg(min("price").as("minPrice")) + .collect.sortBy { case Row(category: String, _) => category } + } + + testAgainstSpark("aggregate sum") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "word", "count") + + words.groupBy("word").agg(sum("count").as("totalCount")) + .collect.sortBy { case Row(word: String, _) => word } + } + + testAgainstSpark("aggregate on multiple columns") { securityLevel => + val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f) + val words = makeDF(data, securityLevel, "str", "x", "y") + + 
words.groupBy("str").agg(sum("y").as("totalY"), avg("x").as("avgX")) + .collect.sortBy { case Row(str: String, _, _) => str } + } + + testAgainstSpark("global aggregate") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val words = makeDF(data, securityLevel, "id", "word", "count") + words.agg(sum("count").as("totalCount")).collect + } + + testAgainstSpark("contains") { securityLevel => + val data = for (i <- 0 until 256) yield(i.toString, abc(i)) + val df = makeDF(data, securityLevel, "word", "abc") + df.filter($"word".contains(lit("1"))).collect + } + + testAgainstSpark("between") { securityLevel => + val data = for (i <- 0 until 256) yield(i.toString, i) + val df = makeDF(data, securityLevel, "word", "count") + df.filter($"count".between(50, 150)).collect + } + + testAgainstSpark("year") { securityLevel => + val data = Seq(Tuple2(1, new java.sql.Date(new java.util.Date().getTime()))) + val df = makeDF(data, securityLevel, "id", "date") + df.select(year($"date")).collect + } + + testAgainstSpark("case when - 1 branch with else (string)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", "hi").otherwise("bye")).collect + } + + testAgainstSpark("case when - 1 branch with else (int)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", 10).otherwise(30)).collect + } + + testAgainstSpark("case when - 1 branch without else (string)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", "hi")).collect + } + + testAgainstSpark("case when - 1 branch without else (int)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", 10)).collect + } + + testAgainstSpark("case when - 2 branch with else (string)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello").otherwise("bye")).collect + } + + testAgainstSpark("case when - 2 branch with else (int)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", 10).when(df("word") === "baz", 20).otherwise(30)).collect + } + + testAgainstSpark("case when - 2 branch without else (string)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", "hi").when(df("word") === "baz", "hello")).collect + } + + testAgainstSpark("case when - 2 branch without else (int)") { securityLevel => + val data = Seq(("foo", 4), ("bar", 1), ("baz", 5), ("bear", null.asInstanceOf[Int])) + val df = makeDF(data, securityLevel, "word", "count") + df.select(when(df("word") === "foo", 3).when(df("word") === "baz", 2)).collect + } + + testOpaqueOnly("save and load with explicit schema") { securityLevel => 
+ val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val df = makeDF(data, securityLevel, "id", "word", "count") + val path = Utils.createTempDir() + path.delete() + df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + try { + val df2 = spark.read + .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + .schema(df.schema) + .load(path.toString) + assert(df.collect.toSet === df2.collect.toSet) + assert(df.groupBy("word").agg(sum("count")).collect.toSet + === df2.groupBy("word").agg(sum("count")).collect.toSet) + } finally { + Utils.deleteRecursively(path) + } + } + + testOpaqueOnly("save and load without schema") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val df = makeDF(data, securityLevel, "id", "word", "count") + val path = Utils.createTempDir() + path.delete() + df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + try { + val df2 = spark.read + .format("edu.berkeley.cs.rise.opaque.EncryptedSource") + .load(path.toString) + assert(df.collect.toSet === df2.collect.toSet) + assert(df.groupBy("word").agg(sum("count")).collect.toSet + === df2.groupBy("word").agg(sum("count")).collect.toSet) + } finally { + Utils.deleteRecursively(path) + } + } + + testOpaqueOnly("load from SQL with explicit schema") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val df = makeDF(data, securityLevel, "id", "word", "count") + val path = Utils.createTempDir() + path.delete() + df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + + try { + spark.sql(s""" + |CREATE TEMPORARY VIEW df2 + |(${df.schema.toDDL}) + |USING edu.berkeley.cs.rise.opaque.EncryptedSource + |OPTIONS ( + | path "${path}" + |)""".stripMargin) + val df2 = spark.sql(s""" + |SELECT * FROM df2 + |""".stripMargin) + + assert(df.collect.toSet === df2.collect.toSet) + } finally { + spark.catalog.dropTempView("df2") + Utils.deleteRecursively(path) + } + } + + testOpaqueOnly("load from SQL without schema") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), 1) + val df = makeDF(data, securityLevel, "id", "word", "count") + val path = Utils.createTempDir() + path.delete() + df.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(path.toString) + + try { + spark.sql(s""" + |CREATE TEMPORARY VIEW df2 + |USING edu.berkeley.cs.rise.opaque.EncryptedSource + |OPTIONS ( + | path "${path}" + |)""".stripMargin) + val df2 = spark.sql(s""" + |SELECT * FROM df2 + |""".stripMargin) + + assert(df.collect.toSet === df2.collect.toSet) + } finally { + spark.catalog.dropTempView("df2") + Utils.deleteRecursively(path) + } + } + + testAgainstSpark("SQL API") { securityLevel => + val df = makeDF( + (1 to 20).map(x => (true, "hello", 1.0, 2.0f, x)), + securityLevel, + "a", "b", "c", "d", "x") + df.createTempView("df") + try { + spark.sql("SELECT * FROM df WHERE x > 10").collect + } finally { + spark.catalog.dropTempView("df") + } + } + + testOpaqueOnly("cast error") { securityLevel => + val data: Seq[(CalendarInterval, Byte)] = Seq((new CalendarInterval(12, 1, 12345), 0.toByte)) + val schema = StructType(Seq( + StructField("CalendarIntervalType", CalendarIntervalType), + StructField("NullType", NullType))) + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + // Trigger an Opaque exception by attempting an unsupported cast: CalendarIntervalType to + // StringType + val e = 
intercept[SparkException] { + withLoggingOff { + df.select($"CalendarIntervalType".cast(StringType)).collect + } + } + assert(e.getCause.isInstanceOf[OpaqueException]) + } + + testAgainstSpark("exp") { securityLevel => + val data: Seq[(Double, Double)] = Seq( + (2.0, 3.0)) + val schema = StructType(Seq( + StructField("x", DoubleType), + StructField("y", DoubleType))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + df.select(exp($"y")).collect + } + + testAgainstSpark("vector multiply") { securityLevel => + val data: Seq[(Array[Double], Double)] = Seq( + (Array[Double](1.0, 1.0, 1.0), 3.0)) + val schema = StructType(Seq( + StructField("v", DataTypes.createArrayType(DoubleType)), + StructField("c", DoubleType))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + df.select(vectormultiply($"v", $"c")).collect + } + + testAgainstSpark("dot product") { securityLevel => + val data: Seq[(Array[Double], Array[Double])] = Seq( + (Array[Double](1.0, 1.0, 1.0), Array[Double](1.0, 1.0, 1.0))) + val schema = StructType(Seq( + StructField("v1", DataTypes.createArrayType(DoubleType)), + StructField("v2", DataTypes.createArrayType(DoubleType)))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + df.select(dot($"v1", $"v2")).collect + } + + testAgainstSpark("upper") { securityLevel => + val data = Seq(("lower", "upper"), ("lower2", "upper2")) + val schema = StructType(Seq( + StructField("v1", StringType), + StructField("v2", StringType))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + df.select(upper($"v1")).collect + } + + testAgainstSpark("upper with null") { securityLevel => + val data = Seq(("lower", null.asInstanceOf[String])) + + val df = makeDF(data, securityLevel, "v1", "v2") + + df.select(upper($"v2")).collect + } + + testAgainstSpark("vector sum") { securityLevel => + val data: Seq[(Array[Double], Double)] = Seq( + (Array[Double](1.0, 2.0, 3.0), 4.0), + (Array[Double](5.0, 7.0, 7.0), 8.0)) + val schema = StructType(Seq( + StructField("v", DataTypes.createArrayType(DoubleType)), + StructField("c", DoubleType))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + val vectorsum = new VectorSum + df.groupBy().agg(vectorsum($"v")).collect + } + + testAgainstSpark("create array") { securityLevel => + val data: Seq[(Double, Double)] = Seq( + (1.0, 2.0), + (3.0, 4.0)) + val schema = StructType(Seq( + StructField("x1", DoubleType), + StructField("x2", DoubleType))) + + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + + df.select(array($"x1", $"x2").as("x")).collect + } + + testAgainstSpark("limit with fewer returned values") { securityLevel => + val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) + val schema = StructType(Seq( + StructField("id", IntegerType), + StructField("word", StringType))) + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + df.sort($"id").limit(5).collect + } + + testAgainstSpark("limit with more returned values") { 
securityLevel => + val data = Random.shuffle(for (i <- 0 until 256) yield (i, abc(i))) + val schema = StructType(Seq( + StructField("id", IntegerType), + StructField("word", StringType))) + val df = securityLevel.applyTo( + spark.createDataFrame( + spark.sparkContext.makeRDD(data.map(Row.fromTuple), numPartitions), + schema)) + df.sort($"id").limit(200).collect + } + + testAgainstSpark("least squares") { securityLevel => + LeastSquares.query(spark, securityLevel, "tiny", numPartitions).collect + } + + testAgainstSpark("logistic regression") { securityLevel => + LogisticRegression.train(spark, securityLevel, 1000, numPartitions) + } + + testAgainstSpark("k-means") { securityLevel => + import scala.math.Ordering.Implicits.seqDerivedOrdering + KMeans.train(spark, securityLevel, numPartitions, 10, 2, 3, 0.01).map(_.toSeq).sorted + } + + testAgainstSpark("pagerank") { securityLevel => + PageRank.run(spark, securityLevel, "256", numPartitions).collect.toSet + } + + testAgainstSpark("TPC-H 9") { securityLevel => + TPCH.tpch9(spark.sqlContext, securityLevel, "sf_small", numPartitions).collect.toSet + } + + testAgainstSpark("big data 1") { securityLevel => + BigDataBenchmark.q1(spark, securityLevel, "tiny", numPartitions).collect + } + + testAgainstSpark("big data 2") { securityLevel => + BigDataBenchmark.q2(spark, securityLevel, "tiny", numPartitions).collect + .map { case Row(a: String, b: Double) => (a, b.toFloat) } + .sortBy(_._1) + } + + testAgainstSpark("big data 3") { securityLevel => + BigDataBenchmark.q3(spark, securityLevel, "tiny", numPartitions).collect + } def makeDF[A <: Product : scala.reflect.ClassTag : scala.reflect.runtime.universe.TypeTag]( data: Seq[A], securityLevel: SecurityLevel, columnNames: String*): DataFrame = @@ -829,23 +829,23 @@ class OpaqueMultiplePartitionSuite extends OpaqueOperatorTests { .toDF(columnNames: _*)) } - // testAgainstSpark("join with different numbers of partitions (#34)") { securityLevel => - // val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) - // val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) - // val p = makeDF(p_data, securityLevel, "pk", "x") - // val f = makePartitionedDF(f_data, securityLevel, numPartitions + 1, "fk", "x", "y") - // p.join(f, $"pk" === $"fk").collect.toSet - // } - - // testAgainstSpark("non-foreign-key join with high skew") { securityLevel => - // // This test is intended to ensure that primary groups are never split across multiple - // // partitions, which would break our implementation of non-foreign-key join. - - // val p_data = for (i <- 1 to 128) yield (i, 1) - // val f_data = for (i <- 1 to 128) yield (i, 1) - // val p = makeDF(p_data, securityLevel, "id", "join_col_1") - // val f = makeDF(f_data, securityLevel, "id", "join_col_2") - // p.join(f, $"join_col_1" === $"join_col_2").collect.toSet - // } + testAgainstSpark("join with different numbers of partitions (#34)") { securityLevel => + val p_data = for (i <- 1 to 16) yield (i.toString, i * 10) + val f_data = for (i <- 1 to 256 - 16) yield ((i % 16).toString, (i * 10).toString, i.toFloat) + val p = makeDF(p_data, securityLevel, "pk", "x") + val f = makePartitionedDF(f_data, securityLevel, numPartitions + 1, "fk", "x", "y") + p.join(f, $"pk" === $"fk").collect.toSet + } + + testAgainstSpark("non-foreign-key join with high skew") { securityLevel => + // This test is intended to ensure that primary groups are never split across multiple + // partitions, which would break our implementation of non-foreign-key join. 
+ + val p_data = for (i <- 1 to 128) yield (i, 1) + val f_data = for (i <- 1 to 128) yield (i, 1) + val p = makeDF(p_data, securityLevel, "id", "join_col_1") + val f = makeDF(f_data, securityLevel, "id", "join_col_2") + p.join(f, $"join_col_1" === $"join_col_2").collect.toSet + } } From f5f91313cf60900502819e7b3d5084c97f717503 Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Fri, 6 Nov 2020 12:30:59 -0600 Subject: [PATCH 4/5] resolve comments --- src/enclave/Enclave/ExpressionEvaluation.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 1c19936fdf..e1dbd24e18 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -3,11 +3,8 @@ #include #include #include -// #include #include -#include - #include "Flatbuffers.h" int printf(const char *fmt, ...); From 0d00a171245994073d9bf3f8550be69bb3227c20 Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Thu, 12 Nov 2020 12:29:35 -0600 Subject: [PATCH 5/5] change interval equality --- src/enclave/Enclave/ExpressionEvaluation.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index e1dbd24e18..1c91d2e3f4 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -270,7 +270,7 @@ class FlatbuffersExpressionEvaluator { // If type is CalendarInterval, manually return a calendar interval field. // Otherwise 'days' disappears in conversion. - if (value->value_type() == 10) { + if (value->value_type() == tuix::FieldUnion_CalendarIntervalField) { auto *interval = value->value_as_CalendarIntervalField(); uint32_t months = interval->months();
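Note for reviewers: the enclave-side DateAddInterval change above works by expanding the day count into calendar fields, bumping the month/day fields, and collapsing back to a day count. The standalone sketch below (not part of the patch) illustrates that same round trip. It assumes POSIX gmtime_r and the nonstandard but widely available timegm in place of the enclave's secs_to_tm and std::mktime; the helper name add_interval and the sample values are purely illustrative.

#include <cstdint>
#include <cstdio>
#include <ctime>

// 'days' follows Spark's DateType encoding: days since the Unix epoch (1970-01-01).
static int32_t add_interval(int32_t days, int months, int extra_days) {
  time_t secs = static_cast<time_t>(days) * 86400;  // midnight UTC of that day
  struct tm tm = {};
  gmtime_r(&secs, &tm);                             // split into calendar fields
  tm.tm_mon += months;                              // out-of-range fields are normalized by timegm
  tm.tm_mday += extra_days;
  time_t shifted = timegm(&tm);                     // back to seconds since the epoch (UTC)
  return static_cast<int32_t>(shifted / 86400);     // truncate back to whole days
}

int main() {
  // Day 18500 is 2020-08-26; adding 6 months should print 18684, i.e. 2021-02-26.
  std::printf("%d\n", static_cast<int>(add_interval(18500, 6, 0)));
  return 0;
}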