From f14393aa5c0f182ef4ab1621f41d92d45bcfa210 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 4 Mar 2016 09:46:55 +0000 Subject: [PATCH 1/8] Init import for adding wholestage codegen support to Sample. --- .../sql/execution/BufferedRowIterator.java | 10 ++ .../sql/execution/WholeStageCodegen.scala | 4 +- .../spark/sql/execution/basicOperators.scala | 103 +++++++++++++++++- .../BenchmarkWholeStageCodegen.scala | 25 +++++ 4 files changed, 135 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java index 1d1d7edb240dd..8982f09bd9fdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -35,6 +35,8 @@ public abstract class BufferedRowIterator { // used when there is no column in output protected UnsafeRow unsafeRow = new UnsafeRow(0); + protected int partitionIndex = -1; + public boolean hasNext() throws IOException { if (currentRows.isEmpty()) { processNext(); @@ -51,6 +53,14 @@ public InternalRow next() { */ public abstract void init(Iterator iters[]); + /** + * Initializes from array of iterators of InternalRow. + */ + public void init(int index, Iterator iters[]) { + partitionIndex = index; + init(iters); + } + /** * Append a row to currentRows. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index cb68ca6ada366..fe3183b341bc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -297,10 +297,10 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan]) val rdds = plan.upstreams() assert(rdds.size <= 2, "Up to two upstream RDDs can be supported") if (rdds.length == 1) { - rdds.head.mapPartitions { iter => + rdds.head.mapPartitionsWithIndex { (index, iter) => val clazz = CodeGenerator.compile(cleanedSource) val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator] - buffer.init(Array(iter)) + buffer.init(index, Array(iter)) new Iterator[InternalRow] { override def hasNext: Boolean = buffer.hasNext override def next: InternalRow = buffer.next() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index b2f443c0e9ae6..8efc4792a39ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer, GenerateUnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.types.LongType -import org.apache.spark.util.random.PoissonSampler +import 
org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode with CodegenSupport { @@ -127,7 +127,7 @@ case class Sample( upperBound: Double, withReplacement: Boolean, seed: Long, - child: SparkPlan) extends UnaryNode { + child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output protected override def doExecute(): RDD[InternalRow] = { @@ -143,6 +143,99 @@ case class Sample( child.execute().randomSampleWithRange(lowerBound, upperBound, seed) } } + + override def upstreams(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].upstreams() + } + + private var rowBuffer: String = _ + + protected override def doProduce(ctx: CodegenContext): String = { + val needToProduce = ctx.freshName("needToProduce") + ctx.addMutableState("boolean", needToProduce, s"$needToProduce = true;") + + rowBuffer = ctx.freshName("rowBuffer") + ctx.addMutableState("scala.collection.mutable.ArrayBuffer", rowBuffer, + s"$rowBuffer = new scala.collection.mutable.ArrayBuffer();") + + val addToBuffer = ctx.freshName("addToBuffer") + ctx.addNewFunction(addToBuffer, + s""" + | private void $addToBuffer() throws java.io.IOException { + | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} + | } + """.stripMargin.trim) + + val sampledIterator = ctx.freshName("sampledIterator") + ctx.addMutableState("scala.collection.Iterator", sampledIterator, "") + + val outputRow = ctx.freshName("outputRow") + + val sampleCodes = if (withReplacement) { + val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName + val classTag = ctx.freshName("classTag") + val classTagClass = "scala.reflect.ClassTag" + ctx.addMutableState(classTagClass, classTag, + s"$classTag = ($classTagClass)scala.reflect.ClassTag$$.MODULE$$.apply(UnsafeRow.class);") + + val sampler = ctx.freshName("sampler") + ctx.addMutableState(s"$samplerClass", sampler, + s"$sampler = new $samplerClass($upperBound - $lowerBound, false, $classTag);") + + val random = ctx.freshName("random") + val randomSeed = ctx.freshName("randomSeed") + val loopCount = ctx.freshName("loopCount") + s""" + | java.util.Random $random = new java.util.Random(${seed}L); + | long $randomSeed = $random.nextLong(); + | int $loopCount = 0; + | while ($loopCount < partitionIndex) { + | $randomSeed = $random.nextLong(); + | $loopCount += 1; + | } + | $sampler.setSeed($randomSeed); + | $sampledIterator = $sampler.sample($rowBuffer.toIterator()); + """.stripMargin + } else { + val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName + val sampler = ctx.freshName("sampler") + ctx.addMutableState(s"$samplerClass", sampler, + s"$sampler = new $samplerClass($lowerBound, $upperBound, false);") + + s""" + | $sampler.setSeed(${seed}L + partitionIndex); + | $sampledIterator = $sampler.sample($rowBuffer.toIterator()); + """.stripMargin + } + + s""" + | if ($needToProduce) { + | $addToBuffer(); + | $sampleCodes + | $needToProduce = false; + | } + | + | while ($sampledIterator != null && $sampledIterator.hasNext()) { + | UnsafeRow $outputRow = (UnsafeRow)$sampledIterator.next(); + | ${consume(ctx, null, outputRow)} + | } + """.stripMargin + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + val colExprs = child.output.zipWithIndex.map { case (attr, i) => + BoundReference(i, attr.dataType, attr.nullable) + } + + ctx.currentVars = input + val code = GenerateUnsafeProjection.createCode(ctx, colExprs) + + s""" + | // 
Convert the input attributes to an UnsafeRow and add it to the iterator. + | ${code.code} + | $rowBuffer.$$plus$$eq(${code.value}.copy()); + """.stripMargin.trim + } } case class Range( @@ -221,8 +314,8 @@ case class Range( | // initialize Range | if (!$initTerm) { | $initTerm = true; - | if ($input.hasNext()) { - | initRange(((InternalRow) $input.next()).getInt(0)); + | if (partitionIndex != -1) { + | initRange(partitionIndex); | } else { | return; | } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 2d3e34d0e1292..baa73ef1af822 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -84,6 +84,31 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } + ignore("range/sample") { + val N = 50 << 10 + runBenchmark("range/sample withRep.", N) { + sqlContext.range(N).sample(true, 0.8).collect() + } + /* + Westmere E56xx/L56xx/X56xx (Nehalem-C) + range/sample withRep.: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + range/sample withRep. codegen=false 149 / 238 0.3 2908.4 1.0X + range/sample withRep. codegen=true 192 / 206 0.3 3751.7 0.8X + */ + + runBenchmark("range/sample", N) { + sqlContext.range(N).sample(false, 0.8).collect() + } + /* + Westmere E56xx/L56xx/X56xx (Nehalem-C) + range/sample: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + range/sample codegen=false 104 / 122 0.5 2023.5 1.0X + range/sample codegen=true 147 / 159 0.3 2879.0 0.7X + */ + } + ignore("stat functions") { val N = 100 << 20 From 38e94b422832031da2413ab952caf37f24617c68 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 7 Mar 2016 17:08:52 +0800 Subject: [PATCH 2/8] Use Java list to avoid weird janino error. 
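
Context for the workaround: whole-stage codegen splices these string templates
into Java source that Janino compiles, so the row buffer has to be usable from
plain Java. Appending to a Scala ArrayBuffer from generated Java means calling
the name-mangled $plus$eq method, which is where the weird Janino error
surfaced; java.util.ArrayList.add is ordinary Java and compiles cleanly. A
minimal Scala sketch of the two generated statements, where rowBuffer and row
are stand-ins for the identifiers ctx.freshName would actually produce:

    // Hypothetical stand-ins for the freshName-generated identifiers.
    val rowBuffer = "rowBuffer"
    val row = "row"
    // Before: the Scala buffer is reached via its name-mangled method,
    // which is what triggered the Janino error here.
    val scalaBufferAppend = s"$rowBuffer.$$plus$$eq($row.copy());"
    // After: a plain Java collection with an ordinary add(Object) method.
    val javaListAppend = s"$rowBuffer.add($row.copy());"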
--- .../org/apache/spark/util/random/RandomSampler.scala | 6 ++++++ .../apache/spark/sql/execution/basicOperators.scala | 12 ++++++------ .../sql/execution/BenchmarkWholeStageCodegen.scala | 12 ++++++------ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 1314217023d15..06701c4d7a151 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -19,6 +19,7 @@ package org.apache.spark.util.random import java.util.Random +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -41,6 +42,11 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable /** take a random sample */ def sample(items: Iterator[T]): Iterator[U] + /** take a random sample */ + def sample(items: java.util.Iterator[T]): Iterator[U] = { + sample(items.asScala) + } + /** return a copy of the RandomSampler object */ override def clone: RandomSampler[T, U] = throw new NotImplementedError("clone() is not implemented.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8efc4792a39ea..583f0762df463 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -155,8 +155,8 @@ case class Sample( ctx.addMutableState("boolean", needToProduce, s"$needToProduce = true;") rowBuffer = ctx.freshName("rowBuffer") - ctx.addMutableState("scala.collection.mutable.ArrayBuffer", rowBuffer, - s"$rowBuffer = new scala.collection.mutable.ArrayBuffer();") + ctx.addMutableState("java.util.ArrayList", rowBuffer, + s"$rowBuffer = new java.util.ArrayList();") val addToBuffer = ctx.freshName("addToBuffer") ctx.addNewFunction(addToBuffer, @@ -194,7 +194,7 @@ case class Sample( | $loopCount += 1; | } | $sampler.setSeed($randomSeed); - | $sampledIterator = $sampler.sample($rowBuffer.toIterator()); + | $sampledIterator = $sampler.sample($rowBuffer.iterator()); """.stripMargin } else { val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName @@ -204,7 +204,7 @@ case class Sample( s""" | $sampler.setSeed(${seed}L + partitionIndex); - | $sampledIterator = $sampler.sample($rowBuffer.toIterator()); + | $sampledIterator = $sampler.sample($rowBuffer.iterator()); """.stripMargin } @@ -215,7 +215,7 @@ case class Sample( | $needToProduce = false; | } | - | while ($sampledIterator != null && $sampledIterator.hasNext()) { + | while ($sampledIterator.hasNext()) { | UnsafeRow $outputRow = (UnsafeRow)$sampledIterator.next(); | ${consume(ctx, null, outputRow)} | } @@ -233,7 +233,7 @@ case class Sample( s""" | // Convert the input attributes to an UnsafeRow and add it to the iterator. 
| ${code.code} - | $rowBuffer.$$plus$$eq(${code.value}.copy()); + | $rowBuffer.add(${code.value}.copy()); """.stripMargin.trim } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index baa73ef1af822..4dd01231a981d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -90,22 +90,22 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { sqlContext.range(N).sample(true, 0.8).collect() } /* - Westmere E56xx/L56xx/X56xx (Nehalem-C) + Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz range/sample withRep.: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample withRep. codegen=false 149 / 238 0.3 2908.4 1.0X - range/sample withRep. codegen=true 192 / 206 0.3 3751.7 0.8X + range/sample withRep. codegen=false 237 / 322 0.2 4633.6 1.0X + range/sample withRep. codegen=true 232 / 302 0.2 4536.5 1.0X */ runBenchmark("range/sample", N) { sqlContext.range(N).sample(false, 0.8).collect() } /* - Westmere E56xx/L56xx/X56xx (Nehalem-C) + Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz range/sample: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample codegen=false 104 / 122 0.5 2023.5 1.0X - range/sample codegen=true 147 / 159 0.3 2879.0 0.7X + range/sample codegen=false 135 / 180 0.4 2628.3 1.0X + range/sample codegen=true 184 / 225 0.3 3592.5 0.7X */ } From 6144810614245912f659c94f5341dd351b08e89e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 9 Mar 2016 14:02:49 +0800 Subject: [PATCH 3/8] Use new RandomSampler API to do wholestage codegen of Sampler. --- .../spark/util/random/RandomSampler.scala | 199 +++++++++++++++--- .../rdd/PartitionwiseSampledRDDSuite.scala | 2 + .../spark/sql/execution/basicOperators.scala | 98 +++------ .../BenchmarkWholeStageCodegen.scala | 24 +-- 4 files changed, 220 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 06701c4d7a151..49e1bf14cab0d 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -19,7 +19,6 @@ package org.apache.spark.util.random import java.util.Random -import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -42,10 +41,11 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable /** take a random sample */ def sample(items: Iterator[T]): Iterator[U] - /** take a random sample */ - def sample(items: java.util.Iterator[T]): Iterator[U] = { - sample(items.asScala) - } + /** + * Whether to sample the next item or not. + * Return how many times the next item will be sampled. Return 0 if it is not sampled. 
+ */ + def sample(): Int /** return a copy of the RandomSampler object */ override def clone: RandomSampler[T, U] = @@ -113,6 +113,28 @@ class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = fals override def setSeed(seed: Long): Unit = rng.setSeed(seed) + override def sample(): Int = { + if (ub - lb <= 0.0) { + if (complement) 1 else 0 + } else { + if (complement) { + val x = rng.nextDouble() + if ((x < lb) || (x >= ub)) { + 1 + } else { + 0 + } + } else { + val x = rng.nextDouble() + if ((x >= lb) && (x < ub)) { + 1 + } else { + 0 + } + } + } + } + override def sample(items: Iterator[T]): Iterator[T] = { if (ub - lb <= 0.0) { if (complement) items else Iterator.empty @@ -161,6 +183,32 @@ class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T override def setSeed(seed: Long): Unit = rng.setSeed(seed) + private val gapSampling: GapSampling = if (fraction > 0.0 && fraction < 1.0) { + new GapSampling(fraction, rng, RandomSampler.rngEpsilon) + } else { + null + } + + override def sample(): Int = { + if (fraction <= 0.0) { + 0 + } else if (fraction >= 1.0) { + 1 + } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) { + if (gapSampling.sample()) { + 1 + } else { + 0 + } + } else { + if (rng.nextDouble() <= fraction) { + 1 + } else { + 0 + } + } + } + override def sample(items: Iterator[T]): Iterator[T] = { if (fraction <= 0.0) { Iterator.empty @@ -207,6 +255,23 @@ class PoissonSampler[T: ClassTag]( rngGap.setSeed(seed) } + private val gapSamplingReplacement = if (fraction > 0.0) { + new GapSamplingReplacement(fraction, rngGap, RandomSampler.rngEpsilon) + } else { + null + } + + override def sample(): Int = { + if (fraction <= 0.0) { + 0 + } else if (useGapSamplingIfPossible && + fraction <= RandomSampler.defaultMaxGapSamplingFraction) { + gapSamplingReplacement.sample() + } else { + rng.sample() + } + } + override def sample(items: Iterator[T]): Iterator[T] = { if (fraction <= 0.0) { Iterator.empty @@ -280,11 +345,76 @@ class GapSamplingIterator[T: ClassTag]( } private[spark] -class GapSamplingReplacementIterator[T: ClassTag]( - var data: Iterator[T], +class GapSampling( f: Double, rng: Random = RandomSampler.newDefaultRNG, - epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] { + epsilon: Double = RandomSampler.rngEpsilon) extends Serializable { + + require(f > 0.0 && f < 1.0, s"Sampling fraction ($f) must reside on open interval (0, 1)") + require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0") + + private val lnq = math.log1p(-f) + + /** Return true if the next item should be sampled. Otherwise, return false. */ + def sample(): Boolean = { + if (countForDropping > 0) { + countForDropping -= 1 + false + } else { + advance() + true + } + } + + private var countForDropping: Int = 0 + + /** + * Decide the number of elements that won't be sampled, + * according to geometric dist P(k) = (f)(1-f)^k. + */ + private def advance(): Unit = { + val u = math.max(rng.nextDouble(), epsilon) + countForDropping = (math.log(u) / lnq).toInt + } + + /** advance to first sample as part of object construction. */ + advance() +} + +private[spark] +trait PoissonGE { + val f: Double + val rng: Random + + protected val q = math.exp(-f) + + /** + * Sample from Poisson distribution, conditioned such that the sampled value is >= 1. 
+ * This is an adaptation from the algorithm for Generating Poisson distributed random variables:
+ * http://en.wikipedia.org/wiki/Poisson_distribution
+ */
+ protected def poissonGE1: Int = {
+ // simulate that the standard poisson sampling
+ // gave us at least one iteration, for a sample of >= 1
+ var pp = q + ((1.0 - q) * rng.nextDouble())
+ var r = 1
+
+ // now continue with standard poisson sampling algorithm
+ pp *= rng.nextDouble()
+ while (pp > q) {
+ r += 1
+ pp *= rng.nextDouble()
+ }
+ r
+ }
+}
+
+private[spark]
+class GapSamplingReplacementIterator[T: ClassTag](
+ var data: Iterator[T],
+ val f: Double,
+ val rng: Random = RandomSampler.newDefaultRNG,
+ epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] with PoissonGE {
 require(f > 0.0, s"Sampling fraction ($f) must be > 0")
 require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
@@ -338,31 +468,44 @@ class GapSamplingReplacementIterator[T: ClassTag](
 }
 }
- private val q = math.exp(-f)
+ /** advance to first sample as part of object construction. */
+ advance()
+ // Attempting to invoke this closer to the top with other object initialization
+ // was causing it to break in strange ways, so I'm invoking it last, which seems to
+ // work reliably.
+}
- /**
- * Sample from Poisson distribution, conditioned such that the sampled value is >= 1.
- * This is an adaptation from the algorithm for Generating Poisson distributed random variables:
- * http://en.wikipedia.org/wiki/Poisson_distribution
- */
- private def poissonGE1: Int = {
- // simulate that the standard poisson sampling
- // gave us at least one iteration, for a sample of >= 1
- var pp = q + ((1.0 - q) * rng.nextDouble())
- var r = 1
+
+private[spark]
+class GapSamplingReplacement(
+ val f: Double,
+ val rng: Random = RandomSampler.newDefaultRNG,
+ epsilon: Double = RandomSampler.rngEpsilon) extends PoissonGE with Serializable {
- // now continue with standard poisson sampling algorithm
- pp *= rng.nextDouble()
- while (pp > q) {
- r += 1
- pp *= rng.nextDouble()
+ require(f > 0.0, s"Sampling fraction ($f) must be > 0")
+ require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+ private var countForDropping: Int = 0
+
+ def sample(): Int = {
+ if (countForDropping > 0) {
+ countForDropping -= 1
+ 0
+ } else {
+ advance()
+ poissonGE1
 }
- r
+ }
+
+ /**
+ * Skip elements with replication factor zero (i.e. elements that won't be sampled).
+ * Samples 'k' from geometric distribution P(k) = (1-q)(q)^k, where q = e^(-f), that is
+ * q is the probability of Poisson(0; f)
+ */
+ private def advance(): Unit = {
+ val u = math.max(rng.nextDouble(), epsilon)
+ countForDropping = (math.log(u) / (-f)).toInt
+ }
 /** advance to first sample as part of object construction. */
 advance()
- // Attempting to invoke this closer to the top with other object initialization
- // was causing it to break in strange ways, so I'm invoking it last, which seems to
- // work reliably.
} diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala index 132a5fa9a80fb..cb0de1c6beb6b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala @@ -29,6 +29,8 @@ class MockSampler extends RandomSampler[Long, Long] { s = seed } + override def sample(): Int = 1 + override def sample(items: Iterator[Long]): Iterator[Long] = { Iterator(s) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 6344703fdf2f8..f8742a3cd8044 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -161,90 +161,62 @@ case class Sample( private var rowBuffer: String = _ protected override def doProduce(ctx: CodegenContext): String = { - val needToProduce = ctx.freshName("needToProduce") - ctx.addMutableState("boolean", needToProduce, s"$needToProduce = true;") - - rowBuffer = ctx.freshName("rowBuffer") - ctx.addMutableState("java.util.ArrayList", rowBuffer, - s"$rowBuffer = new java.util.ArrayList();") - - val addToBuffer = ctx.freshName("addToBuffer") - ctx.addNewFunction(addToBuffer, - s""" - | private void $addToBuffer() throws java.io.IOException { - | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} - | } - """.stripMargin.trim) - - val sampledIterator = ctx.freshName("sampledIterator") - ctx.addMutableState("scala.collection.Iterator", sampledIterator, "") + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } - val outputRow = ctx.freshName("outputRow") + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + val sampler = ctx.freshName("sampler") - val sampleCodes = if (withReplacement) { + if (withReplacement) { val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName val classTag = ctx.freshName("classTag") val classTagClass = "scala.reflect.ClassTag" ctx.addMutableState(classTagClass, classTag, s"$classTag = ($classTagClass)scala.reflect.ClassTag$$.MODULE$$.apply(UnsafeRow.class);") - val sampler = ctx.freshName("sampler") + val initSampler = ctx.freshName("initSampler") ctx.addMutableState(s"$samplerClass", sampler, - s"$sampler = new $samplerClass($upperBound - $lowerBound, false, $classTag);") + s"$initSampler();") val random = ctx.freshName("random") val randomSeed = ctx.freshName("randomSeed") val loopCount = ctx.freshName("loopCount") + + ctx.addNewFunction(initSampler, + s""" + | private void $initSampler() { + | $sampler = new $samplerClass($upperBound - $lowerBound, false, $classTag); + | java.util.Random $random = new java.util.Random(${seed}L); + | long $randomSeed = $random.nextLong(); + | int $loopCount = 0; + | while ($loopCount < partitionIndex) { + | $randomSeed = $random.nextLong(); + | $loopCount += 1; + | } + | $sampler.setSeed($randomSeed); + | } + """.stripMargin.trim) + + val samplingCount = ctx.freshName("samplingCount") s""" - | java.util.Random $random = new java.util.Random(${seed}L); - | long $randomSeed = $random.nextLong(); - | int $loopCount = 0; - | while ($loopCount < partitionIndex) { - | $randomSeed = $random.nextLong(); - | $loopCount += 1; + | int $samplingCount = $sampler.sample(); + | while ($samplingCount-- > 0) { + | ${consume(ctx, input)} | } - | 
$sampler.setSeed($randomSeed); - | $sampledIterator = $sampler.sample($rowBuffer.iterator()); - """.stripMargin + """.stripMargin.trim } else { val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName - val sampler = ctx.freshName("sampler") ctx.addMutableState(s"$samplerClass", sampler, - s"$sampler = new $samplerClass($lowerBound, $upperBound, false);") + s""" + | $sampler = new $samplerClass($lowerBound, $upperBound, false); + | $sampler.setSeed(${seed}L + partitionIndex); + """.stripMargin.trim) s""" - | $sampler.setSeed(${seed}L + partitionIndex); - | $sampledIterator = $sampler.sample($rowBuffer.iterator()); - """.stripMargin + | if ($sampler.sample() == 0) continue; + | ${consume(ctx, input)} + """.stripMargin.trim } - - s""" - | if ($needToProduce) { - | $addToBuffer(); - | $sampleCodes - | $needToProduce = false; - | } - | - | while ($sampledIterator.hasNext()) { - | UnsafeRow $outputRow = (UnsafeRow)$sampledIterator.next(); - | ${consume(ctx, null, outputRow)} - | } - """.stripMargin - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val colExprs = child.output.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - } - - ctx.currentVars = input - val code = GenerateUnsafeProjection.createCode(ctx, colExprs) - - s""" - | // Convert the input attributes to an UnsafeRow and add it to the iterator. - | ${code.code} - | $rowBuffer.add(${code.value}.copy()); - """.stripMargin.trim } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 4dd01231a981d..1425af8dee776 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -84,28 +84,28 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } - ignore("range/sample") { - val N = 50 << 10 - runBenchmark("range/sample withRep.", N) { - sqlContext.range(N).sample(true, 0.8).collect() + ignore("range/sample/sum") { + val N = 500 << 20 + runBenchmark("range/sample/sum", N) { + sqlContext.range(N).sample(true, 0.8).groupBy().sum().collect() } /* Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz - range/sample withRep.: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample withRep. codegen=false 237 / 322 0.2 4633.6 1.0X - range/sample withRep. 
codegen=true 232 / 302 0.2 4536.5 1.0X + range/sample/sum codegen=false 55656 / 56490 9.4 106.2 1.0X + range/sample/sum codegen=true 35423 / 35758 14.8 67.6 1.6X */ - runBenchmark("range/sample", N) { - sqlContext.range(N).sample(false, 0.8).collect() + runBenchmark("range/sample/sum", N) { + sqlContext.range(N).sample(false, 0.8).groupBy().sum().collect() } /* Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz - range/sample: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample codegen=false 135 / 180 0.4 2628.3 1.0X - range/sample codegen=true 184 / 225 0.3 3592.5 0.7X + range/sample/sum codegen=false 16460 / 17161 31.9 31.4 1.0X + range/sample/sum codegen=true 4081 / 5390 128.5 7.8 4.0X */ } From 0cb714b0767839e590078f5f35c702dc93438520 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 21 Mar 2016 07:36:28 +0000 Subject: [PATCH 4/8] With updated API. --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 82b968f2bd0ec..38eec23eb7725 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -205,7 +205,7 @@ case class Sample( child.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { val sampler = ctx.freshName("sampler") if (withReplacement) { From fa51f62244e45e5cc3fea24806ccd5a6eff51464 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 31 Mar 2016 10:11:44 +0000 Subject: [PATCH 5/8] Address comments. Mainly remove classtag. 
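
One thing this commit keeps (now inlined into initSampler() with plain local
names) is the per-partition seeding of the with-replacement path. For
reference, a plain-Scala sketch of the equivalent derivation (the helper name
is hypothetical; the logic mirrors the generated code below): partition i uses
the (i + 1)-th long drawn from a master RNG seeded with the user-supplied
seed, so every partition gets a deterministic but distinct sampler seed.

    // Hypothetical helper mirroring the generated initSampler() seeding.
    def seedForPartition(seed: Long, partitionIndex: Int): Long = {
      val random = new java.util.Random(seed)
      var derived = random.nextLong()
      var i = 0
      while (i < partitionIndex) {
        derived = random.nextLong()
        i += 1
      }
      derived
    }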
--- .../spark/util/random/RandomSampler.scala | 2 +- .../spark/sql/execution/basicOperators.scala | 33 +++++-------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 2921b939bc0cf..d397cca4b444d 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -186,7 +186,7 @@ class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T * @tparam T item type */ @DeveloperApi -class PoissonSampler[T: ClassTag]( +class PoissonSampler[T]( fraction: Double, useGapSamplingIfPossible: Boolean) extends RandomSampler[T, T] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index ef9b6a83570d4..b003f19e57bad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -199,8 +199,6 @@ case class Sample( child.asInstanceOf[CodegenSupport].upstreams() } - private var rowBuffer: String = _ - protected override def doProduce(ctx: CodegenContext): String = { child.asInstanceOf[CodegenSupport].produce(ctx, this) } @@ -210,31 +208,22 @@ case class Sample( if (withReplacement) { val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName - val classTag = ctx.freshName("classTag") - val classTagClass = "scala.reflect.ClassTag" - ctx.addMutableState(classTagClass, classTag, - s"$classTag = ($classTagClass)scala.reflect.ClassTag$$.MODULE$$.apply(UnsafeRow.class);") - val initSampler = ctx.freshName("initSampler") ctx.addMutableState(s"$samplerClass", sampler, s"$initSampler();") - val random = ctx.freshName("random") - val randomSeed = ctx.freshName("randomSeed") - val loopCount = ctx.freshName("loopCount") - ctx.addNewFunction(initSampler, s""" | private void $initSampler() { - | $sampler = new $samplerClass($upperBound - $lowerBound, false, $classTag); - | java.util.Random $random = new java.util.Random(${seed}L); - | long $randomSeed = $random.nextLong(); - | int $loopCount = 0; - | while ($loopCount < partitionIndex) { - | $randomSeed = $random.nextLong(); - | $loopCount += 1; + | $sampler = new $samplerClass($upperBound - $lowerBound, false); + | java.util.Random random = new java.util.Random(${seed}L); + | long randomSeed = random.nextLong(); + | int loopCount = 0; + | while (loopCount < partitionIndex) { + | randomSeed = random.nextLong(); + | loopCount += 1; | } - | $sampler.setSeed($randomSeed); + | $sampler.setSeed(randomSeed); | } """.stripMargin.trim) @@ -340,11 +329,7 @@ case class Range( | // initialize Range | if (!$initTerm) { | $initTerm = true; - | if (partitionIndex != -1) { - | initRange(partitionIndex); - | } else { - | return; - | } + | initRange(partitionIndex); | } | | while (!$overflow && $checkEnd) { From d8a9af8f54c50db82efc889d2c240c90ad9ba55f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 31 Mar 2016 10:39:34 +0000 Subject: [PATCH 6/8] Fix MiMa and add numOutputRow for Sample. 
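
On the MiMa excludes: a context bound desugars into an extra implicit
constructor parameter, so dropping [T: ClassTag] from PoissonSampler in the
previous commit changed its binary constructor signature even though source
compatibility is mostly unaffected. A short sketch of the desugaring (the
class names here are made up for illustration):

    import scala.reflect.ClassTag

    // [T: ClassTag] is sugar for an implicit ClassTag[T] constructor
    // parameter, so these two have different bytecode-level constructors:
    class WithTag[T: ClassTag](fraction: Double)  // ctor: (double, ClassTag)
    class WithoutTag[T](fraction: Double)         // ctor: (double)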
--- project/MimaExcludes.scala | 4 ++++ .../org/apache/spark/sql/execution/basicOperators.scala | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 208c7a28cf9bc..2c0bb14cb2237 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -589,6 +589,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.util.MLUtils.loadLabeledData"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.optimization.LBFGS.setMaxNumIterations"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.setScoreCol") + ) ++ Seq( + // [SPARK-13674][SQL] Add wholestage codegen support to Sample + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.util.random.PoissonSampler.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.util.random.PoissonSampler.this") ) case v if v.startsWith("1.6") => Seq( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index b003f19e57bad..9421d5f130ce3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -181,6 +181,9 @@ case class Sample( child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + protected override def doExecute(): RDD[InternalRow] = { if (withReplacement) { // Disable gap sampling since the gap sampling method buffers two rows internally, @@ -204,6 +207,7 @@ case class Sample( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val numOutput = metricTerm(ctx, "numOutputRows") val sampler = ctx.freshName("sampler") if (withReplacement) { @@ -231,6 +235,7 @@ case class Sample( s""" | int $samplingCount = $sampler.sample(); | while ($samplingCount-- > 0) { + | $numOutput.add(1); | ${consume(ctx, input)} | } """.stripMargin.trim @@ -244,6 +249,7 @@ case class Sample( s""" | if ($sampler.sample() == 0) continue; + | $numOutput.add(1); | ${consume(ctx, input)} """.stripMargin.trim } From ef588db4a8f8d9d742225c16dbad9d8cb17e2c71 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 Apr 2016 05:02:57 +0000 Subject: [PATCH 7/8] Consolidate two init APIs. --- .../sql/execution/BufferedRowIterator.java | 10 +----- .../sql/execution/WholeStageCodegen.scala | 8 +++-- .../BenchmarkWholeStageCodegen.scala | 35 +++++++++++++++++++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java index 90cb4cbd9216f..c2633a9f8cd48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -60,15 +60,7 @@ public long durationMs() { /** * Initializes from array of iterators of InternalRow. */ - public abstract void init(Iterator iters[]); - - /** - * Initializes from array of iterators of InternalRow. 
- */ - public void init(int index, Iterator iters[]) { - partitionIndex = index; - init(iters); - } + public abstract void init(int index, Iterator iters[]); /** * Append a row to currentRows. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 689ffde9fb926..9bdf611f6e536 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.broadcast +import org.apache.spark.{broadcast, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -323,7 +323,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup this.references = references; } - public void init(scala.collection.Iterator inputs[]) { + public void init(int index, scala.collection.Iterator inputs[]) { + partitionIndex = index; ${ctx.initMutableStates()} } @@ -367,9 +368,10 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup } else { // Right now, we support up to two upstreams. rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) => + val partitionIndex = TaskContext.getPartitionId() val clazz = CodeGenerator.compile(cleanedSource) val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator] - buffer.init(Array(leftIter, rightIter)) + buffer.init(partitionIndex, Array(leftIter, rightIter)) new Iterator[InternalRow] { override def hasNext: Boolean = { val v = buffer.hasNext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 1dc0ba0fa5891..462c2ef5ffe64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -110,6 +110,41 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } + ignore("sort merge join/sample") { + val N = 2 << 20 + runBenchmark("sort merge join", N) { + val df1 = sqlContext.range(N) + .selectExpr(s"(id * 15485863) % ${N*10} as k1") + val df2 = sqlContext.range(N) + .selectExpr(s"(id * 15485867) % ${N*10} as k2") + df1.join(df2, col("k1") === col("k2")).sample(true, 0.8).count() + } + + /** + Westmere E56xx/L56xx/X56xx (Nehalem-C) + sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + sort merge join codegen=false 6925 / 7130 0.3 3301.9 1.0X + sort merge join codegen=true 6584 / 6876 0.3 3139.5 1.1X + */ + + runBenchmark("sort merge join", N) { + val df1 = sqlContext.range(N) + .selectExpr(s"(id * 15485863) % ${N*10} as k1") + val df2 = sqlContext.range(N) + .selectExpr(s"(id * 15485867) % ${N*10} as k2") + df1.join(df2, col("k1") === col("k2")).sample(false, 0.8).count() + } + + /** + Westmere E56xx/L56xx/X56xx (Nehalem-C) + sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + sort merge join codegen=false 6787 / 6939 0.3 3236.2 1.0X + sort merge join codegen=true 6623 / 6774 0.3 3158.1 1.0X + */ + } + ignore("stat functions") { 
val N = 100L << 20 From 12e1b37e076ca7161bae66ecec99938f1cd6e813 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 Apr 2016 08:32:17 +0000 Subject: [PATCH 8/8] Update tests. --- .../BenchmarkWholeStageCodegen.scala | 51 +++---------------- 1 file changed, 8 insertions(+), 43 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 462c2ef5ffe64..55906793c0b81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -88,63 +88,28 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { ignore("range/sample/sum") { val N = 500 << 20 runBenchmark("range/sample/sum", N) { - sqlContext.range(N).sample(true, 0.8).groupBy().sum().collect() + sqlContext.range(N).sample(true, 0.01).groupBy().sum().collect() } /* - Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz + Westmere E56xx/L56xx/X56xx (Nehalem-C) range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample/sum codegen=false 55656 / 56490 9.4 106.2 1.0X - range/sample/sum codegen=true 35423 / 35758 14.8 67.6 1.6X + range/sample/sum codegen=false 53888 / 56592 9.7 102.8 1.0X + range/sample/sum codegen=true 41614 / 42607 12.6 79.4 1.3X */ runBenchmark("range/sample/sum", N) { - sqlContext.range(N).sample(false, 0.8).groupBy().sum().collect() + sqlContext.range(N).sample(false, 0.01).groupBy().sum().collect() } /* - Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz + Westmere E56xx/L56xx/X56xx (Nehalem-C) range/sample/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/sample/sum codegen=false 16460 / 17161 31.9 31.4 1.0X - range/sample/sum codegen=true 4081 / 5390 128.5 7.8 4.0X + range/sample/sum codegen=false 12982 / 13384 40.4 24.8 1.0X + range/sample/sum codegen=true 7074 / 7383 74.1 13.5 1.8X */ } - ignore("sort merge join/sample") { - val N = 2 << 20 - runBenchmark("sort merge join", N) { - val df1 = sqlContext.range(N) - .selectExpr(s"(id * 15485863) % ${N*10} as k1") - val df2 = sqlContext.range(N) - .selectExpr(s"(id * 15485867) % ${N*10} as k2") - df1.join(df2, col("k1") === col("k2")).sample(true, 0.8).count() - } - - /** - Westmere E56xx/L56xx/X56xx (Nehalem-C) - sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - sort merge join codegen=false 6925 / 7130 0.3 3301.9 1.0X - sort merge join codegen=true 6584 / 6876 0.3 3139.5 1.1X - */ - - runBenchmark("sort merge join", N) { - val df1 = sqlContext.range(N) - .selectExpr(s"(id * 15485863) % ${N*10} as k1") - val df2 = sqlContext.range(N) - .selectExpr(s"(id * 15485867) % ${N*10} as k2") - df1.join(df2, col("k1") === col("k2")).sample(false, 0.8).count() - } - - /** - Westmere E56xx/L56xx/X56xx (Nehalem-C) - sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - sort merge join codegen=false 6787 / 6939 0.3 3236.2 1.0X - sort merge join codegen=true 6623 / 6774 0.3 3158.1 1.0X - */ - } - ignore("stat functions") { val N = 100L << 20