@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Map;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
* {@link org.apache.iceberg.flink.sink.dynamic.DataConverter} is responsible for converting the
* input data so that it is compatible with the target schema. This is needed when
*
* <ul>
* <li>The input schema has fewer fields than the target schema.
* <li>The table types are wider than the input types.
* <li>The field order differs between the source and the target schema.
* </ul>
*
* <p>The resolution is as follows:
*
* <ul>
* <li>In the first case, we add a null value for the missing field (if the field is
* optional).
* <li>In the second case, we convert the data for the input field to the wider type, e.g. int
* (input type) => long (table type).
* <li>In the third case, we rearrange the input data to match the target table.
* </ul>
*/
interface DataConverter {
Object convert(Object object);

static DataConverter identity() {
return object -> object;
}

static DataConverter getNullable(LogicalType sourceType, LogicalType targetType) {
return nullable(get(sourceType, targetType));
}

static DataConverter get(LogicalType sourceType, LogicalType targetType) {
switch (targetType.getTypeRoot()) {
case BOOLEAN:
case INTEGER:
case FLOAT:
case VARCHAR:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case BINARY:
case VARBINARY:
return object -> object;
case DOUBLE:
return object -> {
if (object instanceof Float) {
return ((Float) object).doubleValue();
} else {
return object;
}
};
case BIGINT:
return object -> {
if (object instanceof Integer) {
return ((Integer) object).longValue();
} else {
return object;
}
};
case DECIMAL:
return object -> {
DecimalType toDecimalType = (DecimalType) targetType;
DecimalData decimalData = (DecimalData) object;
if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) {
return object;
} else {
return DecimalData.fromBigDecimal(
decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale());
}
};
case TIMESTAMP_WITHOUT_TIME_ZONE:
return object -> {
if (object instanceof Integer) {
LocalDateTime dateTime =
LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN);
return TimestampData.fromLocalDateTime(dateTime);
} else {
return object;
}
};
case ROW:
return new RowDataConverter((RowType) sourceType, (RowType) targetType);
case ARRAY:
return new ArrayConverter((ArrayType) sourceType, (ArrayType) targetType);
case MAP:
return new MapConverter((MapType) sourceType, (MapType) targetType);
default:
throw new UnsupportedOperationException("Not a supported type: " + targetType);
}
}

static DataConverter nullable(DataConverter converter) {
return value -> value == null ? null : converter.convert(value);
}

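/**
* Converts a source {@link RowData} into the target {@link RowType}. Target fields are matched to
* source fields by name, values are converted with nested {@link DataConverter}s, and fields
* missing from the source are set to null when they are optional.
*/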
class RowDataConverter implements DataConverter {
private final RowData.FieldGetter[] fieldGetters;
private final DataConverter[] dataConverters;

RowDataConverter(RowType sourceType, RowType targetType) {
this.fieldGetters = new RowData.FieldGetter[targetType.getFields().size()];
this.dataConverters = new DataConverter[targetType.getFields().size()];

for (int i = 0; i < targetType.getFields().size(); i++) {
RowData.FieldGetter fieldGetter;
DataConverter dataConverter;
RowType.RowField targetField = targetType.getFields().get(i);
int sourceFieldIndex = sourceType.getFieldIndex(targetField.getName());
if (sourceFieldIndex == -1) {
if (targetField.getType().isNullable()) {
fieldGetter = row -> null;
dataConverter = value -> null;
} else {
throw new IllegalArgumentException(
String.format(
"Field %s in target schema %s is non-nullable but does not exist in source schema.",
i + 1, targetType));
}
} else {
RowType.RowField sourceField = sourceType.getFields().get(sourceFieldIndex);
fieldGetter = RowData.createFieldGetter(sourceField.getType(), sourceFieldIndex);
dataConverter = DataConverter.getNullable(sourceField.getType(), targetField.getType());
}

this.fieldGetters[i] = fieldGetter;
this.dataConverters[i] = dataConverter;
}
}

@Override
public RowData convert(Object object) {
RowData sourceData = (RowData) object;
GenericRowData targetData = new GenericRowData(fieldGetters.length);
for (int i = 0; i < fieldGetters.length; i++) {
Object value = fieldGetters[i].getFieldOrNull(sourceData);
targetData.setField(i, dataConverters[i].convert(value));
}

return targetData;
}
}

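/** Converts every element of a source {@link ArrayData} to the target element type. */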
class ArrayConverter implements DataConverter {
private final ArrayData.ElementGetter elementGetter;
private final DataConverter elementConverter;

ArrayConverter(ArrayType sourceType, ArrayType targetType) {
this.elementGetter = ArrayData.createElementGetter(sourceType.getElementType());
this.elementConverter =
DataConverter.getNullable(sourceType.getElementType(), targetType.getElementType());
}

@Override
public ArrayData convert(Object object) {
ArrayData arrayData = (ArrayData) object;
Object[] convertedArray = new Object[arrayData.size()];
for (int i = 0; i < convertedArray.length; i++) {
Object element = elementGetter.getElementOrNull(arrayData, i);
convertedArray[i] = elementConverter.convert(element);
}

return new GenericArrayData(convertedArray);
}
}

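/**
* Converts the keys and values of a source {@link MapData} to the target key and value types,
* preserving the entry order.
*/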
class MapConverter implements DataConverter {
private final ArrayData.ElementGetter keyGetter;
private final ArrayData.ElementGetter valueGetter;
private final DataConverter keyConverter;
private final DataConverter valueConverter;

MapConverter(MapType sourceType, MapType targetType) {
this.keyGetter = ArrayData.createElementGetter(sourceType.getKeyType());
this.valueGetter = ArrayData.createElementGetter(sourceType.getValueType());
this.keyConverter =
DataConverter.getNullable(sourceType.getKeyType(), targetType.getKeyType());
this.valueConverter =
DataConverter.getNullable(sourceType.getValueType(), targetType.getValueType());
}

@Override
public MapData convert(Object object) {
MapData sourceData = (MapData) object;
ArrayData keyArray = sourceData.keyArray();
ArrayData valueArray = sourceData.valueArray();
Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
for (int i = 0; i < keyArray.size(); ++i) {
convertedMap.put(
keyConverter.convert(keyGetter.getElementOrNull(keyArray, i)),
valueConverter.convert(valueGetter.getElementOrNull(valueArray, i)));
}

return new GenericMapData(convertedMap);
}
}
}
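
For reference, a minimal usage sketch of the converter above (not part of this PR): the class name, field names, and example types are illustrative only, and the snippet assumes it is compiled in the same package because DataConverter is package-private. It widens an int input column to a bigint table column, reorders row fields by name, and fills a missing optional field with null.

// Hedged sketch; DataConverterSketch and the example schemas are hypothetical.
package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

class DataConverterSketch {
  public static void main(String[] args) {
    // int (input) -> bigint (table): the value is widened, nulls pass through unchanged.
    DataConverter widen = DataConverter.getNullable(new IntType(), new BigIntType());
    System.out.println(widen.convert(42));   // 42, now boxed as a Long
    System.out.println(widen.convert(null)); // null

    // Source row (id INT, name STRING) -> target row (name STRING, id BIGINT, ts BIGINT):
    // fields are matched by name, id is widened, and the missing optional ts becomes null.
    RowType source =
        RowType.of(
            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
            new String[] {"id", "name"});
    RowType target =
        RowType.of(
            new LogicalType[] {
              new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new BigIntType()
            },
            new String[] {"name", "id", "ts"});
    RowData converted =
        (RowData)
            DataConverter.get(source, target)
                .convert(GenericRowData.of(42, StringData.fromString("a")));
    System.out.println(converted); // row ("a", 42L, null): fields reordered, id widened, ts null
  }
}

Matching target fields to source fields by name rather than by position is what lets the sink accept input rows whose column order differs from the table schema.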
@@ -188,6 +188,7 @@ public static class Builder<T> {
private boolean immediateUpdate = false;
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;

Builder() {}

@@ -315,6 +316,16 @@ public Builder<T> cacheRefreshMs(long refreshMs) {
return this;
}

/**
* Maximum number of input {@link org.apache.iceberg.Schema} objects to cache per Iceberg table.
* The cache improves Dynamic Sink performance by storing {@link org.apache.iceberg.Schema}
* comparison results.
*/
public Builder<T> inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaxSize) {
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaxSize;
return this;
}

private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
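
A brief, hedged illustration of the new setting (the enclosing sink builder class is not named in this excerpt, so "builder" below stands for an already obtained Builder<RowData>; only the two setters visible in this diff are used):

// Hedged sketch: "builder" is assumed to be an instance of the Builder<RowData> shown above.
builder
    .cacheRefreshMs(1_000)                 // refresh cached table metadata every second (the default)
    .inputSchemasPerTableCacheMaxSize(10); // cap cached input-schema comparison results per table (the default)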
@@ -358,7 +369,12 @@ public DataStreamSink<DynamicRecordInternal> append() {
input
.process(
new DynamicRecordProcessor<>(
generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs))
generator,
catalogLoader,
immediateUpdate,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize))
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
@@ -370,7 +386,12 @@ public DataStreamSink<DynamicRecordInternal> append() {
DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
.keyBy((KeySelector<DynamicRecordInternal, String>) DynamicRecordInternal::tableName)
.map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs))
.map(
new DynamicTableUpdateOperator(
catalogLoader,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize))
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
@@ -22,7 +22,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
@@ -43,6 +42,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final boolean immediateUpdate;
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;

private transient TableMetadataCache tableCache;
private transient HashKeyGenerator hashKeyGenerator;
@@ -56,19 +56,23 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
CatalogLoader catalogLoader,
boolean immediateUpdate,
int cacheMaximumSize,
long cacheRefreshMs) {
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
}

@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
Catalog catalog = catalogLoader.loadCatalog();
this.tableCache = new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs);
this.tableCache =
new TableMetadataCache(
catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize);
this.hashKeyGenerator =
new HashKeyGenerator(
cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
@@ -97,7 +101,7 @@ public void collect(DynamicRecord data) {
boolean exists = tableCache.exists(data.tableIdentifier()).f0;
String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null;

Tuple2<Schema, CompareSchemasVisitor.Result> foundSchema =
TableMetadataCache.ResolvedSchemaInfo foundSchema =
exists
? tableCache.schema(data.tableIdentifier(), data.schema())
: TableMetadataCache.NOT_FOUND;
@@ -107,16 +111,23 @@
if (!exists
|| foundBranch == null
|| foundSpec == null
|| foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
|| foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
if (immediateUpdate) {
Tuple3<Schema, CompareSchemasVisitor.Result, PartitionSpec> newData =
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> newData =
updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec());
emit(collector, data, newData.f0, newData.f1, newData.f2);
emit(
collector,
data,
newData.f0.resolvedTableSchema(),
newData.f0.recordConverter(),
newData.f1);
} else {
int writerKey =
hashKeyGenerator.generateKey(
data,
foundSchema.f0 != null ? foundSchema.f0 : data.schema(),
foundSchema.resolvedTableSchema() != null
? foundSchema.resolvedTableSchema()
: data.schema(),
foundSpec != null ? foundSpec : data.spec(),
data.rowData());
context.output(
@@ -132,20 +143,22 @@
DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema())));
}
} else {
emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec);
emit(
collector,
data,
foundSchema.resolvedTableSchema(),
foundSchema.recordConverter(),
foundSpec);
}
}

private void emit(
Collector<DynamicRecordInternal> out,
DynamicRecord data,
Schema schema,
CompareSchemasVisitor.Result result,
DataConverter recordConverter,
PartitionSpec spec) {
RowData rowData =
result == CompareSchemasVisitor.Result.SAME
? data.rowData()
: RowDataEvolver.convert(data.rowData(), data.schema(), schema);
RowData rowData = (RowData) recordConverter.convert(data.rowData());
int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
String tableName = data.tableIdentifier().toString();
out.collect(