@@ -110,6 +110,12 @@
<td>Boolean</td>
<td>If true, a compact coordinator and worker operator will be added after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into larger ones, which decreases the number of small files.</td>
</tr>
+ <tr>
+ <td><h5>scan.bounded</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job.</td>
+ </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
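The new `scan.bounded` row above can be exercised with a dynamic option hint (a usage sketch mirroring the test added at the end of this PR; the table `T` and the `sEnv` environment are hypothetical):

```java
// Hypothetical StreamTableEnvironment sEnv and existing table T.
// The hint forces a bounded scan even though the job runs in STREAMING mode.
sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.bounded'='true') */").print();
```

Because the option has no default, leaving it unset keeps the automatic behavior of deriving boundedness from the job's runtime mode.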
@@ -38,13 +38,13 @@ public class DataTableSource extends BaseDataTableSource {
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
- boolean streaming,
+ boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
null,
@@ -57,7 +57,7 @@ public DataTableSource(
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
- boolean streaming,
+ boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -68,7 +68,7 @@
super(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
predicate,
@@ -83,7 +83,7 @@ public DataTableSource copy() {
return new DataTableSource(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
predicate,
4 changes: 4 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
@@ -125,6 +125,10 @@ under the License.
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
</exclusions>
</dependency>

@@ -67,6 +67,7 @@
import java.util.Set;
import java.util.regex.Pattern;

+ import static java.lang.Boolean.parseBoolean;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
@@ -75,6 +76,7 @@
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
+ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_BOUNDED;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;

@@ -93,19 +95,25 @@ public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
- boolean isStreamingMode =
+ Table table =
+ origin instanceof SystemCatalogTable
+ ? ((SystemCatalogTable) origin).table()
+ : buildPaimonTable(context);
+ boolean unbounded =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
+ Map<String, String> options = table.options();
+ if (options.containsKey(SCAN_BOUNDED.key())
+ && parseBoolean(options.get(SCAN_BOUNDED.key()))) {
+ unbounded = false;
+ }
if (origin instanceof SystemCatalogTable) {
- return new SystemTableSource(
- ((SystemCatalogTable) origin).table(),
- isStreamingMode,
- context.getObjectIdentifier());
+ return new SystemTableSource(table, unbounded, context.getObjectIdentifier());
} else {
return new DataTableSource(
context.getObjectIdentifier(),
- buildPaimonTable(context),
- isStreamingMode,
+ table,
+ unbounded,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
@@ -444,6 +444,14 @@ public class FlinkConnectorOptions {
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

+ public static final ConfigOption<Boolean> SCAN_BOUNDED =
+ key("scan.bounded")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription(
+ "Bounded mode for Paimon consumer. "
+ + "By default, Paimon automatically selects bounded mode based on the mode of the Flink job.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
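A reading aid for the option above (a minimal sketch, not code from this PR; the class and method names are hypothetical): because `scan.bounded` has no default, the factory change earlier in this diff treats only an explicit `'true'` as a request for a bounded scan, while `'false'` or an unset option falls back to the Flink job's runtime mode.

```java
import java.util.Map;

// Minimal sketch of the boundedness resolution shown in AbstractFlinkTableFactory.
// BoundednessSketch and isUnbounded are hypothetical names, not Paimon API.
final class BoundednessSketch {
    static boolean isUnbounded(boolean streamingRuntimeMode, Map<String, String> tableOptions) {
        // Default: boundedness follows the Flink job's runtime mode.
        boolean unbounded = streamingRuntimeMode;
        // Only an explicit 'scan.bounded' = 'true' forces a bounded scan;
        // 'false' or an absent value keeps the mode-derived behavior.
        String bounded = tableOptions.get("scan.bounded");
        if (bounded != null && Boolean.parseBoolean(bounded)) {
            unbounded = false;
        }
        return unbounded;
    }
}
```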
@@ -94,7 +94,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
CoreOptions.SCAN_VERSION);

protected final ObjectIdentifier tableIdentifier;
- protected final boolean streaming;
+ protected final boolean unbounded;
protected final DynamicTableFactory.Context context;
@Nullable protected final LogStoreTableFactory logStoreTableFactory;

@@ -104,7 +104,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
public BaseDataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
- boolean streaming,
+ boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -114,7 +114,7 @@ public BaseDataTableSource(
@Nullable Long countPushed) {
super(table, predicate, projectFields, limit);
this.tableIdentifier = tableIdentifier;
- this.streaming = streaming;
+ this.unbounded = unbounded;
this.context = context;
this.logStoreTableFactory = logStoreTableFactory;
this.predicate = predicate;
@@ -126,7 +126,7 @@ public BaseDataTableSource(

@Override
public ChangelogMode getChangelogMode() {
- if (!streaming) {
+ if (!unbounded) {
// batch merge all, return insert only
return ChangelogMode.insertOnly();
}
@@ -195,7 +195,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(table)
.sourceName(tableIdentifier.asSummaryString())
- .sourceBounded(!streaming)
+ .sourceBounded(!unbounded)
.logSourceProvider(logSourceProvider)
.projection(projectFields)
.predicate(predicate)
@@ -204,7 +204,7 @@
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());

return new PaimonDataStreamScanProvider(
- !streaming,
+ !unbounded,
env ->
sourceBuilder
.sourceParallelism(inferSourceParallelism(env))
@@ -294,7 +294,7 @@ public boolean applyAggregates(
List<int[]> groupingSets,
List<AggregateExpression> aggregateExpressions,
DataType producedDataType) {
- if (isStreaming()) {
+ if (isUnbounded()) {
return false;
}

@@ -349,7 +349,7 @@ public String asSummaryString() {
}

@Override
- public boolean isStreaming() {
- return streaming;
+ public boolean isUnbounded() {
+ return unbounded;
}
}
@@ -56,13 +56,13 @@ public class DataTableSource extends BaseDataTableSource
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
- boolean streaming,
+ boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
null,
@@ -76,7 +76,7 @@ public DataTableSource(
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
- boolean streaming,
+ boolean unbounded,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@@ -88,7 +88,7 @@ public DataTableSource(
super(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
predicate,
@@ -104,7 +104,7 @@ public DataTableSource copy() {
return new DataTableSource(
tableIdentifier,
table,
- streaming,
+ unbounded,
context,
logStoreTableFactory,
predicate,
@@ -117,7 +117,7 @@ public DataTableSource copy() {

@Override
public TableStats reportStatistics() {
- if (streaming) {
+ if (unbounded) {
return TableStats.UNKNOWN;
}
Optional<Statistics> optionStatistics = table.statistics();
@@ -142,13 +142,13 @@ public TableStats reportStatistics() {
@Override
public List<String> listAcceptedFilterFields() {
// note that streaming query doesn't support dynamic filtering
- return streaming ? Collections.emptyList() : table.partitionKeys();
+ return unbounded ? Collections.emptyList() : table.partitionKeys();
}

@Override
public void applyDynamicFiltering(List<String> candidateFilterFields) {
checkState(
- !streaming,
+ !unbounded,
"Cannot apply dynamic filtering to Paimon table '%s' when streaming reading.",
table.name());

@@ -108,7 +108,7 @@ public Result applyFilters(List<ResolvedExpression> filters) {
unConsumedFilters.add(filter);
} else {
Predicate p = predicateOptional.get();
- if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) {
+ if (isUnbounded() || !p.visit(onlyPartFieldsVisitor)) {
unConsumedFilters.add(filter);
} else {
consumedFilters.add(filter);
@@ -137,7 +137,7 @@ public void applyLimit(long limit) {
this.limit = limit;
}

- public abstract boolean isStreaming();
+ public abstract boolean isUnbounded();

@Nullable
protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
@@ -150,7 +150,7 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
- if (isStreaming()) {
+ if (isUnbounded()) {
parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
} else {
scanSplitsForInference();
@@ -42,15 +42,14 @@
/** A {@link FlinkTableSource} for system table. */
public class SystemTableSource extends FlinkTableSource {

- private final boolean isStreamingMode;
+ private final boolean unbounded;
private final int splitBatchSize;
private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
private final ObjectIdentifier tableIdentifier;

- public SystemTableSource(
- Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
+ public SystemTableSource(Table table, boolean unbounded, ObjectIdentifier tableIdentifier) {
super(table);
- this.isStreamingMode = isStreamingMode;
+ this.unbounded = unbounded;
Options options = Options.fromMap(table.options());
this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
@@ -59,15 +58,15 @@ public SystemTableSource(

public SystemTableSource(
Table table,
- boolean isStreamingMode,
+ boolean unbounded,
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
@Nullable Long limit,
int splitBatchSize,
FlinkConnectorOptions.SplitAssignMode splitAssignMode,
ObjectIdentifier tableIdentifier) {
super(table, predicate, projectFields, limit);
- this.isStreamingMode = isStreamingMode;
+ this.unbounded = unbounded;
this.splitBatchSize = splitBatchSize;
this.splitAssignMode = splitAssignMode;
this.tableIdentifier = tableIdentifier;
@@ -96,7 +95,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
}
readBuilder.withFilter(predicate);

- if (isStreamingMode && table instanceof DataTable) {
+ if (unbounded && table instanceof DataTable) {
source =
new ContinuousFileStoreSource(
readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData);
@@ -125,7 +124,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
public SystemTableSource copy() {
return new SystemTableSource(
table,
- isStreamingMode,
+ unbounded,
predicate,
projectFields,
limit,
@@ -140,7 +139,7 @@ public String asSummaryString() {
}

@Override
- public boolean isStreaming() {
- return isStreamingMode;
+ public boolean isUnbounded() {
+ return unbounded;
}
}
@@ -25,6 +25,8 @@
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;

+ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -636,6 +638,20 @@ public void testParquetRowDecimalAndTimestamp() {
Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
}

+ @Test
+ public void testScanBounded() {
+ sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+ List<Row> result;
+ try (CloseableIterator<Row> iter =
+ sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.bounded'='true') */")
+ .collect()) {
+ result = ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
+ }

private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv, sql);
while (!transformation.getInputs().isEmpty()) {