diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index bac7c05bdfef..1c6644238c3d 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -136,23 +136,26 @@ void validate() { if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) { Preconditions.checkArgument( startSnapshotId != null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + "Invalid starting snapshot id for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotTimestamp == null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for %s strategy: not null", + startingStrategy); } if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) { Preconditions.checkArgument( startSnapshotTimestamp != null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotId == null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot id for %s strategy: not null", + startingStrategy); } Preconditions.checkArgument( - tag == null, - String.format("Cannot scan table using ref %s configured for streaming reader", tag)); + tag == null, "Cannot scan table using ref %s configured for streaming reader", tag); Preconditions.checkArgument( snapshotId == null, "Cannot set snapshot-id option for streaming reader"); Preconditions.checkArgument( diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java index 5dd7de545e11..09639a8a9568 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java @@ -31,7 +31,7 @@ void testIncrementalFromSnapshotId() { .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + context, "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_ID strategy: null"); context = ScanContext.builder() @@ -42,7 +42,7 @@ void testIncrementalFromSnapshotId() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_ID strategy: not null"); } @Test @@ -54,7 +54,7 @@ void testIncrementalFromSnapshotTimestamp() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: null"); context = ScanContext.builder() @@ -64,7 +64,8 @@ void testIncrementalFromSnapshotTimestamp() { .startSnapshotTimestamp(1L) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + context, + "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: not null"); } @Test diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index bac7c05bdfef..1c6644238c3d 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -136,23 +136,26 @@ void validate() { if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) { Preconditions.checkArgument( startSnapshotId != null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + "Invalid starting snapshot id for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotTimestamp == null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for %s strategy: not null", + startingStrategy); } if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) { Preconditions.checkArgument( startSnapshotTimestamp != null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotId == null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot id for %s strategy: not null", + startingStrategy); } Preconditions.checkArgument( - tag == null, - String.format("Cannot scan table using ref %s configured for streaming reader", tag)); + tag == null, "Cannot scan table using ref %s configured for streaming reader", tag); Preconditions.checkArgument( snapshotId == null, "Cannot set snapshot-id option for streaming reader"); Preconditions.checkArgument( diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java index 5dd7de545e11..09639a8a9568 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java @@ -31,7 +31,7 @@ void testIncrementalFromSnapshotId() { .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + context, "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_ID strategy: null"); context = ScanContext.builder() @@ -42,7 +42,7 @@ void testIncrementalFromSnapshotId() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_ID strategy: not null"); } @Test @@ -54,7 +54,7 @@ void testIncrementalFromSnapshotTimestamp() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: null"); context = ScanContext.builder() @@ -64,7 +64,8 @@ void testIncrementalFromSnapshotTimestamp() { .startSnapshotTimestamp(1L) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + context, + "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: not null"); } @Test diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index bac7c05bdfef..1c6644238c3d 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -136,23 +136,26 @@ void validate() { if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) { Preconditions.checkArgument( startSnapshotId != null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + "Invalid starting snapshot id for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotTimestamp == null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for %s strategy: not null", + startingStrategy); } if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) { Preconditions.checkArgument( startSnapshotTimestamp != null, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for %s strategy: null", + startingStrategy); Preconditions.checkArgument( startSnapshotId == null, - "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot id for %s strategy: not null", + startingStrategy); } Preconditions.checkArgument( - tag == null, - String.format("Cannot scan table using ref %s configured for streaming reader", tag)); + tag == null, "Cannot scan table using ref %s configured for streaming reader", tag); Preconditions.checkArgument( snapshotId == null, "Cannot set snapshot-id option for streaming reader"); Preconditions.checkArgument( diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java index 5dd7de545e11..09639a8a9568 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java @@ -31,7 +31,7 @@ void testIncrementalFromSnapshotId() { .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null"); + context, "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_ID strategy: null"); context = ScanContext.builder() @@ -42,7 +42,7 @@ void testIncrementalFromSnapshotId() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_ID strategy: not null"); } @Test @@ -54,7 +54,7 @@ void testIncrementalFromSnapshotTimestamp() { .build(); assertException( context, - "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null"); + "Invalid starting snapshot timestamp for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: null"); context = ScanContext.builder() @@ -64,7 +64,8 @@ void testIncrementalFromSnapshotTimestamp() { .startSnapshotTimestamp(1L) .build(); assertException( - context, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); + context, + "Invalid starting snapshot id for INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP strategy: not null"); } @Test