Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
<td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.</li><li>"full": Deprecated. Same as "latest-full".</li><li>"latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.</li><li>"compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.</li><li>"from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li><li>"from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. 
For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li></ul></td>
<td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.</li><li>"full": Deprecated. Same as "latest-full".</li><li>"latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.</li><li>"compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.</li><li>"from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. 
For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.</li><li>"from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li></ul></td>
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
Expand All @@ -362,6 +362,12 @@
<td>Long</td>
<td>Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode</td>
</tr>
<tr>
<td><h5>scan.tag-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional tag name used in case of "from-snapshot" scan mode.</td>
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
23 changes: 17 additions & 6 deletions paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode");

/**
 * Name of the tag to scan from. There is no dedicated "from-tag" startup mode: when this
 * option is set under the default scan mode, the startup mode resolves to "from-snapshot",
 * so the user-facing description refers to that mode.
 */
public static final ConfigOption<String> SCAN_TAG_NAME =
        key("scan.tag-name")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional tag name used in case of \"from-snapshot\" scan mode.");

public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
key("scan.bounded.watermark")
.longType()
Expand Down Expand Up @@ -887,7 +893,8 @@ public static StartupMode startupMode(Options options) {
if (mode == StartupMode.DEFAULT) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
return StartupMode.FROM_TIMESTAMP;
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) {
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
} else {
return StartupMode.LATEST_FULL;
Expand All @@ -911,6 +918,10 @@ public Long scanSnapshotId() {
return options.get(SCAN_SNAPSHOT_ID);
}

/**
 * Looks up the {@code scan.tag-name} option.
 *
 * @return the configured tag name; presumably {@code null} when the option is unset, since
 *     the option declares no default value (confirm against {@code Options#get})
 */
public String scanTagName() {
    final String configuredTagName = options.get(SCAN_TAG_NAME);
    return configuredTagName;
}

/**
 * Looks up the {@code SCAN_MANIFEST_PARALLELISM} option.
 *
 * @return the value stored for that option, exactly as returned by the underlying options
 */
public Integer scanManifestParallelism() {
    final Integer configuredParallelism = options.get(SCAN_MANIFEST_PARALLELISM);
    return configuredParallelism;
}
Expand Down Expand Up @@ -1008,7 +1019,7 @@ public enum StartupMode implements DescribedEnum {
"default",
"Determines actual startup mode according to other table properties. "
+ "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", "
+ "and if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\". "
+ "and if \"scan.snapshot-id\" or \"scan.tag-name\" is set the actual startup mode will be \"from-snapshot\". "
+ "Otherwise the actual startup mode will be \"latest-full\"."),

LATEST_FULL(
Expand Down Expand Up @@ -1043,10 +1054,10 @@ public enum StartupMode implements DescribedEnum {

FROM_SNAPSHOT(
"from-snapshot",
"For streaming sources, continuously reads changes "
+ "starting from snapshot specified by \"scan.snapshot-id\", "
+ "without producing a snapshot at the beginning. For batch sources, "
+ "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."),
"For streaming sources, continuously reads changes starting from snapshot "
+ "specified by \"scan.snapshot-id\", without producing a snapshot at the beginning. "
+ "For batch sources, produces a snapshot specified by \"scan.snapshot-id\" "
+ "or \"scan.tag-name\" but does not read new changes."),

FROM_SNAPSHOT_FULL(
"from-snapshot-full",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;

/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
Expand All @@ -68,7 +69,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
protected final ScanBucketFilter bucketFilter;

private Predicate partitionFilter;
private Long specifiedSnapshotId = null;
private Snapshot specifiedSnapshot = null;
private Integer specifiedBucket = null;
private List<ManifestFileMeta> specifiedManifests = null;
private ScanKind scanKind = ScanKind.ALL;
Expand Down Expand Up @@ -142,19 +143,22 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {

@Override
public FileStoreScan withSnapshot(long snapshotId) {
this.specifiedSnapshotId = snapshotId;
if (specifiedManifests != null) {
throw new IllegalStateException("Cannot set both snapshot id and manifests.");
}
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
return this;
}

/**
 * Pins this scan to an already loaded snapshot object.
 *
 * <p>A snapshot and an explicit manifest list are mutually exclusive, so this call is
 * rejected by {@code checkState} when manifests have been specified before.
 *
 * @param snapshot the snapshot to scan
 * @return this scan, for call chaining
 */
@Override
public FileStoreScan withSnapshot(Snapshot snapshot) {
    final boolean manifestsNotSpecified = specifiedManifests == null;
    checkState(manifestsNotSpecified, "Cannot set both snapshot and manifests.");
    specifiedSnapshot = snapshot;
    return this;
}

@Override
public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests.");
this.specifiedManifests = manifests;
if (specifiedSnapshotId != null) {
throw new IllegalStateException("Cannot set both snapshot id and manifests.");
}
return this;
}

Expand Down Expand Up @@ -209,20 +213,17 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
List<ManifestFileMeta> manifests = specifiedManifests;
Snapshot snapshot = null;
if (manifests == null) {
Long snapshotId = specifiedSnapshotId;
if (snapshotId == null) {
snapshotId = snapshotManager.latestSnapshotId();
}
if (snapshotId == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot()
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
} else {
snapshot = snapshotManager.snapshot(snapshotId);
manifests = readManifests(snapshot);
}
}

final List<ManifestFileMeta> readManifests = manifests;

Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
files ->
Expand All @@ -231,7 +232,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
.flatMap(m -> readManifest.apply(m).stream())
.filter(this::filterByStats)
.collect(Collectors.toList()),
readManifests,
manifests,
scanManifestParallelism);

List<ManifestEntry> files = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
Expand Down Expand Up @@ -48,6 +49,8 @@ public interface FileStoreScan {

FileStoreScan withSnapshot(long snapshotId);

FileStoreScan withSnapshot(Snapshot snapshot);

FileStoreScan withManifestList(List<ManifestFileMeta> manifests);

FileStoreScan withKind(ScanKind scanKind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@
import org.apache.paimon.types.RowType;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
Expand Down Expand Up @@ -68,18 +71,8 @@ public static void validateTableSchema(TableSchema schema) {
validatePrimaryKeysType(schema.fields(), schema.primaryKeys());

CoreOptions options = new CoreOptions(schema.options());
if (options.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) {
checkOptionExistInMode(
options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP);
checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS);
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT
|| options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID);
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
}

validateStartupMode(options);

if (options.writeMode() == WriteMode.APPEND_ONLY
&& options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) {
Expand Down Expand Up @@ -191,6 +184,34 @@ private static void validatePrimaryKeysType(List<DataField> fields, List<String>
}
}

/**
 * Verifies that the scan position options (timestamp, snapshot id, tag name) are consistent
 * with the startup mode resolved from {@code options}.
 *
 * <ul>
 *   <li>FROM_TIMESTAMP: the timestamp must be set; snapshot id and tag name must not be.
 *   <li>FROM_SNAPSHOT: exactly one of snapshot id / tag name must be set; the timestamp
 *       must not be.
 *   <li>FROM_SNAPSHOT_FULL: the snapshot id must be set; timestamp and tag name must not be.
 *   <li>Any other mode: none of the three options may be set.
 * </ul>
 *
 * <p>Violations are reported by the {@code check*} helpers this method delegates to.
 *
 * @param options the table options to validate
 */
private static void validateStartupMode(CoreOptions options) {
    final CoreOptions.StartupMode mode = options.startupMode();

    if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
        checkOptionExistInMode(options, SCAN_TIMESTAMP_MILLIS, mode);
        checkOptionsConflict(
                options,
                Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME),
                Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
        return;
    }

    if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
        // Snapshot id and tag name are alternative ways to name the same starting point.
        checkExactOneOptionExistInMode(options, mode, SCAN_SNAPSHOT_ID, SCAN_TAG_NAME);
        checkOptionsConflict(
                options,
                Collections.singletonList(SCAN_TIMESTAMP_MILLIS),
                Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
        return;
    }

    if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
        // Only a snapshot id is accepted here; a tag name counts as a conflict.
        checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, mode);
        checkOptionsConflict(
                options,
                Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME),
                Collections.singletonList(SCAN_SNAPSHOT_ID));
        return;
    }

    // Remaining modes take no explicit scan position at all.
    checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, mode);
    checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, mode);
    checkOptionNotExistInMode(options, SCAN_TAG_NAME, mode);
}

private static void checkOptionExistInMode(
CoreOptions options, ConfigOption<?> option, CoreOptions.StartupMode startupMode) {
checkArgument(
Expand All @@ -209,11 +230,34 @@ private static void checkOptionNotExistInMode(
option.key(), startupMode, SCAN_MODE.key()));
}

private static void checkOptionsConflict(
CoreOptions options, ConfigOption<?> illegalOption, ConfigOption<?> legalOption) {
private static void checkExactOneOptionExistInMode(
CoreOptions options,
CoreOptions.StartupMode startupMode,
ConfigOption<?>... configOptions) {
checkArgument(
!options.toConfiguration().contains(illegalOption),
Arrays.stream(configOptions)
.filter(op -> options.toConfiguration().contains(op))
.count()
== 1,
String.format(
"%s must be null when you set %s", illegalOption.key(), legalOption.key()));
"must set only one key in [%s] when you use %s for %s",
concatConfigKeys(Arrays.asList(configOptions)),
startupMode,
SCAN_MODE.key()));
}

/**
 * Rejects the configuration if any of {@code illegalOptions} is present.
 *
 * @param options the table options under validation
 * @param illegalOptions options that must not be set
 * @param legalOptions options whose use makes the others illegal; used only to build the
 *     error message
 */
private static void checkOptionsConflict(
        CoreOptions options,
        List<ConfigOption<?>> illegalOptions,
        List<ConfigOption<?>> legalOptions) {
    boolean illegalOptionPresent = false;
    for (ConfigOption<?> candidate : illegalOptions) {
        if (options.toConfiguration().contains(candidate)) {
            illegalOptionPresent = true;
            // Stop at the first hit, as a short-circuiting match would.
            break;
        }
    }

    checkArgument(
            !illegalOptionPresent,
            "[%s] must be null when you set [%s]",
            concatConfigKeys(illegalOptions),
            concatConfigKeys(legalOptions));
}

/**
 * Joins the keys of the given options into one comma-separated string, in list order and
 * without spaces.
 *
 * @param configOptions the options whose keys are joined
 * @return the joined keys; an empty string for an empty list
 */
private static String concatConfigKeys(List<ConfigOption<?>> configOptions) {
    StringBuilder joinedKeys = new StringBuilder();
    boolean first = true;
    for (ConfigOption<?> configOption : configOptions) {
        // A flag rather than a length check, so an empty key still gets its separator.
        if (!first) {
            joinedKeys.append(",");
        }
        joinedKeys.append(configOption.key());
        first = false;
    }
    return joinedKeys.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,23 @@ public TableCommitImpl newCommit(String commitUser) {

private Optional<TableSchema> tryTimeTravel(Options options) {
CoreOptions coreOptions = new CoreOptions(options);
Long snapshotId;

switch (coreOptions.startupMode()) {
case FROM_SNAPSHOT:
case FROM_SNAPSHOT_FULL:
snapshotId = coreOptions.scanSnapshotId();
if (snapshotManager().snapshotExists(snapshotId)) {
long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
if (coreOptions.scanSnapshotId() != null) {
long snapshotId = coreOptions.scanSnapshotId();
if (snapshotManager().snapshotExists(snapshotId)) {
long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
}
} else {
String tagName = coreOptions.scanTagName();
TagManager tagManager = new TagManager(fileIO, path);
if (tagManager.tagExists(tagName)) {
long schemaId = tagManager.taggedSnapshot(tagName).schemaId();
return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
}
}
return Optional.empty();
case FROM_TIMESTAMP:
Expand Down
Loading