[GLUTEN-10214][VL] Merge inputstream for shuffle reader #10499
marin-ma merged 1 commit into apache:main
Conversation
Run Gluten Clickhouse CI on x86
6 similar comments
Force-pushed 77780bd to b16faf5
Perf result shows an overall 2% improvement for TPCH/TPCDS sf500. (Charts: TPCH SF500, TPCDS SF500)

Why can't the hash-based shuffle reader benefit from this by eliminating VeloxResizeBatches?
    extends ColumnarBatchSerializerInstance
    with Logging {

  private val runtime =

If the runtime is only used to construct ShuffleReaderJniWrapper, do not create it here; wrap it in jniWrapper.

It's also used when creating the output ColumnarBatchOutIterator.
cpp/core/jni/JniWrapper.cc (outdated)

 public:
  ShuffleStreamReader(JNIEnv* env, jobject reader) {
    if (env->GetJavaVM(&vm_) != JNI_OK) {
      std::string errorMessage = "Unable to get JavaVM instance";

throw GlutenException("Unable to get JavaVM instance")
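The suggestion above is to throw the typed exception directly rather than first building the message in a local string. A minimal sketch of that pattern (GlutenException here is a stand-in with an assumed message-string constructor, and initJavaVm is a hypothetical helper standing in for the JNI check, since real JNI code cannot run standalone):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Stand-in for gluten::GlutenException (assumed: constructible from a
// message string); the real class lives in Gluten's C++ core.
class GlutenException : public std::runtime_error {
 public:
  explicit GlutenException(const std::string& msg) : std::runtime_error(msg) {}
};

// Hypothetical helper modeling the constructor check. The real code would be:
//   if (env->GetJavaVM(&vm_) != JNI_OK) {
//     throw GlutenException("Unable to get JavaVM instance");
//   }
bool initJavaVm(bool getJavaVmOk) {
  if (!getJavaVmOk) {
    // Fail fast with a typed exception instead of storing the message first.
    throw GlutenException("Unable to get JavaVM instance");
  }
  return true;
}
```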
cpp/core/shuffle/ShuffleReader.h (outdated)

namespace gluten {

class ShuffleReader;

Why is this line needed?
namespace gluten {

class TestStreamReader : public StreamReader {

Maybe add a library to move code that is only used for tests and benchmarks out of the production source code.

We may add a BUILD_TEST_UTILS option, like VELOX_BUILD_TEST_UTILS, and set it to ON if BUILD_TESTS=ON or BUILD_BENCHMARK=ON.

It's a good idea. I will propose another PR for this refactor.
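A sketch of what that option wiring might look like in CMake (option names follow the reviewer's suggestion; the target name and source file are hypothetical, not Gluten's actual build layout):

```cmake
option(BUILD_TESTS "Build Gluten tests" OFF)
option(BUILD_BENCHMARKS "Build Gluten benchmarks" OFF)

# Assumed new switch, mirroring Velox's VELOX_BUILD_TEST_UTILS: test-only
# sources such as TestStreamReader move into a separate utils library that
# is built only when tests or benchmarks are enabled.
if(BUILD_TESTS OR BUILD_BENCHMARKS)
  set(BUILD_TEST_UTILS ON)
endif()

if(BUILD_TEST_UTILS)
  add_library(gluten_test_utils TestStreamReader.cc)  # hypothetical target
endif()
```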
Force-pushed b16faf5 to 8f4c974, then 8f4c974 to 95e3b9c, then 95e3b9c to 575086b
Change-Id: Id2dd154e13529b1d295f04ada41f76d3e37feb8f
The current design of the shuffle reading process reuses Spark's BlockStoreShuffleReader. The shuffle reader processes each input stream individually, with a dedicated SerializerInstance to deserialize the input data. This works fine when the output is rows. However, in Gluten we convert the deserialized row data into columnar batches as the output during sort-based shuffle read. In real use cases, each input stream may contain only a small number of rows, so deserialization for the sort-based shuffle reader can be very slow due to the small-batch row-to-column conversion. In this case, it's hard to tune the performance of the r2c process.

This patch adds a new ColumnarShuffleReader that creates only one SerializerInstance per reducer task, which deserializes all input streams. This allows the native reader to load all input streams so that it can do the r2c conversion while reading/accumulating a larger number of rows. This change also eliminates the VeloxResizeBatches operation for sort-based shuffle read. There is no benefit for the hash-based shuffle reader or the rss-sort shuffle reader.
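The merging idea can be sketched outside of Spark/Gluten as follows. All class and function names below are illustrative, not Gluten's actual API: many small per-map-task streams are chained behind one reader, so output batches accumulate rows across stream boundaries instead of being capped by each stream's row count.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Illustrative stand-in for one deserialized input stream of rows.
class RowStream {
 public:
  explicit RowStream(std::vector<std::string> rows) : rows_(std::move(rows)) {}
  // Returns false when the stream is exhausted.
  bool next(std::string* out) {
    if (pos_ >= rows_.size()) return false;
    *out = rows_[pos_++];
    return true;
  }
 private:
  std::vector<std::string> rows_;
  size_t pos_ = 0;
};

// Presents many small input streams as one continuous stream, analogous to
// handing all input streams to a single reader/serializer instance.
class MergedRowStream {
 public:
  explicit MergedRowStream(std::vector<RowStream> streams)
      : streams_(std::move(streams)) {}
  bool next(std::string* out) {
    while (idx_ < streams_.size()) {
      if (streams_[idx_].next(out)) return true;
      ++idx_;  // current stream drained; fall through to the next one
    }
    return false;
  }
 private:
  std::vector<RowStream> streams_;
  size_t idx_ = 0;
};

// Accumulates up to batchSize rows per output batch regardless of how rows
// were split across input streams, so the r2c conversion sees large batches
// and no downstream resize (cf. VeloxResizeBatches) is needed.
std::vector<std::vector<std::string>> readBatches(MergedRowStream& in,
                                                  size_t batchSize) {
  std::vector<std::vector<std::string>> batches;
  std::vector<std::string> current;
  std::string row;
  while (in.next(&row)) {
    current.push_back(row);
    if (current.size() == batchSize) {
      batches.push_back(std::move(current));
      current.clear();
    }
  }
  if (!current.empty()) batches.push_back(std::move(current));
  return batches;
}
```

With per-stream readers, three streams of two rows each could never produce a batch larger than two rows; the merged reader instead emits one four-row batch followed by a two-row remainder.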
Below charts demonstrate the before and after shuffle read process for this change.
Before:

After:
