From 673837a6f05c582a74cb5fd2aa98d178d3af2b7c Mon Sep 17 00:00:00 2001 From: jlf <1251489546@qq.com> Date: Tue, 8 Apr 2025 15:13:17 +0800 Subject: [PATCH 1/4] [CH][draft] iceberg UT --- backends-clickhouse/pom.xml | 111 ++++ .../execution/iceberg/TestFlinkUpsert.java | 538 ++++++++++++++++++ .../ClickHouseIcebergHiveTableSupport.scala | 101 ++++ .../Storages/SubstraitSource/FileReader.cpp | 4 +- 4 files changed, 752 insertions(+), 2 deletions(-) create mode 100644 backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java create mode 100644 backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 8a20a0a2cb06..a8f4a25b7b41 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -48,6 +48,9 @@ false + + 1.16.2 + org.apache.gluten @@ -64,6 +67,103 @@ org.scala-lang.modules scala-collection-compat_${scala.binary.version} + + org.apache.iceberg + iceberg-api + ${iceberg.version} + test-jar + test + + + org.apache.iceberg + iceberg-flink-runtime-1.16 + ${iceberg.version} + provided + + + org.apache.iceberg + iceberg-flink-1.16 + ${iceberg.version} + test-jar + test + + + org.apache.iceberg + iceberg-hive-metastore + ${iceberg.version} + test-jar + test + + + + org.junit.jupiter + junit-jupiter + test + 5.10.1 + + + org.junit.jupiter + junit-jupiter-engine + test + 5.10.1 + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-api-bridge-base + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + org.apache.flink + flink-test-utils + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + provided + + + org.scala-lang + scala-library + + + + + org.apache.iceberg + iceberg-spark-${sparkbundle.version}_${scala.binary.version} + ${iceberg.version} + test-jar + test + + + org.apache.iceberg + iceberg-data + ${iceberg.version} + test-jar + test + @@ -551,6 +651,17 @@ true + + org.apache.maven.plugins + maven-surefire-plugin + + + **/*Test.java + **/Test*.java + + false + + diff --git a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java new file mode 100644 index 000000000000..ec7503bb1a2a --- /dev/null +++ b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution.iceberg; + +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Date; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.flink.CatalogTestBase; +import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +public class TestFlinkUpsert extends CatalogTestBase { + + @Parameter(index = 2) + private FileFormat format; + + @Parameter(index = 3) + private boolean isStreamingJob; + + private final Map tableUpsertProps = Maps.newHashMap(); + private TableEnvironment tEnv; + private SparkSession spark; + private ClickHouseIcebergHiveTableSupport hiveTableSupport; + + @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static List parameters() { + List parameters = Lists.newArrayList(); + // ignore orc, it is not supported by ch backend + for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + // Only test with one catalog as this is a file operation concern. + // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop + // catalog. + String catalogName = "testhive"; + Namespace baseNamespace = Namespace.empty(); + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + } + } + return parameters; + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance(); + if (isStreamingJob) { + settingsBuilder.inStreamingMode(); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG); + env.enableCheckpointing(400); + env.setMaxParallelism(2); + env.setParallelism(2); + tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + } else { + settingsBuilder.inBatchMode(); + tEnv = TableEnvironment.create(settingsBuilder.build()); + } + } + } + return tEnv; + } + + @Override + @BeforeEach + public void before() { + super.before(); + sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2"); + tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true"); + tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + + hiveTableSupport = new ClickHouseIcebergHiveTableSupport(); + hiveTableSupport.initSparkConf( + hiveConf.get("hive.metastore.uris"), + catalogName, + String.format("file://%s", this.warehouseRoot())); + hiveTableSupport.initializeSession(); + spark = hiveTableSupport.spark(); + } + + @Override + @AfterEach + public void clean() { + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + super.clean(); + + hiveTableSupport.clean(); + } + + static String toWithClause(Map props) { + StringBuilder builder = new StringBuilder(); + builder.append("("); + int propCount = 0; + for (Map.Entry entry : props.entrySet()) { + if (propCount > 0) { + builder.append(","); + } + builder + .append("'") + .append(entry.getKey()) + .append("'") + .append("=") + .append("'") + .append(entry.getValue()) + .append("'"); + propCount++; + } + builder.append(")"); + return builder.toString(); + } + + @TestTemplate + public void testUpsertAndQuery() { + String tableName = "test_upsert_query"; + LocalDate dt20220301 = LocalDate.of(2022, 3, 1); + LocalDate dt20220302 = LocalDate.of(2022, 3, 2); + Date dt20220301Spark = + Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + Date dt20220302Spark = + Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + + sql( + "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, " + + "PRIMARY KEY(id,dt) NOT ENFORCED) " + + "PARTITIONED BY (dt) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + try { + sql( + "INSERT INTO %s VALUES " + + "(1, 'Bill', DATE '2022-03-01')," + + "(1, 'Jane', DATE '2022-03-01')," + + "(2, 'Jane', DATE '2022-03-01')", + tableName); + + sql( + "INSERT INTO %s VALUES " + + "(2, 'Bill', DATE '2022-03-01')," + + "(1, 'Jane', DATE '2022-03-02')," + + "(2, 'Jane', DATE '2022-03-02')", + tableName); + + List rowsOn20220301 = + Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301); + List rowsOn20220301Spark = + Lists.newArrayList( + Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill", dt20220301Spark)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format( + Locale.ROOT, + "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'", + catalogName, + tableName)), + 3), + rowsOn20220301Spark); + + List rowsOn20220302 = + Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302); + List rowsOn20220302Spark = + Lists.newArrayList( + Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane", dt20220302Spark)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format( + Locale.ROOT, + "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'", + catalogName, + tableName)), + 3), + rowsOn20220302Spark); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302))); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Iterables.concat(rowsOn20220301Spark, rowsOn20220302Spark))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + private List convertToFlinkRows(Dataset rows, int columnCount) { + return rows.collectAsList().stream() + .map( + r -> { + switch (columnCount) { + case 1: + return Row.of(r.get(0)); + case 2: + return Row.of(r.get(0), r.get(1)); + case 3: + return Row.of(r.get(0), r.get(1), r.get(2)); + default: + throw new IllegalArgumentException("Unsupported column count: " + columnCount); + } + }) + .collect(Collectors.toList()); + } + + @TestTemplate + public void testUpsertOptions() { + String tableName = "test_upsert_options"; + LocalDate dt20220301 = LocalDate.of(2022, 3, 1); + LocalDate dt20220302 = LocalDate.of(2022, 3, 2); + Date dt20220301Spark = + Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + Date dt20220302Spark = + Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + + Map optionsUpsertProps = Maps.newHashMap(tableUpsertProps); + optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED); + sql( + "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, " + + "PRIMARY KEY(id,dt) NOT ENFORCED) " + + "PARTITIONED BY (dt) WITH %s", + tableName, toWithClause(optionsUpsertProps)); + + try { + sql( + "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES " + + "(1, 'Bill', DATE '2022-03-01')," + + "(1, 'Jane', DATE '2022-03-01')," + + "(2, 'Jane', DATE '2022-03-01')", + tableName); + + sql( + "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES " + + "(2, 'Bill', DATE '2022-03-01')," + + "(1, 'Jane', DATE '2022-03-02')," + + "(2, 'Jane', DATE '2022-03-02')", + tableName); + + List rowsOn20220301 = + Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301); + List rowsOn20220301Spark = + Lists.newArrayList( + Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill", dt20220301Spark)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format( + Locale.ROOT, + "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'", + catalogName, + tableName)), + 3), + rowsOn20220301Spark); + + List rowsOn20220302 = + Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302); + List rowsOn20220302Spark = + Lists.newArrayList( + Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane", dt20220302Spark)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format( + Locale.ROOT, + "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'", + catalogName, + tableName)), + 3), + rowsOn20220302Spark); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302))); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Iterables.concat(rowsOn20220301Spark, rowsOn20220302Spark))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @TestTemplate + public void testPrimaryKeyEqualToPartitionKey() { + // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey + String tableName = "upsert_on_id_key"; + try { + sql( + "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) " + + "PARTITIONED BY (id) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 'Bill')", tableName); + + List rows = Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")); + TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 2), + rows); + + sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", tableName); + + List rows2 = Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")); + TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows2); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 2), + rows2); + + sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", tableName); + + List rows3 = + Lists.newArrayList( + Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")); + TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows3); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 2), + rows3); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @TestTemplate + public void testPrimaryKeyFieldsAtBeginningOfSchema() { + String tableName = "upsert_on_pk_at_schema_start"; + LocalDate dt = LocalDate.of(2022, 3, 1); + Date dtSpark = Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + + try { + sql( + "CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, " + + "PRIMARY KEY(id,dt) NOT ENFORCED) " + + "PARTITIONED BY (dt) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql( + "INSERT INTO %s VALUES " + + "(1, DATE '2022-03-01', 'Andy')," + + "(1, DATE '2022-03-01', 'Bill')," + + "(2, DATE '2022-03-01', 'Jane')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane"))); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Row.of(1, dtSpark, "Bill"), Row.of(2, dtSpark, "Jane"))); + + sql( + "INSERT INTO %s VALUES " + + "(1, DATE '2022-03-01', 'Jane')," + + "(2, DATE '2022-03-01', 'Bill')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill"))); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Row.of(1, dtSpark, "Jane"), Row.of(2, dtSpark, "Bill"))); + + sql( + "INSERT INTO %s VALUES " + + "(3, DATE '2022-03-01', 'Duke')," + + "(4, DATE '2022-03-01', 'Leon')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList( + Row.of(1, dt, "Jane"), + Row.of(2, dt, "Bill"), + Row.of(3, dt, "Duke"), + Row.of(4, dt, "Leon"))); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList( + Row.of(1, dtSpark, "Jane"), + Row.of(2, dtSpark, "Bill"), + Row.of(3, dtSpark, "Duke"), + Row.of(4, dtSpark, "Leon"))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @TestTemplate + public void testPrimaryKeyFieldsAtEndOfTableSchema() { + // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key + // fields + // are located at the end of the flink schema. + String tableName = "upsert_on_pk_at_schema_end"; + LocalDate dt = LocalDate.of(2022, 3, 1); + Date dtSpark = Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()); + try { + sql( + "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, " + + "PRIMARY KEY(id,dt) NOT ENFORCED) " + + "PARTITIONED BY (dt) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql( + "INSERT INTO %s VALUES " + + "('Andy', 1, DATE '2022-03-01')," + + "('Bill', 1, DATE '2022-03-01')," + + "('Jane', 2, DATE '2022-03-01')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt))); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Row.of("Bill", 1, dtSpark), Row.of("Jane", 2, dtSpark))); + + sql( + "INSERT INTO %s VALUES " + + "('Jane', 1, DATE '2022-03-01')," + + "('Bill', 2, DATE '2022-03-01')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt))); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList(Row.of("Jane", 1, dtSpark), Row.of("Bill", 2, dtSpark))); + + sql( + "INSERT INTO %s VALUES " + + "('Duke', 3, DATE '2022-03-01')," + + "('Leon', 4, DATE '2022-03-01')", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList( + Row.of("Jane", 1, dt), + Row.of("Bill", 2, dt), + Row.of("Duke", 3, dt), + Row.of("Leon", 4, dt))); + spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", catalogName, tableName)); + TestHelpers.assertRows( + convertToFlinkRows( + spark.sql( + String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", catalogName, tableName)), + 3), + Lists.newArrayList( + Row.of("Jane", 1, dtSpark), + Row.of("Bill", 2, dtSpark), + Row.of("Duke", 3, dtSpark), + Row.of("Leon", 4, dtSpark))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } +} diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala new file mode 100644 index 000000000000..c9e6a8ee5eef --- /dev/null +++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.execution.iceberg + +import com.google.common.base.Strings +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig + +class ClickHouseIcebergHiveTableSupport { + + private val sparkConf: SparkConf = new SparkConf() + + private var _hiveSpark: SparkSession = _ + + def spark: SparkSession = _hiveSpark + + def initSparkConf(url: String, catalog: String, path: String): SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConfig._ + + sparkConf + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "536870912") + .set("spark.sql.catalogImplementation", "hive") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.adaptive.enabled", "false") + .set("spark.sql.files.minPartitionNum", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") + .set("spark.gluten.sql.columnar.iterator", "true") + .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") + .set("spark.gluten.sql.enable.native.validation", "false") + .set("spark.gluten.sql.parquet.maxmin.index", "true") + .set("spark.hive.exec.dynamic.partition.mode", "nonstrict") + .set("spark.gluten.supported.hive.udfs", "my_add") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "2") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .setCHConfig("use_local_format", true) + .set("spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .set("spark.sql.catalog.spark_catalog.type", "hive") + .setMaster("local[*]") + if (!Strings.isNullOrEmpty(url)) { + sparkConf.set("spark.hadoop.hive.metastore.uris", url) + } + if (!Strings.isNullOrEmpty(catalog)) { + sparkConf.set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog." + catalog + ".type", "hive") + } + if (!Strings.isNullOrEmpty(path)) { + sparkConf.set("spark.sql.warehouse.dir", path) + } + sparkConf + } + + def initializeSession(): Unit = { + if (_hiveSpark == null) { + _hiveSpark = + SparkSession + .builder() + .config(sparkConf) + .enableHiveSupport() + .getOrCreate() + } + } + + def clean(): Unit = { + try { + if (_hiveSpark != null) { + _hiveSpark.stop() + _hiveSpark = null + } + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + } +} diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp index fedd043500dc..dfbbe4267db2 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp @@ -59,12 +59,12 @@ DB::Columns BaseReader::addVirtualColumn(DB::Chunk dataChunk, size_t rowNum) con std::back_inserter(res_columns), [&](const auto & column) -> DB::ColumnPtr { - if (readHeader.has(column.name)) - return read_columns[readHeader.getPositionByName(column.name)]; if (auto it = normalized_partition_values.find(boost::to_lower_copy(column.name)); it != normalized_partition_values.end()) return createPartitionColumn(it->second, column.type, rows); if (file->fileMetaColumns().virtualColumn(column.name)) return file->fileMetaColumns().createMetaColumn(column.name, column.type, rows); + if (readHeader.has(column.name)) + return read_columns[readHeader.getPositionByName(column.name)]; throw DB::Exception( DB::ErrorCodes::LOGICAL_ERROR, "Not found column = {} when reading file: {}.", column.name, file->getURIPath()); }); From de46c98a08229f3ad9435a8b5d079cd587d02db1 Mon Sep 17 00:00:00 2001 From: jlf <1251489546@qq.com> Date: Mon, 14 Apr 2025 15:17:50 +0800 Subject: [PATCH 2/4] [CH][draft] add UT --- backends-clickhouse/pom.xml | 6 +- .../execution/iceberg/TestFlinkUpsert.java | 4 +- .../TestPositionDeletesTableGluten.java | 80 +++++++++++++++++++ 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index a8f4a25b7b41..0b35dc01535f 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -216,6 +216,10 @@ + + true + + org.apache.gluten @@ -659,7 +663,7 @@ **/*Test.java **/Test*.java - false + ${surefire.skipTests} diff --git a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java index ec7503bb1a2a..429feab086f3 100644 --- a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java +++ b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java @@ -61,8 +61,8 @@ public class TestFlinkUpsert extends CatalogTestBase { @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") public static List parameters() { List parameters = Lists.newArrayList(); - // ignore orc, it is not supported by ch backend - for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO}) { + // ignore ORC and AVRO, ch backend only support PARQUET + for (FileFormat format : new FileFormat[] {FileFormat.PARQUET}) { for (Boolean isStreaming : new Boolean[] {true, false}) { // Only test with one catalog as this is a file operation concern. // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop diff --git a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java new file mode 100644 index 000000000000..c13c2fea70c2 --- /dev/null +++ b/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.execution.iceberg; + +import java.util.Map; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.TestPositionDeletesTable; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.BeforeClass; +import org.junit.runners.Parameterized; + +public class TestPositionDeletesTableGluten extends TestPositionDeletesTable { + private static final Map CATALOG_PROPS = + ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false"); + private static ClickHouseIcebergHiveTableSupport hiveTableSupport; + + @BeforeClass + public static void startMetastoreAndSpark() { + metastore = new TestHiveMetastore(); + metastore.start(); + hiveConf = metastore.hiveConf(); + hiveTableSupport = new ClickHouseIcebergHiveTableSupport(); + hiveTableSupport.initSparkConf( + hiveConf.get("hive.metastore.uris"), SparkCatalogConfig.HIVE.catalogName(), null); + hiveTableSupport.initializeSession(); + spark = hiveTableSupport.spark(); + sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + + try { + catalog.createNamespace(Namespace.of(new String[] {"default"})); + } catch (AlreadyExistsException ignore) { + } + } + + @Parameterized.Parameters( + name = + "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") + public static Object[][] parameters() { + // ignore ORC and AVRO, ch backend only support PARQUET + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET + } + }; + } + + public TestPositionDeletesTableGluten( + String catalogName, String implementation, Map config, FileFormat format) { + super(catalogName, implementation, config, format); + } +} From 0f9d2240fc68e3496703737368e9a3c4fbccdb6c Mon Sep 17 00:00:00 2001 From: jlf <1251489546@qq.com> Date: Wed, 16 Apr 2025 17:12:16 +0800 Subject: [PATCH 3/4] [CH][draft] run iceberg UT with diff spark version --- backends-clickhouse/pom.xml | 281 +++++++++++++++--- .../execution/iceberg/TestFlinkUpsert.java | 0 .../TestPositionDeletesTableGluten.java | 11 + .../ClickHouseIcebergHiveTableSupport.scala | 0 .../TestPositionDeletesTableGluten.java | 89 ++++++ .../ClickHouseIcebergHiveTableSupport.scala | 101 +++++++ 6 files changed, 444 insertions(+), 38 deletions(-) rename backends-clickhouse/{src-iceberg => src-spark33}/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java (100%) rename backends-clickhouse/{src-iceberg => src-spark33}/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java (92%) rename backends-clickhouse/{src-iceberg => src-spark33}/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala (100%) create mode 100644 backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java create mode 100644 backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 0b35dc01535f..f095d499034c 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -43,6 +43,181 @@ + + spark-3.3 + + false + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-spark33-sources + generate-sources + + add-source + + + + ${project.basedir}/src-spark33/main/scala + ${project.basedir}src-spark33/main/java + + + + + add-spark33-resources + generate-resources + + add-resource + + + + + ${project.basedir}/src-spark33/main/resources + + + + + + add-spark33-test-sources + generate-test-sources + + add-test-source + + + + ${project.basedir}/src-spark33/test/scala + ${project.basedir}/src-spark33/test/java + + + + + add-spark33-test-resources + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/src-spark33/test/resources + + + + + + + + + + + spark-3.5 + + 1.7.36 + + + false + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + test + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + com.google.guava + guava + ${guava.version} + provided + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-spark35-sources + generate-sources + + add-source + + + + ${project.basedir}/src-spark35/main/scala + ${project.basedir}src-spark35/main/java + + + + + add-spark35-resources + generate-resources + + add-resource + + + + + ${project.basedir}/src-spark35/main/resources + + + + + + add-spark35-test-sources + generate-test-sources + + add-test-source + + + + ${project.basedir}/src-spark35/test/scala + ${project.basedir}/src-spark35/test/java + + + + + add-spark35-test-resources + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/src-spark35/test/resources + + + + + + + + + iceberg @@ -74,6 +249,12 @@ test-jar test + + org.apache.iceberg + iceberg-api + ${iceberg.version} + test + org.apache.iceberg iceberg-flink-runtime-1.16 @@ -87,6 +268,12 @@ test-jar test + + org.apache.iceberg + iceberg-flink-1.16 + ${iceberg.version} + test + org.apache.iceberg iceberg-hive-metastore @@ -94,6 +281,12 @@ test-jar test + + org.apache.iceberg + iceberg-hive-metastore + ${iceberg.version} + test + org.junit.jupiter @@ -157,6 +350,12 @@ test-jar test + + org.apache.iceberg + iceberg-spark-${sparkbundle.version}_${scala.binary.version} + ${iceberg.version} + test + org.apache.iceberg iceberg-data @@ -164,6 +363,12 @@ test-jar test + + org.apache.iceberg + iceberg-data + ${iceberg.version} + test + @@ -344,44 +549,44 @@ test - org.apache.hive.hcatalog - hive-hcatalog-core - 2.3.9 - test - - - org.pentaho - pentaho-aggdesigner-algorithm - - - net.minidev - json-smart - - - org.apache.hive - hive-exec - - - guava - com.google.guava - - - hadoop-common - org.apache.hadoop - - - hadoop-hdfs - org.apache.hadoop - - - protobuf-java - com.google.protobuf - - - jdk.tools - jdk.tools - - + org.apache.hive.hcatalog + hive-hcatalog-core + 2.3.9 + test + + + org.pentaho + pentaho-aggdesigner-algorithm + + + net.minidev + json-smart + + + org.apache.hive + hive-exec + + + guava + com.google.guava + + + hadoop-common + org.apache.hadoop + + + hadoop-hdfs + org.apache.hadoop + + + protobuf-java + com.google.protobuf + + + jdk.tools + jdk.tools + + org.apache.hadoop diff --git a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java similarity index 100% rename from backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java rename to backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java diff --git a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java similarity index 92% rename from backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java rename to backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java index c13c2fea70c2..103cdd7438db 100644 --- a/backends-clickhouse/src-iceberg/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java +++ b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java @@ -28,6 +28,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.source.TestPositionDeletesTable; import org.apache.spark.api.java.JavaSparkContext; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runners.Parameterized; @@ -58,6 +59,16 @@ public static void startMetastoreAndSpark() { } } + @AfterClass + public static void stopMetastoreAndSpark() throws Exception { + catalog = null; + if (metastore != null) { + metastore.stop(); + metastore = null; + } + hiveTableSupport.clean(); + } + @Parameterized.Parameters( name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala b/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala similarity index 100% rename from backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala rename to backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala diff --git a/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java new file mode 100644 index 000000000000..3421a094fae6 --- /dev/null +++ b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.execution.iceberg; + +import java.util.Map; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.TestPositionDeletesTable; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith({ParameterizedTestExtension.class}) +public class TestPositionDeletesTableGluten extends TestPositionDeletesTable { + private static final Map CATALOG_PROPS = + ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false"); + private static ClickHouseIcebergHiveTableSupport hiveTableSupport; + + @BeforeAll + public static void startMetastoreAndSpark() { + metastore = new TestHiveMetastore(); + metastore.start(); + hiveConf = metastore.hiveConf(); + hiveTableSupport = new ClickHouseIcebergHiveTableSupport(); + hiveTableSupport.initSparkConf( + hiveConf.get("hive.metastore.uris"), SparkCatalogConfig.HIVE.catalogName(), null); + hiveTableSupport.initializeSession(); + spark = hiveTableSupport.spark(); + sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + + try { + catalog.createNamespace(Namespace.of(new String[] {"default"})); + } catch (AlreadyExistsException ignore) { + } + } + + @AfterAll + public static void stopMetastoreAndSpark() throws Exception { + catalog = null; + if (metastore != null) { + metastore.stop(); + metastore = null; + } + hiveTableSupport.clean(); + } + + public TestPositionDeletesTableGluten() {} + + @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") + public static Object[][] parameters() { + // ignore ORC and AVRO, ch backend only support PARQUET + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET + } + }; + } +} diff --git a/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala new file mode 100644 index 000000000000..c9e6a8ee5eef --- /dev/null +++ b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.execution.iceberg + +import com.google.common.base.Strings +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig + +class ClickHouseIcebergHiveTableSupport { + + private val sparkConf: SparkConf = new SparkConf() + + private var _hiveSpark: SparkSession = _ + + def spark: SparkSession = _hiveSpark + + def initSparkConf(url: String, catalog: String, path: String): SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConfig._ + + sparkConf + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "536870912") + .set("spark.sql.catalogImplementation", "hive") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.adaptive.enabled", "false") + .set("spark.sql.files.minPartitionNum", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") + .set("spark.gluten.sql.columnar.iterator", "true") + .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") + .set("spark.gluten.sql.enable.native.validation", "false") + .set("spark.gluten.sql.parquet.maxmin.index", "true") + .set("spark.hive.exec.dynamic.partition.mode", "nonstrict") + .set("spark.gluten.supported.hive.udfs", "my_add") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "2") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .setCHConfig("use_local_format", true) + .set("spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .set("spark.sql.catalog.spark_catalog.type", "hive") + .setMaster("local[*]") + if (!Strings.isNullOrEmpty(url)) { + sparkConf.set("spark.hadoop.hive.metastore.uris", url) + } + if (!Strings.isNullOrEmpty(catalog)) { + sparkConf.set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog." + catalog + ".type", "hive") + } + if (!Strings.isNullOrEmpty(path)) { + sparkConf.set("spark.sql.warehouse.dir", path) + } + sparkConf + } + + def initializeSession(): Unit = { + if (_hiveSpark == null) { + _hiveSpark = + SparkSession + .builder() + .config(sparkConf) + .enableHiveSupport() + .getOrCreate() + } + } + + def clean(): Unit = { + try { + if (_hiveSpark != null) { + _hiveSpark.stop() + _hiveSpark = null + } + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + } +} From 765948d7133b274447cf736474781728e641d313 Mon Sep 17 00:00:00 2001 From: jlf <1251489546@qq.com> Date: Tue, 22 Apr 2025 13:56:29 +0800 Subject: [PATCH 4/4] Fixed occasional UT anomalies --- .../GlutenClickHouseCacheBaseTestSuite.scala | 4 ++- .../apache/gluten/utils/CacheTestHelper.scala | 33 +++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala index bb72e34e3e19..47307cf2fa11 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala @@ -37,7 +37,9 @@ abstract class GlutenClickHouseCacheBaseTestSuite override protected val queriesResults: String = rootPath + "queries-output" // Abstract methods to be implemented by subclasses - protected def cleanupCache(): Unit = cacheHelper.deleteCache(spark, tablesPath) + protected def cleanupCache(): Unit = + cacheHelper.deleteCache(spark, s"$tablesPath/lineitem", s"$tablesPath/$SPARK_DIR_NAME") + protected def copyDataIfNeeded(): Unit // Initialize the cache helper - accessible to subclasses diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala index 6cdc16e2a194..4b694086a905 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala @@ -45,22 +45,21 @@ class CacheTestHelper(val TMP_PREFIX: String) { } /** Delete cache files for all tables in the data path */ - def deleteCache(spark: SparkSession, dataPath: String): Unit = { - val targetFile = new Path(dataPath) - val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf()) - fs.listStatus(targetFile) - .foreach( - table => { - if (table.isDirectory) { - fs.listStatus(table.getPath) - .foreach( - data => { - if (data.isFile) { - CHNativeCacheManager - .removeFiles(data.getPath.toUri.getPath.substring(1), CACHE_NAME) - } - }) - } - }) + def deleteCache(spark: SparkSession, dataPaths: String*): Unit = { + dataPaths.foreach( + dataPath => { + val targetFile = new Path(dataPath) + val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf()) + if (fs.isDirectory(targetFile)) { + fs.listStatus(targetFile) + .foreach( + data => { + if (data.isFile) { + CHNativeCacheManager + .removeFiles(data.getPath.toUri.getPath.substring(1), CACHE_NAME) + } + }) + } + }) } }