From aaecd1df280ff84d9e1ba4bd64e0379d4f5bcc4a Mon Sep 17 00:00:00 2001 From: gang_ye Date: Wed, 7 Dec 2022 17:25:21 -0800 Subject: [PATCH 01/15] Implement ShuffleOperator to collect data statistics --- .../flink/sink/shuffle/ShuffleOperator.java | 140 ++++++++++++++++++ .../sink/shuffle/ShuffleRecordWrapper.java | 82 ++++++++++ .../sink/shuffle/TestShuffleOperator.java | 132 +++++++++++++++++ 3 files changed, 354 insertions(+) create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java create mode 100644 flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java new file mode 100644 index 000000000000..d36bde66154b --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + *

It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +@Internal +public class ShuffleOperator + extends AbstractStreamOperator> + implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + // the type of the key to collect data statistics + private final TypeInformation keyType; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + private transient Map localDataStatisticsMap; + private transient Map globalDataDistributionWeightMap; + private transient ListState> globalDataDistributionWeightState; + + public ShuffleOperator( + KeySelector keySelector, + TypeInformation keyType, + OperatorEventGateway operatorEventGateway) { + this.keySelector = keySelector; + this.keyType = keyType; + this.operatorEventGateway = operatorEventGateway; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { + return new ListStateDescriptor<>( + "globalDataDistributionWeight", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + localDataStatisticsMap = Maps.newHashMap(); + globalDataDistributionWeightState = + context + .getOperatorStateStore() + .getListState(generateGlobalDataDistributionWeightDescriptor()); + + if (globalDataDistributionWeightState.get() != null + && globalDataDistributionWeightState.get().iterator().hasNext()) { + globalDataDistributionWeightMap = globalDataDistributionWeightState.get().iterator().next(); + } else { + globalDataDistributionWeightMap = Maps.newHashMap(); + } + } + + @Override + public void open() throws Exception { + // TODO: handle scaling up + if (globalDataDistributionWeightMap != null && globalDataDistributionWeightMap.size() > 0) { + output.collect( + new StreamRecord<>( + ShuffleRecordWrapper.fromDistribution(globalDataDistributionWeightMap))); + } + } + + @Override + public void handleOperatorEvent(OperatorEvent evt) { + // TODO: receive event with globalDataDistributionWeight from coordinator and update + // globalDataDistributionWeightMap + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + final K key = keySelector.getKey(streamRecord.getValue()); + localDataStatisticsMap.put(key, localDataStatisticsMap.getOrDefault(key, 0L) + 1); + output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromRecord(streamRecord.getValue()))); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + long checkpointId = context.getCheckpointId(); + LOG.debug("Taking shuffle operator snapshot for checkpoint {}", checkpointId); + + // Set the globalDataDistributionWeightState state to the latest global value. + if (globalDataDistributionWeightMap != null && globalDataDistributionWeightMap.size() > 0) { + globalDataDistributionWeightState.clear(); + globalDataDistributionWeightState.add(globalDataDistributionWeightMap); + } + + // TODO: send to coordinator + // For now we make it simple to send globalDataDistributionWeightState at checkpoint + + // Reset the local data count + localDataStatisticsMap.clear(); + } + + @VisibleForTesting + Map localDataStatisticsMap() { + return localDataStatisticsMap; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java new file mode 100644 index 000000000000..5f50193fe537 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The wrapper class for record and data distribution weight + * + *

ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may contain a record or + * data distribution weight. Once partitioner receives the weight, it will use that to decide the + * coming record should send to which writer subtask. After shuffling, a filter and mapper are + * required to filter out the data distribution weight, unwrap the object and extract the original + * record type T. + */ +@Internal +public class ShuffleRecordWrapper implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Map globalDataDistributionWeight; + private final T record; + + private ShuffleRecordWrapper(T record, Map globalDataDistributionWeight) { + Preconditions.checkArgument( + record != null ^ globalDataDistributionWeight != null, + "A ShuffleRecordWrapper has to contain exactly one of record and stats, not neither or both"); + this.globalDataDistributionWeight = globalDataDistributionWeight; + this.record = record; + } + + static ShuffleRecordWrapper fromRecord(T record) { + return new ShuffleRecordWrapper<>(record, null); + } + + static ShuffleRecordWrapper fromDistribution( + Map globalDataDistributionWeight) { + return new ShuffleRecordWrapper<>(null, globalDataDistributionWeight); + } + + boolean hasGlobalDataDistributionWeight() { + return globalDataDistributionWeight != null; + } + + boolean hasRecord() { + return record != null; + } + + Map globalDataDistributionWeight() { + return globalDataDistributionWeight; + } + + T record() { + return record; + } + + @Override + public String toString() { + return String.format( + "ShuffleRecordWrapper[globalDataDistributionWeight = %s, record = %s)", + globalDataDistributionWeight, record); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java new file mode 100644 index 000000000000..d43d8348ba0b --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestShuffleOperator { + private ShuffleOperator operator; + + private Environment getTestingEnvironment() { + return new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1L, + new MockInputSplitProvider(), + 1, + new TestTaskStateManager()); + } + + @Before + public void before() throws Exception { + MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); + KeySelector keySelector = + new KeySelector() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public String getKey(String value) { + return value; + } + }; + + this.operator = + new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), mockGateway); + Environment env = getTestingEnvironment(); + this.operator.setup( + new OneInputStreamTask(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(Lists.newArrayList())); + } + + @After + public void clean() throws Exception { + operator.close(); + } + + @Test + public void testInitializeState() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + + assertNotNull( + stateContext + .getOperatorStateStore() + .getListState(operator.generateGlobalDataDistributionWeightDescriptor())); + } + + @Test + public void testProcessElement() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.processElement(new StreamRecord<>("a")); + operator.processElement(new StreamRecord<>("a")); + operator.processElement(new StreamRecord<>("b")); + assertTrue(operator.localDataStatisticsMap().containsKey("a")); + assertTrue(operator.localDataStatisticsMap().containsKey("b")); + assertEquals(2L, (long) operator.localDataStatisticsMap().get("a")); + assertEquals(1L, (long) operator.localDataStatisticsMap().get("b")); + } + + // ---------------- helper methods ------------------------- + + private StateInitializationContext getStateContext() throws Exception { + // Create the state context. + OperatorStateStore operatorStateStore = createOperatorStateStore(); + return new StateInitializationContextImpl(null, operatorStateStore, null, null, null); + } + + private OperatorStateStore createOperatorStateStore() throws Exception { + MockEnvironment env = new MockEnvironmentBuilder().build(); + AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + return abstractStateBackend.createOperatorStateBackend( + env, "test-operator", Collections.emptyList(), cancelStreamRegistry); + } +} From 225bc04b01e50b330c65731dad3f413a63e72cf3 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 13 Dec 2022 23:45:19 -0800 Subject: [PATCH 02/15] handle comments and add TestHarness --- .../flink/sink/shuffle/ShuffleOperator.java | 38 +++++++++++-------- .../sink/shuffle/ShuffleRecordWrapper.java | 28 +++++++------- .../sink/shuffle/TestShuffleOperator.java | 32 ++++++++++++++++ 3 files changed, 70 insertions(+), 28 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index d36bde66154b..549620019800 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -21,8 +21,10 @@ import java.io.Serializable; import java.util.Map; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.MapTypeInfo; @@ -57,8 +59,9 @@ public class ShuffleOperator private final OperatorEventGateway operatorEventGateway; // key is generated by applying KeySelector to record // value is the times key occurs - private transient Map localDataStatisticsMap; - private transient Map globalDataDistributionWeightMap; + // TODO: support to store statistics for high cardinality cases + private transient Map localDataStatistics; + private transient Map globalDataStatistics; private transient ListState> globalDataDistributionWeightState; public ShuffleOperator( @@ -78,7 +81,14 @@ ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor @Override public void initializeState(StateInitializationContext context) throws Exception { - localDataStatisticsMap = Maps.newHashMap(); + localDataStatistics = Maps.newHashMap(); + + final MapStateDescriptor broadcastStateDesc = + new MapStateDescriptor<>("test-broadcast", keyType, TypeInformation.of(Long.class)); + + final BroadcastState broadcastState = + context.getOperatorStateStore().getBroadcastState(broadcastStateDesc); + globalDataDistributionWeightState = context .getOperatorStateStore() @@ -86,32 +96,30 @@ public void initializeState(StateInitializationContext context) throws Exception if (globalDataDistributionWeightState.get() != null && globalDataDistributionWeightState.get().iterator().hasNext()) { - globalDataDistributionWeightMap = globalDataDistributionWeightState.get().iterator().next(); + globalDataStatistics = globalDataDistributionWeightState.get().iterator().next(); } else { - globalDataDistributionWeightMap = Maps.newHashMap(); + globalDataStatistics = Maps.newHashMap(); } } @Override public void open() throws Exception { // TODO: handle scaling up - if (globalDataDistributionWeightMap != null && globalDataDistributionWeightMap.size() > 0) { - output.collect( - new StreamRecord<>( - ShuffleRecordWrapper.fromDistribution(globalDataDistributionWeightMap))); + if (globalDataStatistics != null && globalDataStatistics.size() > 0) { + output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalDataStatistics))); } } @Override public void handleOperatorEvent(OperatorEvent evt) { // TODO: receive event with globalDataDistributionWeight from coordinator and update - // globalDataDistributionWeightMap + // globalDataStatistics } @Override public void processElement(StreamRecord streamRecord) throws Exception { final K key = keySelector.getKey(streamRecord.getValue()); - localDataStatisticsMap.put(key, localDataStatisticsMap.getOrDefault(key, 0L) + 1); + localDataStatistics.put(key, localDataStatistics.getOrDefault(key, 0L) + 1); output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromRecord(streamRecord.getValue()))); } @@ -121,20 +129,20 @@ public void snapshotState(StateSnapshotContext context) throws Exception { LOG.debug("Taking shuffle operator snapshot for checkpoint {}", checkpointId); // Set the globalDataDistributionWeightState state to the latest global value. - if (globalDataDistributionWeightMap != null && globalDataDistributionWeightMap.size() > 0) { + if (globalDataStatistics != null && globalDataStatistics.size() > 0) { globalDataDistributionWeightState.clear(); - globalDataDistributionWeightState.add(globalDataDistributionWeightMap); + globalDataDistributionWeightState.add(globalDataStatistics); } // TODO: send to coordinator // For now we make it simple to send globalDataDistributionWeightState at checkpoint // Reset the local data count - localDataStatisticsMap.clear(); + localDataStatistics.clear(); } @VisibleForTesting Map localDataStatisticsMap() { - return localDataStatisticsMap; + return localDataStatistics; } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index 5f50193fe537..585cdcad512d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Map; import org.apache.flink.annotation.Internal; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -37,14 +38,14 @@ public class ShuffleRecordWrapper implements Serializ private static final long serialVersionUID = 1L; - private final Map globalDataDistributionWeight; + private final Map globalDataStatistics; private final T record; - private ShuffleRecordWrapper(T record, Map globalDataDistributionWeight) { + private ShuffleRecordWrapper(T record, Map globalDataStatistics) { Preconditions.checkArgument( - record != null ^ globalDataDistributionWeight != null, - "A ShuffleRecordWrapper has to contain exactly one of record and stats, not neither or both"); - this.globalDataDistributionWeight = globalDataDistributionWeight; + record != null ^ globalDataStatistics != null, + "A ShuffleRecordWrapper contain either record and stats, not neither or both"); + this.globalDataStatistics = globalDataStatistics; this.record = record; } @@ -52,13 +53,13 @@ static ShuffleRecordWrapper fromRecord(T recor return new ShuffleRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromDistribution( - Map globalDataDistributionWeight) { - return new ShuffleRecordWrapper<>(null, globalDataDistributionWeight); + static ShuffleRecordWrapper fromStatistics( + Map globalDataStatistics) { + return new ShuffleRecordWrapper<>(null, globalDataStatistics); } boolean hasGlobalDataDistributionWeight() { - return globalDataDistributionWeight != null; + return globalDataStatistics != null; } boolean hasRecord() { @@ -66,7 +67,7 @@ boolean hasRecord() { } Map globalDataDistributionWeight() { - return globalDataDistributionWeight; + return globalDataStatistics; } T record() { @@ -75,8 +76,9 @@ T record() { @Override public String toString() { - return String.format( - "ShuffleRecordWrapper[globalDataDistributionWeight = %s, record = %s)", - globalDataDistributionWeight, record); + return MoreObjects.toStringHelper(this) + .add("globalDataStatistics", globalDataStatistics) + .add("record", record) + .toString(); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java index d43d8348ba0b..5862f1dd4d43 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -18,11 +18,14 @@ */ package org.apache.iceberg.flink.sink.shuffle; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -44,6 +47,8 @@ import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.After; import org.junit.Before; @@ -114,6 +119,24 @@ public void testProcessElement() throws Exception { assertEquals(1L, (long) operator.localDataStatisticsMap().get("b")); } + @Test + public void testOperatorOutput() throws Exception { + try (OneInputStreamOperatorTestHarness> + testHarness = createHarness(this.operator)) { + testHarness.processElement(new StreamRecord<>("a")); + testHarness.processElement(new StreamRecord<>("b")); + testHarness.processElement(new StreamRecord<>("b")); + + List recordsOutput = + testHarness.extractOutputValues().stream() + .filter(ShuffleRecordWrapper::hasRecord) + .map(ShuffleRecordWrapper::record) + .collect(Collectors.toList()); + assertThat(recordsOutput) + .containsExactlyInAnyOrderElementsOf(ImmutableList.of("a", "b", "b")); + } + } + // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { @@ -129,4 +152,13 @@ private OperatorStateStore createOperatorStateStore() throws Exception { return abstractStateBackend.createOperatorStateBackend( env, "test-operator", Collections.emptyList(), cancelStreamRegistry); } + + private OneInputStreamOperatorTestHarness> + createHarness(final ShuffleOperator operator) throws Exception { + OneInputStreamOperatorTestHarness> harness = + new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0); + harness.setup(); + harness.open(); + return harness; + } } From c48bfd14909b136e7bbece65b3de3fff928da9f6 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Mon, 19 Dec 2022 23:47:07 -0800 Subject: [PATCH 03/15] remove map suffix for data statistics --- .../flink/sink/shuffle/ShuffleOperator.java | 35 ++++++------------- .../sink/shuffle/ShuffleRecordWrapper.java | 9 ++--- .../sink/shuffle/TestShuffleOperator.java | 8 ++--- 3 files changed, 18 insertions(+), 34 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index 549620019800..8173d9a37bd4 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -18,13 +18,9 @@ */ package org.apache.iceberg.flink.sink.shuffle; -import java.io.Serializable; import java.util.Map; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.MapTypeInfo; @@ -46,9 +42,7 @@ * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a * class{@link ShuffleRecordWrapper}) and send to partitioner. */ -@Internal -public class ShuffleOperator - extends AbstractStreamOperator> +class ShuffleOperator extends AbstractStreamOperator> implements OneInputStreamOperator>, OperatorEventHandler { private static final long serialVersionUID = 1L; @@ -62,7 +56,7 @@ public class ShuffleOperator // TODO: support to store statistics for high cardinality cases private transient Map localDataStatistics; private transient Map globalDataStatistics; - private transient ListState> globalDataDistributionWeightState; + private transient ListState> globalDataStatisticsState; public ShuffleOperator( KeySelector keySelector, @@ -76,27 +70,20 @@ public ShuffleOperator( @VisibleForTesting ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { return new ListStateDescriptor<>( - "globalDataDistributionWeight", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); + "globalDataStatisticsState", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); } @Override public void initializeState(StateInitializationContext context) throws Exception { localDataStatistics = Maps.newHashMap(); - - final MapStateDescriptor broadcastStateDesc = - new MapStateDescriptor<>("test-broadcast", keyType, TypeInformation.of(Long.class)); - - final BroadcastState broadcastState = - context.getOperatorStateStore().getBroadcastState(broadcastStateDesc); - - globalDataDistributionWeightState = + globalDataStatisticsState = context .getOperatorStateStore() .getListState(generateGlobalDataDistributionWeightDescriptor()); - if (globalDataDistributionWeightState.get() != null - && globalDataDistributionWeightState.get().iterator().hasNext()) { - globalDataStatistics = globalDataDistributionWeightState.get().iterator().next(); + if (globalDataStatisticsState.get() != null + && globalDataStatisticsState.get().iterator().hasNext()) { + globalDataStatistics = globalDataStatisticsState.get().iterator().next(); } else { globalDataStatistics = Maps.newHashMap(); } @@ -130,19 +117,19 @@ public void snapshotState(StateSnapshotContext context) throws Exception { // Set the globalDataDistributionWeightState state to the latest global value. if (globalDataStatistics != null && globalDataStatistics.size() > 0) { - globalDataDistributionWeightState.clear(); - globalDataDistributionWeightState.add(globalDataStatistics); + globalDataStatisticsState.clear(); + globalDataStatisticsState.add(globalDataStatistics); } // TODO: send to coordinator - // For now we make it simple to send globalDataDistributionWeightState at checkpoint + // For now we make it simple to send globalDataStatisticsState at checkpoint // Reset the local data count localDataStatistics.clear(); } @VisibleForTesting - Map localDataStatisticsMap() { + Map localDataStatistics() { return localDataStatistics; } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index 585cdcad512d..b96f05ca8e3c 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -20,7 +20,6 @@ import java.io.Serializable; import java.util.Map; -import org.apache.flink.annotation.Internal; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -33,8 +32,7 @@ * required to filter out the data distribution weight, unwrap the object and extract the original * record type T. */ -@Internal -public class ShuffleRecordWrapper implements Serializable { +public class ShuffleRecordWrapper implements Serializable { private static final long serialVersionUID = 1L; @@ -49,12 +47,11 @@ record != null ^ globalDataStatistics != null, this.record = record; } - static ShuffleRecordWrapper fromRecord(T record) { + static ShuffleRecordWrapper fromRecord(T record) { return new ShuffleRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromStatistics( - Map globalDataStatistics) { + static ShuffleRecordWrapper fromStatistics(Map globalDataStatistics) { return new ShuffleRecordWrapper<>(null, globalDataStatistics); } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java index 5862f1dd4d43..046b5b740818 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -113,10 +113,10 @@ public void testProcessElement() throws Exception { operator.processElement(new StreamRecord<>("a")); operator.processElement(new StreamRecord<>("a")); operator.processElement(new StreamRecord<>("b")); - assertTrue(operator.localDataStatisticsMap().containsKey("a")); - assertTrue(operator.localDataStatisticsMap().containsKey("b")); - assertEquals(2L, (long) operator.localDataStatisticsMap().get("a")); - assertEquals(1L, (long) operator.localDataStatisticsMap().get("b")); + assertTrue(operator.localDataStatistics().containsKey("a")); + assertTrue(operator.localDataStatistics().containsKey("b")); + assertEquals(2L, (long) operator.localDataStatistics().get("a")); + assertEquals(1L, (long) operator.localDataStatistics().get("b")); } @Test From 99532ce8c281cacfcea3083aa80bf2e93b8a4213 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 3 Jan 2023 23:37:07 -0800 Subject: [PATCH 04/15] Add data statistics interface --- .../flink/sink/shuffle/DataStatistics.java | 34 +++++++++++ .../sink/shuffle/DataStatisticsFactory.java | 28 +++++++++ .../flink/sink/shuffle/MapDataStatistics.java | 59 +++++++++++++++++++ .../flink/sink/shuffle/ShuffleOperator.java | 33 +++++------ .../sink/shuffle/ShuffleRecordWrapper.java | 9 ++- .../sink/shuffle/TestShuffleOperator.java | 16 ++--- 6 files changed, 149 insertions(+), 30 deletions(-) create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java new file mode 100644 index 000000000000..45707dcd8559 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + *

It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +interface DataStatistics { + long size(); + + void put(K key); + + void merge(DataStatistics other); +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java new file mode 100644 index 000000000000..eba510ccae13 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +public class DataStatisticsFactory { + + public DataStatistics createDataStatistics() { + // Only support MapDataStatistics for now. New DataStatistics type will be added for high + // cardinality case. + return new MapDataStatistics<>(); + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java new file mode 100644 index 000000000000..73b47a946577 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + *

It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class MapDataStatistics implements DataStatistics { + private final Map dataStatistics = Maps.newHashMap(); + + @Override + public long size() { + return dataStatistics.size(); + } + + @Override + public void put(K key) { + dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L); + } + + @Override + public void merge(DataStatistics other) { + Preconditions.checkArgument( + other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other); + MapDataStatistics mapDataStatistic = (MapDataStatistics) other; + mapDataStatistic.dataStatistics.forEach( + (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count)); + } + + @VisibleForTesting + Map dataStatistics() { + return dataStatistics; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index 8173d9a37bd4..d0e7da3b6903 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -18,12 +18,11 @@ */ package org.apache.iceberg.flink.sink.shuffle; -import java.util.Map; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; @@ -33,7 +32,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** * Shuffle operator can help to improve data clustering based on the key. @@ -48,34 +46,33 @@ class ShuffleOperator extends AbstractStreamOperator keySelector; - // the type of the key to collect data statistics - private final TypeInformation keyType; private final OperatorEventGateway operatorEventGateway; // key is generated by applying KeySelector to record // value is the times key occurs // TODO: support to store statistics for high cardinality cases - private transient Map localDataStatistics; - private transient Map globalDataStatistics; - private transient ListState> globalDataStatisticsState; + private transient DataStatistics localDataStatistics; + private transient DataStatistics globalDataStatistics; + private transient ListState> globalDataStatisticsState; + private transient DataStatisticsFactory dataStatisticsFactory; public ShuffleOperator( KeySelector keySelector, - TypeInformation keyType, - OperatorEventGateway operatorEventGateway) { + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory dataStatisticsFactory) { this.keySelector = keySelector; - this.keyType = keyType; this.operatorEventGateway = operatorEventGateway; + this.dataStatisticsFactory = dataStatisticsFactory; } @VisibleForTesting - ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { return new ListStateDescriptor<>( - "globalDataStatisticsState", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); + "globalDataStatisticsState", TypeInformation.of(new TypeHint>() {})); } @Override public void initializeState(StateInitializationContext context) throws Exception { - localDataStatistics = Maps.newHashMap(); + localDataStatistics = dataStatisticsFactory.createDataStatistics(); globalDataStatisticsState = context .getOperatorStateStore() @@ -85,7 +82,7 @@ public void initializeState(StateInitializationContext context) throws Exception && globalDataStatisticsState.get().iterator().hasNext()) { globalDataStatistics = globalDataStatisticsState.get().iterator().next(); } else { - globalDataStatistics = Maps.newHashMap(); + globalDataStatistics = dataStatisticsFactory.createDataStatistics(); } } @@ -106,7 +103,7 @@ public void handleOperatorEvent(OperatorEvent evt) { @Override public void processElement(StreamRecord streamRecord) throws Exception { final K key = keySelector.getKey(streamRecord.getValue()); - localDataStatistics.put(key, localDataStatistics.getOrDefault(key, 0L) + 1); + localDataStatistics.put(key); output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromRecord(streamRecord.getValue()))); } @@ -125,11 +122,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception { // For now we make it simple to send globalDataStatisticsState at checkpoint // Reset the local data count - localDataStatistics.clear(); + localDataStatistics = dataStatisticsFactory.createDataStatistics(); } @VisibleForTesting - Map localDataStatistics() { + DataStatistics localDataStatistics() { return localDataStatistics; } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index b96f05ca8e3c..8c37b6f3fbd8 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.sink.shuffle; import java.io.Serializable; -import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -36,10 +35,10 @@ public class ShuffleRecordWrapper implements Serializable { private static final long serialVersionUID = 1L; - private final Map globalDataStatistics; + private final DataStatistics globalDataStatistics; private final T record; - private ShuffleRecordWrapper(T record, Map globalDataStatistics) { + private ShuffleRecordWrapper(T record, DataStatistics globalDataStatistics) { Preconditions.checkArgument( record != null ^ globalDataStatistics != null, "A ShuffleRecordWrapper contain either record and stats, not neither or both"); @@ -51,7 +50,7 @@ static ShuffleRecordWrapper fromRecord(T record) { return new ShuffleRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromStatistics(Map globalDataStatistics) { + static ShuffleRecordWrapper fromStatistics(DataStatistics globalDataStatistics) { return new ShuffleRecordWrapper<>(null, globalDataStatistics); } @@ -63,7 +62,7 @@ boolean hasRecord() { return record != null; } - Map globalDataDistributionWeight() { + DataStatistics globalDataDistributionWeight() { return globalDataStatistics; } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java index 046b5b740818..777e3f5e1db1 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -28,7 +28,6 @@ import java.util.stream.Collectors; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; @@ -80,9 +79,9 @@ public String getKey(String value) { return value; } }; + DataStatisticsFactory dataStatisticsFactory = new DataStatisticsFactory<>(); - this.operator = - new ShuffleOperator<>(keySelector, TypeInformation.of(String.class), mockGateway); + this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory); Environment env = getTestingEnvironment(); this.operator.setup( new OneInputStreamTask(env), @@ -113,10 +112,13 @@ public void testProcessElement() throws Exception { operator.processElement(new StreamRecord<>("a")); operator.processElement(new StreamRecord<>("a")); operator.processElement(new StreamRecord<>("b")); - assertTrue(operator.localDataStatistics().containsKey("a")); - assertTrue(operator.localDataStatistics().containsKey("b")); - assertEquals(2L, (long) operator.localDataStatistics().get("a")); - assertEquals(1L, (long) operator.localDataStatistics().get("b")); + assertTrue(operator.localDataStatistics() instanceof MapDataStatistics); + MapDataStatistics mapDataStatistics = + (MapDataStatistics) operator.localDataStatistics(); + assertTrue(mapDataStatistics.dataStatistics().containsKey("a")); + assertTrue(mapDataStatistics.dataStatistics().containsKey("b")); + assertEquals(2L, (long) mapDataStatistics.dataStatistics().get("a")); + assertEquals(1L, (long) mapDataStatistics.dataStatistics().get("b")); } @Test From 694c7057cfbc71c80d383ed0d95944dc5c7d74bf Mon Sep 17 00:00:00 2001 From: gang_ye Date: Wed, 4 Jan 2023 10:54:01 -0800 Subject: [PATCH 05/15] update comments for class --- .../apache/iceberg/flink/sink/shuffle/DataStatistics.java | 7 +++---- .../iceberg/flink/sink/shuffle/DataStatisticsFactory.java | 7 +++++-- .../iceberg/flink/sink/shuffle/MapDataStatistics.java | 8 +------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java index 45707dcd8559..8c0981f5e69d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java @@ -19,11 +19,10 @@ package org.apache.iceberg.flink.sink.shuffle; /** - * Shuffle operator can help to improve data clustering based on the key. + * DataStatistics defines the interface to collect data statistics. * - *

It collects the data statistics information, sends to coordinator and gets the global data - * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a - * class{@link ShuffleRecordWrapper}) and send to partitioner. + *

{@link ShuffleOperator} will store local data statistics and later distribute + * the global statistics(received from ShuffleCoordiantor) to Partitioner. */ interface DataStatistics { long size(); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java index eba510ccae13..283ceb753ce8 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java @@ -18,9 +18,12 @@ */ package org.apache.iceberg.flink.sink.shuffle; -public class DataStatisticsFactory { +/** + * DataStatisticsFactory provides the DataStatistics definition for different mode like HASH, RANGE + */ +class DataStatisticsFactory { - public DataStatistics createDataStatistics() { + DataStatistics createDataStatistics() { // Only support MapDataStatistics for now. New DataStatistics type will be added for high // cardinality case. return new MapDataStatistics<>(); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java index 73b47a946577..1e8f8edfdf50 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java @@ -23,13 +23,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -/** - * Shuffle operator can help to improve data clustering based on the key. - * - *

It collects the data statistics information, sends to coordinator and gets the global data - * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a - * class{@link ShuffleRecordWrapper}) and send to partitioner. - */ +/** MapDataStatistics uses map to count key frequency */ class MapDataStatistics implements DataStatistics { private final Map dataStatistics = Maps.newHashMap(); From a6b5e0bdf822c71c3a9cd6a1e0386d2306a9b8a8 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Thu, 5 Jan 2023 23:57:00 -0800 Subject: [PATCH 06/15] add toString in MapDataStatistics class --- .../apache/iceberg/flink/sink/shuffle/DataStatistics.java | 4 ++-- .../apache/iceberg/flink/sink/shuffle/MapDataStatistics.java | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java index 8c0981f5e69d..bac302d08ec9 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java @@ -21,8 +21,8 @@ /** * DataStatistics defines the interface to collect data statistics. * - *

{@link ShuffleOperator} will store local data statistics and later distribute - * the global statistics(received from ShuffleCoordiantor) to Partitioner. + *

{@link ShuffleOperator} will store local data statistics and later distribute the global + * statistics(received from ShuffleCoordiantor) to Partitioner. */ interface DataStatistics { long size(); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java index 1e8f8edfdf50..32a7800498b9 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java @@ -50,4 +50,9 @@ public void merge(DataStatistics other) { Map dataStatistics() { return dataStatistics; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("dataStatistics", dataStatistics).toString(); + } } From 7247369a715930c3800085e9d597c7a9ad63b2ac Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 10 Jan 2023 00:13:48 -0800 Subject: [PATCH 07/15] handle comments to make class comments accurate and update variable name to make it consistent --- .../flink/sink/shuffle/DataStatistics.java | 7 +- .../flink/sink/shuffle/MapDataStatistics.java | 13 ++-- .../flink/sink/shuffle/ShuffleOperator.java | 67 +++++++++---------- .../sink/shuffle/ShuffleRecordWrapper.java | 33 ++++----- .../sink/shuffle/TestShuffleOperator.java | 14 ---- 5 files changed, 60 insertions(+), 74 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java index bac302d08ec9..11fc0df27191 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java @@ -21,13 +21,14 @@ /** * DataStatistics defines the interface to collect data statistics. * - *

{@link ShuffleOperator} will store local data statistics and later distribute the global - * statistics(received from ShuffleCoordiantor) to Partitioner. + *

Data statistics tracks traffic volume distribution across data keys. For low-cardinality key, + * a simple map of (key, count) can be used. For high-cardinality key, probabilistic data structures + * (sketching) can be used. */ interface DataStatistics { long size(); - void put(K key); + void add(K key); void merge(DataStatistics other); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java index 32a7800498b9..dc35800d0a59 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java @@ -19,7 +19,7 @@ package org.apache.iceberg.flink.sink.shuffle; import java.util.Map; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -33,20 +33,21 @@ public long size() { } @Override - public void put(K key) { + public void add(K key) { + // increase frequency of occurrence by one in the dataStatistics map dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L); } @Override - public void merge(DataStatistics other) { + public void merge(DataStatistics otherStatistics) { Preconditions.checkArgument( - other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other); - MapDataStatistics mapDataStatistic = (MapDataStatistics) other; + otherStatistics instanceof MapDataStatistics, + "Can not merge this type of statistics: " + otherStatistics); + MapDataStatistics mapDataStatistic = (MapDataStatistics) otherStatistics; mapDataStatistic.dataStatistics.forEach( (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count)); } - @VisibleForTesting Map dataStatistics() { return dataStatistics; } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index d0e7da3b6903..b6e6e62ee9fe 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -36,61 +36,58 @@ /** * Shuffle operator can help to improve data clustering based on the key. * - *

It collects the data statistics information, sends to coordinator and gets the global data - * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a - * class{@link ShuffleRecordWrapper}) and send to partitioner. + *

Shuffle operator collects traffic distribution statistics. A custom partitioner shall be + * attached to the shuffle operator output. The custom partitioner leverages the statistics to + * shuffle record to improve data clustering while maintaining relative balanced traffic + * distribution to downstream subtasks. */ class ShuffleOperator extends AbstractStreamOperator> implements OneInputStreamOperator>, OperatorEventHandler { private static final long serialVersionUID = 1L; + // keySelector will be used to generate key from data for collecting data statistics private final KeySelector keySelector; private final OperatorEventGateway operatorEventGateway; - // key is generated by applying KeySelector to record - // value is the times key occurs - // TODO: support to store statistics for high cardinality cases - private transient DataStatistics localDataStatistics; - private transient DataStatistics globalDataStatistics; - private transient ListState> globalDataStatisticsState; - private transient DataStatisticsFactory dataStatisticsFactory; + private transient DataStatistics localStatistics; + private transient DataStatistics globalStatistics; + private transient ListState> globalStatisticsState; + private transient DataStatisticsFactory statisticsFactory; public ShuffleOperator( KeySelector keySelector, OperatorEventGateway operatorEventGateway, - DataStatisticsFactory dataStatisticsFactory) { + DataStatisticsFactory statisticsFactory) { this.keySelector = keySelector; this.operatorEventGateway = operatorEventGateway; - this.dataStatisticsFactory = dataStatisticsFactory; - } - - @VisibleForTesting - ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { - return new ListStateDescriptor<>( - "globalDataStatisticsState", TypeInformation.of(new TypeHint>() {})); + this.statisticsFactory = statisticsFactory; } @Override public void initializeState(StateInitializationContext context) throws Exception { - localDataStatistics = dataStatisticsFactory.createDataStatistics(); - globalDataStatisticsState = + localStatistics = statisticsFactory.createDataStatistics(); + globalStatisticsState = context .getOperatorStateStore() - .getListState(generateGlobalDataDistributionWeightDescriptor()); - - if (globalDataStatisticsState.get() != null - && globalDataStatisticsState.get().iterator().hasNext()) { - globalDataStatistics = globalDataStatisticsState.get().iterator().next(); + .getListState( + new ListStateDescriptor<>( + "globalDataStatisticsState", + TypeInformation.of(new TypeHint>() {}))); + + if (context.isRestored() + && globalStatisticsState.get() != null + && globalStatisticsState.get().iterator().hasNext()) { + globalStatistics = globalStatisticsState.get().iterator().next(); } else { - globalDataStatistics = dataStatisticsFactory.createDataStatistics(); + globalStatistics = statisticsFactory.createDataStatistics(); } } @Override public void open() throws Exception { // TODO: handle scaling up - if (globalDataStatistics != null && globalDataStatistics.size() > 0) { - output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalDataStatistics))); + if (globalStatistics != null && globalStatistics.size() > 0) { + output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalStatistics))); } } @@ -103,7 +100,7 @@ public void handleOperatorEvent(OperatorEvent evt) { @Override public void processElement(StreamRecord streamRecord) throws Exception { final K key = keySelector.getKey(streamRecord.getValue()); - localDataStatistics.put(key); + localStatistics.add(key); output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromRecord(streamRecord.getValue()))); } @@ -113,20 +110,20 @@ public void snapshotState(StateSnapshotContext context) throws Exception { LOG.debug("Taking shuffle operator snapshot for checkpoint {}", checkpointId); // Set the globalDataDistributionWeightState state to the latest global value. - if (globalDataStatistics != null && globalDataStatistics.size() > 0) { - globalDataStatisticsState.clear(); - globalDataStatisticsState.add(globalDataStatistics); + if (globalStatistics != null && globalStatistics.size() > 0) { + globalStatisticsState.clear(); + globalStatisticsState.add(globalStatistics); } // TODO: send to coordinator // For now we make it simple to send globalDataStatisticsState at checkpoint - // Reset the local data count - localDataStatistics = dataStatisticsFactory.createDataStatistics(); + // Reset the local statistics + localStatistics = statisticsFactory.createDataStatistics(); } @VisibleForTesting DataStatistics localDataStatistics() { - return localDataStatistics; + return localStatistics; } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index 8c37b6f3fbd8..ecb9fc6795e3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -23,26 +23,27 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** - * The wrapper class for record and data distribution weight + * The wrapper class for record and data statistics. It is the only way for shuffle operator to send + * global data statistics to custom partitioner to distribute data based on statistics * - *

ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It may contain a record or - * data distribution weight. Once partitioner receives the weight, it will use that to decide the - * coming record should send to which writer subtask. After shuffling, a filter and mapper are - * required to filter out the data distribution weight, unwrap the object and extract the original - * record type T. + *

ShuffleRecordWrapper is sent from ShuffleOestperator to partitioner. It contain either a + * record or data statistics(globally aggregated). Once partitioner receives the weight, it will use + * that to decide the coming record should send to which writer subtask. After shuffling, a filter + * and mapper are required to filter out the data distribution weight, unwrap the object and extract + * the original record type T. */ public class ShuffleRecordWrapper implements Serializable { private static final long serialVersionUID = 1L; - private final DataStatistics globalDataStatistics; + private final DataStatistics statistics; private final T record; - private ShuffleRecordWrapper(T record, DataStatistics globalDataStatistics) { + private ShuffleRecordWrapper(T record, DataStatistics statistics) { Preconditions.checkArgument( - record != null ^ globalDataStatistics != null, - "A ShuffleRecordWrapper contain either record and stats, not neither or both"); - this.globalDataStatistics = globalDataStatistics; + record != null ^ statistics != null, + "A ShuffleRecordWrapper contain either record or data statistics, not neither or both"); + this.statistics = statistics; this.record = record; } @@ -54,16 +55,16 @@ static ShuffleRecordWrapper fromStatistics(DataStatistics global return new ShuffleRecordWrapper<>(null, globalDataStatistics); } - boolean hasGlobalDataDistributionWeight() { - return globalDataStatistics != null; + boolean hasDataStatistics() { + return statistics != null; } boolean hasRecord() { return record != null; } - DataStatistics globalDataDistributionWeight() { - return globalDataStatistics; + DataStatistics dataStatistics() { + return statistics; } T record() { @@ -73,7 +74,7 @@ T record() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("globalDataStatistics", globalDataStatistics) + .add("statistics", statistics) .add("record", record) .toString(); } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java index 777e3f5e1db1..25bcf3be77d2 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.Collections; @@ -94,17 +93,6 @@ public void clean() throws Exception { operator.close(); } - @Test - public void testInitializeState() throws Exception { - StateInitializationContext stateContext = getStateContext(); - operator.initializeState(stateContext); - - assertNotNull( - stateContext - .getOperatorStateStore() - .getListState(operator.generateGlobalDataDistributionWeightDescriptor())); - } - @Test public void testProcessElement() throws Exception { StateInitializationContext stateContext = getStateContext(); @@ -139,8 +127,6 @@ public void testOperatorOutput() throws Exception { } } - // ---------------- helper methods ------------------------- - private StateInitializationContext getStateContext() throws Exception { // Create the state context. OperatorStateStore operatorStateStore = createOperatorStateStore(); From 4c19c1a9b2c4e45f2e66fafffba164c4c8801fa3 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Wed, 11 Jan 2023 23:42:37 -0800 Subject: [PATCH 08/15] convert DataStatisticsFactory into interface and define MapDataStatisticsFactory --- .../flink/sink/shuffle/ShuffleOperator.java | 35 ++++++------ .../sink/shuffle/ShuffleRecordWrapper.java | 17 +++--- .../shuffle/statistics/DataStatistics.java | 53 +++++++++++++++++++ .../DataStatisticsFactory.java} | 19 ++++--- .../{ => statistics}/MapDataStatistics.java | 26 ++++----- .../MapDataStatisticsFactory.java} | 15 +++--- .../sink/shuffle/TestShuffleOperator.java | 5 +- 7 files changed, 118 insertions(+), 52 deletions(-) create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatistics.java rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{DataStatistics.java => statistics/DataStatisticsFactory.java} (64%) rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{ => statistics}/MapDataStatistics.java (68%) rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{DataStatisticsFactory.java => statistics/MapDataStatisticsFactory.java} (69%) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index b6e6e62ee9fe..13234f1926ed 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics; +import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatisticsFactory; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; /** @@ -43,16 +45,15 @@ */ class ShuffleOperator extends AbstractStreamOperator> implements OneInputStreamOperator>, OperatorEventHandler { - private static final long serialVersionUID = 1L; // keySelector will be used to generate key from data for collecting data statistics private final KeySelector keySelector; private final OperatorEventGateway operatorEventGateway; + private final transient DataStatisticsFactory statisticsFactory; private transient DataStatistics localStatistics; private transient DataStatistics globalStatistics; private transient ListState> globalStatisticsState; - private transient DataStatisticsFactory statisticsFactory; public ShuffleOperator( KeySelector keySelector, @@ -71,13 +72,19 @@ public void initializeState(StateInitializationContext context) throws Exception .getOperatorStateStore() .getListState( new ListStateDescriptor<>( - "globalDataStatisticsState", + "globalStatisticsState", TypeInformation.of(new TypeHint>() {}))); - if (context.isRestored() - && globalStatisticsState.get() != null - && globalStatisticsState.get().iterator().hasNext()) { - globalStatistics = globalStatisticsState.get().iterator().next(); + if (context.isRestored()) { + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (globalStatisticsState.get() == null + || !globalStatisticsState.get().iterator().hasNext()) { + LOG.warn("Subtask {} doesn't have global statistics state to restore", subtaskIndex); + globalStatistics = statisticsFactory.createDataStatistics(); + } else { + LOG.info("Restoring global statistics state for subtask {}", subtaskIndex); + globalStatistics = globalStatisticsState.get().iterator().next(); + } } else { globalStatistics = statisticsFactory.createDataStatistics(); } @@ -85,16 +92,14 @@ public void initializeState(StateInitializationContext context) throws Exception @Override public void open() throws Exception { - // TODO: handle scaling up - if (globalStatistics != null && globalStatistics.size() > 0) { + if (!globalStatistics.isEmpty()) { output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalStatistics))); } } @Override public void handleOperatorEvent(OperatorEvent evt) { - // TODO: receive event with globalDataDistributionWeight from coordinator and update - // globalDataStatistics + // TODO: receive event with aggregated statistics from coordinator and update globalStatistics } @Override @@ -109,16 +114,16 @@ public void snapshotState(StateSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); LOG.debug("Taking shuffle operator snapshot for checkpoint {}", checkpointId); - // Set the globalDataDistributionWeightState state to the latest global value. - if (globalStatistics != null && globalStatistics.size() > 0) { + // Update globalStatisticsState with latest global statistics + if (!globalStatistics.isEmpty()) { globalStatisticsState.clear(); globalStatisticsState.add(globalStatistics); } // TODO: send to coordinator - // For now we make it simple to send globalDataStatisticsState at checkpoint + // For now we make it simple to send globalStatisticsState at checkpoint - // Reset the local statistics + // Recreate the local statistics localStatistics = statisticsFactory.createDataStatistics(); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index ecb9fc6795e3..abf1aab459bf 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.sink.shuffle; import java.io.Serializable; +import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -26,11 +27,11 @@ * The wrapper class for record and data statistics. It is the only way for shuffle operator to send * global data statistics to custom partitioner to distribute data based on statistics * - *

ShuffleRecordWrapper is sent from ShuffleOestperator to partitioner. It contain either a - * record or data statistics(globally aggregated). Once partitioner receives the weight, it will use - * that to decide the coming record should send to which writer subtask. After shuffling, a filter - * and mapper are required to filter out the data distribution weight, unwrap the object and extract - * the original record type T. + *

ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It contain either a record + * or data statistics(globally aggregated). Once partitioner receives the weight, it will use that + * to decide the coming record should send to which writer subtask. After shuffling, a filter and + * mapper are required to filter out the data distribution weight, unwrap the object and extract the + * original record type T. */ public class ShuffleRecordWrapper implements Serializable { @@ -42,7 +43,7 @@ public class ShuffleRecordWrapper implements Serializable { private ShuffleRecordWrapper(T record, DataStatistics statistics) { Preconditions.checkArgument( record != null ^ statistics != null, - "A ShuffleRecordWrapper contain either record or data statistics, not neither or both"); + "A ShuffleRecordWrapper contain either record or statistics, not neither or both"); this.statistics = statistics; this.record = record; } @@ -51,8 +52,8 @@ static ShuffleRecordWrapper fromRecord(T record) { return new ShuffleRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromStatistics(DataStatistics globalDataStatistics) { - return new ShuffleRecordWrapper<>(null, globalDataStatistics); + static ShuffleRecordWrapper fromStatistics(DataStatistics statistics) { + return new ShuffleRecordWrapper<>(null, statistics); } boolean hasDataStatistics() { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatistics.java new file mode 100644 index 000000000000..acc42a5483e6 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatistics.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle.statistics; + +import org.apache.flink.annotation.Internal; + +/** + * DataStatistics defines the interface to collect data distribution information. + * + *

Data statistics tracks traffic volume distribution across data keys. For low-cardinality key, + * a simple map of (key, count) can be used. For high-cardinality key, probabilistic data structures + * (sketching) can be used. + */ +@Internal +public interface DataStatistics { + + /** + * Check if data statistics contains any statistics information + * + * @return true if data statistics doesn't contain any statistics information + */ + boolean isEmpty(); + + /** + * Add data key to data statistics. + * + * @param key generate from data by applying key selector + */ + void add(K key); + + /** + * Merge current statistics with other statistics + * + * @param otherStatistics the statistics to be merged + */ + void merge(DataStatistics otherStatistics); +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java similarity index 64% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java index 11fc0df27191..0bc11f66e2e2 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java @@ -16,19 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.sink.shuffle; +package org.apache.iceberg.flink.sink.shuffle.statistics; + +import org.apache.flink.annotation.Internal; /** - * DataStatistics defines the interface to collect data statistics. + * DataStatisticsFactory defines the interface to create DataStatistics. * - *

Data statistics tracks traffic volume distribution across data keys. For low-cardinality key, - * a simple map of (key, count) can be used. For high-cardinality key, probabilistic data structures - * (sketching) can be used. + *

For low-cardinality key, MapDataStatisticsFactory will be implemented to create + * MapDataStatistics. */ -interface DataStatistics { - long size(); - - void add(K key); +@Internal +public interface DataStatisticsFactory { - void merge(DataStatistics other); + DataStatistics createDataStatistics(); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java similarity index 68% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java index dc35800d0a59..59638ea3cb9d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java @@ -16,26 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.sink.shuffle; +package org.apache.iceberg.flink.sink.shuffle.statistics; import java.util.Map; +import org.apache.flink.annotation.Internal; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** MapDataStatistics uses map to count key frequency */ -class MapDataStatistics implements DataStatistics { - private final Map dataStatistics = Maps.newHashMap(); +@Internal +public class MapDataStatistics implements DataStatistics { + private final Map statistics = Maps.newHashMap(); @Override - public long size() { - return dataStatistics.size(); + public boolean isEmpty() { + return statistics.size() == 0; } @Override public void add(K key) { - // increase frequency of occurrence by one in the dataStatistics map - dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L); + // increase count of occurrence by one in the dataStatistics map + statistics.put(key, statistics.getOrDefault(key, 0L) + 1L); } @Override @@ -44,16 +46,16 @@ public void merge(DataStatistics otherStatistics) { otherStatistics instanceof MapDataStatistics, "Can not merge this type of statistics: " + otherStatistics); MapDataStatistics mapDataStatistic = (MapDataStatistics) otherStatistics; - mapDataStatistic.dataStatistics.forEach( - (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count)); + mapDataStatistic.statistics.forEach( + (key, count) -> statistics.put(key, statistics.getOrDefault(key, 0L) + count)); } - Map dataStatistics() { - return dataStatistics; + public Map dataStatistics() { + return statistics; } @Override public String toString() { - return MoreObjects.toStringHelper(this).add("dataStatistics", dataStatistics).toString(); + return MoreObjects.toStringHelper(this).add("statistics", statistics).toString(); } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java similarity index 69% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java index 283ceb753ce8..51f721c351c2 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java @@ -16,16 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.sink.shuffle; +package org.apache.iceberg.flink.sink.shuffle.statistics; + +import org.apache.flink.annotation.Internal; /** - * DataStatisticsFactory provides the DataStatistics definition for different mode like HASH, RANGE + * MapDataStatisticsFactory creates MapDataStatistics to track traffic volume for low-cardinality + * key in hash mode */ -class DataStatisticsFactory { +@Internal +public class MapDataStatisticsFactory implements DataStatisticsFactory { - DataStatistics createDataStatistics() { - // Only support MapDataStatistics for now. New DataStatistics type will be added for high - // cardinality case. + @Override + public DataStatistics createDataStatistics() { return new MapDataStatistics<>(); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java index 25bcf3be77d2..6a29dc8a6c57 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java @@ -46,6 +46,9 @@ import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatisticsFactory; +import org.apache.iceberg.flink.sink.shuffle.statistics.MapDataStatistics; +import org.apache.iceberg.flink.sink.shuffle.statistics.MapDataStatisticsFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.After; @@ -78,7 +81,7 @@ public String getKey(String value) { return value; } }; - DataStatisticsFactory dataStatisticsFactory = new DataStatisticsFactory<>(); + DataStatisticsFactory dataStatisticsFactory = new MapDataStatisticsFactory<>(); this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory); Environment env = getTestingEnvironment(); From c00e9d1e977f6c7e8852c57ba4d598d397c9608e Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 17 Jan 2023 22:20:07 -0800 Subject: [PATCH 09/15] Add link in comments --- .../apache/iceberg/flink/sink/shuffle/ShuffleOperator.java | 2 +- .../iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java | 2 +- .../flink/sink/shuffle/statistics/DataStatisticsFactory.java | 2 +- .../sink/shuffle/statistics/MapDataStatisticsFactory.java | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index 13234f1926ed..2f4ebedfc2a2 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -93,7 +93,7 @@ public void initializeState(StateInitializationContext context) throws Exception @Override public void open() throws Exception { if (!globalStatistics.isEmpty()) { - output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromStatistics(globalStatistics))); + output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromDataStatistics(globalStatistics))); } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java index abf1aab459bf..d81897dfccda 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java @@ -52,7 +52,7 @@ static ShuffleRecordWrapper fromRecord(T record) { return new ShuffleRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromStatistics(DataStatistics statistics) { + static ShuffleRecordWrapper fromDataStatistics(DataStatistics statistics) { return new ShuffleRecordWrapper<>(null, statistics); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java index 0bc11f66e2e2..e1baf52a6091 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/DataStatisticsFactory.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; /** - * DataStatisticsFactory defines the interface to create DataStatistics. + * DataStatisticsFactory defines the interface to create {@link DataStatistics}. * *

For low-cardinality key, MapDataStatisticsFactory will be implemented to create * MapDataStatistics. diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java index 51f721c351c2..694a8cb0a12b 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatisticsFactory.java @@ -21,8 +21,8 @@ import org.apache.flink.annotation.Internal; /** - * MapDataStatisticsFactory creates MapDataStatistics to track traffic volume for low-cardinality - * key in hash mode + * MapDataStatisticsFactory creates {@link MapDataStatistics} to track traffic volume for + * low-cardinality key in hash mode */ @Internal public class MapDataStatisticsFactory implements DataStatisticsFactory { From 62a0da409690ab460099fdb097dabc773017e0b0 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 21 Feb 2023 23:33:18 -0800 Subject: [PATCH 10/15] remove transient --- .../iceberg/flink/sink/shuffle/ShuffleOperator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java index 2f4ebedfc2a2..4554a9261546 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java @@ -50,10 +50,10 @@ class ShuffleOperator extends AbstractStreamOperator keySelector; private final OperatorEventGateway operatorEventGateway; - private final transient DataStatisticsFactory statisticsFactory; - private transient DataStatistics localStatistics; - private transient DataStatistics globalStatistics; - private transient ListState> globalStatisticsState; + private final DataStatisticsFactory statisticsFactory; + private DataStatistics localStatistics; + private DataStatistics globalStatistics; + private ListState> globalStatisticsState; public ShuffleOperator( KeySelector keySelector, From bdabda5f4082481b2a2d6105314eadd0bace33e7 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Mon, 27 Feb 2023 22:57:38 -0800 Subject: [PATCH 11/15] rename operator from ShuffleOperator to DataStatisticsOperator --- ...va => DataStatisticsAndRecordWrapper.java} | 26 +++++++++---------- ...rator.java => DataStatisticsOperator.java} | 18 ++++++------- ...r.java => TestDataStatisticsOperator.java} | 18 ++++++------- 3 files changed, 31 insertions(+), 31 deletions(-) rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{ShuffleRecordWrapper.java => DataStatisticsAndRecordWrapper.java} (62%) rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{ShuffleOperator.java => DataStatisticsOperator.java} (85%) rename flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/{TestShuffleOperator.java => TestDataStatisticsOperator.java} (89%) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java similarity index 62% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java index d81897dfccda..4b0e6a63f080 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java @@ -24,36 +24,36 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** - * The wrapper class for record and data statistics. It is the only way for shuffle operator to send + * The wrapper class for data statistics and record. It is the only way for data statistics operator to send * global data statistics to custom partitioner to distribute data based on statistics * - *

ShuffleRecordWrapper is sent from ShuffleOperator to partitioner. It contain either a record - * or data statistics(globally aggregated). Once partitioner receives the weight, it will use that - * to decide the coming record should send to which writer subtask. After shuffling, a filter and - * mapper are required to filter out the data distribution weight, unwrap the object and extract the - * original record type T. + *

DataStatisticsAndRecordWrapper is sent from {@link DataStatisticsOperator} to partitioner. It + * contains either data statistics(globally aggregated) or a record. Once partitioner receives the data + * statistics, it will use that to decide the coming record should send to which writer subtask. After + * shuffling, a filter and mapper are required to filter out the data distribution weight, unwrap the + * object and extract the original record type T. */ -public class ShuffleRecordWrapper implements Serializable { +public class DataStatisticsAndRecordWrapper implements Serializable { private static final long serialVersionUID = 1L; private final DataStatistics statistics; private final T record; - private ShuffleRecordWrapper(T record, DataStatistics statistics) { + private DataStatisticsAndRecordWrapper(T record, DataStatistics statistics) { Preconditions.checkArgument( record != null ^ statistics != null, - "A ShuffleRecordWrapper contain either record or statistics, not neither or both"); + "A DataStatisticsAndRecordWrapper contain either statistics or record, not neither or both"); this.statistics = statistics; this.record = record; } - static ShuffleRecordWrapper fromRecord(T record) { - return new ShuffleRecordWrapper<>(record, null); + static DataStatisticsAndRecordWrapper fromRecord(T record) { + return new DataStatisticsAndRecordWrapper<>(record, null); } - static ShuffleRecordWrapper fromDataStatistics(DataStatistics statistics) { - return new ShuffleRecordWrapper<>(null, statistics); + static DataStatisticsAndRecordWrapper fromDataStatistics(DataStatistics statistics) { + return new DataStatisticsAndRecordWrapper<>(null, statistics); } boolean hasDataStatistics() { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java similarity index 85% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index 4554a9261546..9fe6920d2943 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -36,15 +36,15 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; /** - * Shuffle operator can help to improve data clustering based on the key. + * DataStatisticsOperator can help to improve data clustering based on the key. * - *

Shuffle operator collects traffic distribution statistics. A custom partitioner shall be - * attached to the shuffle operator output. The custom partitioner leverages the statistics to + *

DataStatisticsOperator collects traffic distribution statistics. A custom partitioner shall be + * attached to the DataStatisticsOperator output. The custom partitioner leverages the statistics to * shuffle record to improve data clustering while maintaining relative balanced traffic * distribution to downstream subtasks. */ -class ShuffleOperator extends AbstractStreamOperator> - implements OneInputStreamOperator>, OperatorEventHandler { +class DataStatisticsOperator extends AbstractStreamOperator> + implements OneInputStreamOperator>, OperatorEventHandler { private static final long serialVersionUID = 1L; // keySelector will be used to generate key from data for collecting data statistics @@ -55,7 +55,7 @@ class ShuffleOperator extends AbstractStreamOperator globalStatistics; private ListState> globalStatisticsState; - public ShuffleOperator( + public DataStatisticsOperator( KeySelector keySelector, OperatorEventGateway operatorEventGateway, DataStatisticsFactory statisticsFactory) { @@ -93,7 +93,7 @@ public void initializeState(StateInitializationContext context) throws Exception @Override public void open() throws Exception { if (!globalStatistics.isEmpty()) { - output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromDataStatistics(globalStatistics))); + output.collect(new StreamRecord<>(DataStatisticsAndRecordWrapper.fromDataStatistics(globalStatistics))); } } @@ -106,13 +106,13 @@ public void handleOperatorEvent(OperatorEvent evt) { public void processElement(StreamRecord streamRecord) throws Exception { final K key = keySelector.getKey(streamRecord.getValue()); localStatistics.add(key); - output.collect(new StreamRecord<>(ShuffleRecordWrapper.fromRecord(streamRecord.getValue()))); + output.collect(new StreamRecord<>(DataStatisticsAndRecordWrapper.fromRecord(streamRecord.getValue()))); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); - LOG.debug("Taking shuffle operator snapshot for checkpoint {}", checkpointId); + LOG.debug("Taking data statistics operator snapshot for checkpoint {}", checkpointId); // Update globalStatisticsState with latest global statistics if (!globalStatistics.isEmpty()) { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java similarity index 89% rename from flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java rename to flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 6a29dc8a6c57..22158b00f3c2 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java @@ -55,8 +55,8 @@ import org.junit.Before; import org.junit.Test; -public class TestShuffleOperator { - private ShuffleOperator operator; +public class TestDataStatisticsOperator { + private DataStatisticsOperator operator; private Environment getTestingEnvironment() { return new StreamMockEnvironment( @@ -83,7 +83,7 @@ public String getKey(String value) { }; DataStatisticsFactory dataStatisticsFactory = new MapDataStatisticsFactory<>(); - this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory); + this.operator = new DataStatisticsOperator<>(keySelector, mockGateway, dataStatisticsFactory); Environment env = getTestingEnvironment(); this.operator.setup( new OneInputStreamTask(env), @@ -114,7 +114,7 @@ public void testProcessElement() throws Exception { @Test public void testOperatorOutput() throws Exception { - try (OneInputStreamOperatorTestHarness> + try (OneInputStreamOperatorTestHarness> testHarness = createHarness(this.operator)) { testHarness.processElement(new StreamRecord<>("a")); testHarness.processElement(new StreamRecord<>("b")); @@ -122,8 +122,8 @@ public void testOperatorOutput() throws Exception { List recordsOutput = testHarness.extractOutputValues().stream() - .filter(ShuffleRecordWrapper::hasRecord) - .map(ShuffleRecordWrapper::record) + .filter(DataStatisticsAndRecordWrapper::hasRecord) + .map(DataStatisticsAndRecordWrapper::record) .collect(Collectors.toList()); assertThat(recordsOutput) .containsExactlyInAnyOrderElementsOf(ImmutableList.of("a", "b", "b")); @@ -144,9 +144,9 @@ private OperatorStateStore createOperatorStateStore() throws Exception { env, "test-operator", Collections.emptyList(), cancelStreamRegistry); } - private OneInputStreamOperatorTestHarness> - createHarness(final ShuffleOperator operator) throws Exception { - OneInputStreamOperatorTestHarness> harness = + private OneInputStreamOperatorTestHarness> + createHarness(final DataStatisticsOperator operator) throws Exception { + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0); harness.setup(); harness.open(); From c69e2c5e68b2491112f2f159e58b7b248b065d9e Mon Sep 17 00:00:00 2001 From: gang_ye Date: Fri, 3 Mar 2023 23:17:31 -0800 Subject: [PATCH 12/15] use merge to add key or merge statistics and rename DataStatisticsAndRecordWrapper to DataStatisticsOrRecord --- .../sink/shuffle/DataStatisticsOperator.java | 8 ++++---- ...dWrapper.java => DataStatisticsOrRecord.java} | 16 ++++++++-------- .../shuffle/statistics/MapDataStatistics.java | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) rename flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/{DataStatisticsAndRecordWrapper.java => DataStatisticsOrRecord.java} (77%) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index 9fe6920d2943..a0cb7c93fa52 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -43,8 +43,8 @@ * shuffle record to improve data clustering while maintaining relative balanced traffic * distribution to downstream subtasks. */ -class DataStatisticsOperator extends AbstractStreamOperator> - implements OneInputStreamOperator>, OperatorEventHandler { +class DataStatisticsOperator extends AbstractStreamOperator> + implements OneInputStreamOperator>, OperatorEventHandler { private static final long serialVersionUID = 1L; // keySelector will be used to generate key from data for collecting data statistics @@ -93,7 +93,7 @@ public void initializeState(StateInitializationContext context) throws Exception @Override public void open() throws Exception { if (!globalStatistics.isEmpty()) { - output.collect(new StreamRecord<>(DataStatisticsAndRecordWrapper.fromDataStatistics(globalStatistics))); + output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics))); } } @@ -106,7 +106,7 @@ public void handleOperatorEvent(OperatorEvent evt) { public void processElement(StreamRecord streamRecord) throws Exception { final K key = keySelector.getKey(streamRecord.getValue()); localStatistics.add(key); - output.collect(new StreamRecord<>(DataStatisticsAndRecordWrapper.fromRecord(streamRecord.getValue()))); + output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromRecord(streamRecord.getValue()))); } @Override diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java similarity index 77% rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java rename to flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java index 4b0e6a63f080..60b2f70923e8 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java @@ -27,33 +27,33 @@ * The wrapper class for data statistics and record. It is the only way for data statistics operator to send * global data statistics to custom partitioner to distribute data based on statistics * - *

DataStatisticsAndRecordWrapper is sent from {@link DataStatisticsOperator} to partitioner. It + *

DataStatisticsOrRecord is sent from {@link DataStatisticsOperator} to partitioner. It * contains either data statistics(globally aggregated) or a record. Once partitioner receives the data * statistics, it will use that to decide the coming record should send to which writer subtask. After * shuffling, a filter and mapper are required to filter out the data distribution weight, unwrap the * object and extract the original record type T. */ -public class DataStatisticsAndRecordWrapper implements Serializable { +public class DataStatisticsOrRecord implements Serializable { private static final long serialVersionUID = 1L; private final DataStatistics statistics; private final T record; - private DataStatisticsAndRecordWrapper(T record, DataStatistics statistics) { + private DataStatisticsOrRecord(T record, DataStatistics statistics) { Preconditions.checkArgument( record != null ^ statistics != null, - "A DataStatisticsAndRecordWrapper contain either statistics or record, not neither or both"); + "A DataStatisticsOrRecord contain either statistics or record, not neither or both"); this.statistics = statistics; this.record = record; } - static DataStatisticsAndRecordWrapper fromRecord(T record) { - return new DataStatisticsAndRecordWrapper<>(record, null); + static DataStatisticsOrRecord fromRecord(T record) { + return new DataStatisticsOrRecord<>(record, null); } - static DataStatisticsAndRecordWrapper fromDataStatistics(DataStatistics statistics) { - return new DataStatisticsAndRecordWrapper<>(null, statistics); + static DataStatisticsOrRecord fromDataStatistics(DataStatistics statistics) { + return new DataStatisticsOrRecord<>(null, statistics); } boolean hasDataStatistics() { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java index 59638ea3cb9d..44d1393c9704 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java @@ -37,7 +37,7 @@ public boolean isEmpty() { @Override public void add(K key) { // increase count of occurrence by one in the dataStatistics map - statistics.put(key, statistics.getOrDefault(key, 0L) + 1L); + statistics.merge(key, 1L, Long::sum); } @Override @@ -47,7 +47,7 @@ public void merge(DataStatistics otherStatistics) { "Can not merge this type of statistics: " + otherStatistics); MapDataStatistics mapDataStatistic = (MapDataStatistics) otherStatistics; mapDataStatistic.statistics.forEach( - (key, count) -> statistics.put(key, statistics.getOrDefault(key, 0L) + count)); + (key, count) -> statistics.merge(key, count, Long::sum)); } public Map dataStatistics() { From 371a7b64d56b82e8db0c0546296c8cff47d3e247 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 14 Mar 2023 22:38:31 -0700 Subject: [PATCH 13/15] use unionListState in operator and fix unit test --- .../flink/sink/shuffle/DataStatisticsOperator.java | 2 +- .../flink/sink/shuffle/TestDataStatisticsOperator.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index a0cb7c93fa52..3360ba2c0e97 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -70,7 +70,7 @@ public void initializeState(StateInitializationContext context) throws Exception globalStatisticsState = context .getOperatorStateStore() - .getListState( + .getUnionListState( new ListStateDescriptor<>( "globalStatisticsState", TypeInformation.of(new TypeHint>() {}))); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 22158b00f3c2..743c0b7a0c8b 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java @@ -114,7 +114,7 @@ public void testProcessElement() throws Exception { @Test public void testOperatorOutput() throws Exception { - try (OneInputStreamOperatorTestHarness> + try (OneInputStreamOperatorTestHarness> testHarness = createHarness(this.operator)) { testHarness.processElement(new StreamRecord<>("a")); testHarness.processElement(new StreamRecord<>("b")); @@ -122,8 +122,8 @@ public void testOperatorOutput() throws Exception { List recordsOutput = testHarness.extractOutputValues().stream() - .filter(DataStatisticsAndRecordWrapper::hasRecord) - .map(DataStatisticsAndRecordWrapper::record) + .filter(DataStatisticsOrRecord::hasRecord) + .map(DataStatisticsOrRecord::record) .collect(Collectors.toList()); assertThat(recordsOutput) .containsExactlyInAnyOrderElementsOf(ImmutableList.of("a", "b", "b")); @@ -144,9 +144,9 @@ private OperatorStateStore createOperatorStateStore() throws Exception { env, "test-operator", Collections.emptyList(), cancelStreamRegistry); } - private OneInputStreamOperatorTestHarness> + private OneInputStreamOperatorTestHarness> createHarness(final DataStatisticsOperator operator) throws Exception { - OneInputStreamOperatorTestHarness> harness = + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0); harness.setup(); harness.open(); From 0fd1a86eb226a9303ae3d7245a54620dd9ca283b Mon Sep 17 00:00:00 2001 From: gang_ye Date: Tue, 21 Mar 2023 23:22:46 -0700 Subject: [PATCH 14/15] add transient volatile to operator statistics variables and only save global statistics for subtask0 --- .../sink/shuffle/DataStatisticsOperator.java | 20 +++++++++---------- .../shuffle/statistics/MapDataStatistics.java | 2 +- .../shuffle/TestDataStatisticsOperator.java | 15 +++++--------- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index 3360ba2c0e97..4e3d98c85618 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -36,9 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; /** - * DataStatisticsOperator can help to improve data clustering based on the key. - * - *

DataStatisticsOperator collects traffic distribution statistics. A custom partitioner shall be + * DataStatisticsOperator collects traffic distribution statistics. A custom partitioner shall be * attached to the DataStatisticsOperator output. The custom partitioner leverages the statistics to * shuffle record to improve data clustering while maintaining relative balanced traffic * distribution to downstream subtasks. @@ -51,11 +49,11 @@ class DataStatisticsOperator extends AbstractStreamOperator keySelector; private final OperatorEventGateway operatorEventGateway; private final DataStatisticsFactory statisticsFactory; - private DataStatistics localStatistics; - private DataStatistics globalStatistics; - private ListState> globalStatisticsState; + private transient volatile DataStatistics localStatistics; + private transient volatile DataStatistics globalStatistics; + private transient ListState> globalStatisticsState; - public DataStatisticsOperator( + DataStatisticsOperator( KeySelector keySelector, OperatorEventGateway operatorEventGateway, DataStatisticsFactory statisticsFactory) { @@ -112,11 +110,13 @@ public void processElement(StreamRecord streamRecord) throws Exception { @Override public void snapshotState(StateSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); - LOG.debug("Taking data statistics operator snapshot for checkpoint {}", checkpointId); + LOG.info("Taking data statistics operator snapshot for checkpoint {}", checkpointId); - // Update globalStatisticsState with latest global statistics - if (!globalStatistics.isEmpty()) { + // Only subtask 0 saves the state so that globalStatisticsState(UnionListState) stores + // an exact copy of globalStatistics + if (!globalStatistics.isEmpty() && getRuntimeContext().getIndexOfThisSubtask() == 0) { globalStatisticsState.clear(); + LOG.debug("Saving global statistics {} to state", globalStatistics); globalStatisticsState.add(globalStatistics); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java index 44d1393c9704..47582cb8df11 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java @@ -44,7 +44,7 @@ public void add(K key) { public void merge(DataStatistics otherStatistics) { Preconditions.checkArgument( otherStatistics instanceof MapDataStatistics, - "Can not merge this type of statistics: " + otherStatistics); + "Map statistics can not merge with " + otherStatistics.getClass()); MapDataStatistics mapDataStatistic = (MapDataStatistics) otherStatistics; mapDataStatistic.statistics.forEach( (key, count) -> statistics.merge(key, count, Long::sum)); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 743c0b7a0c8b..3d3c1f529b25 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java @@ -131,23 +131,18 @@ public void testOperatorOutput() throws Exception { } private StateInitializationContext getStateContext() throws Exception { - // Create the state context. - OperatorStateStore operatorStateStore = createOperatorStateStore(); - return new StateInitializationContextImpl(null, operatorStateStore, null, null, null); - } - - private OperatorStateStore createOperatorStateStore() throws Exception { MockEnvironment env = new MockEnvironmentBuilder().build(); AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); - return abstractStateBackend.createOperatorStateBackend( - env, "test-operator", Collections.emptyList(), cancelStreamRegistry); + OperatorStateStore operatorStateStore = abstractStateBackend.createOperatorStateBackend( + env, "test-operator", Collections.emptyList(), cancelStreamRegistry); + return new StateInitializationContextImpl(null, operatorStateStore, null, null, null); } private OneInputStreamOperatorTestHarness> - createHarness(final DataStatisticsOperator operator) throws Exception { + createHarness(final DataStatisticsOperator dataStatisticsOperator) throws Exception { OneInputStreamOperatorTestHarness> harness = - new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0); + new OneInputStreamOperatorTestHarness<>(dataStatisticsOperator, 1, 1, 0); harness.setup(); harness.open(); return harness; From 608628565e86a0ef48e1e80da5572b543cd6f082 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Mon, 27 Mar 2023 15:04:05 -0700 Subject: [PATCH 15/15] add subtask id to snapshot log --- .../sink/shuffle/DataStatisticsOperator.java | 16 +++++++++++----- .../sink/shuffle/DataStatisticsOrRecord.java | 14 +++++++------- .../shuffle/statistics/MapDataStatistics.java | 3 +-- .../sink/shuffle/TestDataStatisticsOperator.java | 6 ++++-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index 4e3d98c85618..2582104de3c4 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -91,7 +91,8 @@ public void initializeState(StateInitializationContext context) throws Exception @Override public void open() throws Exception { if (!globalStatistics.isEmpty()) { - output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics))); + output.collect( + new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics))); } } @@ -102,21 +103,26 @@ public void handleOperatorEvent(OperatorEvent evt) { @Override public void processElement(StreamRecord streamRecord) throws Exception { - final K key = keySelector.getKey(streamRecord.getValue()); + T record = streamRecord.getValue(); + K key = keySelector.getKey(record); localStatistics.add(key); - output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromRecord(streamRecord.getValue()))); + output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromRecord(record))); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); - LOG.info("Taking data statistics operator snapshot for checkpoint {}", checkpointId); + int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info( + "Taking data statistics operator snapshot for checkpoint {} in subtask {}", + checkpointId, + subTaskId); // Only subtask 0 saves the state so that globalStatisticsState(UnionListState) stores // an exact copy of globalStatistics if (!globalStatistics.isEmpty() && getRuntimeContext().getIndexOfThisSubtask() == 0) { globalStatisticsState.clear(); - LOG.debug("Saving global statistics {} to state", globalStatistics); + LOG.info("Saving global statistics {} to state in subtask {}", globalStatistics, subTaskId); globalStatisticsState.add(globalStatistics); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java index 60b2f70923e8..fae563fe1758 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java @@ -24,14 +24,14 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** - * The wrapper class for data statistics and record. It is the only way for data statistics operator to send - * global data statistics to custom partitioner to distribute data based on statistics + * The wrapper class for data statistics and record. It is the only way for data statistics operator + * to send global data statistics to custom partitioner to distribute data based on statistics * - *

DataStatisticsOrRecord is sent from {@link DataStatisticsOperator} to partitioner. It - * contains either data statistics(globally aggregated) or a record. Once partitioner receives the data - * statistics, it will use that to decide the coming record should send to which writer subtask. After - * shuffling, a filter and mapper are required to filter out the data distribution weight, unwrap the - * object and extract the original record type T. + *

DataStatisticsOrRecord contains either data statistics(globally aggregated) or a record. It is + * sent from {@link DataStatisticsOperator} to partitioner. Once partitioner receives the data + * statistics, it will use that to decide the coming record should send to which writer subtask. + * After shuffling, a filter and mapper are required to filter out the data distribution weight, + * unwrap the object and extract the original record type T. */ public class DataStatisticsOrRecord implements Serializable { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java index 47582cb8df11..c737d0238778 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java @@ -46,8 +46,7 @@ public void merge(DataStatistics otherStatistics) { otherStatistics instanceof MapDataStatistics, "Map statistics can not merge with " + otherStatistics.getClass()); MapDataStatistics mapDataStatistic = (MapDataStatistics) otherStatistics; - mapDataStatistic.statistics.forEach( - (key, count) -> statistics.merge(key, count, Long::sum)); + mapDataStatistic.statistics.forEach((key, count) -> statistics.merge(key, count, Long::sum)); } public Map dataStatistics() { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 3d3c1f529b25..6801cfcf720b 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java @@ -134,13 +134,15 @@ private StateInitializationContext getStateContext() throws Exception { MockEnvironment env = new MockEnvironmentBuilder().build(); AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); - OperatorStateStore operatorStateStore = abstractStateBackend.createOperatorStateBackend( + OperatorStateStore operatorStateStore = + abstractStateBackend.createOperatorStateBackend( env, "test-operator", Collections.emptyList(), cancelStreamRegistry); return new StateInitializationContextImpl(null, operatorStateStore, null, null, null); } private OneInputStreamOperatorTestHarness> - createHarness(final DataStatisticsOperator dataStatisticsOperator) throws Exception { + createHarness(final DataStatisticsOperator dataStatisticsOperator) + throws Exception { OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>(dataStatisticsOperator, 1, 1, 0); harness.setup();