[GLUTEN-9163][VL] Use stream de/compressor in sort-based shuffle#9278

Merged
marin-ma merged 15 commits into apache:main from marin-ma:sort-based-shuffle-stream-compression
Apr 17, 2025

Conversation

@marin-ma
Contributor

@marin-ma marin-ma commented Apr 9, 2025

For shuffle writer tasks with heavy spill, streaming compression should provide a better compression ratio. Currently, spilled data is compressed before being written to the output stream. This patch modifies the compression logic for sort-based shuffle: it removes that per-payload compression and uses a compressed output stream to do the work instead.
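The intended effect can be sketched with a small, self-contained experiment. This is an illustration only, using Python's zlib as a stand-in codec (an assumption — Gluten uses its configured codec, e.g. LZ4/zstd, via Arrow): compressing many small spill payloads independently restarts the codec's dictionary and framing for every payload, while a single stream compressor shares context across all of them.

```python
import zlib

# 200 spill payloads with overlapping content, loosely modeling shuffle rows.
payloads = [b"order_id:%06d|status:SHIPPED|warehouse:EU-1;" % i * 8 for i in range(200)]

# Old scheme: each payload compressed independently before writing.
per_payload = sum(len(zlib.compress(p)) for p in payloads)

# New scheme: all payloads fed through one stream compressor.
co = zlib.compressobj()
streaming = sum(len(co.compress(p)) for p in payloads) + len(co.flush())

# The stream compressor reuses its dictionary across payloads,
# so the total compressed size is smaller.
assert streaming < per_payload
```

The more payloads share content across spill boundaries, the larger the gap between the two schemes — which matches the "better compression ratio for heavy spill" motivation.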

@github-actions github-actions bot added the VELOX label Apr 9, 2025
@github-actions

github-actions bot commented Apr 9, 2025

#9163

@marin-ma marin-ma force-pushed the sort-based-shuffle-stream-compression branch from 79ecfbf to abf4ddb Compare April 11, 2025 07:01
@marin-ma marin-ma force-pushed the sort-based-shuffle-stream-compression branch from 57ec2b9 to 1b762f3 Compare April 14, 2025 10:54
@marin-ma
Contributor Author

@j7nhai I can see the compression size is reduced with the streaming compression. Could you try this patch for your case?

@j7nhai
Contributor

j7nhai commented Apr 16, 2025

> @j7nhai I can see the compression size is reduced with the streaming compression. Could you try this patch for your case?

@marin-ma I will try and give my feedback soon.

@j7nhai
Contributor

j7nhai commented Apr 16, 2025

> @j7nhai I can see the compression size is reduced with the streaming compression. Could you try this patch for your case?

It seems there was no compressed output stream before this patch, and the Spark conf spark.gluten.sql.columnar.shuffle.codec actually did not work. Right? @marin-ma

@marin-ma
Contributor Author

> It seems there was no compressed output stream before this patch, and the Spark conf spark.gluten.sql.columnar.shuffle.codec actually did not work. Right? @marin-ma

@j7nhai That's not true. We didn't use a compressed output stream because the spilled data was already compressed before being written to the output stream. This patch modifies the compression logic, removes that compression, and uses the compressed output stream to do the work.

@marin-ma marin-ma marked this pull request as ready for review April 16, 2025 09:00
@j7nhai
Contributor

j7nhai commented Apr 17, 2025

> @j7nhai I can see the compression size is reduced with the streaming compression. Could you try this patch for your case?

It works much better now in my case. In some cases, the number of shuffle bytes written is even less than in vanilla Spark.

@marin-ma marin-ma changed the title [GLUTEN-9163][VL] WIP: Use stream de/compressor in sort-based shuffle [GLUTEN-9163][VL] Use stream de/compressor in sort-based shuffle Apr 17, 2025
@zhouyuan
Member

> @j7nhai I can see the compression size is reduced with the streaming compression. Could you try this patch for your case?
>
> It works much better now in my case. In some cases, the number of shuffle bytes written is even less than in vanilla Spark.

Hi @j7nhai, thanks for reporting back! Other than the shuffle data size, have you also noticed any performance improvement in the shuffle operator with this patch? Thanks, -yuan

Member

@zhouyuan zhouyuan left a comment


👍
The code is clean. I understand the logic here is to use a compressing stream instead of iostream + compression, so the benefits may come from less metadata being written?

Comment thread: cpp/core/shuffle/Payload.cc (Outdated)
return getBufferSize(buffers_);
}

bool InMemoryPayload::mergable() const {
Member


Nice refactor! Typo: should this be `mergeable`?

Comment thread: cpp/core/shuffle/Utils.h

// TODO: Support setting chunk size
// Write 64 KB compressed data at a time
static const int64_t kChunkSize = 64 * 1024;
Member


Compared with the original method, will this bring a higher memory footprint?

Contributor Author


We should respect spark.io.compression.lz4.blockSize and spark.io.compression.zstd.bufferSize when creating this buffer. Both have a default value of 32 KB. I will create a follow-up PR to do some refactoring and make this configurable.
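For illustration, the chunked write path can be sketched as follows. This is a simplified stand-in (Python's zlib in place of the actual Arrow codec; `K_CHUNK_SIZE` mirrors the `kChunkSize` constant quoted above, and the Spark buffer-size configs mentioned are not modeled here):

```python
import zlib

K_CHUNK_SIZE = 64 * 1024  # mirrors kChunkSize in cpp/core/shuffle/Utils.h

def write_compressed(data: bytes, chunk_size: int = K_CHUNK_SIZE) -> bytes:
    """Feed data to the stream compressor one fixed-size chunk at a time,
    so the compression buffer stays bounded regardless of payload size."""
    co = zlib.compressobj()
    out = bytearray()
    for i in range(0, len(data), chunk_size):
        out += co.compress(data[i:i + chunk_size])
    out += co.flush()  # end the stream once all chunks are written
    return bytes(out)

payload = b"row" * 100_000  # ~300 KB, spans several chunks
compressed = write_compressed(payload)
assert zlib.decompress(compressed) == payload
```

The memory-footprint question above is about exactly this chunk buffer: the write loop holds one chunk at a time, so the footprint scales with the chunk size rather than the payload size.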

Comment thread: cpp/core/shuffle/Utils.h
}

RETURN_NOT_OK(FinalizeCompression());
ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
Member


Can we move this compressor out of the shuffle stream and reuse it?

Contributor Author


We cannot reuse the compressor outside of the stream. One output stream is held by one spiller, and the compressor is re-created each time the spiller receives a new partition to spill. Therefore, the compressor can be ended and then recreated multiple times while writing one output stream.
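The lifecycle described above — one output stream per spiller, with the compressor ended and re-created at each partition boundary — can be sketched like this (Python's zlib standing in for the actual Arrow codec; each `flush()` loosely models `FinalizeCompression()` ending a frame, and the reader walks the concatenated frames):

```python
import zlib

def spill_partitions(partitions):
    """Each partition gets its own compressor, ended at the partition
    boundary; all frames are concatenated into one output stream."""
    out = bytearray()
    for part in partitions:
        co = zlib.compressobj()   # re-created per partition
        out += co.compress(part)
        out += co.flush()         # ends this partition's frame
    return bytes(out)

def read_all(stream: bytes):
    """Decode the concatenated frames back into partitions."""
    parts, rest = [], stream
    while rest:
        do = zlib.decompressobj()
        parts.append(do.decompress(rest))
        rest = do.unused_data    # bytes after the end of this frame
    return parts

parts = [b"p0" * 10, b"p1" * 20, b"p2" * 5]
assert read_all(spill_partitions(parts)) == parts
```

This is why the compressor cannot live outside the stream: its end-of-frame state is tied to partition boundaries within that one stream.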

@zhouyuan
Member

@GlutenPerfBot benchmark velox

@marin-ma marin-ma merged commit 1b8d420 into apache:main Apr 17, 2025
47 checks passed
namespace gluten {

namespace {
constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
Contributor

@j7nhai j7nhai commented Apr 22, 2025


Maybe we can make this configurable? @marin-ma

For cases with huge data, this may not be enough

Contributor Author


Will follow up.

Contributor Author

@marin-ma marin-ma commented Apr 23, 2025


@j7nhai Resolved in #9356

The configuration is spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize, with a default of 1 MB.

j7nhai pushed a commit to j7nhai/gluten that referenced this pull request Apr 22, 2025
@wangyum
Member

wangyum commented May 22, 2025

It seems this patch causes a core dump. Error message:

/bin/bash: line 1: 320713 Aborted                 (core dumped) LD_LIBRARY_PATH="/apache/hadoop/lib/native/Linux-amd64-64/lib:/apache/hadoop/lib/native:" /apache/java17/bin/java -server -Xmx46080m -Xms46080m '-Djava.net.preferIPv6Addresses=false' '-XX:+IgnoreUnrecognizedVMOptions' '--add-opens=java.base/java.lang=ALL-UNNAMED' '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED' '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED' '--add-opens=java.base/java.io=ALL-UNNAMED' '--add-opens=java.base/java.net=ALL-UNNAMED' '--add-opens=java.base/java.nio=ALL-UNNAMED' '--add-opens=java.base/java.util=ALL-UNNAMED' '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED' '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' '--add-opens=java.base/sun.nio.cs=ALL-UNNAMED' '--add-opens=java.base/sun.security.action=ALL-UNNAMED' '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED' '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' '-Djdk.reflect.useDirectMethodHandle=false' '-Dio.netty.tryReflectionSetAccessible=true' '-Xss8M' '-XX:MaxPermSize=512m' '-XX:+IgnoreUnrecognizedVMOptions' '-XX:+UseG1GC' '-XX:+UnlockExperimentalVMOptions' '-XX:MaxGCPauseMillis=500' '-XX:ParallelGCThreads=10' '-XX:ConcGCThreads=4' '-XX:G1MixedGCLiveThresholdPercent=55' '-XX:G1ReservePercent=15' '-XX:+PrintGC' '-XX:+PrintGCDetails' '-XX:+PrintGCDateStamps' '-XX:+PrintGCApplicationStoppedTime' '-XX:+PrintGCApplicationConcurrentTime' '-XX:+PrintClassHistogramAfterFullGC' '-XX:MaxJavaStackTraceDepth=50000' '-XX:ReservedCodeCacheSize=512M' '-XX:PerMethodRecompilationCutoff=10000' '-XX:PerBytecodeRecompilationCutoff=10000' '-Djava.net.preferIPv4Stack=true' '-Dsun.net.inetaddr.ttl=120' '-Dio.netty.eventLoop.maxPendingTasks=20000' '-Dio.netty.maxDirectMemory=15032385536' '-Dio.netty.tryReflectionSetAccessible=true' 
-Djava.io.tmpdir=/hadoop/4/yarn/local/usercache/appcache/application_1736396393732_145750/container_e2322_1736396393732_145750_01_000191/tmp '-Dspark.port.maxRetries=100' '-Dspark.driver.blockManager.port=30601' '-Dspark.blockManager.port=30501' '-Dspark.ui.port=30401' '-Dspark.viewpoint.ui.port=9090' '-Dspark.history.ui.port=9090' '-Dspark.network.timeout=300' '-Dspark.ssl.historyServer.port=8080' '-Dspark.driver.port=30201' '-Dspark.rpc.io.serverThreads=16' '-Dspark.shuffle.ess.service.port=7337' '-Dspark.shuffle.service.port=7338' -Dspark.yarn.app.container.log.dir=/hadoop/2/yarn/logs/application_1736396393732_145750/container_e2322_1736396393732_145750_01_000191 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.YarnCoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.211.174.46:30201 --executor-id 107  --cores 18 --app-id application_1736396393732_145750 --resourceProfileId 0 > /hadoop/2/yarn/logs/application_1736396393732_145750/container_e2322_1736396393732_145750_01_000191/stdout 2> /hadoop/2/yarn/logs/application_1736396393732_145750/container_e2322_1736396393732_145750_01_000191/stderr
Last 4096 bytes of stderr :
W20250522 09:56:36.712019 321053 MemoryArbitrator.cpp:84] Query memory capacity[10.50GB] is set for NOOP arbitrator which has no capacity enforcement
java: malloc.c:4687: _int_free: Assertion `heap->ar_ptr == av' failed.

Conf:

spark.gluten.sql.columnar.shuffle.sort.partitions.threshold=0
spark.gluten.sql.columnar.shuffle.sort.columns.threshold=0
spark.gluten.storage.hdfsViewfs.enabled=true

@zhouyuan
Member

@wangyum Thanks for reporting! Is it convenient for you to share the shuffle schema as well? It looks like TPC-H/DS-based tests are not able to trigger the issue in our testing environment.

@wangyum
Member

wangyum commented May 23, 2025

It seems I can reproduce this issue:

create table t_decimal_tt11 as
SELECT Cast(id AS DECIMAL(18, 0)) id
FROM   Range(10000000);

create table t_decimal_tt12 as
SELECT Cast(id AS DECIMAL(18, 0)) id
FROM   Range(2103627078L);
insert into t_decimal_tt12 values(-998), (-999), (-1), (-10), (-9999), (-1003), (-2), (-1005), (-1006), (-1007), (-1008), (-1009);

SELECT Count(*)
FROM  t_decimal_tt11 t_decimal_tt1
LEFT JOIN t_decimal_tt12 t_decimal_tt2
      ON t_decimal_tt1.id = t_decimal_tt2.id;
Conf:
spark.driver.maxResultSize 50g
spark.driver.memory 170g
spark.executor.cores 18
spark.executor.maxNumFailures 2147483647
spark.executor.memory 45g
spark.executor.memoryOverhead 14336
spark.gluten.enabled true
spark.gluten.memory.conservative.task.offHeap.size.in.bytes 1640438897
spark.gluten.memory.offHeap.size.in.bytes 59055800320
spark.gluten.memory.task.offHeap.size.in.bytes 3280877795
spark.gluten.memoryOverhead.size.in.bytes 15032385536
spark.gluten.numTaskSlotsPerExecutor 18
spark.gluten.sql.columnar.backend.velox.IOThreads 0
spark.memory.fraction 0.75
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 55g
spark.plugins org.apache.gluten.GlutenPlugin
spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager
spark.shuffle.service.enabled true
spark.shuffle.service.port 7338
spark.shuffle.service.removeShuffle true
spark.shuffle.spill.numElementsForceSpillThreshold 10000000
spark.shuffle.useOldFetchProtocol false
spark.speculation true
spark.speculation.interval 5000
spark.speculation.minTaskRuntime 5000
spark.speculation.multiplier 2
spark.speculation.quantile 0.95
spark.sql.adaptive.coalescePartitions.initialPartitionNum 10000
spark.sql.adaptive.coalescePartitions.minPartitionNum 200
spark.sql.adaptive.customCostEvaluatorClass org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
spark.sql.adaptive.enabled true
