diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 8d8978927f63c..b5edc7878f994 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; - import org.apache.hudi.table.action.HoodieWriteMetadata; import java.time.Duration; @@ -48,12 +47,9 @@ public HoodieWriteMetadata write(String instantTime, BaseCommitActionExecutor executor, WriteOperationType operationType) { try { - int targetParallelism = - deduceShuffleParallelism(inputRecords, configuredShuffleParallelism); - // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, targetParallelism, table); + combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -79,8 +75,9 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table) { - return condition ? deduplicateRecords(records, table, parallelism) : records; + boolean condition, I records, int configuredParallelism, HoodieTable table) { + int targetParallelism = deduceShuffleParallelism(records, configuredParallelism); + return condition ? deduplicateRecords(records, table, targetParallelism) : records; } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java new file mode 100644 index 0000000000000..2d43b4146085b --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/commit/TestWriterHelperBase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.table.HoodieTable; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.io.IOException; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for write helpers + */ +public abstract class TestWriterHelperBase extends HoodieCommonTestHarness { + private static int runNo = 0; + protected final BaseWriteHelper writeHelper; + protected HoodieEngineContext context; + protected HoodieTable table; + protected I inputRecords; + + public TestWriterHelperBase(BaseWriteHelper writeHelper) { + this.writeHelper = writeHelper; + } + + public abstract I getInputRecords(List recordList, int numPartitions); + + @BeforeEach + public void setUp() throws Exception { + initResources(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + @ParameterizedTest + @CsvSource({"true,0", "true,50", "false,0", "false,50"}) + public void testCombineParallelism(boolean shouldCombine, int configuredShuffleParallelism) { + int inputParallelism = 5; + inputRecords = getInputRecords( + dataGen.generateInserts("20230915000000000", 10), inputParallelism); + HoodieData outputRecords = (HoodieData) writeHelper.combineOnCondition( + shouldCombine, inputRecords, configuredShuffleParallelism, table); + if (!shouldCombine || configuredShuffleParallelism == 0) { + assertEquals(inputParallelism, outputRecords.getNumPartitions()); + } else { + assertEquals(configuredShuffleParallelism, outputRecords.getNumPartitions()); + } + } + + private void initResources() throws IOException { + initPath("dataset" + runNo); + runNo++; + initTestDataGenerator(); + initMetaClient(); + } + + private void cleanupResources() { + cleanMetaClient(); + cleanupTestDataGenerator(); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java new file mode 100644 index 0000000000000..5689de996eb48 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkWriteHelper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link HoodieWriteHelper} + */ +public class TestSparkWriteHelper extends TestWriterHelperBase> { + JavaSparkContext jsc; + + public TestSparkWriteHelper() { + super(HoodieWriteHelper.newInstance()); + } + + @BeforeEach + public void setup() throws Exception { + super.setUp(); + this.jsc = new JavaSparkContext( + HoodieClientTestUtils.getSparkConfForTest(TestSparkWriteHelper.class.getName())); + this.context = new HoodieSparkEngineContext(jsc); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withEmbeddedTimelineServerEnabled(false) + .build(); + this.table = HoodieSparkTable.create(config, context, metaClient); + } + + @Override + public HoodieData getInputRecords(List recordList, int numPartitions) { + HoodieData inputRecords = context.parallelize(recordList, numPartitions); + assertEquals(numPartitions, inputRecords.getNumPartitions()); + return inputRecords; + } + + @AfterEach + public void tearDown() throws Exception { + super.tearDown(); + if (this.jsc != null) { + this.jsc.stop(); + } + this.context = null; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 7e70da23e09a1..a1a3864a6a980 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -51,8 +51,17 @@ protected void setTableName(String tableName) { * Initializes basePath. */ protected void initPath() { + initPath("dataset"); + } + + /** + * Initializes basePath with folder name. + * + * @param folderName Folder name. + */ + protected void initPath(String folderName) { try { - java.nio.file.Path basePath = tempDir.resolve("dataset"); + java.nio.file.Path basePath = tempDir.resolve(folderName); java.nio.file.Files.createDirectories(basePath); this.basePath = basePath.toAbsolutePath().toString(); this.baseUri = basePath.toUri();