2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 3
}
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
"modification": 2
}
@@ -22,11 +22,13 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -212,6 +214,11 @@ public Row filter(Row row) {
return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema()));
}

/** Applies {@link #filter(Row)} to each Row in a list. */
public List<Row> filter(List<Row> rows) {
return rows.stream().map(this::filter).collect(Collectors.toList());
}

/** Returns the output {@link Row}'s {@link Schema}. */
public Schema outputSchema() {
return transformedSchema != null ? transformedSchema : rowSchema;
@@ -374,6 +381,14 @@ static Schema dropFields(Schema schema, List<String> fieldsToDrop) {
newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField);
}
}

// re-order based on original schema's ordering
Map<String, Integer> indexMap = new HashMap<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
indexMap.put(schema.getField(i).getName(), i);
}
newFieldsList.sort(Comparator.comparingInt(f -> checkStateNotNull(indexMap.get(f.getName()))));

return new Schema(newFieldsList);
}

@@ -416,6 +431,13 @@ static Schema keepFields(Schema schema, List<String> fieldsToKeep) {
newFieldsList.add(fieldToKeep);
}

// re-order based on original schema's ordering
Map<String, Integer> indexMap = new HashMap<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
indexMap.put(schema.getField(i).getName(), i);
}
newFieldsList.sort(Comparator.comparingInt(f -> checkStateNotNull(indexMap.get(f.getName()))));

return new Schema(newFieldsList);
}
}
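Below is a quick usage sketch of the new filter(List<Row>) overload and the field re-ordering added above. It is illustrative only: the schema and field names are invented, and the RowFilter constructor, the drop(...) call, and the import path are assumptions based on how this class is used elsewhere in Beam.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.Row;

Schema schema =
    Schema.builder()
        .addInt64Field("id")
        .addStringField("name")
        .addStringField("email")
        .build();

// Hypothetical filter that removes the "email" column.
RowFilter filter = new RowFilter(schema).drop(Arrays.asList("email"));

Row row = Row.withSchema(schema).addValues(1L, "beam", "beam@example.com").build();
List<Row> filtered = filter.filter(Arrays.asList(row));

// outputSchema() keeps the original field order ("id" before "name"),
// which is exactly what the indexMap-based sort in dropFields/keepFields guarantees.
Schema projected = filter.outputSchema();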
@@ -78,7 +78,11 @@ public void process(
}

LOG.info("Planning to scan snapshot {}", toSnapshot);
IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot);
IncrementalAppendScan scan =
table
.newIncrementalAppendScan()
.toSnapshot(toSnapshot)
.project(scanConfig.getProjectedSchema());
if (fromSnapshot != null) {
scan = scan.fromSnapshotExclusive(fromSnapshot);
}
@@ -115,7 +115,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.fromTimestamp(configuration.getFromTimestamp())
.toTimestamp(configuration.getToTimestamp())
.withStartingStrategy(strategy)
.streaming(configuration.getStreaming());
.streaming(configuration.getStreaming())
.keeping(configuration.getKeep())
.dropping(configuration.getDrop());

@Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds();
if (pollIntervalSeconds != null) {
@@ -177,6 +179,14 @@ static Builder builder() {
"The interval at which to poll for new snapshots. Defaults to 60 seconds.")
abstract @Nullable Integer getPollIntervalSeconds();

@SchemaFieldDescription(
"A subset of column names to read exclusively. If null or empty, all columns will be read.")
abstract @Nullable List<String> getKeep();

@SchemaFieldDescription(
"A subset of column names to exclude from reading. If null or empty, all columns will be read.")
abstract @Nullable List<String> getDrop();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);
@@ -201,6 +211,10 @@ abstract static class Builder {

abstract Builder setStreaming(Boolean streaming);

abstract Builder setKeep(List<String> keep);

abstract Builder setDrop(List<String> drop);

abstract Configuration build();
}
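For a sense of how these options surface to users, here is a hedged sketch of a pipeline consuming this CDC source through Beam's Managed API. The catalog settings and the "output" tag are assumptions, not part of this diff; the "keep"/"drop" keys mirror the Configuration fields above.

import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

Pipeline pipeline = Pipeline.create();

// Hypothetical configuration; only "keep"/"drop" are new in this change.
Map<String, Object> config =
    ImmutableMap.<String, Object>builder()
        .put("table", "db.users")
        .put("catalog_name", "my_catalog")            // assumed catalog setup
        .put("streaming", true)
        .put("keep", Arrays.asList("id", "name"))     // read only these two columns
        .build();

PCollection<Row> rows =
    PCollectionRowTuple.empty(pipeline)
        .apply(Managed.read(Managed.ICEBERG_CDC).withConfig(config))
        .get("output");                               // output tag assumed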

@@ -603,6 +603,10 @@ public enum StartingStrategy {

abstract @Nullable Duration getPollInterval();

abstract @Nullable List<String> getKeep();

abstract @Nullable List<String> getDrop();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -627,6 +631,10 @@ abstract static class Builder {

abstract Builder setPollInterval(@Nullable Duration triggeringFrequency);

abstract Builder setKeep(@Nullable List<String> fields);

abstract Builder setDrop(@Nullable List<String> fields);

abstract ReadRows build();
}

@@ -666,6 +674,14 @@ public ReadRows withStartingStrategy(@Nullable StartingStrategy strategy) {
return toBuilder().setStartingStrategy(strategy).build();
}

public ReadRows keeping(@Nullable List<String> keep) {
return toBuilder().setKeep(keep).build();
}

public ReadRows dropping(@Nullable List<String> drop) {
return toBuilder().setDrop(drop).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
@@ -687,6 +703,8 @@ public PCollection<Row> expand(PBegin input) {
.setStreaming(getStreaming())
.setPollInterval(getPollInterval())
.setUseCdc(getUseCdc())
.setKeepFields(getKeep())
.setDropFields(getDrop())
.build();
scanConfig.validate(table);
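A hedged sketch of the new keeping(...)/dropping(...) methods from a user's perspective. The catalog configuration below is a placeholder (real pipelines would set whatever catalog and config properties their setup needs), so treat this as a sketch rather than a complete pipeline:

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;

Pipeline pipeline = Pipeline.create();

// Placeholder catalog config; properties omitted for brevity.
IcebergCatalogConfig catalog =
    IcebergCatalogConfig.builder().setCatalogName("my_catalog").build();

PCollection<Row> rows =
    pipeline.apply(
        IcebergIO.readRows(catalog)
            .from(TableIdentifier.parse("db.users"))
            // Only one of keeping(...)/dropping(...) should be set; validate() rejects both.
            .keeping(Arrays.asList("id", "name")));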

@@ -93,7 +93,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.getPipeline()
.apply(
IcebergIO.readRows(configuration.getIcebergCatalog())
.from(TableIdentifier.parse(configuration.getTable())));
.from(TableIdentifier.parse(configuration.getTable()))
.keeping(configuration.getKeep())
.dropping(configuration.getDrop()));

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
@@ -121,6 +123,14 @@ static Builder builder() {
@Nullable
abstract Map<String, String> getConfigProperties();

@SchemaFieldDescription(
"A subset of column names to read exclusively. If null or empty, all columns will be read.")
abstract @Nullable List<String> getKeep();

@SchemaFieldDescription(
"A subset of column names to exclude from reading. If null or empty, all columns will be read.")
abstract @Nullable List<String> getDrop();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);
@@ -131,6 +141,10 @@ abstract static class Builder {

abstract Builder setConfigProperties(Map<String, String> confProperties);

abstract Builder setKeep(List<String> keep);

abstract Builder setDrop(List<String> drop);

abstract Configuration build();
}
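The batch provider picks up the same two options. A sketch analogous to the CDC example above, this time excluding a column with "drop" (catalog details again assumed):

import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

// Hypothetical batch read that excludes one column instead of listing the keepers.
Map<String, Object> config =
    ImmutableMap.<String, Object>builder()
        .put("table", "db.users")
        .put("catalog_name", "my_catalog")        // assumed
        .put("drop", Arrays.asList("email"))      // read everything except "email"
        .build();

Managed.read(Managed.ICEBERG).withConfig(config); // apply exactly as in the CDC sketch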

@@ -18,11 +18,15 @@
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.util.Sets.newHashSet;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -31,6 +35,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
@@ -66,6 +71,24 @@ public Table getTable() {
@Pure
public abstract Schema getSchema();

@VisibleForTesting
static org.apache.iceberg.Schema resolveSchema(
org.apache.iceberg.Schema schema, @Nullable List<String> keep, @Nullable List<String> drop) {
if (keep != null && !keep.isEmpty()) {
schema = schema.select(keep);
} else if (drop != null && !drop.isEmpty()) {
Set<String> fields =
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
drop.forEach(fields::remove);
schema = schema.select(fields);
}
return schema;
}

public org.apache.iceberg.Schema getProjectedSchema() {
return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
}

@Pure
public abstract @Nullable Expression getFilter();

@@ -123,6 +146,12 @@ public Table getTable() {
@Pure
public abstract @Nullable String getBranch();

@Pure
public abstract @Nullable List<String> getKeepFields();

@Pure
public abstract @Nullable List<String> getDropFields();

@Pure
public static Builder builder() {
return new AutoValue_IcebergScanConfig.Builder()
@@ -204,13 +233,39 @@ public Builder setTableIdentifier(String... names) {

public abstract Builder setBranch(@Nullable String branch);

public abstract Builder setKeepFields(@Nullable List<String> fields);

public abstract Builder setDropFields(@Nullable List<String> fields);

public abstract IcebergScanConfig build();
}

@VisibleForTesting
abstract Builder toBuilder();

void validate(Table table) {
@Nullable List<String> keep = getKeepFields();
@Nullable List<String> drop = getDropFields();
if (keep != null || drop != null) {
checkArgument(
keep == null || drop == null, error("only one of 'keep' or 'drop' can be set."));

Set<String> fieldsSpecified;
String param;
if (keep != null) {
param = "keep";
fieldsSpecified = newHashSet(checkNotNull(keep));
} else { // drop != null
param = "drop";
fieldsSpecified = newHashSet(checkNotNull(drop));
}
table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name()));

checkArgument(
fieldsSpecified.isEmpty(),
error(String.format("'%s' specifies unknown field(s): %s", param, fieldsSpecified)));
}

// TODO(#34168, ahmedabu98): fill these gaps for the existing batch source
if (!getUseCdc()) {
List<String> invalidOptions = new ArrayList<>();
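To make the projection semantics concrete, here is a small illustration of resolveSchema. The field ids and names are invented, and since the helper is package-private this reads as test-style code rather than user-facing API:

import java.util.Arrays;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema =
    new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()),
        Types.NestedField.optional(3, "email", Types.StringType.get()));

// keep is consulted first; drop only applies when keep is null/empty
// (validate(...) above rejects setting both at once).
Schema onlyId = IcebergScanConfig.resolveSchema(schema, Arrays.asList("id"), null);
Schema withoutEmail = IcebergScanConfig.resolveSchema(schema, null, Arrays.asList("email"));
// onlyId       -> [id]
// withoutEmail -> [id, name]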
@@ -70,7 +70,7 @@ public PCollection<Row> expand(PBegin input) {
.setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), ReadTask.getCoder()))
.apply(Redistribute.arbitrarily())
.apply("Read Rows From Tasks", ParDo.of(new ReadFromTasks(scanConfig)))
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()));
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
}

/** Continuously watches for new snapshots. */
@@ -23,6 +23,7 @@
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
@@ -73,9 +74,11 @@ public void process(
return;
}
FileScanTask task = fileScanTasks.get((int) l);
try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table)) {
org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, projected)) {
for (Record record : reader) {
Row row = IcebergUtils.icebergRecordToBeamRow(scanConfig.getSchema(), record);
Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
out.output(row);
}
}
@@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -67,7 +68,7 @@ public class ReadUtils {
"parquet.read.support.class",
"parquet.crypto.factory.class");

static ParquetReader<Record> createReader(FileScanTask task, Table table) {
static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema schema) {
String filePath = task.file().path().toString();
InputFile inputFile;
try (FileIO io = table.io()) {
@@ -100,11 +101,11 @@ static ParquetReader<Record> createReader(FileScanTask task, Table table) {

return new ParquetReader<>(
inputFile,
table.schema(),
schema,
optionsBuilder.build(),
// TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to Iceberg
// Record
fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema, idToConstants),
fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants),
mapping,
task.residual(),
false,
@@ -48,7 +48,7 @@ public ScanSource(IcebergScanConfig scanConfig) {

private TableScan getTableScan() {
Table table = scanConfig.getTable();
TableScan tableScan = table.newScan().project(table.schema());
TableScan tableScan = table.newScan().project(scanConfig.getProjectedSchema());

if (scanConfig.getFilter() != null) {
tableScan = tableScan.filter(scanConfig.getFilter());
@@ -115,7 +115,7 @@ public void populateDisplayData(DisplayData.Builder builder) {

@Override
public Coder<Row> getOutputCoder() {
return RowCoder.of(scanConfig.getSchema());
return RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
}

@Override
@@ -70,7 +70,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
public ScanTaskReader(ScanTaskSource source) {
this.source = source;
this.project = source.getSchema();
this.beamSchema = icebergSchemaToBeamSchema(source.getSchema());
this.beamSchema = icebergSchemaToBeamSchema(project);
}

@Override
@@ -56,7 +56,7 @@ CombinedScanTask getTask() {

@Pure
Schema getSchema() {
return getTable().schema();
return scanConfig.getProjectedSchema();
}

@Override