2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 1
}
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -58,6 +58,8 @@
## Highlights

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
+* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
+* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))

## New Features / Improvements

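To make the two new Iceberg capabilities above concrete, here is a minimal sketch of a dynamic-destination write through the Managed API. The `Managed.write(Managed.ICEBERG).withConfig(...)` entry point is real, but the exact config keys and the `{shard}` template placeholder are assumptions drawn from this PR's RowStringInterpolator; treat it as illustrative, not canonical.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class DynamicIcebergWriteExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("shard").addInt64Field("value").build();

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(
            Create.of(
                    Row.withSchema(schema).addValues("a", 1L).build(),
                    Row.withSchema(schema).addValues("b", 2L).build())
                .withRowSchema(schema))
        // "{shard}" in the table spec is resolved per record from the row's "shard" field
        // (hypothetical field name; the template syntax mirrors RowStringInterpolator).
        .apply(
            Managed.write(Managed.ICEBERG)
                .withConfig(
                    ImmutableMap.of(
                        "table", "db.events_{shard}",
                        "catalog_properties",
                            ImmutableMap.of(
                                "type", "hadoop",
                                "warehouse", "file:///tmp/warehouse"))));
    pipeline.run().waitUntilFinish();
  }
}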
RowFilter.java
@@ -53,21 +53,21 @@
*
* // this filter will exclusively keep these fields and drop everything else
* List<String> fields = Arrays.asList("foo", "bar", "baz");
- * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields);
+ * RowFilter keepFilter = new RowFilter(beamSchema).keep(fields);
*
* // this filter will drop these fields
- * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields);
+ * RowFilter dropFilter = new RowFilter(beamSchema).drop(fields);
*
* // this filter will only output the contents of row field "my_record"
* String field = "my_record";
* RowFilter onlyFilter = new RowFilter(beamSchema).only(field);
*
* // produces a filtered row
- * Row outputRow = keepingFilter.filter(row);
+ * Row outputRow = keepFilter.filter(row);
* }</pre>
*
- * Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link
- * #only(String)} for further details on what an output Row can look like.
+ * Check the documentation for {@link #keep(List)}, {@link #drop(List)}, and {@link #only(String)}
+ * for further details on what an output Row can look like.
*/
public class RowFilter implements Serializable {
private final Schema rowSchema;
@@ -103,7 +103,7 @@ public RowFilter(Schema rowSchema) {
* nested_2: xyz
* }</pre>
*/
-  public RowFilter keeping(List<String> fields) {
+  public RowFilter keep(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "keep");
validateSchemaContainsFields(rowSchema, fields, "keep");
@@ -132,7 +132,7 @@ public RowFilter keeping(List<String> fields) {
* bar: 456
* }</pre>
*/
-  public RowFilter dropping(List<String> fields) {
+  public RowFilter drop(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "drop");
validateSchemaContainsFields(rowSchema, fields, "drop");
@@ -168,6 +168,7 @@ public RowFilter dropping(List<String> fields) {
*/
public RowFilter only(String field) {
checkUnconfigured();
+    verifyNoNestedFields(Collections.singletonList(field), "only");
validateSchemaContainsFields(rowSchema, Collections.singletonList(field), "only");
Schema.Field rowField = rowSchema.getField(field);
Preconditions.checkArgument(
@@ -184,8 +185,8 @@ public RowFilter only(String field) {

/**
* Performs a filter operation (keep or drop) on the input {@link Row}. Must have already
- * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this
- * {@link RowFilter}.
+ * configured a filter operation with {@link #drop(List)} or {@link #keep(List)} for this {@link
+ * RowFilter}.
*
* <p>If not yet configured, will simply return the same {@link Row}.
*/
@@ -196,9 +197,9 @@ public Row filter(Row row) {

    Preconditions.checkState(
        row.getSchema().assignableTo(rowSchema),
-        "Encountered Row with schema that is incompatible with this RowFilter's schema."
+        "Encountered Row with schema that is incompatible with this filter's schema."
            + "\nRow schema: %s"
-            + "\nSchema used to initialize this RowFilter: %s",
+            + "\nSchema used to initialize this filter: %s",
row.getSchema(),
rowSchema);

@@ -219,8 +220,7 @@ public Schema outputSchema() {
private void checkUnconfigured() {
Preconditions.checkState(
        transformedSchema == null,
-        "This RowFilter has already been configured to filter to the following Schema: %s",
-        transformedSchema);
+        "Invalid filter configuration: Please set only one of 'keep', 'drop', or 'only'.");
}

/** Verifies that this selection contains no nested fields. */
@@ -233,9 +233,7 @@ private void verifyNoNestedFields(List<String> fields, String operation) {
}
if (!nestedFields.isEmpty()) {
throw new IllegalArgumentException(
-          String.format(
-              "RowFilter does not support specifying nested fields to %s: %s",
-              operation, nestedFields));
+          String.format("'%s' does not support nested fields: %s", operation, nestedFields));
}
}

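For quick reference, the renamed API in use — a self-contained sketch. The two-field schema is hypothetical, and it assumes RowFilter's package is org.apache.beam.sdk.util:

import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.Row;

public class RowFilterExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("foo").addInt32Field("bar").build();
    Row row = Row.withSchema(schema).addValues("x", 1).build();

    // keep: retain only the listed fields, so the output contains "foo"
    Row kept = new RowFilter(schema).keep(Arrays.asList("foo")).filter(row);

    // drop: remove the listed fields, leaving "bar"
    Row dropped = new RowFilter(schema).drop(Arrays.asList("foo")).filter(row);

    System.out.println(kept + " / " + dropped);
  }
}

Note that each RowFilter instance accepts exactly one of keep, drop, or only; the reworked checkUnconfigured() message above reflects that.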
RowStringInterpolator.java
@@ -30,6 +30,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -118,16 +119,17 @@ public RowStringInterpolator(String template, Schema rowSchema) {
* Performs string interpolation on the template using values from the input {@link Row} and its
* windowing metadata.
*/
-  public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
+  public String interpolate(ValueInSingleWindow<Row> element) {
String interpolated = this.template;
for (String field : fieldsToReplace) {
Object val;
+      Instant timestamp = element.getTimestamp();
switch (field) {
case WINDOW:
-          val = window.toString();
+          val = element.getWindow().toString();
break;
case PANE_INDEX:
-          val = paneInfo.getIndex();
+          val = element.getPane().getIndex();
break;
case YYYY:
val = timestamp.getChronology().year().get(timestamp.getMillis());
@@ -139,7 +141,7 @@ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis());
break;
default:
-          val = MoreObjects.firstNonNull(getValue(row, field), "");
+          val = MoreObjects.firstNonNull(getValue(element.getValue(), field), "");
break;
}

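The call-site migration for interpolate is mechanical: instead of passing the row, window, pane, and timestamp separately, callers wrap all four in one ValueInSingleWindow<Row>. A sketch — the template and schema are illustrative, and the class's package location is assumed:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.RowStringInterpolator;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

public class InterpolatorMigrationExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("name").build();
    Row row = Row.withSchema(schema).addValue("beam").build();
    RowStringInterpolator interpolator = new RowStringInterpolator("hello {name}", schema);

    // Old: interpolator.interpolate(row, window, paneInfo, timestamp)
    // New: one ValueInSingleWindow<Row> carries the row plus its windowing metadata.
    String out =
        interpolator.interpolate(
            ValueInSingleWindow.of(row, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
    System.out.println(out); // "hello beam"
  }
}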
RowFilterTest.java
@@ -254,10 +254,10 @@ public void testKeepSchemaFields() {
@Test
public void testDropNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("RowFilter does not support specifying nested fields to drop");
+    thrown.expectMessage("'drop' does not support nested fields");

new RowFilter(ROW_SCHEMA)
-        .dropping(
+        .drop(
Arrays.asList(
"bool",
"nullable_int",
@@ -270,10 +270,10 @@ public void testDropNestedFieldsFails() {
@Test
public void testKeepNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("RowFilter does not support specifying nested fields to keep");
+    thrown.expectMessage("'keep' does not support nested fields");

new RowFilter(ROW_SCHEMA)
-        .keeping(
+        .keep(
Arrays.asList("str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str"));
}

RowStringInterpolatorTest.java
@@ -24,6 +24,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -68,7 +69,9 @@ public void testInvalidRowThrowsHelpfulError() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'str'.");

-    interpolator.interpolate(invalidRow, null, null, null);
+    interpolator.interpolate(
+        ValueInSingleWindow.of(
+            invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

@Test
@@ -85,7 +88,9 @@ public void testInvalidRowThrowsHelpfulErrorForNestedFields() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'nested_int'.");

-    interpolator.interpolate(invalidRow, null, null, null);
+    interpolator.interpolate(
+        ValueInSingleWindow.of(
+            invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

@Test
@@ -105,7 +110,9 @@ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'.");

-    interpolator.interpolate(invalidRow, null, null, null);
+    interpolator.interpolate(
+        ValueInSingleWindow.of(
+            invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

private static final Row ROW =
@@ -134,7 +141,9 @@ public void testTopLevelInterpolation() {
String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

-    String output = interpolator.interpolate(ROW, null, null, null);
+    String output =
+        interpolator.interpolate(
+            ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar true, baz 123, xyz ", output);
}
@@ -144,7 +153,9 @@ public void testNestedLevelInterpolation() {
String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

-    String output = interpolator.interpolate(ROW, null, null, null);
+    String output =
+        interpolator.interpolate(
+            ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar nested_str_value, baz 1.234", output);
}
@@ -155,7 +166,9 @@ public void testDoublyNestedInterpolation() {
"foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

-    String output = interpolator.interpolate(ROW, null, null, null);
+    String output =
+        interpolator.interpolate(
+            ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output);
}
@@ -177,10 +190,11 @@ public void testInterpolateWindowingInformation() {

String output =
interpolator.interpolate(
-            ROW,
-            GlobalWindow.INSTANCE,
-            PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0),
-            instant);
+            ValueInSingleWindow.of(
+                ROW,
+                instant,
+                GlobalWindow.INSTANCE,
+                PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0)));
String expected =
String.format(
"str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28",
AssignDestinations.java
@@ -17,12 +17,19 @@
*/
package org.apache.beam.sdk.io.iceberg;

+import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
+import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;
+
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Instant;

/**
* Assigns the destination metadata for each input record.
@@ -32,7 +39,7 @@
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {

-  private DynamicDestinations dynamicDestinations;
+  private final DynamicDestinations dynamicDestinations;

public AssignDestinations(DynamicDestinations dynamicDestinations) {
this.dynamicDestinations = dynamicDestinations;
@@ -41,23 +48,30 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
@Override
public PCollection<Row> expand(PCollection<Row> input) {

-    final Schema inputSchema = input.getSchema();
final Schema outputSchema =
Schema.builder()
-            .addRowField("data", inputSchema)
-            .addRowField("dest", dynamicDestinations.getMetadataSchema())
+            .addStringField(DEST)
+            .addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
-                  public void processElement(@Element Row data, OutputReceiver<Row> out) {
+                  public void processElement(
+                      @Element Row element,
+                      BoundedWindow window,
+                      PaneInfo paneInfo,
+                      @Timestamp Instant timestamp,
+                      OutputReceiver<Row> out) {
+                    String tableIdentifier =
+                        dynamicDestinations.getTableStringIdentifier(
+                            ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+                    Row data = dynamicDestinations.getData(element);
+
out.output(
-                        Row.withSchema(outputSchema)
-                            .addValues(data, dynamicDestinations.assignDestinationMetadata(data))
-                            .build());
+                        Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
}
}))
.setRowSchema(outputSchema);
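The practical upshot of this change: each output row now pairs a flat string table identifier with the user record, rather than nesting two rows. A small sketch of the new shape, assuming the DEST and DATA constants resolve to "dest" and "data":

import org.apache.beam.sdk.schemas.Schema;

public class OutputSchemaSketch {
  public static void main(String[] args) {
    Schema dataSchema = Schema.builder().addStringField("name").build();
    // Before: addRowField("data", inputSchema) + addRowField("dest", metadataSchema)
    // After: a string destination (already interpolated per record) plus the data row.
    Schema outputSchema =
        Schema.builder()
            .addStringField("dest") // e.g. "db.events_a"
            .addRowField("data", dataSchema)
            .build();
    System.out.println(outputSchema);
  }
}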
DynamicDestinations.java
@@ -20,17 +20,20 @@
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.catalog.TableIdentifier;

public interface DynamicDestinations extends Serializable {

-  Schema getMetadataSchema();
+  Schema getDataSchema();

-  Row assignDestinationMetadata(Row data);
+  Row getData(Row element);

-  IcebergDestination instantiateDestination(Row dest);
+  IcebergDestination instantiateDestination(String destination);

-  static DynamicDestinations singleTable(TableIdentifier tableId) {
-    return new OneTableDynamicDestinations(tableId);
+  String getTableStringIdentifier(ValueInSingleWindow<Row> element);
+
+  static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) {
+    return new OneTableDynamicDestinations(tableId, inputSchema);
}
}
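To see the reworked contract end to end, here is a hypothetical implementation that routes each record by one of its fields. The method signatures come from this diff; the IcebergDestination builder calls mirror the single-table implementation and should be double-checked against OneTableDynamicDestinations:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;

// Assumes this class lives in org.apache.beam.sdk.io.iceberg alongside the interface.
class PerShardDestinations implements DynamicDestinations {
  private final Schema dataSchema;

  PerShardDestinations(Schema dataSchema) {
    this.dataSchema = dataSchema;
  }

  @Override
  public Schema getDataSchema() {
    return dataSchema; // records pass through unchanged
  }

  @Override
  public Row getData(Row element) {
    return element; // no projection in this sketch
  }

  @Override
  public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
    // Windowing metadata is also available here; this sketch routes on a "shard" field.
    return "db.events_" + element.getValue().getString("shard");
  }

  @Override
  public IcebergDestination instantiateDestination(String destination) {
    return IcebergDestination.builder()
        .setTableIdentifier(TableIdentifier.parse(destination))
        .setFileFormat(FileFormat.PARQUET)
        .build();
  }
}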
IcebergIO.java
@@ -115,15 +115,21 @@ public IcebergWriteResult expand(PCollection<Row> input) {
DynamicDestinations destinations = getDynamicDestinations();
if (destinations == null) {
destinations =
-            DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
+            DynamicDestinations.singleTable(
+                Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

-      if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
+      // Assign destinations before re-windowing to global because
+      // user's dynamic destination may depend on windowing properties
+      PCollection<Row> assignedRows =
+          input.apply("Set Destination Metadata", new AssignDestinations(destinations));
+
+      if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
-        input =
-            input.apply(
+        assignedRows =
+            assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
@@ -138,11 +144,9 @@
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
-      return input
-          .apply("Set Destination Metadata", new AssignDestinations(destinations))
-          .apply(
-              "Write Rows to Destinations",
-              new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+      return assignedRows.apply(
+          "Write Rows to Destinations",
+          new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

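Finally, tying the pieces together: unbounded inputs must now carry a triggering frequency, which controls how often the sink commits to the table. A sketch of the call site — writeRows and to are existing IcebergIO methods, while withTriggeringFrequency is inferred from the getTriggeringFrequency() getter used in this diff and should be confirmed against the released API:

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class StreamingWriteSketch {
  // Commits snapshots every 30 seconds on an unbounded (streaming) input.
  static void write(PCollection<Row> unboundedRows, IcebergCatalogConfig catalog) {
    unboundedRows.apply(
        IcebergIO.writeRows(catalog)
            .to(TableIdentifier.parse("db.events"))
            .withTriggeringFrequency(Duration.standardSeconds(30)));
  }
}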