From 714ce69f70b5aaf77efaef65d8280cff702bd47f Mon Sep 17 00:00:00 2001
From: zhengchar
Date: Fri, 25 Feb 2022 10:34:14 +0800
Subject: [PATCH] to solve data skew problem on flinksink

---
Review note: the added code below was revised after the original submission, so the
post-image blob ids on the "index" lines predate the revision.

 .../java/org/apache/iceberg/PartitionKey.java |  5 ++
 .../org/apache/iceberg/PartitionSpec.java     | 62 +++++++++++++++++++
 .../apache/iceberg/flink/sink/FlinkSink.java  | 10 ++-
 .../flink/sink/PartitionKeySelector.java      | 12 ++++
 4 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionKey.java b/api/src/main/java/org/apache/iceberg/PartitionKey.java
index 71cdb2756ed2..ef0a08506818 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionKey.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionKey.java
@@ -136,4 +136,9 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Arrays.hashCode(partitionTuple);
   }
+
+  /** Returns the partition spec held by this key. */
+  public PartitionSpec getSpec() {
+    return this.spec;
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 1352780faf24..845fce2ab4db 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -28,6 +28,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -63,6 +65,11 @@ public class PartitionSpec implements Serializable {
   private transient volatile List<PartitionField> fieldList = null;
   private final int lastAssignedFieldId;
 
+  // Matches the name of a bucket transform, e.g. "bucket[16]"; group 1 captures the bucket count.
+  private static final Pattern BUCKET_TRANSFORM = Pattern.compile("bucket\\[(\\d+)\\]");
+  // Bucket count per field, resolved on first use; an empty array marks a spec that is not bucket-only.
+  private transient volatile int[] lazyBucketCounts = null;
+
   private PartitionSpec(Schema schema, int specId, List<PartitionField> fields, int lastAssignedFieldId) {
     this.schema = schema;
     this.specId = specId;
@@ -293,6 +300,61 @@ public String toString() {
     return sb.toString();
   }
 
+  /**
+   * Returns whether this spec is partitioned by bucket transforms only.
+   *
+   * @return true if the spec has at least one field and every field uses a bucket transform
+   */
+  public boolean isBucketPartiton() {
+    return bucketCounts().length > 0;
+  }
+
+  /**
+   * Combines the bucket ordinals of a partition tuple into a single index.
+   *
+   * The ordinals are read as the digits of a mixed-radix number whose radixes are the bucket counts of the
+   * partition fields, most significant field first.
+   *
+   * @param partition a partition tuple produced by this spec
+   * @return the combined index; a null ordinal counts as 0 and the result wraps around on int overflow
+   * @throws IllegalStateException if this spec is not partitioned by bucket transforms only
+   */
+  public int bucketIndex(StructLike partition) {
+    int[] counts = bucketCounts();
+    Preconditions.checkState(counts.length > 0, "Not a bucket-only partition spec: %s", this);
+
+    int index = 0;
+    for (int i = 0; i < counts.length; i++) {
+      Integer ordinal = partition.get(i, Integer.class);
+      index = index * counts[i] + (ordinal != null ? ordinal : 0);
+    }
+
+    return index;
+  }
+
+  // Returns the cached bucket count of each field; an empty array marks a spec that is not bucket-only.
+  private int[] bucketCounts() {
+    int[] counts = lazyBucketCounts;
+    if (counts == null) {
+      List<PartitionField> partFields = fields();
+      counts = new int[partFields.size()];
+      for (int i = 0; i < partFields.size(); i++) {
+        Matcher matcher = BUCKET_TRANSFORM.matcher(partFields.get(i).transform().dedupName());
+        if (!matcher.find()) {
+          // a single non-bucket field disqualifies the whole spec
+          counts = new int[0];
+          break;
+        }
+
+        counts[i] = Integer.parseInt(matcher.group(1));
+      }
+
+      this.lazyBucketCounts = counts;
+    }
+
+    return counts;
+  }
+
   private static final PartitionSpec UNPARTITIONED_SPEC =
       new PartitionSpec(new Schema(), 0, ImmutableList.of(), PARTITION_DATA_ID_START - 1);
 
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 6714ae8357a1..c71695b44338 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -418,7 +418,15 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
           if (partitionSpec.isUnpartitioned()) {
             return input;
           } else {
-            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            PartitionKeySelector keySelector = new PartitionKeySelector(partitionSpec, iSchema, flinkRowType);
+            if (partitionSpec.isBucketPartiton()) {
+              // keys of a bucket-only spec are decimal bucket indexes: map them straight onto subtasks
+              // instead of hashing them; floorMod keeps the channel non-negative even for a negative key
+              return input.partitionCustom(
+                  (key, numPartitions) -> Math.floorMod(Integer.parseInt(key), numPartitions), keySelector);
+            }
+
+            return input.keyBy(keySelector);
           }
 
         case RANGE:
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
index 598df09eee83..fe2882a27d31 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
@@ -56,9 +56,21 @@ private RowDataWrapper lazyRowDataWrapper() {
     return rowDataWrapper;
   }
 
+  /**
+   * Computes the key FlinkSink uses to distribute a row.
+   *
+   * @param row a row to be partitioned
+   * @return the decimal bucket index for a bucket-only spec, otherwise the partition path
+   */
   @Override
   public String getKey(RowData row) {
     partitionKey.partition(lazyRowDataWrapper().wrap(row));
+    PartitionSpec spec = partitionKey.getSpec();
+    if (spec.isBucketPartiton()) {
+      // FlinkSink pairs bucket-only specs with a custom partitioner that parses this key as an int
+      return String.valueOf(spec.bucketIndex(partitionKey));
+    }
+
     return partitionKey.toPath();
   }
 }