Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/content/flink/sql-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ If you want see `DELETE` records, you can use audit_log table:
SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
```

### Batch Incremental between Auto-created Tags

You can use `incremental-between` to query incremental changes between two tags. But for auto-created tag, the tag may
not be created in-time because of data delay.

For example, assume that tags '2024-12-01', '2024-12-02' and '2024-12-04' are auto created daily. Data for 12/03 are delayed
and ingested with data for 12/04. Now if you want to query the incremental changes between tags, and you don't know the tag
of 12/03 is not created, you will use `incremental-between` with '2024-12-01,2024-12-02', '2024-12-02,2024-12-03' and
'2024-12-03,2024-12-04' respectively, then you will get an error that the tag '2024-12-03' doesn't exist.

We introduced a new option `incremental-to-auto-tag` for this scenario. You can only specify the end tag, and Paimon will
find an earlier tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty.

For example, when you query 'incremental-to-auto-tag=2024-12-01' or 'incremental-to-auto-tag=2024-12-03', the result is
empty; Query 'incremental-to-auto-tag=2024-12-02', the result is change between 12/01 and 12/02; Query 'incremental-to-auto-tag=2024-12-04',
the result is change between 12/02 and 12/04.

## Streaming Query

By default, Streaming read produces the latest snapshot on the table upon first startup,
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@
<td>String</td>
<td>Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
</tr>
<tr>
<td><h5>incremental-to-auto-tag</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Used to specify the auto-created tag to reading incremental changes.</td>
</tr>
<tr>
<td><h5>local-merge-buffer-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
14 changes: 13 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,13 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start timestamp (exclusive) and end timestamp, "
+ "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");

public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
key("incremental-to-auto-tag")
.stringType()
.noDefaultValue()
.withDescription(
"Used to specify the auto-created tag to reading incremental changes.");

public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE =
key("end-input.check-partition-expire")
.booleanType()
Expand Down Expand Up @@ -2119,6 +2126,10 @@ public IncrementalBetweenScanMode incrementalBetweenScanMode() {
return options.get(INCREMENTAL_BETWEEN_SCAN_MODE);
}

public String incrementalToAutoTag() {
return options.get(INCREMENTAL_TO_AUTO_TAG);
}

public Integer scanManifestParallelism() {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
Expand Down Expand Up @@ -2752,7 +2763,8 @@ public static void setDefaultValues(Options options) {
}

if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
|| options.contains(INCREMENTAL_BETWEEN))
|| options.contains(INCREMENTAL_BETWEEN)
|| options.contains(INCREMENTAL_TO_AUTO_TAG))
&& !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.INCREMENTAL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
Expand Down Expand Up @@ -278,7 +279,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN),
INCREMENTAL_BETWEEN,
INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
Expand All @@ -294,14 +296,16 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TIMESTAMP,
SCAN_FILE_CREATION_TIME_MILLIS,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN),
INCREMENTAL_BETWEEN,
INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
} else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL) {
checkExactOneOptionExistInMode(
options,
options.startupMode(),
INCREMENTAL_BETWEEN,
INCREMENTAL_BETWEEN_TIMESTAMP);
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_TO_AUTO_TAG);
checkOptionsConflict(
options,
Arrays.asList(
Expand All @@ -310,7 +314,10 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TIMESTAMP,
SCAN_TAG_NAME),
Arrays.asList(INCREMENTAL_BETWEEN, INCREMENTAL_BETWEEN_TIMESTAMP));
Arrays.asList(
INCREMENTAL_BETWEEN,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_TO_AUTO_TAG));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
checkOptionsConflict(
Expand All @@ -321,7 +328,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN),
INCREMENTAL_BETWEEN,
INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_SNAPSHOT_ID));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) {
checkOptionExistInMode(
Expand All @@ -335,7 +343,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TIMESTAMP_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN),
INCREMENTAL_BETWEEN,
INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS));
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
Expand All @@ -347,6 +356,7 @@ private static void validateStartupMode(CoreOptions options) {
checkOptionNotExistInMode(
options, INCREMENTAL_BETWEEN_TIMESTAMP, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_TO_AUTO_TAG, options.startupMode());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
Expand All @@ -44,21 +46,29 @@
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
public abstract class AbstractDataTableScan implements DataTableScan {

private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);

private final CoreOptions options;
protected final SnapshotReader snapshotReader;

Expand Down Expand Up @@ -199,50 +209,87 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
: new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
Pair<String, String> incrementalBetween = options.incrementalBetween();
CoreOptions.IncrementalBetweenScanMode scanType =
options.incrementalBetweenScanMode();
ScanMode scanMode;
switch (scanType) {
case AUTO:
scanMode =
options.changelogProducer() == ChangelogProducer.NONE
? ScanMode.DELTA
: ScanMode.CHANGELOG;
break;
case DELTA:
scanMode = ScanMode.DELTA;
break;
case CHANGELOG:
scanMode = ScanMode.CHANGELOG;
break;
default:
throw new UnsupportedOperationException(
"Unknown incremental scan type " + scanType.name());
}
if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null) {
try {
return new IncrementalStartingScanner(
snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()),
scanMode);
} catch (NumberFormatException e) {
return new IncrementalTagStartingScanner(
snapshotManager,
incrementalBetween.getLeft(),
incrementalBetween.getRight());
}
} else {
return new IncrementalTimeStampStartingScanner(
snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()),
scanMode);
}
return createIncrementalStartingScanner(snapshotManager);
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
}
}

private StartingScanner createIncrementalStartingScanner(SnapshotManager snapshotManager) {
CoreOptions.IncrementalBetweenScanMode scanType = options.incrementalBetweenScanMode();
ScanMode scanMode;
switch (scanType) {
case AUTO:
scanMode =
options.changelogProducer() == ChangelogProducer.NONE
? ScanMode.DELTA
: ScanMode.CHANGELOG;
break;
case DELTA:
scanMode = ScanMode.DELTA;
break;
case CHANGELOG:
scanMode = ScanMode.CHANGELOG;
break;
default:
throw new UnsupportedOperationException(
"Unknown incremental scan type " + scanType.name());
}

Options conf = options.toConfiguration();
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
Pair<String, String> incrementalBetween = options.incrementalBetween();
Optional<Tag> startTag = tagManager.get(incrementalBetween.getLeft());
Optional<Tag> endTag = tagManager.get(incrementalBetween.getRight());
if (startTag.isPresent() && endTag.isPresent()) {
Snapshot start = startTag.get().trimToSnapshot();
Snapshot end = endTag.get().trimToSnapshot();

LOG.info(
"{} start and end are parsed to tag with snapshot id {} to {}.",
INCREMENTAL_BETWEEN.key(),
start.id(),
end.id());

if (end.id() <= start.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
incrementalBetween.getRight(),
end.id(),
incrementalBetween.getLeft(),
start.id()));
}
return new IncrementalTagStartingScanner(snapshotManager, start, end);
} else {
long startId, endId;
try {
startId = Long.parseLong(incrementalBetween.getLeft());
endId = Long.parseLong(incrementalBetween.getRight());
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
String.format(
"Didn't find two tags for start '%s' and end '%s', and they are not two snapshot Ids. "
+ "Please set two tags or two snapshot Ids.",
incrementalBetween.getLeft(), incrementalBetween.getRight()));
}
return new IncrementalStartingScanner(snapshotManager, startId, endId, scanMode);
}
} else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
Pair<String, String> incrementalBetween = options.incrementalBetween();
return new IncrementalTimeStampStartingScanner(
snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()),
scanMode);
} else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
String endTag = options.incrementalToAutoTag();
return IncrementalTagStartingScanner.create(snapshotManager, endTag, options);
} else {
throw new UnsupportedOperationException("Unknown incremental read mode.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.utils.SnapshotManager;

/** This scanner always return an empty result. */
public class EmptyResultStartingScanner extends AbstractStartingScanner {

EmptyResultStartingScanner(SnapshotManager snapshotManager) {
super(snapshotManager);
}

@Override
public Result scan(SnapshotReader snapshotReader) {
return new NoSnapshot();
}
}
Loading
Loading