From 720510ae90d216a7f190f0fe9fb6e9163508d520 Mon Sep 17 00:00:00 2001
From: schongloo
Date: Mon, 16 Oct 2023 18:16:34 -0700
Subject: [PATCH 1/4] Reverting the automatic application of custom
 partitioner for bucketing column with hash distribution

---
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-------
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-------
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-------
 3 files changed, 3 insertions(+), 21 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index e058f975c500..769af7d77140 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -525,13 +525,7 @@ private DataStream<RowData> distributeDataStream(
                     + "and table is unpartitioned");
             return input;
           } else {
-            if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
-              return input.partitionCustom(
-                  new BucketPartitioner(partitionSpec),
-                  new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            } else {
-              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            }
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
           }
         } else {
           if (partitionSpec.isUnpartitioned()) {
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index e058f975c500..769af7d77140 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -525,13 +525,7 @@ private DataStream<RowData> distributeDataStream(
                     + "and table is unpartitioned");
             return input;
           } else {
-            if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
-              return input.partitionCustom(
-                  new BucketPartitioner(partitionSpec),
-                  new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            } else {
-              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            }
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
           }
         } else {
           if (partitionSpec.isUnpartitioned()) {
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index e058f975c500..769af7d77140 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -525,13 +525,7 @@ private DataStream<RowData> distributeDataStream(
                     + "and table is unpartitioned");
             return input;
           } else {
-            if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
-              return input.partitionCustom(
-                  new BucketPartitioner(partitionSpec),
-                  new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            } else {
-              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-            }
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
          }
         } else {
           if (partitionSpec.isUnpartitioned()) {

From 9d6ab9ea4e5adc8bbcb17f7f9f3283c74fc1464b Mon Sep 17 00:00:00 2001
From: schongloo
Date: Mon, 16 Oct 2023 21:58:28 -0700
Subject: [PATCH 2/4] Adjusting tests. Removing a redundant one.

--- ...TestBucketPartitionerFlinkIcebergSink.java | 30 +++++-------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..9dae43ce5e58 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -114,13 +114,19 @@ private void appendRowsToTable(List allRows) throws Exception { new BoundedTestSource<>( allRows.stream().map(converter::toExternal).toArray(Row[]::new)), ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) .table(table) .tableLoader(tableLoader) .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) + .distributionMode(DistributionMode.NONE) .append(); env.execute("Test Iceberg DataStream"); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) - Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets) From 26f1734eac3c72f47bd72fdaa19392cd6015eaa6 Mon Sep 17 00:00:00 2001 From: schongloo Date: Tue, 17 Oct 2023 08:21:03 -0700 Subject: [PATCH 3/4] Adjusting tests for 1.15 and 1.16. Removing a redundant one. 
--- ...TestBucketPartitionerFlinkIcebergSink.java | 108 ++++++++---------- ...TestBucketPartitionerFlinkIcebergSink.java | 108 ++++++++---------- 2 files changed, 94 insertions(+), 122 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..71bf61fa9161 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -64,19 +64,19 @@ public class TestBucketPartitionerFlinkIcebergSink { @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = - new MiniClusterExtension( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(NUMBER_TASK_MANAGERS) - .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) - .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) - .build()); + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); @RegisterExtension private static final HadoopCatalogExtension catalogExtension = - new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); // Parallelism = 8 (parallelism > numBuckets) throughout the test suite private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; @@ -90,38 +90,44 @@ public class TestBucketPartitionerFlinkIcebergSink { private void setupEnvironment(TableSchemaType tableSchemaType) { PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); table = - catalogExtension - .catalog() - .createTable( - TABLE_IDENTIFIER, - SimpleDataUtil.SCHEMA, - partitionSpec, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); env = - StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) - .enableCheckpointing(100) - .setParallelism(parallelism) - .setMaxParallelism(parallelism * 2); + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); tableLoader = catalogExtension.tableLoader(); } private void appendRowsToTable(List allRows) throws Exception { DataFormatConverters.RowConverter converter = - new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); DataStream dataStream = - env.addSource( - new BoundedTestSource<>( - allRows.stream().map(converter::toExternal).toArray(Row[]::new)), - ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + 
.map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) - .table(table) - .tableLoader(tableLoader) - .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) - .append(); + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.NONE) + .append(); env.execute("Test Iceberg DataStream"); @@ -130,8 +136,8 @@ private void appendRowsToTable(List allRows) throws Exception { @ParameterizedTest @EnumSource( - value = TableSchemaType.class, - names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { setupEnvironment(tableSchemaType); List rows = generateTestDataRows(); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) 
- Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets) @@ -188,7 +174,7 @@ private List generateTestDataRows() { } private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) - throws IOException { + throws IOException { int totalRecordCount = 0; Map> writersPerBucket = Maps.newHashMap(); // > Map filesPerBucket = Maps.newHashMap(); // @@ -206,10 +192,10 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) totalRecordCount += recordCountInFile; int bucketId = - scanTask - .file() - .partition() - .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); writersPerBucket.get(bucketId).add(writerId); filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); @@ -228,10 +214,10 @@ private static class TableTestStats { final Map rowsPerWriter; TableTestStats( - int totalRecordCount, - Map> writersPerBucket, - Map numFilesPerBucket, - Map rowsPerWriter) { + int totalRecordCount, + Map> writersPerBucket, + Map numFilesPerBucket, + Map rowsPerWriter) { this.totalRowCount = totalRecordCount; this.writersPerBucket = writersPerBucket; this.numFilesPerBucket = numFilesPerBucket; diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..71bf61fa9161 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -64,19 +64,19 @@ public class TestBucketPartitionerFlinkIcebergSink { @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = - new MiniClusterExtension( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(NUMBER_TASK_MANAGERS) - .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) - .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) - .build()); + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); @RegisterExtension private static final HadoopCatalogExtension catalogExtension = - new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); // Parallelism = 8 (parallelism > numBuckets) throughout the test suite private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; @@ -90,38 +90,44 @@ public class TestBucketPartitionerFlinkIcebergSink { private void setupEnvironment(TableSchemaType tableSchemaType) { PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); table = - catalogExtension - .catalog() - .createTable( - TABLE_IDENTIFIER, - SimpleDataUtil.SCHEMA, - partitionSpec, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
format.name())); + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); env = - StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) - .enableCheckpointing(100) - .setParallelism(parallelism) - .setMaxParallelism(parallelism * 2); + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); tableLoader = catalogExtension.tableLoader(); } private void appendRowsToTable(List allRows) throws Exception { DataFormatConverters.RowConverter converter = - new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); DataStream dataStream = - env.addSource( - new BoundedTestSource<>( - allRows.stream().map(converter::toExternal).toArray(Row[]::new)), - ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) - .table(table) - .tableLoader(tableLoader) - .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) - .append(); + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.NONE) + .append(); env.execute("Test Iceberg DataStream"); @@ -130,8 +136,8 @@ private void appendRowsToTable(List allRows) throws Exception { @ParameterizedTest @EnumSource( - value = TableSchemaType.class, - names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { setupEnvironment(tableSchemaType); List rows = generateTestDataRows(); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) 
- Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets) @@ -188,7 +174,7 @@ private List generateTestDataRows() { } private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) - throws IOException { + throws IOException { int totalRecordCount = 0; Map> writersPerBucket = Maps.newHashMap(); // > Map filesPerBucket = Maps.newHashMap(); // @@ -206,10 +192,10 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) totalRecordCount += recordCountInFile; int bucketId = - scanTask - .file() - .partition() - .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); writersPerBucket.get(bucketId).add(writerId); filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); @@ -228,10 +214,10 @@ private static class TableTestStats { final Map rowsPerWriter; TableTestStats( - int totalRecordCount, - Map> writersPerBucket, - Map numFilesPerBucket, - Map rowsPerWriter) { + int totalRecordCount, + Map> writersPerBucket, + Map numFilesPerBucket, + Map rowsPerWriter) { this.totalRowCount = totalRecordCount; this.writersPerBucket = writersPerBucket; this.numFilesPerBucket = numFilesPerBucket; From 77ff8d7626708478c80748ec89d02d66a85b5981 Mon Sep 17 00:00:00 2001 From: schongloo Date: Tue, 17 Oct 2023 08:32:59 -0700 Subject: [PATCH 4/4] Adjusting tests for 1.15 and 1.16. Removing a redundant one. Style was lost. --- ...TestBucketPartitionerFlinkIcebergSink.java | 94 +++++++++---------- ...TestBucketPartitionerFlinkIcebergSink.java | 94 +++++++++---------- 2 files changed, 94 insertions(+), 94 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 71bf61fa9161..9dae43ce5e58 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -64,19 +64,19 @@ public class TestBucketPartitionerFlinkIcebergSink { @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = - new MiniClusterExtension( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(NUMBER_TASK_MANAGERS) - .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) - .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) - .build()); + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); @RegisterExtension private static final HadoopCatalogExtension catalogExtension = - new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); // Parallelism = 8 (parallelism > numBuckets) throughout the test suite private final int parallelism = NUMBER_TASK_MANAGERS * 
SLOTS_PER_TASK_MANAGER; @@ -90,44 +90,44 @@ public class TestBucketPartitionerFlinkIcebergSink { private void setupEnvironment(TableSchemaType tableSchemaType) { PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); table = - catalogExtension - .catalog() - .createTable( - TABLE_IDENTIFIER, - SimpleDataUtil.SCHEMA, - partitionSpec, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); env = - StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) - .enableCheckpointing(100) - .setParallelism(parallelism) - .setMaxParallelism(parallelism * 2); + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); tableLoader = catalogExtension.tableLoader(); } private void appendRowsToTable(List allRows) throws Exception { DataFormatConverters.RowConverter converter = - new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); DataStream dataStream = - env.addSource( - new BoundedTestSource<>( - allRows.stream().map(converter::toExternal).toArray(Row[]::new)), - ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) - .partitionCustom( - new BucketPartitioner(table.spec()), - new BucketPartitionKeySelector( - table.spec(), - table.schema(), - FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) - .table(table) - .tableLoader(tableLoader) - .writeParallelism(parallelism) - .distributionMode(DistributionMode.NONE) - .append(); + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.NONE) + .append(); env.execute("Test Iceberg DataStream"); @@ -136,8 +136,8 @@ private void appendRowsToTable(List allRows) throws Exception { @ParameterizedTest @EnumSource( - value = TableSchemaType.class, - names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { setupEnvironment(tableSchemaType); List rows = generateTestDataRows(); @@ -174,7 +174,7 @@ private List generateTestDataRows() { } private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) - throws IOException { + throws IOException { int totalRecordCount = 0; Map> writersPerBucket = Maps.newHashMap(); // > Map filesPerBucket = Maps.newHashMap(); // @@ -192,10 +192,10 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) totalRecordCount += recordCountInFile; int bucketId = - scanTask - .file() - .partition() - .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + 
scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); writersPerBucket.get(bucketId).add(writerId); filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); @@ -214,10 +214,10 @@ private static class TableTestStats { final Map rowsPerWriter; TableTestStats( - int totalRecordCount, - Map> writersPerBucket, - Map numFilesPerBucket, - Map rowsPerWriter) { + int totalRecordCount, + Map> writersPerBucket, + Map numFilesPerBucket, + Map rowsPerWriter) { this.totalRowCount = totalRecordCount; this.writersPerBucket = writersPerBucket; this.numFilesPerBucket = numFilesPerBucket; diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 71bf61fa9161..9dae43ce5e58 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -64,19 +64,19 @@ public class TestBucketPartitionerFlinkIcebergSink { @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = - new MiniClusterExtension( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(NUMBER_TASK_MANAGERS) - .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) - .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) - .build()); + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); @RegisterExtension private static final HadoopCatalogExtension catalogExtension = - new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); // Parallelism = 8 (parallelism > numBuckets) throughout the test suite private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; @@ -90,44 +90,44 @@ public class TestBucketPartitionerFlinkIcebergSink { private void setupEnvironment(TableSchemaType tableSchemaType) { PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); table = - catalogExtension - .catalog() - .createTable( - TABLE_IDENTIFIER, - SimpleDataUtil.SCHEMA, - partitionSpec, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); env = - StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) - .enableCheckpointing(100) - .setParallelism(parallelism) - .setMaxParallelism(parallelism * 2); + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); tableLoader = catalogExtension.tableLoader(); } private void appendRowsToTable(List allRows) throws Exception { DataFormatConverters.RowConverter converter = - new 
DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); DataStream dataStream = - env.addSource( - new BoundedTestSource<>( - allRows.stream().map(converter::toExternal).toArray(Row[]::new)), - ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) - .partitionCustom( - new BucketPartitioner(table.spec()), - new BucketPartitionKeySelector( - table.spec(), - table.schema(), - FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) - .table(table) - .tableLoader(tableLoader) - .writeParallelism(parallelism) - .distributionMode(DistributionMode.NONE) - .append(); + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.NONE) + .append(); env.execute("Test Iceberg DataStream"); @@ -136,8 +136,8 @@ private void appendRowsToTable(List allRows) throws Exception { @ParameterizedTest @EnumSource( - value = TableSchemaType.class, - names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { setupEnvironment(tableSchemaType); List rows = generateTestDataRows(); @@ -174,7 +174,7 @@ private List generateTestDataRows() { } private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) - throws IOException { + throws IOException { int totalRecordCount = 0; Map> writersPerBucket = Maps.newHashMap(); // > Map filesPerBucket = Maps.newHashMap(); // @@ -192,10 +192,10 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) totalRecordCount += recordCountInFile; int bucketId = - scanTask - .file() - .partition() - .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); writersPerBucket.get(bucketId).add(writerId); filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); @@ -214,10 +214,10 @@ private static class TableTestStats { final Map rowsPerWriter; TableTestStats( - int totalRecordCount, - Map> writersPerBucket, - Map numFilesPerBucket, - Map rowsPerWriter) { + int totalRecordCount, + Map> writersPerBucket, + Map numFilesPerBucket, + Map rowsPerWriter) { this.totalRowCount = totalRecordCount; this.writersPerBucket = writersPerBucket; this.numFilesPerBucket = numFilesPerBucket;
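Net effect of the series: with DistributionMode.HASH, FlinkSink.distributeDataStream(...) now always falls back to input.keyBy(new PartitionKeySelector(...)), and the bucket-aware shuffle is applied explicitly by the caller, as the adjusted tests do with partitionCustom(...) plus DistributionMode.NONE. The following is a minimal sketch of that caller-side wiring, not part of the patch itself: the class name, method name, and parameter names are illustrative, and it assumes the caller can access BucketPartitioner and BucketPartitionKeySelector (the tests sit in the org.apache.iceberg.flink.sink package) and already has a Table, a TableLoader, a Flink TableSchema, and a DataStream<RowData>. Only the partitionCustom and FlinkSink builder calls mirror the test code above.

package org.apache.iceberg.flink.sink; // assumption: co-located so the partitioner classes are visible

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

/** Illustrative helper (not part of this patch) showing the explicit bucket shuffle. */
final class ManualBucketShuffleSketch {

  private ManualBucketShuffleSketch() {}

  static void appendWithExplicitBucketShuffle(
      DataStream<RowData> input,
      Table table,
      TableLoader tableLoader,
      TableSchema requestedSchema,
      int writeParallelism) {

    // The caller now owns the bucket-aware shuffle that FlinkSink used to apply
    // automatically for HASH distribution with a single bucket partition field.
    DataStream<RowData> bucketed =
        input.partitionCustom(
            new BucketPartitioner(table.spec()),
            new BucketPartitionKeySelector(
                table.spec(),
                table.schema(),
                FlinkSink.toFlinkRowType(table.schema(), requestedSchema)));

    // DistributionMode.NONE keeps the sink from re-shuffling the already bucketed
    // stream, matching the adjusted tests in this series.
    FlinkSink.forRowData(bucketed)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(writeParallelism)
        .distributionMode(DistributionMode.NONE)
        .append();
  }
}

DistributionMode.NONE is the mode the adjusted tests pass so that the sink leaves the already bucket-partitioned stream untouched; with HASH the sink would key the stream again by the partition key selector.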