From 7c89dd136f02155c67b51b3d3b9a7255e14064f5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 10 Feb 2021 23:26:42 +0000 Subject: [PATCH 01/31] matching in strategies.scala set up class thing cleanup added test cases for non-equi left anti join rename to serializeEquiJoinExpression added isEncrypted condition set up keys JoinExpr now has condition rename serialization does not throw compile error for BNLJ split up added condition in ExpressionEvaluation.h zipPartitions cpp put in place typo added func to header two loops in place update tests condition fixed scala loop interchange rows added tags ensure cached == match working comparison decoupling in ExpressionEvalulation save compiles and condition works is printing fix swap outer/inner o_i_match show() has the same result tests pass test cleanup added test cases for different condition BuildLeft works optional keys in scala started C++ passes the operator tests comments, cleanup attemping to do it the ~right~ way comments to distinguish between primary/secondary, operator tests pass cleanup comments, about to begin implementation for distinct agg ops is_distinct added test case serializing with isDistinct is_distinct in ExpressionEvaluation.h removed unused code from join implementation remove RowWriter/Reader in condition evaluation (join) easier test serialization done correct checking in Scala set is set up spaghetti but it finally works function for clearing values condition_eval isntead of condition goto comment remove explain from test, need to fix distinct aggregation for >1 partitions started impl of multiple partitions fix added rangepartitionexec that runs partitioning cleanup serialization properly comments, generalization for > 1 distinct function comments about to refactor into logical.Aggregation the new case has distinct in result expressions need to match on distinct removed new case (doesn't make difference?) works Upgrade to OE 0.12 (#153) Update README.md Support for scalar subquery (#157) This PR implements the scalar subquery expression, which is triggered whenever a subquery returns a scalar value. There were two main problems that needed to be solved. First, support for matching the scalar subquery expression is necessary. Spark implements this by wrapping a SparkPlan within the expression and calls executeCollect. Then it constructs a literal with that value. However, this is problematic for us because that value should not be decrypted by the driver and serialized into an expression, since it's an intermediate value. Therefore, the second issue to be addressed here is supporting an encrypted literal. This is implemented in this PR by serializing an encrypted ciphertext into a base64 encoded string, and wrapping a Decrypt expression on top of it. This expression is then evaluated in the enclave and returns a literal. Note that, in order to test our implementation, we also implement a Decrypt expression in Scala. However, this should never be evaluated on the driver side and serialized into a plaintext literal. This is because Decrypt is designated as a Nondeterministic expression, and therefore will always evaluate on the workers. match remove RangePartitionExec inefficient implementation refined Add TPC-H Benchmarks (#139) * logic decoupling in TPCH.scala for easier benchmarking * added TPCHBenchmark.scala * Benchmark.scala rewrite * done adding all support TPC-H query benchmarks * changed commandline arguments that benchmark takes * TPCHBenchmark takes in parameters * fixed issue with spark conf * size error handling, --help flag * add Utils.force, break cluster mode * comment out logistic regression benchmark * ensureCached right before temp view created/replaced * upgrade to 3.0.1 * upgrade to 3.0.1 * 10 scale factor * persistData * almost done refactor * more cleanup * compiles * 9 passes * cleanup * collect instead of force, sf_none * remove sf_none * defaultParallelism * no removing trailing/leading whitespace * add sf_med * hdfs works in local case * cleanup, added new CLI argument * added newly supported tpch queries * function for running all supported tests complete instead of partial -> final removed traces of join cleanup --- src/enclave/Enclave/Aggregate.cpp | 4 +- src/enclave/Enclave/ExpressionEvaluation.h | 30 ++++++++++ src/flatbuffers/operators.fbs | 3 + .../edu/berkeley/cs/rise/opaque/Utils.scala | 50 +++++++++++++--- .../berkeley/cs/rise/opaque/strategies.scala | 60 +++++++++++++------ .../cs/rise/opaque/OpaqueOperatorTests.scala | 7 +++ 6 files changed, 124 insertions(+), 30 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index e434f77e37..1e2a549269 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -30,8 +30,8 @@ void non_oblivious_aggregate( count += 1; } - // Skip outputting the final row if the number of input rows is 0 AND - // 1. It's a grouping aggregation, OR + // Skip outputting the final row if: + // 1. The number of input rows is 0 AND it's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { w.append(agg_op_eval.evaluate()); diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 0f48c56d48..490ca01077 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -1811,6 +1811,9 @@ class AggregateExpressionEvaluator { std::unique_ptr( new FlatbuffersExpressionEvaluator(eval_expr))); } + is_distinct = expr->is_distinct(); + value_selector = std::unique_ptr( + new FlatbuffersExpressionEvaluator(expr->value_selector())); } std::vector initial_values(const tuix::Row *unused) { @@ -1824,6 +1827,15 @@ class AggregateExpressionEvaluator { std::vector update(const tuix::Row *concat) { std::vector result; for (auto&& e : update_evaluators) { + if (is_distinct) { + std::string value = to_string(value_selector->eval(concat)); + /* Check to see if this distinct value has already been counted */ + if (observed_values.count(value)) { + std::vector vect(1, nullptr); + return vect; + } + observed_values.insert(value); + } result.push_back(e->eval(concat)); } return result; @@ -1837,11 +1849,18 @@ class AggregateExpressionEvaluator { return result; } + void clear_observed_values() { + observed_values.clear(); + } + private: flatbuffers::FlatBufferBuilder builder; std::vector> initial_value_evaluators; std::vector> update_evaluators; std::vector> evaluate_evaluators; + bool is_distinct; + std::unique_ptr value_selector; + std::set observed_values; }; class FlatbuffersAggOpEvaluator { @@ -1880,6 +1899,7 @@ class FlatbuffersAggOpEvaluator { // Write initial values to a std::vector> init_fields; for (auto&& e : aggregate_evaluators) { + e->clear_observed_values(); for (auto f : e->initial_values(nullptr)) { init_fields.push_back(flatbuffers_copy(f, builder2)); } @@ -1901,6 +1921,7 @@ class FlatbuffersAggOpEvaluator { void aggregate(const tuix::Row *row) { builder.Clear(); flatbuffers::Offset concat; + int a_length = a->field_values()->size(); std::vector> concat_fields; // concat row to a @@ -1918,9 +1939,18 @@ class FlatbuffersAggOpEvaluator { std::vector> output_fields; for (auto&& e : aggregate_evaluators) { for (auto f : e->update(concat_ptr)) { + if (f == nullptr) { // Only triggered on EXPR(distinct expr ...) + output_fields.clear(); + for (int i = 0; i < a_length; i++) { + auto f = concat_ptr->field_values()->Get(i); + output_fields.push_back(flatbuffers_copy(f, builder2)); + } + goto save_a; + } output_fields.push_back(flatbuffers_copy(f, builder2)); } } +save_a: a = flatbuffers::GetTemporaryPointer( builder2, tuix::CreateRowDirect(builder2, &output_fields)); } diff --git a/src/flatbuffers/operators.fbs b/src/flatbuffers/operators.fbs index 1ebd06c971..3d8a864638 100644 --- a/src/flatbuffers/operators.fbs +++ b/src/flatbuffers/operators.fbs @@ -38,6 +38,9 @@ table AggregateExpr { initial_values: [Expr]; update_exprs: [Expr]; evaluate_exprs: [Expr]; + // Items below are used for EXPR(distinct col_name ...) + is_distinct: bool; + value_selector: Expr; } // Supported: Average, Count, First, Last, Max, Min, Sum diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 4c6970e489..b0bbd5e4b1 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1371,13 +1371,25 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 ) case c @ Count(children) => val count = c.aggBufferAttributes(0) // COUNT(*) should count NULL values // COUNT(expr) should return the number or rows for which the supplied expressions are non-NULL + // COUNT(distinct expr ...) should return the number of rows that contain UNIQUE values of expr + + val ar = e.aggregateFunction.children(0) + val colNum = concatSchema.indexWhere(_.semanticEquals(ar)) + val (isDistinct, valueSelector) = (e.isDistinct, colNum) match { + case (true, x) if x >= 0 => // If colNum < 0, then the given schema does not contain the attribute + (true, flatbuffersSerializeExpression(builder, ar, concatSchema)) + case _ => + (false, 0) + } val (updateExprs: Seq[Expression], evaluateExprs: Seq[Expression]) = e.mode match { case Partial => { @@ -1396,7 +1408,7 @@ object Utils extends Logging { val countUpdateExpr = Add(count, Literal(1L)) (Seq(countUpdateExpr), Seq(count)) } - case _ => + case _ => } tuix.AggregateExpr.createAggregateExpr( @@ -1410,7 +1422,9 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + isDistinct, + valueSelector ) case f @ First(child, false) => @@ -1449,7 +1463,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) case l @ Last(child, false) => val last = l.aggBufferAttributes(0) @@ -1487,7 +1504,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) case m @ Max(child) => val max = m.aggBufferAttributes(0) @@ -1520,7 +1540,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) case m @ Min(child) => val min = m.aggBufferAttributes(0) @@ -1553,7 +1576,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) case s @ Sum(child) => val sum = s.aggBufferAttributes(0) @@ -1591,7 +1617,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) case vs @ ScalaUDAF(Seq(child), _: VectorSum, _, _) => val sum = vs.aggBufferAttributes(0) @@ -1626,7 +1655,10 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), + false, + 0 + ) } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 0c8f188369..c4d281f7ca 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -109,25 +109,47 @@ object OpaqueOperators extends Strategy { if (isEncrypted(child) && aggExpressions.forall(expr => expr.isInstanceOf[AggregateExpression])) => val aggregateExpressions = aggExpressions.map(expr => expr.asInstanceOf[AggregateExpression]) - - if (groupingExpressions.size == 0) { - // Global aggregation - val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) - val partialOutput = partialAggregate.output - val (projSchema, tag) = tagForGlobalAggregate(partialOutput) - - EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, - EncryptedProjectExec(partialOutput, - EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, - EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil - } else { - // Grouping aggregation - EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, - EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, - EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil + val (functionsWithDistinct, functionsWithoutDistinct) = aggregateExpressions.partition(_.isDistinct) + + functionsWithDistinct.size match { + case size if size == 0 => // No distinct aggregate operations + if (groupingExpressions.size == 0) { + // Global aggregation + val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) + val partialOutput = partialAggregate.output + val (projSchema, tag) = tagForGlobalAggregate(partialOutput) + + EncryptedProjectExec(resultExpressions, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedProjectExec(partialOutput, + EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, + EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil + } else { + // Grouping aggregation + EncryptedProjectExec(resultExpressions, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, + EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil + } + case size if size == 1 => // One distinct aggregate operation + if (groupingExpressions.size == 0) { + // Global aggregation + val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) + val partialOutput = partialAggregate.output + val (projSchema, tag) = tagForGlobalAggregate(partialOutput) + + EncryptedProjectExec(resultExpressions, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedProjectExec(partialOutput, + EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, + EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil + } else { + // Grouping aggregation + EncryptedProjectExec(resultExpressions, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Complete, + EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), true, planLater(child)))) :: Nil + } } case p @ Union(Seq(left, right)) if isEncrypted(p) => diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index a69894d13c..231ef6439e 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -377,6 +377,13 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => .collect.sortBy { case Row(category: String, _) => category } } + testAgainstSpark("aggregate count - distinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "price") + words.groupBy("category").agg(countDistinct("price").as("distinctPrices")) + .collect.sortBy { case Row(category: String, _) => category } + } + testAgainstSpark("aggregate first") { securityLevel => val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "category", "price") From 59d228e8301e836cfabe7f8b01267889a2e39acd Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Tue, 23 Feb 2021 23:05:36 +0000 Subject: [PATCH 02/31] added test case for one distinct one non, reverted comment --- src/enclave/Enclave/Aggregate.cpp | 6 ++++-- .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 2 +- .../berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 10 +++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 1e2a549269..eea297d5a2 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -5,6 +5,8 @@ #include "FlatbuffersWriters.h" #include "common.h" +#include + void non_oblivious_aggregate( uint8_t *agg_op, size_t agg_op_length, uint8_t *input_rows, size_t input_rows_length, @@ -30,8 +32,8 @@ void non_oblivious_aggregate( count += 1; } - // Skip outputting the final row if: - // 1. The number of input rows is 0 AND it's a grouping aggregation, OR + // Skip outputting the final row if the number of input rows is 0 AND + // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { w.append(agg_op_eval.evaluate()); diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index c4d281f7ca..fcdd9ff87a 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -112,7 +112,7 @@ object OpaqueOperators extends Strategy { val (functionsWithDistinct, functionsWithoutDistinct) = aggregateExpressions.partition(_.isDistinct) functionsWithDistinct.size match { - case size if size == 0 => // No distinct aggregate operations + case 0 => // No distinct aggregate operations if (groupingExpressions.size == 0) { // Global aggregation val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 231ef6439e..fb1ab88f70 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -380,10 +380,18 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => testAgainstSpark("aggregate count - distinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "price") - words.groupBy("category").agg(countDistinct("price").as("distinctPrices")) + words.groupBy("category").agg(countDistinct("price").as("num_unique_prices")) .collect.sortBy { case Row(category: String, _) => category } } + testAgainstSpark("aggregate count - distinct and indistinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "id", "price") + val df = words.groupBy("category").agg(count("id").as("num_ids"), + countDistinct("price").as("num_unique_prices")) + df.collect + } + testAgainstSpark("aggregate first") { securityLevel => val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "category", "price") From aa4c12753a1f287685464f39f0c5559179bf9303 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 00:29:42 +0000 Subject: [PATCH 03/31] removed C++ level implementation of is_distinct --- src/enclave/Enclave/ExpressionEvaluation.h | 30 ------------- src/flatbuffers/operators.fbs | 3 -- .../edu/berkeley/cs/rise/opaque/Utils.scala | 42 ++++--------------- 3 files changed, 8 insertions(+), 67 deletions(-) diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 490ca01077..0f48c56d48 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -1811,9 +1811,6 @@ class AggregateExpressionEvaluator { std::unique_ptr( new FlatbuffersExpressionEvaluator(eval_expr))); } - is_distinct = expr->is_distinct(); - value_selector = std::unique_ptr( - new FlatbuffersExpressionEvaluator(expr->value_selector())); } std::vector initial_values(const tuix::Row *unused) { @@ -1827,15 +1824,6 @@ class AggregateExpressionEvaluator { std::vector update(const tuix::Row *concat) { std::vector result; for (auto&& e : update_evaluators) { - if (is_distinct) { - std::string value = to_string(value_selector->eval(concat)); - /* Check to see if this distinct value has already been counted */ - if (observed_values.count(value)) { - std::vector vect(1, nullptr); - return vect; - } - observed_values.insert(value); - } result.push_back(e->eval(concat)); } return result; @@ -1849,18 +1837,11 @@ class AggregateExpressionEvaluator { return result; } - void clear_observed_values() { - observed_values.clear(); - } - private: flatbuffers::FlatBufferBuilder builder; std::vector> initial_value_evaluators; std::vector> update_evaluators; std::vector> evaluate_evaluators; - bool is_distinct; - std::unique_ptr value_selector; - std::set observed_values; }; class FlatbuffersAggOpEvaluator { @@ -1899,7 +1880,6 @@ class FlatbuffersAggOpEvaluator { // Write initial values to a std::vector> init_fields; for (auto&& e : aggregate_evaluators) { - e->clear_observed_values(); for (auto f : e->initial_values(nullptr)) { init_fields.push_back(flatbuffers_copy(f, builder2)); } @@ -1921,7 +1901,6 @@ class FlatbuffersAggOpEvaluator { void aggregate(const tuix::Row *row) { builder.Clear(); flatbuffers::Offset concat; - int a_length = a->field_values()->size(); std::vector> concat_fields; // concat row to a @@ -1939,18 +1918,9 @@ class FlatbuffersAggOpEvaluator { std::vector> output_fields; for (auto&& e : aggregate_evaluators) { for (auto f : e->update(concat_ptr)) { - if (f == nullptr) { // Only triggered on EXPR(distinct expr ...) - output_fields.clear(); - for (int i = 0; i < a_length; i++) { - auto f = concat_ptr->field_values()->Get(i); - output_fields.push_back(flatbuffers_copy(f, builder2)); - } - goto save_a; - } output_fields.push_back(flatbuffers_copy(f, builder2)); } } -save_a: a = flatbuffers::GetTemporaryPointer( builder2, tuix::CreateRowDirect(builder2, &output_fields)); } diff --git a/src/flatbuffers/operators.fbs b/src/flatbuffers/operators.fbs index 3d8a864638..1ebd06c971 100644 --- a/src/flatbuffers/operators.fbs +++ b/src/flatbuffers/operators.fbs @@ -38,9 +38,6 @@ table AggregateExpr { initial_values: [Expr]; update_exprs: [Expr]; evaluate_exprs: [Expr]; - // Items below are used for EXPR(distinct col_name ...) - is_distinct: bool; - value_selector: Expr; } // Supported: Average, Count, First, Last, Max, Min, Sum diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index b0bbd5e4b1..460363ba01 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1371,25 +1371,13 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case c @ Count(children) => val count = c.aggBufferAttributes(0) // COUNT(*) should count NULL values // COUNT(expr) should return the number or rows for which the supplied expressions are non-NULL - // COUNT(distinct expr ...) should return the number of rows that contain UNIQUE values of expr - - val ar = e.aggregateFunction.children(0) - val colNum = concatSchema.indexWhere(_.semanticEquals(ar)) - val (isDistinct, valueSelector) = (e.isDistinct, colNum) match { - case (true, x) if x >= 0 => // If colNum < 0, then the given schema does not contain the attribute - (true, flatbuffersSerializeExpression(builder, ar, concatSchema)) - case _ => - (false, 0) - } val (updateExprs: Seq[Expression], evaluateExprs: Seq[Expression]) = e.mode match { case Partial => { @@ -1422,9 +1410,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - isDistinct, - valueSelector + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case f @ First(child, false) => @@ -1463,9 +1449,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case l @ Last(child, false) => @@ -1504,9 +1488,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case m @ Max(child) => @@ -1540,9 +1522,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case m @ Min(child) => @@ -1576,9 +1556,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case s @ Sum(child) => @@ -1617,9 +1595,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) case vs @ ScalaUDAF(Seq(child), _: VectorSum, _, _) => @@ -1655,9 +1631,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray), - false, - 0 + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) ) } } From 4635081d5f34043dc1baf614a94856ee68330f14 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 17:49:02 +0000 Subject: [PATCH 04/31] PartialMerge in operators.scala --- .../cs/rise/opaque/execution/operators.scala | 19 +++++++++++++++---- .../berkeley/cs/rise/opaque/strategies.scala | 6 +++--- .../cs/rise/opaque/OpaqueOperatorTests.scala | 1 + 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 4eb941157e..85e0248774 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -243,9 +243,16 @@ case class EncryptedAggregateExec( AttributeSet(aggExpressions) -- AttributeSet(groupingExpressions) override def output: Seq[Attribute] = mode match { - case Partial => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - case Final => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) - case Complete => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) + case Partial => + groupingExpressions.map(_.toAttribute) ++ + aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) + case PartialMerge => + groupingExpressions.map(_.toAttribute) ++ + aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) + case Final => + groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) + case Complete => + groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) } override def executeBlocked(): RDD[Block] = { @@ -255,6 +262,10 @@ case class EncryptedAggregateExec( val partialAggExpressions = aggExpressions.map(_.copy(mode = Partial)) (groupingExpressions, partialAggExpressions) } + case PartialMerge => { + val partialMergeAggExpressions = aggExpressions.map(_.copy(mode = PartialMerge)) + (groupingExpressions, partialMergeAggExpressions) + } case Final => { val finalGroupingExpressions = groupingExpressions.map(_.toAttribute) val finalAggExpressions = aggExpressions.map(_.copy(mode = Final)) @@ -270,7 +281,7 @@ case class EncryptedAggregateExec( timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") { childRDD => childRDD.map { block => val (enclave, eid) = Utils.initEnclave() - Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial))) + Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial || mode == PartialMerge))) } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index fcdd9ff87a..342d350fea 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -146,9 +146,9 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation - EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Complete, - EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), true, planLater(child)))) :: Nil + + // 1. Create an Aggregate Operator for partial aggregations. + Nil } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index fb1ab88f70..179ae59baf 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -389,6 +389,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => val words = makeDF(data, securityLevel, "category", "id", "price") val df = words.groupBy("category").agg(count("id").as("num_ids"), countDistinct("price").as("num_unique_prices")) + df.explain() df.collect } From c6a475045acfd7297540e849dd664d5f1c2f0cf6 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 18:10:52 +0000 Subject: [PATCH 05/31] stage 1: grouping with distinct expressions --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 4 ++++ .../berkeley/cs/rise/opaque/strategies.scala | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 460363ba01..d1cbee78cb 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1388,6 +1388,10 @@ object Utils extends Logging { } (Seq(countUpdateExpr), Seq(count)) } + case PartialMerge => { + val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0)) + (Seq(countUpdateExpr), Seq(count)) + } case Final => { val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0)) (Seq(countUpdateExpr), Seq(count)) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 342d350fea..989cb23bc2 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -146,8 +146,25 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation + val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => + e match { + case ne: NamedExpression => ne + case other => + // Keep the name of the original expression. + val name = e match { + case ne: NamedExpression => ne.name + case _ => e.toString + } + Alias(other, name)() + } + } // 1. Create an Aggregate Operator for partial aggregations. + val partialAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, + functionsWithoutDistinct, Partial, planLater(child)) + + println("Partial aggregate plan:") + println(partialAggregate) Nil } } From 3afb9493e0f69e85db6049bbece6222e48b8d829 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 18:29:56 +0000 Subject: [PATCH 06/31] stage 2: WIP --- .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 9 ++++++--- .../berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 1 - 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 989cb23bc2..f1d5f41858 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -163,9 +163,12 @@ object OpaqueOperators extends Strategy { val partialAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Partial, planLater(child)) - println("Partial aggregate plan:") - println(partialAggregate) - Nil + // 2. Create an Aggregate Operator for partial merge aggregations. + val partialMergeAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, + functionsWithoutDistinct, PartialMerge, partialAggregate) + + println(partialMergeAggregate) + partialMergeAggregate :: Nil } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 179ae59baf..fb1ab88f70 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -389,7 +389,6 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => val words = makeDF(data, securityLevel, "category", "id", "price") val df = words.groupBy("category").agg(count("id").as("num_ids"), countDistinct("price").as("num_unique_prices")) - df.explain() df.collect } From db409a1531285cad2379d95f27b4fe1f42bcb3d5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 19:47:25 +0000 Subject: [PATCH 07/31] saving, sorting by group expressions ++ name distinct expressions worked --- src/enclave/Enclave/Aggregate.cpp | 4 ++++ src/enclave/Enclave/ExpressionEvaluation.h | 3 +++ .../edu/berkeley/cs/rise/opaque/Utils.scala | 1 + .../cs/rise/opaque/execution/operators.scala | 2 +- .../edu/berkeley/cs/rise/opaque/strategies.scala | 16 ++++++++++++++-- .../cs/rise/opaque/OpaqueOperatorTests.scala | 12 ++++++++++-- 6 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index eea297d5a2..7e29743ce7 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -6,6 +6,7 @@ #include "common.h" #include +using namespace std; void non_oblivious_aggregate( uint8_t *agg_op, size_t agg_op_length, @@ -25,6 +26,7 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -36,9 +38,11 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } + cout << "end of function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 0f48c56d48..3d16ef7d0b 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -7,6 +7,9 @@ #include "Flatbuffers.h" +#include +using namespace std; + int printf(const char *fmt, ...); #ifndef EXPRESSION_EVALUATION_H diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index d1cbee78cb..daaba91a0d 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1394,6 +1394,7 @@ object Utils extends Logging { } case Final => { val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0)) + println(countUpdateExpr) (Seq(countUpdateExpr), Seq(count)) } case Complete => { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 85e0248774..f00f5a2c00 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -281,7 +281,7 @@ case class EncryptedAggregateExec( timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") { childRDD => childRDD.map { block => val (enclave, eid) = Utils.initEnclave() - Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial || mode == PartialMerge))) + Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial))) } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index f1d5f41858..7dc3f57cb8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -126,6 +126,8 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation + println(groupingExpressions) + println(aggregateExpressions) EncryptedProjectExec(resultExpressions, EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, @@ -158,17 +160,27 @@ object OpaqueOperators extends Strategy { Alias(other, name)() } } + println(groupingExpressions ++ namedDistinctExpressions) + println(functionsWithoutDistinct) + EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Final, + EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, + EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Partial, + EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(e => SortOrder(e, Ascending)), false, planLater(child))))) :: Nil + + /* + // Preprocessing. + val localSorted = EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) // 1. Create an Aggregate Operator for partial aggregations. val partialAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, - functionsWithoutDistinct, Partial, planLater(child)) + functionsWithoutDistinct, Partial, localSorted) // 2. Create an Aggregate Operator for partial merge aggregations. val partialMergeAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, PartialMerge, partialAggregate) - println(partialMergeAggregate) partialMergeAggregate :: Nil + */ } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index fb1ab88f70..b8057baef1 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -387,11 +387,19 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => testAgainstSpark("aggregate count - distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") - val df = words.groupBy("category").agg(count("id").as("num_ids"), - countDistinct("price").as("num_unique_prices")) + val df = words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), + count("price").as("num_prices")) df.collect } + testAgainstSpark("aggregate count - two indistinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "id", "price") + val df = words.groupBy("category", "id").agg( + count("price").as("num_prices")) + df.collect.toSet + } + testAgainstSpark("aggregate first") { securityLevel => val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "category", "price") From cb2220ffeab182bc58523b0b4655206e74b6f90b Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 19:53:33 +0000 Subject: [PATCH 08/31] stage 1 & 2 printing the expected results --- src/enclave/Enclave/Aggregate.cpp | 2 +- .../edu/berkeley/cs/rise/opaque/Utils.scala | 1 - .../berkeley/cs/rise/opaque/strategies.scala | 29 +++++++++++-------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 7e29743ce7..4b04ca126a 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -42,7 +42,7 @@ void non_oblivious_aggregate( w.append(agg_op_eval.evaluate()); } - cout << "end of function" << endl; + cout << "end of aggregate function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index daaba91a0d..d1cbee78cb 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1394,7 +1394,6 @@ object Utils extends Logging { } case Final => { val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0)) - println(countUpdateExpr) (Seq(countUpdateExpr), Seq(count)) } case Complete => { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 7dc3f57cb8..0afff745cb 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -126,8 +126,6 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation - println(groupingExpressions) - println(aggregateExpressions) EncryptedProjectExec(resultExpressions, EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, @@ -160,27 +158,34 @@ object OpaqueOperators extends Strategy { Alias(other, name)() } } - println(groupingExpressions ++ namedDistinctExpressions) - println(functionsWithoutDistinct) - EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Final, + /* + EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, PartialMerge, EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Partial, EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(e => SortOrder(e, Ascending)), false, planLater(child))))) :: Nil + */ - /* // Preprocessing. - val localSorted = EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) // 1. Create an Aggregate Operator for partial aggregations. - val partialAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, - functionsWithoutDistinct, Partial, localSorted) + val partialAggregate = { + val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) + + EncryptedAggregateExec(combinedGroupingExpressions, + functionsWithoutDistinct, Partial, sorted) + } // 2. Create an Aggregate Operator for partial merge aggregations. - val partialMergeAggregate = EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, - functionsWithoutDistinct, PartialMerge, partialAggregate) + val partialMergeAggregate = { + val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, partialAggregate) + + EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, + functionsWithoutDistinct, PartialMerge, sorted) + } partialMergeAggregate :: Nil - */ } } From e84ffa87290042da46b4934833d901d1b176d0d3 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 20:28:25 +0000 Subject: [PATCH 09/31] removed extraneous call to sorted, #3 in place but not working --- .../berkeley/cs/rise/opaque/strategies.scala | 27 +++++++++---------- .../cs/rise/opaque/OpaqueOperatorTests.scala | 1 + 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 0afff745cb..a175729674 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -146,7 +146,8 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation - val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => + val distinctExpressions = functionsWithDistinct.head.aggregateFunction.children + val namedDistinctExpressions = distinctExpressions.map { e => e match { case ne: NamedExpression => ne case other => @@ -158,31 +159,27 @@ object OpaqueOperators extends Strategy { Alias(other, name)() } } - /* - EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, PartialMerge, - EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, - EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, functionsWithoutDistinct, Partial, - EncryptedSortExec((groupingExpressions ++ namedDistinctExpressions).map(e => SortOrder(e, Ascending)), false, planLater(child))))) :: Nil - */ - - // Preprocessing. // 1. Create an Aggregate Operator for partial aggregations. val partialAggregate = { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) - EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct, Partial, sorted) + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) + EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Partial, sorted) } // 2. Create an Aggregate Operator for partial merge aggregations. val partialMergeAggregate = { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, partialAggregate) - EncryptedAggregateExec(groupingExpressions ++ namedDistinctExpressions, - functionsWithoutDistinct, PartialMerge, sorted) + EncryptedAggregateExec(combinedGroupingExpressions, + functionsWithoutDistinct, PartialMerge, partialAggregate) + } + + // 3. Create an Aggregate operator for partial aggregation (for distinct) + val partialDistinctAggregate = { + EncryptedAggregateExec(groupingExpressions, + functionsWithoutDistinct ++ functionsWithDistinct, PartialMerge, partialMergeAggregate) } partialMergeAggregate :: Nil diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index b8057baef1..e1cc010481 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -389,6 +389,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => val words = makeDF(data, securityLevel, "category", "id", "price") val df = words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), count("price").as("num_prices")) + df.show() df.collect } From 400ffc9fa02a38f45ac2379dc5af434222e47517 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 20:51:22 +0000 Subject: [PATCH 10/31] stage 3 has the final, correct result: refactoring the Aggregate code to not cast aggregate expressions to Partial, PartialMerge, etc will be needed --- .../edu/berkeley/cs/rise/opaque/execution/operators.scala | 2 +- src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index f00f5a2c00..3a3c22ca2e 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -263,7 +263,7 @@ case class EncryptedAggregateExec( (groupingExpressions, partialAggExpressions) } case PartialMerge => { - val partialMergeAggExpressions = aggExpressions.map(_.copy(mode = PartialMerge)) + val partialMergeAggExpressions = aggExpressions (groupingExpressions, partialMergeAggExpressions) } case Final => { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index a175729674..7bcab96ad6 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -173,16 +173,16 @@ object OpaqueOperators extends Strategy { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct, PartialMerge, partialAggregate) + functionsWithoutDistinct.map(_.copy(mode = PartialMerge)), PartialMerge, partialAggregate) } // 3. Create an Aggregate operator for partial aggregation (for distinct) val partialDistinctAggregate = { EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct ++ functionsWithDistinct, PartialMerge, partialMergeAggregate) + functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ functionsWithDistinct.map(_.copy(mode = Partial)), PartialMerge, partialMergeAggregate) } - partialMergeAggregate :: Nil + partialDistinctAggregate :: Nil } } From c4e84a26e5d4f3d8db0364e31624dd941404e39d Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 21:18:33 +0000 Subject: [PATCH 11/31] refactor done, C++ still printing the correct values --- .../cs/rise/opaque/execution/operators.scala | 36 ++++++++++++------- .../berkeley/cs/rise/opaque/strategies.scala | 21 ++++++----- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 3a3c22ca2e..b36cd246f8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -235,7 +235,8 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan) case class EncryptedAggregateExec( groupingExpressions: Seq[NamedExpression], aggExpressions: Seq[AggregateExpression], - mode: AggregateMode, + // Specify mode if ALL aggExpressions should be case to the desired AggregateMode + mode: Option[AggregateMode], child: SparkPlan) extends UnaryExecNode with OpaqueOperatorExec { @@ -243,45 +244,56 @@ case class EncryptedAggregateExec( AttributeSet(aggExpressions) -- AttributeSet(groupingExpressions) override def output: Seq[Attribute] = mode match { - case Partial => + case Some(Partial) => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - case PartialMerge => + case Some(PartialMerge) => groupingExpressions.map(_.toAttribute) ++ - aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - case Final => + aggExpressions.map(_.copy(mode = PartialMerge)).flatMap(_.aggregateFunction.inputAggBufferAttributes) + case Some(Final) => + groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) + case Some(Complete) => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) - case Complete => + case None => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) } override def executeBlocked(): RDD[Block] = { val (groupingExprs, aggExprs) = mode match { - case Partial => { + case Some(Partial) => { val partialAggExpressions = aggExpressions.map(_.copy(mode = Partial)) (groupingExpressions, partialAggExpressions) } - case PartialMerge => { - val partialMergeAggExpressions = aggExpressions + case Some(PartialMerge) => { + val partialMergeAggExpressions = aggExpressions.map(_.copy(mode = PartialMerge)) (groupingExpressions, partialMergeAggExpressions) } - case Final => { + case Some(Final) => { val finalGroupingExpressions = groupingExpressions.map(_.toAttribute) val finalAggExpressions = aggExpressions.map(_.copy(mode = Final)) (finalGroupingExpressions, finalAggExpressions) } - case Complete => { + case Some(Complete) => { (groupingExpressions, aggExpressions.map(_.copy(mode = Complete))) } + case None => { + (groupingExpressions, aggExpressions) + } } val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output) + val isPartial = mode match { + case Some(x) => + x == Partial + case None => + false + } timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") { childRDD => childRDD.map { block => val (enclave, eid) = Utils.initEnclave() - Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial))) + Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, isPartial)) } } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 7bcab96ad6..619a8ef8f7 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -115,32 +115,32 @@ object OpaqueOperators extends Strategy { case 0 => // No distinct aggregate operations if (groupingExpressions.size == 0) { // Global aggregation - val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) + val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), planLater(child)) val partialOutput = partialAggregate.output val (projSchema, tag) = tagForGlobalAggregate(partialOutput) EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), EncryptedProjectExec(partialOutput, EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil } case size if size == 1 => // One distinct aggregate operation if (groupingExpressions.size == 0) { // Global aggregation - val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Partial, planLater(child)) + val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), planLater(child)) val partialOutput = partialAggregate.output val (projSchema, tag) = tagForGlobalAggregate(partialOutput) EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Final, + EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), EncryptedProjectExec(partialOutput, EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil @@ -165,7 +165,7 @@ object OpaqueOperators extends Strategy { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) - EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Partial, sorted) + EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } // 2. Create an Aggregate Operator for partial merge aggregations. @@ -173,13 +173,16 @@ object OpaqueOperators extends Strategy { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct.map(_.copy(mode = PartialMerge)), PartialMerge, partialAggregate) + functionsWithoutDistinct, Some(PartialMerge), partialAggregate) } // 3. Create an Aggregate operator for partial aggregation (for distinct) val partialDistinctAggregate = { + // Indistinct functions operate on aggregation buffers since partial aggregation already called, + // but distinct functions operate on the original input to the aggregation. EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ functionsWithDistinct.map(_.copy(mode = Partial)), PartialMerge, partialMergeAggregate) + functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ + functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) } partialDistinctAggregate :: Nil From 37c9d69aea2491b1018f8f8ef664fd93e6425c02 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 21:34:52 +0000 Subject: [PATCH 12/31] need to formalize None case in EncryptedAggregateExec.output, but stage 4 passes --- .../cs/rise/opaque/execution/operators.scala | 17 ++++++++++++++++- .../berkeley/cs/rise/opaque/strategies.scala | 10 +++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index b36cd246f8..335065b881 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -255,7 +255,22 @@ case class EncryptedAggregateExec( case Some(Complete) => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) case None => - groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) + groupingExpressions.map(_.toAttribute) ++ + aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) + /* + val res = groupingExpressions.map(_.toAttribute) + for (aggExpr <- aggExpressions) { + val attributes = aggExpr.mode match { + case Partial | PartialMerge => + aggExpr.aggregateFunction.inputAggBufferAttributes + case _ => + Seq(aggExpr.resultAttribute) + } + res ++ attributes + } + println(res) + res + */ } override def executeBlocked(): RDD[Block] = { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 619a8ef8f7..ddfb1420ba 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -184,8 +184,16 @@ object OpaqueOperators extends Strategy { functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) } + println(partialDistinctAggregate.output) + println(resultExpressions) - partialDistinctAggregate :: Nil + // 4. Create an Aggregate Operator for the final aggregation. + val finalAggregate = { + EncryptedAggregateExec(groupingExpressions, + functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) + } + + finalAggregate :: Nil } } From 3e629aa7a636e3c0138294ae8e31b44badbe2ecb Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 21:38:14 +0000 Subject: [PATCH 13/31] distinct and indistinct passes (git add -u) --- src/enclave/Enclave/Aggregate.cpp | 3 --- .../berkeley/cs/rise/opaque/execution/operators.scala | 9 ++------- .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 6 ++---- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 4b04ca126a..9587496d59 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -26,7 +26,6 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -38,11 +37,9 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } - cout << "end of aggregate function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 335065b881..1eca77a158 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -255,10 +255,7 @@ case class EncryptedAggregateExec( case Some(Complete) => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) case None => - groupingExpressions.map(_.toAttribute) ++ - aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - /* - val res = groupingExpressions.map(_.toAttribute) + var res = groupingExpressions.map(_.toAttribute) for (aggExpr <- aggExpressions) { val attributes = aggExpr.mode match { case Partial | PartialMerge => @@ -266,11 +263,9 @@ case class EncryptedAggregateExec( case _ => Seq(aggExpr.resultAttribute) } - res ++ attributes + res = res ++ attributes } - println(res) res - */ } override def executeBlocked(): RDD[Block] = { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index ddfb1420ba..f945c05d0b 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -176,7 +176,7 @@ object OpaqueOperators extends Strategy { functionsWithoutDistinct, Some(PartialMerge), partialAggregate) } - // 3. Create an Aggregate operator for partial aggregation (for distinct) + // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions val partialDistinctAggregate = { // Indistinct functions operate on aggregation buffers since partial aggregation already called, // but distinct functions operate on the original input to the aggregation. @@ -184,8 +184,6 @@ object OpaqueOperators extends Strategy { functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) } - println(partialDistinctAggregate.output) - println(resultExpressions) // 4. Create an Aggregate Operator for the final aggregation. val finalAggregate = { @@ -193,7 +191,7 @@ object OpaqueOperators extends Strategy { functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) } - finalAggregate :: Nil + EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil } } From 4abe290dd8e720543baa1fd99ba2460a303589a5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 21:45:01 +0000 Subject: [PATCH 14/31] general cleanup, None case looks nicer --- .../cs/rise/opaque/execution/operators.scala | 13 +++++-------- .../edu/berkeley/cs/rise/opaque/strategies.scala | 13 ++++++------- .../cs/rise/opaque/OpaqueOperatorTests.scala | 6 ++---- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 1eca77a158..ffe881953a 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -255,17 +255,14 @@ case class EncryptedAggregateExec( case Some(Complete) => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) case None => - var res = groupingExpressions.map(_.toAttribute) - for (aggExpr <- aggExpressions) { - val attributes = aggExpr.mode match { + groupingExpressions.map(_.toAttribute) ++ aggExpressions.flatMap(expr => { + expr.mode match { case Partial | PartialMerge => - aggExpr.aggregateFunction.inputAggBufferAttributes + expr.aggregateFunction.inputAggBufferAttributes case _ => - Seq(aggExpr.resultAttribute) + Seq(expr.resultAttribute) } - res = res ++ attributes - } - res + }) } override def executeBlocked(): RDD[Block] = { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index f945c05d0b..d630991939 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -146,8 +146,7 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation - val distinctExpressions = functionsWithDistinct.head.aggregateFunction.children - val namedDistinctExpressions = distinctExpressions.map { e => + val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => e match { case ne: NamedExpression => ne case other => @@ -160,7 +159,7 @@ object OpaqueOperators extends Strategy { } } - // 1. Create an Aggregate Operator for partial aggregations. + // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions @@ -168,7 +167,7 @@ object OpaqueOperators extends Strategy { EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } - // 2. Create an Aggregate Operator for partial merge aggregations. + // 2. Create an Aggregate operator for partial merge aggregations. val partialMergeAggregate = { val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions @@ -176,16 +175,16 @@ object OpaqueOperators extends Strategy { functionsWithoutDistinct, Some(PartialMerge), partialAggregate) } - // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions + // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions. val partialDistinctAggregate = { - // Indistinct functions operate on aggregation buffers since partial aggregation already called, + // Indistinct functions operate on aggregation buffers since partial aggregation was already called, // but distinct functions operate on the original input to the aggregation. EncryptedAggregateExec(groupingExpressions, functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) } - // 4. Create an Aggregate Operator for the final aggregation. + // 4. Create an Aggregate operator for the final aggregation. val finalAggregate = { EncryptedAggregateExec(groupingExpressions, functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index e1cc010481..2813ce8508 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -387,10 +387,8 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => testAgainstSpark("aggregate count - distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") - val df = words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), - count("price").as("num_prices")) - df.show() - df.collect + words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), + count("price").as("num_prices")).collect } testAgainstSpark("aggregate count - two indistinct") { securityLevel => From f4e6019b19cc706f441eedc21ae1bc6040cf47b4 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 21:55:50 +0000 Subject: [PATCH 15/31] throw error with >1 distinct, add test case for global distinct --- src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 3 +++ .../edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index d630991939..4fdb8343e1 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -192,6 +192,9 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil } + case _ => { // more than one distinct operations + throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions is not yet supported") + } } case p @ Union(Seq(left, right)) if isEncrypted(p) => diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 2813ce8508..12c2935cc2 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -477,6 +477,12 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => words.agg(sum("count").as("totalCount")).collect } + testAgainstSpark("global aggregate count distinct") { securityLevel => + val data = for (i <- 0 until 256) yield (i, abc(i), i % 64) + val words = makeDF(data, securityLevel, "id", "word", "price") + words.agg(countDistinct("price").as("num_unique_prices")).collect + } + testAgainstSpark("global aggregate with 0 rows") { securityLevel => val data = for (i <- 0 until 256) yield (i, abc(i), 1) val words = makeDF(data, securityLevel, "id", "word", "count") From 39acc1a6538bd56f3247f2ce72d5b40b50b6d312 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 22:12:52 +0000 Subject: [PATCH 16/31] no need for global aggregation case --- .../cs/rise/opaque/execution/operators.scala | 2 +- .../berkeley/cs/rise/opaque/strategies.scala | 94 +++++++++---------- .../cs/rise/opaque/OpaqueOperatorTests.scala | 6 +- 3 files changed, 46 insertions(+), 56 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index ffe881953a..9bd487076c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -292,7 +292,7 @@ case class EncryptedAggregateExec( val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output) val isPartial = mode match { case Some(x) => - x == Partial + x == Partial || x == PartialMerge case None => false } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 4fdb8343e1..30e6e9e57b 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -133,67 +133,57 @@ object OpaqueOperators extends Strategy { EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil } case size if size == 1 => // One distinct aggregate operation - if (groupingExpressions.size == 0) { - // Global aggregation - val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), planLater(child)) - val partialOutput = partialAggregate.output - val (projSchema, tag) = tagForGlobalAggregate(partialOutput) - - EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), - EncryptedProjectExec(partialOutput, - EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, - EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil - } else { - // Grouping aggregation - val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => - e match { - case ne: NamedExpression => ne - case other => - // Keep the name of the original expression. - val name = e match { - case ne: NamedExpression => ne.name - case _ => e.toString - } - Alias(other, name)() - } + // Because we are also grouping on the columns used in the distinct expressions, + // we do not need separate cases for global and grouping aggregation. + + val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => + e match { + case ne: NamedExpression => ne + case other => + // Keep the name of the original expression. + val name = e match { + case ne: NamedExpression => ne.name + case _ => e.toString + } + Alias(other, name)() } + } - // 1. Create an Aggregate operator for partial aggregations. - val partialAggregate = { - val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) - EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) - } + // 1. Create an Aggregate operator for partial aggregations. + val partialAggregate = { + val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - // 2. Create an Aggregate operator for partial merge aggregations. - val partialMergeAggregate = { - val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) + EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) + } - EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct, Some(PartialMerge), partialAggregate) - } + // 2. Create an Aggregate operator for partial merge aggregations. + val partialMergeAggregate = { + val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions. - val partialDistinctAggregate = { - // Indistinct functions operate on aggregation buffers since partial aggregation was already called, - // but distinct functions operate on the original input to the aggregation. - EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ - functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) - } + EncryptedAggregateExec(combinedGroupingExpressions, + functionsWithoutDistinct, Some(PartialMerge), partialAggregate) + } - // 4. Create an Aggregate operator for the final aggregation. - val finalAggregate = { - EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) - } + // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions. + val partialDistinctAggregate = { + // Indistinct functions operate on aggregation buffers since partial aggregation was already called, + // but distinct functions operate on the original input to the aggregation. + EncryptedAggregateExec(groupingExpressions, + functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ + functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) + } - EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil + // 4. Create an Aggregate operator for the final aggregation. + val finalAggregate = { + EncryptedAggregateExec(groupingExpressions, + functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) } + + EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil + case _ => { // more than one distinct operations - throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions is not yet supported") + throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions are not yet supported.") } } diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 12c2935cc2..6bee594d0b 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -377,21 +377,21 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => .collect.sortBy { case Row(category: String, _) => category } } - testAgainstSpark("aggregate count - distinct") { securityLevel => + testAgainstSpark("aggregate count distinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "price") words.groupBy("category").agg(countDistinct("price").as("num_unique_prices")) .collect.sortBy { case Row(category: String, _) => category } } - testAgainstSpark("aggregate count - distinct and indistinct") { securityLevel => + testAgainstSpark("aggregate count distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), count("price").as("num_prices")).collect } - testAgainstSpark("aggregate count - two indistinct") { securityLevel => + testAgainstSpark("aggregate count two indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") val df = words.groupBy("category", "id").agg( From 33b0d5aa65fe2121cfdc87e2f10ef68517c32058 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 22:27:58 +0000 Subject: [PATCH 17/31] single partition passes all aggregate tests, multiple partition doesn't --- src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 30e6e9e57b..6127b702f7 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -148,19 +148,16 @@ object OpaqueOperators extends Strategy { Alias(other, name)() } } + val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { - val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } // 2. Create an Aggregate operator for partial merge aggregations. val partialMergeAggregate = { - val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(PartialMerge), partialAggregate) } From 1c02769983ea74cb1aab27d5c9b54a86b1365af3 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 22:49:22 +0000 Subject: [PATCH 18/31] works with global sort first --- src/enclave/Enclave/Aggregate.cpp | 3 +++ src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 5 +++-- .../edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 9587496d59..7e29743ce7 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -26,6 +26,7 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -37,9 +38,11 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } + cout << "end of function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 6127b702f7..869f918c16 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -152,7 +152,7 @@ object OpaqueOperators extends Strategy { // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, planLater(child)) EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } @@ -173,8 +173,9 @@ object OpaqueOperators extends Strategy { // 4. Create an Aggregate operator for the final aggregation. val finalAggregate = { + val sorted = EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, partialDistinctAggregate) EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), partialDistinctAggregate) + functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), sorted) } EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 6bee594d0b..5769649703 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -388,7 +388,7 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), - count("price").as("num_prices")).collect + count("price").as("num_prices")).collect.toSet } testAgainstSpark("aggregate count two indistinct") { securityLevel => From dbcb18c627432f8492d778ad1ab2025e3d55f0e5 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 22:54:41 +0000 Subject: [PATCH 19/31] works with non-global sort first --- src/enclave/Enclave/Aggregate.cpp | 3 --- src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 7e29743ce7..9587496d59 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -26,7 +26,6 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -38,11 +37,9 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } - cout << "end of function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 869f918c16..498858d307 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -152,14 +152,15 @@ object OpaqueOperators extends Strategy { // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, planLater(child)) + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } // 2. Create an Aggregate operator for partial merge aggregations. val partialMergeAggregate = { + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, partialAggregate) EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct, Some(PartialMerge), partialAggregate) + functionsWithoutDistinct, Some(PartialMerge), sorted) } // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions. From 3fa757775fbd4d54cae44fe95a348d3267a74275 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 23:00:17 +0000 Subject: [PATCH 20/31] cleanup --- .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 498858d307..3fbae52481 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -152,13 +152,15 @@ object OpaqueOperators extends Strategy { // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, + planLater(child)) EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) } // 2. Create an Aggregate operator for partial merge aggregations. val partialMergeAggregate = { - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, partialAggregate) + val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, + partialAggregate) EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(PartialMerge), sorted) } @@ -174,7 +176,8 @@ object OpaqueOperators extends Strategy { // 4. Create an Aggregate operator for the final aggregation. val finalAggregate = { - val sorted = EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, partialDistinctAggregate) + val sorted = EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), + true, partialDistinctAggregate) EncryptedAggregateExec(groupingExpressions, functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), sorted) } From 84a79b69a9d1ca2ce8468b3b8ac97908a254e284 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 23:01:45 +0000 Subject: [PATCH 21/31] cleanup tests --- .../cs/rise/opaque/OpaqueOperatorTests.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 5769649703..d46d3235c3 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -377,11 +377,12 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => .collect.sortBy { case Row(category: String, _) => category } } - testAgainstSpark("aggregate count distinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq - val words = makeDF(data, securityLevel, "category", "price") - words.groupBy("category").agg(countDistinct("price").as("num_unique_prices")) - .collect.sortBy { case Row(category: String, _) => category } + testAgainstSpark("aggregate count two indistinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "id", "price") + val df = words.groupBy("category", "id").agg( + count("price").as("num_prices")) + df.collect.toSet } testAgainstSpark("aggregate count distinct and indistinct") { securityLevel => @@ -391,12 +392,11 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => count("price").as("num_prices")).collect.toSet } - testAgainstSpark("aggregate count two indistinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq - val words = makeDF(data, securityLevel, "category", "id", "price") - val df = words.groupBy("category", "id").agg( - count("price").as("num_prices")) - df.collect.toSet + testAgainstSpark("aggregate count distinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "price") + words.groupBy("category").agg(countDistinct("price").as("num_unique_prices")) + .collect.sortBy { case Row(category: String, _) => category } } testAgainstSpark("aggregate first") { securityLevel => From 39384d6aac8dbeb03fc139e2e582ec954395dfc9 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 23:17:25 +0000 Subject: [PATCH 22/31] removed iostream, other nit --- src/enclave/Enclave/Aggregate.cpp | 3 --- src/enclave/Enclave/ExpressionEvaluation.h | 3 --- .../edu/berkeley/cs/rise/opaque/Utils.scala | 24 +++++++------------ .../cs/rise/opaque/execution/operators.scala | 2 +- .../berkeley/cs/rise/opaque/strategies.scala | 2 +- 5 files changed, 10 insertions(+), 24 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 9587496d59..e434f77e37 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -5,9 +5,6 @@ #include "FlatbuffersWriters.h" #include "common.h" -#include -using namespace std; - void non_oblivious_aggregate( uint8_t *agg_op, size_t agg_op_length, uint8_t *input_rows, size_t input_rows_length, diff --git a/src/enclave/Enclave/ExpressionEvaluation.h b/src/enclave/Enclave/ExpressionEvaluation.h index 3d16ef7d0b..0f48c56d48 100644 --- a/src/enclave/Enclave/ExpressionEvaluation.h +++ b/src/enclave/Enclave/ExpressionEvaluation.h @@ -7,9 +7,6 @@ #include "Flatbuffers.h" -#include -using namespace std; - int printf(const char *fmt, ...); #ifndef EXPRESSION_EVALUATION_H diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index d1cbee78cb..abe2465fc8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1371,8 +1371,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case c @ Count(children) => val count = c.aggBufferAttributes(0) @@ -1414,8 +1413,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case f @ First(child, false) => val first = f.aggBufferAttributes(0) @@ -1453,8 +1451,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case l @ Last(child, false) => val last = l.aggBufferAttributes(0) @@ -1492,8 +1489,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case m @ Max(child) => val max = m.aggBufferAttributes(0) @@ -1526,8 +1522,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case m @ Min(child) => val min = m.aggBufferAttributes(0) @@ -1560,8 +1555,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case s @ Sum(child) => val sum = s.aggBufferAttributes(0) @@ -1599,8 +1593,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) case vs @ ScalaUDAF(Seq(child), _: VectorSum, _, _) => val sum = vs.aggBufferAttributes(0) @@ -1635,8 +1628,7 @@ object Utils extends Logging { updateExprs.map(e => flatbuffersSerializeExpression(builder, e, concatSchema)).toArray), tuix.AggregateExpr.createEvaluateExprsVector( builder, - evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray) - ) + evaluateExprs.map(e => flatbuffersSerializeExpression(builder, e, aggSchema)).toArray)) } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 9bd487076c..6a138be23b 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -235,7 +235,7 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan) case class EncryptedAggregateExec( groupingExpressions: Seq[NamedExpression], aggExpressions: Seq[AggregateExpression], - // Specify mode if ALL aggExpressions should be case to the desired AggregateMode + // Specify mode if ALL aggExpressions should be cast to the desired AggregateMode mode: Option[AggregateMode], child: SparkPlan) extends UnaryExecNode with OpaqueOperatorExec { diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 3fbae52481..56cc7fa23c 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -184,7 +184,7 @@ object OpaqueOperators extends Strategy { EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil - case _ => { // more than one distinct operations + case _ => { // More than one distinct operations throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions are not yet supported.") } } From 00dfe76e0d2dcf73a1c13c3b677ad71a561fce48 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Wed, 24 Feb 2021 23:22:31 +0000 Subject: [PATCH 23/31] added test case for 13 --- .../edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala | 2 +- .../scala/edu/berkeley/cs/rise/opaque/execution/operators.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala index c235265624..ac964d7b8f 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext object TPCHBenchmark { // Add query numbers here once they are supported - val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 19, 20, 22) + val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 19, 20, 22) def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { val sqlStr = tpch.getQuery(queryNumber) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 590f32debe..beff968d46 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -234,7 +234,7 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan) case class EncryptedAggregateExec( groupingExpressions: Seq[NamedExpression], aggExpressions: Seq[AggregateExpression], - // Specify mode if ALL aggExpressions should be cast to the desired AggregateMode + // Specify mode if ALL aggExpressions should be cast to the provided AggregateMode mode: Option[AggregateMode], child: SparkPlan) extends UnaryExecNode with OpaqueOperatorExec { From db978aeb6c296af357738115dbec14785a83eab2 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 25 Feb 2021 00:58:27 +0000 Subject: [PATCH 24/31] None case in isPartial match done properly --- .../edu/berkeley/cs/rise/opaque/execution/operators.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index beff968d46..2f6288b095 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -290,10 +290,11 @@ case class EncryptedAggregateExec( val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output) val isPartial = mode match { - case Some(x) => - x == Partial || x == PartialMerge + case Some(mode) => + mode == Partial || mode == PartialMerge case None => - false + aggExpressions.map(expr => expr.mode) + .exists(mode => mode == Partial || mode == PartialMerge) } timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") { From a6f6e37ac1a209e3f2a877b0e427e4773c97eed8 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 25 Feb 2021 21:23:02 +0000 Subject: [PATCH 25/31] added test cases for sumDistinct --- .../cs/rise/opaque/OpaqueOperatorTests.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 65e5a7545e..1a28d4a8c1 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -461,14 +461,6 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => .collect.sortBy { case Row(category: String, _) => category } } - testAgainstSpark("aggregate count two indistinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq - val words = makeDF(data, securityLevel, "category", "id", "price") - val df = words.groupBy("category", "id").agg( - count("price").as("num_prices")) - df.collect.toSet - } - testAgainstSpark("aggregate count distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") @@ -530,6 +522,20 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => .collect.sortBy { case Row(word: String, _) => word } } + testAgainstSpark("aggregate sum distinct and indistinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "id", "price") + words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), + count("price").as("num_prices")).collect.toSet + } + + testAgainstSpark("aggregate sum distinct") { securityLevel => + val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq + val words = makeDF(data, securityLevel, "category", "price") + words.groupBy("category").agg(sumDistinct("price").as("sum_unique_prices")) + .collect.sortBy { case Row(category: String, _) => category } + } + testAgainstSpark("aggregate on multiple columns") { securityLevel => val data = for (i <- 0 until 256) yield (abc(i), 1, 1.0f) val words = makeDF(data, securityLevel, "str", "x", "y") From 11b746d85f7185f673b01de2bca0abcdc1c6c567 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 25 Feb 2021 21:49:45 +0000 Subject: [PATCH 26/31] case-specific namedDistinctExpressions working --- src/enclave/Enclave/Aggregate.cpp | 6 ++++++ src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala | 5 +++++ .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 7 +++++-- .../edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala | 6 ++++-- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index e434f77e37..7e29743ce7 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -5,6 +5,9 @@ #include "FlatbuffersWriters.h" #include "common.h" +#include +using namespace std; + void non_oblivious_aggregate( uint8_t *agg_op, size_t agg_op_length, uint8_t *input_rows, size_t input_rows_length, @@ -23,6 +26,7 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -34,9 +38,11 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { + cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } + cout << "end of function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala index 8314a266cd..b37e7a4207 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala @@ -1598,6 +1598,11 @@ object Utils extends Logging { val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum) (Seq(sumUpdateExpr), Seq(sum)) } + case PartialMerge => { + val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0)) + val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum) + (Seq(sumUpdateExpr), Seq(sum)) + } case Final => { val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0)) val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index eeb06d72d7..7492b24020 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -168,14 +168,16 @@ object OpaqueOperators extends Strategy { case ne: NamedExpression => ne case other => // Keep the name of the original expression. - val name = e match { + val name = e.children(0) match { case ne: NamedExpression => ne.name case _ => e.toString } - Alias(other, name)() + e.children(0).asInstanceOf[NamedExpression] + // Alias(other, name)() } } val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions + println(combinedGroupingExpressions) // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { @@ -210,6 +212,7 @@ object OpaqueOperators extends Strategy { } EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil + // partialMergeAggregate :: Nil case _ => { // More than one distinct operations throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions are not yet supported.") diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 1a28d4a8c1..b5aa2f6102 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -525,8 +525,10 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => testAgainstSpark("aggregate sum distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") - words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), - count("price").as("num_prices")).collect.toSet + val df = words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), + sum("price").as("sum_prices")) + df.show() + df.collect.toSet } testAgainstSpark("aggregate sum distinct") { securityLevel => From 8ba80b6dc913d7e1400e25ec3103f1d6b01f71e3 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 25 Feb 2021 22:03:33 +0000 Subject: [PATCH 27/31] distinct sum is done --- src/enclave/Enclave/Aggregate.cpp | 3 --- .../berkeley/cs/rise/opaque/strategies.scala | 19 ++++++++----------- .../cs/rise/opaque/OpaqueOperatorTests.scala | 6 ++---- 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 7e29743ce7..9587496d59 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -26,7 +26,6 @@ void non_oblivious_aggregate( cur.set(r.next()); if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); agg_op_eval.reset_group(); } @@ -38,11 +37,9 @@ void non_oblivious_aggregate( // 1. It's a grouping aggregation, OR // 2. It's a global aggregation, the mode is final if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) { - cout << to_string(agg_op_eval.evaluate()) << endl; w.append(agg_op_eval.evaluate()); } - cout << "end of function" << endl; w.output_buffer(output_rows, output_rows_length); } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 7492b24020..eb6699e50e 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -163,21 +163,18 @@ object OpaqueOperators extends Strategy { // Because we are also grouping on the columns used in the distinct expressions, // we do not need separate cases for global and grouping aggregation. - val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.map { e => + // We need to extract named expressions from the children of the distinct aggregate functions + // in order to group by those columns. + val namedDistinctExpressions = functionsWithDistinct.head.aggregateFunction.children.flatMap{ e => e match { - case ne: NamedExpression => ne - case other => - // Keep the name of the original expression. - val name = e.children(0) match { - case ne: NamedExpression => ne.name - case _ => e.toString - } - e.children(0).asInstanceOf[NamedExpression] - // Alias(other, name)() + case ne: NamedExpression => + Seq(ne) + case _ => + e.children.filter(child => child.isInstanceOf[NamedExpression]) + .map(child => child.asInstanceOf[NamedExpression]) } } val combinedGroupingExpressions = groupingExpressions ++ namedDistinctExpressions - println(combinedGroupingExpressions) // 1. Create an Aggregate operator for partial aggregations. val partialAggregate = { diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index b5aa2f6102..428a4e41e8 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -525,10 +525,8 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => testAgainstSpark("aggregate sum distinct and indistinct") { securityLevel => val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") - val df = words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), - sum("price").as("sum_prices")) - df.show() - df.collect.toSet + words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), + sum("price").as("sum_prices")).collect.toSet } testAgainstSpark("aggregate sum distinct") { securityLevel => From f9bb4c445ac8b25565d44a334b271b1ea8c96ecb Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Thu, 25 Feb 2021 22:07:26 +0000 Subject: [PATCH 28/31] removed comments --- src/enclave/Enclave/Aggregate.cpp | 3 --- src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala | 1 - 2 files changed, 4 deletions(-) diff --git a/src/enclave/Enclave/Aggregate.cpp b/src/enclave/Enclave/Aggregate.cpp index 9587496d59..e434f77e37 100644 --- a/src/enclave/Enclave/Aggregate.cpp +++ b/src/enclave/Enclave/Aggregate.cpp @@ -5,9 +5,6 @@ #include "FlatbuffersWriters.h" #include "common.h" -#include -using namespace std; - void non_oblivious_aggregate( uint8_t *agg_op, size_t agg_op_length, uint8_t *input_rows, size_t input_rows_length, diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index eb6699e50e..901c56a8ec 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -209,7 +209,6 @@ object OpaqueOperators extends Strategy { } EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil - // partialMergeAggregate :: Nil case _ => { // More than one distinct operations throw new UnsupportedOperationException("Aggregate operations with more than one distinct expressions are not yet supported.") From 5aefa65810774a21301f52d3f03afde74149badf Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 26 Feb 2021 18:44:24 +0000 Subject: [PATCH 29/31] got rid of mode argument --- .../cs/rise/opaque/execution/operators.scala | 70 ++++--------------- .../berkeley/cs/rise/opaque/strategies.scala | 17 ++--- 2 files changed, 24 insertions(+), 63 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala index 2f6288b095..38faca78f6 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala @@ -233,69 +233,29 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan) case class EncryptedAggregateExec( groupingExpressions: Seq[NamedExpression], - aggExpressions: Seq[AggregateExpression], - // Specify mode if ALL aggExpressions should be cast to the provided AggregateMode - mode: Option[AggregateMode], + aggregateExpressions: Seq[AggregateExpression], child: SparkPlan) extends UnaryExecNode with OpaqueOperatorExec { override def producedAttributes: AttributeSet = - AttributeSet(aggExpressions) -- AttributeSet(groupingExpressions) - - override def output: Seq[Attribute] = mode match { - case Some(Partial) => - groupingExpressions.map(_.toAttribute) ++ - aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - case Some(PartialMerge) => - groupingExpressions.map(_.toAttribute) ++ - aggExpressions.map(_.copy(mode = PartialMerge)).flatMap(_.aggregateFunction.inputAggBufferAttributes) - case Some(Final) => - groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) - case Some(Complete) => - groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute) - case None => - groupingExpressions.map(_.toAttribute) ++ aggExpressions.flatMap(expr => { - expr.mode match { - case Partial | PartialMerge => - expr.aggregateFunction.inputAggBufferAttributes - case _ => - Seq(expr.resultAttribute) - } - }) - } + AttributeSet(aggregateExpressions) -- AttributeSet(groupingExpressions) + + override def output: Seq[Attribute] = groupingExpressions.map(_.toAttribute) ++ + aggregateExpressions.flatMap(expr => { + expr.mode match { + case Partial | PartialMerge => + expr.aggregateFunction.inputAggBufferAttributes + case _ => + Seq(expr.resultAttribute) + } + }) override def executeBlocked(): RDD[Block] = { - val (groupingExprs, aggExprs) = mode match { - case Some(Partial) => { - val partialAggExpressions = aggExpressions.map(_.copy(mode = Partial)) - (groupingExpressions, partialAggExpressions) - } - case Some(PartialMerge) => { - val partialMergeAggExpressions = aggExpressions.map(_.copy(mode = PartialMerge)) - (groupingExpressions, partialMergeAggExpressions) - } - case Some(Final) => { - val finalGroupingExpressions = groupingExpressions.map(_.toAttribute) - val finalAggExpressions = aggExpressions.map(_.copy(mode = Final)) - (finalGroupingExpressions, finalAggExpressions) - } - case Some(Complete) => { - (groupingExpressions, aggExpressions.map(_.copy(mode = Complete))) - } - case None => { - (groupingExpressions, aggExpressions) - } - } + val aggExprSer = Utils.serializeAggOp(groupingExpressions, aggregateExpressions, child.output) + val isPartial = aggregateExpressions.map(expr => expr.mode) + .exists(mode => mode == Partial || mode == PartialMerge) - val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output) - val isPartial = mode match { - case Some(mode) => - mode == Partial || mode == PartialMerge - case None => - aggExpressions.map(expr => expr.mode) - .exists(mode => mode == Partial || mode == PartialMerge) - } timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") { childRDD => childRDD.map { block => diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 901c56a8ec..82af622587 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -142,21 +142,22 @@ object OpaqueOperators extends Strategy { case 0 => // No distinct aggregate operations if (groupingExpressions.size == 0) { // Global aggregation - val partialAggregate = EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), planLater(child)) + val partialAggregate = EncryptedAggregateExec(groupingExpressions, + aggregateExpressions.map(_.copy(mode = Partial)), planLater(child)) val partialOutput = partialAggregate.output val (projSchema, tag) = tagForGlobalAggregate(partialOutput) EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), + EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Final)), EncryptedProjectExec(partialOutput, EncryptedSortExec(Seq(SortOrder(tag, Ascending)), true, EncryptedProjectExec(projSchema, partialAggregate))))) :: Nil } else { // Grouping aggregation EncryptedProjectExec(resultExpressions, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Final), + EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Final)), EncryptedSortExec(groupingExpressions.map(_.toAttribute).map(e => SortOrder(e, Ascending)), true, - EncryptedAggregateExec(groupingExpressions, aggregateExpressions, Some(Partial), + EncryptedAggregateExec(groupingExpressions, aggregateExpressions.map(_.copy(mode = Partial)), EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)))))) :: Nil } case size if size == 1 => // One distinct aggregate operation @@ -180,7 +181,7 @@ object OpaqueOperators extends Strategy { val partialAggregate = { val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), false, planLater(child)) - EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct, Some(Partial), sorted) + EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct.map(_.copy(mode = Partial)), sorted) } // 2. Create an Aggregate operator for partial merge aggregations. @@ -188,7 +189,7 @@ object OpaqueOperators extends Strategy { val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, partialAggregate) EncryptedAggregateExec(combinedGroupingExpressions, - functionsWithoutDistinct, Some(PartialMerge), sorted) + functionsWithoutDistinct.map(_.copy(mode = PartialMerge)), sorted) } // 3. Create an Aggregate operator for partial aggregation of distinct aggregate expressions. @@ -197,7 +198,7 @@ object OpaqueOperators extends Strategy { // but distinct functions operate on the original input to the aggregation. EncryptedAggregateExec(groupingExpressions, functionsWithoutDistinct.map(_.copy(mode = PartialMerge)) ++ - functionsWithDistinct.map(_.copy(mode = Partial)), None, partialMergeAggregate) + functionsWithDistinct.map(_.copy(mode = Partial)), partialMergeAggregate) } // 4. Create an Aggregate operator for the final aggregation. @@ -205,7 +206,7 @@ object OpaqueOperators extends Strategy { val sorted = EncryptedSortExec(groupingExpressions.map(e => SortOrder(e, Ascending)), true, partialDistinctAggregate) EncryptedAggregateExec(groupingExpressions, - functionsWithoutDistinct ++ functionsWithDistinct, Some(Final), sorted) + (functionsWithoutDistinct ++ functionsWithDistinct).map(_.copy(mode = Final)), sorted) } EncryptedProjectExec(resultExpressions, finalAggregate) :: Nil From c07843b6bdf23e8a64b0879baa47cdfac8031558 Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 26 Feb 2021 18:53:46 +0000 Subject: [PATCH 30/31] tests include null values --- .../cs/rise/opaque/OpaqueOperatorTests.scala | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala index 428a4e41e8..59e9ca0f46 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/OpaqueOperatorTests.scala @@ -462,14 +462,24 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => } testAgainstSpark("aggregate count distinct and indistinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val data = (0 until 64).map{ i => + if (i % 6 == 0) + (abc(i), null.asInstanceOf[Int], i % 8) + else + (abc(i), i % 4, i % 8) + }.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") words.groupBy("category").agg(countDistinct("id").as("num_unique_ids"), count("price").as("num_prices")).collect.toSet } testAgainstSpark("aggregate count distinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq + val data = (0 until 64).map{ i => + if (i % 6 == 0) + (abc(i), null.asInstanceOf[Int]) + else + (abc(i), i % 8) + }.toSeq val words = makeDF(data, securityLevel, "category", "price") words.groupBy("category").agg(countDistinct("price").as("num_unique_prices")) .collect.sortBy { case Row(category: String, _) => category } @@ -523,14 +533,24 @@ trait OpaqueOperatorTests extends OpaqueTestsBase { self => } testAgainstSpark("aggregate sum distinct and indistinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 4, i % 8)}.toSeq + val data = (0 until 64).map{ i => + if (i % 6 == 0) + (abc(i), null.asInstanceOf[Int], i % 8) + else + (abc(i), i % 4, i % 8) + }.toSeq val words = makeDF(data, securityLevel, "category", "id", "price") words.groupBy("category").agg(sumDistinct("id").as("sum_unique_ids"), sum("price").as("sum_prices")).collect.toSet } testAgainstSpark("aggregate sum distinct") { securityLevel => - val data = (0 until 32).map{ i => (abc(i), i % 8)}.toSeq + val data = (0 until 64).map{ i => + if (i % 6 == 0) + (abc(i), null.asInstanceOf[Int]) + else + (abc(i), i % 8) + }.toSeq val words = makeDF(data, securityLevel, "category", "price") words.groupBy("category").agg(sumDistinct("price").as("sum_unique_prices")) .collect.sortBy { case Row(category: String, _) => category } From 2fff873424d9930d37da45ff690ad9a91afbff2c Mon Sep 17 00:00:00 2001 From: Octavian Sima Date: Fri, 26 Feb 2021 19:02:24 +0000 Subject: [PATCH 31/31] partition followed by local sort instead of first global sort --- .../scala/edu/berkeley/cs/rise/opaque/strategies.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala index 82af622587..a774115516 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/strategies.scala @@ -186,8 +186,14 @@ object OpaqueOperators extends Strategy { // 2. Create an Aggregate operator for partial merge aggregations. val partialMergeAggregate = { - val sorted = EncryptedSortExec(combinedGroupingExpressions.map(e => SortOrder(e, Ascending)), true, - partialAggregate) + // Partition based on the final grouping expressions. + val partitionOrder = groupingExpressions.map(e => SortOrder(e, Ascending)) + val partitioned = EncryptedRangePartitionExec(partitionOrder, partialAggregate) + + // Local sort on the combined grouping expressions. + val sortOrder = combinedGroupingExpressions.map(e => SortOrder(e, Ascending)) + val sorted = EncryptedSortExec(sortOrder, false, partitioned) + EncryptedAggregateExec(combinedGroupingExpressions, functionsWithoutDistinct.map(_.copy(mode = PartialMerge)), sorted) }