From a6593f8e3f09f7df3854af8bbc066d2151b1a4c6 Mon Sep 17 00:00:00 2001 From: kyligence-git Date: Wed, 15 Jan 2025 23:08:53 +0000 Subject: [PATCH 1/5] [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116) --- cpp-ch/clickhouse.version | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 45287fab517c..dfeb3287e387 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20250115 -CH_COMMIT=8e0d5eaf0fc +CH_BRANCH=rebase_ch/20250116 +CH_COMMIT=a260339b40c From d4db7683a5434447076f113f580871261e881761 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 16 Jan 2025 11:23:16 +0800 Subject: [PATCH 2/5] Fix ut due to https://github.com/ClickHouse/ClickHouse/pull/73651 --- .../Storages/Output/ParquetOutputFormatFile.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp index f3ac41c19a5b..06a572dd5794 100644 --- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp @@ -25,7 +25,7 @@ # include # include # include -# include +# include namespace local_engine { @@ -46,6 +46,13 @@ OutputFormatFile::OutputFormatPtr ParquetOutputFormatFile::createOutputFormat(co auto new_header = createHeaderWithPreferredSchema(header); // TODO: align all spark parquet config with ch parquet config auto format_settings = DB::getFormatSettings(context); + + /// ClickHouse has always used Snappy compression for Parquet by default, which is similar to Spark. + /// As a result, we have not supported set the codec. However after https://github.com/ClickHouse/ClickHouse/pull/73651, + /// output_format_compression_level will be set to 3, which is wrong, since snappy does not support it. + format_settings.parquet.output_compression_method = DB::FormatSettings::ParquetCompression::SNAPPY; + format_settings.parquet.output_compression_level = arrow::util::kUseDefaultCompressionLevel; + auto output_format = std::make_shared(*(res->write_buffer), new_header, format_settings); res->output = output_format; return res; From cb9df2b02a97f52131ce7b561f2bfd4cb8574e1f Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 16 Jan 2025 13:06:42 +0800 Subject: [PATCH 3/5] In C++, The declaration order determines the construction order: member variables are constructed in the order they are declared, and the order in the initialization list does not affect this. The destruction order is the reverse of the construction order: the last constructed member is the first to be destructed. since output depends on write_buffer, we need declare write_buffer first. --- cpp-ch/local-engine/Storages/Output/OutputFormatFile.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h index 915f9a7e7efa..5c8fa300d4d5 100644 --- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h @@ -29,8 +29,8 @@ class OutputFormatFile public: struct OutputFormat { - DB::OutputFormatPtr output; std::unique_ptr write_buffer; + DB::OutputFormatPtr output; void finalizeOutput() const { output->finalize(); From dc1ba83da8eb3f9e69f41e208d3c7df4ad37ae58 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 16 Jan 2025 16:58:11 +0800 Subject: [PATCH 4/5] Another way to fix https://github.com/ClickHouse/ClickHouse/pull/73651, this also fix "write into hdfs in spark 3.5" --- .../clickhouse/CHIteratorApi.scala | 8 +-- .../clickhouse/RuntimeSettings.scala | 11 ++++ .../execution/CHColumnarWriteFilesExec.scala | 1 - .../GlutenClickHouseExcelFormatSuite.scala | 56 +++++++++++++++++-- cpp-ch/local-engine/Common/CHUtil.cpp | 22 ++++++-- .../Storages/Output/NormalFileWriter.h | 12 +++- .../Storages/Output/OutputFormatFile.h | 5 ++ .../Output/ParquetOutputFormatFile.cpp | 8 --- 8 files changed, 95 insertions(+), 28 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index c6ec95ded55f..0b20e5aeeabb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -106,12 +106,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { updateInputMetrics, updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull) - context.addTaskFailureListener( - (ctx, _) => { - if (ctx.isInterrupted()) { - iter.cancel() - } - }) + context.addTaskFailureListener((ctx, _) => { iter.cancel() }) + context.addTaskCompletionListener[Unit](_ => iter.close()) new CloseableCHColumnBatchIterator(iter, Some(pipelineTime)) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala index 4365b2987c2b..412ce145ed34 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala @@ -36,6 +36,17 @@ object RuntimeSettings { .doc("https://clickhouse.com/docs/en/operations/settings/query-complexity#settings-max_bytes_before_external_sort") .longConf .createWithDefault(0) + + // TODO: support check value + val OUTPUT_FORMAT_COMPRESSION_LEVEL = + buildConf(runtimeSettings("output_format_compression_level")) + .doc(s"""https://clickhouse.com/docs/en/operations/settings/settings#output_format_compression_level + | Notes: we always use Snappy compression, and Snappy doesn't support compression level. + | Currently, we ONLY set it in UT. + |""".stripMargin) + .longConf + .createWithDefault(Integer.MIN_VALUE & 0xffffffffL) + // .checkValue(v => v >= 0, "COMPRESSION LEVEL must be greater than 0") // scalastyle:on line.size.limit /** Gluten Configuration */ diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala index d43d23bfc8d8..efc7138fe9ca 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala @@ -104,7 +104,6 @@ class CHColumnarWriteFilesRDD( * otherwise, we need to access ColumnarBatch row by row, which is not efficient. */ val writeResults = CHExecUtil.c2r(resultColumnarBatch).map(_.copy()).toSeq - // TODO: we need close iterator here before processing the result. // TODO: task commit time // TODO: get the schema from result ColumnarBatch and verify it. assert(!iter.hasNext) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala index 76afd602cefa..7c778483138d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala @@ -16,10 +16,11 @@ */ package org.apache.gluten.execution -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings} import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.exception.GlutenException -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{functions, DataFrame, Row} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -1464,16 +1465,14 @@ class GlutenClickHouseExcelFormatSuite fileName } - /** TODO: fix the issue and test in spark 3.5 */ - testSparkVersionLE33("write into hdfs") { + test("write into hdfs") { /** * There is a bug in pipeline write to HDFS; when a pipeline returns column batch, it doesn't * close the hdfs file, and hence the file is not flushed.HDFS file is closed when LocalExecutor * is destroyed, but before that, the file moved by spark committer. */ - val tableName = "write_into_hdfs" - val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/" + val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/" val format = "parquet" val sql = s""" @@ -1485,4 +1484,49 @@ class GlutenClickHouseExcelFormatSuite testFileFormatBase(tablePath, format, sql, df => {}) } } + + // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2 + testWithSpecifiedSparkVersion("write failed if set wrong snappy compression codec level", Some("3.5")) { + // TODO: remove duplicated test codes + val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/" + val format = "parquet" + val sql = + s""" + | select * + | from $format.`$tablePath` + | where long_field > 30 + |""".stripMargin + + withSQLConf( + (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"), + ( + RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, + RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.defaultValue.get.toString) + ) { + testFileFormatBase(tablePath, format, sql, df => {}) + } + + /// we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2 + withSQLConf( + (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"), + (RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, "3") + ) { + val sparkError = intercept[SparkException] { + testFileFormatBase(tablePath, format, sql, df => {}) + } + + // throw at org.apache.spark.sql.execution.CHColumnarWriteFilesRDD + val causeOuter = sparkError.getCause + assert(causeOuter.isInstanceOf[SparkException]) + assert(causeOuter.getMessage.contains("Task failed while writing rows to output path: hdfs")) + + // throw at the writing file + val causeInner = causeOuter.getCause + assert(causeInner.isInstanceOf[GlutenException]) + assert( + causeInner.getMessage.contains( + "Invalid: Codec 'snappy' doesn't support setting a compression level")) + } + + } } diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index b2dfa3cf449d..37d1a1bbd717 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -58,6 +58,7 @@ #include #include #include +#include #include #include #include @@ -91,6 +92,7 @@ extern const SettingsDouble max_bytes_ratio_before_external_sort; extern const SettingsBool query_plan_merge_filters; extern const SettingsBool compile_expressions; extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation; +extern const SettingsUInt64 output_format_compression_level; } namespace ErrorCodes { @@ -640,23 +642,31 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_ settings.set("input_format_parquet_enable_row_group_prefetch", false); settings.set("output_format_parquet_use_custom_encoder", false); + //1. // TODO: we need set Setting::max_threads to 1 by default, but now we can't get correct metrics for the some query if we set it to 1. // settings[Setting::max_threads] = 1; - /// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539 - /// if true, we can't get correct metrics for the query + /// 2. After https://github.com/ClickHouse/ClickHouse/pull/71539 + /// Set false to query_plan_merge_filters. + /// If true, we can't get correct metrics for the query settings[Setting::query_plan_merge_filters] = false; + /// 3. After https://github.com/ClickHouse/ClickHouse/pull/70598. + /// Set false to compile_expressions to avoid dead loop. + /// TODO: FIXME set true again. /// We now set BuildQueryPipelineSettings according to config. - // TODO: FIXME. Set false after https://github.com/ClickHouse/ClickHouse/pull/70598. settings[Setting::compile_expressions] = false; settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE; - /// - // After https://github.com/ClickHouse/ClickHouse/pull/73422 - // Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0 + /// 4. After https://github.com/ClickHouse/ClickHouse/pull/73422 + /// Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0 settings[Setting::max_bytes_ratio_before_external_sort] = 0.; + /// 5. After https://github.com/ClickHouse/ClickHouse/pull/73651. + /// See following settings, we always use Snappy compression for Parquet, however after https://github.com/ClickHouse/ClickHouse/pull/73651, + /// output_format_compression_level is set to 3, which is wrong, since snappy does not support it. + settings[Setting::output_format_compression_level] = arrow::util::kUseDefaultCompressionLevel; + for (const auto & [key, value] : spark_conf_map) { // Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 9e848f82d782..d12ccaae6062 100644 --- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -371,14 +371,24 @@ class SubstraitFileSink final : public DB::SinkToStorage } void onFinish() override { - if (output_format_) [[unlikely]] + if (output_format_) { output_format_->finalizeOutput(); + /// We need close reset output_format_ here before return to spark, because the file is closed in ~WriteBufferFromHDFSImpl(). + /// So that Spark Commit protocol can move the file safely. + output_format_.reset(); assert(delta_stats_.row_count > 0); if (stats_) stats_->collectStats(relative_path_, partition_id_, delta_stats_); } } + void onCancel() noexcept override + { + if (output_format_) { + output_format_->cancel(); + output_format_.reset(); + } + } }; class SparkPartitionedBaseSink : public DB::PartitionedSink diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h index 5c8fa300d4d5..a2a7ab052e02 100644 --- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h @@ -37,6 +37,11 @@ class OutputFormatFile output->flush(); write_buffer->finalize(); } + void cancel() + { + output.reset(); + write_buffer->finalize(); + } }; using OutputFormatPtr = std::shared_ptr; diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp index 06a572dd5794..684c433db3e3 100644 --- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp @@ -25,7 +25,6 @@ # include # include # include -# include namespace local_engine { @@ -46,13 +45,6 @@ OutputFormatFile::OutputFormatPtr ParquetOutputFormatFile::createOutputFormat(co auto new_header = createHeaderWithPreferredSchema(header); // TODO: align all spark parquet config with ch parquet config auto format_settings = DB::getFormatSettings(context); - - /// ClickHouse has always used Snappy compression for Parquet by default, which is similar to Spark. - /// As a result, we have not supported set the codec. However after https://github.com/ClickHouse/ClickHouse/pull/73651, - /// output_format_compression_level will be set to 3, which is wrong, since snappy does not support it. - format_settings.parquet.output_compression_method = DB::FormatSettings::ParquetCompression::SNAPPY; - format_settings.parquet.output_compression_level = arrow::util::kUseDefaultCompressionLevel; - auto output_format = std::make_shared(*(res->write_buffer), new_header, format_settings); res->output = output_format; return res; From 9ebe2790bf37f8f458240dbdf669038a284845a3 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 16 Jan 2025 17:45:38 +0800 Subject: [PATCH 5/5] fix style --- .../gluten/execution/GlutenClickHouseExcelFormatSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala index 7c778483138d..495e8961500c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala @@ -1486,7 +1486,9 @@ class GlutenClickHouseExcelFormatSuite } // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2 - testWithSpecifiedSparkVersion("write failed if set wrong snappy compression codec level", Some("3.5")) { + testWithSpecifiedSparkVersion( + "write failed if set wrong snappy compression codec level", + Some("3.5")) { // TODO: remove duplicated test codes val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/" val format = "parquet" @@ -1506,7 +1508,7 @@ class GlutenClickHouseExcelFormatSuite testFileFormatBase(tablePath, format, sql, df => {}) } - /// we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2 + // we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2 withSQLConf( (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"), (RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, "3")