diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index 03e220f10ed3..beccc8bb70bf 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -39,6 +39,7 @@ public class GenericAppenderHelper { private static final String ORC_CONFIG_PREFIX = "^orc.*"; + private static final String PARQUET_CONFIG_PATTERN = ".*parquet.*"; private final Table table; private final FileFormat fileFormat; @@ -120,6 +121,10 @@ private static DataFile appendToLocalFile( appenderFactory.setAll(conf.getValByRegex(ORC_CONFIG_PREFIX)); } + if (FileFormat.PARQUET.equals(format) && conf != null) { + appenderFactory.setAll(conf.getValByRegex(PARQUET_CONFIG_PATTERN)); + } + FileAppender appender = appenderFactory.newAppender(Files.localOutput(file), format); try (FileAppender fileAppender = appender) { fileAppender.addAll(records); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index f85f6277263b..179253cb3a18 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.source.Boundedness; @@ -58,15 +59,20 @@ import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState; import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer; import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator; +import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor; import org.apache.iceberg.flink.source.reader.IcebergSourceReader; import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics; import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction; import org.apache.iceberg.flink.source.reader.ReaderFunction; import org.apache.iceberg.flink.source.reader.RowDataReaderFunction; +import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter; +import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer; import org.apache.iceberg.flink.source.split.SerializableComparator; +import org.apache.iceberg.flink.source.split.SplitComparators; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +86,7 @@ public class IcebergSource implements Source readerFunction; private final SplitAssignerFactory assignerFactory; private final SerializableComparator splitComparator; + private final SerializableRecordEmitter emitter; // Can't use SerializableTable as enumerator needs a regular table // that can discover table changes @@ -91,13 +98,15 @@ public class IcebergSource implements Source readerFunction, SplitAssignerFactory assignerFactory, SerializableComparator splitComparator, - Table table) { + Table table, + SerializableRecordEmitter emitter) { this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.splitComparator = splitComparator; this.table = table; + this.emitter = emitter; } String name() { @@ -152,7 +161,8 @@ public Boundedness getBoundedness() { public SourceReader createReader(SourceReaderContext readerContext) { IcebergSourceReaderMetrics metrics = new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name()); - return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext); + return new IcebergSourceReader<>( + emitter, metrics, readerFunction, splitComparator, readerContext); } @Override @@ -216,6 +226,8 @@ public static class Builder { private Table table; private SplitAssignerFactory splitAssignerFactory; private SerializableComparator splitComparator; + private String watermarkColumn; + private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS; private ReaderFunction readerFunction; private ReadableConfig flinkConfig = new Configuration(); private final ScanContext.Builder contextBuilder = ScanContext.builder(); @@ -237,6 +249,9 @@ public Builder table(Table newTable) { } public Builder assignerFactory(SplitAssignerFactory assignerFactory) { + Preconditions.checkArgument( + watermarkColumn == null, + "Watermark column and SplitAssigner should not be set in the same source"); this.splitAssignerFactory = assignerFactory; return this; } @@ -429,6 +444,33 @@ public Builder setAll(Map properties) { return this; } + /** + * Emits watermarks once per split based on the min value of column statistics from files + * metadata in the given split. The generated watermarks are also used for ordering the splits + * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider + * setting {@link #watermarkTimeUnit(TimeUnit)}. + * + *

Consider setting `read.split.open-file-cost` to prevent combining small files to a single + * split when the watermark is used for watermark alignment. + */ + public Builder watermarkColumn(String columnName) { + Preconditions.checkArgument( + splitAssignerFactory == null, + "Watermark column and SplitAssigner should not be set in the same source"); + this.watermarkColumn = columnName; + return this; + } + + /** + * When the type of the {@link #watermarkColumn} is {@link + * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the + * value. The default value is {@link TimeUnit#MICROSECONDS}. + */ + public Builder watermarkTimeUnit(TimeUnit timeUnit) { + this.watermarkTimeUnit = timeUnit; + return this; + } + /** @deprecated Use {@link #setAll} instead. */ @Deprecated public Builder properties(Map properties) { @@ -453,6 +495,18 @@ public IcebergSource build() { contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema)); } + SerializableRecordEmitter emitter = SerializableRecordEmitter.defaultEmitter(); + if (watermarkColumn != null) { + // Column statistics is needed for watermark generation + contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn)); + + SplitWatermarkExtractor watermarkExtractor = + new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit); + emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor); + splitAssignerFactory = + new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor)); + } + ScanContext context = contextBuilder.build(); if (readerFunction == null) { if (table instanceof BaseMetadataTable) { @@ -485,8 +539,14 @@ public IcebergSource build() { checkRequired(); // Since builder already load the table, pass it to the source to avoid double loading - return new IcebergSource( - tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table); + return new IcebergSource<>( + tableLoader, + context, + readerFunction, + splitAssignerFactory, + splitComparator, + table, + emitter); } private void checkRequired() { diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java new file mode 100644 index 000000000000..4bb6f0a98c4c --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; + +/** + * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics + * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link + * WatermarkExtractorRecordEmitter} along with the actual records. + */ +@Internal +public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable { + private final int eventTimeFieldId; + private final String eventTimeFieldName; + private final TimeUnit timeUnit; + + /** + * Creates the extractor. + * + * @param schema The schema of the Table + * @param eventTimeFieldName The column which should be used as an event time + * @param timeUnit Used for converting the long value to epoch milliseconds + */ + public ColumnStatsWatermarkExtractor( + Schema schema, String eventTimeFieldName, TimeUnit timeUnit) { + Types.NestedField field = schema.findField(eventTimeFieldName); + TypeID typeID = field.type().typeId(); + Preconditions.checkArgument( + typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP), + "Found %s, expected a LONG or TIMESTAMP column for watermark generation.", + typeID); + this.eventTimeFieldId = field.fieldId(); + this.eventTimeFieldName = eventTimeFieldName; + // Use the timeUnit only for Long columns. + this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS; + } + + @VisibleForTesting + ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) { + this.eventTimeFieldId = eventTimeFieldId; + this.eventTimeFieldName = eventTimeFieldName; + this.timeUnit = TimeUnit.MICROSECONDS; + } + + /** + * Get the watermark for a split using column statistics. + * + * @param split The split + * @return The watermark + * @throws IllegalArgumentException if there is no statistics for the column + */ + @Override + public long extractWatermark(IcebergSourceSplit split) { + return split.task().files().stream() + .map( + scanTask -> { + Preconditions.checkArgument( + scanTask.file().lowerBounds() != null + && scanTask.file().lowerBounds().get(eventTimeFieldId) != null, + "Missing statistics for column name = %s in file = %s", + eventTimeFieldName, + eventTimeFieldId, + scanTask.file()); + return timeUnit.toMillis( + Conversions.fromByteBuffer( + Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId))); + }) + .min(Comparator.comparingLong(l -> l)) + .get(); + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java index 8d7d68f961cb..f143b8d2df2e 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java @@ -35,13 +35,14 @@ public class IcebergSourceReader RecordAndPosition, T, IcebergSourceSplit, IcebergSourceSplit> { public IcebergSourceReader( + SerializableRecordEmitter emitter, IcebergSourceReaderMetrics metrics, ReaderFunction readerFunction, SerializableComparator splitComparator, SourceReaderContext context) { super( () -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context), - new IcebergSourceRecordEmitter<>(), + emitter, context.getConfiguration(), context); } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java new file mode 100644 index 000000000000..a6e2c1dae243 --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +@Internal +@FunctionalInterface +public interface SerializableRecordEmitter + extends RecordEmitter, T, IcebergSourceSplit>, Serializable { + static SerializableRecordEmitter defaultEmitter() { + return (element, output, split) -> { + output.collect(element.record()); + split.updatePosition(element.fileOffset(), element.recordOffset()); + }; + } + + static SerializableRecordEmitter emitterWithWatermark(SplitWatermarkExtractor extractor) { + return new WatermarkExtractorRecordEmitter<>(extractor); + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java similarity index 63% rename from flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java rename to flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java index 337d9d3c4223..d1c50ac8ca52 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java @@ -18,19 +18,11 @@ */ package org.apache.iceberg.flink.source.reader; -import org.apache.flink.api.connector.source.SourceOutput; -import org.apache.flink.connector.base.source.reader.RecordEmitter; +import java.io.Serializable; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -final class IcebergSourceRecordEmitter - implements RecordEmitter, T, IcebergSourceSplit> { - - IcebergSourceRecordEmitter() {} - - @Override - public void emitRecord( - RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { - output.collect(element.record()); - split.updatePosition(element.fileOffset(), element.recordOffset()); - } +/** The interface used to extract watermarks from splits. */ +public interface SplitWatermarkExtractor extends Serializable { + /** Get the watermark for a split. */ + long extractWatermark(IcebergSourceSplit split); } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..02ef57d344b1 --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the watermarks, records and updates the split position. + * + *

The Emitter emits watermarks at the beginning of every split provided by the {@link + * SplitWatermarkExtractor}. + */ +class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter { + private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); + private final SplitWatermarkExtractor timeExtractor; + private String lastSplitId = null; + private long watermark; + + WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { + this.timeExtractor = timeExtractor; + } + + @Override + public void emitRecord( + RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { + if (!split.splitId().equals(lastSplitId)) { + long newWatermark = timeExtractor.extractWatermark(split); + if (newWatermark < watermark) { + LOG.info( + "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", + watermark, + newWatermark, + lastSplitId, + split.splitId()); + } else { + watermark = newWatermark; + output.emitWatermark(new Watermark(watermark)); + LOG.debug("Watermark = {} emitted based on split = {}", watermark, lastSplitId); + } + + lastSplitId = split.splitId(); + } + + output.collect(element.record()); + split.updatePosition(element.fileOffset(), element.recordOffset()); + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java index 64e03d77debe..56ee92014d12 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.source.split; +import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -45,7 +46,7 @@ public static SerializableComparator fileSequenceNumber() { o1); Preconditions.checkNotNull( seq2, - "IInvalid file sequence number: null. Doesn't support splits written with V1 format: %s", + "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s", o2); int temp = Long.compare(seq1, seq2); @@ -56,4 +57,20 @@ public static SerializableComparator fileSequenceNumber() { } }; } + + /** Comparator which orders the splits based on watermark of the splits */ + public static SerializableComparator watermark( + SplitWatermarkExtractor watermarkExtractor) { + return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> { + long watermark1 = watermarkExtractor.extractWatermark(o1); + long watermark2 = watermarkExtractor.extractWatermark(o2); + + int temp = Long.compare(watermark1, watermark2); + if (temp != 0) { + return temp; + } else { + return o1.splitId().compareTo(o2.splitId()); + } + }; + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java index 70e7a79d8373..7d991ee603c9 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java @@ -39,6 +39,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; @@ -97,6 +98,11 @@ protected List generateRecords(int numRecords, long seed) { return RandomGenericData.generate(schema(), numRecords, seed); } + protected void assertRecords(Table table, List expectedRecords, Duration timeout) + throws Exception { + SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout); + } + @Test public void testBoundedWithTaskManagerFailover() throws Exception { testBoundedIcebergSource(FailoverType.TM); @@ -150,8 +156,7 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio RecordCounterToFail::continueProcessing, miniClusterResource.getMiniCluster()); - SimpleDataUtil.assertTableRecords( - sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } @Test @@ -214,8 +219,7 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep // wait longer for continuous source to reduce flakiness // because CI servers tend to be overloaded. - SimpleDataUtil.assertTableRecords( - sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } // ------------------------------------------------------------------------ diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java new file mode 100644 index 000000000000..f7dc931c506c --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; +import org.awaitility.Awaitility; + +public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover { + // Increment ts by 15 minutes for each generateRecords batch + private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15); + // Within a batch, increment ts by 1 second + private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1); + + private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis()); + + @Override + protected IcebergSource.Builder sourceBuilder() { + return IcebergSource.builder() + .tableLoader(sourceTableResource.tableLoader()) + .watermarkColumn("ts") + .project(TestFixtures.TS_SCHEMA); + } + + @Override + protected Schema schema() { + return TestFixtures.TS_SCHEMA; + } + + @Override + protected List generateRecords(int numRecords, long seed) { + // Override the ts field to create a more realistic situation for event time alignment + tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI); + return RandomGenericData.generate(schema(), numRecords, seed).stream() + .peek( + record -> { + LocalDateTime ts = + LocalDateTime.ofInstant( + Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)), + ZoneId.of("Z")); + record.setField("ts", ts); + }) + .collect(Collectors.toList()); + } + + /** + * This override is needed because {@link Comparators} used by {@link StructLikeWrapper} retrieves + * Timestamp type using Long type as inner class, while the {@link RandomGenericData} generates + * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This method normalizes the + * {@link LocalDateTime} to a Long type so that Comparators can continue to work. + */ + @Override + protected void assertRecords(Table table, List expectedRecords, Duration timeout) + throws Exception { + List expectedNormalized = convertLocalDateTimeToMilli(expectedRecords); + Awaitility.await("expected list of records should be produced") + .atMost(timeout) + .untilAsserted( + () -> { + SimpleDataUtil.equalsRecords( + expectedNormalized, + convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)), + table.schema()); + SimpleDataUtil.assertRecordsEqual( + expectedNormalized, + convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)), + table.schema()); + }); + } + + private List convertLocalDateTimeToMilli(List records) { + return records.stream() + .peek( + r -> { + LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts")); + r.setField("ts", localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli()); + }) + .collect(Collectors.toList()); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java new file mode 100644 index 000000000000..0bb2eb7766e9 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceWithWatermarkExtractor implements Serializable { + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); + private static final int PARALLELISM = 4; + private static final String SOURCE_NAME = "IcebergSource"; + private static final int RECORD_NUM_FOR_2_SPLITS = 200; + private static final ConcurrentMap windows = Maps.newConcurrentMap(); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .withHaLeadershipControl() + .build()); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource( + TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA); + + /** + * This is an integration test for watermark handling and windowing. Integration testing the + * following features: + * + *

    + *
  • - Ordering of the splits + *
  • - Emitting of watermarks + *
  • - Firing windows based on watermarks + *
+ * + *

The test generates 4 splits + * + *

    + *
  • - Split 1 - Watermark 100 min + *
  • - Split 2, 3 - Watermark 0 min + *
  • - Split 4 - Watermark 6 min + *
+ * + *

Creates a source with 5 minutes tumbling window with parallelism 1 (to prevent concurrency + * issues). + * + *

Checks that windows are handled correctly based on the emitted watermarks, and splits are + * read in the following order: + * + *

    + *
  • - Split 2, 3 + *
  • - Split 4 + *
  • - Split 1 + *
+ * + *

As a result the window aggregator emits the records based on in Split 2-3, and Split 4 data. + * + *

Add 2 more splits, so the task manager close the windows for the original 4 splits and emit + * the appropriate aggregated records. + */ + @Test + public void testWindowing() throws Exception { + GenericAppenderHelper dataAppender = appender(); + List expectedRecords = Lists.newArrayList(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - File 3 - Parallel write for the first records (Watermark 360000) + // - Split 1 - 2 records (6, "file_3-recordTs_6"), (7, "file_3-recordTs_7") + List batch = + ImmutableList.of( + generateRecord(100, "file_1-recordTs_100"), + generateRecord(101, "file_1-recordTs_101"), + generateRecord(103, "file_1-recordTs_103")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + // Generate records where the timestamps are out of order, but still between 0-5 minutes + batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i)); + } + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + batch = + ImmutableList.of( + generateRecord(6, "file_3-recordTs_6"), generateRecord(7, "file_3-recordTs_7")); + expectedRecords.addAll(batch); + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream stream = + env.fromSource( + source(), + WatermarkStrategy.noWatermarks() + .withTimestampAssigner(new RowDataTimestampAssigner()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + stream + .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) + .apply( + new AllWindowFunction() { + @Override + public void apply( + TimeWindow window, Iterable values, Collector out) { + // Emit RowData which contains the window start time, and the record count in + // that window + AtomicInteger count = new AtomicInteger(0); + values.forEach(a -> count.incrementAndGet()); + out.collect(row(window.getStart(), count.get())); + windows.put(window.getStart(), count.get()); + } + }); + + // Use static variable to collect the windows, since other solutions were flaky + windows.clear(); + env.executeAsync("Iceberg Source Windowing Test"); + + // Wait for the 2 first windows from File 2 and File 3 + Awaitility.await() + .pollInterval(Duration.ofMillis(10)) + .atMost(30, TimeUnit.SECONDS) + .until( + () -> + windows.equals( + ImmutableMap.of(0L, RECORD_NUM_FOR_2_SPLITS, TimeUnit.MINUTES.toMillis(5), 2))); + + // Write data so the windows containing test data are closed + dataAppender.appendToTable( + dataAppender.writeFile(ImmutableList.of(generateRecord(1500, "last-record")))); + + // Wait for last test record window from File 1 + Awaitility.await() + .pollInterval(Duration.ofMillis(10)) + .atMost(30, TimeUnit.SECONDS) + .until( + () -> + windows.equals( + ImmutableMap.of( + 0L, + RECORD_NUM_FOR_2_SPLITS, + TimeUnit.MINUTES.toMillis(5), + 2, + TimeUnit.MINUTES.toMillis(100), + 3))); + } + + /** + * This is an integration test for watermark handling and throttling. Integration testing the + * following: + * + *

    + *
  • - Emitting of watermarks + *
  • - Watermark alignment + *
+ * + *

The test generates 3 splits + * + *

    + *
  • - Split 1 - Watermark 100 min + *
  • - Split 2, 3 - Watermark 0 min + *
+ * + * The splits are read in the following order: + * + *
    + *
  • - Split 2, 3 (Task Manager 1, Task Manager 2) + *
  • - Split 1 (Task Manager 1 or ask Manager 2 depending on scheduling) + *
+ * + * Reading split 1 will cause the watermark alignment to pause reading for the given task manager. + * + *

The status of the watermark alignment is checked by the alignment related metrics. + * + *

Adding new records with old timestamps to the table will enable the running reader to + * continue reading the files, but the watermark alignment will still prevent the paused reader to + * continue. + * + *

After adding some records with new timestamps the blocked reader is un-paused, and both ot + * the readers continue reading. + */ + @Test + public void testThrottling() throws Exception { + GenericAppenderHelper dataAppender = appender(); + + // Generate records with the following pattern: + // - File 1 - Later records (Watermark 6000000) + // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103") + // - File 2 - First records (Watermark 0) + // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),... + List batch = + ImmutableList.of( + generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103")); + dataAppender.appendToTable(batch); + + batch = Lists.newArrayListWithCapacity(100); + for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) { + batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i)); + } + + dataAppender.appendToTable(batch); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream stream = + env.fromSource( + source(), + WatermarkStrategy.noWatermarks() + .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + + try (CloseableIterator resultIterator = stream.collectAsync()) { + JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test"); + + // Check that the read the non-blocked data + // The first RECORD_NUM_FOR_2_SPLITS should be read + // 1 or more from the runaway reader should be arrived depending on thread scheduling + waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); + + // Get the drift metric, wait for it to be created and reach the expected state + // (100 min - 20 min - 0 min) + // Also this validates that the WatermarkAlignment is working + Awaitility.await() + .pollInterval(Duration.ofMillis(10)) + .atMost(30, TimeUnit.SECONDS) + .until( + () -> + findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80)) + .isPresent()); + Gauge drift = + findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80)).get(); + + // Add some old records with 2 splits, so even if the blocked gets one split, the other reader + // one gets one as well + List newBatch1 = + ImmutableList.of( + generateRecord(15, "file_3-recordTs_15"), + generateRecord(16, "file_3-recordTs_16"), + generateRecord(17, "file_3-recordTs_17")); + List newBatch2 = + ImmutableList.of( + generateRecord(15, "file_4-recordTs_15"), + generateRecord(16, "file_4-recordTs_16"), + generateRecord(17, "file_4-recordTs_17")); + dataAppender.appendToTable( + dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2)); + // The records received will highly depend on scheduling + // We minimally get 3 records from the non-blocked reader + // We might get 1 record from the blocked reader (as part of the previous batch - + // file_1-recordTs_100) + // We might get 3 records form the non-blocked reader if it gets both new splits + waitForRecords(resultIterator, 3); + + // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20 + // min - 15 min) + Awaitility.await() + .pollInterval(Duration.ofMillis(10)) + .atMost(30, TimeUnit.SECONDS) + .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65)); + + // Add some new records which should unblock the throttled reader + batch = + ImmutableList.of( + generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91")); + dataAppender.appendToTable(batch); + // We should get all the records at this point + waitForRecords(resultIterator, 6); + + // Wait for the new drift to decrease below the allowed drift to signal the normal state + Awaitility.await() + .pollInterval(Duration.ofMillis(10)) + .atMost(30, TimeUnit.SECONDS) + .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20)); + } + } + + protected IcebergSource source() { + return IcebergSource.builder() + .tableLoader(sourceTableResource.tableLoader()) + .watermarkColumn("ts") + .project(TestFixtures.TS_SCHEMA) + .splitSize(100L) + .streaming(true) + .monitorInterval(Duration.ofMillis(2)) + .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + } + + protected Record generateRecord(int minutes, String str) { + // Override the ts field to create a more realistic situation for event time alignment + Record record = GenericRecord.create(TestFixtures.TS_SCHEMA); + LocalDateTime ts = + LocalDateTime.ofInstant( + Instant.ofEpochMilli(Time.of(minutes, TimeUnit.MINUTES).toMilliseconds()), + ZoneId.of("Z")); + record.setField("ts", ts); + record.setField("str", str); + return record; + } + + protected void assertRecords( + Collection expectedRecords, CloseableIterator iterator) throws Exception { + Set expected = + expectedRecords.stream() + .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e)) + .collect(Collectors.toSet()); + Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size())); + } + + protected Set waitForRecords(CloseableIterator iterator, int num) { + Set received = Sets.newHashSetWithExpectedSize(num); + assertThat( + CompletableFuture.supplyAsync( + () -> { + int count = 0; + while (count < num && iterator.hasNext()) { + received.add(iterator.next()); + count++; + } + + if (count < num) { + throw new IllegalStateException(String.format("Fail to get %d records.", num)); + } + + return true; + })) + .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + + return received; + } + + private Optional> findAlignmentDriftMetric(JobID jobID, long withValue) { + String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT; + return reporter.findMetrics(jobID, metricsName).values().stream() + .map(m -> (Gauge) m) + .filter(m -> m.getValue() == withValue) + .findFirst(); + } + + private GenericAppenderHelper appender() { + // We need to create multiple splits, so we need to generate parquet files with multiple offsets + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + hadoopConf.set("write.parquet.page-size-bytes", "64"); + hadoopConf.set("write.parquet.row-group-size-bytes", "64"); + return new GenericAppenderHelper( + sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, hadoopConf); + } + + private static RowData row(long time, long count) { + GenericRowData result = new GenericRowData(2); + result.setField(0, time); + result.setField(1, String.valueOf(count)); + return result; + } + + private static class RowDataTimestampAssigner implements SerializableTimestampAssigner { + @Override + public long extractTimestamp(RowData element, long recordTimestamp) { + return element.getTimestamp(0, 0).getMillisecond(); + } + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java index f28677ca9d6a..090b304942c6 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java @@ -43,15 +43,13 @@ public void testEmptyInitialization() { @Test public void testStaticEnumeratorSequence() throws Exception { SplitAssigner assigner = splitAssigner(); - assigner.onDiscoveredSplits( - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 1)); + assigner.onDiscoveredSplits(createSplits(4, 1, "1")); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); assertSnapshot(assigner, 1); - assigner.onUnassignedSplits( - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1)); + assigner.onUnassignedSplits(createSplits(1, 1, "1")); assertSnapshot(assigner, 2); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); @@ -66,15 +64,12 @@ public void testContinuousEnumeratorSequence() throws Exception { SplitAssigner assigner = splitAssigner(); assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE); - List splits1 = - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1); + List splits1 = createSplits(1, 1, "1"); assertAvailableFuture(assigner, 1, () -> assigner.onDiscoveredSplits(splits1)); - List splits2 = - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1); + List splits2 = createSplits(1, 1, "1"); assertAvailableFuture(assigner, 1, () -> assigner.onUnassignedSplits(splits2)); - assigner.onDiscoveredSplits( - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 2, 1)); + assigner.onDiscoveredSplits(createSplits(2, 1, "1")); assertSnapshot(assigner, 2); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); @@ -125,5 +120,11 @@ protected void assertSnapshot(SplitAssigner assigner, int splitCount) { Assert.assertEquals(splitCount, stateBeforeGet.size()); } + protected List createSplits(int fileCount, int filesPerSplit, String version) + throws Exception { + return SplitHelpers.createSplitsFromTransientHadoopTable( + TEMPORARY_FOLDER, fileCount, filesPerSplit, version); + } + protected abstract SplitAssigner splitAssigner(); } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java index 8b9e132e0e22..e78634e6b873 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java @@ -20,7 +20,6 @@ import java.util.List; import org.apache.iceberg.ContentFile; -import org.apache.iceberg.flink.source.SplitHelpers; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.flink.source.split.SerializableComparator; import org.apache.iceberg.flink.source.split.SplitComparators; @@ -40,9 +39,7 @@ protected SplitAssigner splitAssigner() { public void testMultipleFilesInAnIcebergSplit() { SplitAssigner assigner = splitAssigner(); Assertions.assertThatThrownBy( - () -> - assigner.onDiscoveredSplits( - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")), + () -> assigner.onDiscoveredSplits(createSplits(4, 2, "2")), "Multiple files in a split is not allowed") .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Please use 'split-open-file-cost'"); @@ -52,8 +49,7 @@ public void testMultipleFilesInAnIcebergSplit() { @Test public void testSplitSort() throws Exception { SplitAssigner assigner = splitAssigner(); - List splits = - SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2"); + List splits = createSplits(5, 1, "2"); assigner.onDiscoveredSplits(splits.subList(3, 5)); assigner.onDiscoveredSplits(splits.subList(0, 1)); @@ -76,7 +72,7 @@ public void testSerializable() { Assert.assertNotNull(comparator); } - protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) { + private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) { GetSplitResult result = assigner.getNext(null); ContentFile file = result.split().task().files().iterator().next().file(); Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber()); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java new file mode 100644 index 000000000000..e1fc63fda918 --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.assigner; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor; +import org.apache.iceberg.flink.source.reader.ReaderUtil; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.SerializableComparator; +import org.apache.iceberg.flink.source.split.SplitComparators; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializationUtil; +import org.junit.Assert; +import org.junit.Test; + +public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase { + public static final Schema SCHEMA = + new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone())); + private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA); + + @Override + protected SplitAssigner splitAssigner() { + return new OrderedSplitAssignerFactory( + SplitComparators.watermark( + new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null))) + .createAssigner(); + } + + /** Test the assigner when multiple files are in a single split */ + @Test + public void testMultipleFilesInAnIcebergSplit() { + SplitAssigner assigner = splitAssigner(); + assigner.onDiscoveredSplits(createSplits(4, 2, "2")); + + assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); + assertGetNext(assigner, GetSplitResult.Status.AVAILABLE); + assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE); + } + + /** Test sorted splits */ + @Test + public void testSplitSort() { + SplitAssigner assigner = splitAssigner(); + + Instant now = Instant.now(); + List splits = + IntStream.range(0, 5) + .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES))) + .collect(Collectors.toList()); + + assigner.onDiscoveredSplits(splits.subList(3, 5)); + assigner.onDiscoveredSplits(splits.subList(0, 1)); + assigner.onDiscoveredSplits(splits.subList(1, 3)); + + assertGetNext(assigner, splits.get(0)); + assertGetNext(assigner, splits.get(1)); + assertGetNext(assigner, splits.get(2)); + assertGetNext(assigner, splits.get(3)); + assertGetNext(assigner, splits.get(4)); + + assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE); + } + + @Test + public void testSerializable() { + byte[] bytes = + SerializationUtil.serializeToBytes( + SplitComparators.watermark( + new ColumnStatsWatermarkExtractor( + TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS))); + SerializableComparator comparator = + SerializationUtil.deserializeFromBytes(bytes); + Assert.assertNotNull(comparator); + } + + private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) { + GetSplitResult result = assigner.getNext(null); + Assert.assertEquals(result.split(), split); + } + + @Override + protected List createSplits( + int fileCount, int filesPerSplit, String version) { + return IntStream.range(0, fileCount / filesPerSplit) + .mapToObj( + splitNum -> + splitFromRecords( + IntStream.range(0, filesPerSplit) + .mapToObj( + fileNum -> + RandomGenericData.generate( + SCHEMA, 2, splitNum * filesPerSplit + fileNum)) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + } + + private IcebergSourceSplit splitFromInstant(Instant instant) { + Record record = GenericRecord.create(SCHEMA); + record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC)); + return splitFromRecords(ImmutableList.of(ImmutableList.of(record))); + } + + private IcebergSourceSplit splitFromRecords(List> records) { + try { + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY)); + } catch (IOException e) { + throw new RuntimeException("Split creation exception", e); + } + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java index f9ceaf842263..2a2503ef2478 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java @@ -60,9 +60,12 @@ public static FileScanTask createFileTask( FileFormat fileFormat, FileAppenderFactory appenderFactory) throws IOException { - try (FileAppender appender = - appenderFactory.newAppender(Files.localOutput(file), fileFormat)) { + FileAppender appender = + appenderFactory.newAppender(Files.localOutput(file), fileFormat); + try { appender.addAll(records); + } finally { + appender.close(); } DataFile dataFile = @@ -71,6 +74,7 @@ public static FileScanTask createFileTask( .withFileSizeInBytes(file.length()) .withPath(file.toString()) .withFormat(fileFormat) + .withMetrics(appender.metrics()) .build(); ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue()); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java new file mode 100644 index 000000000000..afe8a5d0152c --- /dev/null +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestColumnStatsWatermarkExtractor { + public static final Schema SCHEMA = + new Schema( + required(1, "timestamp_column", Types.TimestampType.withoutZone()), + required(2, "timestamptz_column", Types.TimestampType.withZone()), + required(3, "long_column", Types.LongType.get()), + required(4, "string_column", Types.StringType.get())); + + private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA); + + private static final List> TEST_RECORDS = + ImmutableList.of( + RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L)); + + private static final List> MIN_VALUES = + ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3)); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopTableResource sourceTableResource = + new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA); + + private final String columnName; + + @BeforeClass + public static void updateMinValue() { + for (int i = 0; i < TEST_RECORDS.size(); ++i) { + for (Record r : TEST_RECORDS.get(i)) { + Map minValues = MIN_VALUES.get(i); + + LocalDateTime localDateTime = (LocalDateTime) r.get(0); + minValues.merge( + "timestamp_column", localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min); + + OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1); + minValues.merge("timestamptz_column", offsetDateTime.toInstant().toEpochMilli(), Math::min); + + minValues.merge("long_column", (Long) r.get(2), Math::min); + } + } + } + + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return ImmutableList.of( + new Object[] {"timestamp_column"}, + new Object[] {"timestamptz_column"}, + new Object[] {"long_column"}); + } + + public TestColumnStatsWatermarkExtractor(String columnName) { + this.columnName = columnName; + } + + @Test + public void testSingle() throws IOException { + ColumnStatsWatermarkExtractor extractor = + new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS); + + Assert.assertEquals( + MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0))); + } + + @Test + public void testTimeUnit() throws IOException { + Assume.assumeTrue("Run only for long column", columnName.equals("long_column")); + ColumnStatsWatermarkExtractor extractor = + new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS); + + Assert.assertEquals( + MIN_VALUES.get(0).get(columnName).longValue() / 1000L, + extractor.extractWatermark(split(0))); + } + + @Test + public void testMultipleFiles() throws IOException { + Assume.assumeTrue("Run only for the timestamp column", columnName.equals("timestamp_column")); + IcebergSourceSplit combinedSplit = + IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY)); + + ColumnStatsWatermarkExtractor extractor = + new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null); + + Assert.assertEquals( + MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0))); + Assert.assertEquals( + MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1))); + Assert.assertEquals( + Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)), + extractor.extractWatermark(combinedSplit)); + } + + @Test + public void testWrongColumn() { + Assume.assumeTrue("Run only for string column", columnName.equals("string_column")); + Assertions.assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Found STRING, expected a LONG or TIMESTAMP column for watermark generation."); + } + + @Test + public void testEmptyStatistics() throws IOException { + Assume.assumeTrue("Run only for timestamp column", columnName.equals("timestamp_column")); + + // Create an extractor for a column we do not have statistics + ColumnStatsWatermarkExtractor extractor = + new ColumnStatsWatermarkExtractor(10, "missing_field"); + Assertions.assertThatThrownBy(() -> extractor.extractWatermark(split(0))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Missing statistics for column"); + } + + private IcebergSourceSplit split(int id) throws IOException { + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + ImmutableList.of(TEST_RECORDS.get(id)), + TEMPORARY_FOLDER, + FileFormat.PARQUET, + APPENDER_FACTORY)); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java index def4f436851b..88234c61123f 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java @@ -167,7 +167,12 @@ private IcebergSourceReader createReader( new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), new PlaintextEncryptionManager(), Collections.emptyList()); - return new IcebergSourceReader<>(readerMetrics, readerFunction, splitComparator, readerContext); + return new IcebergSourceReader<>( + SerializableRecordEmitter.defaultEmitter(), + readerMetrics, + readerFunction, + splitComparator, + readerContext); } private static class IdBasedComparator implements SerializableComparator {