diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index c6ec95ded55f..0b20e5aeeabb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -106,12 +106,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { updateInputMetrics, updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull) - context.addTaskFailureListener( - (ctx, _) => { - if (ctx.isInterrupted()) { - iter.cancel() - } - }) + context.addTaskFailureListener((ctx, _) => { iter.cancel() }) + context.addTaskCompletionListener[Unit](_ => iter.close()) new CloseableCHColumnBatchIterator(iter, Some(pipelineTime)) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala index 4365b2987c2b..412ce145ed34 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala @@ -36,6 +36,17 @@ object RuntimeSettings { .doc("https://clickhouse.com/docs/en/operations/settings/query-complexity#settings-max_bytes_before_external_sort") .longConf .createWithDefault(0) + + // TODO: support check value + val OUTPUT_FORMAT_COMPRESSION_LEVEL = + buildConf(runtimeSettings("output_format_compression_level")) + .doc(s"""https://clickhouse.com/docs/en/operations/settings/settings#output_format_compression_level + | Notes: we always use Snappy compression, and Snappy doesn't support compression level. + | Currently, we ONLY set it in UT. 
+ |""".stripMargin) + .longConf + .createWithDefault(Integer.MIN_VALUE & 0xffffffffL) + // .checkValue(v => v >= 0, "COMPRESSION LEVEL must be greater than 0") // scalastyle:on line.size.limit /** Gluten Configuration */ diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala index d43d23bfc8d8..efc7138fe9ca 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala @@ -104,7 +104,6 @@ class CHColumnarWriteFilesRDD( * otherwise, we need to access ColumnarBatch row by row, which is not efficient. */ val writeResults = CHExecUtil.c2r(resultColumnarBatch).map(_.copy()).toSeq - // TODO: we need close iterator here before processing the result. // TODO: task commit time // TODO: get the schema from result ColumnarBatch and verify it. 
assert(!iter.hasNext) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala index 76afd602cefa..495e8961500c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala @@ -16,10 +16,11 @@ */ package org.apache.gluten.execution -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings} import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.exception.GlutenException -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{functions, DataFrame, Row} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -1464,16 +1465,14 @@ class GlutenClickHouseExcelFormatSuite fileName } - /** TODO: fix the issue and test in spark 3.5 */ - testSparkVersionLE33("write into hdfs") { + test("write into hdfs") { /** * There is a bug in pipeline write to HDFS; when a pipeline returns column batch, it doesn't * close the hdfs file, and hence the file is not flushed.HDFS file is closed when LocalExecutor * is destroyed, but before that, the file moved by spark committer. 
*/ - val tableName = "write_into_hdfs" - val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/" + val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/" val format = "parquet" val sql = s""" | select * | from $format.`$tablePath` | where long_field > 30 |""".stripMargin testFileFormatBase(tablePath, format, sql, df => {}) } } + + // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2 + testWithSpecifiedSparkVersion( + "write failed if set wrong snappy compression codec level", + Some("3.5")) { + // TODO: remove duplicated test codes + val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/" + val format = "parquet" + val sql = + s""" + | select * + | from $format.`$tablePath` + | where long_field > 30 + |""".stripMargin + + withSQLConf( + (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"), + ( + RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, + RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.defaultValue.get.toString) + ) { + testFileFormatBase(tablePath, format, sql, df => {}) + } + + // we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2 + withSQLConf( + (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"), + (RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, "3") + ) { + val sparkError = intercept[SparkException] { + testFileFormatBase(tablePath, format, sql, df => {}) + } + + // thrown at org.apache.spark.sql.execution.CHColumnarWriteFilesRDD + val causeOuter = sparkError.getCause + assert(causeOuter.isInstanceOf[SparkException]) + assert(causeOuter.getMessage.contains("Task failed while writing rows to output path: hdfs")) + + // thrown while writing the file + val causeInner = causeOuter.getCause + assert(causeInner.isInstanceOf[GlutenException]) + assert( + causeInner.getMessage.contains( + "Invalid: Codec 'snappy' doesn't support setting a compression level")) + } + + } } diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 45287fab517c..dfeb3287e387 100644 --- 
a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20250115 -CH_COMMIT=8e0d5eaf0fc +CH_BRANCH=rebase_ch/20250116 +CH_COMMIT=a260339b40c diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index b2dfa3cf449d..37d1a1bbd717 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -58,6 +58,7 @@ #include #include #include +#include #include #include #include @@ -91,6 +92,7 @@ extern const SettingsDouble max_bytes_ratio_before_external_sort; extern const SettingsBool query_plan_merge_filters; extern const SettingsBool compile_expressions; extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation; +extern const SettingsUInt64 output_format_compression_level; } namespace ErrorCodes { @@ -640,23 +642,31 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_ settings.set("input_format_parquet_enable_row_group_prefetch", false); settings.set("output_format_parquet_use_custom_encoder", false); + //1. // TODO: we need set Setting::max_threads to 1 by default, but now we can't get correct metrics for the some query if we set it to 1. // settings[Setting::max_threads] = 1; - /// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539 - /// if true, we can't get correct metrics for the query + /// 2. After https://github.com/ClickHouse/ClickHouse/pull/71539 + /// Set false to query_plan_merge_filters. + /// If true, we can't get correct metrics for the query settings[Setting::query_plan_merge_filters] = false; + /// 3. After https://github.com/ClickHouse/ClickHouse/pull/70598. + /// Set false to compile_expressions to avoid dead loop. + /// TODO: FIXME set true again. /// We now set BuildQueryPipelineSettings according to config. - // TODO: FIXME. Set false after https://github.com/ClickHouse/ClickHouse/pull/70598. 
settings[Setting::compile_expressions] = false; settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE; - /// - // After https://github.com/ClickHouse/ClickHouse/pull/73422 - // Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0 + /// 4. After https://github.com/ClickHouse/ClickHouse/pull/73422 + /// Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0 settings[Setting::max_bytes_ratio_before_external_sort] = 0.; + /// 5. After https://github.com/ClickHouse/ClickHouse/pull/73651. + /// See following settings, we always use Snappy compression for Parquet, however after https://github.com/ClickHouse/ClickHouse/pull/73651, + /// output_format_compression_level is set to 3, which is wrong, since snappy does not support it. + settings[Setting::output_format_compression_level] = arrow::util::kUseDefaultCompressionLevel; + for (const auto & [key, value] : spark_conf_map) { // Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 9e848f82d782..d12ccaae6062 100644 --- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -371,14 +371,24 @@ class SubstraitFileSink final : public DB::SinkToStorage } void onFinish() override { - if (output_format_) [[unlikely]] + if (output_format_) { output_format_->finalizeOutput(); + /// We need to reset output_format_ here before returning to Spark, because the file is closed in ~WriteBufferFromHDFSImpl(). + /// So that the Spark commit protocol can move the file safely. 
+ output_format_.reset(); assert(delta_stats_.row_count > 0); if (stats_) stats_->collectStats(relative_path_, partition_id_, delta_stats_); } } + void onCancel() noexcept override + { + if (output_format_) { + output_format_->cancel(); + output_format_.reset(); + } + } }; class SparkPartitionedBaseSink : public DB::PartitionedSink diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h index 915f9a7e7efa..a2a7ab052e02 100644 --- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h @@ -29,14 +29,19 @@ class OutputFormatFile public: struct OutputFormat { - DB::OutputFormatPtr output; std::unique_ptr write_buffer; + DB::OutputFormatPtr output; void finalizeOutput() const { output->finalize(); output->flush(); write_buffer->finalize(); } + void cancel() + { + output.reset(); + write_buffer->finalize(); + } }; using OutputFormatPtr = std::shared_ptr; diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp index f3ac41c19a5b..684c433db3e3 100644 --- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp @@ -25,7 +25,6 @@ # include # include # include -# include namespace local_engine {