diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 37dd25bf9029..7ab7bcd9a9c6 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 3
+ "modification": 2
}
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index e4d148cf5563..8d5cef919949 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -58,6 +58,10 @@ dependencies {
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
implementation library.java.hadoop_common
+ // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency
+ provided "org.immutables:value:2.8.8"
+ permitUnusedDeclared "org.immutables:value:2.8.8"
+ implementation library.java.vendored_calcite_1_28_0
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation project(":sdks:java:managed")
@@ -173,6 +177,7 @@ task dataflowIntegrationTest(type: Test) {
filter {
includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testRead'
+ includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testReadWithFilter'
includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testStreamingRead'
includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testWrite'
includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testWriteRead'
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
index d4b4b6ecdbc7..a40e0e13f78a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -86,6 +87,10 @@ public void process(
if (fromSnapshot != null) {
scan = scan.fromSnapshotExclusive(fromSnapshot);
}
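+ // Push the user-provided filter into Iceberg's scan planning so non-matching data files can be
+ // pruned up front; row-level filtering is still applied later when records are read.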
+ @Nullable Expression filter = scanConfig.getFilter();
+ if (filter != null) {
+ scan = scan.filter(filter);
+ }
createAndOutputReadTasks(scan, snapshot, out);
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
new file mode 100644
index 000000000000..74ff86fcafda
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlLiteral;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParser;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.NaNUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Utilities for converting a SQL filter expression into an Iceberg {@link Expression}, using
+ * Apache Calcite semantics.
+ *
+ *
+ * <p>Note: Only supports top-level fields (i.e. cannot reference nested fields).
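+ *
+ * <p>Illustrative usage (a sketch; assumes a table schema with an integer {@code id} field):
+ *
+ * <pre>{@code
+ * // Parses the SQL predicate and produces roughly:
+ * // Expressions.and(Expressions.greaterThan("id", 5), Expressions.lessThan("id", 100))
+ * Expression filter = FilterUtils.convert("\"id\" > 5 AND \"id\" < 100", schema);
+ * }</pre>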
+ */
+class FilterUtils {
+ private static final Map<SqlKind, Operation> FILTERS =
+ ImmutableMap.<SqlKind, Operation>builder()
+ .put(SqlKind.IS_NULL, Operation.IS_NULL)
+ .put(SqlKind.IS_NOT_NULL, Operation.NOT_NULL)
+ .put(SqlKind.LESS_THAN, Operation.LT)
+ .put(SqlKind.LESS_THAN_OR_EQUAL, Operation.LT_EQ)
+ .put(SqlKind.GREATER_THAN, Operation.GT)
+ .put(SqlKind.GREATER_THAN_OR_EQUAL, Operation.GT_EQ)
+ .put(SqlKind.EQUALS, Operation.EQ)
+ .put(SqlKind.NOT_EQUALS, Operation.NOT_EQ)
+ .put(SqlKind.IN, Operation.IN)
+ .put(SqlKind.NOT_IN, Operation.NOT_IN)
+ .put(SqlKind.AND, Operation.AND)
+ .put(SqlKind.OR, Operation.OR)
+ .build();
+
+ static Expression convert(@Nullable String filter, Schema schema) {
+ if (filter == null) {
+ return Expressions.alwaysTrue();
+ }
+
+ SqlParser parser = SqlParser.create(filter);
+ try {
+ SqlNode expression = parser.parseExpression();
+ return convert(expression, schema);
+ } catch (Exception exception) {
+ throw new RuntimeException(
+ String.format("Encountered an error when parsing filter: '%s'", filter), exception);
+ }
+ }
+
+ private static Expression convert(SqlNode expression, Schema schema) throws SqlParseException {
+ checkArgument(expression instanceof SqlBasicCall);
+ SqlBasicCall call = (SqlBasicCall) expression;
+
+ SqlOperator op = call.getOperator();
+ SqlKind kind = op.getKind();
+
+ Operation operation =
+ checkArgumentNotNull(
+ FILTERS.get(kind),
+ "Unable to convert SQL operation '%s' in Iceberg expression: %s",
+ kind,
+ expression.toString());
+
+ switch (operation) {
+ case IS_NULL:
+ return Expressions.isNull(getOnlyChildName(call));
+ case NOT_NULL:
+ return Expressions.notNull(getOnlyChildName(call));
+ case LT:
+ return convertFieldAndLiteral(
+ Expressions::lessThan, Expressions::greaterThan, call, schema);
+ case LT_EQ:
+ return convertFieldAndLiteral(
+ Expressions::lessThanOrEqual, Expressions::greaterThanOrEqual, call, schema);
+ case GT:
+ return convertFieldAndLiteral(
+ Expressions::greaterThan, Expressions::lessThan, call, schema);
+ case GT_EQ:
+ return convertFieldAndLiteral(
+ Expressions::greaterThanOrEqual, Expressions::lessThanOrEqual, call, schema);
+ case EQ:
+ return convertFieldAndLiteral(
+ (ref, lit) -> {
+ if (lit == null) {
+ return Expressions.isNull(ref);
+ } else if (NaNUtil.isNaN(lit)) {
+ return Expressions.isNaN(ref);
+ } else {
+ return Expressions.equal(ref, lit);
+ }
+ },
+ call,
+ schema);
+ case NOT_EQ:
+ return convertFieldAndLiteral(
+ (ref, lit) -> {
+ if (lit == null) {
+ return Expressions.notNull(ref);
+ } else if (NaNUtil.isNaN(lit)) {
+ return Expressions.notNaN(ref);
+ } else {
+ return Expressions.notEqual(ref, lit);
+ }
+ },
+ call,
+ schema);
+ case IN:
+ return convertFieldInLiteral(Operation.IN, call, schema);
+ case NOT_IN:
+ return convertFieldInLiteral(Operation.NOT_IN, call, schema);
+ case AND:
+ return convertLogicalExpr(Expressions::and, call, schema);
+ case OR:
+ return convertLogicalExpr(Expressions::or, call, schema);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported operation '%s' in filter expression: %s", operation, call));
+ }
+ }
+
+ private static String getOnlyChildName(SqlBasicCall call) {
+ checkArgument(
+ call.operandCount() == 1,
+ "Expected only 1 operand but got %s in filter: %s",
+ call.getOperandList(),
+ call.toString());
+ SqlNode ref = call.operand(0);
+ Preconditions.checkState(
+ ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref);
+ return ((SqlIdentifier) ref).getSimple();
+ }
+
+ private static SqlNode getLeftChild(SqlBasicCall call) {
+ checkArgument(
+ call.operandCount() == 2,
+ "Expected 2 operands but got %s in filter: %s",
+ call.getOperandList(),
+ call.toString());
+ return call.operand(0);
+ }
+
+ private static SqlNode getRightChild(SqlBasicCall call) {
+ checkArgument(
+ call.operandCount() == 2,
+ "Expected 2 operands but got %s in filter: %s",
+ call.getOperandList(),
+ call.toString());
+ return call.operand(1);
+ }
+
+ private static Expression convertLogicalExpr(
+ BiFunction<Expression, Expression, Expression> expr, SqlBasicCall call, Schema schema)
+ throws SqlParseException {
+ SqlNode left = getLeftChild(call);
+ SqlNode right = getRightChild(call);
+ return expr.apply(convert(left, schema), convert(right, schema));
+ }
+
+ private static Expression convertFieldInLiteral(Operation op, SqlBasicCall call, Schema schema) {
+ checkArgument(
+ call.operandCount() == 2,
+ "Expected only 2 operands but got %s: %s",
+ call.operandCount(),
+ call);
+ SqlNode term = call.operand(0);
+ SqlNode value = call.operand(1);
+ checkArgument(
+ term instanceof SqlIdentifier,
+ "Expected left hand side to be a field identifier but got " + term.getClass());
+ checkArgument(
+ value instanceof SqlNodeList,
+ "Expected right hand side to be a list but got " + value.getClass());
+ String name = ((SqlIdentifier) term).getSimple();
+ TypeID type = schema.findType(name).typeId();
+ List<SqlNode> list =
+ ((SqlNodeList) value)
+ .getList().stream().filter(Objects::nonNull).collect(Collectors.toList());
+ checkArgument(list.stream().allMatch(o -> o instanceof SqlLiteral));
+ List<Object> values =
+ list.stream()
+ .map(o -> convertLiteral((SqlLiteral) o, name, type))
+ .collect(Collectors.toList());
+ return op == Operation.IN ? Expressions.in(name, values) : Expressions.notIn(name, values);
+ }
+
+ private static Expression convertFieldAndLiteral(
+ BiFunction<String, Object, Expression> expr, SqlBasicCall call, Schema schema) {
+ return convertFieldAndLiteral(expr, expr, call, schema);
+ }
+
+ private static Expression convertFieldAndLiteral(
+ BiFunction<String, Object, Expression> convertLR,
+ BiFunction<String, Object, Expression> convertRL,
+ SqlBasicCall call,
+ Schema schema) {
+ SqlNode left = getLeftChild(call);
+ SqlNode right = getRightChild(call);
+ if (left instanceof SqlIdentifier && right instanceof SqlLiteral) {
+ String name = ((SqlIdentifier) left).getSimple();
+ TypeID type = schema.findType(name).typeId();
+ Object value = convertLiteral((SqlLiteral) right, name, type);
+ return convertLR.apply(name, value);
+ } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) {
+ String name = ((SqlIdentifier) right).getSimple();
+ TypeID type = schema.findType(name).typeId();
+ Object value = convertLiteral((SqlLiteral) left, name, type);
+ return convertRL.apply(name, value);
+ } else {
+ throw new IllegalArgumentException("Unsupported operands for expression: " + call);
+ }
+ }
+
+ private static Object convertLiteral(SqlLiteral literal, String field, TypeID type) {
+ switch (type) {
+ case BOOLEAN:
+ return literal.getValueAs(Boolean.class);
+ case INTEGER:
+ return literal.getValueAs(Integer.class);
+ case LONG:
+ return literal.getValueAs(Long.class);
+ case FLOAT:
+ return literal.getValueAs(Float.class);
+ case DOUBLE:
+ return literal.getValueAs(Double.class);
+ case DECIMAL:
+ return literal.getValueAs(BigDecimal.class);
+ case STRING:
+ return literal.getValueAs(String.class);
+ case DATE:
+ LocalDate date = LocalDate.parse(literal.getValueAs(String.class));
+ return DateTimeUtil.daysFromDate(date);
+ case TIME:
+ LocalTime time = LocalTime.parse(literal.getValueAs(String.class));
+ return DateTimeUtil.microsFromTime(time);
+ case TIMESTAMP:
+ LocalDateTime dateTime = LocalDateTime.parse(literal.getValueAs(String.class));
+ return DateTimeUtil.microsFromTimestamp(dateTime);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported filter type in field '%s': %s", field, type));
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
index 53d0e587f58f..31ff57a668bb 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java
@@ -117,7 +117,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.withStartingStrategy(strategy)
.streaming(configuration.getStreaming())
.keeping(configuration.getKeep())
- .dropping(configuration.getDrop());
+ .dropping(configuration.getDrop())
+ .withFilter(configuration.getFilter());
@Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds();
if (pollIntervalSeconds != null) {
@@ -179,6 +180,12 @@ static Builder builder() {
"The interval at which to poll for new snapshots. Defaults to 60 seconds.")
abstract @Nullable Integer getPollIntervalSeconds();
+ @SchemaFieldDescription(
+ "SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\". "
+ + "Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html")
+ @Nullable
+ abstract String getFilter();
+
@SchemaFieldDescription(
"A subset of column names to read exclusively. If null or empty, all columns will be read.")
abstract @Nullable List<String> getKeep();
@@ -215,6 +222,8 @@ abstract static class Builder {
abstract Builder setDrop(List<String> drop);
+ abstract Builder setFilter(String filter);
+
abstract Configuration build();
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index b9866786b07d..6b2ea6721e31 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -497,6 +497,8 @@ public enum StartingStrategy {
abstract @Nullable List<String> getDrop();
+ abstract @Nullable String getFilter();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -525,6 +527,8 @@ abstract static class Builder {
abstract Builder setDrop(@Nullable List<String> fields);
+ abstract Builder setFilter(@Nullable String filter);
+
abstract ReadRows build();
}
@@ -572,6 +576,10 @@ public ReadRows dropping(@Nullable List drop) {
return toBuilder().setDrop(drop).build();
}
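+ /**
+  * Sets a SQL-like predicate used to filter rows at scan time, e.g. {@code "id > 5 AND status = 'ACTIVE'"}.
+  * The predicate is parsed with Apache Calcite syntax (see {@link FilterUtils}).
+  */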
+ public ReadRows withFilter(@Nullable String filter) {
+ return toBuilder().setFilter(filter).build();
+ }
+
@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
@@ -595,6 +603,7 @@ public PCollection expand(PBegin input) {
.setUseCdc(getUseCdc())
.setKeepFields(getKeep())
.setDropFields(getDrop())
+ .setFilter(FilterUtils.convert(getFilter(), table.schema()))
.build();
scanConfig.validate(table);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index b4eaf0800f0d..63d6f792e56a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -95,7 +95,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergIO.readRows(configuration.getIcebergCatalog())
.from(TableIdentifier.parse(configuration.getTable()))
.keeping(configuration.getKeep())
- .dropping(configuration.getDrop()));
+ .dropping(configuration.getDrop())
+ .withFilter(configuration.getFilter()));
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
@@ -131,6 +132,12 @@ static Builder builder() {
"A subset of column names to exclude from reading. If null or empty, all columns will be read.")
abstract @Nullable List<String> getDrop();
+ @SchemaFieldDescription(
+ "SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\". "
+ + "Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html")
+ @Nullable
+ abstract String getFilter();
+
@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);
@@ -145,6 +152,8 @@ abstract static class Builder {
abstract Builder setDrop(List<String> drop);
+ abstract Builder setFilter(String filter);
+
abstract Configuration build();
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index 5c3b0471c4be..4edf3512952e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -34,6 +34,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -89,6 +90,21 @@ public org.apache.iceberg.Schema getProjectedSchema() {
return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
}
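+ /**
+  * Returns an {@link Evaluator} for the configured filter, bound to the projected schema, or
+  * null if no filter is set. The evaluator is created lazily and cached for reuse.
+  */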
+ @Pure
+ @Nullable
+ public Evaluator getEvaluator() {
+ @Nullable Expression filter = getFilter();
+ if (filter == null) {
+ return null;
+ }
+ if (cachedEvaluator == null) {
+ cachedEvaluator = new Evaluator(getProjectedSchema().asStruct(), filter);
+ }
+ return cachedEvaluator;
+ }
+
+ private transient @Nullable Evaluator cachedEvaluator;
+
@Pure
public abstract @Nullable Expression getFilter();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
index 050c549638c8..366f5565d425 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java
@@ -76,7 +76,10 @@ public void process(
FileScanTask task = fileScanTasks.get((int) l);
org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
- try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, projected)) {
+ try (CloseableIterable<Record> fullIterable =
+ ReadUtils.createReader(task, table, projected)) {
+ CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);
+
for (Record record : reader) {
Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
out.output(row);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
index 827d17f7819d..7790ad941fc3 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java
@@ -43,7 +43,10 @@
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
@@ -192,4 +195,14 @@ static List snapshotsBetween(
return snapshotIds;
}
+
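+ /**
+  * Applies the configured scan filter to each record in {@code iterable}, if one is set. Returns
+  * the original iterable unchanged when there is no filter or the filter is trivially true.
+  */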
+ public static CloseableIterable<Record> maybeApplyFilter(
+ CloseableIterable<Record> iterable, IcebergScanConfig scanConfig) {
+ Expression filter = scanConfig.getFilter();
+ Evaluator evaluator = scanConfig.getEvaluator();
+ if (filter != null && evaluator != null && filter.op() != Expression.Operation.TRUE) {
+ return CloseableIterable.filter(iterable, evaluator::eval);
+ }
+ return iterable;
+ }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 5f63ff14cca9..c52b39dde1c2 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -183,8 +183,10 @@ public boolean advance() throws IOException {
}
GenericDeleteFilter deleteFilter =
new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project);
- currentIterator = deleteFilter.filter(iterable).iterator();
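+ // Apply delete files first, then the user-provided scan filter, before iterating over records.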
+ iterable = deleteFilter.filter(iterable);
+ iterable = ReadUtils.maybeApplyFilter(iterable, source.getScanConfig());
+ currentIterator = iterable.iterator();
} while (true);
return false;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
index 55c10f2096b6..2c92c5572c6d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java
@@ -59,6 +59,11 @@ Schema getSchema() {
return scanConfig.getProjectedSchema();
}
+ @Pure
+ IcebergScanConfig getScanConfig() {
+ return scanConfig;
+ }
+
@Override
public List<? extends BoundedSource<Row>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
new file mode 100644
index 000000000000..34e7be619110
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.FilterUtils.convert;
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.expressions.Expressions.notIn;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.util.DateTimeUtil.daysFromDate;
+import static org.apache.iceberg.util.DateTimeUtil.microsFromTime;
+import static org.apache.iceberg.util.DateTimeUtil.microsFromTimestamp;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.And;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Or;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Test class for {@link FilterUtils}. */
+public class FilterUtilsTest {
+ @Test
+ public void testIsNull() {
+ TestCase.expecting(isNull("fiELd_1"))
+ .fromFilter("\"fiELd_1\" IS NULL")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+ }
+
+ @Test
+ public void testIsNotNull() {
+ TestCase.expecting(notNull("fiELd_1"))
+ .fromFilter("\"fiELd_1\" IS NOT NULL")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+ }
+
+ @Test
+ public void testLessThan() {
+ // integer
+ TestCase.expecting(lessThan("field_1", 30))
+ .fromFilter("\"field_1\" < 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(lessThan("field_1", 30.58f))
+ .fromFilter("\"field_1\" < 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(lessThan("field_1", "xyz"))
+ .fromFilter("\"field_1\" < 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(lessThan("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" < '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(lessThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" < '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ lessThan(
+ "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" < '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testLessThanOrEqual() {
+ // integer
+ TestCase.expecting(lessThanOrEqual("field_1", 30))
+ .fromFilter("\"field_1\" <= 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(lessThanOrEqual("field_1", 30.58f))
+ .fromFilter("\"field_1\" <= 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(lessThanOrEqual("field_1", "xyz"))
+ .fromFilter("\"field_1\" <= 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(lessThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" <= '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(lessThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" <= '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ lessThanOrEqual(
+ "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" <= '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testGreaterThan() {
+ // integer
+ TestCase.expecting(greaterThan("field_1", 30))
+ .fromFilter("\"field_1\" > 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(greaterThan("field_1", 30.58f))
+ .fromFilter("\"field_1\" > 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(greaterThan("field_1", "xyz"))
+ .fromFilter("\"field_1\" > 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(greaterThan("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" > '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(greaterThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" > '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ greaterThan(
+ "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" > '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testGreaterThanOrEqual() {
+ // integer
+ TestCase.expecting(greaterThanOrEqual("field_1", 30))
+ .fromFilter("\"field_1\" >= 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(greaterThanOrEqual("field_1", 30.58f))
+ .fromFilter("\"field_1\" >= 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(greaterThanOrEqual("field_1", "xyz"))
+ .fromFilter("\"field_1\" >= 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(greaterThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" >= '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(
+ greaterThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" >= '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ greaterThanOrEqual(
+ "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" >= '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testEquals() {
+ // integer
+ TestCase.expecting(equal("field_1", 30))
+ .fromFilter("\"field_1\" = 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(equal("field_1", 30.58f))
+ .fromFilter("\"field_1\" = 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(equal("field_1", "xyz"))
+ .fromFilter("\"field_1\" = 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(equal("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" = '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(equal("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" = '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ equal("field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" = '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testNotEquals() {
+ // integer
+ TestCase.expecting(notEqual("field_1", 30))
+ .fromFilter("\"field_1\" <> 30")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+
+ // float
+ TestCase.expecting(notEqual("field_1", 30.58f))
+ .fromFilter("\"field_1\" <> 30.58")
+ .withFieldType(Types.FloatType.get())
+ .validate();
+
+ // string
+ TestCase.expecting(notEqual("field_1", "xyz"))
+ .fromFilter("\"field_1\" <> 'xyz'")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // date
+ TestCase.expecting(notEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03"))))
+ .fromFilter("\"field_1\" <> '2025-05-03'")
+ .withFieldType(Types.DateType.get())
+ .validate();
+
+ // time
+ TestCase.expecting(notEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123"))))
+ .fromFilter("\"field_1\" <> '10:30:05.123'")
+ .withFieldType(Types.TimeType.get())
+ .validate();
+
+ // datetime
+ TestCase.expecting(
+ notEqual(
+ "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))))
+ .fromFilter("\"field_1\" <> '2025-05-03T10:30:05.123'")
+ .withFieldType(Types.TimestampType.withoutZone())
+ .validate();
+ }
+
+ @Test
+ public void testIn() {
+ // string
+ TestCase.expecting(in("field_1", Arrays.asList("xyz", "abc", "123", "foo")))
+ .fromFilter("\"field_1\" IN ('xyz', 'abc', '123', 'foo')")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // integer
+ TestCase.expecting(in("field_1", Arrays.asList(1, 2, 3, 4, 5)))
+ .fromFilter("\"field_1\" IN (1, 2, 3, 4, 5)")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+ }
+
+ @Test
+ public void testNotIn() {
+ // string
+ TestCase.expecting(notIn("field_1", Arrays.asList("xyz", "abc", "123", "foo")))
+ .fromFilter("\"field_1\" NOT IN ('xyz', 'abc', '123', 'foo')")
+ .withFieldType(Types.StringType.get())
+ .validate();
+
+ // integer
+ TestCase.expecting(notIn("field_1", Arrays.asList(1, 2, 3, 4, 5)))
+ .fromFilter("\"field_1\" NOT IN (1, 2, 3, 4, 5)")
+ .withFieldType(Types.IntegerType.get())
+ .validate();
+ }
+
+ @Test
+ public void testAnd() {
+ TestCase.expecting(
+ and(
+ and(
+ and(
+ and(
+ and(
+ and(
+ and(and(and(IS_NULL, NOT_NULL), LESS_THAN), LESS_THAN_OR_EQUAL),
+ GREATER_THAN),
+ GREATER_THAN_OR_EQUAL),
+ EQUAL),
+ NOT_EQUAL),
+ IN),
+ NOT_IN))
+ .fromFilter(
+ "\"field_1\" IS NULL AND "
+ + "\"field_2\" IS NOT NULL AND "
+ + "\"field_3\" < 'xyz' AND "
+ + "\"field_4\" <= 123 AND "
+ + "\"field_5\" > 123.456 AND "
+ + "\"field_6\" >= '2025-05-03' AND "
+ + "\"field_7\" = '10:30:05.123' AND "
+ + "\"field_8\" <> '2025-05-03T10:30:05.123' AND "
+ + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') AND "
+ + "\"field_10\" NOT IN (1, 2, 3, 4, 5)")
+ .withSchema(SCHEMA)
+ .validate();
+ }
+
+ @Test
+ public void testOr() {
+ TestCase.expecting(
+ or(
+ or(
+ or(
+ or(
+ or(
+ or(
+ or(or(or(IS_NULL, NOT_NULL), LESS_THAN), LESS_THAN_OR_EQUAL),
+ GREATER_THAN),
+ GREATER_THAN_OR_EQUAL),
+ EQUAL),
+ NOT_EQUAL),
+ IN),
+ NOT_IN))
+ .fromFilter(
+ "\"field_1\" IS NULL OR "
+ + "\"field_2\" IS NOT NULL OR "
+ + "\"field_3\" < 'xyz' OR "
+ + "\"field_4\" <= 123 OR "
+ + "\"field_5\" > 123.456 OR "
+ + "\"field_6\" >= '2025-05-03' OR "
+ + "\"field_7\" = '10:30:05.123' OR "
+ + "\"field_8\" <> '2025-05-03T10:30:05.123' OR "
+ + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') OR "
+ + "\"field_10\" NOT IN (1, 2, 3, 4, 5)")
+ .withSchema(SCHEMA)
+ .validate();
+ }
+
+ @Test
+ public void testAndOr() {
+ TestCase.expecting(
+ or(
+ or(
+ or(
+ or(and(IS_NULL, NOT_NULL), and(LESS_THAN, LESS_THAN_OR_EQUAL)),
+ and(GREATER_THAN, GREATER_THAN_OR_EQUAL)),
+ and(EQUAL, NOT_EQUAL)),
+ and(IN, NOT_IN)))
+ .fromFilter(
+ "\"field_1\" IS NULL AND "
+ + "\"field_2\" IS NOT NULL OR "
+ + "\"field_3\" < 'xyz' AND "
+ + "\"field_4\" <= 123 OR "
+ + "\"field_5\" > 123.456 AND "
+ + "\"field_6\" >= '2025-05-03' OR "
+ + "\"field_7\" = '10:30:05.123' AND "
+ + "\"field_8\" <> '2025-05-03T10:30:05.123' OR "
+ + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') AND "
+ + "\"field_10\" NOT IN (1, 2, 3, 4, 5)")
+ .withSchema(SCHEMA)
+ .validate();
+ }
+
+ @Test
+ public void testScanFiles() throws IOException {
+ Schema schema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()), required(2, "str", Types.StringType.get()));
+ Table table = warehouse.createTable(TableIdentifier.parse("default.table"), schema);
+
+ List<Record> recs =
+ IntStream.range(0, 100)
+ .mapToObj(
+ i -> {
+ GenericRecord rec = GenericRecord.create(schema);
+ rec.setField("id", i);
+ rec.setField("str", "value_" + i);
+ return rec;
+ })
+ .collect(Collectors.toList());
+
+ ImmutableList<DataFile> files =
+ ImmutableList.of(
+ warehouse.writeRecords("file_0.parquet", schema, recs.subList(0, 10)),
+ warehouse.writeRecords("file_10.parquet", schema, recs.subList(10, 20)),
+ warehouse.writeRecords("file_20.parquet", schema, recs.subList(20, 30)),
+ warehouse.writeRecords("file_30.parquet", schema, recs.subList(30, 40)),
+ warehouse.writeRecords("file_40.parquet", schema, recs.subList(40, 50)),
+ warehouse.writeRecords("file_50.parquet", schema, recs.subList(50, 60)),
+ warehouse.writeRecords("file_60.parquet", schema, recs.subList(60, 70)),
+ warehouse.writeRecords("file_70.parquet", schema, recs.subList(70, 80)),
+ warehouse.writeRecords("file_80.parquet", schema, recs.subList(80, 90)),
+ warehouse.writeRecords("file_90.parquet", schema, recs.subList(90, 100)));
+
+ AppendFiles append = table.newAppend();
+ files.forEach(append::appendFile);
+ append.commit();
+
+ TableScan scan =
+ table.newScan().project(schema).filter(FilterUtils.convert("\"id\" < 58", schema));
+
+ Set<String> expectedFiles =
+ ImmutableSet.of(
+ "file_0.parquet",
+ "file_10.parquet",
+ "file_20.parquet",
+ "file_30.parquet",
+ "file_40.parquet",
+ "file_50.parquet");
+ ImmutableSet.Builder<String> actualFiles = ImmutableSet.builder();
+ for (FileScanTask task : scan.planFiles()) {
+ String fileName = Iterables.getLast(Splitter.on('/').split(task.file().path().toString()));
+ actualFiles.add(fileName);
+ }
+ assertEquals(expectedFiles, actualFiles.build());
+ }
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+ private static class TestCase {
+ private @Nullable String filter;
+ private @Nullable Schema schema;
+ private final Expression expected;
+
+ private TestCase(Expression expression) {
+ this.expected = expression;
+ }
+
+ TestCase fromFilter(String filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ TestCase withFieldType(Type type) {
+ String fieldName = ((UnboundPredicate<?>) expected).ref().name();
+ this.schema = new Schema(Types.NestedField.required(1, fieldName, type));
+ return this;
+ }
+
+ TestCase withSchema(Schema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ static TestCase expecting(Expression expression) {
+ return new TestCase(expression);
+ }
+
+ void validate() {
+ Preconditions.checkState(
+ schema != null && filter != null, "TestCase has not been fully initialized yet");
+ Expression actual = convert(filter, schema);
+ checkEquals(expected, actual);
+ }
+ }
+
+ private static final Expression IS_NULL = isNull("field_1");
+ private static final Expression NOT_NULL = notNull("field_2");
+ private static final Expression LESS_THAN = lessThan("field_3", "xyz");
+ private static final Expression LESS_THAN_OR_EQUAL = lessThanOrEqual("field_4", 123);
+ private static final Expression GREATER_THAN = greaterThan("field_5", 123.456f);
+ private static final Expression GREATER_THAN_OR_EQUAL =
+ greaterThanOrEqual("field_6", daysFromDate(LocalDate.parse("2025-05-03")));
+ private static final Expression EQUAL =
+ equal("field_7", microsFromTime(LocalTime.parse("10:30:05.123")));
+ private static final Expression NOT_EQUAL =
+ notEqual("field_8", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")));
+ private static final Expression IN = in("field_9", Arrays.asList("xyz", "abc", "123", "foo"));
+ private static final Expression NOT_IN = notIn("field_10", Arrays.asList(1, 2, 3, 4, 5));
+
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "field_1", Types.StringType.get()),
+ required(2, "field_2", Types.StringType.get()),
+ required(3, "field_3", Types.StringType.get()),
+ required(4, "field_4", Types.IntegerType.get()),
+ required(5, "field_5", Types.FloatType.get()),
+ required(6, "field_6", Types.DateType.get()),
+ required(7, "field_7", Types.TimeType.get()),
+ required(8, "field_8", Types.TimestampType.withoutZone()),
+ required(9, "field_9", Types.StringType.get()),
+ required(10, "field_10", Types.IntegerType.get()));
+
+ private static void checkEquals(Expression expectedExpr, Expression actualExpr) {
+ if (expectedExpr instanceof UnboundPredicate) {
+ assertTrue(actualExpr instanceof UnboundPredicate);
+ } else if (expectedExpr instanceof And) {
+ assertTrue(actualExpr instanceof And);
+ checkEqualsAnd((And) expectedExpr, (And) actualExpr);
+ return;
+ } else if (expectedExpr instanceof Or) {
+ assertTrue(actualExpr instanceof Or);
+ checkEqualsOr((Or) expectedExpr, (Or) actualExpr);
+ return;
+ }
+
+ UnboundPredicate<?> expected = (UnboundPredicate<?>) expectedExpr;
+ UnboundPredicate<?> actual = (UnboundPredicate<?>) actualExpr;
+ assertEquals(expected.op(), actual.op());
+ assertEquals(expected.ref().name(), actual.ref().name());
+
+ ImmutableSet<Operation> inOperations = ImmutableSet.of(Operation.IN, Operation.NOT_IN);
+ if (inOperations.contains(expected.op())) {
+ assertEquals(expected.literals(), actual.literals());
+ } else {
+ assertEquals(expected.literal(), actual.literal());
+ }
+ }
+
+ private static void checkEqualsAnd(And expected, And actual) {
+ assertEquals(expected.op(), actual.op());
+ checkEquals(expected.left(), actual.left());
+ checkEquals(expected.right(), actual.right());
+ }
+
+ private static void checkEqualsOr(Or expected, Or actual) {
+ assertEquals(expected.op(), actual.op());
+ checkEquals(expected.left(), actual.left());
+ checkEquals(expected.right(), actual.right());
+ }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index ddd16c24009b..5bfcb1345c37 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -22,6 +22,7 @@
import static org.apache.beam.sdk.io.iceberg.IcebergScanConfig.resolveSchema;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
import static org.apache.beam.sdk.io.iceberg.TestFixtures.createRecord;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -33,6 +34,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -350,6 +352,51 @@ public void testScanSelectedFields() throws Exception {
testPipeline.run();
}
+ @Test
+ public void testScanWithFilter() throws Exception {
+ TableIdentifier tableId =
+ TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+ Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+ final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+ List<List<Record>> expectedRecords = warehouse.commitData(simpleTable);
+
+ IcebergIO.ReadRows read =
+ IcebergIO.readRows(catalogConfig())
+ .from(tableId)
+ .withFilter(
+ "\"id\" < 10 AND \"id\" >= 2 AND \"data\" <> 'clammy' AND \"data\" <> 'brainy'");
+
+ if (useIncrementalScan) {
+ read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId());
+ }
+ final List<Row> expectedRows =
+ expectedRecords.stream()
+ .flatMap(List::stream)
+ .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
+ .filter(
+ row -> {
+ long id = checkStateNotNull(row.getInt64("id"));
+ String data = checkStateNotNull(row.getString("data"));
+ return id < 10
+ && id >= 2
+ && !Objects.equals(data, "clammy")
+ && !Objects.equals(data, "brainy");
+ })
+ .collect(Collectors.toList());
+
+ PCollection<Row> output = testPipeline.apply(read).apply(new PrintRow());
+
+ PAssert.that(output)
+ .satisfies(
+ (Iterable<Row> rows) -> {
+ assertThat(rows, containsInAnyOrder(expectedRows.toArray()));
+ return null;
+ });
+
+ testPipeline.run();
+ }
+
@Test
public void testReadSchemaWithRandomlyOrderedIds() throws IOException {
TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index d224f0caafcf..27aa92b40069 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -108,11 +108,12 @@ public Map managedIcebergConfig(String tableId) {
@Test
public void testWriteToPartitionedAndValidateWithBQQuery()
throws IOException, InterruptedException {
- // For an example row where bool=true, modulo_5=3, str=value_303,
- // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/
+ // For an example row where bool_field=true, modulo_5=3, str=value_303,
+ // this partition spec will create a partition like:
+ // /bool_field=true/modulo_5=3/str_trunc=value_3/
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
- .identity("bool")
+ .identity("bool_field")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
@@ -136,9 +137,9 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));
String queryByPartition =
- String.format("SELECT bool, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
+ String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
- RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
+ RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool_field", "datetime"));
beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
index dc5e3b263247..8b731c001ad1 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -29,11 +29,6 @@
import org.apache.iceberg.hadoop.HadoopCatalog;
public class HadoopCatalogIT extends IcebergCatalogBaseIT {
- @Override
- public Integer numRecords() {
- return 100;
- }
-
@Override
public Catalog createCatalog() {
Configuration catalogHadoopConf = new Configuration();
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index 30425a75b0da..e4b91fa4014a 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -244,8 +244,8 @@ public void cleanUp() throws Exception {
.addStringField("str")
.addStringField("char")
.addInt64Field("modulo_5")
- .addBooleanField("bool")
- .addInt32Field("int")
+ .addBooleanField("bool_field")
+ .addInt32Field("int_field")
.addRowField("row", NESTED_ROW_SCHEMA)
.addArrayField("arr_long", Schema.FieldType.INT64)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
@@ -442,14 +442,43 @@ public void testReadAndKeepSomeFields() throws Exception {
}
@Test
- public void testStreamingRead() throws Exception {
+ public void testReadWithFilter() throws Exception {
Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
- List<Row> expectedRows = populateTable(table);
+ List<Row> expectedRows =
+ populateTable(table).stream()
+ .filter(
+ row ->
+ row.getBoolean("bool_field")
+ && (row.getInt32("int_field") < 500 || row.getInt32("modulo_5") == 3))
+ .collect(Collectors.toList());
+
+ Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+ config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 500 OR \"modulo_5\" = 3)");
+
+ PCollection<Row> rows =
+ pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();
+
+ PAssert.that(rows).containsInAnyOrder(expectedRows);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testStreamingReadWithFilter() throws Exception {
+ Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+
+ List<Row> expectedRows =
+ populateTable(table).stream()
+ .filter(
+ row ->
+ row.getBoolean("bool_field")
+ && (row.getInt32("int_field") < 350 || row.getInt32("modulo_5") == 2))
+ .collect(Collectors.toList());
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
config.put("streaming", true);
config.put("to_snapshot", table.currentSnapshot().snapshotId());
+ config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)");
PCollection<Row> rows =
pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection();
@@ -512,6 +541,34 @@ public void testWriteRead() throws IOException {
containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray()));
}
+ @Test
+ public void testWriteReadWithFilter() throws IOException {
+ Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+ List<Row> expectedRows =
+ populateTable(table).stream()
+ .filter(
+ row ->
+ row.getBoolean("bool_field")
+ && (row.getInt32("int_field") < 350 || row.getInt32("modulo_5") == 2))
+ .collect(Collectors.toList());
+ Map<String, Object> readConfig = new HashMap<>(managedIcebergConfig(tableId()));
+ readConfig.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)");
+ String writeTableId = tableId() + "_2";
+ Map<String, Object> writeConfig = managedIcebergConfig(writeTableId);
+
+ pipeline
+ .apply("read", Managed.read(ICEBERG).withConfig(readConfig))
+ .getSinglePCollection()
+ .apply("write", Managed.write(ICEBERG).withConfig(writeConfig));
+ pipeline.run().waitUntilFinish();
+
+ List<Record> returnedRecords =
+ readRecords(catalog.loadTable(TableIdentifier.parse(writeTableId)));
+ assertThat(
+ returnedRecords,
+ containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray()));
+ }
+
@Test
public void testReadWriteStreaming() throws IOException {
Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
@@ -565,7 +622,7 @@ public void testWriteToPartitionedTable() throws IOException {
// this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
- .identity("bool")
+ .identity("bool_field")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
@@ -594,7 +651,10 @@ private PeriodicImpulse getStreamingSource() {
public void testStreamingWrite() throws IOException {
int numRecords = numRecords();
PartitionSpec partitionSpec =
- PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
+ PartitionSpec.builderFor(ICEBERG_SCHEMA)
+ .identity("bool_field")
+ .identity("modulo_5")
+ .build();
Table table =
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
@@ -624,7 +684,10 @@ public void testStreamingWrite() throws IOException {
public void testStreamingWriteWithPriorWindowing() throws IOException {
int numRecords = numRecords();
PartitionSpec partitionSpec =
- PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build();
+ PartitionSpec.builderFor(ICEBERG_SCHEMA)
+ .identity("bool_field")
+ .identity("modulo_5")
+ .build();
Table table =
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
@@ -668,7 +731,7 @@ private void writeToDynamicDestinations(
String tableIdentifierTemplate = tableId() + "_{modulo_5}_{char}";
Map<String, Object> writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
- List fieldsToFilter = Arrays.asList("row", "str", "int", "nullable_long");
+ List fieldsToFilter = Arrays.asList("row", "str", "int_field", "nullable_long");
// an un-configured filter will just return the same row
RowFilter rowFilter = new RowFilter(BEAM_SCHEMA);
if (filterOp != null) {
@@ -703,7 +766,7 @@ private void writeToDynamicDestinations(
if (partitioning) {
Preconditions.checkState(filterOp == null || !filterOp.equals("only"));
PartitionSpec partitionSpec =
- PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build();
+ PartitionSpec.builderFor(tableSchema).identity("bool_field").identity("modulo_5").build();
catalog.createTable(tableIdentifier0, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier1, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier2, tableSchema, partitionSpec);
diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index ae707885cfe2..d9420cb6a16c 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -63,6 +63,7 @@ and Beam SQL is invoked via the Managed API under the hood.
catalog_properties (map[str , str ])
config_properties (map[str , str ])
drop (list[str ])
+ filter (str)
from_snapshot (int64)
from_timestamp (int64)
keep (list[str ])
@@ -84,6 +85,7 @@ and Beam SQL is invoked via the Managed API under the hood.
catalog_properties (map[str , str ])
config_properties (map[str , str ])
drop (list[str ])
+ filter (str)
keep (list[str ])
@@ -207,6 +209,17 @@ and Beam SQL is invoked via the Managed API under the hood.
A subset of column names to exclude from reading. If null or empty, all columns will be read.
+
+
+ filter
+
+
+ str
+
+
+ SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html
+
+
from_snapshot
@@ -462,6 +475,17 @@ and Beam SQL is invoked via the Managed API under the hood.
A subset of column names to exclude from reading. If null or empty, all columns will be read.
+
+
+ filter
+
+
+ str
+
+
+ SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html
+
+
keep