From 94f5061ae40d2c5251379f1909aa83a19a51ab1a Mon Sep 17 00:00:00 2001 From: schongloo Date: Tue, 15 Aug 2023 10:17:11 -0700 Subject: [PATCH] Backporting PR 7161 from 1.16 -> 1.15/1.17 --- .../sink/BucketPartitionKeySelector.java | 70 +++++ .../iceberg/flink/sink/BucketPartitioner.java | 103 ++++++++ .../flink/sink/BucketPartitionerUtil.java | 125 +++++++++ .../apache/iceberg/flink/sink/FlinkSink.java | 8 +- .../iceberg/flink/HadoopCatalogExtension.java | 104 ++++++++ .../sink/TestBucketPartitionKeySelector.java | 65 +++++ .../flink/sink/TestBucketPartitioner.java | 107 ++++++++ ...TestBucketPartitionerFlinkIcebergSink.java | 241 ++++++++++++++++++ .../flink/sink/TestBucketPartitionerUtil.java | 126 +++++++++ .../sink/BucketPartitionKeySelector.java | 70 +++++ .../iceberg/flink/sink/BucketPartitioner.java | 103 ++++++++ .../flink/sink/BucketPartitionerUtil.java | 125 +++++++++ .../apache/iceberg/flink/sink/FlinkSink.java | 8 +- .../iceberg/flink/HadoopCatalogExtension.java | 104 ++++++++ .../sink/TestBucketPartitionKeySelector.java | 65 +++++ .../flink/sink/TestBucketPartitioner.java | 107 ++++++++ ...TestBucketPartitionerFlinkIcebergSink.java | 241 ++++++++++++++++++ .../flink/sink/TestBucketPartitionerUtil.java | 126 +++++++++ 18 files changed, 1896 insertions(+), 2 deletions(-) create mode 100644 flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java create mode 100644 flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java create mode 100644 flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java new file mode 100644 index 000000000000..1cb6e013bd2c --- /dev/null +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.stream.IntStream; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.RowDataWrapper; + +/** + * A {@link KeySelector} that extracts the bucketId from a data row's bucket partition as the key. + * To be used with the {@link BucketPartitioner}. + */ +class BucketPartitionKeySelector implements KeySelector { + + private final Schema schema; + private final PartitionKey partitionKey; + private final RowType flinkSchema; + private final int bucketFieldPosition; + + private transient RowDataWrapper rowDataWrapper; + + BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema, RowType flinkSchema) { + this.schema = schema; + this.partitionKey = new PartitionKey(partitionSpec, schema); + this.flinkSchema = flinkSchema; + this.bucketFieldPosition = getBucketFieldPosition(partitionSpec); + } + + private int getBucketFieldPosition(PartitionSpec partitionSpec) { + int bucketFieldId = BucketPartitionerUtil.getBucketFieldId(partitionSpec); + return IntStream.range(0, partitionSpec.fields().size()) + .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId) + .toArray()[0]; + } + + private RowDataWrapper lazyRowDataWrapper() { + if (rowDataWrapper == null) { + rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + } + + return rowDataWrapper; + } + + @Override + public Integer getKey(RowData rowData) { + partitionKey.partition(lazyRowDataWrapper().wrap(rowData)); + return partitionKey.get(bucketFieldPosition, Integer.class); + } +} diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java new file mode 100644 index 000000000000..9c9a117906e2 --- /dev/null +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner { + + static final String BUCKET_NULL_MESSAGE = "bucketId cannot be null"; + static final String BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE = + "Invalid bucket ID %s: must be non-negative."; + static final String BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE = + "Invalid bucket ID %s: must be less than bucket limit: %s."; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket, only used when writers > the + // number of buckets + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + this.maxNumBuckets = BucketPartitionerUtil.getMaxNumBuckets(partitionSpec); + this.currentBucketWriterOffset = new int[maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWithMoreWritersThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkNotNull(bucketId, BUCKET_NULL_MESSAGE); + Preconditions.checkArgument(bucketId >= 0, BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, bucketId); + Preconditions.checkArgument( + bucketId < maxNumBuckets, BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, bucketId, maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWithMoreWritersThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple + * writers per bucket as evenly as possible, and will round-robin the requests across them, in this + * case each writer will target only one bucket at all times (many writers -> one bucket). 
Example: + * Configuration: numPartitions (writers) = 5, maxBuckets = 2 + * Expected behavior: + * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4 + * - Records for Bucket 1 will always use Writer 1 and 3 + * Notes: + * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId + * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter). + * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0. + * + * @return the destination partition index (writer subtask id) + */ + private int getPartitionWithMoreWritersThanBuckets(int bucketId, int numPartitions) { + int currentOffset = currentBucketWriterOffset[bucketId]; + // Determine if this bucket requires an "extra writer" + int extraWriter = bucketId < (numPartitions % maxNumBuckets) ? 1 : 0; + // The max number of writers this bucket can have + int maxNumWritersPerBucket = (numPartitions / maxNumBuckets) + extraWriter; + + // Increment the writer offset or reset if it's reached the max for this bucket + int nextOffset = currentOffset == maxNumWritersPerBucket - 1 ? 0 : currentOffset + 1; + currentBucketWriterOffset[bucketId] = nextOffset; + + return bucketId + (maxNumBuckets * currentOffset); + } +} diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java new file mode 100644 index 000000000000..c33207728d3e --- /dev/null +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.transforms.PartitionSpecVisitor; + +final class BucketPartitionerUtil { + static final String BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE = + "Invalid number of buckets: %s (must be 1)"; + + private BucketPartitionerUtil() {} + + /** + * Determines whether the PartitionSpec has one and only one Bucket definition + * + * @param partitionSpec the partition spec in question + * @return whether the PartitionSpec has only one Bucket + */ + static boolean hasOneBucketField(PartitionSpec partitionSpec) { + List> bucketFields = getBucketFields(partitionSpec); + return bucketFields != null && bucketFields.size() == 1; + } + + /** + * Extracts the Bucket definition from a PartitionSpec. 
+ * + * @param partitionSpec the partition spec in question + * @return the Bucket definition in the form of a tuple (fieldId, maxNumBuckets) + */ + private static Tuple2 getBucketFieldInfo(PartitionSpec partitionSpec) { + List> bucketFields = getBucketFields(partitionSpec); + Preconditions.checkArgument( + bucketFields.size() == 1, + BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, + bucketFields.size()); + return bucketFields.get(0); + } + + static int getBucketFieldId(PartitionSpec partitionSpec) { + return getBucketFieldInfo(partitionSpec).f0; + } + + static int getMaxNumBuckets(PartitionSpec partitionSpec) { + return getBucketFieldInfo(partitionSpec).f1; + } + + private static List> getBucketFields(PartitionSpec spec) { + return PartitionSpecVisitor.visit(spec, new BucketPartitionSpecVisitor()).stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static class BucketPartitionSpecVisitor + implements PartitionSpecVisitor> { + @Override + public Tuple2 identity(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 bucket( + int fieldId, String sourceName, int sourceId, int numBuckets) { + return new Tuple2<>(fieldId, numBuckets); + } + + @Override + public Tuple2 truncate( + int fieldId, String sourceName, int sourceId, int width) { + return null; + } + + @Override + public Tuple2 year(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 month(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 day(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 hour(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 alwaysNull(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 unknown( + int fieldId, String sourceName, int sourceId, String transform) { + return null; + } + } +} diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 844efd6ca6d7..023702790116 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -509,7 +509,13 @@ private DataStream distributeDataStream( + "and table is unpartitioned"); return input; } else { - return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) { + return input.partitionCustom( + new BucketPartitioner(partitionSpec), + new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } else { + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } } } else { if (partitionSpec.isUnpartitioned()) { diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java new file mode 100644 index 000000000000..d8e1325254d9 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class HadoopCatalogExtension + implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { + protected final String database; + protected final String tableName; + + protected Path temporaryFolder; + protected Catalog catalog; + protected CatalogLoader catalogLoader; + protected String warehouse; + protected TableLoader tableLoader; + + public HadoopCatalogExtension(String database, String tableName) { + this.database = database; + this.tableName = tableName; + } + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + this.temporaryFolder = Files.createTempDirectory("junit5_hadoop_catalog-"); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + FileUtils.deleteDirectory(temporaryFolder.toFile()); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Assertions.assertThat(temporaryFolder).exists().isDirectory(); + this.warehouse = "file:" + temporaryFolder + "/" + UUID.randomUUID(); + this.catalogLoader = + CatalogLoader.hadoop( + "hadoop", + new Configuration(), + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse)); + this.catalog = catalogLoader.loadCatalog(); + this.tableLoader = + TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, tableName)); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + catalog.dropTable(TableIdentifier.of(database, tableName)); + ((HadoopCatalog) catalog).close(); + tableLoader.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close catalog resource"); + } + } + + public TableLoader tableLoader() { + return tableLoader; + } + + public Catalog catalog() { + return catalog; + } + + public CatalogLoader catalogLoader() { + return catalogLoader; + } + + public String warehouse() { + return warehouse; + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java new file mode 100644 index 
000000000000..5ebcc6361c7b --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionKeySelector { + + @ParameterizedTest + @EnumSource( + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testCorrectKeySelection(TableSchemaType tableSchemaType) { + int numBuckets = 60; + + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitionKeySelector keySelector = + new BucketPartitionKeySelector( + partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE); + + TestBucketPartitionerUtil.generateRowsForBucketIdRange(2, numBuckets) + .forEach( + rowData -> { + int expectedBucketId = + TestBucketPartitionerUtil.computeBucketId( + numBuckets, rowData.getString(1).toString()); + Integer key = keySelector.getKey(rowData); + Assertions.assertThat(key).isEqualTo(expectedBucketId); + }); + } + + @Test + public void testKeySelectorMultipleBucketsFail() { + PartitionSpec partitionSpec = TableSchemaType.TWO_BUCKETS.getPartitionSpec(1); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy( + () -> + new BucketPartitionKeySelector( + partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE)) + .withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2); + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java new file mode 100644 index 000000000000..835713e6b417 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE; +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE; +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_NULL_MESSAGE; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class TestBucketPartitioner { + + static final int DEFAULT_NUM_BUCKETS = 60; + + @ParameterizedTest + @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", "IDENTITY_AND_BUCKET,60"}) + public void testPartitioningParallelismGreaterThanBuckets( + String schemaTypeStr, String numBucketsStr) { + int numPartitions = 500; + TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr); + int numBuckets = Integer.parseInt(numBucketsStr); + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + int bucketId = 0; + for (int expectedIndex = 0; expectedIndex < numPartitions; expectedIndex++) { + int actualPartitionIndex = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualPartitionIndex).isEqualTo(expectedIndex); + bucketId++; + if (bucketId == numBuckets) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", "IDENTITY_AND_BUCKET,60"}) + public void testPartitioningParallelismEqualLessThanBuckets( + String schemaTypeStr, String numBucketsStr) { + int numPartitions = 30; + TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr); + int numBuckets = Integer.parseInt(numBucketsStr); + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + int actualPartitionIndex = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualPartitionIndex).isEqualTo(bucketId % numPartitions); + } + } + + @Test + public void testPartitionerBucketIdNullFail() { + PartitionSpec partitionSpec = TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> bucketPartitioner.partition(null, DEFAULT_NUM_BUCKETS)) + .withMessage(BUCKET_NULL_MESSAGE); + } + + @Test + public void testPartitionerMultipleBucketsFail() { + PartitionSpec partitionSpec = TableSchemaType.TWO_BUCKETS.getPartitionSpec(DEFAULT_NUM_BUCKETS); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> new BucketPartitioner(partitionSpec)) + 
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2); + } + + @Test + public void testPartitionerBucketIdOutOfRangeFail() { + PartitionSpec partitionSpec = TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + int negativeBucketId = -1; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> bucketPartitioner.partition(negativeBucketId, 1)) + .withMessage(BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, negativeBucketId); + + int tooBigBucketId = DEFAULT_NUM_BUCKETS; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> bucketPartitioner.partition(tooBigBucketId, 1)) + .withMessage(BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, tooBigBucketId, DEFAULT_NUM_BUCKETS); + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java new file mode 100644 index 000000000000..29a0898a1b76 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + table = + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + env = + 
StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private void appendRowsToTable(List allRows) throws Exception { + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + DataStream dataStream = + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + + FlinkSink.forRowData(dataStream) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.HASH) + .append(); + + env.execute("Test Iceberg DataStream"); + + SimpleDataUtil.assertTableRows(table, allRows); + } + + @ParameterizedTest + @EnumSource( + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { + setupEnvironment(tableSchemaType); + List rows = generateTestDataRows(); + + appendRowsToTable(rows); + TableTestStats stats = extractPartitionResults(tableSchemaType); + + Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); + // All 4 buckets should've been written to + Assertions.assertThat(stats.writersPerBucket.size()).isEqualTo(numBuckets); + Assertions.assertThat(stats.numFilesPerBucket.size()).isEqualTo(numBuckets); + // Writer expectation (2 writers per bucket): + // - Bucket0 -> Writers [0, 4] + // - Bucket1 -> Writers [1, 5] + // - Bucket2 -> Writers [2, 6] + // - Bucket3 -> Writers [3, 7] + for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { + Assertions.assertThat(stats.writersPerBucket.get(i)).hasSameElementsAs(Arrays.asList(i, j)); + // 2 files per bucket (one file is created by each writer) + Assertions.assertThat(stats.numFilesPerBucket.get(i)).isEqualTo(2); + // 2 rows per file (total of 16 rows across 8 files) + Assertions.assertThat(stats.rowsPerWriter.get(i)).isEqualTo(2); + } + } + + /** + * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and + * that it should fallback to input.keyBy + */ + @ParameterizedTest + @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") + public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { + setupEnvironment(tableSchemaType); + List rows = generateTestDataRows(); + + appendRowsToTable(rows); + TableTestStats stats = extractPartitionResults(tableSchemaType); + + Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); + for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { + // Only 1 file per bucket will be created when falling back to input.keyBy(...) 
+ Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); + } + } + + /** + * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 + * buckets) + */ + private List generateTestDataRows() { + int totalNumRows = parallelism * 2; + int numRowsPerBucket = totalNumRows / numBuckets; + return TestBucketPartitionerUtil.generateRowsForBucketIdRange(numRowsPerBucket, numBuckets); + } + + private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) + throws IOException { + int totalRecordCount = 0; + Map> writersPerBucket = Maps.newHashMap(); // > + Map filesPerBucket = Maps.newHashMap(); // + Map rowsPerWriter = Maps.newHashMap(); // + + try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { + for (FileScanTask scanTask : fileScanTasks) { + long recordCountInFile = scanTask.file().recordCount(); + + String[] splitFilePath = scanTask.file().path().toString().split("/"); + // Filename example: 00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet + // Writer ID: .......^^^^^ + String filename = splitFilePath[splitFilePath.length - 1]; + int writerId = Integer.parseInt(filename.split("-")[0]); + + totalRecordCount += recordCountInFile; + int bucketId = + scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); + writersPerBucket.get(bucketId).add(writerId); + filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); + rowsPerWriter.put(writerId, rowsPerWriter.getOrDefault(writerId, 0L) + recordCountInFile); + } + } + + return new TableTestStats(totalRecordCount, writersPerBucket, filesPerBucket, rowsPerWriter); + } + + /** DTO to hold Test Stats */ + private static class TableTestStats { + final int totalRowCount; + final Map> writersPerBucket; + final Map numFilesPerBucket; + final Map rowsPerWriter; + + TableTestStats( + int totalRecordCount, + Map> writersPerBucket, + Map numFilesPerBucket, + Map rowsPerWriter) { + this.totalRowCount = totalRecordCount; + this.writersPerBucket = writersPerBucket; + this.numFilesPerBucket = numFilesPerBucket; + this.rowsPerWriter = rowsPerWriter; + } + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java new file mode 100644 index 000000000000..e1309bfac6d5 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtil { + + enum TableSchemaType { + ONE_BUCKET { + @Override + public int bucketPartitionColumnPosition() { + return 0; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + } + }, + IDENTITY_AND_BUCKET { + @Override + public int bucketPartitionColumnPosition() { + return 1; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + } + }, + TWO_BUCKETS { + @Override + public int bucketPartitionColumnPosition() { + return 1; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + } + }; + + public abstract int bucketPartitionColumnPosition(); + + public abstract PartitionSpec getPartitionSpec(int numBuckets); + } + + private TestBucketPartitionerUtil() {} + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { + List rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket); + // For some of our tests, this order of the generated rows matters + for (int i = 0; i < numRowsPerBucket; i++) { + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + String value = generateValueForBucketId(bucketId, numBuckets); + rows.add(GenericRowData.of(1, StringData.fromString(value))); + } + } + return rows; + } + + /** + * Utility method to generate a UUID string that will "hash" to a desired bucketId + * + * @param bucketId the desired bucketId + * @return the string data that "hashes" to the desired bucketId + */ + private static String generateValueForBucketId(int bucketId, int numBuckets) { + while (true) { + String uuid = UUID.randomUUID().toString(); + if (computeBucketId(numBuckets, uuid) == bucketId) { + return uuid; + } + } + } + + /** + * Utility that performs the same hashing/bucketing mechanism used by Bucket.java + * + * @param numBuckets max number of buckets to consider + * @param value the string to compute the bucketId from + * @return the computed bucketId + */ + static int computeBucketId(int numBuckets, String value) { + return (BucketUtil.hash(value) & Integer.MAX_VALUE) % numBuckets; + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java new file mode 100644 index 000000000000..1cb6e013bd2c --- /dev/null +++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.stream.IntStream; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.RowDataWrapper; + +/** + * A {@link KeySelector} that extracts the bucketId from a data row's bucket partition as the key. + * To be used with the {@link BucketPartitioner}. + */ +class BucketPartitionKeySelector implements KeySelector { + + private final Schema schema; + private final PartitionKey partitionKey; + private final RowType flinkSchema; + private final int bucketFieldPosition; + + private transient RowDataWrapper rowDataWrapper; + + BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema, RowType flinkSchema) { + this.schema = schema; + this.partitionKey = new PartitionKey(partitionSpec, schema); + this.flinkSchema = flinkSchema; + this.bucketFieldPosition = getBucketFieldPosition(partitionSpec); + } + + private int getBucketFieldPosition(PartitionSpec partitionSpec) { + int bucketFieldId = BucketPartitionerUtil.getBucketFieldId(partitionSpec); + return IntStream.range(0, partitionSpec.fields().size()) + .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId) + .toArray()[0]; + } + + private RowDataWrapper lazyRowDataWrapper() { + if (rowDataWrapper == null) { + rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + } + + return rowDataWrapper; + } + + @Override + public Integer getKey(RowData rowData) { + partitionKey.partition(lazyRowDataWrapper().wrap(rowData)); + return partitionKey.get(bucketFieldPosition, Integer.class); + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java new file mode 100644 index 000000000000..9c9a117906e2 --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner { + + static final String BUCKET_NULL_MESSAGE = "bucketId cannot be null"; + static final String BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE = + "Invalid bucket ID %s: must be non-negative."; + static final String BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE = + "Invalid bucket ID %s: must be less than bucket limit: %s."; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket, only used when writers > the + // number of buckets + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + this.maxNumBuckets = BucketPartitionerUtil.getMaxNumBuckets(partitionSpec); + this.currentBucketWriterOffset = new int[maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWithMoreWritersThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkNotNull(bucketId, BUCKET_NULL_MESSAGE); + Preconditions.checkArgument(bucketId >= 0, BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, bucketId); + Preconditions.checkArgument( + bucketId < maxNumBuckets, BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, bucketId, maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWithMoreWritersThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple + * writers per bucket as evenly as possible, and will round-robin the requests across them, in this + * case each writer will target only one bucket at all times (many writers -> one bucket). 
Example: + * Configuration: numPartitions (writers) = 5, maxBuckets = 2 + * Expected behavior: + * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4 + * - Records for Bucket 1 will always use Writer 1 and 3 + * Notes: + * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId + * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter). + * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0. + * + * @return the destination partition index (writer subtask id) + */ + private int getPartitionWithMoreWritersThanBuckets(int bucketId, int numPartitions) { + int currentOffset = currentBucketWriterOffset[bucketId]; + // Determine if this bucket requires an "extra writer" + int extraWriter = bucketId < (numPartitions % maxNumBuckets) ? 1 : 0; + // The max number of writers this bucket can have + int maxNumWritersPerBucket = (numPartitions / maxNumBuckets) + extraWriter; + + // Increment the writer offset or reset if it's reached the max for this bucket + int nextOffset = currentOffset == maxNumWritersPerBucket - 1 ? 0 : currentOffset + 1; + currentBucketWriterOffset[bucketId] = nextOffset; + + return bucketId + (maxNumBuckets * currentOffset); + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java new file mode 100644 index 000000000000..c33207728d3e --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.transforms.PartitionSpecVisitor; + +final class BucketPartitionerUtil { + static final String BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE = + "Invalid number of buckets: %s (must be 1)"; + + private BucketPartitionerUtil() {} + + /** + * Determines whether the PartitionSpec has one and only one Bucket definition + * + * @param partitionSpec the partition spec in question + * @return whether the PartitionSpec has only one Bucket + */ + static boolean hasOneBucketField(PartitionSpec partitionSpec) { + List> bucketFields = getBucketFields(partitionSpec); + return bucketFields != null && bucketFields.size() == 1; + } + + /** + * Extracts the Bucket definition from a PartitionSpec. 
+ * + * @param partitionSpec the partition spec in question + * @return the Bucket definition in the form of a tuple (fieldId, maxNumBuckets) + */ + private static Tuple2 getBucketFieldInfo(PartitionSpec partitionSpec) { + List> bucketFields = getBucketFields(partitionSpec); + Preconditions.checkArgument( + bucketFields.size() == 1, + BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, + bucketFields.size()); + return bucketFields.get(0); + } + + static int getBucketFieldId(PartitionSpec partitionSpec) { + return getBucketFieldInfo(partitionSpec).f0; + } + + static int getMaxNumBuckets(PartitionSpec partitionSpec) { + return getBucketFieldInfo(partitionSpec).f1; + } + + private static List> getBucketFields(PartitionSpec spec) { + return PartitionSpecVisitor.visit(spec, new BucketPartitionSpecVisitor()).stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static class BucketPartitionSpecVisitor + implements PartitionSpecVisitor> { + @Override + public Tuple2 identity(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 bucket( + int fieldId, String sourceName, int sourceId, int numBuckets) { + return new Tuple2<>(fieldId, numBuckets); + } + + @Override + public Tuple2 truncate( + int fieldId, String sourceName, int sourceId, int width) { + return null; + } + + @Override + public Tuple2 year(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 month(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 day(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 hour(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 alwaysNull(int fieldId, String sourceName, int sourceId) { + return null; + } + + @Override + public Tuple2 unknown( + int fieldId, String sourceName, int sourceId, String transform) { + return null; + } + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 844efd6ca6d7..023702790116 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -509,7 +509,13 @@ private DataStream distributeDataStream( + "and table is unpartitioned"); return input; } else { - return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) { + return input.partitionCustom( + new BucketPartitioner(partitionSpec), + new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } else { + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } } } else { if (partitionSpec.isUnpartitioned()) { diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java new file mode 100644 index 000000000000..d8e1325254d9 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class HadoopCatalogExtension + implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { + protected final String database; + protected final String tableName; + + protected Path temporaryFolder; + protected Catalog catalog; + protected CatalogLoader catalogLoader; + protected String warehouse; + protected TableLoader tableLoader; + + public HadoopCatalogExtension(String database, String tableName) { + this.database = database; + this.tableName = tableName; + } + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + this.temporaryFolder = Files.createTempDirectory("junit5_hadoop_catalog-"); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + FileUtils.deleteDirectory(temporaryFolder.toFile()); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Assertions.assertThat(temporaryFolder).exists().isDirectory(); + this.warehouse = "file:" + temporaryFolder + "/" + UUID.randomUUID(); + this.catalogLoader = + CatalogLoader.hadoop( + "hadoop", + new Configuration(), + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse)); + this.catalog = catalogLoader.loadCatalog(); + this.tableLoader = + TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, tableName)); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + catalog.dropTable(TableIdentifier.of(database, tableName)); + ((HadoopCatalog) catalog).close(); + tableLoader.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close catalog resource"); + } + } + + public TableLoader tableLoader() { + return tableLoader; + } + + public Catalog catalog() { + return catalog; + } + + public CatalogLoader catalogLoader() { + return catalogLoader; + } + + public String warehouse() { + return warehouse; + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java new file mode 100644 index 
000000000000..5ebcc6361c7b --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionKeySelector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionKeySelector { + + @ParameterizedTest + @EnumSource( + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testCorrectKeySelection(TableSchemaType tableSchemaType) { + int numBuckets = 60; + + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitionKeySelector keySelector = + new BucketPartitionKeySelector( + partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE); + + TestBucketPartitionerUtil.generateRowsForBucketIdRange(2, numBuckets) + .forEach( + rowData -> { + int expectedBucketId = + TestBucketPartitionerUtil.computeBucketId( + numBuckets, rowData.getString(1).toString()); + Integer key = keySelector.getKey(rowData); + Assertions.assertThat(key).isEqualTo(expectedBucketId); + }); + } + + @Test + public void testKeySelectorMultipleBucketsFail() { + PartitionSpec partitionSpec = TableSchemaType.TWO_BUCKETS.getPartitionSpec(1); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy( + () -> + new BucketPartitionKeySelector( + partitionSpec, SimpleDataUtil.SCHEMA, SimpleDataUtil.ROW_TYPE)) + .withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java new file mode 100644 index 000000000000..835713e6b417 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE; +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE; +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_NULL_MESSAGE; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class TestBucketPartitioner { + + static final int DEFAULT_NUM_BUCKETS = 60; + + @ParameterizedTest + @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", "IDENTITY_AND_BUCKET,60"}) + public void testPartitioningParallelismGreaterThanBuckets( + String schemaTypeStr, String numBucketsStr) { + int numPartitions = 500; + TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr); + int numBuckets = Integer.parseInt(numBucketsStr); + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + int bucketId = 0; + for (int expectedIndex = 0; expectedIndex < numPartitions; expectedIndex++) { + int actualPartitionIndex = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualPartitionIndex).isEqualTo(expectedIndex); + bucketId++; + if (bucketId == numBuckets) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @CsvSource({"ONE_BUCKET,50", "IDENTITY_AND_BUCKET,50", "ONE_BUCKET,60", "IDENTITY_AND_BUCKET,60"}) + public void testPartitioningParallelismEqualLessThanBuckets( + String schemaTypeStr, String numBucketsStr) { + int numPartitions = 30; + TableSchemaType tableSchemaType = TableSchemaType.valueOf(schemaTypeStr); + int numBuckets = Integer.parseInt(numBucketsStr); + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + int actualPartitionIndex = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualPartitionIndex).isEqualTo(bucketId % numPartitions); + } + } + + @Test + public void testPartitionerBucketIdNullFail() { + PartitionSpec partitionSpec = TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> bucketPartitioner.partition(null, DEFAULT_NUM_BUCKETS)) + .withMessage(BUCKET_NULL_MESSAGE); + } + + @Test + public void testPartitionerMultipleBucketsFail() { + PartitionSpec partitionSpec = TableSchemaType.TWO_BUCKETS.getPartitionSpec(DEFAULT_NUM_BUCKETS); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> new BucketPartitioner(partitionSpec)) + 
.withMessage(BucketPartitionerUtil.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE, 2); + } + + @Test + public void testPartitionerBucketIdOutOfRangeFail() { + PartitionSpec partitionSpec = TableSchemaType.ONE_BUCKET.getPartitionSpec(DEFAULT_NUM_BUCKETS); + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + int negativeBucketId = -1; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> bucketPartitioner.partition(negativeBucketId, 1)) + .withMessage(BUCKET_LESS_THAN_LOWER_BOUND_MESSAGE, negativeBucketId); + + int tooBigBucketId = DEFAULT_NUM_BUCKETS; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> bucketPartitioner.partition(tooBigBucketId, 1)) + .withMessage(BUCKET_GREATER_THAN_UPPER_BOUND_MESSAGE, tooBigBucketId, DEFAULT_NUM_BUCKETS); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java new file mode 100644 index 000000000000..29a0898a1b76 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(numBuckets); + table = + catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + env = 
StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private void appendRowsToTable(List<RowData> allRows) throws Exception { + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + DataStream<RowData> dataStream = + env.addSource( + new BoundedTestSource<>( + allRows.stream().map(converter::toExternal).toArray(Row[]::new)), + ROW_TYPE_INFO) + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + + FlinkSink.forRowData(dataStream) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.HASH) + .append(); + + env.execute("Test Iceberg DataStream"); + + SimpleDataUtil.assertTableRows(table, allRows); + } + + @ParameterizedTest + @EnumSource( + value = TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) throws Exception { + setupEnvironment(tableSchemaType); + List<RowData> rows = generateTestDataRows(); + + appendRowsToTable(rows); + TableTestStats stats = extractPartitionResults(tableSchemaType); + + Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); + // All 4 buckets should've been written to + Assertions.assertThat(stats.writersPerBucket.size()).isEqualTo(numBuckets); + Assertions.assertThat(stats.numFilesPerBucket.size()).isEqualTo(numBuckets); + // Writer expectation (2 writers per bucket): + // - Bucket0 -> Writers [0, 4] + // - Bucket1 -> Writers [1, 5] + // - Bucket2 -> Writers [2, 6] + // - Bucket3 -> Writers [3, 7] + for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { + Assertions.assertThat(stats.writersPerBucket.get(i)).hasSameElementsAs(Arrays.asList(i, j)); + // 2 files per bucket (one file is created by each writer) + Assertions.assertThat(stats.numFilesPerBucket.get(i)).isEqualTo(2); + // 2 rows per file (total of 16 rows across 8 files) + Assertions.assertThat(stats.rowsPerWriter.get(i)).isEqualTo(2); + } + } + + /** + * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and + * that it falls back to input.keyBy + */ + @ParameterizedTest + @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") + public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { + setupEnvironment(tableSchemaType); + List<RowData> rows = generateTestDataRows(); + + appendRowsToTable(rows); + TableTestStats stats = extractPartitionResults(tableSchemaType); + + Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); + for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { + // Only 1 file per bucket will be created when falling back to input.keyBy(...) 
+ Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); + } + } + + /** + * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 + * buckets) + */ + private List<RowData> generateTestDataRows() { + int totalNumRows = parallelism * 2; + int numRowsPerBucket = totalNumRows / numBuckets; + return TestBucketPartitionerUtil.generateRowsForBucketIdRange(numRowsPerBucket, numBuckets); + } + + private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType) + throws IOException { + int totalRecordCount = 0; + Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>> + Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles> + Map<Integer, Long> rowsPerWriter = Maps.newHashMap(); // <WriterId, NumRecords> + + try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { + for (FileScanTask scanTask : fileScanTasks) { + long recordCountInFile = scanTask.file().recordCount(); + + String[] splitFilePath = scanTask.file().path().toString().split("/"); + // Filename example: 00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet + // Writer ID: .......^^^^^ + String filename = splitFilePath[splitFilePath.length - 1]; + int writerId = Integer.parseInt(filename.split("-")[0]); + + totalRecordCount += recordCountInFile; + int bucketId = + scanTask + .file() + .partition() + .get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class); + writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()); + writersPerBucket.get(bucketId).add(writerId); + filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1); + rowsPerWriter.put(writerId, rowsPerWriter.getOrDefault(writerId, 0L) + recordCountInFile); + } + } + + return new TableTestStats(totalRecordCount, writersPerBucket, filesPerBucket, rowsPerWriter); + } + + /** DTO to hold Test Stats */ + private static class TableTestStats { + final int totalRowCount; + final Map<Integer, List<Integer>> writersPerBucket; + final Map<Integer, Integer> numFilesPerBucket; + final Map<Integer, Long> rowsPerWriter; + + TableTestStats( + int totalRecordCount, + Map<Integer, List<Integer>> writersPerBucket, + Map<Integer, Integer> numFilesPerBucket, + Map<Integer, Long> rowsPerWriter) { + this.totalRowCount = totalRecordCount; + this.writersPerBucket = writersPerBucket; + this.numFilesPerBucket = numFilesPerBucket; + this.rowsPerWriter = rowsPerWriter; + } + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java new file mode 100644 index 000000000000..e1309bfac6d5 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtil { + + enum TableSchemaType { + ONE_BUCKET { + @Override + public int bucketPartitionColumnPosition() { + return 0; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + } + }, + IDENTITY_AND_BUCKET { + @Override + public int bucketPartitionColumnPosition() { + return 1; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + } + }, + TWO_BUCKETS { + @Override + public int bucketPartitionColumnPosition() { + return 1; + } + + @Override + public PartitionSpec getPartitionSpec(int numBuckets) { + return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + } + }; + + public abstract int bucketPartitionColumnPosition(); + + public abstract PartitionSpec getPartitionSpec(int numBuckets); + } + + private TestBucketPartitionerUtil() {} + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List<RowData> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { + List<RowData> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket); + // For some of our tests, this order of the generated rows matters + for (int i = 0; i < numRowsPerBucket; i++) { + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + String value = generateValueForBucketId(bucketId, numBuckets); + rows.add(GenericRowData.of(1, StringData.fromString(value))); + } + } + return rows; + } + + /** + * Utility method to generate a UUID string that will "hash" to a desired bucketId + * + * @param bucketId the desired bucketId + * @return the string data that "hashes" to the desired bucketId + */ + private static String generateValueForBucketId(int bucketId, int numBuckets) { + while (true) { + String uuid = UUID.randomUUID().toString(); + if (computeBucketId(numBuckets, uuid) == bucketId) { + return uuid; + } + } + } + + /** + * Utility that performs the same hashing/bucketing mechanism used by Bucket.java + * + * @param numBuckets max number of buckets to consider + * @param value the string to compute the bucketId from + * @return the computed bucketId + */ + static int computeBucketId(int numBuckets, String value) { + return (BucketUtil.hash(value) & Integer.MAX_VALUE) % numBuckets; + } +}
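
A minimal usage sketch (illustrative, not part of the patch): on a table whose partition spec contains exactly one bucket field, writing with DistributionMode.HASH now routes records through the new BucketPartitionKeySelector and BucketPartitioner via the partitionCustom(...) branch added to FlinkSink.distributeDataStream above. The 'input', 'table', and 'tableLoader' names below are placeholders for the caller's own objects.

    // Sketch only: 'table' is an Iceberg table partitioned by a single bucket(...) field,
    // 'input' is a RowData stream matching its schema, 'tableLoader' loads the same table.
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.DistributionMode;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    class BucketPartitionerUsageSketch {
      static void appendWithBucketPartitioning(
          DataStream<RowData> input, Table table, TableLoader tableLoader) {
        FlinkSink.forRowData(input)
            .table(table)
            .tableLoader(tableLoader)
            .distributionMode(DistributionMode.HASH) // takes the partitionCustom(...) branch above
            .writeParallelism(8) // writers are spread over buckets by the BucketPartitioner
            .append();
      }
    }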