From feed5fd6abab65586c92c59f8d24e5f15c4a7959 Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Tue, 28 Nov 2023 11:32:35 +0100
Subject: [PATCH 1/3] Flink: Document watermark generation feature

---
 docs/flink-queries.md | 52 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 4cef5468cd1a..7e849aba10db 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -277,6 +277,58 @@ DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord",
     new GenericRecordAvroTypeInfo(avroSchema));
 ```
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
+feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+
+Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Timestamp columns are automatically converted to milliseconds since the Java epoch of
+1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and emitted once per split.
+When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent
+combining multiple files to a single split.
+By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// For windowing
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+
+// For watermark alignment
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files into a single split
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
 ## Options
 
 ### Read options

From f2654b2bd9b9c62e266b0dc1d59b58950206d9d9 Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Thu, 30 Nov 2023 13:24:29 +0100
Subject: [PATCH 2/3] Review comments

---
 docs/flink-queries.md | 26 +++++++++++++++++---------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 7e849aba10db..71a599bdc679 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -279,24 +279,32 @@ DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
 ### Emitting watermarks
 Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
-[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
-feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+from triggering too early when reading multiple data files concurrently.
 
 Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
 The supported column types are `timestamp`, `timestamptz` and `long`.
-Timestamp columns are automatically converted to milliseconds since the Java epoch of
-1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+An Iceberg `timestamp` or `timestamptz` column inherently contains its time precision, so there is no
+need to specify the time unit. A `long` column, however, carries no time unit information. Use
+`watermarkTimeUnit` to configure the conversion for long columns.
 
 The watermarks are generated based on column metrics stored for data files and emitted once per split.
-When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent
-combining multiple files to a single split.
-By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+If multiple smaller files with different time ranges are combined into a single split, it can increase
+the out-of-orderliness and cause extra data buffering in the Flink state. Since the main purpose of watermark
+alignment is to reduce exactly these effects, it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
+single split. Do not forget to consider the additional memory and CPU load caused by having multiple
+splits in this case.
+
+By default, the column metrics are collected for the first 100 columns of the table.
+Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
 
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
 
-// For windowing
+// Ordered data file reads with windowing, using a timestamp column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource.forRowData()
@@ -311,7 +319,7 @@ DataStream<RowData> stream =
         SOURCE_NAME,
         TypeInformation.of(RowData.class));
 
-// For watermark alignment
+// Watermark alignment, using a long event time column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource.forRowData()

From 8a600788be04c880cb0dc7201c36eb2fbdd03e5d Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Mon, 4 Dec 2023 13:01:29 +0100
Subject: [PATCH 3/3] Stats and examples

---
 docs/flink-queries.md | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 71a599bdc679..cf68fa367c21 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -294,17 +294,21 @@ If multiple smaller files with different time ranges are combined into a single
 the out-of-orderliness and cause extra data buffering in the Flink state. Since the main purpose of watermark
 alignment is to reduce exactly these effects, it is recommended to
 set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
-single split. Do not forget to consider the additional memory and CPU load caused by having multiple
-splits in this case.
+single split. The negative impact of not combining small files into a single split is on read throughput,
+especially if there are many small files. In typical stateful processing jobs, source read throughput is not
+the bottleneck, so this is usually a reasonable tradeoff.
 
-By default, the column metrics are collected for the first 100 columns of the table.
-Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+This feature requires column-level min-max stats. Make sure stats are generated for the watermark column
+during the write phase. By default, the column metrics are collected for the first 100 columns of the table.
+If the watermark column doesn't have stats enabled by default, use
+[write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+The following example could be useful if watermarks are used for windowing. The source reads Iceberg data files
+in order, using a timestamp column, and emits watermarks:
 
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
 
-// Ordered data file reads with windowing, using a timestamp column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource.forRowData()
@@ -318,8 +322,13 @@ DataStream<RowData> stream =
             .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
         SOURCE_NAME,
         TypeInformation.of(RowData.class));
+```
+
+Example for reading an Iceberg table using a long event time column for watermark alignment:
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
 
-// Watermark alignment, using a long event time column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource.forRowData()
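The documented `watermarkTimeUnit` conversion for `long` columns can be made concrete with a small sketch. This is not Iceberg's internal implementation — `toEpochMillis` is a hypothetical helper — but it illustrates what the setting amounts to: interpreting the raw long value in the configured `java.util.concurrent.TimeUnit` and normalizing it to epoch milliseconds, the unit Flink watermarks use.

```java
import java.util.concurrent.TimeUnit;

// Illustration only: how a raw long column value, tagged with a time unit,
// maps onto the epoch-millisecond timeline used for watermarks.
public class WatermarkTimeUnitDemo {
    // Interpret columnValue in the given unit and normalize to epoch millis.
    static long toEpochMillis(long columnValue, TimeUnit unit) {
        return unit.toMillis(columnValue);
    }

    public static void main(String[] args) {
        long micros = 1_700_000_000_000_000L;     // a long column storing microseconds since the epoch
        long nanos = 1_700_000_000_000_000_000L;  // the same instant stored as nanoseconds
        System.out.println(toEpochMillis(micros, TimeUnit.MICROSECONDS)); // 1700000000000
        System.out.println(toEpochMillis(nanos, TimeUnit.NANOSECONDS));   // 1700000000000
    }
}
```

The unit passed to `watermarkTimeUnit` must match the unit the writer used for the column; a mismatch silently shifts every watermark by orders of magnitude.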