From a1f2bc8afa1ed6c2641e641aa87df612d5777f39 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 8 Jan 2025 15:01:32 +0800 Subject: [PATCH 1/4] fixup --- .../gluten/execution/ArrowCsvScanSuite.scala | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala index 374fa543af10..4eb3afb653be 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala @@ -67,33 +67,22 @@ class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite { } } -/** Since https://github.com/apache/incubator-gluten/pull/5850. */ -abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite { - override protected val resourcePath: String = "N/A" - override protected val fileFormat: String = "N/A" - - protected val rootPath: String = getClass.getResource("/").getPath - - override def beforeAll(): Unit = { - super.beforeAll() - createCsvTables() - } - - override def afterAll(): Unit = { - super.afterAll() - } - +class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase { override protected def sparkConf: SparkConf = { super.sparkConf - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.sql.files.maxPartitionBytes", "1g") - .set("spark.sql.shuffle.partitions", "1") - .set("spark.memory.offHeap.size", "2g") - .set("spark.unsafe.exceptionOnMemoryLeak", "true") - .set("spark.sql.autoBroadcastJoinThreshold", "-1") - .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true") + .set("spark.sql.sources.useV1SourceList", "csv") + .set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true") + } + + test("csv scan v1 with table cache") { + val df = spark.sql("select * from student") + df.cache() + df.collect() } +} +/** Since https://github.com/apache/incubator-gluten/pull/5850. */ +abstract class ArrowCsvScanSuite extends ArrowCsvScanSuiteBase { test("csv scan with option string as null") { val df = runAndCompare("select * from student_option_str")() val plan = df.queryExecution.executedPlan @@ -152,6 +141,33 @@ abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite { val df = runAndCompare("select count(1) from student")() checkLengthAndPlan(df, 1) } +} + +abstract class ArrowCsvScanSuiteBase extends VeloxWholeStageTransformerSuite { + override protected val resourcePath: String = "N/A" + override protected val fileFormat: String = "N/A" + + protected val rootPath: String = getClass.getResource("/").getPath + + override def beforeAll(): Unit = { + super.beforeAll() + createCsvTables() + } + + override def afterAll(): Unit = { + super.afterAll() + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true") + } private def createCsvTables(): Unit = { spark.read @@ -196,4 +212,4 @@ abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite { .load(rootPath + "/datasource/csv/student_option_schema.csv") .createOrReplaceTempView("student_option_schema_lm") } -} +} \ No newline at end of file From fdf59bcad4cbf76dcc68ad5929f54e10bcefb498 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 8 Jan 2025 15:05:42 +0800 Subject: [PATCH 2/4] fixup --- .../org/apache/gluten/execution/ArrowCsvScanSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala index 4eb3afb653be..e318a9f802ea 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala @@ -74,10 +74,14 @@ class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase { .set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true") } + /** + * Test for GLUTEN-8453: https://github.com/apache/incubator-gluten/issues/8453. + * To make sure no error is thrown when caching an Arrow Java query plan. + */ test("csv scan v1 with table cache") { val df = spark.sql("select * from student") df.cache() - df.collect() + assert(df.collect().length == 3) } } @@ -212,4 +216,4 @@ abstract class ArrowCsvScanSuiteBase extends VeloxWholeStageTransformerSuite { .load(rootPath + "/datasource/csv/student_option_schema.csv") .createOrReplaceTempView("student_option_schema_lm") } -} \ No newline at end of file +} From 2b7b0219185124818ef9c7574afcb6bbce94bf9a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 8 Jan 2025 15:58:23 +0800 Subject: [PATCH 3/4] fixup --- .../columnarbatch/VeloxColumnarBatches.java | 21 +++++++++++++++++++ .../ColumnarCachedBatchSerializer.scala | 16 ++++---------- .../gluten/columnarbatch/ColumnarBatches.java | 10 +++++---- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java index db2d08e31435..3c2d739270aa 100644 --- a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java +++ b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java @@ -17,6 +17,7 @@ package org.apache.gluten.columnarbatch; import org.apache.gluten.backendsapi.BackendsApiManager; +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators; import org.apache.gluten.runtime.Runtime; import org.apache.gluten.runtime.Runtimes; @@ -56,6 +57,7 @@ public static void checkNonVeloxBatch(ColumnarBatch batch) { } public static ColumnarBatch toVeloxBatch(ColumnarBatch input) { + ColumnarBatches.checkOffloaded(input); if (ColumnarBatches.isZeroColumnBatch(input)) { return input; } @@ -86,6 +88,25 @@ public static ColumnarBatch toVeloxBatch(ColumnarBatch input) { return input; } + /** + * Check if a columnar batch is in Velox format. If not, convert it to Velox format then + * return. If already in Velox format, return the batch directly. + *

+ * Should only be used for certain conditions when unable to insert explicit to-Velox transitions + * through query planner. + *

+ * For example, used by {@link org.apache.spark.sql.execution.ColumnarCachedBatchSerializer} as + * Spark directly calls API ColumnarCachedBatchSerializer#convertColumnarBatchToCachedBatch for + * query plan that returns supportsColumnar=true without generating a cache-write query plan node. + */ + public static ColumnarBatch ensureVeloxBatch(ColumnarBatch input) { + final ColumnarBatch light = ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), input); + if (isVeloxBatch(light)) { + return light; + } + return toVeloxBatch(light); + } + /** * Combine multiple columnar batches horizontally, assuming each of them is already offloaded. * Otherwise {@link UnsupportedOperationException} will be thrown. diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala index 1f9419976f29..16004737ea7f 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala @@ -171,24 +171,16 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging { conf: SQLConf): RDD[CachedBatch] = { input.mapPartitions { it => - val lightBatches = it.map { + val veloxBatches = it.map { /* Native code needs a Velox offloaded batch, making sure to offload if heavy batch is encountered */ - batch => - val heavy = ColumnarBatches.isHeavyBatch(batch) - if (heavy) { - val offloaded = VeloxColumnarBatches.toVeloxBatch( - ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch)) - offloaded - } else { - batch - } + batch => VeloxColumnarBatches.ensureVeloxBatch(batch) } new Iterator[CachedBatch] { - override def hasNext: Boolean = lightBatches.hasNext + override def hasNext: Boolean = veloxBatches.hasNext override def next(): CachedBatch = { - val batch = lightBatches.next() + val batch = veloxBatches.next() val results = ColumnarBatchSerializerJniWrapper .create( diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index 5114853363bd..156de4e0d84d 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -85,7 +85,8 @@ private static BatchType identifyBatchType(ColumnarBatch batch) { } /** Heavy batch: Data is readable from JVM and formatted as Arrow data. */ - public static boolean isHeavyBatch(ColumnarBatch batch) { + @VisibleForTesting + static boolean isHeavyBatch(ColumnarBatch batch) { return identifyBatchType(batch) == BatchType.HEAVY; } @@ -93,7 +94,8 @@ public static boolean isHeavyBatch(ColumnarBatch batch) { * Light batch: Data is not readable from JVM, a long int handle (which is a pointer usually) is * used to bind the batch to a native side implementation. */ - public static boolean isLightBatch(ColumnarBatch batch) { + @VisibleForTesting + static boolean isLightBatch(ColumnarBatch batch) { return identifyBatchType(batch) == BatchType.LIGHT; } @@ -128,7 +130,7 @@ public static ColumnarBatch select(String backendName, ColumnarBatch batch, int[ * Ensure the input batch is offloaded as native-based columnar batch (See {@link IndicatorVector} * and {@link PlaceholderVector}). This method will close the input column batch after offloaded. */ - private static ColumnarBatch ensureOffloaded(BufferAllocator allocator, ColumnarBatch batch) { + static ColumnarBatch ensureOffloaded(BufferAllocator allocator, ColumnarBatch batch) { if (ColumnarBatches.isLightBatch(batch)) { return batch; } @@ -140,7 +142,7 @@ private static ColumnarBatch ensureOffloaded(BufferAllocator allocator, Columnar * take place if loading is required, which means when the input batch is not loaded yet. This * method will close the input column batch after loaded. */ - private static ColumnarBatch ensureLoaded(BufferAllocator allocator, ColumnarBatch batch) { + static ColumnarBatch ensureLoaded(BufferAllocator allocator, ColumnarBatch batch) { if (isHeavyBatch(batch)) { return batch; } From 8dc4f8b9d34db4fff0597fef179fba629806c642 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 8 Jan 2025 16:00:12 +0800 Subject: [PATCH 4/4] fixup --- .../columnarbatch/VeloxColumnarBatches.java | 17 +++++++++-------- .../gluten/execution/ArrowCsvScanSuite.scala | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java index 3c2d739270aa..33f02be08980 100644 --- a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java +++ b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java @@ -89,18 +89,19 @@ public static ColumnarBatch toVeloxBatch(ColumnarBatch input) { } /** - * Check if a columnar batch is in Velox format. If not, convert it to Velox format then - * return. If already in Velox format, return the batch directly. - *

- * Should only be used for certain conditions when unable to insert explicit to-Velox transitions - * through query planner. - *

- * For example, used by {@link org.apache.spark.sql.execution.ColumnarCachedBatchSerializer} as + * Check if a columnar batch is in Velox format. If not, convert it to Velox format then return. + * If already in Velox format, return the batch directly. + * + *

Should only be used for certain conditions when unable to insert explicit to-Velox + * transitions through query planner. + * + *

For example, used by {@link org.apache.spark.sql.execution.ColumnarCachedBatchSerializer} as * Spark directly calls API ColumnarCachedBatchSerializer#convertColumnarBatchToCachedBatch for * query plan that returns supportsColumnar=true without generating a cache-write query plan node. */ public static ColumnarBatch ensureVeloxBatch(ColumnarBatch input) { - final ColumnarBatch light = ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), input); + final ColumnarBatch light = + ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), input); if (isVeloxBatch(light)) { return light; } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala index e318a9f802ea..c59936a927c1 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala @@ -75,8 +75,8 @@ class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase { } /** - * Test for GLUTEN-8453: https://github.com/apache/incubator-gluten/issues/8453. - * To make sure no error is thrown when caching an Arrow Java query plan. + * Test for GLUTEN-8453: https://github.com/apache/incubator-gluten/issues/8453. To make sure no + * error is thrown when caching an Arrow Java query plan. */ test("csv scan v1 with table cache") { val df = spark.sql("select * from student")