[SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 #40091
pan3793 wants to merge 4 commits into apache:master from pan3793:SPARK-41952
Conversation
dongjoon-hyun left a comment
cc @sunchao and @parthchandra
BTW, do you happen to know why the Apache Parquet community didn't fill in the Fixed Version of that Parquet JIRA or cut a release, @pan3793? According to the log, it seems to have been merged in September 2022.
Based on apache/parquet-java#982 (comment), I guess the Parquet community may not consider it a critical issue, but in my case, it's critical.
branch-3.3/3.2 use Parquet 1.12.2; if this fix is accepted, would you mind submitting PRs for these two branches? @pan3793
Sure.
I verified this patch by scanning a large parquet/zstd table and updated the PR description.
```java
 * workaround for memory issues encountered when reading from zstd-compressed files. For
 * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
 */
public class ParquetCodecFactory extends CodecFactory {
```
I think we should add a TODO to remind us to revert this file when updating the Parquet version.
```java
 * workaround for memory issues encountered when reading from zstd-compressed files. For
 * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
 *
 * TODO: Remove this workaround after upgrading Parquet which include PARQUET-2160.
```
should be `TODO(SPARK-xxx):`
Thank you all. And feel free to merge to land this to the release branches, @sunchao. cc @cloud-fan, @HyukjinKwon, @mridulm, @tgravescs
Thanks! Merged to master/branch-3.3/branch-3.2.
@sunchao we also need to merge this to branch-3.4 ...
Yes, it's in branch-3.4 as well.
@pan3793 We found that in some cases this workaround did not work when reading Parquet files in zstd codec and writing them out to a new path.

```scala
val df = spark.read.parquet("zstd_parquet_file")
df.repartition(3).write.mode("overwrite").parquet("/tmp/test")
```

In debug mode, you can see it won't set ParquetCodecFactory when initializing the record reader.
@yujhe is it possible to provide the thread stack as text? I will find some time to take a look later.
Sure, here is the thread stack.
We found that this happens if we are reading Parquet files with nested columns in the schema.

```scala
val path = "/tmp/parquet_zstd"
(1 to 100).map(i => (i, Seq(i)))
  .toDF("id", "value")
  .repartition(1)
  .write
  .mode("overwrite")
  .parquet(path)

val df = spark.read.parquet(path)
df.write.mode("overwrite").parquet("/tmp/dummy")
```

After tracing the code, ParquetCodecFactory is only applied in VectorizedParquetRecordReader, i.e. when the vectorized reader is enabled. However, for schemas with nested columns, the vectorized reader is disabled by default in Spark 3.3.
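As a possible stopgap on Spark 3.3 (our suggestion, not something verified in this thread), nested-column scans can be routed through VectorizedParquetRecordReader, where ParquetCodecFactory is installed, by opting in to the nested-column vectorized reader, provided the nested types in the schema are supported by the vectorized path:

```
spark-shell --conf spark.sql.parquet.enableNestedColumnVectorizedReader=true
```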
@yujhe Oops, I totally forgot this one... Your analysis makes sense. I took a look at the Parquet non-vectorized reading code path, and injecting such a workaround there is not as clean as what we did in this PR. As this is not a regression, and newer Spark versions adopt a Parquet version that already fixes this issue, I prefer to keep things as-is in branch-3.3.
+1, it seems there's no elegant way to fix the path where native parquet-mr initializes the reader.
Totally understand; we will try to apply the PARQUET-2160 patch to parquet-hadoop 1.12.2 on our cluster.

What changes were proposed in this pull request?
SPARK-41952 was raised a while ago, but unfortunately the Parquet community has not published a patched version yet. As a workaround, we can fix the issue on the Spark side first.
We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors kept occupying an unreasonable amount of off-heap memory and were at high risk of being killed by the NM.
See more discussions at apache/parquet-java#982 and apache/iceberg#5681
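For illustration, here is a condensed sketch of the approach (assuming parquet-hadoop 1.12's CodecFactory API; the actual ParquetCodecFactory added by this PR retains more of the upstream code). The idea is to eagerly copy each decompressed page onto the heap so the underlying ZstdDecompressorStream can be closed right away, releasing its off-heap buffers deterministically:

```java
// Condensed sketch only; signatures follow parquet-hadoop 1.12.x.
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetCodecFactory extends CodecFactory {

  public ParquetCodecFactory(Configuration configuration, int pageSize) {
    super(configuration, pageSize);
  }

  class HeapBytesDecompressor extends BytesDecompressor {
    private final CompressionCodec codec;
    private final Decompressor decompressor;

    HeapBytesDecompressor(CompressionCodecName codecName) {
      this.codec = getCodec(codecName);
      this.decompressor = (codec == null) ? null : CodecPool.getDecompressor(codec);
    }

    @Override
    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
      if (codec == null) {
        return bytes;
      }
      decompressor.reset();
      InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
      // Eagerly materialize the page on the heap, then close the stream. For zstd this
      // releases the native buffers held by ZstdDecompressorStream immediately instead of
      // waiting for finalization (the leak tracked by PARQUET-2160).
      BytesInput decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
      is.close();
      return decompressed;
    }

    @Override
    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
        int uncompressedSize) throws IOException {
      ByteBuffer decompressed =
          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
      output.put(decompressed);
    }

    @Override
    public void release() {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
      }
    }
  }

  @Override
  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
    return new HeapBytesDecompressor(codecName);
  }
}
```

The trade-off is an extra heap copy per page in exchange for deterministic release of native memory.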
Why are the changes needed?
The issue is fixed in the Parquet community as PARQUET-2160, but a patched release is not available yet.
Does this PR introduce any user-facing change?
Yes, it's a bug fix.
How was this patch tested?
The existing UTs should cover the correctness check; I also verified this patch by scanning a large parquet/zstd table:
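```
spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
```

```
spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false)
```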
- Before this patch, all executors get killed by the NM quickly:
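```
ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
```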
- After this patch, the query runs well and no executor gets killed.