Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatisticsFactory;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

/**
* DataStatisticsOperator collects traffic distribution statistics. A custom partitioner shall be
* attached to the DataStatisticsOperator output. The custom partitioner leverages the statistics to
* shuffle record to improve data clustering while maintaining relative balanced traffic
* distribution to downstream subtasks.
*/
class DataStatisticsOperator<T, K> extends AbstractStreamOperator<DataStatisticsOrRecord<T, K>>
implements OneInputStreamOperator<T, DataStatisticsOrRecord<T, K>>, OperatorEventHandler {
private static final long serialVersionUID = 1L;

// keySelector will be used to generate key from data for collecting data statistics
private final KeySelector<T, K> keySelector;
private final OperatorEventGateway operatorEventGateway;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't seem to be in use.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be used in the next PR with the shuffle coordinator to aggregate statistics globally

private final DataStatisticsFactory<K> statisticsFactory;
private transient volatile DataStatistics<K> localStatistics;
private transient volatile DataStatistics<K> globalStatistics;
private transient ListState<DataStatistics<K>> globalStatisticsState;

DataStatisticsOperator(
KeySelector<T, K> keySelector,
OperatorEventGateway operatorEventGateway,
DataStatisticsFactory<K> statisticsFactory) {
this.keySelector = keySelector;
this.operatorEventGateway = operatorEventGateway;
this.statisticsFactory = statisticsFactory;
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
localStatistics = statisticsFactory.createDataStatistics();
globalStatisticsState =
context
.getOperatorStateStore()
.getUnionListState(
Comment thread
yegangy0718 marked this conversation as resolved.
new ListStateDescriptor<>(
"globalStatisticsState",
TypeInformation.of(new TypeHint<DataStatistics<K>>() {})));

if (context.isRestored()) {
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (globalStatisticsState.get() == null
|| !globalStatisticsState.get().iterator().hasNext()) {
LOG.warn("Subtask {} doesn't have global statistics state to restore", subtaskIndex);
globalStatistics = statisticsFactory.createDataStatistics();
} else {
LOG.info("Restoring global statistics state for subtask {}", subtaskIndex);
globalStatistics = globalStatisticsState.get().iterator().next();
}
} else {
globalStatistics = statisticsFactory.createDataStatistics();
}
}

@Override
public void open() throws Exception {
if (!globalStatistics.isEmpty()) {
output.collect(
new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
}
}

@Override
public void handleOperatorEvent(OperatorEvent evt) {
// TODO: receive event with aggregated statistics from coordinator and update globalStatistics
}

@Override
public void processElement(StreamRecord<T> streamRecord) throws Exception {
T record = streamRecord.getValue();
K key = keySelector.getKey(record);
localStatistics.add(key);
output.collect(new StreamRecord<>(DataStatisticsOrRecord.fromRecord(record)));
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
long checkpointId = context.getCheckpointId();
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
LOG.info(
"Taking data statistics operator snapshot for checkpoint {} in subtask {}",
checkpointId,
subTaskId);

// Only subtask 0 saves the state so that globalStatisticsState(UnionListState) stores
// an exact copy of globalStatistics
if (!globalStatistics.isEmpty() && getRuntimeContext().getIndexOfThisSubtask() == 0) {
Comment thread
stevenzwu marked this conversation as resolved.
globalStatisticsState.clear();
LOG.info("Saving global statistics {} to state in subtask {}", globalStatistics, subTaskId);
globalStatisticsState.add(globalStatistics);
Comment thread
yegangy0718 marked this conversation as resolved.
}

// TODO: send to coordinator
// For now we make it simple to send globalStatisticsState at checkpoint

// Recreate the local statistics
localStatistics = statisticsFactory.createDataStatistics();
}

@VisibleForTesting
DataStatistics<K> localDataStatistics() {
return localStatistics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.io.Serializable;
import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * The wrapper class for data statistics and record. It is the only way for data statistics operator
 * to send global data statistics to custom partitioner to distribute data based on statistics
 *
 * <p>DataStatisticsOrRecord contains either data statistics(globally aggregated) or a record. It is
 * sent from {@link DataStatisticsOperator} to partitioner. Once partitioner receives the data
 * statistics, it will use that to decide the coming record should send to which writer subtask.
 * After shuffling, a filter and mapper are required to filter out the data distribution weight,
 * unwrap the object and extract the original record type T.
 *
 * @param <T> record type
 * @param <K> data statistics key type
 */
public class DataStatisticsOrRecord<T, K> implements Serializable {

  private static final long serialVersionUID = 1L;

  // Exactly one of the two fields is non-null; the constructor enforces the invariant.
  // NOTE(review): Serializable here also requires the concrete DataStatistics implementation and
  // the record type to be serializable — confirm the job's serializers honor that.
  private final DataStatistics<K> statistics;
  private final T record;

  private DataStatisticsOrRecord(T record, DataStatistics<K> statistics) {
    // XOR: reject both-null and both-non-null.
    Preconditions.checkArgument(
        record != null ^ statistics != null,
        "DataStatisticsOrRecord must contain either statistics or a record, not neither nor both");
    this.statistics = statistics;
    this.record = record;
  }

  /** Wraps a data record for pass-through to the partitioner. */
  static <T, K> DataStatisticsOrRecord<T, K> fromRecord(T record) {
    return new DataStatisticsOrRecord<>(record, null);
  }

  /** Wraps globally aggregated statistics for broadcast to the partitioner. */
  static <T, K> DataStatisticsOrRecord<T, K> fromDataStatistics(DataStatistics<K> statistics) {
    return new DataStatisticsOrRecord<>(null, statistics);
  }

  boolean hasDataStatistics() {
    return statistics != null;
  }

  boolean hasRecord() {
    return record != null;
  }

  /** @return the statistics, or null when this wrapper holds a record */
  DataStatistics<K> dataStatistics() {
    return statistics;
  }

  /** @return the record, or null when this wrapper holds statistics */
  T record() {
    return record;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("statistics", statistics)
        .add("record", record)
        .toString();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle.statistics;

import org.apache.flink.annotation.Internal;

/**
 * DataStatistics defines the interface to collect data distribution information.
 *
 * <p>Data statistics tracks traffic volume distribution across data keys. For low-cardinality key,
 * a simple map of (key, count) can be used. For high-cardinality key, probabilistic data structures
 * (sketching) can be used.
 */
@Internal
public interface DataStatistics<K> {

  /**
   * Check if data statistics contains any statistics information.
   *
   * @return true if data statistics doesn't contain any statistics information
   */
  boolean isEmpty();

  /**
   * Add data key to data statistics.
   *
   * @param key generate from data by applying key selector
   */
  void add(K key);

  /**
   * Merge current statistics with other statistics.
   *
   * @param otherStatistics the statistics to be merged; expected to be the same implementation
   *     type as this instance
   */
  void merge(DataStatistics<K> otherStatistics);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle.statistics;

import org.apache.flink.annotation.Internal;

/**
 * DataStatisticsFactory defines the interface to create {@link DataStatistics}.
 *
 * <p>For low-cardinality key, MapDataStatisticsFactory will be implemented to create
 * MapDataStatistics.
 */
@Internal
public interface DataStatisticsFactory<K> {

  /**
   * Creates a new {@link DataStatistics} instance.
   *
   * @return a fresh statistics container with no recorded keys
   */
  DataStatistics<K> createDataStatistics();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle.statistics;

import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** MapDataStatistics uses map to count key frequency */
@Internal
public class MapDataStatistics<K> implements DataStatistics<K> {
private final Map<K, Long> statistics = Maps.newHashMap();

@Override
public boolean isEmpty() {
return statistics.size() == 0;
}

@Override
public void add(K key) {
// increase count of occurrence by one in the dataStatistics map
statistics.merge(key, 1L, Long::sum);
}

@Override
public void merge(DataStatistics<K> otherStatistics) {
Preconditions.checkArgument(
otherStatistics instanceof MapDataStatistics,
"Map statistics can not merge with " + otherStatistics.getClass());
MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) otherStatistics;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if the input is not of type MapDataStatistics during the type conversion in line 48, will throw an error on its own. Therefore, whether it is necessary to perform this checkArgument?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if the input is not in the right type, it would be better to throw IllegalArgumentException.
I checked the other places as well, for example at

Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound");

and
Preconditions.checkArgument(flinkType instanceof RowType, "%s is not a RowType.", flinkType);

It's common that first checking type and then convert it into specific type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @yegangy0718 that it is a little better to check the type IllegalArgumentException

mapDataStatistic.statistics.forEach((key, count) -> statistics.merge(key, count, Long::sum));
}

public Map<K, Long> dataStatistics() {
return statistics;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("statistics", statistics).toString();
}
}
Loading