From 136cce774303e86b2dc60263ac3de4c99b11229c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 17 Jan 2025 16:46:52 +0800 Subject: [PATCH] [flink] Introduce scan bounded to force bounded in streaming job --- .../flink_connector_configuration.html | 6 +++++ .../paimon/flink/source/DataTableSource.java | 10 ++++----- paimon-flink/paimon-flink-common/pom.xml | 4 ++++ .../flink/AbstractFlinkTableFactory.java | 22 +++++++++++++------ .../paimon/flink/FlinkConnectorOptions.java | 8 +++++++ .../flink/source/BaseDataTableSource.java | 18 +++++++-------- .../paimon/flink/source/DataTableSource.java | 16 +++++++------- .../paimon/flink/source/FlinkTableSource.java | 6 ++--- .../flink/source/SystemTableSource.java | 19 ++++++++-------- .../paimon/flink/BatchFileStoreITCase.java | 16 ++++++++++++++ 10 files changed, 83 insertions(+), 42 deletions(-) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 23bb827aef33..078831af0694 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -110,6 +110,12 @@ Boolean If true, it will add a compact coordinator and worker operator after the writer operator,in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. + +
scan.bounded
+ (none) + Boolean + Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job. +
scan.infer-parallelism
true diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index f41f8da6c820..629be7f32e2e 100644 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -38,13 +38,13 @@ public class DataTableSource extends BaseDataTableSource { public DataTableSource( ObjectIdentifier tableIdentifier, Table table, - boolean streaming, + boolean unbounded, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) { this( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, null, @@ -57,7 +57,7 @@ public DataTableSource( public DataTableSource( ObjectIdentifier tableIdentifier, Table table, - boolean streaming, + boolean unbounded, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @@ -68,7 +68,7 @@ public DataTableSource( super( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, predicate, @@ -83,7 +83,7 @@ public DataTableSource copy() { return new DataTableSource( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, predicate, diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index a02b6afb6e8c..5716ab90d075 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -125,6 +125,10 @@ under the License. log4j log4j + + org.apache.logging.log4j + log4j-slf4j-impl + diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 6b10dbb84bf4..5cbdf1abc19b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -67,6 +67,7 @@ import java.util.Set; import java.util.regex.Pattern; +import static java.lang.Boolean.parseBoolean; import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY; import static org.apache.paimon.CoreOptions.SCAN_MODE; @@ -75,6 +76,7 @@ import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM; import static org.apache.paimon.flink.FlinkConnectorOptions.NONE; +import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_BOUNDED; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory; @@ -93,19 +95,25 @@ public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) { @Override public DynamicTableSource createDynamicTableSource(Context context) { CatalogTable origin = context.getCatalogTable().getOrigin(); - boolean isStreamingMode = + Table table = + origin instanceof SystemCatalogTable + ? ((SystemCatalogTable) origin).table() + : buildPaimonTable(context); + boolean unbounded = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + Map options = table.options(); + if (options.containsKey(SCAN_BOUNDED.key()) + && parseBoolean(options.get(SCAN_BOUNDED.key()))) { + unbounded = false; + } if (origin instanceof SystemCatalogTable) { - return new SystemTableSource( - ((SystemCatalogTable) origin).table(), - isStreamingMode, - context.getObjectIdentifier()); + return new SystemTableSource(table, unbounded, context.getObjectIdentifier()); } else { return new DataTableSource( context.getObjectIdentifier(), - buildPaimonTable(context), - isStreamingMode, + table, + unbounded, context, createOptionalLogStoreFactory(context).orElse(null)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 0105202b9d76..49ee9092e2b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -444,6 +444,14 @@ public class FlinkConnectorOptions { + "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will " + "automatically generate the operator uid, which may be incompatible when the topology changes."); + public static final ConfigOption SCAN_BOUNDED = + key("scan.bounded") + .booleanType() + .noDefaultValue() + .withDescription( + "Bounded mode for Paimon consumer. " + + "By default, Paimon automatically selects bounded mode based on the mode of the Flink job."); + public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index a94d799773bc..836c1372f70c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -94,7 +94,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource CoreOptions.SCAN_VERSION); protected final ObjectIdentifier tableIdentifier; - protected final boolean streaming; + protected final boolean unbounded; protected final DynamicTableFactory.Context context; @Nullable protected final LogStoreTableFactory logStoreTableFactory; @@ -104,7 +104,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource public BaseDataTableSource( ObjectIdentifier tableIdentifier, Table table, - boolean streaming, + boolean unbounded, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @@ -114,7 +114,7 @@ public BaseDataTableSource( @Nullable Long countPushed) { super(table, predicate, projectFields, limit); this.tableIdentifier = tableIdentifier; - this.streaming = streaming; + this.unbounded = unbounded; this.context = context; this.logStoreTableFactory = logStoreTableFactory; this.predicate = predicate; @@ -126,7 +126,7 @@ public BaseDataTableSource( @Override public ChangelogMode getChangelogMode() { - if (!streaming) { + if (!unbounded) { // batch merge all, return insert only return ChangelogMode.insertOnly(); } @@ -195,7 +195,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(table) .sourceName(tableIdentifier.asSummaryString()) - .sourceBounded(!streaming) + .sourceBounded(!unbounded) .logSourceProvider(logSourceProvider) .projection(projectFields) .predicate(predicate) @@ -204,7 +204,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); return new PaimonDataStreamScanProvider( - !streaming, + !unbounded, env -> sourceBuilder .sourceParallelism(inferSourceParallelism(env)) @@ -294,7 +294,7 @@ public boolean applyAggregates( List groupingSets, List aggregateExpressions, DataType producedDataType) { - if (isStreaming()) { + if (isUnbounded()) { return false; } @@ -349,7 +349,7 @@ public String asSummaryString() { } @Override - public boolean isStreaming() { - return streaming; + public boolean isUnbounded() { + return unbounded; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index 2b470cb4383a..eab0432c79b2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -56,13 +56,13 @@ public class DataTableSource extends BaseDataTableSource public DataTableSource( ObjectIdentifier tableIdentifier, Table table, - boolean streaming, + boolean unbounded, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) { this( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, null, @@ -76,7 +76,7 @@ public DataTableSource( public DataTableSource( ObjectIdentifier tableIdentifier, Table table, - boolean streaming, + boolean unbounded, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @@ -88,7 +88,7 @@ public DataTableSource( super( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, predicate, @@ -104,7 +104,7 @@ public DataTableSource copy() { return new DataTableSource( tableIdentifier, table, - streaming, + unbounded, context, logStoreTableFactory, predicate, @@ -117,7 +117,7 @@ public DataTableSource copy() { @Override public TableStats reportStatistics() { - if (streaming) { + if (unbounded) { return TableStats.UNKNOWN; } Optional optionStatistics = table.statistics(); @@ -142,13 +142,13 @@ public TableStats reportStatistics() { @Override public List listAcceptedFilterFields() { // note that streaming query doesn't support dynamic filtering - return streaming ? Collections.emptyList() : table.partitionKeys(); + return unbounded ? Collections.emptyList() : table.partitionKeys(); } @Override public void applyDynamicFiltering(List candidateFilterFields) { checkState( - !streaming, + !unbounded, "Cannot apply dynamic filtering to Paimon table '%s' when streaming reading.", table.name()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 741754fc3376..7459351c5e2a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -108,7 +108,7 @@ public Result applyFilters(List filters) { unConsumedFilters.add(filter); } else { Predicate p = predicateOptional.get(); - if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) { + if (isUnbounded() || !p.visit(onlyPartFieldsVisitor)) { unConsumedFilters.add(filter); } else { consumedFilters.add(filter); @@ -137,7 +137,7 @@ public void applyLimit(long limit) { this.limit = limit; } - public abstract boolean isStreaming(); + public abstract boolean isUnbounded(); @Nullable protected Integer inferSourceParallelism(StreamExecutionEnvironment env) { @@ -150,7 +150,7 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) { } Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM); if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) { - if (isStreaming()) { + if (isUnbounded()) { parallelism = Math.max(1, options.get(CoreOptions.BUCKET)); } else { scanSplitsForInference(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index 5198bd42136b..4086d296320d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -42,15 +42,14 @@ /** A {@link FlinkTableSource} for system table. */ public class SystemTableSource extends FlinkTableSource { - private final boolean isStreamingMode; + private final boolean unbounded; private final int splitBatchSize; private final FlinkConnectorOptions.SplitAssignMode splitAssignMode; private final ObjectIdentifier tableIdentifier; - public SystemTableSource( - Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) { + public SystemTableSource(Table table, boolean unbounded, ObjectIdentifier tableIdentifier) { super(table); - this.isStreamingMode = isStreamingMode; + this.unbounded = unbounded; Options options = Options.fromMap(table.options()); this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE); this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE); @@ -59,7 +58,7 @@ public SystemTableSource( public SystemTableSource( Table table, - boolean isStreamingMode, + boolean unbounded, @Nullable Predicate predicate, @Nullable int[][] projectFields, @Nullable Long limit, @@ -67,7 +66,7 @@ public SystemTableSource( FlinkConnectorOptions.SplitAssignMode splitAssignMode, ObjectIdentifier tableIdentifier) { super(table, predicate, projectFields, limit); - this.isStreamingMode = isStreamingMode; + this.unbounded = unbounded; this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; this.tableIdentifier = tableIdentifier; @@ -96,7 +95,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } readBuilder.withFilter(predicate); - if (isStreamingMode && table instanceof DataTable) { + if (unbounded && table instanceof DataTable) { source = new ContinuousFileStoreSource( readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData); @@ -125,7 +124,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { public SystemTableSource copy() { return new SystemTableSource( table, - isStreamingMode, + unbounded, predicate, projectFields, limit, @@ -140,7 +139,7 @@ public String asSummaryString() { } @Override - public boolean isStreaming() { - return isStreamingMode; + public boolean isUnbounded() { + return unbounded; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index d3108e374914..ee24dc8ef352 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -25,6 +25,8 @@ import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotNotExistException; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + import org.apache.flink.api.dag.Transformation; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; @@ -636,6 +638,20 @@ public void testParquetRowDecimalAndTimestamp() { Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0)))); } + @Test + public void testScanBounded() { + sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); + List result; + try (CloseableIterator iter = + sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.bounded'='true') */") + .collect()) { + result = ImmutableList.copyOf(iter); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) {