
[SPARK-32436][CORE] Initialize numNonEmptyBlocks in HighlyCompressedMapStatus.readExternal#29231

Closed
dongjoon-hyun wants to merge 3 commits into apache:master from dongjoon-hyun:SPARK-32436

Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Jul 25, 2020

What changes were proposed in this pull request?

This PR aims to initialize numNonEmptyBlocks in HighlyCompressedMapStatus.readExternal.

In Scala 2.12, this is initialized to -1 via the following.

protected def this() = this(null, -1, null, -1, null, -1)  // For deserialization only
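
The contract being relied on here can be sketched in plain Java (hypothetical `Status` class, not Spark's actual code): `Externalizable` deserialization only guarantees that the public no-arg constructor runs before `readExternal`, so any field that `writeExternal` deliberately skips must be assigned explicitly, either in that constructor or in `readExternal` itself, which is what this PR does.

```java
import java.io.*;

// Hypothetical sketch of the Externalizable pattern used by
// HighlyCompressedMapStatus: numNonEmptyBlocks is not serialized,
// so readExternal must (re)initialize it explicitly.
public class Main {

    public static class Status implements Externalizable {
        private int numNonEmptyBlocks;
        private long totalSize;

        // Required by Externalizable: a public no-arg constructor.
        public Status() { this.numNonEmptyBlocks = -1; }

        public Status(int numNonEmptyBlocks, long totalSize) {
            this.numNonEmptyBlocks = numNonEmptyBlocks;
            this.totalSize = totalSize;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeLong(totalSize); // numNonEmptyBlocks is deliberately not written
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            numNonEmptyBlocks = -1;   // explicit init, mirroring the fix in this PR
            totalSize = in.readLong();
        }

        public int numNonEmptyBlocks() { return numNonEmptyBlocks; }
        public long totalSize() { return totalSize; }
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Status(10, 1234L));
        }
        Status back;
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            back = (Status) in.readObject();
        }
        // The round-tripped object has the sentinel value, not the original 10.
        if (back.numNonEmptyBlocks() != -1 || back.totalSize() != 1234L) {
            throw new AssertionError("unexpected deserialized state");
        }
        System.out.println("ok");
    }
}
```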

Why are the changes needed?

In Scala 2.13, this causes several UT failures because HighlyCompressedMapStatus.readExternal doesn't initialize this field. The following is one example.

  • org.apache.spark.scheduler.MapStatusSuite
MapStatusSuite:
- compressSize
- decompressSize
*** RUN ABORTED ***
  java.lang.NoSuchFieldError: numNonEmptyBlocks
  at org.apache.spark.scheduler.HighlyCompressedMapStatus.<init>(MapStatus.scala:181)
  at org.apache.spark.scheduler.HighlyCompressedMapStatus$.apply(MapStatus.scala:281)
  at org.apache.spark.scheduler.MapStatus$.apply(MapStatus.scala:73)
  at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$8(MapStatusSuite.scala:64)
  at scala.runtime.java8.JFunction1$mcVD$sp.apply(JFunction1$mcVD$sp.scala:18)
  at scala.collection.immutable.List.foreach(List.scala:333)
  at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$7(MapStatusSuite.scala:61)
  at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.scala:18)
  at scala.collection.immutable.List.foreach(List.scala:333)
  at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$6(MapStatusSuite.scala:60)
  ...

Does this PR introduce any user-facing change?

No. This is a private class.

How was this patch tested?

  1. Pass the GitHub Action or Jenkins with the existing tests.
  2. Test with Scala-2.13 with MapStatusSuite.
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.scheduler.MapStatusSuite
...
MapStatusSuite:
- compressSize
- decompressSize
- MapStatus should never report non-empty blocks' sizes as 0
- large tasks should use org.apache.spark.scheduler.HighlyCompressedMapStatus
- HighlyCompressedMapStatus: estimated size should be the average non-empty block size
- SPARK-22540: ensure HighlyCompressedMapStatus calculates correct avgSize
- RoaringBitmap: runOptimize succeeded
- RoaringBitmap: runOptimize failed
- Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.
- SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE
Run completed in 7 seconds, 971 milliseconds.
Total number of tests run: 10
Suites: completed 2, aborted 0
Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

@dongjoon-hyun dongjoon-hyun reopened this Jul 25, 2020
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-32436][CORE] Remove unused HighlyCompressedMapStatus.numNonEmptyBlocks [SPARK-32436][CORE] Initialize numNonEmptyBlocks in HighlyCompressedMapStatus.readExternal Jul 25, 2020

@SparkQA

SparkQA commented Jul 25, 2020

Test build #126535 has finished for PR 29231 at commit 6654013.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Could you review this, @HyukjinKwon ?

@dongjoon-hyun
Member Author

Also, cc @srowen .

@srowen
Member

srowen commented Jul 25, 2020

If the fix works, great. I think this might help me understand another failure in the streaming component.
I'm trying to understand why it fails now. Externalizable should still start by constructing this object with the no-arg constructor, which does init this field. I think the key is that Externalizable wants a public no-arg constructor and this isn't public, and for some reason that is now a problem.

Other possible fixes? Make it public (the class is private anyway).
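
srowen's hypothesis can be checked outside Spark with a small sketch (hypothetical class, not Spark's code): Java's deserializer refuses an `Externalizable` class whose no-arg constructor is not public. Serialization still succeeds, but `readObject` throws `InvalidClassException` ("no valid constructor").

```java
import java.io.*;

// Minimal demonstration of the Externalizable constructor requirement:
// a non-public no-arg constructor makes the class serializable but
// not deserializable.
public class Main {

    public static class Hidden implements Externalizable {
        protected Hidden() {}     // not public: violates the Externalizable contract
        public Hidden(int ignored) {}

        @Override public void writeExternal(ObjectOutput out) throws IOException {}
        @Override public void readExternal(ObjectInput in) throws IOException {}
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Hidden(0)); // writing works fine
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            System.out.println("deserialized");
        } catch (InvalidClassException e) {
            // Message is "no valid constructor" on the JDKs I checked.
            System.out.println("InvalidClassException");
        }
    }
}
```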

@dongjoon-hyun
Member Author

Let me try, @srowen .

Other possible fixes? Make it public (the class is private anyway).

@dongjoon-hyun
Member Author

I tried removing all the constraints: 1) the protected no-arg constructor made public, 2) the private primary constructor made public, 3) the class made public. But it turns out that this does not resolve the issue.

-private[spark] class HighlyCompressedMapStatus private (
+class HighlyCompressedMapStatus(
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
@@ -181,7 +181,7 @@ private[spark] class HighlyCompressedMapStatus private (
     || numNonEmptyBlocks == 0 || _mapTaskId > 0,
     "Average size can only be zero for map stages that produced no output")

-  protected def this() = this(null, -1, null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null, -1)  // For deserialization only

   override def location: BlockManagerId = loc

@@ -217,7 +217,7 @@ private[spark] class HighlyCompressedMapStatus private (

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    numNonEmptyBlocks = -1 // SPARK-32436 Scala 2.13 doesn't initialize this during deserialization
+    // numNonEmptyBlocks = -1 // SPARK-32436 Scala 2.13 doesn't initialize this during deserialization

@dongjoon-hyun
Member Author

This PR technically doesn't change the logic. In other words, the value will be the same in Scala 2.12 and 2.13. Can we move forward with the AS-IS patch?

BTW, RDDSuite has another failure which I'm working on now.

@dongjoon-hyun
Member Author

I updated the PR description to focus on MapStatusSuite and make it clearer.

@srowen
Member

srowen commented Jul 25, 2020

Hm, weird. I still don't understand why this behavior is different in 2.13. OK, go ahead.

@dongjoon-hyun
Member Author

Thanks! Merged to master.

@HyukjinKwon
Member

LGTM

@dongjoon-hyun
Member Author

Thank you, @HyukjinKwon .

@mridulm
Contributor

mridulm commented Aug 10, 2020

@dongjoon-hyun Circling through older PR's ... do we know why this is happening ?
More than the specifics of this class, I am more concerned for other classes where the initialization might not be happening, and we are not (yet) detecting the issue.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Aug 10, 2020

Hi, @mridulm ! This was a bug in Scala 2.13 and 2.12.12.

I believe they will fix the bug in the next releases. Then we won't need to detect or change anything inside Apache Spark.

@mridulm
Contributor

mridulm commented Aug 11, 2020

Thanks @dongjoon-hyun !

@dossett

dossett commented Oct 16, 2020

@dongjoon-hyun Will this be included in a Spark 3.0.x release or is the plan to wait for a fix on the scala side? I ran into this very issue today, so just wondering. Thank you.

@srowen
Member

srowen commented Oct 16, 2020

This only seems to affect Scala 2.13, and only 3.1.x supports Scala 2.13, so no, there isn't a need to put it in 3.0.x. The workaround here doesn't require a Scala fix, but a Scala fix may also resolve it anyway.

@dossett

dossett commented Oct 16, 2020

Thank you @srowen, the environment I saw this on was running spark 3.0.1 and scala 2.12.12. If I can reproduce it today I can share a stack trace and other details if that would be helpful.

@dossett

dossett commented Oct 16, 2020

Running on GCP's dataproc 2.0:

dossett@dossett-delta-w-0:~$ spark-sql --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Scala version 2.12.12, OpenJDK 64-Bit Server VM, 1.8.0_265
Branch HEAD
Compiled by user  on 2020-09-17T10:01:54Z
Revision 0aee93de8ef2a90403093b91843de9777b7ab5ef
Url https://bigdataoss-internal.googlesource.com/third_party/apache/spark
Type --help for more information.

I'm playing with Databricks Delta Lake; a simple vacuum command fails with a long stack trace, with this at the bottom:

Caused by: java.lang.NoSuchFieldError: numNonEmptyBlocks
	at org.apache.spark.scheduler.HighlyCompressedMapStatus.<init>(MapStatus.scala:174)
	at org.apache.spark.scheduler.HighlyCompressedMapStatus$.apply(MapStatus.scala:269)
	at org.apache.spark.scheduler.MapStatus$.apply(MapStatus.scala:70)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

If this is helpful I'm happy to provide more information.

@srowen
Member

srowen commented Oct 16, 2020

That is strange: it doesn't seem to happen in Spark unit tests on 2.12, but Spark is built with 2.12.10. From the links above, it seems like it could be an issue in 2.12.12. Therefore, @dongjoon-hyun, it might be useful to backport this just in case? It's a small change, and I think what it does is prevent the compiler from (incorrectly?) eliding the field during compilation. Even when that's fixed, this change doesn't hurt anything.

@dongjoon-hyun
Member Author

I agree with you guys, @srowen and @dossett ! Sure, I'll test and backport this.

@dongjoon-hyun
Member Author

BTW, I linked the related Scala issues here.

dongjoon-hyun added a commit that referenced this pull request Oct 16, 2020
…apStatus.readExternal


Closes #29231 from dongjoon-hyun/SPARK-32436.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit f9f1867)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun
Member Author

This has landed on branch-3.0 now.

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…apStatus.readExternal


Closes apache#29231 from dongjoon-hyun/SPARK-32436.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit f9f1867)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>