3 changes: 1 addition & 2 deletions .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,5 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 3,
- "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
+ "modification": 5
}
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -75,6 +75,8 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))

## New Features / Improvements
* Adding Google Storage Requester Pays feature (Golang) ([#30747](https://github.com/apache/beam/issues/30747)).
@@ -103,6 +105,7 @@
* [Python] Fixed vLLM connection leaks that caused a throughput bottleneck and GPU underutilization ([#35053](https://github.com/apache/beam/pull/35053)).
* [Python] Fixed cloudpickle overwriting class state every time the same object of a dynamic class is loaded ([#35062](https://github.com/apache/beam/issues/35062)).
* [Python] Fixed `pip install apache-beam[interactive]` crashing on Google Colab ([#35148](https://github.com/apache/beam/pull/35148)).
* [IcebergIO] Fixed Beam <-> Iceberg conversion logic for arrays of structs and maps of structs ([#35230](https://github.com/apache/beam/pull/35230)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -24,9 +24,11 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
@@ -72,6 +74,55 @@ class FilterUtils {
.put(SqlKind.OR, Operation.OR)
.build();

/**
* Parses a SQL filter expression string and returns a set of all field names referenced within
* it.
*/
static Set<String> getReferencedFieldNames(@Nullable String filter) {
if (filter == null || filter.trim().isEmpty()) {
return new HashSet<>();
}

SqlParser parser = SqlParser.create(filter);
try {
SqlNode expression = parser.parseExpression();
Set<String> fieldNames = new HashSet<>();
extractFieldNames(expression, fieldNames);
return fieldNames;
} catch (Exception exception) {
throw new RuntimeException(
String.format("Encountered an error when parsing filter: '%s'", filter), exception);
}
}

private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
if (node instanceof SqlIdentifier) {
fieldNames.add(((SqlIdentifier) node).getSimple());
} else if (node instanceof SqlBasicCall) {
// recursively check operands
SqlBasicCall call = (SqlBasicCall) node;
for (SqlNode operand : call.getOperandList()) {
extractFieldNames(operand, fieldNames);
}
} else if (node instanceof SqlNodeList) {
// For IN clauses, the right-hand side is a SqlNodeList, so iterate through its elements
SqlNodeList nodeList = (SqlNodeList) node;
for (SqlNode element : nodeList.getList()) {
if (element != null) {
extractFieldNames(element, fieldNames);
}
}
}
// SqlLiteral nodes do not contain field names, so we can ignore them.
}

/**
* Parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
* data pruning.
*
* <p>Note: This utility currently supports only top-level fields within the filter expression.
* Nested field references are not supported.
*/
static Expression convert(@Nullable String filter, Schema schema) {
if (filter == null) {
return Expressions.alwaysTrue();
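For orientation, a minimal sketch of how the new field-extraction helper behaves. The filter string and column names are hypothetical, and the call must live in the same package since FilterUtils is package-private:

import java.util.Set;

class FilterFieldsSketch {
  public static void main(String[] args) {
    // Identifiers on either side of comparisons and inside IN lists are collected;
    // literals like 5, 'US', and 'CA' are ignored.
    Set<String> fields =
        FilterUtils.getReferencedFieldNames("id > 5 AND country IN ('US', 'CA')");
    System.out.println(fields); // e.g. [id, country] (a HashSet, so order is not guaranteed)
  }
}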
@@ -603,7 +603,7 @@ public PCollection<Row> expand(PBegin input) {
.setUseCdc(getUseCdc())
.setKeepFields(getKeep())
.setDropFields(getDrop())
-            .setFilter(FilterUtils.convert(getFilter(), table.schema()))
+            .setFilterString(getFilter())
.build();
scanConfig.validate(table);

@@ -31,6 +31,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -45,6 +46,10 @@
@AutoValue
public abstract class IcebergScanConfig implements Serializable {
private transient @MonotonicNonNull Table cachedTable;
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedProjectedSchema;
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedRequiredSchema;
private transient @MonotonicNonNull Evaluator cachedEvaluator;
private transient @MonotonicNonNull Expression cachedFilter;

public enum ScanType {
TABLE,
@@ -75,19 +80,59 @@ public Table getTable() {
@VisibleForTesting
static org.apache.iceberg.Schema resolveSchema(
org.apache.iceberg.Schema schema, @Nullable List<String> keep, @Nullable List<String> drop) {
return resolveSchema(schema, keep, drop, null);
}

@VisibleForTesting
static org.apache.iceberg.Schema resolveSchema(
org.apache.iceberg.Schema schema,
@Nullable List<String> keep,
@Nullable List<String> drop,
@Nullable Set<String> fieldsInFilter) {
ImmutableList.Builder<String> selectedFieldsBuilder = ImmutableList.builder();
if (keep != null && !keep.isEmpty()) {
- schema = schema.select(keep);
+ selectedFieldsBuilder.addAll(keep);
} else if (drop != null && !drop.isEmpty()) {
Set<String> fields =
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
drop.forEach(fields::remove);
- schema = schema.select(fields);
+ selectedFieldsBuilder.addAll(fields);
+ } else {
+ // default: include all columns
+ return schema;
+ }
- return schema;

if (fieldsInFilter != null && !fieldsInFilter.isEmpty()) {
fieldsInFilter.stream()
.map(f -> schema.caseInsensitiveFindField(f).name())
.forEach(selectedFieldsBuilder::add);
}
ImmutableList<String> selectedFields = selectedFieldsBuilder.build();
return selectedFields.isEmpty() ? schema : schema.select(selectedFields);
}

/** Returns the projected Schema after applying column pruning. */
public org.apache.iceberg.Schema getProjectedSchema() {
- return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
+ if (cachedProjectedSchema == null) {
+ cachedProjectedSchema = resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
+ }
+ return cachedProjectedSchema;
}

/**
* Returns a Schema that includes all the fields required for a successful read. This includes
* explicitly selected fields and fields referenced in the filter statement.
*/
public org.apache.iceberg.Schema getRequiredSchema() {
if (cachedRequiredSchema == null) {
cachedRequiredSchema =
resolveSchema(
getTable().schema(),
getKeepFields(),
getDropFields(),
FilterUtils.getReferencedFieldNames(getFilterString()));
}
return cachedRequiredSchema;
}

@Pure
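A sketch of how the two resolveSchema overloads interact, against a hypothetical three-column table. Both overloads are package-private and @VisibleForTesting, so this only compiles from the same package:

import java.util.Collections;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class ResolveSchemaSketch {
  static void demo() {
    // Hypothetical table: id, name, score.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()),
            Types.NestedField.optional(3, "score", Types.DoubleType.get()));

    // keep = [id], no filter fields: the projection is just {id}.
    Schema projected =
        IcebergScanConfig.resolveSchema(schema, Collections.singletonList("id"), null, null);

    // keep = [id], filter references score: the required schema widens to {id, score},
    // so the filter can still be evaluated against the pruned read.
    Schema required =
        IcebergScanConfig.resolveSchema(
            schema, Collections.singletonList("id"), null, Collections.singleton("score"));
  }
}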
@@ -98,15 +143,22 @@ public Evaluator getEvaluator() {
return null;
}
if (cachedEvaluator == null) {
- cachedEvaluator = new Evaluator(getProjectedSchema().asStruct(), filter);
+ cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
}
return cachedEvaluator;
}

- private transient @Nullable Evaluator cachedEvaluator;
@Pure
@Nullable
public Expression getFilter() {
if (cachedFilter == null) {
cachedFilter = FilterUtils.convert(getFilterString(), getTable().schema());
}
return cachedFilter;
}

@Pure
- public abstract @Nullable Expression getFilter();
+ public abstract @Nullable String getFilterString();

@Pure
public abstract @Nullable Boolean getCaseSensitive();
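The transient cached* fields above all follow the same idiom: derive once per instance, reuse afterwards. A hypothetical call sequence, where scanConfig is an already-built IcebergScanConfig:

import org.apache.iceberg.expressions.Expression;

// The first call parses the filter string into an Iceberg Expression and caches it;
// later calls on the same instance return the cached object. After Java
// deserialization on a worker, the transient cache is empty and the first call re-parses.
Expression filter = scanConfig.getFilter();
Expression sameInstance = scanConfig.getFilter();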
@@ -172,7 +224,7 @@ public Evaluator getEvaluator() {
public static Builder builder() {
return new AutoValue_IcebergScanConfig.Builder()
.setScanType(ScanType.TABLE)
-        .setFilter(null)
+        .setFilterString(null)
.setCaseSensitive(null)
.setOptions(ImmutableMap.of())
.setSnapshot(null)
@@ -211,7 +263,7 @@ public Builder setTableIdentifier(String... names) {

public abstract Builder setSchema(Schema schema);

- public abstract Builder setFilter(@Nullable Expression filter);
+ public abstract Builder setFilterString(@Nullable String filter);

public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);

@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.nio.ByteBuffer;
import java.time.LocalDate;
@@ -30,17 +32,20 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -345,10 +350,34 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
break;
case LIST:
- Optional.ofNullable(value.getArray(name)).ifPresent(list -> rec.setField(name, list));
Iterable<@NonNull ?> icebergList = value.getIterable(name);
Type collectionType = ((Types.ListType) field.type()).elementType();

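// recurse on struct types: Beam Row elements must become Iceberg Records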
if (collectionType.isStructType() && icebergList != null) {
org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema();
ImmutableList.Builder<Record> builder = ImmutableList.builder();
for (Row v : (Iterable<Row>) icebergList) {
builder.add(beamRowToIcebergRecord(innerSchema, v));
}
icebergList = builder.build();
}
Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list));
break;
case MAP:
- Optional.ofNullable(value.getMap(name)).ifPresent(v -> rec.setField(name, v));
Map<?, ?> icebergMap = value.getMap(name);
Type valueType = ((Types.MapType) field.type()).valueType();
// recurse on struct types
if (valueType.isStructType() && icebergMap != null) {
org.apache.iceberg.Schema innerSchema = valueType.asStructType().asSchema();

ImmutableMap.Builder<Object, Record> newMap = ImmutableMap.builder();
for (Map.Entry<?, ?> entry : icebergMap.entrySet()) {
Row row = checkStateNotNull(((Row) entry.getValue()));
newMap.put(checkStateNotNull(entry.getKey()), beamRowToIcebergRecord(innerSchema, row));
}
icebergMap = newMap.build();
}
Optional.ofNullable(icebergMap).ifPresent(v -> rec.setField(name, v));
break;
}
}
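A sketch of the conversion this hunk fixes, using a hypothetical one-column struct nested in an array. It assumes the public IcebergUtils.beamRowToIcebergRecord entry point, which reaches copyFieldIntoRecord internally:

import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

class ArrayOfStructsSketch {
  public static void main(String[] args) {
    // Beam side: rows shaped like { items: [ { x: long } ] }.
    Schema inner = Schema.builder().addInt64Field("x").build();
    Schema outer = Schema.builder().addArrayField("items", Schema.FieldType.row(inner)).build();
    Row row =
        Row.withSchema(outer)
            .addValue(Collections.singletonList(Row.withSchema(inner).addValue(7L).build()))
            .build();

    // Iceberg side: matching schema; the field IDs here are arbitrary.
    org.apache.iceberg.Schema icebergSchema =
        new org.apache.iceberg.Schema(
            Types.NestedField.required(
                1,
                "items",
                Types.ListType.ofRequired(
                    2,
                    Types.StructType.of(
                        Types.NestedField.required(3, "x", Types.LongType.get())))));

    // Before this fix, the Beam Rows inside the list were passed through unconverted;
    // now each element is converted to an Iceberg Record.
    Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
    System.out.println(record);
  }
}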
@@ -426,10 +455,68 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
case DOUBLE: // Iceberg and Beam both use double
case STRING: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use boolean
rowBuilder.addValue(icebergValue);
break;
case ARRAY:
checkState(
icebergValue instanceof List,
"Expected List type for field '%s' but received %s",
field.getName(),
icebergValue.getClass());
List<@NonNull ?> beamList = (List<@NonNull ?>) icebergValue;
Schema.FieldType collectionType =
checkStateNotNull(field.getType().getCollectionElementType());
// recurse on struct types
if (collectionType.getTypeName().isCompositeType()) {
Schema innerSchema = checkStateNotNull(collectionType.getRowSchema());
beamList =
beamList.stream()
.map(v -> icebergRecordToBeamRow(innerSchema, (Record) v))
.collect(Collectors.toList());
}
rowBuilder.addValue(beamList);
break;
case ITERABLE:
checkState(
icebergValue instanceof Iterable,
"Expected Iterable type for field '%s' but received %s",
field.getName(),
icebergValue.getClass());
Iterable<@NonNull ?> beamIterable = (Iterable<@NonNull ?>) icebergValue;
Schema.FieldType iterableCollectionType =
checkStateNotNull(field.getType().getCollectionElementType());
// recurse on struct types
if (iterableCollectionType.getTypeName().isCompositeType()) {
Schema innerSchema = checkStateNotNull(iterableCollectionType.getRowSchema());
ImmutableList.Builder<Row> builder = ImmutableList.builder();
for (Record v : (Iterable<@NonNull Record>) icebergValue) {
builder.add(icebergRecordToBeamRow(innerSchema, v));
}
beamIterable = builder.build();
}
rowBuilder.addValue(beamIterable);
break;
case MAP:
- rowBuilder.addValue(icebergValue);
checkState(
icebergValue instanceof Map,
"Expected Map type for field '%s' but received %s",
field.getName(),
icebergValue.getClass());
Map<?, ?> beamMap = (Map<?, ?>) icebergValue;
Schema.FieldType valueType = checkStateNotNull(field.getType().getMapValueType());
// recurse on struct types
if (valueType.getTypeName().isCompositeType()) {
Schema innerSchema = checkStateNotNull(valueType.getRowSchema());
ImmutableMap.Builder<Object, Row> newMap = ImmutableMap.builder();
for (Map.Entry<?, ?> entry : ((Map<?, ?>) icebergValue).entrySet()) {
Record rec = ((Record) entry.getValue());
newMap.put(
checkStateNotNull(entry.getKey()),
icebergRecordToBeamRow(innerSchema, checkStateNotNull(rec)));
}
beamMap = newMap.build();
}
rowBuilder.addValue(beamMap);
break;
case DATETIME:
// Iceberg uses a long for micros.
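Continuing the sketch above in the reverse direction: structs nested in lists (and, per the MAP branch, in map values) now come back as Beam Rows rather than raw Iceberg Records. Names are again hypothetical:

// Round-trip the record from the previous sketch back into a Beam Row.
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);
Row roundTripped = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
// roundTripped.getArray("items").get(0) is a Beam Row with x = 7.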
@@ -74,10 +74,9 @@ public void process(
return;
}
FileScanTask task = fileScanTasks.get((int) l);
- org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
- Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
+ Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
try (CloseableIterable<Record> fullIterable =
- ReadUtils.createReader(task, table, projected)) {
+ ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) {
CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);

for (Record record : reader) {
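Net effect of this hunk: the file reader now materializes the required schema (the keep/drop projection plus any columns referenced only by the filter) so that ReadUtils.maybeApplyFilter can evaluate the residual filter, while the rows emitted downstream are still built against the narrower projected schema.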