From bcbf5a7a99800c1856c532a640147cbc807ca43a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 2 May 2025 22:34:07 -0400 Subject: [PATCH 01/15] filter utils --- sdks/java/io/iceberg/build.gradle | 4 + .../beam/sdk/io/iceberg/FilterUtils.java | 250 ++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index e4d148cf5563..84ed45208add 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -58,6 +58,10 @@ dependencies { implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation "org.apache.iceberg:iceberg-data:$iceberg_version" implementation library.java.hadoop_common + // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency + provided "org.immutables:value:2.8.8" + permitUnusedDeclared "org.immutables:value:2.8.8" + implementation library.java.vendored_calcite_1_28_0 runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation project(":sdks:java:managed") diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java new file mode 100644 index 000000000000..8377c3a42bd9 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParser; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expression.Operation; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.util.NaNUtil; + +public class FilterUtils { + + private static final Map FILTERS = + ImmutableMap.builder() + .put(SqlKind.IS_TRUE, Operation.TRUE) + .put(SqlKind.IS_NOT_FALSE, Operation.TRUE) + .put(SqlKind.IS_FALSE, Operation.FALSE) + .put(SqlKind.IS_NOT_TRUE, Operation.FALSE) + .put(SqlKind.IS_NULL, Operation.IS_NULL) + .put(SqlKind.IS_NOT_NULL, Operation.NOT_NULL) + .put(SqlKind.LESS_THAN, Operation.LT) + .put(SqlKind.LESS_THAN_OR_EQUAL, Operation.LT_EQ) + .put(SqlKind.GREATER_THAN, Operation.GT) + .put(SqlKind.GREATER_THAN_OR_EQUAL, Operation.GT_EQ) + .put(SqlKind.EQUALS, Operation.EQ) + .put(SqlKind.NOT_EQUALS, Operation.NOT_EQ) + .put(SqlKind.IN, Operation.IN) + .put(SqlKind.NOT_IN, Operation.NOT_IN) + .put(SqlKind.AND, Operation.AND) + .put(SqlKind.OR, Operation.OR) + .build(); + + public static Expression convert(String filter) throws SqlParseException { + SqlParser parser = SqlParser.create(filter); + return convert(parser.parseExpression()); + } + + private static Expression convert(SqlNode expression) throws SqlParseException { + Preconditions.checkArgument(expression instanceof SqlBasicCall); + SqlBasicCall call = (SqlBasicCall) expression; + + SqlOperator op = call.getOperator(); + SqlKind kind = op.getKind(); + + Operation operation = + checkArgumentNotNull( + FILTERS.get(kind), + "Unable to convert filter to Iceberg expression: %s", + expression.toString()); + + switch (operation) { + case TRUE: + return Expressions.alwaysTrue(); + case FALSE: + return Expressions.alwaysFalse(); + case IS_NULL: + return Expressions.isNull(getOnlyChild(call).toString()); + case NOT_NULL: + return Expressions.notNull(getOnlyChild(call).toString()); + case LT: + return convertFieldAndLiteral(Expressions::lessThan, Expressions::greaterThan, call); + case LT_EQ: + return convertFieldAndLiteral( + Expressions::lessThanOrEqual, Expressions::greaterThanOrEqual, call); + case GT: + return convertFieldAndLiteral(Expressions::greaterThan, Expressions::lessThan, call); + case GT_EQ: + return convertFieldAndLiteral( + Expressions::greaterThanOrEqual, Expressions::lessThanOrEqual, call); + case EQ: + return convertFieldAndLiteral( + 
(ref, lit) -> { + if (lit == null) { + return Expressions.isNull(ref); + } else if (NaNUtil.isNaN(lit)) { + return Expressions.isNaN(ref); + } else { + return Expressions.equal(ref, lit); + } + }, + call); + case NOT_EQ: + return convertFieldAndLiteral( + (ref, lit) -> { + if (lit == null) { + return Expressions.notNull(ref); + } else if (NaNUtil.isNaN(lit)) { + return Expressions.notNaN(ref); + } else { + return Expressions.notEqual(ref, lit); + } + }, + call); + case IN: + return convertFieldInLiteral(Expressions::in, call); + case NOT_IN: + return convertFieldInLiteral(Expressions::notIn, call); + case AND: + return convertLogicalExpr(Expressions::and, call); + case OR: + return convertLogicalExpr(Expressions::or, call); + default: + throw new IllegalArgumentException( + String.format("Unsupported operation '%s' in filter expression: %s", operation, call)); + } + } + + private static SqlNode getOnlyChild(SqlBasicCall call) { + Preconditions.checkArgument( + call.operandCount() == 1, + "Expected only 1 operand but got %s in filter: %s", + call.getOperandList(), + call.toString()); + return call.operand(0); + } + + private static SqlNode getLeftChild(SqlBasicCall call) { + Preconditions.checkArgument( + call.operandCount() == 2, + "Expected 2 operands but got %s in filter: %s", + call.getOperandList(), + call.toString()); + return call.operand(0); + } + + private static SqlNode getRightChild(SqlBasicCall call) { + Preconditions.checkArgument( + call.operandCount() == 2, + "Expected 2 operands but got %s in filter: %s", + call.getOperandList(), + call.toString()); + return call.operand(1); + } + + private static Expression convertLogicalExpr( + BiFunction expr, SqlBasicCall call) + throws SqlParseException { + SqlNode left = getLeftChild(call); + SqlNode right = getRightChild(call); + return expr.apply(convert(left), convert(right)); + } + + private static Expression convertFieldInLiteral( + BiFunction expr, SqlBasicCall call) { + List operands = call.getOperandList(); + operands = operands.subList(1, operands.size()); + SqlNode term = call.operand(0); + Preconditions.checkArgument( + term instanceof SqlIdentifier && operands.stream().allMatch(o -> o instanceof SqlLiteral)); + String field = ((SqlIdentifier) term).getSimple(); + List values = + operands.stream() + .map(o -> convertLiteral((SqlLiteral) o, field)) + .collect(Collectors.toList()); + return expr.apply(field, values); + } + + private static Expression convertFieldAndLiteral( + BiFunction expr, SqlBasicCall call) { + return convertFieldAndLiteral(expr, expr, call); + } + + private static Expression convertFieldAndLiteral( + BiFunction convertLR, + BiFunction convertRL, + SqlBasicCall call) { + SqlNode left = getLeftChild(call); + SqlNode right = getRightChild(call); + if (left instanceof SqlIdentifier && right instanceof SqlLiteral) { + String field = ((SqlIdentifier) left).getSimple(); + Object value = convertLiteral((SqlLiteral) right, field); + return convertLR.apply(field, value); + } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) { + String field = ((SqlIdentifier) right).getSimple(); + Object value = convertLiteral((SqlLiteral) left, field); + return convertRL.apply(field, value); + } else { + throw new IllegalArgumentException("Unsupported operands for expression: " + call); + } + } + + private static Object convertLiteral(SqlLiteral literal, String field) { + switch (literal.getTypeName()) { + case BOOLEAN: + return literal.getValueAs(Boolean.class); + case TINYINT: + case SMALLINT: + case 
INTEGER: + return literal.getValueAs(Integer.class); + case BIGINT: + return literal.getValueAs(Long.class); + case FLOAT: + return literal.getValueAs(Float.class); + case DOUBLE: + return literal.getValueAs(Double.class); + case DECIMAL: + return literal.getValueAs(BigDecimal.class); + case CHAR: + case VARCHAR: + return literal.getValueAs(String.class); + case DATE: + Date sqlDate = literal.getValueAs(Date.class); + return sqlDate.toLocalDate(); + case TIME: + Time sqlTime = literal.getValueAs(Time.class); + return sqlTime.toLocalTime(); + case TIMESTAMP: + Timestamp ts = literal.getValueAs(Timestamp.class); + return ts.toLocalDateTime(); + default: + throw new IllegalArgumentException( + String.format( + "Unsupported literal type in field '%s': %s", field, literal.getTypeName())); + } + } +} From 30835862d432c2564fed1a5a30df44779449f109 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 2 May 2025 22:49:34 -0400 Subject: [PATCH 02/15] apply throughout icebergIO --- .../beam/sdk/io/iceberg/CreateReadTasksDoFn.java | 3 ++- .../apache/beam/sdk/io/iceberg/FilterUtils.java | 14 ++++++++++++-- .../IcebergCdcReadSchemaTransformProvider.java | 9 ++++++++- .../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 9 +++++++++ .../IcebergReadSchemaTransformProvider.java | 9 ++++++++- .../beam/sdk/io/iceberg/IcebergScanConfig.java | 4 ++++ 6 files changed, 43 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java index 895563e960ad..fedb13812989 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java @@ -78,7 +78,8 @@ public void process( } LOG.info("Planning to scan snapshot {}", toSnapshot); - IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot); + IncrementalAppendScan scan = + table.newIncrementalAppendScan().toSnapshot(toSnapshot).filter(scanConfig.getFilter()); if (fromSnapshot != null) { scan = scan.fromSnapshotExclusive(fromSnapshot); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index 8377c3a42bd9..eeae9096a89b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -41,6 +41,7 @@ import org.apache.iceberg.expressions.Expression.Operation; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.util.NaNUtil; +import org.checkerframework.checker.nullness.qual.Nullable; public class FilterUtils { @@ -64,9 +65,18 @@ public class FilterUtils { .put(SqlKind.OR, Operation.OR) .build(); - public static Expression convert(String filter) throws SqlParseException { + public static Expression convert(@Nullable String filter) { + if (filter == null) { + return Expressions.alwaysTrue(); + } + SqlParser parser = SqlParser.create(filter); - return convert(parser.parseExpression()); + try { + SqlNode expression = parser.parseExpression(); + return convert(expression); + } catch (SqlParseException exception) { + throw new RuntimeException("Encountered an error when parsing filter: " + filter, exception); + } } private static Expression convert(SqlNode expression) throws 
SqlParseException { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java index 0064b49475d0..70efe9a41bdb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java @@ -115,7 +115,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .fromTimestamp(configuration.getFromTimestamp()) .toTimestamp(configuration.getToTimestamp()) .withStartingStrategy(strategy) - .streaming(configuration.getStreaming()); + .streaming(configuration.getStreaming()) + .withFilter(configuration.getFilter()); @Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds(); if (pollIntervalSeconds != null) { @@ -177,6 +178,10 @@ static Builder builder() { "The interval at which to poll for new snapshots. Defaults to 60 seconds.") abstract @Nullable Integer getPollIntervalSeconds(); + @SchemaFieldDescription("SQL-like filter to apply when scanning for files.") + @Nullable + abstract String getFilter(); + @AutoValue.Builder abstract static class Builder { abstract Builder setTable(String table); @@ -201,6 +206,8 @@ abstract static class Builder { abstract Builder setStreaming(Boolean streaming); + abstract Builder setFilter(String filter); + abstract Configuration build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index efeb9a97587f..3c94c7aa2a75 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -603,6 +603,8 @@ public enum StartingStrategy { abstract @Nullable Duration getPollInterval(); + abstract @Nullable String getFilter(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -627,6 +629,8 @@ abstract static class Builder { abstract Builder setPollInterval(@Nullable Duration triggeringFrequency); + abstract Builder setFilter(@Nullable String filter); + abstract ReadRows build(); } @@ -666,6 +670,10 @@ public ReadRows withStartingStrategy(@Nullable StartingStrategy strategy) { return toBuilder().setStartingStrategy(strategy).build(); } + public ReadRows withFilter(@Nullable String filter) { + return toBuilder().setFilter(filter).build(); + } + @Override public PCollection expand(PBegin input) { TableIdentifier tableId = @@ -687,6 +695,7 @@ public PCollection expand(PBegin input) { .setStreaming(getStreaming()) .setPollInterval(getPollInterval()) .setUseCdc(getUseCdc()) + .setFilter(getFilter()) .build(); scanConfig.validate(table); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index 35500676ae2f..62cab1f0dd51 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -93,7 +93,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .getPipeline() .apply( IcebergIO.readRows(configuration.getIcebergCatalog()) 
- .from(TableIdentifier.parse(configuration.getTable()))); + .from(TableIdentifier.parse(configuration.getTable())) + .withFilter(configuration.getFilter())); return PCollectionRowTuple.of(OUTPUT_TAG, output); } @@ -121,6 +122,10 @@ static Builder builder() { @Nullable abstract Map getConfigProperties(); + @SchemaFieldDescription("SQL-like filter to apply when scanning for files.") + @Nullable + abstract String getFilter(); + @AutoValue.Builder abstract static class Builder { abstract Builder setTable(String table); @@ -131,6 +136,8 @@ abstract static class Builder { abstract Builder setConfigProperties(Map confProperties); + abstract Builder setFilter(String filter); + abstract Configuration build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index ff5e4c736244..047b210d885b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -160,6 +160,10 @@ public Builder setTableIdentifier(TableIdentifier tableIdentifier) { return this.setTableIdentifier(tableIdentifier.toString()); } + public Builder setFilter(@Nullable String filter) { + return setFilter(FilterUtils.convert(filter)); + } + public Builder setTableIdentifier(String... names) { return setTableIdentifier(TableIdentifier.of(names)); } From 2511b7d5ac945a461e34b61eb6e1030b8f1885cd Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 2 May 2025 22:55:56 -0400 Subject: [PATCH 03/15] time types --- .../org/apache/beam/sdk/io/iceberg/FilterUtils.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index eeae9096a89b..34fcb3462404 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -40,11 +40,11 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expression.Operation; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.NaNUtil; import org.checkerframework.checker.nullness.qual.Nullable; -public class FilterUtils { - +class FilterUtils { private static final Map FILTERS = ImmutableMap.builder() .put(SqlKind.IS_TRUE, Operation.TRUE) @@ -65,7 +65,7 @@ public class FilterUtils { .put(SqlKind.OR, Operation.OR) .build(); - public static Expression convert(@Nullable String filter) { + static Expression convert(@Nullable String filter) { if (filter == null) { return Expressions.alwaysTrue(); } @@ -244,13 +244,13 @@ private static Object convertLiteral(SqlLiteral literal, String field) { return literal.getValueAs(String.class); case DATE: Date sqlDate = literal.getValueAs(Date.class); - return sqlDate.toLocalDate(); + return DateTimeUtil.daysFromDate(sqlDate.toLocalDate()); case TIME: Time sqlTime = literal.getValueAs(Time.class); - return sqlTime.toLocalTime(); + return DateTimeUtil.microsFromTime(sqlTime.toLocalTime()); case TIMESTAMP: Timestamp ts = literal.getValueAs(Timestamp.class); - return ts.toLocalDateTime(); + return DateTimeUtil.microsFromTimestamp(ts.toLocalDateTime()); default: throw new 
IllegalArgumentException( String.format( From ad636d8716c8861a36291f639f0ff867b6e066ab Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 5 May 2025 16:23:51 -0400 Subject: [PATCH 04/15] add test class and apply to reader filter --- .../IO_Iceberg_Integration_Tests.json | 2 +- ...IO_Iceberg_Integration_Tests_Dataflow.json | 2 +- .../sdk/io/iceberg/CreateReadTasksDoFn.java | 8 +- .../beam/sdk/io/iceberg/FilterUtils.java | 178 +++-- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 2 +- .../sdk/io/iceberg/IcebergScanConfig.java | 4 - .../beam/sdk/io/iceberg/ReadFromTasks.java | 12 +- .../beam/sdk/io/iceberg/ScanTaskReader.java | 11 +- .../beam/sdk/io/iceberg/ScanTaskSource.java | 8 + .../beam/sdk/io/iceberg/FilterUtilsTest.java | 622 ++++++++++++++++++ .../sdk/io/iceberg/IcebergIOReadTest.java | 34 + .../io/iceberg/catalog/HadoopCatalogIT.java | 5 - .../iceberg/catalog/IcebergCatalogBaseIT.java | 83 ++- 13 files changed, 875 insertions(+), 96 deletions(-) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..37dd25bf9029 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 3 } diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json index 5abe02fc09c7..3a009261f4f9 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java index fedb13812989..7db130376c4c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java @@ -30,6 +30,7 @@ import org.apache.iceberg.DataOperations; import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -78,11 +79,14 @@ public void process( } LOG.info("Planning to scan snapshot {}", toSnapshot); - IncrementalAppendScan scan = - table.newIncrementalAppendScan().toSnapshot(toSnapshot).filter(scanConfig.getFilter()); + IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot); if (fromSnapshot != null) { scan = scan.fromSnapshotExclusive(fromSnapshot); } + @Nullable Expression filter = scanConfig.getFilter(); + if (filter != null) { + scan = scan.filter(filter); + } createAndOutputReadTasks(scan, snapshot, out); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index 34fcb3462404..1743045e01fe 100644 --- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -18,13 +18,15 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall; @@ -32,25 +34,32 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlLiteral; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNodeList; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParser; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expression.Operation; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.NaNUtil; import org.checkerframework.checker.nullness.qual.Nullable; +/** + * Utilities that convert between a SQL filter expression and an Iceberg {@link Expression}. Uses + * Apache Calcite semantics. + * + *
<p>
Note: Only supports top-level fields (i.e. cannot reference nested fields). + */ class FilterUtils { private static final Map FILTERS = ImmutableMap.builder() - .put(SqlKind.IS_TRUE, Operation.TRUE) - .put(SqlKind.IS_NOT_FALSE, Operation.TRUE) - .put(SqlKind.IS_FALSE, Operation.FALSE) - .put(SqlKind.IS_NOT_TRUE, Operation.FALSE) .put(SqlKind.IS_NULL, Operation.IS_NULL) .put(SqlKind.IS_NOT_NULL, Operation.NOT_NULL) .put(SqlKind.LESS_THAN, Operation.LT) @@ -65,7 +74,23 @@ class FilterUtils { .put(SqlKind.OR, Operation.OR) .build(); - static Expression convert(@Nullable String filter) { + static FilterFunction filterOn(Expression expression, Schema schema) { + return new FilterFunction(expression, schema); + } + + static class FilterFunction { + private final Evaluator evaluator; + + FilterFunction(Expression expression, Schema schema) { + this.evaluator = new Evaluator(schema.asStruct(), expression); + } + + boolean filter(Record rec) { + return evaluator.eval(rec); + } + } + + static Expression convert(@Nullable String filter, Schema schema) { if (filter == null) { return Expressions.alwaysTrue(); } @@ -73,14 +98,15 @@ static Expression convert(@Nullable String filter) { SqlParser parser = SqlParser.create(filter); try { SqlNode expression = parser.parseExpression(); - return convert(expression); - } catch (SqlParseException exception) { - throw new RuntimeException("Encountered an error when parsing filter: " + filter, exception); + return convert(expression, schema); + } catch (Exception exception) { + throw new RuntimeException( + String.format("Encountered an error when parsing filter: '%s'", filter), exception); } } - private static Expression convert(SqlNode expression) throws SqlParseException { - Preconditions.checkArgument(expression instanceof SqlBasicCall); + private static Expression convert(SqlNode expression, Schema schema) throws SqlParseException { + checkArgument(expression instanceof SqlBasicCall); SqlBasicCall call = (SqlBasicCall) expression; SqlOperator op = call.getOperator(); @@ -89,28 +115,27 @@ private static Expression convert(SqlNode expression) throws SqlParseException { Operation operation = checkArgumentNotNull( FILTERS.get(kind), - "Unable to convert filter to Iceberg expression: %s", + "Unable to convert SQL operation '%s' in Iceberg expression: %s", + kind, expression.toString()); switch (operation) { - case TRUE: - return Expressions.alwaysTrue(); - case FALSE: - return Expressions.alwaysFalse(); case IS_NULL: - return Expressions.isNull(getOnlyChild(call).toString()); + return Expressions.isNull(getOnlyChildName(call)); case NOT_NULL: - return Expressions.notNull(getOnlyChild(call).toString()); + return Expressions.notNull(getOnlyChildName(call)); case LT: - return convertFieldAndLiteral(Expressions::lessThan, Expressions::greaterThan, call); + return convertFieldAndLiteral( + Expressions::lessThan, Expressions::greaterThan, call, schema); case LT_EQ: return convertFieldAndLiteral( - Expressions::lessThanOrEqual, Expressions::greaterThanOrEqual, call); + Expressions::lessThanOrEqual, Expressions::greaterThanOrEqual, call, schema); case GT: - return convertFieldAndLiteral(Expressions::greaterThan, Expressions::lessThan, call); + return convertFieldAndLiteral( + Expressions::greaterThan, Expressions::lessThan, call, schema); case GT_EQ: return convertFieldAndLiteral( - Expressions::greaterThanOrEqual, Expressions::lessThanOrEqual, call); + Expressions::greaterThanOrEqual, Expressions::lessThanOrEqual, call, schema); case EQ: return convertFieldAndLiteral( 
(ref, lit) -> { @@ -122,7 +147,8 @@ private static Expression convert(SqlNode expression) throws SqlParseException { return Expressions.equal(ref, lit); } }, - call); + call, + schema); case NOT_EQ: return convertFieldAndLiteral( (ref, lit) -> { @@ -134,32 +160,36 @@ private static Expression convert(SqlNode expression) throws SqlParseException { return Expressions.notEqual(ref, lit); } }, - call); + call, + schema); case IN: - return convertFieldInLiteral(Expressions::in, call); + return convertFieldInLiteral(Operation.IN, call, schema); case NOT_IN: - return convertFieldInLiteral(Expressions::notIn, call); + return convertFieldInLiteral(Operation.NOT_IN, call, schema); case AND: - return convertLogicalExpr(Expressions::and, call); + return convertLogicalExpr(Expressions::and, call, schema); case OR: - return convertLogicalExpr(Expressions::or, call); + return convertLogicalExpr(Expressions::or, call, schema); default: throw new IllegalArgumentException( String.format("Unsupported operation '%s' in filter expression: %s", operation, call)); } } - private static SqlNode getOnlyChild(SqlBasicCall call) { - Preconditions.checkArgument( + private static String getOnlyChildName(SqlBasicCall call) { + checkArgument( call.operandCount() == 1, "Expected only 1 operand but got %s in filter: %s", call.getOperandList(), call.toString()); - return call.operand(0); + SqlNode ref = call.operand(0); + Preconditions.checkState( + ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref); + return ((SqlIdentifier) ref).getSimple().toLowerCase(); } private static SqlNode getLeftChild(SqlBasicCall call) { - Preconditions.checkArgument( + checkArgument( call.operandCount() == 2, "Expected 2 operands but got %s in filter: %s", call.getOperandList(), @@ -168,7 +198,7 @@ private static SqlNode getLeftChild(SqlBasicCall call) { } private static SqlNode getRightChild(SqlBasicCall call) { - Preconditions.checkArgument( + checkArgument( call.operandCount() == 2, "Expected 2 operands but got %s in filter: %s", call.getOperandList(), @@ -177,61 +207,73 @@ private static SqlNode getRightChild(SqlBasicCall call) { } private static Expression convertLogicalExpr( - BiFunction expr, SqlBasicCall call) + BiFunction expr, SqlBasicCall call, Schema schema) throws SqlParseException { SqlNode left = getLeftChild(call); SqlNode right = getRightChild(call); - return expr.apply(convert(left), convert(right)); + return expr.apply(convert(left, schema), convert(right, schema)); } - private static Expression convertFieldInLiteral( - BiFunction expr, SqlBasicCall call) { - List operands = call.getOperandList(); - operands = operands.subList(1, operands.size()); + private static Expression convertFieldInLiteral(Operation op, SqlBasicCall call, Schema schema) { + checkArgument( + call.operandCount() == 2, + "Expected only 2 operands but got %s: %s", + call.operandCount(), + call); SqlNode term = call.operand(0); - Preconditions.checkArgument( - term instanceof SqlIdentifier && operands.stream().allMatch(o -> o instanceof SqlLiteral)); - String field = ((SqlIdentifier) term).getSimple(); + SqlNode value = call.operand(1); + checkArgument( + term instanceof SqlIdentifier, + "Expected left hand side to be a field identifier but got " + term.getClass()); + checkArgument( + value instanceof SqlNodeList, + "Expected right hand side to be a list but got " + value.getClass()); + String field = ((SqlIdentifier) term).getSimple().toLowerCase(); + List list = + ((SqlNodeList) value) + 
.getList().stream().filter(Objects::nonNull).collect(Collectors.toList()); + checkArgument(list.stream().allMatch(o -> o instanceof SqlLiteral)); List values = - operands.stream() - .map(o -> convertLiteral((SqlLiteral) o, field)) + list.stream() + .map(o -> convertLiteral((SqlLiteral) o, field, schema.findType(field).typeId())) .collect(Collectors.toList()); - return expr.apply(field, values); + return op == Operation.IN ? Expressions.in(field, values) : Expressions.notIn(field, values); } private static Expression convertFieldAndLiteral( - BiFunction expr, SqlBasicCall call) { - return convertFieldAndLiteral(expr, expr, call); + BiFunction expr, SqlBasicCall call, Schema schema) { + return convertFieldAndLiteral(expr, expr, call, schema); } private static Expression convertFieldAndLiteral( BiFunction convertLR, BiFunction convertRL, - SqlBasicCall call) { + SqlBasicCall call, + Schema schema) { SqlNode left = getLeftChild(call); SqlNode right = getRightChild(call); if (left instanceof SqlIdentifier && right instanceof SqlLiteral) { - String field = ((SqlIdentifier) left).getSimple(); - Object value = convertLiteral((SqlLiteral) right, field); + String field = ((SqlIdentifier) left).getSimple().toLowerCase(); + TypeID type = schema.findType(field).typeId(); + Object value = convertLiteral((SqlLiteral) right, field, type); return convertLR.apply(field, value); } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) { - String field = ((SqlIdentifier) right).getSimple(); - Object value = convertLiteral((SqlLiteral) left, field); + String field = ((SqlIdentifier) right).getSimple().toLowerCase(); + TypeID type = schema.findType(field).typeId(); + Object value = convertLiteral((SqlLiteral) left, field, type); return convertRL.apply(field, value); } else { throw new IllegalArgumentException("Unsupported operands for expression: " + call); } } - private static Object convertLiteral(SqlLiteral literal, String field) { - switch (literal.getTypeName()) { + private static Object convertLiteral(SqlLiteral literal, String field, TypeID type) { + switch (type) { case BOOLEAN: return literal.getValueAs(Boolean.class); - case TINYINT: - case SMALLINT: case INTEGER: return literal.getValueAs(Integer.class); - case BIGINT: + case LONG: return literal.getValueAs(Long.class); case FLOAT: return literal.getValueAs(Float.class); @@ -239,22 +281,20 @@ private static Object convertLiteral(SqlLiteral literal, String field) { return literal.getValueAs(Double.class); case DECIMAL: return literal.getValueAs(BigDecimal.class); - case CHAR: - case VARCHAR: + case STRING: return literal.getValueAs(String.class); case DATE: - Date sqlDate = literal.getValueAs(Date.class); - return DateTimeUtil.daysFromDate(sqlDate.toLocalDate()); + LocalDate date = LocalDate.parse(literal.getValueAs(String.class)); + return DateTimeUtil.daysFromDate(date); case TIME: - Time sqlTime = literal.getValueAs(Time.class); - return DateTimeUtil.microsFromTime(sqlTime.toLocalTime()); + LocalTime time = LocalTime.parse(literal.getValueAs(String.class)); + return DateTimeUtil.microsFromTime(time); case TIMESTAMP: - Timestamp ts = literal.getValueAs(Timestamp.class); - return DateTimeUtil.microsFromTimestamp(ts.toLocalDateTime()); + LocalDateTime dateTime = LocalDateTime.parse(literal.getValueAs(String.class)); + return DateTimeUtil.microsFromTimestamp(dateTime); default: throw new IllegalArgumentException( - String.format( - "Unsupported literal type in field '%s': %s", field, literal.getTypeName())); + String.format("Unsupported 
filter type in field '%s': %s", field, type)); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 3c94c7aa2a75..8457e794a76a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -695,7 +695,7 @@ public PCollection expand(PBegin input) { .setStreaming(getStreaming()) .setPollInterval(getPollInterval()) .setUseCdc(getUseCdc()) - .setFilter(getFilter()) + .setFilter(FilterUtils.convert(getFilter(), table.schema())) .build(); scanConfig.validate(table); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 047b210d885b..ff5e4c736244 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -160,10 +160,6 @@ public Builder setTableIdentifier(TableIdentifier tableIdentifier) { return this.setTableIdentifier(tableIdentifier.toString()); } - public Builder setFilter(@Nullable String filter) { - return setFilter(FilterUtils.convert(filter)); - } - public Builder setTableIdentifier(String... names) { return setTableIdentifier(TableIdentifier.of(names)); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java index ec5a11e9377c..48c9a82f7f86 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.io.iceberg.FilterUtils.FilterFunction; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -30,7 +31,9 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Bounded read implementation. 
@@ -63,6 +66,7 @@ public void process( throws IOException, ExecutionException, InterruptedException { ReadTask readTask = element.getValue(); Table table = TableCache.get(scanConfig.getTableIdentifier()); + @Nullable Expression filter = scanConfig.getFilter(); List fileScanTasks = readTask.getFileScanTasks(); @@ -73,7 +77,13 @@ public void process( return; } FileScanTask task = fileScanTasks.get((int) l); - try (CloseableIterable reader = ReadUtils.createReader(task, table)) { + try (CloseableIterable fullIterable = ReadUtils.createReader(task, table)) { + CloseableIterable reader = fullIterable; + if (filter != null && filter.op() != Expression.Operation.TRUE) { + FilterFunction filterFunc = FilterUtils.filterOn(filter, table.schema()); + reader = CloseableIterable.filter(reader, filterFunc::filter); + } + for (Record record : reader) { Row row = IcebergUtils.icebergRecordToBeamRow(scanConfig.getSchema(), record); out.output(row); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index c4bc8072e114..cdb5b050029b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -43,6 +43,7 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.InputFilesDecryptor; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; @@ -59,6 +60,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader { private final ScanTaskSource source; private final org.apache.iceberg.Schema project; + private final @Nullable Expression filter; private final Schema beamSchema; transient @Nullable FileIO io; @@ -70,6 +72,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader { public ScanTaskReader(ScanTaskSource source) { this.source = source; this.project = source.getSchema(); + this.filter = source.getFilter(); this.beamSchema = icebergSchemaToBeamSchema(source.getSchema()); } @@ -183,8 +186,12 @@ public boolean advance() throws IOException { } GenericDeleteFilter deleteFilter = new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project); - currentIterator = deleteFilter.filter(iterable).iterator(); - + iterable = deleteFilter.filter(iterable); + if (filter != null && filter.op() != Expression.Operation.TRUE) { + FilterUtils.FilterFunction filterFunction = FilterUtils.filterOn(filter, project); + iterable = CloseableIterable.filter(iterable, filterFunction::filter); + } + currentIterator = iterable.iterator(); } while (true); return false; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java index 7d4b05d221f6..812a8af3c035 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java @@ -29,6 +29,8 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; +import org.checkerframework.checker.nullness.qual.Nullable; import 
org.checkerframework.dataflow.qual.Pure; /** @@ -59,6 +61,12 @@ Schema getSchema() { return getTable().schema(); } + @Pure + @Nullable + Expression getFilter() { + return scanConfig.getFilter(); + } + @Override public List> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java new file mode 100644 index 000000000000..512b00eeeff0 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.FilterUtils.convert; +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThan; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.notEqual; +import static org.apache.iceberg.expressions.Expressions.notIn; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.iceberg.util.DateTimeUtil.daysFromDate; +import static org.apache.iceberg.util.DateTimeUtil.microsFromTime; +import static org.apache.iceberg.util.DateTimeUtil.microsFromTimestamp; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import 
org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.And; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expression.Operation; +import org.apache.iceberg.expressions.Or; +import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Test class for {@link FilterUtils}. */ +public class FilterUtilsTest { + @Test + public void testIsNull() { + TestCase.expecting(isNull("field_1")) + .fromFilter("field_1 IS NULL") + .withFieldType(Types.IntegerType.get()) + .validate(); + } + + @Test + public void testIsNotNull() { + TestCase.expecting(notNull("field_1")) + .fromFilter("field_1 IS NOT NULL") + .withFieldType(Types.IntegerType.get()) + .validate(); + } + + @Test + public void testLessThan() { + // integer + TestCase.expecting(lessThan("field_1", 30)) + .fromFilter("field_1 < 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(lessThan("field_1", 30.58f)) + .fromFilter("field_1 < 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(lessThan("field_1", "xyz")) + .fromFilter("field_1 < 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(lessThan("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 < '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting(lessThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 < '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + lessThan( + "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 < '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testLessThanOrEqual() { + // integer + TestCase.expecting(lessThanOrEqual("field_1", 30)) + .fromFilter("field_1 <= 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(lessThanOrEqual("field_1", 30.58f)) + .fromFilter("field_1 <= 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(lessThanOrEqual("field_1", "xyz")) + .fromFilter("field_1 <= 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(lessThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 <= '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting(lessThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 <= '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + lessThanOrEqual( + "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 <= '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testGreaterThan() { + // integer + 
TestCase.expecting(greaterThan("field_1", 30)) + .fromFilter("field_1 > 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(greaterThan("field_1", 30.58f)) + .fromFilter("field_1 > 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(greaterThan("field_1", "xyz")) + .fromFilter("field_1 > 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(greaterThan("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 > '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting(greaterThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 > '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + greaterThan( + "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 > '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testGreaterThanOrEqual() { + // integer + TestCase.expecting(greaterThanOrEqual("field_1", 30)) + .fromFilter("field_1 >= 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(greaterThanOrEqual("field_1", 30.58f)) + .fromFilter("field_1 >= 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(greaterThanOrEqual("field_1", "xyz")) + .fromFilter("field_1 >= 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(greaterThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 >= '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting( + greaterThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 >= '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + greaterThanOrEqual( + "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 >= '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testEquals() { + // integer + TestCase.expecting(equal("field_1", 30)) + .fromFilter("field_1 = 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(equal("field_1", 30.58f)) + .fromFilter("field_1 = 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(equal("field_1", "xyz")) + .fromFilter("field_1 = 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(equal("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 = '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting(equal("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 = '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + equal("field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 = '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testNotEquals() { + // integer + TestCase.expecting(notEqual("field_1", 30)) + 
.fromFilter("field_1 <> 30") + .withFieldType(Types.IntegerType.get()) + .validate(); + + // float + TestCase.expecting(notEqual("field_1", 30.58f)) + .fromFilter("field_1 <> 30.58") + .withFieldType(Types.FloatType.get()) + .validate(); + + // string + TestCase.expecting(notEqual("field_1", "xyz")) + .fromFilter("field_1 <> 'xyz'") + .withFieldType(Types.StringType.get()) + .validate(); + + // date + TestCase.expecting(notEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) + .fromFilter("field_1 <> '2025-05-03'") + .withFieldType(Types.DateType.get()) + .validate(); + + // time + TestCase.expecting(notEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) + .fromFilter("field_1 <> '10:30:05.123'") + .withFieldType(Types.TimeType.get()) + .validate(); + + // datetime + TestCase.expecting( + notEqual( + "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) + .fromFilter("field_1 <> '2025-05-03T10:30:05.123'") + .withFieldType(Types.TimestampType.withoutZone()) + .validate(); + } + + @Test + public void testIn() { + // string + TestCase.expecting(in("field_1", Arrays.asList("xyz", "abc", "123", "foo"))) + .fromFilter("field_1 IN ('xyz', 'abc', '123', 'foo')") + .withFieldType(Types.StringType.get()) + .validate(); + + // integer + TestCase.expecting(in("field_1", Arrays.asList(1, 2, 3, 4, 5))) + .fromFilter("field_1 IN (1, 2, 3, 4, 5)") + .withFieldType(Types.IntegerType.get()) + .validate(); + } + + @Test + public void testNotIn() { + // string + TestCase.expecting(notIn("field_1", Arrays.asList("xyz", "abc", "123", "foo"))) + .fromFilter("field_1 NOT IN ('xyz', 'abc', '123', 'foo')") + .withFieldType(Types.StringType.get()) + .validate(); + + // integer + TestCase.expecting(notIn("field_1", Arrays.asList(1, 2, 3, 4, 5))) + .fromFilter("field_1 NOT IN (1, 2, 3, 4, 5)") + .withFieldType(Types.IntegerType.get()) + .validate(); + } + + @Test + public void testAnd() { + TestCase.expecting( + and( + and( + and( + and( + and( + and( + and(and(and(IS_NULL, NOT_NULL), LESS_THAN), LESS_THAN_OR_EQUAL), + GREATER_THAN), + GREATER_THAN_OR_EQUAL), + EQUAL), + NOT_EQUAL), + IN), + NOT_IN)) + .fromFilter( + "field_1 IS NULL AND " + + "field_2 IS NOT NULL AND " + + "field_3 < 'xyz' AND " + + "field_4 <= 123 AND " + + "field_5 > 123.456 AND " + + "field_6 >= '2025-05-03' AND " + + "field_7 = '10:30:05.123' AND " + + "field_8 <> '2025-05-03T10:30:05.123' AND " + + "field_9 IN ('xyz', 'abc', '123', 'foo') AND " + + "field_10 NOT IN (1, 2, 3, 4, 5)") + .withSchema(SCHEMA) + .validate(); + } + + @Test + public void testOr() { + TestCase.expecting( + or( + or( + or( + or( + or( + or( + or(or(or(IS_NULL, NOT_NULL), LESS_THAN), LESS_THAN_OR_EQUAL), + GREATER_THAN), + GREATER_THAN_OR_EQUAL), + EQUAL), + NOT_EQUAL), + IN), + NOT_IN)) + .fromFilter( + "field_1 IS NULL OR " + + "field_2 IS NOT NULL OR " + + "field_3 < 'xyz' OR " + + "field_4 <= 123 OR " + + "field_5 > 123.456 OR " + + "field_6 >= '2025-05-03' OR " + + "field_7 = '10:30:05.123' OR " + + "field_8 <> '2025-05-03T10:30:05.123' OR " + + "field_9 IN ('xyz', 'abc', '123', 'foo') OR " + + "field_10 NOT IN (1, 2, 3, 4, 5)") + .withSchema(SCHEMA) + .validate(); + } + + @Test + public void testAndOr() { + TestCase.expecting( + or( + or( + or( + or(and(IS_NULL, NOT_NULL), and(LESS_THAN, LESS_THAN_OR_EQUAL)), + and(GREATER_THAN, GREATER_THAN_OR_EQUAL)), + and(EQUAL, NOT_EQUAL)), + and(IN, NOT_IN))) + .fromFilter( + "field_1 IS NULL AND " + + "field_2 IS NOT NULL OR " + + "field_3 < 'xyz' AND " + + "field_4 <= 123 
OR " + + "field_5 > 123.456 AND " + + "field_6 >= '2025-05-03' OR " + + "field_7 = '10:30:05.123' AND " + + "field_8 <> '2025-05-03T10:30:05.123' OR " + + "field_9 IN ('xyz', 'abc', '123', 'foo') AND " + + "field_10 NOT IN (1, 2, 3, 4, 5)") + .withSchema(SCHEMA) + .validate(); + } + + @Test + public void testScanFiles() throws IOException { + Schema schema = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "str", Types.StringType.get())); + Table table = warehouse.createTable(TableIdentifier.parse("default.table"), schema); + + List recs = + IntStream.range(0, 100) + .mapToObj( + i -> { + GenericRecord rec = GenericRecord.create(schema); + rec.setField("id", i); + rec.setField("str", "value_" + i); + return rec; + }) + .collect(Collectors.toList()); + + ImmutableList files = + ImmutableList.of( + warehouse.writeRecords("file_0.parquet", schema, recs.subList(0, 10)), + warehouse.writeRecords("file_10.parquet", schema, recs.subList(10, 20)), + warehouse.writeRecords("file_20.parquet", schema, recs.subList(20, 30)), + warehouse.writeRecords("file_30.parquet", schema, recs.subList(30, 40)), + warehouse.writeRecords("file_40.parquet", schema, recs.subList(40, 50)), + warehouse.writeRecords("file_50.parquet", schema, recs.subList(50, 60)), + warehouse.writeRecords("file_60.parquet", schema, recs.subList(60, 70)), + warehouse.writeRecords("file_70.parquet", schema, recs.subList(70, 80)), + warehouse.writeRecords("file_80.parquet", schema, recs.subList(80, 90)), + warehouse.writeRecords("file_90.parquet", schema, recs.subList(90, 100))); + + AppendFiles append = table.newAppend(); + files.forEach(append::appendFile); + append.commit(); + + TableScan scan = table.newScan().project(schema).filter(FilterUtils.convert("id < 58", schema)); + + Set expectedFiles = + ImmutableSet.of( + "file_0.parquet", + "file_10.parquet", + "file_20.parquet", + "file_30.parquet", + "file_40.parquet", + "file_50.parquet"); + ImmutableSet.Builder actualFiles = ImmutableSet.builder(); + for (FileScanTask task : scan.planFiles()) { + String fileName = Iterables.getLast(Splitter.on('/').split(task.file().path().toString())); + actualFiles.add(fileName); + } + assertEquals(expectedFiles, actualFiles.build()); + } + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + private static class TestCase { + private @Nullable String filter; + private @Nullable Schema schema; + private final Expression expected; + + private TestCase(Expression expression) { + this.expected = expression; + } + + TestCase fromFilter(String filter) { + this.filter = filter; + return this; + } + + TestCase withFieldType(Type type) { + String fieldName = ((UnboundPredicate) expected).ref().name(); + this.schema = new Schema(Types.NestedField.required(1, fieldName, type)); + return this; + } + + TestCase withSchema(Schema schema) { + this.schema = schema; + return this; + } + + static TestCase expecting(Expression expression) { + return new TestCase(expression); + } + + void validate() { + Preconditions.checkState( + schema != null && filter != null, "TestCase has not been fully initialized yet"); + Expression actual = convert(filter, schema); + checkEquals(expected, actual); + } + } + + private static final Expression IS_NULL = isNull("field_1"); + private static final Expression NOT_NULL = notNull("field_2"); + private static final Expression LESS_THAN = lessThan("field_3", "xyz"); + private 
static final Expression LESS_THAN_OR_EQUAL = lessThanOrEqual("field_4", 123); + private static final Expression GREATER_THAN = greaterThan("field_5", 123.456f); + private static final Expression GREATER_THAN_OR_EQUAL = + greaterThanOrEqual("field_6", daysFromDate(LocalDate.parse("2025-05-03"))); + private static final Expression EQUAL = + equal("field_7", microsFromTime(LocalTime.parse("10:30:05.123"))); + private static final Expression NOT_EQUAL = + notEqual("field_8", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123"))); + private static final Expression IN = in("field_9", Arrays.asList("xyz", "abc", "123", "foo")); + private static final Expression NOT_IN = notIn("field_10", Arrays.asList(1, 2, 3, 4, 5)); + + private static final Schema SCHEMA = + new Schema( + required(1, "field_1", Types.StringType.get()), + required(2, "field_2", Types.StringType.get()), + required(3, "field_3", Types.StringType.get()), + required(4, "field_4", Types.IntegerType.get()), + required(5, "field_5", Types.FloatType.get()), + required(6, "field_6", Types.DateType.get()), + required(7, "field_7", Types.TimeType.get()), + required(8, "field_8", Types.TimestampType.withoutZone()), + required(9, "field_9", Types.StringType.get()), + required(10, "field_10", Types.IntegerType.get())); + + private static void checkEquals(Expression expectedExpr, Expression actualExpr) { + System.out.printf("e: %s\n\na: %s%n-----------%n", expectedExpr, actualExpr); + if (expectedExpr instanceof UnboundPredicate) { + assertTrue(actualExpr instanceof UnboundPredicate); + } else if (expectedExpr instanceof And) { + assertTrue(actualExpr instanceof And); + checkEqualsAnd((And) expectedExpr, (And) actualExpr); + return; + } else if (expectedExpr instanceof Or) { + assertTrue(actualExpr instanceof Or); + checkEqualsOr((Or) expectedExpr, (Or) actualExpr); + return; + } + + UnboundPredicate expected = (UnboundPredicate) expectedExpr; + UnboundPredicate actual = (UnboundPredicate) actualExpr; + assertEquals(expected.op(), actual.op()); + assertEquals(expected.ref().name(), actual.ref().name()); + + ImmutableSet inOperations = ImmutableSet.of(Operation.IN, Operation.NOT_IN); + if (inOperations.contains(expected.op())) { + System.out.printf( + "xxx op: %s, literals: %s, ref: %s%n", + expected.op(), expected.literals(), expected.ref().name()); + assertEquals(expected.literals(), actual.literals()); + } else { + System.out.printf( + "xxx op: %s, literal: %s, ref: %s%n", + expected.op(), expected.literal(), expected.ref().name()); + assertEquals(expected.literal(), actual.literal()); + } + } + + private static void checkEqualsAnd(And expected, And actual) { + assertEquals(expected.op(), actual.op()); + checkEquals(expected.left(), actual.left()); + checkEquals(expected.right(), actual.right()); + } + + private static void checkEqualsOr(Or expected, Or actual) { + assertEquals(expected.op(), actual.op()); + checkEquals(expected.left(), actual.left()); + checkEquals(expected.right(), actual.right()); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index fe92480ab2ce..6acca0825513 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -234,6 +234,40 @@ public void testSimpleScan() throws Exception { testPipeline.run(); } + @Test + public 
void testScanWithFilter() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + + List> expectedRecords = warehouse.commitData(simpleTable); + + IcebergIO.ReadRows read = + IcebergIO.readRows(catalogConfig()).from(tableId).withFilter("id < 10"); + + if (useIncrementalScan) { + read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId()); + } + final List expectedRows = + expectedRecords.stream() + .flatMap(List::stream) + .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record)) + .filter(row -> row.getInt64("id") < 10) + .collect(Collectors.toList()); + + PCollection output = testPipeline.apply(read).apply(new PrintRow()); + + PAssert.that(output) + .satisfies( + (Iterable rows) -> { + assertThat(rows, containsInAnyOrder(expectedRows.toArray())); + return null; + }); + + testPipeline.run(); + } + @Test public void testReadSchemaWithRandomlyOrderedIds() throws IOException { TableIdentifier tableId = TableIdentifier.of("default", testName.getMethodName()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java index dc5e3b263247..8b731c001ad1 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java @@ -29,11 +29,6 @@ import org.apache.iceberg.hadoop.HadoopCatalog; public class HadoopCatalogIT extends IcebergCatalogBaseIT { - @Override - public Integer numRecords() { - return 100; - } - @Override public Catalog createCatalog() { Configuration catalogHadoopConf = new Configuration(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index a6b4df69f42b..d426d920cc6c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -144,7 +144,7 @@ public void catalogSetup() throws Exception {} public void catalogCleanup() throws Exception {} public Integer numRecords() { - return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000; + return OPTIONS.getRunner().equals(DirectRunner.class) ? 
4 : 1000; } public String tableId() { @@ -244,8 +244,8 @@ public void cleanUp() throws Exception { .addStringField("str") .addStringField("char") .addInt64Field("modulo_5") - .addBooleanField("bool") - .addInt32Field("int") + .addBooleanField("bool_field") + .addInt32Field("int_field") .addRowField("row", NESTED_ROW_SCHEMA) .addArrayField("arr_long", Schema.FieldType.INT64) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) @@ -424,14 +424,43 @@ public void testRead() throws Exception { } @Test - public void testStreamingRead() throws Exception { + public void testReadWithFilter() throws Exception { Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); - List expectedRows = populateTable(table); + List expectedRows = + populateTable(table).stream() + .filter( + row -> + row.getBoolean("bool_field") + && (row.getInt32("int_field") < 500 || row.getInt32("modulo_5") == 3)) + .collect(Collectors.toList()); + + Map config = new HashMap<>(managedIcebergConfig(tableId())); + config.put("filter", "bool_field = TRUE AND (int_field < 500 OR modulo_5 = 3)"); + + PCollection rows = + pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection(); + + PAssert.that(rows).containsInAnyOrder(expectedRows); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testStreamingReadWithFilter() throws Exception { + Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); + + List expectedRows = + populateTable(table).stream() + .filter( + row -> + row.getBoolean("bool_field") + && (row.getInt32("int_field") < 350 || row.getInt32("modulo_5") == 2)) + .collect(Collectors.toList()); Map config = new HashMap<>(managedIcebergConfig(tableId())); config.put("streaming", true); config.put("to_snapshot", table.currentSnapshot().snapshotId()); + config.put("filter", "bool_field = TRUE AND (int_field < 350 OR modulo_5 = 2)"); PCollection rows = pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection(); @@ -472,6 +501,34 @@ public void testWriteRead() throws IOException { containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray())); } + @Test + public void testWriteReadWithFilter() throws IOException { + Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); + List expectedRows = + populateTable(table).stream() + .filter( + row -> + row.getBoolean("bool_field") + && (row.getInt32("int_field") < 350 || row.getInt32("modulo_5") == 2)) + .collect(Collectors.toList()); + Map readConfig = new HashMap<>(managedIcebergConfig(tableId())); + readConfig.put("filter", "bool_field = TRUE AND (int_field < 350 OR modulo_5 = 2)"); + String writeTableId = tableId() + "_2"; + Map writeConfig = managedIcebergConfig(writeTableId); + + pipeline + .apply("read", Managed.read(ICEBERG).withConfig(readConfig)) + .getSinglePCollection() + .apply("write", Managed.write(ICEBERG).withConfig(writeConfig)); + pipeline.run().waitUntilFinish(); + + List returnedRecords = + readRecords(catalog.loadTable(TableIdentifier.parse(writeTableId))); + assertThat( + returnedRecords, + containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray())); + } + @Test public void testReadWriteStreaming() throws IOException { Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); @@ -525,7 +582,7 @@ public void testWriteToPartitionedTable() throws IOException { // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ PartitionSpec 
partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA) - .identity("bool") + .identity("bool_field") .hour("datetime") .truncate("str", "value_x".length()) .build(); @@ -554,7 +611,10 @@ private PeriodicImpulse getStreamingSource() { public void testStreamingWrite() throws IOException { int numRecords = numRecords(); PartitionSpec partitionSpec = - PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); + PartitionSpec.builderFor(ICEBERG_SCHEMA) + .identity("bool_field") + .identity("modulo_5") + .build(); Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); @@ -584,7 +644,10 @@ public void testStreamingWrite() throws IOException { public void testStreamingWriteWithPriorWindowing() throws IOException { int numRecords = numRecords(); PartitionSpec partitionSpec = - PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); + PartitionSpec.builderFor(ICEBERG_SCHEMA) + .identity("bool_field") + .identity("modulo_5") + .build(); Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); @@ -628,7 +691,7 @@ private void writeToDynamicDestinations( String tableIdentifierTemplate = tableId() + "_{modulo_5}_{char}"; Map writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate)); - List fieldsToFilter = Arrays.asList("row", "str", "int", "nullable_long"); + List fieldsToFilter = Arrays.asList("row", "str", "int_field", "nullable_long"); // an un-configured filter will just return the same row RowFilter rowFilter = new RowFilter(BEAM_SCHEMA); if (filterOp != null) { @@ -663,7 +726,7 @@ private void writeToDynamicDestinations( if (partitioning) { Preconditions.checkState(filterOp == null || !filterOp.equals("only")); PartitionSpec partitionSpec = - PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build(); + PartitionSpec.builderFor(tableSchema).identity("bool_field").identity("modulo_5").build(); catalog.createTable(tableIdentifier0, tableSchema, partitionSpec); catalog.createTable(tableIdentifier1, tableSchema, partitionSpec); catalog.createTable(tableIdentifier2, tableSchema, partitionSpec); From 0ab81bbc6e8013fbad2b2b34b92de1f3a28f5d16 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 5 May 2025 16:27:28 -0400 Subject: [PATCH 05/15] tweak description --- .../sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java | 2 +- .../beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java index 70efe9a41bdb..a4dea19723c3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java @@ -178,7 +178,7 @@ static Builder builder() { "The interval at which to poll for new snapshots. Defaults to 60 seconds.") abstract @Nullable Integer getPollIntervalSeconds(); - @SchemaFieldDescription("SQL-like filter to apply when scanning for files.") + @SchemaFieldDescription("SQL-like predicate to filter data at scan time. 
Example: \"id > 5 AND status = 'ACTIVE'\".") @Nullable abstract String getFilter(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index 62cab1f0dd51..8011fc1b8eee 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -122,7 +122,7 @@ static Builder builder() { @Nullable abstract Map getConfigProperties(); - @SchemaFieldDescription("SQL-like filter to apply when scanning for files.") + @SchemaFieldDescription("SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\".") @Nullable abstract String getFilter(); From 5bd926a2aa4ded7c3ba2644373fa21aed46fd632 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 5 May 2025 17:09:37 -0400 Subject: [PATCH 06/15] add to documentation, don't force lower casing fields --- .../beam/sdk/io/iceberg/FilterUtils.java | 25 +-- ...IcebergCdcReadSchemaTransformProvider.java | 4 +- .../IcebergReadSchemaTransformProvider.java | 4 +- .../beam/sdk/io/iceberg/FilterUtilsTest.java | 155 +++++++++--------- .../content/en/documentation/io/managed-io.md | 24 +++ 5 files changed, 121 insertions(+), 91 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index 1743045e01fe..9bec38c09e55 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -185,7 +185,7 @@ private static String getOnlyChildName(SqlBasicCall call) { SqlNode ref = call.operand(0); Preconditions.checkState( ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref); - return ((SqlIdentifier) ref).getSimple().toLowerCase(); + return ((SqlIdentifier) ref).getSimple(); } private static SqlNode getLeftChild(SqlBasicCall call) { @@ -228,16 +228,17 @@ private static Expression convertFieldInLiteral(Operation op, SqlBasicCall call, checkArgument( value instanceof SqlNodeList, "Expected right hand side to be a list but got " + value.getClass()); - String field = ((SqlIdentifier) term).getSimple().toLowerCase(); + String name = ((SqlIdentifier) term).getSimple(); + TypeID type = schema.findType(name).typeId(); List list = ((SqlNodeList) value) .getList().stream().filter(Objects::nonNull).collect(Collectors.toList()); checkArgument(list.stream().allMatch(o -> o instanceof SqlLiteral)); List values = list.stream() - .map(o -> convertLiteral((SqlLiteral) o, field, schema.findType(field).typeId())) + .map(o -> convertLiteral((SqlLiteral) o, name, type)) .collect(Collectors.toList()); - return op == Operation.IN ? Expressions.in(field, values) : Expressions.notIn(field, values); + return op == Operation.IN ? 
Expressions.in(name, values) : Expressions.notIn(name, values); } private static Expression convertFieldAndLiteral( @@ -253,15 +254,15 @@ private static Expression convertFieldAndLiteral( SqlNode left = getLeftChild(call); SqlNode right = getRightChild(call); if (left instanceof SqlIdentifier && right instanceof SqlLiteral) { - String field = ((SqlIdentifier) left).getSimple().toLowerCase(); - TypeID type = schema.findType(field).typeId(); - Object value = convertLiteral((SqlLiteral) right, field, type); - return convertLR.apply(field, value); + String name = ((SqlIdentifier) left).getSimple(); + TypeID type = schema.findType(name).typeId(); + Object value = convertLiteral((SqlLiteral) right, name, type); + return convertLR.apply(name, value); } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) { - String field = ((SqlIdentifier) right).getSimple().toLowerCase(); - TypeID type = schema.findType(field).typeId(); - Object value = convertLiteral((SqlLiteral) left, field, type); - return convertRL.apply(field, value); + String name = ((SqlIdentifier) right).getSimple(); + TypeID type = schema.findType(name).typeId(); + Object value = convertLiteral((SqlLiteral) left, name, type); + return convertRL.apply(name, value); } else { throw new IllegalArgumentException("Unsupported operands for expression: " + call); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java index a4dea19723c3..f3fbfbcc52e5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java @@ -178,7 +178,9 @@ static Builder builder() { "The interval at which to poll for new snapshots. Defaults to 60 seconds.") abstract @Nullable Integer getPollIntervalSeconds(); - @SchemaFieldDescription("SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\".") + @SchemaFieldDescription( + "SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\". " + + "Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html") @Nullable abstract String getFilter(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index 8011fc1b8eee..1729db0bd068 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -122,7 +122,9 @@ static Builder builder() { @Nullable abstract Map getConfigProperties(); - @SchemaFieldDescription("SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\".") + @SchemaFieldDescription( + "SQL-like predicate to filter data at scan time. Example: \"id > 5 AND status = 'ACTIVE'\". 
" + + "Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html") @Nullable abstract String getFilter(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java index 512b00eeeff0..cc0755897548 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java @@ -37,8 +37,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import java.io.IOException; import java.time.LocalDate; import java.time.LocalDateTime; @@ -50,7 +48,9 @@ import java.util.stream.IntStream; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; @@ -77,16 +77,16 @@ public class FilterUtilsTest { @Test public void testIsNull() { - TestCase.expecting(isNull("field_1")) - .fromFilter("field_1 IS NULL") + TestCase.expecting(isNull("fiELd_1")) + .fromFilter("\"fiELd_1\" IS NULL") .withFieldType(Types.IntegerType.get()) .validate(); } @Test public void testIsNotNull() { - TestCase.expecting(notNull("field_1")) - .fromFilter("field_1 IS NOT NULL") + TestCase.expecting(notNull("fiELd_1")) + .fromFilter("\"fiELd_1\" IS NOT NULL") .withFieldType(Types.IntegerType.get()) .validate(); } @@ -95,31 +95,31 @@ public void testIsNotNull() { public void testLessThan() { // integer TestCase.expecting(lessThan("field_1", 30)) - .fromFilter("field_1 < 30") + .fromFilter("\"field_1\" < 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(lessThan("field_1", 30.58f)) - .fromFilter("field_1 < 30.58") + .fromFilter("\"field_1\" < 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(lessThan("field_1", "xyz")) - .fromFilter("field_1 < 'xyz'") + .fromFilter("\"field_1\" < 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(lessThan("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 < '2025-05-03'") + .fromFilter("\"field_1\" < '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting(lessThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 < '10:30:05.123'") + .fromFilter("\"field_1\" < '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); @@ -127,7 +127,7 @@ public void testLessThan() { TestCase.expecting( lessThan( "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 < '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" < '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -136,31 +136,31 @@ public void testLessThan() { public void testLessThanOrEqual() { // integer TestCase.expecting(lessThanOrEqual("field_1", 30)) - .fromFilter("field_1 <= 30") + 
.fromFilter("\"field_1\" <= 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(lessThanOrEqual("field_1", 30.58f)) - .fromFilter("field_1 <= 30.58") + .fromFilter("\"field_1\" <= 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(lessThanOrEqual("field_1", "xyz")) - .fromFilter("field_1 <= 'xyz'") + .fromFilter("\"field_1\" <= 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(lessThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 <= '2025-05-03'") + .fromFilter("\"field_1\" <= '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting(lessThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 <= '10:30:05.123'") + .fromFilter("\"field_1\" <= '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); @@ -168,7 +168,7 @@ public void testLessThanOrEqual() { TestCase.expecting( lessThanOrEqual( "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 <= '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" <= '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -177,31 +177,31 @@ public void testLessThanOrEqual() { public void testGreaterThan() { // integer TestCase.expecting(greaterThan("field_1", 30)) - .fromFilter("field_1 > 30") + .fromFilter("\"field_1\" > 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(greaterThan("field_1", 30.58f)) - .fromFilter("field_1 > 30.58") + .fromFilter("\"field_1\" > 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(greaterThan("field_1", "xyz")) - .fromFilter("field_1 > 'xyz'") + .fromFilter("\"field_1\" > 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(greaterThan("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 > '2025-05-03'") + .fromFilter("\"field_1\" > '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting(greaterThan("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 > '10:30:05.123'") + .fromFilter("\"field_1\" > '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); @@ -209,7 +209,7 @@ public void testGreaterThan() { TestCase.expecting( greaterThan( "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 > '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" > '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -218,32 +218,32 @@ public void testGreaterThan() { public void testGreaterThanOrEqual() { // integer TestCase.expecting(greaterThanOrEqual("field_1", 30)) - .fromFilter("field_1 >= 30") + .fromFilter("\"field_1\" >= 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(greaterThanOrEqual("field_1", 30.58f)) - .fromFilter("field_1 >= 30.58") + .fromFilter("\"field_1\" >= 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(greaterThanOrEqual("field_1", "xyz")) - .fromFilter("field_1 >= 'xyz'") + .fromFilter("\"field_1\" >= 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(greaterThanOrEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 >= '2025-05-03'") + 
.fromFilter("\"field_1\" >= '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting( greaterThanOrEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 >= '10:30:05.123'") + .fromFilter("\"field_1\" >= '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); @@ -251,7 +251,7 @@ public void testGreaterThanOrEqual() { TestCase.expecting( greaterThanOrEqual( "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 >= '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" >= '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -260,38 +260,38 @@ public void testGreaterThanOrEqual() { public void testEquals() { // integer TestCase.expecting(equal("field_1", 30)) - .fromFilter("field_1 = 30") + .fromFilter("\"field_1\" = 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(equal("field_1", 30.58f)) - .fromFilter("field_1 = 30.58") + .fromFilter("\"field_1\" = 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(equal("field_1", "xyz")) - .fromFilter("field_1 = 'xyz'") + .fromFilter("\"field_1\" = 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(equal("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 = '2025-05-03'") + .fromFilter("\"field_1\" = '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting(equal("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 = '10:30:05.123'") + .fromFilter("\"field_1\" = '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); // datetime TestCase.expecting( equal("field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 = '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" = '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -300,31 +300,31 @@ public void testEquals() { public void testNotEquals() { // integer TestCase.expecting(notEqual("field_1", 30)) - .fromFilter("field_1 <> 30") + .fromFilter("\"field_1\" <> 30") .withFieldType(Types.IntegerType.get()) .validate(); // float TestCase.expecting(notEqual("field_1", 30.58f)) - .fromFilter("field_1 <> 30.58") + .fromFilter("\"field_1\" <> 30.58") .withFieldType(Types.FloatType.get()) .validate(); // string TestCase.expecting(notEqual("field_1", "xyz")) - .fromFilter("field_1 <> 'xyz'") + .fromFilter("\"field_1\" <> 'xyz'") .withFieldType(Types.StringType.get()) .validate(); // date TestCase.expecting(notEqual("field_1", daysFromDate(LocalDate.parse("2025-05-03")))) - .fromFilter("field_1 <> '2025-05-03'") + .fromFilter("\"field_1\" <> '2025-05-03'") .withFieldType(Types.DateType.get()) .validate(); // time TestCase.expecting(notEqual("field_1", microsFromTime(LocalTime.parse("10:30:05.123")))) - .fromFilter("field_1 <> '10:30:05.123'") + .fromFilter("\"field_1\" <> '10:30:05.123'") .withFieldType(Types.TimeType.get()) .validate(); @@ -332,7 +332,7 @@ public void testNotEquals() { TestCase.expecting( notEqual( "field_1", microsFromTimestamp(LocalDateTime.parse("2025-05-03T10:30:05.123")))) - .fromFilter("field_1 <> '2025-05-03T10:30:05.123'") + .fromFilter("\"field_1\" <> '2025-05-03T10:30:05.123'") .withFieldType(Types.TimestampType.withoutZone()) .validate(); } @@ -341,13 +341,13 @@ public void testNotEquals() { public void testIn() { // 
string TestCase.expecting(in("field_1", Arrays.asList("xyz", "abc", "123", "foo"))) - .fromFilter("field_1 IN ('xyz', 'abc', '123', 'foo')") + .fromFilter("\"field_1\" IN ('xyz', 'abc', '123', 'foo')") .withFieldType(Types.StringType.get()) .validate(); // integer TestCase.expecting(in("field_1", Arrays.asList(1, 2, 3, 4, 5))) - .fromFilter("field_1 IN (1, 2, 3, 4, 5)") + .fromFilter("\"field_1\" IN (1, 2, 3, 4, 5)") .withFieldType(Types.IntegerType.get()) .validate(); } @@ -356,13 +356,13 @@ public void testIn() { public void testNotIn() { // string TestCase.expecting(notIn("field_1", Arrays.asList("xyz", "abc", "123", "foo"))) - .fromFilter("field_1 NOT IN ('xyz', 'abc', '123', 'foo')") + .fromFilter("\"field_1\" NOT IN ('xyz', 'abc', '123', 'foo')") .withFieldType(Types.StringType.get()) .validate(); // integer TestCase.expecting(notIn("field_1", Arrays.asList(1, 2, 3, 4, 5))) - .fromFilter("field_1 NOT IN (1, 2, 3, 4, 5)") + .fromFilter("\"field_1\" NOT IN (1, 2, 3, 4, 5)") .withFieldType(Types.IntegerType.get()) .validate(); } @@ -384,16 +384,16 @@ public void testAnd() { IN), NOT_IN)) .fromFilter( - "field_1 IS NULL AND " - + "field_2 IS NOT NULL AND " - + "field_3 < 'xyz' AND " - + "field_4 <= 123 AND " - + "field_5 > 123.456 AND " - + "field_6 >= '2025-05-03' AND " - + "field_7 = '10:30:05.123' AND " - + "field_8 <> '2025-05-03T10:30:05.123' AND " - + "field_9 IN ('xyz', 'abc', '123', 'foo') AND " - + "field_10 NOT IN (1, 2, 3, 4, 5)") + "\"field_1\" IS NULL AND " + + "\"field_2\" IS NOT NULL AND " + + "\"field_3\" < 'xyz' AND " + + "\"field_4\" <= 123 AND " + + "\"field_5\" > 123.456 AND " + + "\"field_6\" >= '2025-05-03' AND " + + "\"field_7\" = '10:30:05.123' AND " + + "\"field_8\" <> '2025-05-03T10:30:05.123' AND " + + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') AND " + + "\"field_10\" NOT IN (1, 2, 3, 4, 5)") .withSchema(SCHEMA) .validate(); } @@ -415,16 +415,16 @@ public void testOr() { IN), NOT_IN)) .fromFilter( - "field_1 IS NULL OR " - + "field_2 IS NOT NULL OR " - + "field_3 < 'xyz' OR " - + "field_4 <= 123 OR " - + "field_5 > 123.456 OR " - + "field_6 >= '2025-05-03' OR " - + "field_7 = '10:30:05.123' OR " - + "field_8 <> '2025-05-03T10:30:05.123' OR " - + "field_9 IN ('xyz', 'abc', '123', 'foo') OR " - + "field_10 NOT IN (1, 2, 3, 4, 5)") + "\"field_1\" IS NULL OR " + + "\"field_2\" IS NOT NULL OR " + + "\"field_3\" < 'xyz' OR " + + "\"field_4\" <= 123 OR " + + "\"field_5\" > 123.456 OR " + + "\"field_6\" >= '2025-05-03' OR " + + "\"field_7\" = '10:30:05.123' OR " + + "\"field_8\" <> '2025-05-03T10:30:05.123' OR " + + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') OR " + + "\"field_10\" NOT IN (1, 2, 3, 4, 5)") .withSchema(SCHEMA) .validate(); } @@ -440,16 +440,16 @@ public void testAndOr() { and(EQUAL, NOT_EQUAL)), and(IN, NOT_IN))) .fromFilter( - "field_1 IS NULL AND " - + "field_2 IS NOT NULL OR " - + "field_3 < 'xyz' AND " - + "field_4 <= 123 OR " - + "field_5 > 123.456 AND " - + "field_6 >= '2025-05-03' OR " - + "field_7 = '10:30:05.123' AND " - + "field_8 <> '2025-05-03T10:30:05.123' OR " - + "field_9 IN ('xyz', 'abc', '123', 'foo') AND " - + "field_10 NOT IN (1, 2, 3, 4, 5)") + "\"field_1\" IS NULL AND " + + "\"field_2\" IS NOT NULL OR " + + "\"field_3\" < 'xyz' AND " + + "\"field_4\" <= 123 OR " + + "\"field_5\" > 123.456 AND " + + "\"field_6\" >= '2025-05-03' OR " + + "\"field_7\" = '10:30:05.123' AND " + + "\"field_8\" <> '2025-05-03T10:30:05.123' OR " + + "\"field_9\" IN ('xyz', 'abc', '123', 'foo') AND " + + "\"field_10\" NOT IN (1, 2, 3, 4, 5)") 
.withSchema(SCHEMA) .validate(); } @@ -489,7 +489,8 @@ public void testScanFiles() throws IOException { files.forEach(append::appendFile); append.commit(); - TableScan scan = table.newScan().project(schema).filter(FilterUtils.convert("id < 58", schema)); + TableScan scan = + table.newScan().project(schema).filter(FilterUtils.convert("\"id\" < 58", schema)); Set expectedFiles = ImmutableSet.of( diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md index 9d4b98b82570..6fadbc79bf52 100644 --- a/website/www/site/content/en/documentation/io/managed-io.md +++ b/website/www/site/content/en/documentation/io/managed-io.md @@ -50,6 +50,7 @@ manual updates or user intervention required!) catalog_name (str)
catalog_properties (map[str, str])
config_properties (map[str, str])
+ filter (str)
from_snapshot (int64)
from_timestamp (int64)
poll_interval_seconds (int32)
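For illustration, a minimal streaming (CDC) read using the new filter option; this is a sketch modeled on the testStreamingReadWithFilter case elsewhere in this PR, and assumes the managedIcebergConfig() and tableId() test helpers that supply catalog configuration:

    // Sketch only: assumes a table with bool_field, int_field, and modulo_5 columns,
    // and a managedIcebergConfig() helper that fills in catalog properties.
    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
    config.put("streaming", true);
    // Bounding the scan at the current snapshot keeps the pipeline finite, as in the test.
    config.put("to_snapshot", table.currentSnapshot().snapshotId());
    config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)");

    PCollection<Row> rows =
        pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection();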
@@ -69,6 +70,7 @@ manual updates or user intervention required!) catalog_name (str)
catalog_properties (map[str, str])
config_properties (map[str, str])
+ filter (str)
table (str)
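For the batch path, a comparable sketch; every value below other than the filter syntax is a placeholder, not taken from this PR:

    // Sketch only: table name, catalog name, and catalog properties are assumed.
    Map<String, Object> config =
        ImmutableMap.<String, Object>of(
            "table", "db.users",
            "catalog_name", "my_catalog",
            "catalog_properties",
                ImmutableMap.of("type", "hadoop", "warehouse", "gs://bucket/warehouse"),
            "filter", "\"id\" > 5 AND \"status\" = 'ACTIVE'");

    PCollection<Row> rows =
        pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();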
@@ -180,6 +182,17 @@ manual updates or user intervention required!) Properties passed to the Hadoop Configuration. + + + filter + + + str + + + SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html + + from_snapshot @@ -413,6 +426,17 @@ manual updates or user intervention required!) Properties passed to the Hadoop Configuration. + + + filter + + + str + + + SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html + + From feca95b8b8798739991bfdfe68b8509abb4b891d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 5 May 2025 17:11:59 -0400 Subject: [PATCH 07/15] fix tests --- .../org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java | 2 +- .../beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 6acca0825513..c0f0a3b56c11 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -244,7 +244,7 @@ public void testScanWithFilter() throws Exception { List> expectedRecords = warehouse.commitData(simpleTable); IcebergIO.ReadRows read = - IcebergIO.readRows(catalogConfig()).from(tableId).withFilter("id < 10"); + IcebergIO.readRows(catalogConfig()).from(tableId).withFilter("\"id\" < 10"); if (useIncrementalScan) { read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index d426d920cc6c..e6cd537e1148 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -436,7 +436,7 @@ public void testReadWithFilter() throws Exception { .collect(Collectors.toList()); Map config = new HashMap<>(managedIcebergConfig(tableId())); - config.put("filter", "bool_field = TRUE AND (int_field < 500 OR modulo_5 = 3)"); + config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 500 OR \"modulo_5\" = 3)"); PCollection rows = pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection(); @@ -460,7 +460,7 @@ public void testStreamingReadWithFilter() throws Exception { Map config = new HashMap<>(managedIcebergConfig(tableId())); config.put("streaming", true); config.put("to_snapshot", table.currentSnapshot().snapshotId()); - config.put("filter", "bool_field = TRUE AND (int_field < 350 OR modulo_5 = 2)"); + config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)"); PCollection rows = pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection(); @@ -512,7 +512,7 @@ public void testWriteReadWithFilter() throws IOException { && (row.getInt32("int_field") < 350 || row.getInt32("modulo_5") == 2)) .collect(Collectors.toList()); Map readConfig = new HashMap<>(managedIcebergConfig(tableId())); - readConfig.put("filter", "bool_field = TRUE AND 
(int_field < 350 OR modulo_5 = 2)"); + readConfig.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)"); String writeTableId = tableId() + "_2"; Map writeConfig = managedIcebergConfig(writeTableId); From 80df7948452b1055ae848d4b7d9d8a35478e5173 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 May 2025 06:19:41 -0400 Subject: [PATCH 08/15] fix --- sdks/java/io/iceberg/build.gradle | 1 + .../beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 84ed45208add..8d5cef919949 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -177,6 +177,7 @@ task dataflowIntegrationTest(type: Test) { filter { includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testRead' + includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testReadWithFilter' includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testStreamingRead' includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testWrite' includeTestsMatching 'org.apache.beam.sdk.io.iceberg.catalog.BigQueryMetastoreCatalogIT.testWriteRead' diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index e6cd537e1148..17907230dab8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -144,7 +144,7 @@ public void catalogSetup() throws Exception {} public void catalogCleanup() throws Exception {} public Integer numRecords() { - return OPTIONS.getRunner().equals(DirectRunner.class) ? 4 : 1000; + return OPTIONS.getRunner().equals(DirectRunner.class) ? 
10 : 1000; } public String tableId() { From d5036495fb3e49bbefb48c36a097063665281d0c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 May 2025 06:55:40 -0400 Subject: [PATCH 09/15] fix --- .../iceberg/catalog/BigQueryMetastoreCatalogIT.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java index d224f0caafcf..27aa92b40069 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java @@ -108,11 +108,12 @@ public Map managedIcebergConfig(String tableId) { @Test public void testWriteToPartitionedAndValidateWithBQQuery() throws IOException, InterruptedException { - // For an example row where bool=true, modulo_5=3, str=value_303, - // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ + // For an example row where bool_field=true, modulo_5=3, str=value_303, + // this partition spec will create a partition like: + // /bool_field=true/modulo_5=3/str_trunc=value_3/ PartitionSpec partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA) - .identity("bool") + .identity("bool_field") .hour("datetime") .truncate("str", "value_x".length()) .build(); @@ -136,9 +137,9 @@ public void testWriteToPartitionedAndValidateWithBQQuery() assertThat(beamRows, containsInAnyOrder(inputRows.toArray())); String queryByPartition = - String.format("SELECT bool, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId()); + String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId()); rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true); - RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime")); + RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool_field", "datetime")); beamRows = rows.stream() .map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr)) From f8069945e7b6a9d3436ca535e658e99a9006d69a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 May 2025 11:54:48 -0400 Subject: [PATCH 10/15] simplify unneeded class --- .../beam/sdk/io/iceberg/FilterUtils.java | 18 ------------------ .../beam/sdk/io/iceberg/ReadFromTasks.java | 6 +++--- .../beam/sdk/io/iceberg/ScanTaskReader.java | 5 +++-- 3 files changed, 6 insertions(+), 23 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index 9bec38c09e55..74ff86fcafda 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -41,8 +41,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expression.Operation; import org.apache.iceberg.expressions.Expressions; @@ -74,22 +72,6 @@ class FilterUtils { 
.put(SqlKind.OR, Operation.OR) .build(); - static FilterFunction filterOn(Expression expression, Schema schema) { - return new FilterFunction(expression, schema); - } - - static class FilterFunction { - private final Evaluator evaluator; - - FilterFunction(Expression expression, Schema schema) { - this.evaluator = new Evaluator(schema.asStruct(), expression); - } - - boolean filter(Record rec) { - return evaluator.eval(rec); - } - } - static Expression convert(@Nullable String filter, Schema schema) { if (filter == null) { return Expressions.alwaysTrue(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java index 48c9a82f7f86..f408134be94d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.io.iceberg.FilterUtils.FilterFunction; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -31,6 +30,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.checkerframework.checker.nullness.qual.Nullable; @@ -80,8 +80,8 @@ public void process( try (CloseableIterable fullIterable = ReadUtils.createReader(task, table)) { CloseableIterable reader = fullIterable; if (filter != null && filter.op() != Expression.Operation.TRUE) { - FilterFunction filterFunc = FilterUtils.filterOn(filter, table.schema()); - reader = CloseableIterable.filter(reader, filterFunc::filter); + Evaluator evaluator = new Evaluator(table.schema().asStruct(), filter); + reader = CloseableIterable.filter(reader, evaluator::eval); } for (Record record : reader) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index cdb5b050029b..5691f179d1b7 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -43,6 +43,7 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.InputFilesDecryptor; +import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -188,8 +189,8 @@ public boolean advance() throws IOException { new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project); iterable = deleteFilter.filter(iterable); if (filter != null && filter.op() != Expression.Operation.TRUE) { - FilterUtils.FilterFunction filterFunction = FilterUtils.filterOn(filter, project); - iterable = CloseableIterable.filter(iterable, filterFunction::filter); + Evaluator evaluator = new Evaluator(project.asStruct(), filter); + iterable = CloseableIterable.filter(iterable, evaluator::eval); } currentIterator = iterable.iterator(); 
} while (true); From 03cdffe61513bba98bbe79504963d7d80d6e6711 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 12 May 2025 19:09:42 -0400 Subject: [PATCH 11/15] multiple filter test; add evaluator logic to read utils; cleanup --- .../beam/sdk/io/iceberg/IcebergScanConfig.java | 16 ++++++++++++++++ .../beam/sdk/io/iceberg/ReadFromTasks.java | 10 +--------- .../apache/beam/sdk/io/iceberg/ReadUtils.java | 13 +++++++++++++ .../beam/sdk/io/iceberg/ScanTaskReader.java | 10 ++-------- .../beam/sdk/io/iceberg/ScanTaskSource.java | 7 ++----- .../beam/sdk/io/iceberg/FilterUtilsTest.java | 1 - .../beam/sdk/io/iceberg/IcebergIOReadTest.java | 17 +++++++++++++++-- 7 files changed, 49 insertions(+), 25 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 5c3b0471c4be..437bcbbf7cea 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -34,6 +34,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.types.Types; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -89,6 +90,21 @@ public org.apache.iceberg.Schema getProjectedSchema() { return resolveSchema(getTable().schema(), getKeepFields(), getDropFields()); } + @Pure + @Nullable + public Evaluator getEvaluator() { + @Nullable Expression filter = getFilter(); + if (filter == null) { + return null; + } + if (cachedEvaluator == null) { + cachedEvaluator = new Evaluator(getProjectedSchema().asStruct(), filter); + } + return cachedEvaluator; + } + + private @Nullable Evaluator cachedEvaluator; + @Pure public abstract @Nullable Expression getFilter(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java index 7a4e6b78f113..366f5565d425 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java @@ -31,10 +31,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.data.Record; -import org.apache.iceberg.expressions.Evaluator; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; -import org.checkerframework.checker.nullness.qual.Nullable; /** * Bounded read implementation. 
@@ -67,7 +64,6 @@ public void process( throws IOException, ExecutionException, InterruptedException { ReadTask readTask = element.getValue(); Table table = TableCache.get(scanConfig.getTableIdentifier()); - @Nullable Expression filter = scanConfig.getFilter(); List fileScanTasks = readTask.getFileScanTasks(); @@ -82,11 +78,7 @@ public void process( Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected); try (CloseableIterable fullIterable = ReadUtils.createReader(task, table, projected)) { - CloseableIterable reader = fullIterable; - if (filter != null && filter.op() != Expression.Operation.TRUE) { - Evaluator evaluator = new Evaluator(table.schema().asStruct(), filter); - reader = CloseableIterable.filter(reader, evaluator::eval); - } + CloseableIterable reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig); for (Record record : reader) { Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java index 827d17f7819d..7790ad941fc3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java @@ -43,7 +43,10 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; @@ -192,4 +195,14 @@ static List snapshotsBetween( return snapshotIds; } + + public static CloseableIterable maybeApplyFilter( + CloseableIterable iterable, IcebergScanConfig scanConfig) { + Expression filter = scanConfig.getFilter(); + Evaluator evaluator = scanConfig.getEvaluator(); + if (filter != null && evaluator != null && filter.op() != Expression.Operation.TRUE) { + return CloseableIterable.filter(iterable, evaluator::eval); + } + return iterable; + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index d6ad474a6b19..c52b39dde1c2 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -43,8 +43,6 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.InputFilesDecryptor; -import org.apache.iceberg.expressions.Evaluator; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; @@ -61,7 +59,6 @@ class ScanTaskReader extends BoundedSource.BoundedReader { private final ScanTaskSource source; private final org.apache.iceberg.Schema project; - private final @Nullable Expression filter; private final Schema beamSchema; transient @Nullable FileIO io; @@ -73,7 +70,6 @@ class ScanTaskReader extends BoundedSource.BoundedReader { public ScanTaskReader(ScanTaskSource source) { 
this.source = source; this.project = source.getSchema(); - this.filter = source.getFilter(); this.beamSchema = icebergSchemaToBeamSchema(project); } @@ -188,10 +184,8 @@ public boolean advance() throws IOException { GenericDeleteFilter deleteFilter = new GenericDeleteFilter(checkStateNotNull(io), fileTask, fileTask.schema(), project); iterable = deleteFilter.filter(iterable); - if (filter != null && filter.op() != Expression.Operation.TRUE) { - Evaluator evaluator = new Evaluator(project.asStruct(), filter); - iterable = CloseableIterable.filter(iterable, evaluator::eval); - } + + iterable = ReadUtils.maybeApplyFilter(iterable, source.getScanConfig()); currentIterator = iterable.iterator(); } while (true); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java index df334407978e..2c92c5572c6d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskSource.java @@ -29,8 +29,6 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.expressions.Expression; -import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; /** @@ -62,9 +60,8 @@ Schema getSchema() { } @Pure - @Nullable - Expression getFilter() { - return scanConfig.getFilter(); + IcebergScanConfig getScanConfig() { + return scanConfig; } @Override diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java index cc0755897548..34e7be619110 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java @@ -577,7 +577,6 @@ void validate() { required(10, "field_10", Types.IntegerType.get())); private static void checkEquals(Expression expectedExpr, Expression actualExpr) { - System.out.printf("e: %s\n\na: %s%n-----------%n", expectedExpr, actualExpr); if (expectedExpr instanceof UnboundPredicate) { assertTrue(actualExpr instanceof UnboundPredicate); } else if (expectedExpr instanceof And) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index f439b6384b49..5bfcb1345c37 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.iceberg.IcebergScanConfig.resolveSchema; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema; import static org.apache.beam.sdk.io.iceberg.TestFixtures.createRecord; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.iceberg.types.Types.NestedField.required; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; 
@@ -360,7 +362,10 @@ public void testScanWithFilter() throws Exception {
     List<List<Record>> expectedRecords = warehouse.commitData(simpleTable);
     IcebergIO.ReadRows read =
-        IcebergIO.readRows(catalogConfig()).from(tableId).withFilter("\"id\" < 10");
+        IcebergIO.readRows(catalogConfig())
+            .from(tableId)
+            .withFilter(
+                "\"id\" < 10 AND \"id\" >= 2 AND \"data\" <> 'clammy' AND \"data\" <> 'brainy'");
     if (useIncrementalScan) {
       read = read.withCdc().toSnapshot(simpleTable.currentSnapshot().snapshotId());
@@ -369,7 +374,15 @@ public void testScanWithFilter() throws Exception {
         expectedRecords.stream()
             .flatMap(List::stream)
             .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
-            .filter(row -> row.getInt64("id") < 10)
+            .filter(
+                row -> {
+                  long id = checkStateNotNull(row.getInt64("id"));
+                  String data = checkStateNotNull(row.getString("data"));
+                  return id < 10
+                      && id >= 2
+                      && !Objects.equals(data, "clammy")
+                      && !Objects.equals(data, "brainy");
+                })
             .collect(Collectors.toList());
     PCollection<Row> output = testPipeline.apply(read).apply(new PrintRow());

From 6e7a7b9cd765da357b3062d4315701117787466a Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Mon, 12 May 2025 19:11:14 -0400
Subject: [PATCH 12/15] update public doc

---
 .../content/en/documentation/io/managed-io.md | 28 ++++++++++++++--------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/website/www/site/content/en/documentation/io/managed-io.md b/website/www/site/content/en/documentation/io/managed-io.md
index dc7879df5530..d9420cb6a16c 100644
--- a/website/www/site/content/en/documentation/io/managed-io.md
+++ b/website/www/site/content/en/documentation/io/managed-io.md
@@ -62,8 +62,8 @@ and Beam SQL is invoked via the Managed API under the hood.
      catalog_name (str)<br>
      catalog_properties (map[str, str])<br>
      config_properties (map[str, str])<br>
-      filter (str)<br>
      drop (list[str])<br>
+      filter (str)<br>
      from_snapshot (int64)<br>
      from_timestamp (int64)<br>
      keep (list[str])<br>
@@ -85,8 +85,8 @@ and Beam SQL is invoked via the Managed API under the hood.
      catalog_properties (map[str, str])<br>
      config_properties (map[str, str])<br>
      drop (list[str])<br>
-      keep (list[str])<br>
      filter (str)<br>
+      keep (list[str])<br>
      table (str)<br>
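
Both option lists above are reordered so that filter sorts alphabetically next to the column-pruning options drop and keep. For reference, a minimal sketch of a Managed Iceberg read that combines filter with keep (catalog, table, and column names are illustrative, and catalog properties are elided):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    Pipeline pipeline = Pipeline.create();
    Map<String, Object> config = new HashMap<>();
    config.put("table", "db.users");
    config.put("catalog_name", "my_catalog");
    // Project down to two columns...
    config.put("keep", Arrays.asList("id", "status"));
    // ...and keep only rows matching this Calcite-syntax predicate.
    config.put("filter", "\"id\" > 5 AND \"status\" = 'ACTIVE'");
    PCollection<Row> rows =
        pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
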
@@ -200,24 +200,24 @@ and Beam SQL is invoked via the Managed API under the hood. - filter + drop - str + list[str] - SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html + A subset of column names to exclude from reading. If null or empty, all columns will be read. - drop + filter - list[str] + str - A subset of column names to exclude from reading. If null or empty, all columns will be read. + SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html @@ -477,24 +477,24 @@ and Beam SQL is invoked via the Managed API under the hood. - keep + filter - list[str] + str - A subset of column names to read exclusively. If null or empty, all columns will be read. + SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html - filter + keep - str + list[str] - SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html + A subset of column names to read exclusively. If null or empty, all columns will be read. From 05cdd4b3b47e8d7880173031e1aed168ee91b671 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 May 2025 10:39:42 -0400 Subject: [PATCH 13/15] transient --- .../java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 437bcbbf7cea..4edf3512952e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -103,7 +103,7 @@ public Evaluator getEvaluator() { return cachedEvaluator; } - private @Nullable Evaluator cachedEvaluator; + private transient @Nullable Evaluator cachedEvaluator; @Pure public abstract @Nullable Expression getFilter(); From bdde37310942c40a55a4db951b6bf941c0448161 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 May 2025 10:40:17 -0400 Subject: [PATCH 14/15] trigger tests --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 2 } From 7dbb1a8bb76e3fcf6700902e9e5ead739b0661e0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 May 2025 16:08:16 -0400 Subject: [PATCH 15/15] fix test --- .../apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index 11e4774b7ed8..e4b91fa4014a 100644 --- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -478,6 +478,7 @@ public void testStreamingReadWithFilter() throws Exception {
     Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
     config.put("streaming", true);
     config.put("to_snapshot", table.currentSnapshot().snapshotId());
+    config.put("filter", "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)");
     PCollection<Row> rows =
         pipeline.apply(Managed.read(ICEBERG_CDC).withConfig(config)).getSinglePCollection();
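
End to end, the series lets a filter string supplied through the managed config be parsed into an Iceberg Expression that is honored at scan time and again row-by-row via ReadUtils.maybeApplyFilter. A minimal sketch of the parsing step alone, reusing the predicate from the test above (the class wrapper is illustrative; it assumes the FilterUtils class added earlier in the series is on the classpath):

    import org.apache.beam.sdk.io.iceberg.FilterUtils;
    import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException;
    import org.apache.iceberg.expressions.Expression;

    public class FilterParseExample {
      public static void main(String[] args) throws SqlParseException {
        // Same predicate the streaming integration test passes via the managed config.
        Expression expr =
            FilterUtils.convert(
                "\"bool_field\" = TRUE AND (\"int_field\" < 350 OR \"modulo_5\" = 2)");
        System.out.println(expr);
      }
    }
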