Closed
51 changes: 27 additions & 24 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -43,17 +43,19 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, and the number of partitions of the
-   * partitioner is either greater than or is less than and within a single order of
-   * magnitude of the max number of upstream partitions, choose that one.
+   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
+   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
    *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
+   * partitioner is eligible (number of partitions within an order of maximum number of partitions
+   * in rdds), or has partition number higher than default partitions number - we use this
+   * partitioner.
    *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
+   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
+   * number of partitions in the largest upstream RDD, as this should be least likely to cause
+   * out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }

-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
Contributor

This is the core change. I think it makes sense, as it fixes a regression in #20002.

If the partitioner is not eligible but its number of partitions is larger than the default one, we should still pick this partitioner instead of creating a new one.

Contributor

There are multiple cases here.

a) spark.default.parallelism is not set by the user.
For this case, the PR is a no-op.

b) maxPartitions is at least an order of magnitude higher than the partition count of the max partitioner.

b.1) If spark.default.parallelism is not set, the PR is a no-op.

b.2) spark.default.parallelism is explicitly set by the user.

This is a change in behavior which has been introduced: rely on the user-specified value instead of trying to infer it when the inferred value is off by at least an order of magnitude.

If users were setting suboptimal values for "spark.default.parallelism", then there will be a change in behavior, though I would argue this is the expected behavior given the documentation of 'spark.default.parallelism'.

Contributor

It depends on how you define "default". In this case, if we can benefit from reusing an existing partitioner, we should pick that partitioner. If we want to respect spark.default.parallelism strictly, we should not reuse a partitioner at all.

For this particular case, picking the existing partitioner is obviously a better choice, and it was the behavior before #20002, so I'm +1 on this change.

Contributor

@mridulm mridulm Jan 19, 2018

[Edited, hopefully, for clarity]

> It depends on how you define "default".

I don't see an ambiguity here; am I missing something?
To rephrase my point: this proposed PR has an impact only if the user has explicitly set 'spark.default.parallelism'; otherwise it is a no-op.
If this is not the case (other than the desired behavior of SPARK-22465), I might be missing something; do let me know!

What is the concern here? That users have set incorrect values for spark.default.parallelism?

> If we want to respect spark.default.parallelism strictly, we should not reuse partitioner at all.

I agree with you, we should not have, but that ship sailed a long time ago: this has been the behavior in Spark since at least 0.5, and I don't have context from before that.
Historically, default parallelism was added later; the behavior was to use "the largest partitioner if set, or the largest partition size when no partitioner is set". When default parallelism was introduced, the behavior was continued, probably (I guess) for backward compatibility.

#20002 surgically fixed only the case where the inferred partition size was off by at least an order of magnitude.
When it is off by an order of magnitude, don't rely on the largest partitioner; it is not useful because of OOMs.
In this case, if the user has explicitly specified spark.default.parallelism, rely on the user-provided value; otherwise preserve the existing behavior of picking the largest partition.

Contributor

I think we all agree that reusing a partitioner is existing behavior and that we should not stick to spark.default.parallelism here.

#20002 is good, as it fixes a bad case where reusing a partitioner slows down the query. This PR surgically fixes one regression introduced by #20002: even if the existing partitioner is not eligible (has very few partitions), it is still better than falling back to default parallelism.

       hasMaxPartitioner.get.partitioner.get
     } else {
-      if (rdd.context.conf.contains("spark.default.parallelism")) {
-        new HashPartitioner(rdd.context.defaultParallelism)
-      } else {
-        new HashPartitioner(rdds.map(_.partitions.length).max)
-      }
+      new HashPartitioner(defaultNumPartitions)
     }
   }

   /**
-   * Returns true if the number of partitions of the RDD is either greater
-   * than or is less than and within a single order of magnitude of the
-   * max number of upstream partitions;
-   * otherwise, returns false
+   * Returns true if the number of partitions of the RDD is either greater than or is less than and
+   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
+   * false.
    */
   private def isEligiblePartitioner(
-      hasMaxPartitioner: Option[RDD[_]],
+      hasMaxPartitioner: RDD[_],
       rdds: Seq[RDD[_]]): Boolean = {
-    if (hasMaxPartitioner.isEmpty) {
-      return false
-    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
   }
 }

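As a rough illustration of the selection rule in the new version of `defaultPartitioner`, here is a minimal Spark-free sketch. `Upstream`, `DefaultPartitionerSketch`, and `chooseNumPartitions` are hypothetical names that do not appear in the PR; the sketch assumes an RDD can be reduced to its partition count plus a flag saying whether it has a partitioner, and it returns only the resulting number of partitions rather than a `Partitioner`.

```scala
import scala.math.log10

// Hypothetical stand-in for an RDD: only the partition count and whether a
// partitioner is present matter for this decision.
final case class Upstream(numPartitions: Int, hasPartitioner: Boolean)

object DefaultPartitionerSketch {
  // Mirrors the order-of-magnitude check from the diff on plain partition counts.
  def isEligible(candidate: Upstream, all: Seq[Upstream]): Boolean = {
    val maxPartitions = all.map(_.numPartitions).max
    log10(maxPartitions) - log10(candidate.numPartitions) < 1
  }

  // defaultParallelism models "spark.default.parallelism is set" as Some(n).
  def chooseNumPartitions(all: Seq[Upstream], defaultParallelism: Option[Int]): Int = {
    require(all.nonEmpty, "at least one upstream is required")
    val withPartitioner = all.filter(u => u.hasPartitioner && u.numPartitions > 0)
    val hasMax: Option[Upstream] =
      if (withPartitioner.nonEmpty) Some(withPartitioner.maxBy(_.numPartitions)) else None
    // Default: the configured parallelism if set, else the largest upstream count.
    val defaultNum = defaultParallelism.getOrElse(all.map(_.numPartitions).max)
    hasMax match {
      // Reuse the existing partitioner if it is eligible or larger than the default.
      case Some(c) if isEligible(c, all) || defaultNum < c.numPartitions => c.numPartitions
      case _ => defaultNum
    }
  }
}
```

Under these assumptions, an upstream with 150 partitions combined with a 10-partition partitioned upstream and a default parallelism of 4 keeps the 10-partition partitioner, while a 3-partition partitioner in the same setting falls back to 4.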
44 changes: 38 additions & 6 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -262,14 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva

   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc
-      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)

@@ -284,7 +281,42 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
   }
+
+  test("defaultPartitioner when defaultParallelism is set") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(10))
+      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+        .partitionBy(new HashPartitioner(100))
+      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(9))
+      val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
Contributor

Can we add a case where the partitioner is not used and the default (from spark.default.parallelism) gets used?
For example, something like the following pseudocode:

val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4))).partitionBy(new HashPartitioner(3))

...
assert(Partitioner.defaultPartitioner(rdd1, rdd6).numPartitions == sc.conf.get("spark.default.parallelism").toInt)

+      val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(3))
+
+      val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+      val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+      val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+      val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+      val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
+      val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+      val partitioner7 = Partitioner.defaultPartitioner(rdd1, rdd6)
+
+      assert(partitioner1.numPartitions == rdd2.getNumPartitions)
+      assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+      assert(partitioner6.numPartitions == sc.defaultParallelism)
+      assert(partitioner7.numPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
   }
 }

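The expectations in the test above depend on the order-of-magnitude comparison used by `isEligiblePartitioner`. The following sketch works through that arithmetic for the partition counts that appear in the tests. `EligibilityArithmetic` and `withinOneOrder` are hypothetical names introduced only for illustration; the comparison itself is copied from the diff.

```scala
import scala.math.log10

object EligibilityArithmetic {
  // Same comparison as in the diff: eligible when the log10 gap between the
  // largest upstream partition count and the candidate's count is below 1.
  def withinOneOrder(maxPartitions: Int, candidatePartitions: Int): Boolean =
    log10(maxPartitions) - log10(candidatePartitions) < 1

  def main(args: Array[String]): Unit = {
    // (max upstream partitions, partitions of the RDD holding the max partitioner)
    val cases = Seq((150, 10), (150, 100), (11, 9), (150, 3), (100, 10))
    cases.foreach { case (max, cand) =>
      val gap = log10(max) - log10(cand)
      println(f"max=$max%4d candidate=$cand%4d gap=$gap%.3f eligible=${withinOneOrder(max, cand)}")
    }
  }
}
```

Note that the comparison is strict, so a candidate exactly one order of magnitude below the maximum (for example 10 against 100) is not eligible. In the test with spark.default.parallelism set to 4, the 10-partition partitioner fails this check against 150 upstream partitions and is reused only because 10 exceeds the default of 4.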
@@ -322,8 +322,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   // See SPARK-22465
-  test("cogroup between multiple RDD" +
-    " with number of partitions similar in order of magnitude") {
+  test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
       .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
   }

+  test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set; with huge number of " +
+    "partitions in upstream RDDs") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))