diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index d7640c28e50db..b56ac08e16fe1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -60,9 +60,6 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
-    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
-    // in engine-specific representation
-    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -74,7 +71,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord<T> reducedRecord;
       try {
-        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
+        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
@@ -82,6 +79,6 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
       return reducedRecord.newInstance(reducedKey, operation);
-    }, reduceParallelism).map(Pair::getRight);
+    }, parallelism).map(Pair::getRight);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 5fcc4c0adf3e1..764be044bc2fd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -479,12 +479,12 @@ private void testDeduplication(
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    int dedupParallelism = records.getNumPartitions() + 100;
+    int dedupParallelism = records.getNumPartitions() + 2;
     HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -496,7 +496,7 @@ private void testDeduplication(
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index,
                 dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
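
For context, the removed auto-tuning capped the reduceByKey parallelism at the number of incoming data partitions, which is why the old assertions expected records.getNumPartitions() rather than dedupParallelism as the partition count of the deduped output. The following is a minimal standalone sketch of the before/after behavior (illustrative only, not Hudi code; the class and method names are hypothetical):

// Standalone illustration of the parallelism change in deduplicateRecords; not part of the Hudi codebase.
public class ReduceParallelismSketch {

  // Old behavior: the parallelism handed to reduceByKey was capped at the input partition count
  // (the auto-tuning line removed by this diff).
  static int oldReduceParallelism(int inputPartitions, int requestedParallelism) {
    return Math.max(1, Math.min(inputPartitions, requestedParallelism));
  }

  // New behavior: the caller-supplied parallelism is forwarded to reduceByKey unchanged.
  static int newReduceParallelism(int inputPartitions, int requestedParallelism) {
    return requestedParallelism;
  }

  public static void main(String[] args) {
    int inputPartitions = 2;        // stands in for records.getNumPartitions()
    int requestedParallelism = 4;   // stands in for dedupParallelism = getNumPartitions() + 2

    // Before the change the deduped data kept the input partition count (2);
    // after the change it honors the requested parallelism (4), matching the updated assertions.
    System.out.println("old: " + oldReduceParallelism(inputPartitions, requestedParallelism)); // old: 2
    System.out.println("new: " + newReduceParallelism(inputPartitions, requestedParallelism)); // new: 4
  }
}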