From 9753676f423d2df2e6ac4824520d323094d5d328 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 25 Jul 2020 00:14:33 -0700 Subject: [PATCH 1/3] [SPARK-32436][CORE] Remove unused HighlyCompressedMapStatus.numNonEmptyBlocks --- .../scala/org/apache/spark/scheduler/MapStatus.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 0af3a2e171906..37a71b912a967 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -161,7 +161,6 @@ private[spark] class CompressedMapStatus( * plus a bitmap for tracking which blocks are empty. * * @param loc location where the task is being executed - * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty and non-huge blocks * @param hugeBlockSizes sizes of huge blocks by their reduceId. @@ -169,7 +168,6 @@ private[spark] class CompressedMapStatus( */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, - private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte], @@ -177,11 +175,10 @@ private[spark] class HighlyCompressedMapStatus private ( extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization - require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 - || numNonEmptyBlocks == 0 || _mapTaskId > 0, + require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || _mapTaskId > 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only + protected def this() = this(null, null, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc @@ -240,7 +237,6 @@ private[spark] object HighlyCompressedMapStatus { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. var i = 0 - var numNonEmptyBlocks: Int = 0 var numSmallBlocks: Int = 0 var totalSmallBlockSize: Long = 0 // From a compression standpoint, it shouldn't matter whether we track empty or non-empty @@ -255,7 +251,6 @@ private[spark] object HighlyCompressedMapStatus { while (i < totalNumBlocks) { val size = uncompressedSizes(i) if (size > 0) { - numNonEmptyBlocks += 1 // Huge blocks are not included in the calculation for average size, thus size for smaller // blocks is more accurate. if (size < threshold) { @@ -276,7 +271,6 @@ private[spark] object HighlyCompressedMapStatus { } emptyBlocks.trim() emptyBlocks.runOptimize() - new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, - hugeBlockSizes, mapTaskId) + new HighlyCompressedMapStatus(loc, emptyBlocks, avgSize, hugeBlockSizes, mapTaskId) } } From 2bbfa4e27052c4f625d744851fcd11f58e5ff66b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 25 Jul 2020 00:55:16 -0700 Subject: [PATCH 2/3] Revert "[SPARK-32436][CORE] Remove unused HighlyCompressedMapStatus.numNonEmptyBlocks" This reverts commit 9753676f423d2df2e6ac4824520d323094d5d328. --- .../scala/org/apache/spark/scheduler/MapStatus.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 37a71b912a967..0af3a2e171906 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -161,6 +161,7 @@ private[spark] class CompressedMapStatus( * plus a bitmap for tracking which blocks are empty. * * @param loc location where the task is being executed + * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty and non-huge blocks * @param hugeBlockSizes sizes of huge blocks by their reduceId. @@ -168,6 +169,7 @@ private[spark] class CompressedMapStatus( */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, + private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte], @@ -175,10 +177,11 @@ private[spark] class HighlyCompressedMapStatus private ( extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization - require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || _mapTaskId > 0, + require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 + || numNonEmptyBlocks == 0 || _mapTaskId > 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, null, -1, null, -1) // For deserialization only + protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc @@ -237,6 +240,7 @@ private[spark] object HighlyCompressedMapStatus { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. var i = 0 + var numNonEmptyBlocks: Int = 0 var numSmallBlocks: Int = 0 var totalSmallBlockSize: Long = 0 // From a compression standpoint, it shouldn't matter whether we track empty or non-empty @@ -251,6 +255,7 @@ private[spark] object HighlyCompressedMapStatus { while (i < totalNumBlocks) { val size = uncompressedSizes(i) if (size > 0) { + numNonEmptyBlocks += 1 // Huge blocks are not included in the calculation for average size, thus size for smaller // blocks is more accurate. if (size < threshold) { @@ -271,6 +276,7 @@ private[spark] object HighlyCompressedMapStatus { } emptyBlocks.trim() emptyBlocks.runOptimize() - new HighlyCompressedMapStatus(loc, emptyBlocks, avgSize, hugeBlockSizes, mapTaskId) + new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, + hugeBlockSizes, mapTaskId) } } From 665401396cce866e4490ad18e247eab88946f393 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 25 Jul 2020 00:57:01 -0700 Subject: [PATCH 3/3] fix --- core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 0af3a2e171906..04ed6bbdd0c28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -217,6 +217,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) + numNonEmptyBlocks = -1 // SPARK-32436 Scala 2.13 doesn't initialize this during deserialization emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong()