From a71f168ec7316c216504ec1e04e3d8e6c3e9c6d8 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 14 Sep 2023 22:44:25 -0700 Subject: [PATCH 1/3] [HUDI-6863] Revert auto-tuning of dedup parallelism --- .../hudi/TestHoodieSparkSqlWriter.scala | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 7f89817a7f8c3..c31bd30b060a3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -49,7 +49,6 @@ import org.mockito.Mockito.{spy, times, verify} import org.scalatest.Assertions.assertThrows import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept} -import java.io.IOException import java.time.format.DateTimeFormatterBuilder import java.time.temporal.ChronoField import java.time.{Instant, ZoneId} @@ -298,13 +297,17 @@ class TestHoodieSparkSqlWriter { def testValidateTableConfigWithOverwriteSaveMode(): Unit = { //create a new table val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid") + "hoodie.datasource.write.recordkey.field" -> "uuid", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame) //on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "ts") + "hoodie.datasource.write.recordkey.field" -> "ts", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2)) assert(hoodieException.getMessage.contains("Config conflict")) @@ -321,13 +324,17 @@ class TestHoodieSparkSqlWriter { def testChangePartitionPath(): Unit = { //create a new table val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts") + "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame) //on same path try write with different partitionpath field and Append SaveMode should throw an exception val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> 
"uuid") + "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2)) assert(hoodieException.getMessage.contains("Config conflict")) @@ -791,7 +798,10 @@ class TestHoodieSparkSqlWriter { DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test") + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + ) val df = spark.range(0, 1000).toDF("keyid") .withColumn("col3", expr("keyid")) .withColumn("age", lit(1)) @@ -1051,7 +1061,9 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) // case 1: test table which created by sql @@ -1129,7 +1141,9 @@ class TestHoodieSparkSqlWriter { val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt") val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts" + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) // case 1: When commit C1 specifies a key generator and commit C2 does not specify key generator @@ -1158,7 +1172,9 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) // case 1: When commit C1 does not specify key generator and commit C2 specifies a key generator @@ -1191,7 +1207,9 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator @@ -1223,7 +1241,9 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) // case 1: 
When commit C1 specifies a key generator and commkt C2 does not specify key generator @@ -1279,7 +1299,9 @@ class TestHoodieSparkSqlWriter { DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key -> "CONSISTENT_HASHING", - HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET" + HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET", + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" ) val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1") From a0db324834048806b63baf804713c1c5cb657678 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 15 Sep 2023 10:30:25 -0700 Subject: [PATCH 2/3] Remove unnecessary parallelism configs in tests --- .../hudi/TestHoodieSparkSqlWriter.scala | 46 +++++-------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index c31bd30b060a3..7f89817a7f8c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -49,6 +49,7 @@ import org.mockito.Mockito.{spy, times, verify} import org.scalatest.Assertions.assertThrows import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept} +import java.io.IOException import java.time.format.DateTimeFormatterBuilder import java.time.temporal.ChronoField import java.time.{Instant, ZoneId} @@ -297,17 +298,13 @@ class TestHoodieSparkSqlWriter { def testValidateTableConfigWithOverwriteSaveMode(): Unit = { //create a new table val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") + "hoodie.datasource.write.recordkey.field" -> "uuid") val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame) //on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "ts", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") + "hoodie.datasource.write.recordkey.field" -> "ts") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2)) assert(hoodieException.getMessage.contains("Config conflict")) @@ -324,17 +321,13 @@ class TestHoodieSparkSqlWriter { def testChangePartitionPath(): Unit = { //create a new table val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") + 
"hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts") val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame) //on same path try write with different partitionpath field and Append SaveMode should throw an exception val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, - "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2") + "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2)) assert(hoodieException.getMessage.contains("Config conflict")) @@ -798,10 +791,7 @@ class TestHoodieSparkSqlWriter { DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" - ) + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test") val df = spark.range(0, 1000).toDF("keyid") .withColumn("col3", expr("keyid")) .withColumn("age", lit(1)) @@ -1061,9 +1051,7 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" ) // case 1: test table which created by sql @@ -1141,9 +1129,7 @@ class TestHoodieSparkSqlWriter { val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt") val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts" ) // case 1: When commit C1 specifies a key generator and commit C2 does not specify key generator @@ -1172,9 +1158,7 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" ) // case 1: When commit C1 does not specify key generator and commit C2 specifies a key generator @@ -1207,9 +1191,7 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - 
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" ) // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator @@ -1241,9 +1223,7 @@ class TestHoodieSparkSqlWriter { val options = Map( DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" ) // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator @@ -1299,9 +1279,7 @@ class TestHoodieSparkSqlWriter { DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key -> "CONSISTENT_HASHING", - HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET", - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2", - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2" + HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET" ) val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1") From 047941b66ee52a99f626fd0dadb72581d9855385 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 15 Sep 2023 18:13:16 -0700 Subject: [PATCH 3/3] [MINOR] Add tests on combine parallelism --- .../table/action/commit/BaseWriteHelper.java | 11 +-- .../action/commit/TestWriterHelperBase.java | 90 +++++++++++++++++++ .../action/commit/TestSparkWriteHelper.java | 76 ++++++++++++++++ .../testutils/HoodieCommonTestHarness.java | 11 ++- 4 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 8d8978927f63c..b5edc7878f994 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; - import org.apache.hudi.table.action.HoodieWriteMetadata; import java.time.Duration; @@ -48,12 +47,9 @@ public HoodieWriteMetadata write(String instantTime, BaseCommitActionExecutor executor, WriteOperationType operationType) { try { - int targetParallelism = - deduceShuffleParallelism(inputRecords, configuredShuffleParallelism); - // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, targetParallelism, table); + combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -79,8 +75,9 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table) { - return condition ? 
deduplicateRecords(records, table, parallelism) : records; + boolean condition, I records, int configuredParallelism, HoodieTable table) { + int targetParallelism = deduceShuffleParallelism(records, configuredParallelism); + return condition ? deduplicateRecords(records, table, targetParallelism) : records; } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java new file mode 100644 index 0000000000000..2d43b4146085b --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.table.HoodieTable; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.io.IOException; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for write helpers + */ +public abstract class TestWriterHelperBase extends HoodieCommonTestHarness { + private static int runNo = 0; + protected final BaseWriteHelper writeHelper; + protected HoodieEngineContext context; + protected HoodieTable table; + protected I inputRecords; + + public TestWriterHelperBase(BaseWriteHelper writeHelper) { + this.writeHelper = writeHelper; + } + + public abstract I getInputRecords(List recordList, int numPartitions); + + @BeforeEach + public void setUp() throws Exception { + initResources(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + @ParameterizedTest + @CsvSource({"true,0", "true,50", "false,0", "false,50"}) + public void testCombineParallelism(boolean shouldCombine, int configuredShuffleParallelism) { + int inputParallelism = 5; + inputRecords = getInputRecords( + dataGen.generateInserts("20230915000000000", 10), inputParallelism); + HoodieData outputRecords = (HoodieData) writeHelper.combineOnCondition( + shouldCombine, inputRecords, configuredShuffleParallelism, table); + if (!shouldCombine || configuredShuffleParallelism == 0) { + assertEquals(inputParallelism, outputRecords.getNumPartitions()); + } else { + assertEquals(configuredShuffleParallelism, outputRecords.getNumPartitions()); + } + } + + private void initResources() throws 
IOException { + initPath("dataset" + runNo); + runNo++; + initTestDataGenerator(); + initMetaClient(); + } + + private void cleanupResources() { + cleanMetaClient(); + cleanupTestDataGenerator(); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java new file mode 100644 index 0000000000000..5689de996eb48 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link HoodieWriteHelper} + */ +public class TestSparkWriteHelper extends TestWriterHelperBase> { + JavaSparkContext jsc; + + public TestSparkWriteHelper() { + super(HoodieWriteHelper.newInstance()); + } + + @BeforeEach + public void setup() throws Exception { + super.setUp(); + this.jsc = new JavaSparkContext( + HoodieClientTestUtils.getSparkConfForTest(TestSparkWriteHelper.class.getName())); + this.context = new HoodieSparkEngineContext(jsc); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withEmbeddedTimelineServerEnabled(false) + .build(); + this.table = HoodieSparkTable.create(config, context, metaClient); + } + + @Override + public HoodieData getInputRecords(List recordList, int numPartitions) { + HoodieData inputRecords = context.parallelize(recordList, numPartitions); + assertEquals(numPartitions, inputRecords.getNumPartitions()); + return inputRecords; + } + + @AfterEach + public void tearDown() throws Exception { + super.tearDown(); + if (this.jsc != null) { + this.jsc.stop(); + } + this.context = null; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 7e70da23e09a1..a1a3864a6a980 100644 --- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -51,8 +51,17 @@ protected void setTableName(String tableName) { * Initializes basePath. */ protected void initPath() { + initPath("dataset"); + } + + /** + * Initializes basePath with folder name. + * + * @param folderName Folder name. + */ + protected void initPath(String folderName) { try { - java.nio.file.Path basePath = tempDir.resolve("dataset"); + java.nio.file.Path basePath = tempDir.resolve(folderName); java.nio.file.Files.createDirectories(basePath); this.basePath = basePath.toAbsolutePath().toString(); this.baseUri = basePath.toUri();
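
Note (editorial, not part of the patches above): the behavior pinned down by TestWriterHelperBase#testCombineParallelism in patch 3 can be summarized by the small Java sketch below. It is only an illustration of the asserted outcome, not the actual implementation; the real deduction happens in deduceShuffleParallelism, which combineOnCondition now calls and whose body is not shown in this series.

    // Illustration of the partitioning asserted by testCombineParallelism:
    // when dedup is skipped or the configured shuffle parallelism is left at 0,
    // the input partitioning is preserved; otherwise the configured value is used.
    static int expectedPartitionsAfterCombine(boolean shouldCombine,
                                              int configuredShuffleParallelism,
                                              int inputPartitions) {
      if (!shouldCombine || configuredShuffleParallelism == 0) {
        return inputPartitions;
      }
      return configuredShuffleParallelism;
    }

This is presumably also why patch 2 drops the explicit HoodieWriteConfig.INSERT_PARALLELISM_VALUE / UPSERT_PARALLELISM_VALUE overrides that patch 1 had added to TestHoodieSparkSqlWriter: with the parallelism deduction performed inside combineOnCondition, those tests no longer need to pin the shuffle parallelism for small inputs.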