diff --git a/.github/workflows/velox_backend_enhanced.yml b/.github/workflows/velox_backend_enhanced.yml index 0399946608be..b22d260bc8d9 100644 --- a/.github/workflows/velox_backend_enhanced.yml +++ b/.github/workflows/velox_backend_enhanced.yml @@ -127,7 +127,7 @@ jobs: java -version export SPARK_HOME=/opt/shims/spark34/spark_home/ ls -l $SPARK_HOME - $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Phudi \ + $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg -Piceberg-test -Pdelta -Phudi \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest \ -DargLine="-Dspark.test.home=$SPARK_HOME" - name: Upload test report diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 4c56b2f0642f..0022b2257075 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -829,7 +829,7 @@ jobs: java -version export SPARK_HOME=/opt/shims/spark34/spark_home/ ls -l $SPARK_HOME - $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Phudi -Ppaimon -Pspark-ut \ + $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg -Piceberg-test -Pdelta -Phudi -Ppaimon -Pspark-ut \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest \ -DargLine="-Dspark.test.home=$SPARK_HOME" - name: Upload test report diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index ceef10d22df6..765dfdd7621a 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -539,8 +539,8 @@ false - 1.18.1 - 1.18 + 1.19.3 + 1.19 diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 798d87f539ba..967687cb7e37 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -408,6 +408,13 @@ + + org.apache.iceberg + iceberg-core + ${iceberg.version} + test-jar + test + org.apache.iceberg iceberg-hive-metastore @@ -448,16 +455,9 @@ 3.26.3 test - - junit - junit - 4.13.2 - test - org.junit.jupiter - junit-jupiter-api - 5.11.4 + junit-jupiter test @@ -466,6 +466,50 @@ 4.2.2 test + + org.eclipse.jetty + jetty-server + 11.0.26 + test + + + org.eclipse.jetty + jetty-servlet + 11.0.26 + test + + + org.xerial + sqlite-jdbc + 3.50.3.0 + test + + + + + iceberg-test + + false + + + 3.4.1 + + + + org.apache.iceberg + iceberg-open-api + ${iceberg.version} + test-jar + test + + + org.apache.iceberg + iceberg-open-api + ${iceberg.version} + test-fixtures + test-jar + test + diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java index e03d4aba8c78..d51d04cf194e 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java @@ -16,36 +16,12 @@ */ package org.apache.gluten.extensions; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete; import org.junit.Test; -import java.util.Map; import java.util.concurrent.ExecutionException; public class TestGlutenCopyOnWriteDelete extends TestCopyOnWriteDelete { - public TestGlutenCopyOnWriteDelete( - String catalogName, - String implementation, - Map config, - String fileFormat, - Boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Test public synchronized void testDeleteWithConcurrentTableRefresh() { System.out.println("Run timeout"); diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java index f2fe3e334118..322d74f60b01 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java @@ -16,35 +16,12 @@ */ package org.apache.gluten.extensions; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.spark.extensions.TestMergeOnReadDelete; import org.junit.Test; -import java.util.Map; import java.util.concurrent.ExecutionException; public class TestGlutenMergeOnReadDelete extends TestMergeOnReadDelete { - public TestGlutenMergeOnReadDelete( - String catalogName, - String implementation, - Map config, - String fileFormat, - Boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } @Test public synchronized void testDeleteWithConcurrentTableRefresh() { diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java index efb919f1b48c..7b29cb69296a 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java @@ -16,7 +16,6 @@ */ package org.apache.gluten.extensions; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -25,8 +24,7 @@ import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.internal.SQLConf; import org.junit.Test; - -import java.util.Map; +import org.junit.jupiter.api.TestTemplate; import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; import static org.apache.iceberg.TableProperties.MERGE_MODE; @@ -34,27 +32,6 @@ import static org.assertj.core.api.Assertions.assertThat; public class TestGlutenMergeOnReadMerge extends TestMergeOnReadMerge { - public TestGlutenMergeOnReadMerge( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } @Test public synchronized void testMergeWithConcurrentTableRefresh() { @@ -72,7 +49,7 @@ public synchronized void testMergeWithSnapshotIsolation() { } // The matched join string is changed from Join to ShuffledHashJoinExecTransformer - @Test + @TestTemplate public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() { createAndInitTable( "id INT, salary INT, dep STRING, sub_dep STRING", diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java index f2db135cec3f..09e771ff8bc0 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java @@ -16,35 +16,12 @@ */ package org.apache.gluten.extensions; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate; import org.junit.Test; -import java.util.Map; import java.util.concurrent.ExecutionException; public class TestGlutenMergeOnReadUpdate extends TestMergeOnReadUpdate { - public TestGlutenMergeOnReadUpdate( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } @Test public synchronized void testUpdateWithConcurrentTableRefresh() { diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java index 9d650c6f6c7a..054689a07b5f 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java @@ -18,12 +18,5 @@ import org.apache.iceberg.spark.extensions.TestStoragePartitionedJoinsInRowLevelOperations; -import java.util.Map; - public class TestGlutenStoragePartitionedJoinsInRowLevelOperations - extends TestStoragePartitionedJoinsInRowLevelOperations { - public TestGlutenStoragePartitionedJoinsInRowLevelOperations( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} + extends TestStoragePartitionedJoinsInRowLevelOperations {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java index 059da147255f..3bb778b2a3ea 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.extensions.TestSystemFunctionPushDownDQL; -import java.util.Map; - -public class TestGlutenSystemFunctionPushDownDQL extends TestSystemFunctionPushDownDQL { - public TestGlutenSystemFunctionPushDownDQL( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenSystemFunctionPushDownDQL extends TestSystemFunctionPushDownDQL {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java index 2eaaa6e5feb3..ce2b06913af7 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java @@ -16,12 +16,5 @@ */ package org.apache.gluten.extensions; -import java.util.Map; - public class TestGlutenSystemFunctionPushDownInRowLevelOperations - extends TestGlutenSystemFunctionPushDownDQL { - public TestGlutenSystemFunctionPushDownInRowLevelOperations( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} + extends TestGlutenSystemFunctionPushDownDQL {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestDataFrameWrites.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestDataFrameWrites.java deleted file mode 100644 index 678cec58d999..000000000000 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestDataFrameWrites.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.source; - -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.avro.AvroIterable; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.shaded.org.apache.avro.generic.GenericData.Record; -import org.apache.iceberg.spark.SparkSQLProperties; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.iceberg.spark.data.AvroDataTest; -import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkAvroReader; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkException; -import org.apache.spark.TaskContext; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapPartitionsFunction; -import org.apache.spark.sql.*; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.encoders.RowEncoder; -import org.junit.*; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.*; - -import static org.apache.iceberg.spark.SparkSchemaUtil.convert; -import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsSafe; -import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -@RunWith(Parameterized.class) -public class TestDataFrameWrites extends AvroDataTest { - private static final Configuration CONF = new Configuration(); - - private final String format; - - @Parameterized.Parameters(name = "format = {0}") - public static Object[] parameters() { - return new Object[] {"parquet", "avro", "orc"}; - } - - public TestDataFrameWrites(String format) { - this.format = format; - } - - private static SparkSession spark = null; - private static JavaSparkContext sc = null; - - private Map tableProperties; - - private final org.apache.spark.sql.types.StructType sparkSchema = - new org.apache.spark.sql.types.StructType( - new org.apache.spark.sql.types.StructField[] { - new org.apache.spark.sql.types.StructField( - "optionalField", - org.apache.spark.sql.types.DataTypes.StringType, - true, - org.apache.spark.sql.types.Metadata.empty()), - new org.apache.spark.sql.types.StructField( - "requiredField", - org.apache.spark.sql.types.DataTypes.StringType, - false, - org.apache.spark.sql.types.Metadata.empty()) - }); - - private final Schema icebergSchema = - new Schema( - Types.NestedField.optional(1, "optionalField", Types.StringType.get()), - Types.NestedField.required(2, "requiredField", Types.StringType.get())); - - private final List data0 = - Arrays.asList( - "{\"optionalField\": \"a1\", \"requiredField\": \"bid_001\"}", - "{\"optionalField\": \"a2\", \"requiredField\": \"bid_002\"}"); - private final List data1 = - Arrays.asList( - "{\"optionalField\": \"d1\", \"requiredField\": \"bid_101\"}", - "{\"optionalField\": \"d2\", \"requiredField\": \"bid_102\"}", - "{\"optionalField\": \"d3\", \"requiredField\": \"bid_103\"}", - "{\"optionalField\": \"d4\", \"requiredField\": \"bid_104\"}"); - - @BeforeClass - public static void startSpark() { - TestDataFrameWrites.spark = SparkSession.builder().master("local[2]").getOrCreate(); - TestDataFrameWrites.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - } - - @AfterClass - public static void stopSpark() { - SparkSession currentSpark = TestDataFrameWrites.spark; - TestDataFrameWrites.spark = null; - TestDataFrameWrites.sc = null; - currentSpark.stop(); - } - - @Override - protected void writeAndValidate(Schema schema) throws IOException { - File location = createTableFolder(); - Table table = createTable(schema, location); - writeAndValidateWithLocations(table, location, new File(location, "data")); - } - - @Test - public void testWriteWithCustomDataLocation() throws IOException { - File location = createTableFolder(); - File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir"); - Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location); - table - .updateProperties() - .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()) - .commit(); - writeAndValidateWithLocations(table, location, tablePropertyDataLocation); - } - - private File createTableFolder() throws IOException { - File parent = temp.newFolder("parquet"); - File location = new File(parent, "test"); - Assert.assertTrue("Mkdir should succeed", location.mkdirs()); - return location; - } - - private Table createTable(Schema schema, File location) { - HadoopTables tables = new HadoopTables(CONF); - return tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); - } - - private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) - throws IOException { - Schema tableSchema = table.schema(); // use the table schema because ids are reassigned - - table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); - - Iterable expected = RandomData.generate(tableSchema, 100, 0L); - writeData(expected, tableSchema, location.toString()); - - table.refresh(); - - List actual = readTable(location.toString()); - - Iterator expectedIter = expected.iterator(); - Iterator actualIter = actual.iterator(); - while (expectedIter.hasNext() && actualIter.hasNext()) { - assertEqualsSafe(tableSchema.asStruct(), expectedIter.next(), actualIter.next()); - } - Assert.assertEquals( - "Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext()); - - table - .currentSnapshot() - .addedDataFiles(table.io()) - .forEach( - dataFile -> - Assert.assertTrue( - String.format( - "File should have the parent directory %s, but has: %s.", - expectedDataDir.getAbsolutePath(), dataFile.path()), - URI.create(dataFile.path().toString()) - .getPath() - .startsWith(expectedDataDir.getAbsolutePath()))); - } - - private List readTable(String location) { - Dataset result = spark.read().format("iceberg").load(location); - - return result.collectAsList(); - } - - private void writeData(Iterable records, Schema schema, String location) - throws IOException { - Dataset df = createDataset(records, schema); - DataFrameWriter writer = df.write().format("iceberg").mode("append"); - writer.save(location); - } - - private void writeDataWithFailOnPartition( - Iterable records, Schema schema, String location) throws IOException, SparkException { - final int numPartitions = 10; - final int partitionToFail = new Random().nextInt(numPartitions); - MapPartitionsFunction failOnFirstPartitionFunc = - input -> { - int partitionId = TaskContext.getPartitionId(); - - if (partitionId == partitionToFail) { - throw new SparkException( - String.format("Intended exception in partition %d !", partitionId)); - } - return input; - }; - - Dataset df = - createDataset(records, schema) - .repartition(numPartitions) - .mapPartitions(failOnFirstPartitionFunc, RowEncoder.apply(convert(schema))); - // This trick is needed because Spark 3 handles decimal overflow in RowEncoder which "changes" - // nullability of the column to "true" regardless of original nullability. - // Setting "check-nullability" option to "false" doesn't help as it fails at Spark analyzer. - Dataset convertedDf = df.sqlContext().createDataFrame(df.rdd(), convert(schema)); - DataFrameWriter writer = convertedDf.write().format("iceberg").mode("append"); - writer.save(location); - } - - private Dataset createDataset(Iterable records, Schema schema) throws IOException { - // this uses the SparkAvroReader to create a DataFrame from the list of records - // it assumes that SparkAvroReader is correct - File testFile = temp.newFile(); - Assert.assertTrue("Delete should succeed", testFile.delete()); - - try (FileAppender writer = - Avro.write(Files.localOutput(testFile)).schema(schema).named("test").build()) { - for (Record rec : records) { - writer.add(rec); - } - } - - // make sure the dataframe matches the records before moving on - List rows = Lists.newArrayList(); - try (AvroIterable reader = - Avro.read(Files.localInput(testFile)) - .createReaderFunc(SparkAvroReader::new) - .project(schema) - .build()) { - - Iterator recordIter = records.iterator(); - Iterator readIter = reader.iterator(); - while (recordIter.hasNext() && readIter.hasNext()) { - InternalRow row = readIter.next(); - assertEqualsUnsafe(schema.asStruct(), recordIter.next(), row); - rows.add(row); - } - Assert.assertEquals( - "Both iterators should be exhausted", recordIter.hasNext(), readIter.hasNext()); - } - - JavaRDD rdd = sc.parallelize(rows); - return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false); - } - - @Test - public void testNullableWithWriteOption() throws IOException { - Assume.assumeTrue( - "Spark 3 rejects writing nulls to a required column", spark.version().startsWith("2")); - - File location = new File(temp.newFolder("parquet"), "test"); - String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location); - String targetPath = String.format("%s/nullable_poc/targetFolder/", location); - - tableProperties = ImmutableMap.of(TableProperties.WRITE_DATA_LOCATION, targetPath); - - // read this and append to iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1)) - .write() - .parquet(sourcePath); - - // this is our iceberg dataset to which we will append data - new HadoopTables(spark.sessionState().newHadoopConf()) - .create( - icebergSchema, - PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(), - tableProperties, - targetPath); - - // this is the initial data inside the iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0)) - .write() - .format("iceberg") - .mode(SaveMode.Append) - .save(targetPath); - - // read from parquet and append to iceberg w/ nullability check disabled - spark - .read() - .schema(SparkSchemaUtil.convert(icebergSchema)) - .parquet(sourcePath) - .write() - .format("iceberg") - .option(SparkWriteOptions.CHECK_NULLABILITY, false) - .mode(SaveMode.Append) - .save(targetPath); - - // read all data - List rows = spark.read().format("iceberg").load(targetPath).collectAsList(); - Assert.assertEquals("Should contain 6 rows", 6, rows.size()); - } - - @Test - public void testNullableWithSparkSqlOption() throws IOException { - Assume.assumeTrue( - "Spark 3 rejects writing nulls to a required column", spark.version().startsWith("2")); - - File location = new File(temp.newFolder("parquet"), "test"); - String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location); - String targetPath = String.format("%s/nullable_poc/targetFolder/", location); - - tableProperties = ImmutableMap.of(TableProperties.WRITE_DATA_LOCATION, targetPath); - - // read this and append to iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1)) - .write() - .parquet(sourcePath); - - SparkSession newSparkSession = - SparkSession.builder() - .master("local[2]") - .appName("NullableTest") - .config(SparkSQLProperties.CHECK_NULLABILITY, false) - .getOrCreate(); - - // this is our iceberg dataset to which we will append data - new HadoopTables(newSparkSession.sessionState().newHadoopConf()) - .create( - icebergSchema, - PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(), - tableProperties, - targetPath); - - // this is the initial data inside the iceberg dataset - newSparkSession - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0)) - .write() - .format("iceberg") - .mode(SaveMode.Append) - .save(targetPath); - - // read from parquet and append to iceberg - newSparkSession - .read() - .schema(SparkSchemaUtil.convert(icebergSchema)) - .parquet(sourcePath) - .write() - .format("iceberg") - .mode(SaveMode.Append) - .save(targetPath); - - // read all data - List rows = newSparkSession.read().format("iceberg").load(targetPath).collectAsList(); - Assert.assertEquals("Should contain 6 rows", 6, rows.size()); - } - - @Test - public void testFaultToleranceOnWrite() throws IOException { - File location = createTableFolder(); - Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); - Table table = createTable(schema, location); - - Iterable records = RandomData.generate(schema, 100, 0L); - writeData(records, schema, location.toString()); - - table.refresh(); - - Snapshot snapshotBeforeFailingWrite = table.currentSnapshot(); - List resultBeforeFailingWrite = readTable(location.toString()); - - Iterable records2 = RandomData.generate(schema, 100, 0L); - - assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema, location.toString())) - .isInstanceOf(SparkException.class); - - table.refresh(); - - Snapshot snapshotAfterFailingWrite = table.currentSnapshot(); - List resultAfterFailingWrite = readTable(location.toString()); - - Assert.assertEquals(snapshotAfterFailingWrite, snapshotBeforeFailingWrite); - Assert.assertEquals(resultAfterFailingWrite, resultBeforeFailingWrite); - } -} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java index b66015515d0c..889a46c86ef9 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java @@ -16,6 +16,9 @@ */ package org.apache.gluten.source; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.spark.source.TestDataFrameWriterV2; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestGlutenDataFrameWriterV2 extends TestDataFrameWriterV2 {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java index f40b98bf1868..1c11aae70229 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java @@ -16,11 +16,6 @@ */ package org.apache.gluten.source; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.spark.source.TestDataFrameWriterV2Coercion; -public class TestGlutenDataFrameWriterV2Coercion extends TestDataFrameWriterV2Coercion { - public TestGlutenDataFrameWriterV2Coercion(FileFormat format, String dataType) { - super(format, dataType); - } -} +public class TestGlutenDataFrameWriterV2Coercion extends TestDataFrameWriterV2Coercion {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java index c3e921e3244d..bbfaca15655c 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java @@ -16,7 +16,42 @@ */ package org.apache.gluten.source; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.spark.source.TestIcebergSourceHiveTables; +import org.junit.After; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Map; // Fallback all the table scan because source table is metadata table with format avro. -public class TestGlutenIcebergSourceHiveTables extends TestIcebergSourceHiveTables {} +public class TestGlutenIcebergSourceHiveTables extends TestIcebergSourceHiveTables { + + private static TableIdentifier currentIdentifier; + + // The BeforeAll does not take effect because junit 4 is used in Gluten + @BeforeClass + public static void start() { + Namespace db = Namespace.of(new String[] {"db"}); + if (!catalog.namespaceExists(db)) { + catalog.createNamespace(db); + } + } + + @After + public void dropTable() throws IOException { + if (catalog.tableExists(currentIdentifier)) { + this.dropTable(currentIdentifier); + } + } + + public Table createTable( + TableIdentifier ident, Schema schema, PartitionSpec spec, Map properties) { + currentIdentifier = ident; + return catalog.createTable(ident, schema, spec, properties); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java index 506f8a5226cd..78b6c2e38243 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java @@ -16,12 +16,6 @@ */ package org.apache.gluten.source; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.spark.source.TestIdentityPartitionData; -public class TestGlutenIdentityPartitionData extends TestIdentityPartitionData { - public TestGlutenIdentityPartitionData( - String format, boolean vectorized, PlanningMode planningMode) { - super(format, vectorized, planningMode); - } -} +public class TestGlutenIdentityPartitionData extends TestIdentityPartitionData {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java index 02d348544db9..3545b538387b 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java @@ -16,14 +16,6 @@ */ package org.apache.gluten.source; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.spark.source.TestPositionDeletesTable; -import java.util.Map; - -public class TestGlutenPositionDeletesTable extends TestPositionDeletesTable { - public TestGlutenPositionDeletesTable( - String catalogName, String implementation, Map config, FileFormat format) { - super(catalogName, implementation, config, format); - } -} +public class TestGlutenPositionDeletesTable extends TestPositionDeletesTable {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java index 90e382899194..976e482ad3fa 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java @@ -16,11 +16,6 @@ */ package org.apache.gluten.source; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.spark.source.TestRuntimeFiltering; -public class TestGlutenRuntimeFiltering extends TestRuntimeFiltering { - public TestGlutenRuntimeFiltering(PlanningMode planningMode) { - super(planningMode); - } -} +public class TestGlutenRuntimeFiltering extends TestRuntimeFiltering {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java index 8e49b5876b43..5189c86c62fd 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java @@ -16,12 +16,6 @@ */ package org.apache.gluten.source; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.spark.source.TestSparkMetadataColumns; -public class TestGlutenSparkMetadataColumns extends TestSparkMetadataColumns { - public TestGlutenSparkMetadataColumns( - FileFormat fileFormat, boolean vectorized, int formatVersion) { - super(fileFormat, vectorized, formatVersion); - } -} +public class TestGlutenSparkMetadataColumns extends TestSparkMetadataColumns {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java index 09a6583320de..00e8f300d7bd 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.source.TestSparkStagedScan; -import java.util.Map; - -public class TestGlutenSparkStagedScan extends TestSparkStagedScan { - public TestGlutenSparkStagedScan( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenSparkStagedScan extends TestSparkStagedScan {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java index 17a578bad8c1..8433e9b2f425 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java @@ -24,42 +24,34 @@ import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.spark.sql.TestAggregatePushDown; import org.apache.spark.sql.SparkSession; -import org.junit.BeforeClass; - -import java.util.Map; +import org.junit.jupiter.api.BeforeAll; public class TestGlutenAggregatePushDown extends TestAggregatePushDown { - public TestGlutenAggregatePushDown( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - @BeforeClass + @BeforeAll public static void startMetastoreAndSpark() { - SparkTestBase.metastore = new TestHiveMetastore(); + TestBase.metastore = new TestHiveMetastore(); metastore.start(); - SparkTestBase.hiveConf = metastore.hiveConf(); - - SparkTestBase.spark = + TestBase.hiveConf = metastore.hiveConf(); + TestBase.spark.close(); + TestBase.spark = SparkSession.builder() .master("local[2]") .config("spark.sql.iceberg.aggregate_pushdown", "true") .config(TestConfUtil.GLUTEN_CONF) .enableHiveSupport() .getOrCreate(); - - SparkTestBase.catalog = + TestBase.catalog = (HiveCatalog) CatalogUtil.loadCatalog( HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); try { - catalog.createNamespace(Namespace.of("default")); - } catch (AlreadyExistsException ignored) { - // the default namespace already exists. ignore the create error + catalog.createNamespace(Namespace.of(new String[] {"default"})); + } catch (AlreadyExistsException var1) { } } } diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java index f52f0ddb8ace..9c4593e451d4 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.sql.TestDeleteFrom; -import java.util.Map; - -public class TestGlutenDeleteFrom extends TestDeleteFrom { - public TestGlutenDeleteFrom( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenDeleteFrom extends TestDeleteFrom {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java index 52221d6e8501..72a70adb2e8c 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java @@ -17,5 +17,12 @@ package org.apache.gluten.sql; import org.apache.iceberg.spark.sql.TestPartitionedWritesAsSelect; +import org.junit.Test; -public class TestGlutenPartitionedWritesAsSelect extends TestPartitionedWritesAsSelect {} +public class TestGlutenPartitionedWritesAsSelect extends TestPartitionedWritesAsSelect { + @Test + public void testPartitionedWritesAsSelect() { + System.out.println( + com.fasterxml.jackson.databind.ObjectMapper.class.getProtectionDomain().getCodeSource()); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java index 6711a7fd2285..818204649b57 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.sql.TestPartitionedWritesToBranch; -import java.util.Map; - -public class TestGlutenPartitionedWritesToBranch extends TestPartitionedWritesToBranch { - public TestGlutenPartitionedWritesToBranch( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenPartitionedWritesToBranch extends TestPartitionedWritesToBranch {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java index 935ca6872eac..91c664cf346f 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.sql.TestPartitionedWritesToWapBranch; -import java.util.Map; - -public class TestGlutenPartitionedWritesToWapBranch extends TestPartitionedWritesToWapBranch { - public TestGlutenPartitionedWritesToWapBranch( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenPartitionedWritesToWapBranch extends TestPartitionedWritesToWapBranch {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java index eff29920dfa2..6a7951d4faee 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java @@ -18,10 +18,4 @@ import org.apache.iceberg.spark.sql.TestSelect; -import java.util.Map; - -public class TestGlutenSelect extends TestSelect { - public TestGlutenSelect(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenSelect extends TestSelect {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java index af83dafd1d71..e346cbc48e57 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java @@ -18,11 +18,4 @@ import org.apache.iceberg.spark.sql.TestTimestampWithoutZone; -import java.util.Map; - -public class TestGlutenTimestampWithoutZone extends TestTimestampWithoutZone { - public TestGlutenTimestampWithoutZone( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} +public class TestGlutenTimestampWithoutZone extends TestTimestampWithoutZone {} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/CatalogTestBase.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/CatalogTestBase.java new file mode 100644 index 000000000000..00b88080affa --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/CatalogTestBase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class CatalogTestBase extends TestBaseWithCatalog { + + // these parameters are broken out to avoid changes that need to modify lots of test suites + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties() + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + }, + }; + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java new file mode 100644 index 000000000000..6119d0df3b98 --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Map; + +@RunWith(Parameterized.class) +public abstract class SparkCatalogTestBase extends SparkTestBaseWithCatalog { + + // these parameters are broken out to avoid changes that need to modify lots of test suites + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties() + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + } + }; + } + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + public SparkCatalogTestBase(SparkCatalogConfig config) { + super(config); + } + + public SparkCatalogTestBase( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java new file mode 100644 index 000000000000..89b6f23687b1 --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.util.PropertyUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public abstract class SparkTestBaseWithCatalog extends SparkTestBase { + protected static File warehouse = null; + + @BeforeClass + public static void createWarehouse() throws IOException { + SparkTestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null); + Assert.assertTrue(warehouse.delete()); + } + + @AfterClass + public static void dropWarehouse() throws IOException { + if (warehouse != null && warehouse.exists()) { + Path warehousePath = new Path(warehouse.getAbsolutePath()); + FileSystem fs = warehousePath.getFileSystem(hiveConf); + Assert.assertTrue("Failed to delete " + warehousePath, fs.delete(warehousePath, true)); + } + } + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + protected final String catalogName; + protected final Map catalogConfig; + protected final Catalog validationCatalog; + protected final SupportsNamespaces validationNamespaceCatalog; + protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table"); + protected final String tableName; + + public SparkTestBaseWithCatalog() { + this(SparkCatalogConfig.HADOOP); + } + + public SparkTestBaseWithCatalog(SparkCatalogConfig config) { + this(config.catalogName(), config.implementation(), config.properties()); + } + + public SparkTestBaseWithCatalog( + String catalogName, String implementation, Map config) { + this.catalogName = catalogName; + this.catalogConfig = config; + this.validationCatalog = + catalogName.equals("testhadoop") + ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) + : catalog; + this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; + + spark.conf().set("spark.sql.catalog." + catalogName, implementation); + config.forEach( + (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + + if ("hadoop".equalsIgnoreCase(config.get("type"))) { + spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); + } + + this.tableName = + (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table"; + + sql("CREATE NAMESPACE IF NOT EXISTS default"); + } + + protected String tableName(String name) { + return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default." + name; + } + + protected String commitTarget() { + return tableName; + } + + protected String selectTarget() { + return tableName; + } + + protected boolean cachingCatalogEnabled() { + return PropertyUtil.propertyAsBoolean( + catalogConfig, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT); + } + + protected void configurePlanningMode(PlanningMode planningMode) { + configurePlanningMode(tableName, planningMode); + } + + protected void configurePlanningMode(String table, PlanningMode planningMode) { + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')", + table, + TableProperties.DATA_PLANNING_MODE, + planningMode.modeName(), + TableProperties.DELETE_PLANNING_MODE, + planningMode.modeName()); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBase.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBase.java new file mode 100644 index 000000000000..b0b40a8f1b1d --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBase.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.gluten.TestConfUtil; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.execution.QueryExecution; +import org.apache.spark.sql.execution.SparkPlan; +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.QueryExecutionListener; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.assertj.core.api.Assertions.assertThat; + +// Must add the gluten config when create spark session because add plugin config +public abstract class TestBase extends SparkTestHelperBase { + + protected static TestHiveMetastore metastore = null; + protected static HiveConf hiveConf = null; + protected static SparkSession spark = null; + protected static JavaSparkContext sparkContext = null; + protected static HiveCatalog catalog = null; + + @BeforeAll + public static void startMetastoreAndSpark() { + TestBase.metastore = new TestHiveMetastore(); + metastore.start(); + TestBase.hiveConf = metastore.hiveConf(); + + TestBase.spark = + SparkSession.builder() + .master("local[2]") + .config(TestConfUtil.GLUTEN_CONF) + .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress()) + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .enableHiveSupport() + .getOrCreate(); + + TestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + TestBase.catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + + try { + catalog.createNamespace(Namespace.of("default")); + } catch (AlreadyExistsException ignored) { + // the default namespace already exists. ignore the create error + } + } + + @AfterAll + public static void stopMetastoreAndSpark() throws Exception { + TestBase.catalog = null; + if (metastore != null) { + metastore.stop(); + TestBase.metastore = null; + } + if (spark != null) { + spark.stop(); + TestBase.spark = null; + TestBase.sparkContext = null; + } + } + + protected long waitUntilAfter(long timestampMillis) { + long current = System.currentTimeMillis(); + while (current <= timestampMillis) { + current = System.currentTimeMillis(); + } + return current; + } + + protected List sql(String query, Object... args) { + List rows = spark.sql(String.format(query, args)).collectAsList(); + if (rows.isEmpty()) { + return ImmutableList.of(); + } + + return rowsToJava(rows); + } + + protected Object scalarSql(String query, Object... args) { + List rows = sql(query, args); + assertThat(rows).as("Scalar SQL should return one row").hasSize(1); + Object[] row = Iterables.getOnlyElement(rows); + assertThat(row).as("Scalar SQL should return one value").hasSize(1); + return row[0]; + } + + protected Object[] row(Object... values) { + return values; + } + + protected static String dbPath(String dbName) { + return metastore.getDatabasePath(dbName); + } + + protected void withUnavailableFiles(Iterable> files, Action action) { + Iterable fileLocations = Iterables.transform(files, ContentFile::location); + withUnavailableLocations(fileLocations, action); + } + + private void move(String location, String newLocation) { + Path path = Paths.get(URI.create(location)); + Path tempPath = Paths.get(URI.create(newLocation)); + + try { + Files.move(path, tempPath); + } catch (IOException e) { + throw new UncheckedIOException("Failed to move: " + location, e); + } + } + + protected void withUnavailableLocations(Iterable locations, Action action) { + for (String location : locations) { + move(location, location + "_temp"); + } + + try { + action.invoke(); + } finally { + for (String location : locations) { + move(location + "_temp", location); + } + } + } + + protected void withDefaultTimeZone(String zoneId, Action action) { + TimeZone currentZone = TimeZone.getDefault(); + try { + TimeZone.setDefault(TimeZone.getTimeZone(zoneId)); + action.invoke(); + } finally { + TimeZone.setDefault(currentZone); + } + } + + protected void withSQLConf(Map conf, Action action) { + SQLConf sqlConf = SQLConf.get(); + + Map currentConfValues = Maps.newHashMap(); + conf.keySet() + .forEach( + confKey -> { + if (sqlConf.contains(confKey)) { + String currentConfValue = sqlConf.getConfString(confKey); + currentConfValues.put(confKey, currentConfValue); + } + }); + + conf.forEach( + (confKey, confValue) -> { + if (SQLConf.isStaticConfigKey(confKey)) { + throw new RuntimeException("Cannot modify the value of a static config: " + confKey); + } + sqlConf.setConfString(confKey, confValue); + }); + + try { + action.invoke(); + } finally { + conf.forEach( + (confKey, confValue) -> { + if (currentConfValues.containsKey(confKey)) { + sqlConf.setConfString(confKey, currentConfValues.get(confKey)); + } else { + sqlConf.unsetConf(confKey); + } + }); + } + } + + protected Dataset jsonToDF(String schema, String... records) { + Dataset jsonDF = spark.createDataset(ImmutableList.copyOf(records), Encoders.STRING()); + return spark.read().schema(schema).json(jsonDF); + } + + protected void append(String table, String... jsonRecords) { + try { + String schema = spark.table(table).schema().toDDL(); + Dataset df = jsonToDF(schema, jsonRecords); + df.coalesce(1).writeTo(table).append(); + } catch (NoSuchTableException e) { + throw new RuntimeException("Failed to write data", e); + } + } + + protected String tablePropsAsString(Map tableProps) { + StringBuilder stringBuilder = new StringBuilder(); + + for (Map.Entry property : tableProps.entrySet()) { + if (stringBuilder.length() > 0) { + stringBuilder.append(", "); + } + stringBuilder.append(String.format("'%s' '%s'", property.getKey(), property.getValue())); + } + + return stringBuilder.toString(); + } + + protected SparkPlan executeAndKeepPlan(String query, Object... args) { + return executeAndKeepPlan(() -> sql(query, args)); + } + + protected SparkPlan executeAndKeepPlan(Action action) { + AtomicReference executedPlanRef = new AtomicReference<>(); + + QueryExecutionListener listener = + new QueryExecutionListener() { + @Override + public void onSuccess(String funcName, QueryExecution qe, long durationNs) { + executedPlanRef.set(qe.executedPlan()); + } + + @Override + public void onFailure(String funcName, QueryExecution qe, Exception exception) {} + }; + + spark.listenerManager().register(listener); + + action.invoke(); + + try { + spark.sparkContext().listenerBus().waitUntilEmpty(); + } catch (TimeoutException e) { + throw new RuntimeException("Timeout while waiting for processing events", e); + } + + SparkPlan executedPlan = executedPlanRef.get(); + if (executedPlan instanceof AdaptiveSparkPlanExec) { + return ((AdaptiveSparkPlanExec) executedPlan).executedPlan(); + } else { + return executedPlan; + } + } + + @FunctionalInterface + protected interface Action { + void invoke(); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java new file mode 100644 index 000000000000..4a7f1c597deb --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.*; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.util.PropertyUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL; +import static org.apache.iceberg.CatalogUtil.*; +import static org.assertj.core.api.Assertions.assertThat; + +// And remove rest catalog, we don;t need to test the catalog. +@ExtendWith(ParameterizedTestExtension.class) +public abstract class TestBaseWithCatalog extends TestBase { + protected static File warehouse = null; + + // @RegisterExtension + // private static final RESTServerExtension REST_SERVER_EXTENSION = + // new RESTServerExtension( + // Map.of( + // RESTCatalogServer.REST_PORT, + // RESTServerExtension.FREE_PORT, + // // In-memory sqlite database by default is private to the connection that created + // it. + // // If more than 1 jdbc connection backed by in-memory sqlite is created behind one + // // JdbcCatalog, then different jdbc connections could provide different views of + // table + // // status even belonging to the same catalog. Reference: + // // https://www.sqlite.org/inmemorydb.html + // CatalogProperties.CLIENT_POOL_SIZE, + // "1")); + + protected static RESTCatalog restCatalog; + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties() + }, + }; + } + + @BeforeAll + public static void setUpAll() throws IOException { + TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null); + assertThat(warehouse.delete()).isTrue(); + // restCatalog = REST_SERVER_EXTENSION.client(); + } + + @AfterAll + public static void tearDownAll() throws IOException { + if (warehouse != null && warehouse.exists()) { + Path warehousePath = new Path(warehouse.getAbsolutePath()); + FileSystem fs = warehousePath.getFileSystem(hiveConf); + assertThat(fs.delete(warehousePath, true)).as("Failed to delete " + warehousePath).isTrue(); + } + } + + @TempDir protected java.nio.file.Path temp; + + @Parameter(index = 0) + protected String catalogName; + + @Parameter(index = 1) + protected String implementation; + + @Parameter(index = 2) + protected Map catalogConfig; + + protected Catalog validationCatalog; + protected SupportsNamespaces validationNamespaceCatalog; + protected TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table"); + protected String tableName; + + private void configureValidationCatalog() { + if (catalogConfig.containsKey(ICEBERG_CATALOG_TYPE)) { + String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE); + switch (catalogType) { + case ICEBERG_CATALOG_TYPE_HADOOP: + this.validationCatalog = + new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse); + break; + case ICEBERG_CATALOG_TYPE_REST: + // remove rest + // this.validationCatalog = restCatalog; + // break; + case ICEBERG_CATALOG_TYPE_HIVE: + this.validationCatalog = catalog; + break; + default: + throw new IllegalArgumentException("Unknown catalog type: " + catalogType); + } + } else if (catalogConfig.containsKey(CATALOG_IMPL)) { + switch (catalogConfig.get(CATALOG_IMPL)) { + case "org.apache.iceberg.inmemory.InMemoryCatalog": + this.validationCatalog = new InMemoryCatalog(); + break; + default: + throw new IllegalArgumentException("Unknown catalog impl"); + } + } + this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; + } + + @BeforeEach + public void before() { + configureValidationCatalog(); + + spark.conf().set("spark.sql.catalog." + catalogName, implementation); + catalogConfig.forEach( + (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + + if ("hadoop".equalsIgnoreCase(catalogConfig.get("type"))) { + spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); + } + + this.tableName = + (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table"; + + sql("CREATE NAMESPACE IF NOT EXISTS default"); + } + + protected String tableName(String name) { + return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default." + name; + } + + protected String commitTarget() { + return tableName; + } + + protected String selectTarget() { + return tableName; + } + + protected boolean cachingCatalogEnabled() { + return PropertyUtil.propertyAsBoolean( + catalogConfig, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT); + } + + protected void configurePlanningMode(PlanningMode planningMode) { + configurePlanningMode(tableName, planningMode); + } + + protected void configurePlanningMode(String table, PlanningMode planningMode) { + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')", + table, + TableProperties.DATA_PLANNING_MODE, + planningMode.modeName(), + TableProperties.DELETE_PLANNING_MODE, + planningMode.modeName()); + } +} diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/AvroDataTest.java new file mode 100644 index 000000000000..f5194f7458b7 --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark.data; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public abstract class AvroDataTest { + + protected abstract void writeAndValidate(Schema schema) throws IOException; + + protected static final StructType SUPPORTED_PRIMITIVES = + StructType.of( + required(100, "id", LongType.get()), + optional(101, "data", Types.StringType.get()), + required(102, "b", Types.BooleanType.get()), + optional(103, "i", Types.IntegerType.get()), + required(104, "l", LongType.get()), + optional(105, "f", Types.FloatType.get()), + required(106, "d", Types.DoubleType.get()), + optional(107, "date", Types.DateType.get()), + required(108, "ts", Types.TimestampType.withZone()), + required(110, "s", Types.StringType.get()), + required(111, "uuid", Types.UUIDType.get()), + required(112, "fixed", Types.FixedType.ofLength(7)), + optional(113, "bytes", Types.BinaryType.get()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded + required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding + required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision + ); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testSimpleStruct() throws IOException { + writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields()))); + } + + @Test + public void testStructWithRequiredFields() throws IOException { + writeAndValidate( + TypeUtil.assignIncreasingFreshIds( + new Schema( + Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired)))); + } + + @Test + public void testStructWithOptionalFields() throws IOException { + writeAndValidate( + TypeUtil.assignIncreasingFreshIds( + new Schema( + Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asOptional)))); + } + + @Test + public void testNestedStruct() throws IOException { + writeAndValidate( + TypeUtil.assignIncreasingFreshIds(new Schema(required(1, "struct", SUPPORTED_PRIMITIVES)))); + } + + @Test + public void testArray() throws IOException { + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional(1, "data", ListType.ofOptional(2, Types.StringType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testArrayOfStructs() throws IOException { + Schema schema = + TypeUtil.assignIncreasingFreshIds( + new Schema( + required(0, "id", LongType.get()), + optional(1, "data", ListType.ofOptional(2, SUPPORTED_PRIMITIVES)))); + + writeAndValidate(schema); + } + + @Test + public void testMap() throws IOException { + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional( + 1, + "data", + MapType.ofOptional(2, 3, Types.StringType.get(), Types.StringType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testNumericMapKey() throws IOException { + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional(1, "data", MapType.ofOptional(2, 3, LongType.get(), Types.StringType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testComplexMapKey() throws IOException { + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional( + 1, + "data", + MapType.ofOptional( + 2, + 3, + StructType.of( + required(4, "i", Types.IntegerType.get()), + optional(5, "s", Types.StringType.get())), + Types.StringType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testMapOfStructs() throws IOException { + Schema schema = + TypeUtil.assignIncreasingFreshIds( + new Schema( + required(0, "id", LongType.get()), + optional( + 1, + "data", + MapType.ofOptional(2, 3, Types.StringType.get(), SUPPORTED_PRIMITIVES)))); + + writeAndValidate(schema); + } + + @Test + public void testMixedTypes() throws IOException { + StructType structType = + StructType.of( + required(0, "id", LongType.get()), + optional( + 1, + "list_of_maps", + ListType.ofOptional( + 2, MapType.ofOptional(3, 4, Types.StringType.get(), SUPPORTED_PRIMITIVES))), + optional( + 5, + "map_of_lists", + MapType.ofOptional( + 6, 7, Types.StringType.get(), ListType.ofOptional(8, SUPPORTED_PRIMITIVES))), + required( + 9, + "list_of_lists", + ListType.ofOptional(10, ListType.ofOptional(11, SUPPORTED_PRIMITIVES))), + required( + 12, + "map_of_maps", + MapType.ofOptional( + 13, + 14, + Types.StringType.get(), + MapType.ofOptional(15, 16, Types.StringType.get(), SUPPORTED_PRIMITIVES))), + required( + 17, + "list_of_struct_of_nested_types", + ListType.ofOptional( + 19, + StructType.of( + Types.NestedField.required( + 20, + "m1", + MapType.ofOptional( + 21, 22, Types.StringType.get(), SUPPORTED_PRIMITIVES)), + Types.NestedField.optional( + 23, "l1", ListType.ofRequired(24, SUPPORTED_PRIMITIVES)), + Types.NestedField.required( + 25, "l2", ListType.ofRequired(26, SUPPORTED_PRIMITIVES)), + Types.NestedField.optional( + 27, + "m2", + MapType.ofOptional( + 28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES)))))); + + Schema schema = + new Schema( + TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet) + .asStructType() + .fields()); + + writeAndValidate(schema); + } + + @Test + public void testTimestampWithoutZone() throws IOException { + Schema schema = + TypeUtil.assignIncreasingFreshIds( + new Schema( + required(0, "id", LongType.get()), + optional(1, "ts_without_zone", Types.TimestampType.withoutZone()))); + + writeAndValidate(schema); + } + + protected void withSQLConf(Map conf, Action action) throws IOException { + SQLConf sqlConf = SQLConf.get(); + + Map currentConfValues = Maps.newHashMap(); + conf.keySet() + .forEach( + confKey -> { + if (sqlConf.contains(confKey)) { + String currentConfValue = sqlConf.getConfString(confKey); + currentConfValues.put(confKey, currentConfValue); + } + }); + + conf.forEach( + (confKey, confValue) -> { + if (SQLConf.isStaticConfigKey(confKey)) { + throw new RuntimeException("Cannot modify the value of a static config: " + confKey); + } + sqlConf.setConfString(confKey, confValue); + }); + + try { + action.invoke(); + } finally { + conf.forEach( + (confKey, confValue) -> { + if (currentConfValues.containsKey(confKey)) { + sqlConf.setConfString(confKey, currentConfValues.get(confKey)); + } else { + sqlConf.unsetConf(confKey); + } + }); + } + } + + @FunctionalInterface + protected interface Action { + void invoke() throws IOException; + } +} diff --git a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/data/RandomData.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/RandomData.java similarity index 100% rename from backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/data/RandomData.java rename to backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/RandomData.java diff --git a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/TestHelpers.java similarity index 73% rename from backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/data/TestHelpers.java rename to backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/TestHelpers.java index dcc7d91d7d19..dc1fd927e21f 100644 --- a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -16,16 +16,26 @@ */ package org.apache.iceberg.spark.data; -import org.apache.iceberg.*; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.shaded.org.apache.avro.Schema.Field; import org.apache.iceberg.shaded.org.apache.avro.generic.GenericData; import org.apache.iceberg.shaded.org.apache.avro.generic.GenericData.Record; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DeleteFileSet; import org.apache.orc.storage.serde2.io.DateWritable; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; @@ -36,17 +46,34 @@ import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.MapData; -import org.apache.spark.sql.types.*; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampNTZType; +import org.apache.spark.sql.types.TimestampType$; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; -import org.junit.Assert; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Timestamp; -import java.time.*; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; -import java.util.*; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -70,11 +97,20 @@ public static void assertEqualsSafe(Types.StructType struct, List recs, public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row) { List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - - Object expectedValue = rec.get(i); - Object actualValue = row.get(i); + for (int readPos = 0; readPos < fields.size(); readPos += 1) { + Types.NestedField field = fields.get(readPos); + Field writeField = rec.getSchema().getField(field.name()); + + Type fieldType = field.type(); + Object actualValue = row.get(readPos); + + Object expectedValue; + if (writeField != null) { + int writePos = writeField.pos(); + expectedValue = rec.get(writePos); + } else { + expectedValue = field.initialDefault(); + } assertEqualsSafe(fieldType, expectedValue, actualValue); } @@ -83,13 +119,25 @@ public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row public static void assertEqualsBatch( Types.StructType struct, Iterator expected, ColumnarBatch batch) { for (int rowId = 0; rowId < batch.numRows(); rowId++) { - List fields = struct.fields(); InternalRow row = batch.getRow(rowId); Record rec = expected.next(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - Object expectedValue = rec.get(i); - Object actualValue = row.isNullAt(i) ? null : row.get(i, convert(fieldType)); + + List fields = struct.fields(); + for (int readPos = 0; readPos < fields.size(); readPos += 1) { + Types.NestedField field = fields.get(readPos); + Field writeField = rec.getSchema().getField(field.name()); + + Type fieldType = field.type(); + Object actualValue = row.isNullAt(readPos) ? null : row.get(readPos, convert(fieldType)); + + Object expectedValue; + if (writeField != null) { + int writePos = writeField.pos(); + expectedValue = rec.get(writePos); + } else { + expectedValue = field.initialDefault(); + } + assertEqualsUnsafe(fieldType, expectedValue, actualValue); } } @@ -121,7 +169,7 @@ private static void assertEqualsSafe(Types.MapType map, Map expected, Map< } } - Assert.assertNotNull("Should have a matching key", matchingKey); + assertThat(matchingKey).as("Should have a matching key").isNotNull(); assertEqualsSafe(valueType, expected.get(expectedKey), actual.get(matchingKey)); } } @@ -141,14 +189,16 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) case LONG: case FLOAT: case DOUBLE: - Assert.assertEquals("Primitive value should be equal to expected", expected, actual); + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; case DATE: assertThat(expected).as("Should be an int").isInstanceOf(Integer.class); assertThat(actual).as("Should be a Date").isInstanceOf(Date.class); - int daysFromEpoch = (Integer) expected; - LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch); - Assert.assertEquals("ISO-8601 date should be equal", date.toString(), actual.toString()); + LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, (Integer) expected); + assertThat(actual) + .as("ISO-8601 date should be equal") + .asString() + .isEqualTo(String.valueOf(date)); break; case TIMESTAMP: Types.TimestampType timestampType = (Types.TimestampType) type; @@ -160,7 +210,7 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) Timestamp ts = (Timestamp) actual; // milliseconds from nanos has already been added by getTime long tsMicros = (ts.getTime() * 1000) + ((ts.getNanos() / 1000) % 1000); - Assert.assertEquals("Timestamp micros should be equal", expected, tsMicros); + assertThat(tsMicros).as("Timestamp micros should be equal").isEqualTo(expected); } else { assertThat(actual).as("Should be a LocalDateTime").isInstanceOf(LocalDateTime.class); @@ -168,34 +218,42 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) Instant instant = ts.toInstant(ZoneOffset.UTC); // milliseconds from nanos has already been added by getTime long tsMicros = (instant.toEpochMilli() * 1000) + ((ts.getNano() / 1000) % 1000); - Assert.assertEquals("Timestamp micros should be equal", expected, tsMicros); + assertThat(tsMicros).as("Timestamp micros should be equal").isEqualTo(expected); } break; case STRING: - assertThat(actual).as("Should be a String").isInstanceOf(String.class); - Assert.assertEquals("Strings should be equal", String.valueOf(expected), actual); + assertThat(actual).isInstanceOf(String.class).isEqualTo(String.valueOf(expected)); break; case UUID: assertThat(expected).as("Should expect a UUID").isInstanceOf(UUID.class); - assertThat(actual).as("Should be a String").isInstanceOf(String.class); - Assert.assertEquals("UUID string representation should match", expected.toString(), actual); + assertThat(actual) + .isInstanceOf(String.class) + .asString() + .isEqualTo(String.valueOf(expected)); break; case FIXED: - assertThat(expected).as("Should expect a Fixed").isInstanceOf(GenericData.Fixed.class); - assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - Assert.assertArrayEquals( - "Bytes should match", ((GenericData.Fixed) expected).bytes(), (byte[]) actual); + // generated data is written using Avro or Parquet/Avro so generated rows use + // GenericData.Fixed, but default values are converted from Iceberg's internal + // representation so the expected value may be either class. + byte[] expectedBytes; + if (expected instanceof ByteBuffer) { + expectedBytes = ByteBuffers.toByteArray((ByteBuffer) expected); + } else if (expected instanceof GenericData.Fixed) { + expectedBytes = ((GenericData.Fixed) expected).bytes(); + } else { + throw new IllegalStateException( + "Invalid expected value, not byte[] or Fixed: " + expected); + } + + assertThat(actual).isInstanceOf(byte[].class).isEqualTo(expectedBytes); break; case BINARY: assertThat(expected).as("Should expect a ByteBuffer").isInstanceOf(ByteBuffer.class); - assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - Assert.assertArrayEquals( - "Bytes should match", ((ByteBuffer) expected).array(), (byte[]) actual); + assertThat(actual).isInstanceOf(byte[].class).isEqualTo(((ByteBuffer) expected).array()); break; case DECIMAL: assertThat(expected).as("Should expect a BigDecimal").isInstanceOf(BigDecimal.class); - assertThat(actual).as("Should be a BigDecimal").isInstanceOf(BigDecimal.class); - Assert.assertEquals("BigDecimals should be equal", expected, actual); + assertThat(actual).isInstanceOf(BigDecimal.class).isEqualTo(expected); break; case STRUCT: assertThat(expected).as("Should expect a Record").isInstanceOf(Record.class); @@ -206,7 +264,7 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) assertThat(expected).as("Should expect a Collection").isInstanceOf(Collection.class); assertThat(actual).as("Should be a Seq").isInstanceOf(Seq.class); List asList = seqAsJavaListConverter((Seq) actual).asJava(); - assertEqualsSafe(type.asNestedType().asListType(), (Collection) expected, asList); + assertEqualsSafe(type.asNestedType().asListType(), (Collection) expected, asList); break; case MAP: assertThat(expected).as("Should expect a Collection").isInstanceOf(Map.class); @@ -223,11 +281,20 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) public static void assertEqualsUnsafe(Types.StructType struct, Record rec, InternalRow row) { List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - - Object expectedValue = rec.get(i); - Object actualValue = row.isNullAt(i) ? null : row.get(i, convert(fieldType)); + for (int readPos = 0; readPos < fields.size(); readPos += 1) { + Types.NestedField field = fields.get(readPos); + Field writeField = rec.getSchema().getField(field.name()); + + Type fieldType = field.type(); + Object actualValue = row.isNullAt(readPos) ? null : row.get(readPos, convert(fieldType)); + + Object expectedValue; + if (writeField != null) { + int writePos = writeField.pos(); + expectedValue = rec.get(writePos); + } else { + expectedValue = field.initialDefault(); + } assertEqualsUnsafe(fieldType, expectedValue, actualValue); } @@ -272,20 +339,19 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual case LONG: assertThat(actual).as("Should be a long").isInstanceOf(Long.class); if (expected instanceof Integer) { - Assert.assertEquals("Values didn't match", ((Number) expected).longValue(), actual); + assertThat(actual).as("Values didn't match").isEqualTo(((Number) expected).longValue()); } else { - Assert.assertEquals("Primitive value should be equal to expected", expected, actual); + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); } break; case DOUBLE: assertThat(actual).as("Should be a double").isInstanceOf(Double.class); if (expected instanceof Float) { - Assert.assertEquals( - "Values didn't match", - Double.doubleToLongBits(((Number) expected).doubleValue()), - Double.doubleToLongBits((double) actual)); + assertThat(Double.doubleToLongBits((double) actual)) + .as("Values didn't match") + .isEqualTo(Double.doubleToLongBits(((Number) expected).doubleValue())); } else { - Assert.assertEquals("Primitive value should be equal to expected", expected, actual); + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); } break; case INTEGER: @@ -293,35 +359,45 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual case BOOLEAN: case DATE: case TIMESTAMP: - Assert.assertEquals("Primitive value should be equal to expected", expected, actual); + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; case STRING: - assertThat(actual).as("Should be a UTF8String").isInstanceOf(UTF8String.class); - Assert.assertEquals("Strings should be equal", expected, actual.toString()); + assertThat(actual).isInstanceOf(UTF8String.class).asString().isEqualTo(expected); break; case UUID: assertThat(expected).as("Should expect a UUID").isInstanceOf(UUID.class); - assertThat(actual).as("Should be a UTF8String").isInstanceOf(UTF8String.class); - Assert.assertEquals( - "UUID string representation should match", expected.toString(), actual.toString()); + assertThat(actual) + .isInstanceOf(UTF8String.class) + .asString() + .isEqualTo(String.valueOf(expected)); break; case FIXED: - assertThat(expected).as("Should expect a Fixed").isInstanceOf(GenericData.Fixed.class); + // generated data is written using Avro or Parquet/Avro so generated rows use + // GenericData.Fixed, but default values are converted from Iceberg's internal + // representation so the expected value may be either class. + byte[] expectedBytes; + if (expected instanceof ByteBuffer) { + expectedBytes = ByteBuffers.toByteArray((ByteBuffer) expected); + } else if (expected instanceof GenericData.Fixed) { + expectedBytes = ((GenericData.Fixed) expected).bytes(); + } else { + throw new IllegalStateException( + "Invalid expected value, not byte[] or Fixed: " + expected); + } + assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - Assert.assertArrayEquals( - "Bytes should match", ((GenericData.Fixed) expected).bytes(), (byte[]) actual); + assertThat(actual).as("Bytes should match").isEqualTo(expectedBytes); break; case BINARY: assertThat(expected).as("Should expect a ByteBuffer").isInstanceOf(ByteBuffer.class); - assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - Assert.assertArrayEquals( - "Bytes should match", ((ByteBuffer) expected).array(), (byte[]) actual); + assertThat(actual).isInstanceOf(byte[].class).isEqualTo(((ByteBuffer) expected).array()); break; case DECIMAL: assertThat(expected).as("Should expect a BigDecimal").isInstanceOf(BigDecimal.class); assertThat(actual).as("Should be a Decimal").isInstanceOf(Decimal.class); - Assert.assertEquals( - "BigDecimals should be equal", expected, ((Decimal) actual).toJavaBigDecimal()); + assertThat(((Decimal) actual).toJavaBigDecimal()) + .as("BigDecimals should be equal") + .isEqualTo(expected); break; case STRUCT: assertThat(expected).as("Should expect a Record").isInstanceOf(Record.class); @@ -333,12 +409,12 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual assertThat(expected).as("Should expect a Collection").isInstanceOf(Collection.class); assertThat(actual).as("Should be an ArrayData").isInstanceOf(ArrayData.class); assertEqualsUnsafe( - type.asNestedType().asListType(), (Collection) expected, (ArrayData) actual); + type.asNestedType().asListType(), (Collection) expected, (ArrayData) actual); break; case MAP: assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class); assertThat(actual).as("Should be an ArrayBasedMapData").isInstanceOf(MapData.class); - assertEqualsUnsafe(type.asNestedType().asMapType(), (Map) expected, (MapData) actual); + assertEqualsUnsafe(type.asNestedType().asMapType(), (Map) expected, (MapData) actual); break; case TIME: default: @@ -357,7 +433,7 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual public static void assertEquals( String prefix, Types.StructType type, InternalRow expected, Row actual) { if (expected == null || actual == null) { - Assert.assertEquals(prefix, expected, actual); + assertThat(actual).as(prefix).isEqualTo(expected); } else { List fields = type.fields(); for (int c = 0; c < fields.size(); ++c) { @@ -373,10 +449,9 @@ public static void assertEquals( case DECIMAL: case DATE: case TIMESTAMP: - Assert.assertEquals( - prefix + "." + fieldName + " - " + childType, - getValue(expected, c, childType), - getPrimitiveValue(actual, c, childType)); + assertThat(getPrimitiveValue(actual, c, childType)) + .as(prefix + "." + fieldName + " - " + childType) + .isEqualTo(getValue(expected, c, childType)); break; case UUID: case FIXED: @@ -420,9 +495,9 @@ public static void assertEquals( private static void assertEqualsLists( String prefix, Types.ListType type, ArrayData expected, List actual) { if (expected == null || actual == null) { - Assert.assertEquals(prefix, expected, actual); + assertThat(actual).as(prefix).isEqualTo(expected); } else { - Assert.assertEquals(prefix + " length", expected.numElements(), actual.size()); + assertThat(actual).as(prefix + "length").hasSize(expected.numElements()); Type childType = type.elementType(); for (int e = 0; e < expected.numElements(); ++e) { switch (childType.typeId()) { @@ -435,10 +510,10 @@ private static void assertEqualsLists( case DECIMAL: case DATE: case TIMESTAMP: - Assert.assertEquals( - prefix + ".elem " + e + " - " + childType, - getValue(expected, e, childType), - actual.get(e)); + assertThat(actual) + .as(prefix + ".elem " + e + " - " + childType) + .element(e) + .isEqualTo(getValue(expected, e, childType)); break; case UUID: case FIXED: @@ -482,21 +557,20 @@ private static void assertEqualsLists( private static void assertEqualsMaps( String prefix, Types.MapType type, MapData expected, Map actual) { if (expected == null || actual == null) { - Assert.assertEquals(prefix, expected, actual); + assertThat(actual).as(prefix).isEqualTo(expected); } else { Type keyType = type.keyType(); Type valueType = type.valueType(); ArrayData expectedKeyArray = expected.keyArray(); ArrayData expectedValueArray = expected.valueArray(); - Assert.assertEquals(prefix + " length", expected.numElements(), actual.size()); + assertThat(actual).as(prefix + " length").hasSize(expectedKeyArray.numElements()); for (int e = 0; e < expected.numElements(); ++e) { Object expectedKey = getValue(expectedKeyArray, e, keyType); Object actualValue = actual.get(expectedKey); if (actualValue == null) { - Assert.assertEquals( - prefix + ".key=" + expectedKey + " has null", - true, - expected.valueArray().isNullAt(e)); + assertThat(expected.valueArray().isNullAt(e)) + .as(prefix + ".key=" + expectedKey + " has null") + .isTrue(); } else { switch (valueType.typeId()) { case BOOLEAN: @@ -508,10 +582,9 @@ private static void assertEqualsMaps( case DECIMAL: case DATE: case TIMESTAMP: - Assert.assertEquals( - prefix + ".key=" + expectedKey + " - " + valueType, - getValue(expectedValueArray, e, valueType), - actual.get(expectedKey)); + assertThat(actual.get(expectedKey)) + .as(prefix + ".key=" + expectedKey + " - " + valueType) + .isEqualTo(getValue(expectedValueArray, e, valueType)); break; case UUID: case FIXED: @@ -641,11 +714,7 @@ private static List toList(Seq val) { } private static void assertEqualBytes(String context, byte[] expected, byte[] actual) { - if (expected == null || actual == null) { - Assert.assertEquals(context, expected, actual); - } else { - Assert.assertArrayEquals(context, expected, actual); - } + assertThat(actual).as(context).isEqualTo(expected); } static void assertEquals(Schema schema, Object expected, Object actual) { @@ -685,16 +754,24 @@ private static void assertEquals(String context, DataType type, Object expected, } else if (type instanceof BinaryType) { assertEqualBytes(context, (byte[]) expected, (byte[]) actual); } else { - Assert.assertEquals("Value should match expected: " + context, expected, actual); + assertThat(actual).as("Value should match expected: " + context).isEqualTo(expected); } } private static void assertEquals( String context, StructType struct, InternalRow expected, InternalRow actual) { - Assert.assertEquals("Should have correct number of fields", struct.size(), actual.numFields()); + assertThat(actual.numFields()) + .as("Should have correct number of fields") + .isEqualTo(struct.size()); for (int i = 0; i < actual.numFields(); i += 1) { StructField field = struct.fields()[i]; DataType type = field.dataType(); + // ColumnarRow.get doesn't support TimestampNTZType, causing tests to fail. the representation + // is identical to TimestampType so this uses that type to validate. + if (type instanceof TimestampNTZType) { + type = TimestampType$.MODULE$; + } + assertEquals( context + "." + field.name(), type, @@ -705,8 +782,9 @@ private static void assertEquals( private static void assertEquals( String context, ArrayType array, ArrayData expected, ArrayData actual) { - Assert.assertEquals( - "Should have the same number of elements", expected.numElements(), actual.numElements()); + assertThat(actual.numElements()) + .as("Should have the same number of elements") + .isEqualTo(expected.numElements()); DataType type = array.elementType(); for (int i = 0; i < actual.numElements(); i += 1) { assertEquals( @@ -718,8 +796,9 @@ private static void assertEquals( } private static void assertEquals(String context, MapType map, MapData expected, MapData actual) { - Assert.assertEquals( - "Should have the same number of elements", expected.numElements(), actual.numElements()); + assertThat(actual.numElements()) + .as("Should have the same number of elements") + .isEqualTo(expected.numElements()); DataType keyType = map.keyType(); ArrayData expectedKeys = expected.keyArray(); @@ -766,7 +845,7 @@ public static List dataFiles(Table table, String branch) { } public static Set deleteFiles(Table table) { - Set deleteFiles = Sets.newHashSet(); + DeleteFileSet deleteFiles = DeleteFileSet.create(); for (FileScanTask task : table.newScan().planFiles()) { deleteFiles.addAll(task.deletes()); @@ -775,6 +854,16 @@ public static Set deleteFiles(Table table) { return deleteFiles; } + public static Set deleteFiles(Table table, Snapshot snapshot) { + DeleteFileSet deleteFiles = DeleteFileSet.create(); + + for (FileScanTask task : table.newScan().useSnapshot(snapshot.snapshotId()).planFiles()) { + deleteFiles.addAll(task.deletes()); + } + + return deleteFiles; + } + public static Set reachableManifestPaths(Table table) { return StreamSupport.stream(table.snapshots().spliterator(), false) .flatMap(s -> s.allManifests(table.io()).stream()) @@ -792,11 +881,14 @@ public static void asMetadataRecord(Record file) { file.put(3, 0); // specId } + // suppress the readable metrics and first-row-id that are not in manifest files + private static final Set DERIVED_FIELDS = Set.of("readable_metrics", "first_row_id"); + public static Dataset selectNonDerived(Dataset metadataTable) { StructField[] fields = metadataTable.schema().fields(); return metadataTable.select( Stream.of(fields) - .filter(f -> !f.name().equals("readable_metrics")) // derived field + .filter(f -> !DERIVED_FIELDS.contains(f.name())) .map(f -> new Column(f.name())) .toArray(Column[]::new)); } diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java new file mode 100644 index 000000000000..9a750609fb51 --- /dev/null +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark.extensions; + +import org.apache.gluten.TestConfUtil; + +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.jupiter.api.BeforeAll; + +import java.net.InetAddress; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + +public abstract class ExtensionsTestBase extends CatalogTestBase { + + private static final Random RANDOM = ThreadLocalRandom.current(); + + @BeforeAll + public static void startMetastoreAndSpark() { + TestBase.metastore = new TestHiveMetastore(); + metastore.start(); + TestBase.hiveConf = metastore.hiveConf(); + + TestBase.spark.close(); + + TestBase.spark = + SparkSession.builder() + .master("local[2]") + .config(TestConfUtil.GLUTEN_CONF) + .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress()) + .config("spark.testing", "true") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName()) + .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.sql.shuffle.partitions", "4") + .config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true") + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config( + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) + .enableHiveSupport() + .getOrCreate(); + + TestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + TestBase.catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + } +} diff --git a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/LogMessage.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/LogMessage.java similarity index 100% rename from backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/LogMessage.java rename to backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/LogMessage.java diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java index da6e83f6f66f..1d189cebd189 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -18,7 +18,17 @@ import org.apache.gluten.TestConfUtil; -import org.apache.iceberg.*; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; @@ -33,14 +43,13 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.util.List; import java.util.Map; import java.util.UUID; @@ -50,38 +59,38 @@ import static org.apache.iceberg.PlanningMode.LOCAL; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkReadProjection extends TestReadProjection { private static SparkSession spark = null; - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") + @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") public static Object[][] parameters() { return new Object[][] { - {"parquet", false, LOCAL}, - {"parquet", true, DISTRIBUTED}, - {"avro", false, LOCAL}, - {"orc", false, DISTRIBUTED}, - {"orc", true, LOCAL} + {FileFormat.PARQUET, false, LOCAL}, + {FileFormat.PARQUET, true, DISTRIBUTED}, + {FileFormat.AVRO, false, LOCAL}, + {FileFormat.ORC, false, DISTRIBUTED}, + {FileFormat.ORC, true, LOCAL} }; } - private final FileFormat format; - private final boolean vectorized; - private final PlanningMode planningMode; + @Parameter(index = 1) + private boolean vectorized; - public TestSparkReadProjection(String format, boolean vectorized, PlanningMode planningMode) { - super(format); - this.format = FileFormat.fromString(format); - this.vectorized = vectorized; - this.planningMode = planningMode; - } + @Parameter(index = 2) + private PlanningMode planningMode; - @BeforeClass + @BeforeAll public static void startSpark() { TestSparkReadProjection.spark = - SparkSession.builder().master("local[2]").config(TestConfUtil.GLUTEN_CONF).getOrCreate(); + SparkSession.builder() + .master("local[2]") + .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress()) + .config(TestConfUtil.GLUTEN_CONF) + .getOrCreate(); ImmutableMap config = ImmutableMap.of( "type", "hive", @@ -95,7 +104,7 @@ public static void startSpark() { (key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value)); } - @AfterClass + @AfterAll public static void stopSpark() { SparkSession currentSpark = TestSparkReadProjection.spark; TestSparkReadProjection.spark = null; @@ -105,10 +114,10 @@ public static void stopSpark() { @Override protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema, Record record) throws IOException { - File parent = temp.newFolder(desc); + File parent = new File(temp.toFile(), desc); File location = new File(parent, "test"); File dataFolder = new File(location, "data"); - Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs()); + assertThat(dataFolder.mkdirs()).as("mkdirs should succeed").isTrue(); File testFile = new File(dataFolder, format.addExtension(UUID.randomUUID().toString())); @@ -150,8 +159,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema Schema expectedSchema = reassignIds(readSchema, idMapping); // Set the schema to the expected schema directly to simulate the table schema evolving - TestTables.replaceMetadata( - desc, TestTables.readMetadata(desc).updateSchema(expectedSchema, 100)); + TestTables.replaceMetadata(desc, TestTables.readMetadata(desc).updateSchema(expectedSchema)); Dataset df = spark diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index d3350ddf2533..d06cb2580e3d 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -17,15 +17,36 @@ package org.apache.iceberg.spark.source; import org.apache.gluten.TestConfUtil; -import org.apache.gluten.config.GlutenConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.*; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.*; +import org.apache.iceberg.data.DeleteReadTests; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.hive.HiveCatalog; @@ -45,7 +66,11 @@ import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.*; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -70,27 +95,32 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; +// Change for compile, import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter; +// import org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.HadoopInputFile; @ExtendWith(ParameterizedTestExtension.class) public class TestSparkReaderDeletes extends DeleteReadTests { - private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; - @Parameter(index = 1) + @Parameter(index = 2) private boolean vectorized; - @Parameter(index = 2) + @Parameter(index = 3) private PlanningMode planningMode; - @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") + @Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}") public static Object[][] parameters() { - return new Object[][] { - new Object[] {FileFormat.PARQUET, false, PlanningMode.DISTRIBUTED}, - new Object[] {FileFormat.PARQUET, true, PlanningMode.LOCAL}, - new Object[] {FileFormat.ORC, false, PlanningMode.DISTRIBUTED}, - new Object[] {FileFormat.AVRO, false, PlanningMode.LOCAL} - }; + List parameters = Lists.newArrayList(); + for (int version : TestHelpers.V2_AND_ABOVE) { + parameters.add(new Object[] {FileFormat.PARQUET, version, false, PlanningMode.DISTRIBUTED}); + parameters.add(new Object[] {FileFormat.PARQUET, version, true, PlanningMode.LOCAL}); + if (version == 2) { + parameters.add(new Object[] {FileFormat.ORC, version, false, PlanningMode.DISTRIBUTED}); + parameters.add(new Object[] {FileFormat.AVRO, version, false, PlanningMode.LOCAL}); + } + } + return parameters.toArray(new Object[0][]); } @BeforeAll @@ -149,6 +179,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { .set(TableProperties.DEFAULT_FILE_FORMAT, format.name()) .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName()) .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName()) + .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)) .commit(); if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) { String vectorizationEnabled = @@ -173,7 +204,7 @@ protected void dropTable(String name) { catalog.dropTable(TableIdentifier.of("default", name)); } - // The native side does not report the numDeletes metric. + // Change to not check, the native side does not report the numDeletes metric. protected boolean countDeletes() { return false; } @@ -207,6 +238,86 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String... return set; } + @TestTemplate + public void testPositionDeletes() throws IOException { + List> deletes = + Lists.newArrayList( + Pair.of(dataFile.location(), 0L), // id = 29 + Pair.of(dataFile.location(), 3L), // id = 89 + Pair.of(dataFile.location(), 6L) // id = 122 + ); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(temp.resolve("junit" + System.nanoTime()).toFile()), + TestHelpers.Row.of(0), + deletes, + formatVersion); + + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); + StructLikeSet actual = rowSet(tableName, table, "*"); + + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); + checkDeleteCount(3L); + } + + // Change to satisfy formatVersion + @TestTemplate + public void testMultiplePosDeleteFiles() throws IOException { + assumeThat(formatVersion) + .as("Can't write multiple delete files with formatVersion >= 3") + .isEqualTo(2); + + List> deletes = + Lists.newArrayList( + Pair.of(dataFile.location(), 0L), // id = 29 + Pair.of(dataFile.location(), 3L) // id = 89 + ); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(temp.resolve("junit" + System.nanoTime()).toFile()), + TestHelpers.Row.of(0), + deletes, + formatVersion); + + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + deletes = Lists.newArrayList(Pair.of(dataFile.location(), 6L)); // id = 122 + + posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(temp.resolve("junit" + System.nanoTime()).toFile()), + TestHelpers.Row.of(0), + deletes, + formatVersion); + + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); + StructLikeSet actual = rowSet(tableName, table, "*"); + + assertThat(actual).as("Table should contain expected rows").isEqualTo(expected); + checkDeleteCount(3L); + } + @TestTemplate public void testEqualityDeleteWithFilter() throws IOException { String tableName = table.name().substring(table.name().lastIndexOf(".") + 1); @@ -299,7 +410,7 @@ public void testReadEqualityDeleteRows() throws IOException { for (CombinedScanTask task : tasks) { try (EqualityDeleteRowReader reader = - new EqualityDeleteRowReader(task, table, null, table.schema(), false)) { + new EqualityDeleteRowReader(task, table, null, table.schema(), false, true)) { while (reader.next()) { actualRowSet.add( new InternalRowWrapper( @@ -319,10 +430,10 @@ public void testPosDeletesAllRowsInBatch() throws IOException { // deleted. List> deletes = Lists.newArrayList( - Pair.of(dataFile.path(), 0L), // id = 29 - Pair.of(dataFile.path(), 1L), // id = 43 - Pair.of(dataFile.path(), 2L), // id = 61 - Pair.of(dataFile.path(), 3L) // id = 89 + Pair.of(dataFile.location(), 0L), // id = 29 + Pair.of(dataFile.location(), 1L), // id = 43 + Pair.of(dataFile.location(), 2L), // id = 61 + Pair.of(dataFile.location(), 3L) // id = 89 ); Pair posDeletes = @@ -330,7 +441,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException { table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); table .newRowDelta() @@ -351,10 +463,10 @@ public void testPosDeletesWithDeletedColumn() throws IOException { // deleted. List> deletes = Lists.newArrayList( - Pair.of(dataFile.path(), 0L), // id = 29 - Pair.of(dataFile.path(), 1L), // id = 43 - Pair.of(dataFile.path(), 2L), // id = 61 - Pair.of(dataFile.path(), 3L) // id = 89 + Pair.of(dataFile.location(), 0L), // id = 29 + Pair.of(dataFile.location(), 1L), // id = 43 + Pair.of(dataFile.location(), 2L), // id = 61 + Pair.of(dataFile.location(), 3L) // id = 89 ); Pair posDeletes = @@ -362,7 +474,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException { table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); table .newRowDelta() @@ -406,30 +519,6 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { assertThat(actual).as("Table should contain expected row").isEqualTo(expected); checkDeleteCount(3L); - // TODO, the query fallbacks because not supports equality delete. - // Error Source: RUNTIME - // Error Code: NOT_IMPLEMENTED - // Retriable: False - // Context: Split [Hive: - // /var/folders/63/845y6pk53dx_83hpw8ztdchw0000gn/T/junit-17345315326614809092/junit4173952394189821024.tmp 4 - 647] Task Gluten_Stage_5_TID_5_VTID_1 - // Additional Context: Operator: TableScan[0] 0 - // Function: prepareSplit - // File: - // /Users/chengchengjin/code/gluten/ep/build-velox/build/velox_ep/velox/connectors/hive/iceberg/IcebergSplitReader.cpp - // Line: 95 - // Stack trace: - // Check the table query data because above query is fallback by column _deleted. - // This query is fallback by equality delete files, remove this check after equality reader is - // supported. - StructLikeSet actualWithoutMetadata = - rowSet(tableName, PROJECTION_SCHEMA_WITHOUT_DELETED.asStruct(), "id", "data"); - spark.conf().set(GlutenConfig.GLUTEN_ENABLED().key(), "false"); - StructLikeSet expectWithoutMetadata = - rowSet(tableName, PROJECTION_SCHEMA_WITHOUT_DELETED.asStruct(), "id", "data"); - assertThat(actualWithoutMetadata) - .as("Table should contain expected row") - .isEqualTo(expectWithoutMetadata); - spark.conf().set(GlutenConfig.GLUTEN_ENABLED().key(), "true"); } @TestTemplate @@ -453,8 +542,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { List> deletes = Lists.newArrayList( - Pair.of(dataFile.path(), 3L), // id = 89 - Pair.of(dataFile.path(), 5L) // id = 121 + Pair.of(dataFile.location(), 3L), // id = 89 + Pair.of(dataFile.location(), 5L) // id = 121 ); Pair posDeletes = @@ -462,7 +551,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); table .newRowDelta() @@ -483,10 +573,10 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { public void testFilterOnDeletedMetadataColumn() throws IOException { List> deletes = Lists.newArrayList( - Pair.of(dataFile.path(), 0L), // id = 29 - Pair.of(dataFile.path(), 1L), // id = 43 - Pair.of(dataFile.path(), 2L), // id = 61 - Pair.of(dataFile.path(), 3L) // id = 89 + Pair.of(dataFile.location(), 0L), // id = 29 + Pair.of(dataFile.location(), 1L), // id = 43 + Pair.of(dataFile.location(), 2L), // id = 61 + Pair.of(dataFile.location(), 3L) // id = 89 ); Pair posDeletes = @@ -494,7 +584,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException { table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); table .newRowDelta() @@ -557,7 +648,7 @@ public void testIsDeletedColumnWithoutDeleteFile() { @TestTemplate public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException { - assumeThat(format).isEqualTo("parquet"); + assumeThat(format).isEqualTo(FileFormat.PARQUET); String tblName = "test3"; Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned()); @@ -587,6 +678,7 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio Iterable records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37); writer.addAll(records); } + // Change for compile parquetFileWriter.appendFile(HadoopInputFile.fromPath(splitPath, conf)); } parquetFileWriter.end( @@ -606,16 +698,19 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio // Add positional deletes to the table List> deletes = Lists.newArrayList( - Pair.of(dataFile.path(), 97L), - Pair.of(dataFile.path(), 98L), - Pair.of(dataFile.path(), 99L), - Pair.of(dataFile.path(), 101L), - Pair.of(dataFile.path(), 103L), - Pair.of(dataFile.path(), 107L), - Pair.of(dataFile.path(), 109L)); + Pair.of(dataFile.location(), 97L), + Pair.of(dataFile.location(), 98L), + Pair.of(dataFile.location(), 99L), + Pair.of(dataFile.location(), 101L), + Pair.of(dataFile.location(), 103L), + Pair.of(dataFile.location(), 107L), + Pair.of(dataFile.location(), 109L)); Pair posDeletes = FileHelpers.writeDeleteFile( - table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + deletes, + formatVersion); tbl.newRowDelta() .addDeletes(posDeletes.first()) .validateDataFilesExist(posDeletes.second()) @@ -630,10 +725,6 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio required(2, "data", Types.StringType.get()), MetadataColumns.IS_DELETED); - private static final Schema PROJECTION_SCHEMA_WITHOUT_DELETED = - new Schema( - required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); - private static StructLikeSet expectedRowSet(int... idsToRemove) { return expectedRowSet(false, false, idsToRemove); } diff --git a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestStructuredStreaming.java b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java similarity index 84% rename from backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestStructuredStreaming.java rename to backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java index 1fc2b3f9652d..fdd933e1d3e1 100644 --- a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestStructuredStreaming.java +++ b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.source; +package org.apache.iceberg.spark.source; import org.apache.gluten.TestConfUtil; @@ -23,25 +23,35 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.iceberg.types.Types; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.execution.streaming.MemoryStream; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; -import org.junit.*; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.net.InetAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import scala.Option; import scala.collection.JavaConverters; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestStructuredStreaming { @@ -52,19 +62,20 @@ public class TestStructuredStreaming { optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); private static SparkSession spark = null; - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; - @BeforeClass + @BeforeAll public static void startSpark() { TestStructuredStreaming.spark = SparkSession.builder() .master("local[2]") + .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress()) .config("spark.sql.shuffle.partitions", 4) .config(TestConfUtil.GLUTEN_CONF) .getOrCreate(); } - @AfterClass + @AfterAll public static void stopSpark() { SparkSession currentSpark = TestStructuredStreaming.spark; TestStructuredStreaming.spark = null; @@ -73,7 +84,7 @@ public static void stopSpark() { @Test public void testStreamingWriteAppendMode() throws Exception { - File parent = temp.newFolder("parquet"); + File parent = temp.resolve("parquet").toFile(); File location = new File(parent, "test-table"); File checkpoint = new File(parent, "checkpoint"); @@ -112,7 +123,8 @@ public void testStreamingWriteAppendMode() throws Exception { // remove the last commit to force Spark to reprocess batch #1 File lastCommitFile = new File(checkpoint + "/commits/1"); - Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete()); + assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue(); + Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc")); // restart the query from the checkpoint StreamingQuery restartedQuery = streamWriter.start(); @@ -122,9 +134,8 @@ public void testStreamingWriteAppendMode() throws Exception { Dataset result = spark.read().format("iceberg").load(location.toString()); List actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); - Assert.assertEquals("Result rows should match", expected, actual); - Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots())); + assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected); + assertThat(table.snapshots()).as("Number of snapshots should match").hasSize(2); } finally { for (StreamingQuery query : spark.streams().active()) { query.stop(); @@ -134,7 +145,7 @@ public void testStreamingWriteAppendMode() throws Exception { @Test public void testStreamingWriteCompleteMode() throws Exception { - File parent = temp.newFolder("parquet"); + File parent = temp.resolve("parquet").toFile(); File location = new File(parent, "test-table"); File checkpoint = new File(parent, "checkpoint"); @@ -172,7 +183,8 @@ public void testStreamingWriteCompleteMode() throws Exception { // remove the last commit to force Spark to reprocess batch #1 File lastCommitFile = new File(checkpoint + "/commits/1"); - Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete()); + assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue(); + Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc")); // restart the query from the checkpoint StreamingQuery restartedQuery = streamWriter.start(); @@ -182,9 +194,8 @@ public void testStreamingWriteCompleteMode() throws Exception { Dataset result = spark.read().format("iceberg").load(location.toString()); List actual = result.orderBy("data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); - Assert.assertEquals("Result rows should match", expected, actual); - Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots())); + assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected); + assertThat(table.snapshots()).as("Number of snapshots should match").hasSize(2); } finally { for (StreamingQuery query : spark.streams().active()) { query.stop(); @@ -194,7 +205,7 @@ public void testStreamingWriteCompleteMode() throws Exception { @Test public void testStreamingWriteCompleteModeWithProjection() throws Exception { - File parent = temp.newFolder("parquet"); + File parent = temp.resolve("parquet").toFile(); File location = new File(parent, "test-table"); File checkpoint = new File(parent, "checkpoint"); @@ -232,7 +243,8 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception { // remove the last commit to force Spark to reprocess batch #1 File lastCommitFile = new File(checkpoint + "/commits/1"); - Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete()); + assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue(); + Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc")); // restart the query from the checkpoint StreamingQuery restartedQuery = streamWriter.start(); @@ -242,9 +254,8 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception { Dataset result = spark.read().format("iceberg").load(location.toString()); List actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); - Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); - Assert.assertEquals("Result rows should match", expected, actual); - Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots())); + assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected); + assertThat(table.snapshots()).as("Number of snapshots should match").hasSize(2); } finally { for (StreamingQuery query : spark.streams().active()) { query.stop(); @@ -254,7 +265,7 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception { @Test public void testStreamingWriteUpdateMode() throws Exception { - File parent = temp.newFolder("parquet"); + File parent = temp.resolve("parquet").toFile(); File location = new File(parent, "test-table"); File checkpoint = new File(parent, "checkpoint"); diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index a601ce70a9f7..f33143feb129 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{ArrayType, DataType, StructType} -import org.apache.iceberg.{BaseTable, MetadataColumns, Schema, SnapshotSummary} +import org.apache.iceberg.{BaseTable, MetadataColumns, Schema, SnapshotSummary, TableProperties} import org.apache.iceberg.avro.AvroSchemaUtil import org.apache.iceberg.spark.source.{GlutenIcebergSourceUtil, SparkTable} import org.apache.iceberg.spark.source.metrics.NumSplits @@ -126,6 +126,35 @@ case class IcebergScanTransformer( } } + val baseTable = table match { + case t: SparkTable => + t.table() match { + case t: BaseTable => t + case _ => null + } + case _ => null + } + if (baseTable == null) { + return ValidationResult.succeeded + } + val metadata = baseTable.operations().current() + if (metadata.formatVersion() >= 3) { + val hasUnsupportedDelete = finalPartitions.exists { + case p: SparkDataSourceRDDPartition => + GlutenIcebergSourceUtil.deleteExists(p) + case other => + return ValidationResult.failed( + s"Unsupported partition type: ${other.getClass.getSimpleName}") + } + if (hasUnsupportedDelete) { + return ValidationResult.failed("Delete file format puffin is not supported") + } + } + // https://github.com/apache/incubator-gluten/issues/11135 + if (metadata.propertyAsBoolean(TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA, false)) { + return ValidationResult.failed("Not support read the file with accept any schema") + } + ValidationResult.succeeded } diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala index 0d283b75566f..bf99b8bffe08 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala @@ -18,7 +18,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.iceberg.{FileFormat, PartitionField, PartitionSpec, Schema} +import org.apache.iceberg.{FileFormat, PartitionField, PartitionSpec, Schema, TableProperties} import org.apache.iceberg.TableProperties.{ORC_COMPRESSION, ORC_COMPRESSION_DEFAULT, PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT} import org.apache.iceberg.avro.AvroSchemaUtil import org.apache.iceberg.spark.source.IcebergWriteUtil @@ -101,6 +101,18 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec { if (query.output.exists(a => !AvroSchemaUtil.makeCompatibleName(a.name).equals(a.name))) { return ValidationResult.failed("Not support the compatible column name") } + if ( + IcebergWriteUtil + .getTable(write) + .properties() + .getOrDefault(TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA, "false") + .equals("true") + ) { + return ValidationResult.failed("Not support the write with accept any schema") + } + if (IcebergWriteUtil.getWriteConf(write).mergeSchema()) { + return ValidationResult.failed("Not support write with merge schema") + } ValidationResult.succeeded } diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index bb551a4c837c..ebe899f922c1 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -41,6 +41,15 @@ object GlutenIcebergSourceUtil { classOf[SparkBatchQueryScan] } + def deleteExists(p: SparkDataSourceRDDPartition): Boolean = { + p.inputPartitions.exists { + case ip: SparkInputPartition => + val tasks = ip.taskGroup[ScanTask]().tasks().asScala + asFileScanTask(tasks.toList).exists(task => !task.deletes().isEmpty()) + case _ => throw new UnsupportedOperationException(s"Unsupported InputPartition type") + } + } + def genSplitInfo( partition: SparkDataSourceRDDPartition, readPartitionSchema: StructType): SplitInfo = { diff --git a/pom.xml b/pom.xml index 604c424a1512..004888af867c 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ 2.15.0 4.13.1 + 5.11.4 0.62.2 @@ -280,6 +281,12 @@ ${junit.version} test + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + org.apache.spark spark-core_${scala.binary.version} @@ -1098,7 +1105,7 @@ 34 spark-sql-columnar-shims-spark34 3.4.4 - 1.7.1 + 1.10.0 delta-core 2.4.0 24 @@ -1123,7 +1130,7 @@ 35 spark-sql-columnar-shims-spark35 3.5.5 - 1.8.0 + 1.10.0 delta-spark 3.3.2 33 @@ -1607,9 +1614,6 @@ ${project.basedir}/src-iceberg/main/resources - - ${project.basedir}/src-iceberg-spark${spark.plain.version}/main/resources - @@ -1623,8 +1627,6 @@ ${project.basedir}/src-iceberg/test/scala ${project.basedir}/src-iceberg/test/java - ${project.basedir}/src-iceberg-spark${spark.plain.version}/test/scala - ${project.basedir}/src-iceberg-spark${spark.plain.version}/test/java @@ -1639,6 +1641,49 @@ ${project.basedir}/src-iceberg/test/resources + + + + + + + + + + iceberg-test + + false + + + 3.4.1 + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-iceberg-test-sources + + add-test-source + + generate-test-sources + + + ${project.basedir}/src-iceberg-spark${spark.plain.version}/test/scala + ${project.basedir}/src-iceberg-spark${spark.plain.version}/test/java + + + + + add-iceberg-test-resources + + add-test-resource + + generate-test-resources + + ${project.basedir}/src-iceberg-spark${spark.plain.version}/test/resources