values = new HashMap<>(newSchema.getFieldCount());
+
+ for (Schema.Field field : newSchema.getFields()) {
+ String name = field.getName();
+ Object value = row.getValue(name);
+ if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) {
+ Schema nestedRowSchema = Preconditions.checkNotNull(field.getType().getRowSchema());
+ value = copyWithNewSchema(row.getRow(name), nestedRowSchema);
+ }
+ if (value != null) {
+ values.put(name, value);
+ }
+ }
+ return Row.withSchema(newSchema).withFieldValues(values).build();
+ }
+
+ /**
+ * Returns a new {@link Schema} with the specified fields removed.
+ *
+ * No guarantee that field ordering will remain the same.
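+ *
+ * <p>A minimal illustrative sketch (the schema and field names here are hypothetical):
+ *
+ * <pre>{@code
+ * // schema: "name" (STRING) and "address" (ROW<city STRING, zip STRING>)
+ * Schema trimmed = dropFields(schema, Arrays.asList("address.zip"));
+ * // trimmed: "name" (STRING) and "address" (ROW<city STRING>)
+ * }</pre>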
+ */
+ @VisibleForTesting
+ static Schema dropFields(Schema schema, List<String> fieldsToDrop) {
+ if (fieldsToDrop.isEmpty()) {
+ return schema;
+ }
+ List<Schema.Field> newFieldsList = new ArrayList<>(schema.getFields());
+ Map<String, List<String>> fieldTree = getFieldTree(fieldsToDrop);
+
+ for (Map.Entry<String, List<String>> fieldAndDescendents : fieldTree.entrySet()) {
+ String root = fieldAndDescendents.getKey();
+ List<String> nestedFields = fieldAndDescendents.getValue();
+ Schema.Field fieldToRemove = schema.getField(root);
+ Schema.FieldType typeToRemove = fieldToRemove.getType();
+
+ // Base case: we're at the specified field to remove.
+ if (nestedFields.isEmpty()) {
+ newFieldsList.remove(fieldToRemove);
+ } else {
+ // Otherwise, we're asked to remove a nested field. Verify current field is ROW type
+ Preconditions.checkArgument(
+ typeToRemove.getTypeName().equals(Schema.TypeName.ROW),
+ "Expected type %s for specified nested field '%s', but instead got type %s.",
+ Schema.TypeName.ROW,
+ root,
+ typeToRemove.getTypeName());
+
+ Schema nestedSchema = Preconditions.checkNotNull(typeToRemove.getRowSchema());
+ Schema newNestedSchema = dropFields(nestedSchema, nestedFields);
+ Schema.Field modifiedField =
+ Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+ .withNullable(typeToRemove.getNullable());
+
+ // Replace with modified field
+ newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField);
+ }
+ }
+ return new Schema(newFieldsList);
+ }
+
+ /**
+ * Returns a new {@link Schema} with only the specified fields kept.
+ *
+ * No guarantee that field ordering will remain the same.
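+ *
+ * <p>A minimal illustrative sketch (the schema and field names here are hypothetical):
+ *
+ * <pre>{@code
+ * // schema: "name" (STRING) and "address" (ROW<city STRING, zip STRING>)
+ * Schema trimmed = keepFields(schema, Arrays.asList("name", "address.city"));
+ * // trimmed: "name" (STRING) and "address" (ROW<city STRING>)
+ * }</pre>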
+ */
+ @VisibleForTesting
+ static Schema keepFields(Schema schema, List<String> fieldsToKeep) {
+ if (fieldsToKeep.isEmpty()) {
+ return schema;
+ }
+ List<Schema.Field> newFieldsList = new ArrayList<>(fieldsToKeep.size());
+ Map<String, List<String>> fieldTree = getFieldTree(fieldsToKeep);
+
+ for (Map.Entry<String, List<String>> fieldAndDescendents : fieldTree.entrySet()) {
+ String root = fieldAndDescendents.getKey();
+ List<String> nestedFields = fieldAndDescendents.getValue();
+ Schema.Field fieldToKeep = schema.getField(root);
+ Schema.FieldType typeToKeep = fieldToKeep.getType();
+
+ // Base case: we're at the specified field to keep, and we can skip this conditional.
+ // Otherwise: we're asked to keep a nested field, so we dig deeper to determine which nested
+ // fields to keep
+ if (!nestedFields.isEmpty()) {
+ Preconditions.checkArgument(
+ typeToKeep.getTypeName().equals(Schema.TypeName.ROW),
+ "Expected type %s for specified nested field '%s', but instead got type %s.",
+ Schema.TypeName.ROW,
+ root,
+ typeToKeep.getTypeName());
+
+ Schema nestedSchema = Preconditions.checkNotNull(typeToKeep.getRowSchema());
+ Schema newNestedSchema = keepFields(nestedSchema, nestedFields);
+ fieldToKeep =
+ Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+ .withNullable(typeToKeep.getNullable());
+ }
+ newFieldsList.add(fieldToKeep);
+ }
+
+ return new Schema(newFieldsList);
+ }
+
+ /**
+ * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already
+ * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this
+ * {@link RowFilter}.
+ *
+ * If not yet configured, will simply return the same {@link Row}.
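+ *
+ * <p>An illustrative usage sketch (the schema and field name are hypothetical):
+ *
+ * <pre>{@code
+ * RowFilter filter = new RowFilter(rowSchema).dropping(Arrays.asList("metadata"));
+ * Row trimmed = filter.filter(inputRow);
+ * }</pre>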
+ */
+ public Row filter(Row row) {
+ if (transformedSchema == null) {
+ return row;
+ }
+ Preconditions.checkState(
+ row.getSchema().assignableTo(rowSchema),
+ "Encountered Row with schema that is incompatible with this RowFilter's schema.\nRow schema: %s\nSchema used to initialize this RowFilter: %s",
+ row.getSchema(),
+ rowSchema);
+
+ return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema()));
+ }
+
+ /** Returns the output {@link Row}'s {@link Schema}. */
+ public Schema outputSchema() {
+ return transformedSchema != null ? transformedSchema : rowSchema;
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java
new file mode 100644
index 000000000000..b00df9ae74ea
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A utility that interpolates values in a pre-determined {@link String} using an input Beam {@link
+ * Row}.
+ *
+ *
+ * <p>The {@link RowStringInterpolator} looks for field names specified inside {curly braces}. For
+ * example, if the interpolator is configured with the String {@code "unified {foo} and streaming"},
+ * it will look for a field name {@code "foo"} in the input {@link Row} and substitute in that
+ * value. A {@link RowStringInterpolator} configured with a template String that contains no
+ * placeholders (i.e. no curly braces) will always return that String, untouched.
+ *
+ *
+ * <p>Nested fields can be specified using dot-notation (e.g. {@code "top.middle.nested"}).
+ *
+ *
+ * <p>Configure a {@link RowStringInterpolator} like so:
+ *
+ * <pre>{@code
+ * String template = "unified {foo} and {bar.baz}!"
+ * Row inputRow = ...
+ *
+ * RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema);
+ * String output = interpolator.interpolate(inputRow);
+ * // output --> "unified batch and streaming!"
+ * }</pre>
+ *
+ * <p>Additionally, {@link #interpolate(Row, BoundedWindow, PaneInfo, Instant)} can be used in
+ * streaming scenarios when you want to include windowing metadata in the output String. To make use
+ * of this, include the relevant placeholder:
+ *
+ * <ul>
+ * <li>$WINDOW: the window's string representation
+ * <li>$PANE_INDEX: the pane's index
+ * <li>$YYYY: the element timestamp's year
+ * <li>$MM: the element timestamp's month
+ * <li>$DD: the element timestamp's day
+ * </ul>
+ *
+ * <p>For example, your String template can look like:
+ *
+ *
+ * <pre>{@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}</pre>
+ */
+public class RowStringInterpolator implements Serializable {
+ private final String template;
+ private final Set<String> fieldsToReplace;
+ public static final String WINDOW = "$WINDOW";
+ public static final String PANE_INDEX = "$PANE_INDEX";
+ public static final String YYYY = "$YYYY";
+ public static final String MM = "$MM";
+ public static final String DD = "$DD";
+ private static final Set<String> WINDOWING_METADATA =
+ Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD);
+
+ public RowStringInterpolator(String template, Schema rowSchema) {
+ this.template = template;
+
+ Matcher m = Pattern.compile("\\{(.+?)}").matcher(template);
+ fieldsToReplace = new HashSet<>();
+ while (m.find()) {
+ fieldsToReplace.add(StringUtils.strip(m.group(), "{}"));
+ }
+
+ List<String> rowFields =
+ fieldsToReplace.stream()
+ .filter(f -> !WINDOWING_METADATA.contains(f))
+ .collect(Collectors.toList());
+
+ RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation");
+ }
+
+ /** Performs string interpolation on the template using values from the input {@link Row}. */
+ public String interpolate(Row row) {
+ String interpolated = this.template;
+ for (String field : fieldsToReplace) {
+ List<String> levels = Splitter.on(".").splitToList(field);
+
+ Object val = MoreObjects.firstNonNull(getValue(row, levels, 0), "");
+
+ interpolated = interpolated.replace("{" + field + "}", String.valueOf(val));
+ }
+ return interpolated;
+ }
+
+ /** Like {@link #interpolate(Row)} but also potentially include windowing information. */
+ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
+ String interpolated = this.template;
+ for (String field : fieldsToReplace) {
+ Object val;
+ switch (field) {
+ case WINDOW:
+ val = window.toString();
+ break;
+ case PANE_INDEX:
+ val = paneInfo.getIndex();
+ break;
+ case YYYY:
+ val = timestamp.getChronology().year().get(timestamp.getMillis());
+ break;
+ case MM:
+ val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis());
+ break;
+ case DD:
+ val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis());
+ break;
+ default:
+ List<String> levels = Splitter.on(".").splitToList(field);
+ val = MoreObjects.firstNonNull(getValue(row, levels, 0), "");
+ break;
+ }
+
+ interpolated = interpolated.replace("{" + field + "}", String.valueOf(val));
+ }
+ return interpolated;
+ }
+
+ private @Nullable Object getValue(@Nullable Row row, List<String> fieldLevels, int index) {
+ if (row == null || fieldLevels.isEmpty()) {
+ return null;
+ }
+ Preconditions.checkState(
+ index < fieldLevels.size(),
+ "'%s' only goes %s levels deep. Invalid attempt to check for depth at level %s",
+ String.join(".", fieldLevels),
+ fieldLevels.size(),
+ index);
+
+ String currentField = fieldLevels.get(index);
+ Object val;
+ try {
+ val = row.getValue(currentField);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(
+ String.format(
+ "Invalid row does not contain field '%s'.",
+ String.join(".", fieldLevels.subList(0, index + 1))),
+ e);
+ }
+
+ // base case: we've reached the target
+ if (index == fieldLevels.size() - 1) {
+ return val;
+ }
+
+ return getValue((Row) val, fieldLevels, ++index);
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
new file mode 100644
index 000000000000..d1779f3d8465
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link RowFilter}. */
+public class RowFilterTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("doubly_nested_str")
+ .addInt32Field("doubly_nested_int")
+ .build();
+
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build();
+ private static final Schema ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addBooleanField("bool")
+ .addNullableInt32Field("nullable_int")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", NESTED_ROW_SCHEMA)
+ .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+ .build();
+
+ @Test
+ public void testSchemaValidation() {
+ List<List<String>> goodFields =
+ Arrays.asList(
+ Arrays.asList("str", "bool", "nullable_row"),
+ Arrays.asList("nullable_int", "arr_int"),
+ Arrays.asList("row.nested_str", "row.nested_row.doubly_nested_str"),
+ Arrays.asList("nullable_row.nested_row.doubly_nested_int"));
+
+ for (List<String> fields : goodFields) {
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, "test-operation");
+ }
+ }
+
+ @Test
+ public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() {
+ List<KV<List<String>, List<String>>> nonExistentFields =
+ Arrays.asList(
+ KV.of(
+ Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3"),
+ Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3")),
+ KV.of(
+ Arrays.asList("nullable_int", "arr_int", "nonexistent"),
+ Collections.singletonList("nonexistent")),
+ KV.of(
+ Arrays.asList(
+ "nullable_row.nested_row.nonexistent", "row.nonexistent", "row.nested_float"),
+ Arrays.asList("nullable_row.nested_row.nonexistent", "row.nonexistent")));
+
+ for (KV<List<String>, List<String>> fields : nonExistentFields) {
+ List<String> allFields = fields.getKey();
+ List<String> badFields = fields.getValue();
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation"));
+
+ assertThat(e.getMessage(), containsString("Validation failed for test-operation"));
+ assertThat(
+ e.getMessage(),
+ containsString("Row Schema does not contain the following specified fields"));
+ for (String badField : badFields) {
+ assertThat(e.getMessage(), containsString(badField));
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() {
+ List<KV<List<String>, List<String>>> nonNestedFields =
+ Arrays.asList(
+ KV.of(
+ Arrays.asList(
+ "row.nested_row", "row.nested_int", "row.nested_str.unexpected_nested"),
+ Collections.singletonList("row.nested_str")),
+ KV.of(
+ Arrays.asList(
+ "nullable_row.nested_str",
+ "nullable_row.nested_str.unexpected",
+ "row.nested_int.unexpected_2"),
+ Arrays.asList("nullable_row.nested_str", "row.nested_int")));
+
+ for (KV<List<String>, List<String>> fields : nonNestedFields) {
+ List<String> allFields = fields.getKey();
+ List<String> badFields = fields.getValue();
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation"));
+
+ assertThat(e.getMessage(), containsString("Validation failed for test-operation"));
+ assertThat(
+ e.getMessage(),
+ containsString(
+ "The following specified fields are not of type Row. Their nested fields could not be reached"));
+ for (String badField : badFields) {
+ assertThat(e.getMessage(), containsString(badField));
+ }
+ }
+ }
+
+ @Test
+ public void testGetFieldTree() {
+ List<String> fields =
+ Arrays.asList(
+ "top-level",
+ "top-level-2",
+ "top-level.nested-level",
+ "top-level.nested-level-2",
+ "top-level.nested-level.doubly-nested-level",
+ "top-level.nested-level.doubly-nested-level-2");
+ List<String> nestedLayer =
+ Arrays.asList(
+ "nested-level",
+ "nested-level-2",
+ "nested-level.doubly-nested-level",
+ "nested-level.doubly-nested-level-2");
+
+ Map<String, List<String>> expectedTree =
+ ImmutableMap.<String, List<String>>builder()
+ .put("top-level-2", Collections.emptyList())
+ .put("top-level", nestedLayer)
+ .build();
+
+ assertEquals(expectedTree, RowFilter.getFieldTree(fields));
+
+ List<String> doublyNestedLayer = Arrays.asList("doubly-nested-level", "doubly-nested-level-2");
+
+ Map<String, List<String>> expectedNestedTree =
+ ImmutableMap.<String, List<String>>builder()
+ .put("nested-level-2", Collections.emptyList())
+ .put("nested-level", doublyNestedLayer)
+ .build();
+
+ assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer));
+ }
+
+ @Test
+ public void testDropSchemaFields() {
+ List<String> fieldsToDrop =
+ Arrays.asList(
+ "str",
+ "arr_int",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row.nested_str",
+ "nullable_row.nested_row");
+
+ Schema expectedDroppedSchema =
+ Schema.builder()
+ .addBooleanField("bool")
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField(
+ "nested_row", Schema.builder().addStringField("doubly_nested_str").build())
+ .build())
+ .addNullableRowField(
+ "nullable_row",
+ Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build())
+ .build();
+
+ assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, fieldsToDrop)));
+ }
+
+ @Test
+ public void testKeepSchemaFields() {
+ List<String> fieldsToKeep =
+ Arrays.asList(
+ "str",
+ "arr_int",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row.nested_str",
+ "nullable_row.nested_row");
+
+ Schema expectedKeptSchema =
+ Schema.builder()
+ .addStringField("str")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addNullableInt32Field("nullable_int")
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField(
+ "nested_row", Schema.builder().addInt32Field("doubly_nested_int").build())
+ .build())
+ .addNullableRowField(
+ "nullable_row",
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build())
+ .build();
+
+ assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep)));
+ }
+
+ private static final Row ORIGINAL_ROW =
+ Row.withSchema(ROW_SCHEMA)
+ .addValue("str_value")
+ .addValue(true)
+ .addValue(123)
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(NESTED_ROW_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(456)
+ .addValue(1.234f)
+ .addValue(
+ Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .addValue(789)
+ .build())
+ .build())
+ .addValue(null)
+ .build();
+
+ private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA =
+ Schema.builder().addStringField("doubly_nested_str").build();
+ private static final Schema FILTERED_NESTED_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA)
+ .build();
+ private static final Schema FILTERED_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", FILTERED_NESTED_SCHEMA)
+ .build();
+
+ private static final Row FILTERED_ROW =
+ Row.withSchema(FILTERED_SCHEMA)
+ .addValue("str_value")
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(FILTERED_NESTED_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(
+ Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .build())
+ .build())
+ .build();
+
+ @Test
+ public void testCopyRowWithNewSchema() {
+ assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA));
+ }
+
+ @Test
+ public void testDropRowFields() {
+ RowFilter rowFilter =
+ new RowFilter(ROW_SCHEMA)
+ .dropping(
+ Arrays.asList(
+ "bool",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row"));
+
+ assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW));
+ }
+
+ @Test
+ public void testKeepRowFields() {
+ RowFilter rowFilter =
+ new RowFilter(ROW_SCHEMA)
+ .keeping(
+ Arrays.asList(
+ "str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str"));
+
+ assertEquals(FILTERED_ROW, rowFilter.filter(ORIGINAL_ROW));
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java
new file mode 100644
index 000000000000..bcdf7f6a394d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Test class for {@link RowStringInterpolator}. */
+public class RowStringInterpolatorTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("doubly_nested_str")
+ .addInt32Field("doubly_nested_int")
+ .build();
+
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build();
+ private static final Schema ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addBooleanField("bool")
+ .addInt32Field("int")
+ .addNullableInt32Field("nullable_int")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", NESTED_ROW_SCHEMA)
+ .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+ .build();
+
+ @Test
+ public void testInvalidRowThrowsHelpfulError() {
+ String template = "foo {str}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build());
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Invalid row does not contain field 'str'.");
+
+ interpolator.interpolate(invalidRow);
+ }
+
+ @Test
+ public void testInvalidRowThrowsHelpfulErrorForNestedFields() {
+ String template = "foo {row.nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build();
+ Row invalidRow =
+ Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build())
+ .addValue(Row.nullRow(nestedSchema))
+ .build();
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Invalid row does not contain field 'row.nested_int'.");
+
+ interpolator.interpolate(invalidRow);
+ }
+
+ @Test
+ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() {
+ String template = "foo {row.nested_row.doubly_nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build();
+ Schema nestedSchema =
+ Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build();
+ Row invalidRow =
+ Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build())
+ .addValue(
+ Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build())
+ .build();
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Invalid row does not contain field 'row.nested_row.doubly_nested_int'.");
+
+ interpolator.interpolate(invalidRow);
+ }
+
+ private static final Row ROW =
+ Row.withSchema(ROW_SCHEMA)
+ .addValue("str_value")
+ .addValue(true)
+ .addValue(123)
+ .addValue(null)
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(NESTED_ROW_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(456)
+ .addValue(1.234f)
+ .addValue(
+ Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .addValue(789)
+ .build())
+ .build())
+ .addValue(null)
+ .build();
+
+ @Test
+ public void testTopLevelInterpolation() {
+ String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW);
+
+ assertEquals("foo str_value, bar true, baz 123, xyz ", output);
+ }
+
+ @Test
+ public void testNestedLevelInterpolation() {
+ String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW);
+
+ assertEquals("foo str_value, bar nested_str_value, baz 1.234", output);
+ }
+
+ @Test
+ public void testDoublyNestedInterpolation() {
+ String template =
+ "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW);
+
+ assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output);
+ }
+
+ @Test
+ public void testInterpolateWindowingInformation() {
+ String template =
+ String.format(
+ "str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}",
+ RowStringInterpolator.WINDOW,
+ RowStringInterpolator.PANE_INDEX,
+ RowStringInterpolator.YYYY,
+ RowStringInterpolator.MM,
+ RowStringInterpolator.DD);
+
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant();
+
+ String output =
+ interpolator.interpolate(
+ ROW,
+ GlobalWindow.INSTANCE,
+ PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0),
+ instant);
+ String expected =
+ String.format(
+ "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28",
+ GlobalWindow.INSTANCE);
+
+ assertEquals(expected, output);
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
index 4008fcc6f5bd..82669c90fbba 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
@@ -21,8 +21,11 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
/**
* Assigns the destination metadata for each input record.
@@ -32,7 +35,7 @@
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
- private DynamicDestinations dynamicDestinations;
+ private final DynamicDestinations dynamicDestinations;
public AssignDestinations(DynamicDestinations dynamicDestinations) {
this.dynamicDestinations = dynamicDestinations;
@@ -41,11 +44,10 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
@Override
public PCollection<Row> expand(PCollection<Row> input) {
- final Schema inputSchema = input.getSchema();
final Schema outputSchema =
Schema.builder()
- .addRowField("data", inputSchema)
- .addRowField("dest", dynamicDestinations.getMetadataSchema())
+ .addStringField("dest")
+ .addRowField("data", dynamicDestinations.getDataSchema())
.build();
return input
@@ -53,11 +55,19 @@ public PCollection<Row> expand(PCollection<Row> input) {
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
- public void processElement(@Element Row data, OutputReceiver<Row> out) {
+ public void processElement(
+ @Element Row element,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ @Timestamp Instant timestamp,
+ OutputReceiver<Row> out) {
+ String tableIdentifier =
+ dynamicDestinations.getDestinationIdentifier(
+ element, window, paneInfo, timestamp);
+ Row data = dynamicDestinations.getData(element);
+
out.output(
- Row.withSchema(outputSchema)
- .addValues(data, dynamicDestinations.assignDestinationMetadata(data))
- .build());
+ Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
}
}))
.setRowSchema(outputSchema);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java
index 6fc3c139bdc1..c92ee54ae82b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java
@@ -19,18 +19,28 @@
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.joda.time.Instant;
public interface DynamicDestinations extends Serializable {
- Schema getMetadataSchema();
+ Schema getDataSchema();
- Row assignDestinationMetadata(Row data);
+ Row getData(Row element);
- IcebergDestination instantiateDestination(Row dest);
+ String getDestinationIdentifier(Row element);
- static DynamicDestinations singleTable(TableIdentifier tableId) {
- return new OneTableDynamicDestinations(tableId);
+ default String getDestinationIdentifier(
+ Row element, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
+ return getDestinationIdentifier(element);
+ }
+
+ IcebergDestination instantiateDestination(String destination);
+
+ static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) {
+ return new OneTableDynamicDestinations(tableId, inputSchema);
}
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index c3c1da7c7885..8eb70bf0291b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -87,7 +87,8 @@ public IcebergWriteResult expand(PCollection<Row> input) {
DynamicDestinations destinations = getDynamicDestinations();
if (destinations == null) {
destinations =
- DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
+ DynamicDestinations.singleTable(
+ Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}
return input
.apply("Set Destination Metadata", new AssignDestinations(destinations))
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 3f0f88946d9c..465df5505cc4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -34,7 +34,7 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.FileFormat;
/**
* SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
@@ -106,7 +106,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergWriteResult result =
rows.apply(
IcebergIO.writeRows(configuration.getIcebergCatalog())
- .to(TableIdentifier.parse(configuration.getTable())));
+ .to(
+ new PortableIcebergDestinations(
+ configuration.getTable(),
+ FileFormat.PARQUET.toString(),
+ rows.getSchema(),
+ configuration.getDrop(),
+ configuration.getKeep())));
PCollection<Row> snapshots =
result
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
index e09fdf171fd6..18a1c559e6c8 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.iceberg;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -24,45 +26,47 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
class OneTableDynamicDestinations implements DynamicDestinations, Externalizable {
-
- private static final Schema EMPTY_SCHEMA = Schema.builder().build();
- private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA);
-
// TableId represented as String for serializability
private transient @MonotonicNonNull String tableIdString;
private transient @MonotonicNonNull TableIdentifier tableId;
+ private transient @MonotonicNonNull Schema rowSchema;
@VisibleForTesting
TableIdentifier getTableIdentifier() {
if (tableId == null) {
- tableId = TableIdentifier.parse(Preconditions.checkNotNull(tableIdString));
+ tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
}
return tableId;
}
- OneTableDynamicDestinations(TableIdentifier tableId) {
+ OneTableDynamicDestinations(TableIdentifier tableId, Schema rowSchema) {
this.tableIdString = tableId.toString();
+ this.rowSchema = rowSchema;
+ }
+
+ @Override
+ public Schema getDataSchema() {
+ return checkStateNotNull(rowSchema);
}
@Override
- public Schema getMetadataSchema() {
- return EMPTY_SCHEMA;
+ public Row getData(Row element) {
+ return element;
}
@Override
- public Row assignDestinationMetadata(Row data) {
- return EMPTY_ROW;
+ public String getDestinationIdentifier(Row element) {
+ return checkStateNotNull(tableIdString);
}
@Override
- public IcebergDestination instantiateDestination(Row dest) {
+ public IcebergDestination instantiateDestination(String unused) {
return IcebergDestination.builder()
.setTableIdentifier(getTableIdentifier())
.setTableCreateConfig(null)
@@ -75,7 +79,7 @@ public OneTableDynamicDestinations() {}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(Preconditions.checkNotNull(tableIdString));
+ out.writeUTF(checkStateNotNull(tableIdString));
}
@Override
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
new file mode 100644
index 000000000000..8403f346d30d
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.RowFilter;
+import org.apache.beam.sdk.util.RowStringInterpolator;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+class PortableIcebergDestinations implements DynamicDestinations {
+ private final RowFilter rowFilter;
+ private final RowStringInterpolator interpolator;
+ private final String fileFormat;
+
+ public PortableIcebergDestinations(
+ String destinationTemplate,
+ String fileFormat,
+ Schema inputSchema,
+ @Nullable List<String> fieldsToDrop,
+ @Nullable List<String> fieldsToKeep) {
+ interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
+ RowFilter rf = new RowFilter(inputSchema);
+
+ if (fieldsToDrop != null) {
+ rf = rf.dropping(fieldsToDrop);
+ }
+ if (fieldsToKeep != null) {
+ rf = rf.keeping(fieldsToKeep);
+ }
+ rowFilter = rf;
+ this.fileFormat = fileFormat;
+ }
+
+ @Override
+ public Schema getDataSchema() {
+ return rowFilter.outputSchema();
+ }
+
+ @Override
+ public Row getData(Row element) {
+ return rowFilter.filter(element);
+ }
+
+ @Override
+ public String getDestinationIdentifier(Row element) {
+ return interpolator.interpolate(element);
+ }
+
+ @Override
+ public String getDestinationIdentifier(
+ Row element, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
+ return interpolator.interpolate(element, window, paneInfo, timestamp);
+ }
+
+ @Override
+ public IcebergDestination instantiateDestination(String dest) {
+ return IcebergDestination.builder()
+ .setTableIdentifier(TableIdentifier.parse(dest))
+ .setTableCreateConfig(null)
+ .setFileFormat(FileFormat.fromString(fileFormat))
+ .build();
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
index 6e7a12aa15af..9e5776a23dbe 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.iceberg;
import com.google.auto.value.AutoValue;
+import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -34,6 +35,15 @@ public static Builder builder() {
@SchemaFieldDescription("Identifier of the Iceberg table.")
public abstract String getTable();
+ @SchemaFieldDescription("When specified, only these columns will be written to the table.")
+ @Nullable
+ public abstract List<String> getKeep();
+
+ @SchemaFieldDescription(
+ "When specified, these columns will be removed before writing to the table.")
+ @Nullable
+ public abstract List<String> getDrop();
+
@SchemaFieldDescription("Name of the catalog containing the table.")
@Nullable
public abstract String getCatalogName();
@@ -50,6 +60,10 @@ public static Builder builder() {
public abstract static class Builder {
public abstract Builder setTable(String table);
+ public abstract Builder setKeep(List<String> fieldsToKeep);
+
+ public abstract Builder setDrop(List<String> fieldsToDrop);
+
public abstract Builder setCatalogName(String catalogName);
public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 65cc3f3c3059..0bc18ffcf421 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -36,7 +36,7 @@
class WriteGroupedRowsToFiles
extends PTransform<
- PCollection<KV<ShardedKey<Row>, Iterable<Row>>>, PCollection<FileWriteResult>> {
+ PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
@@ -51,7 +51,7 @@ class WriteGroupedRowsToFiles
@Override
public PCollection<FileWriteResult> expand(
- PCollection<KV<ShardedKey<Row>, Iterable<Row>>> input) {
+ PCollection<KV<ShardedKey<String>, Iterable<Row>>> input) {
return input.apply(
ParDo.of(
new WriteGroupedRowsToFilesDoFn(
@@ -59,7 +59,7 @@ public PCollection<FileWriteResult> expand(
}
private static class WriteGroupedRowsToFilesDoFn
- extends DoFn<KV<ShardedKey<Row>, Iterable<Row>>, FileWriteResult> {
+ extends DoFn<KV<ShardedKey<String>, Iterable<Row>>, FileWriteResult> {
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
@@ -87,13 +87,13 @@ private org.apache.iceberg.catalog.Catalog getCatalog() {
@ProcessElement
public void processElement(
ProcessContext c,
- @Element KV<ShardedKey<Row>, Iterable<Row>> element,
+ @Element KV<ShardedKey<String>, Iterable<Row>> element,
BoundedWindow window,
PaneInfo pane)
throws Exception {
- Row destMetadata = element.getKey().getKey();
- IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata);
+ String tableIdentifier = element.getKey().getKey();
+ IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValue.of(destination, window.maxTimestamp(), window, pane);
RecordWriterManager writer;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
index 65fd551c782a..1393b23713b2 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -33,6 +34,7 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult> {
@@ -60,15 +62,9 @@ public IcebergWriteResult expand(PCollection<Row> input) {
new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations));
// Then write the rest by shuffling on the destination metadata
- Schema destSchema =
- checkArgumentNotNull(
- writeUngroupedResult
- .getSpilledRows()
- .getSchema()
- .getField("dest")
- .getType()
- .getRowSchema(),
- "Input schema missing `dest` field.");
+ Preconditions.checkState(
+ writeUngroupedResult.getSpilledRows().getSchema().hasField("dest"),
+ "Input schema missing `dest` field.");
Schema dataSchema =
checkArgumentNotNull(
writeUngroupedResult
@@ -85,24 +81,23 @@ public IcebergWriteResult expand(PCollection<Row> input) {
.apply(
"Key by destination and shard",
MapElements.via(
- new SimpleFunction<Row, KV<ShardedKey<Row>, Row>>() {
+ new SimpleFunction<Row, KV<ShardedKey<String>, Row>>() {
private static final int SPILLED_ROWS_SHARDING_FACTOR = 10;
private int shardNumber = 0;
@Override
- public KV<ShardedKey<Row>, Row> apply(Row elem) {
+ public KV<ShardedKey<String>, Row> apply(Row elem) {
Row data =
checkArgumentNotNull(
elem.getRow("data"), "Element missing `data` field");
- Row dest =
+ String dest =
checkArgumentNotNull(
- elem.getRow("dest"), "Element missing `dest` field");
+ elem.getString("dest"), "Element missing `dest` field");
return KV.of(
ShardedKey.of(dest, shardNumber % SPILLED_ROWS_SHARDING_FACTOR), data);
}
}))
- .setCoder(
- KvCoder.of(ShardedKeyCoder.of(RowCoder.of(destSchema)), RowCoder.of(dataSchema)))
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema)))
.apply("Group spilled rows by destination shard", GroupByKey.create())
.apply(
"Write remaining rows to files",
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index 0ca06d797750..b4160d2496f4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -209,10 +209,10 @@ public void processElement(
@Element Row element, BoundedWindow window, PaneInfo pane, MultiOutputReceiver out)
throws Exception {
+ String dest =
+ checkArgumentNotNull(element.getString("dest"), "Input row missing `dest` field.");
Row data = checkArgumentNotNull(element.getRow("data"), "Input row missing `data` field.");
- Row destMetadata =
- checkArgumentNotNull(element.getRow("dest"), "Input row missing `dest` field.");
- IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata);
+ IcebergDestination destination = dynamicDestinations.instantiateDestination(dest);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValue.of(destination, window.maxTimestamp(), window, pane);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 2e748e9644e8..51f3455932a6 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -17,23 +17,29 @@
*/
package org.apache.beam.sdk.io.iceberg;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -43,6 +49,7 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
@@ -57,6 +64,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -70,6 +78,7 @@
public class IcebergIOIT implements Serializable {
private static final org.apache.beam.sdk.schemas.Schema DOUBLY_NESTED_ROW_SCHEMA =
org.apache.beam.sdk.schemas.Schema.builder()
+ .addStringField("char")
.addStringField("doubly_nested_str")
.addInt64Field("doubly_nested_float")
.build();
@@ -103,6 +112,7 @@ public Row apply(Long num) {
.addValue("nested_str_value_" + strNum)
.addValue(
Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue(String.valueOf((char) (97 + num % 5)))
.addValue("doubly_nested_str_value_" + strNum)
.addValue(num)
.build())
@@ -145,7 +155,7 @@ public Record apply(Row input) {
private String warehouseLocation;
- private TableIdentifier tableId;
+ private String tableId;
private Catalog catalog;
@BeforeClass
@@ -162,7 +172,7 @@ public void setUp() {
warehouseLocation =
String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID());
- tableId = TableIdentifier.of(testName.getMethodName(), "test_table");
+ tableId = testName.getMethodName() + ".test_table";
catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation);
}
@@ -204,7 +214,7 @@ private List<Record> populateTable(Table table) throws IOException {
}
private List<Record> readRecords(Table table) {
- TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
+ TableScan tableScan = table.newScan().project(table.schema());
List<Record> writtenRecords = new ArrayList<>();
for (CombinedScanTask task : tableScan.planTasks()) {
InputFilesDecryptor descryptor =
@@ -214,9 +224,9 @@ private List<Record> readRecords(Table table) {
CloseableIterable<Record> iterable =
Parquet.read(inputFile)
.split(fileTask.start(), fileTask.length())
- .project(ICEBERG_SCHEMA)
+ .project(table.schema())
.createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
+ fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
.filter(fileTask.residual())
.build();
@@ -228,9 +238,9 @@ private List<Record> readRecords(Table table) {
return writtenRecords;
}
- private Map<String, Object> managedIcebergConfig() {
+ private Map<String, Object> managedIcebergConfig(String tableId) {
return ImmutableMap.<String, Object>builder()
- .put("table", tableId.toString())
+ .put("table", tableId)
.put("catalog_name", "test-name")
.put(
"catalog_properties",
@@ -248,11 +258,11 @@ private Map<String, Object> managedIcebergConfig() {
*/
@Test
public void testRead() throws Exception {
- Table table = catalog.createTable(tableId, ICEBERG_SCHEMA);
+ Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);
List<Record> expectedRows = populateTable(table);
- Map<String, Object> config = managedIcebergConfig();
+ Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> rows =
pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
@@ -270,10 +280,10 @@ public void testRead() throws Exception {
*/
@Test
public void testWrite() {
- Table table = catalog.createTable(tableId, ICEBERG_SCHEMA);
+ Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);
// Write with Beam
- Map<String, Object> config = managedIcebergConfig();
+ Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
@@ -294,10 +304,11 @@ public void testWritePartitionedData() {
.identity("modulo_5")
.truncate("str", "value_x".length())
.build();
- Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec);
+ Table table =
+ catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec);
// Write with Beam
- Map<String, Object> config = managedIcebergConfig();
+ Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
@@ -307,4 +318,79 @@ public void testWritePartitionedData() {
assertThat(
returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
}
+
+ /**
+ * @param drop if null, just perform a normal dynamic destination write test; if true, also drop
+ * some fields from input rows; if false, also keep only some fields from input rows.
+ */
+ private void writeToDynamicDestinationsAndMaybeFilter(@Nullable Boolean drop) {
+ String tableIdentifierTemplate = tableId + "_{modulo_5}_{row.nested_row.char}";
+ Map<String, Object> writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
+
+ List<String> fieldsToFilter =
+ Arrays.asList(
+ "str",
+ "int",
+ "nullable_long",
+ "row.nested_str",
+ "row.nested_int",
+ "row.nested_row.doubly_nested_str");
+ // an un-configured filter will just return the same row
+ RowFilter rowFilter = new RowFilter(BEAM_SCHEMA);
+ if (drop != null) {
+ rowFilter = drop ? rowFilter.dropping(fieldsToFilter) : rowFilter.keeping(fieldsToFilter);
+ writeConfig.put(drop ? "drop" : "keep", fieldsToFilter);
+ }
+
+ Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
+
+ Table table0 = catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema);
+ Table table1 = catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema);
+ Table table2 = catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema);
+ Table table3 = catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema);
+ Table table4 = catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema);
+
+ // Write with Beam
+ PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
+ input.apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
+ pipeline.run().waitUntilFinish();
+
+ // Read back and check records are correct
+ List<List<Record>> returnedRecords = new ArrayList<>(5);
+ returnedRecords.add(readRecords(table0));
+ returnedRecords.add(readRecords(table1));
+ returnedRecords.add(readRecords(table2));
+ returnedRecords.add(readRecords(table3));
+ returnedRecords.add(readRecords(table4));
+
+ SerializableFunction<Row, Record> recordFunc =
+ row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row);
+
+ for (int i = 0; i < returnedRecords.size(); i++) {
+ List<Record> records = returnedRecords.get(i);
+ long l = i;
+ Stream<Record> expectedRecords =
+ INPUT_ROWS.stream()
+ .filter(r -> checkStateNotNull(r.getInt64("modulo_5")) == l)
+ .map(rowFilter::filter)
+ .map(recordFunc::apply);
+
+ assertThat(records, containsInAnyOrder(expectedRecords.toArray()));
+ }
+ }
+
+ @Test
+ public void testWriteToDynamicDestinations() {
+ writeToDynamicDestinationsAndMaybeFilter(null);
+ }
+
+ @Test
+ public void testWriteToDynamicDestinationsAndDropFields() {
+ writeToDynamicDestinationsAndMaybeFilter(true);
+ }
+
+ @Test
+ public void testWriteToDynamicDestinationsAndKeepFields() {
+ writeToDynamicDestinationsAndMaybeFilter(false);
+ }
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index 2abe6b093481..336d905837a7 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -128,25 +128,26 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
- private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();
+ @Override
+ public Schema getDataSchema() {
+ return IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+ }
@Override
- public Schema getMetadataSchema() {
- return schema;
+ public Row getData(Row element) {
+ return element;
}
@Override
- public Row assignDestinationMetadata(Row data) {
- long rowId = data.getInt64("id");
- return Row.withSchema(schema).addValues((rowId / 3) + 1).build();
+ public String getDestinationIdentifier(Row element) {
+ long tableNumber = element.getInt64("id") / 3 + 1;
+ return String.format("default.table%s-%s", tableNumber, salt);
}
@Override
- public IcebergDestination instantiateDestination(Row dest) {
+ public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
- .setTableIdentifier(
- TableIdentifier.of(
- "default", "table" + dest.getInt64("tableNumber") + "-" + salt))
+ .setTableIdentifier(TableIdentifier.parse(dest))
.setFileFormat(FileFormat.PARQUET)
.build();
}
@@ -223,25 +224,26 @@ public void testDynamicDestinationsWithSpillover() throws Exception {
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
- private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();
+ @Override
+ public Schema getDataSchema() {
+ return IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+ }
@Override
- public Schema getMetadataSchema() {
- return schema;
+ public Row getData(Row element) {
+ return element;
}
@Override
- public Row assignDestinationMetadata(Row data) {
- long rowId = data.getInt64("id");
- return Row.withSchema(schema).addValues(rowId % numDestinations).build();
+ public String getDestinationIdentifier(Row element) {
+ long tableNumber = element.getInt64("id") % numDestinations;
+ return String.format("default.table%s-%s", tableNumber, salt);
}
@Override
- public IcebergDestination instantiateDestination(Row dest) {
+ public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
- .setTableIdentifier(
- TableIdentifier.of(
- "default", "table" + dest.getInt64("tableNumber") + "-" + salt))
+ .setTableIdentifier(TableIdentifier.parse(dest))
.setFileFormat(FileFormat.PARQUET)
.build();
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 6b555e7e14d0..3fb08c68966a 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -22,19 +22,24 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -108,7 +113,8 @@ public void testSimpleAppend() {
.apply("Append To Table", new IcebergWriteSchemaTransformProvider().from(config))
.get(OUTPUT_TAG);
- PAssert.that(result).satisfies(new VerifyOutputs(identifier, "append"));
+ PAssert.that(result)
+ .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();
@@ -139,7 +145,8 @@ public void testWriteUsingManagedTransform() {
PCollection<Row> result =
inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG);
- PAssert.that(result).satisfies(new VerifyOutputs(identifier, "append"));
+ PAssert.that(result)
+ .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();
@@ -147,12 +154,158 @@ public void testWriteUsingManagedTransform() {
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
+ @Test
+ public void testWriteToDynamicDestinationsUsingManagedTransform() {
+ String salt = Long.toString(UUID.randomUUID().hashCode(), 16);
+
+ String identifier0 = "default.table_0_" + salt;
+ String identifier1 = "default.table_1_" + salt;
+ String identifier2 = "default.table_2_" + salt;
+ Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), TestFixtures.SCHEMA);
+ Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), TestFixtures.SCHEMA);
+ Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), TestFixtures.SCHEMA);
+
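+ // "{id}" is substituted with each record's "id" field, routing records to the three
+ // tables created above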
+ String identifierTemplate = "default.table_{id}_" + salt;
+
+ Map<String, Object> writeConfig =
+ ImmutableMap.<String, Object>builder()
+ .put("table", identifierTemplate)
+ .put("catalog_name", "test-name")
+ .put(
+ "catalog_properties",
+ ImmutableMap.<String, String>builder()
+ .put("type", "hadoop")
+ .put("warehouse", warehouse.location)
+ .build())
+ .build();
+
+ PCollection<Row> inputRows =
+ testPipeline
+ .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
+ PCollection<Row> result =
+ inputRows
+ .apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig))
+ .getSinglePCollection();
+
+ PAssert.that(result)
+ .satisfies(
+ new VerifyOutputs(Arrays.asList(identifier0, identifier1, identifier2), "append"));
+
+ testPipeline.run().waitUntilFinish();
+
+ List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
+ List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
+ List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
+
+ assertThat(table0Records, Matchers.contains(TestFixtures.FILE1SNAPSHOT1.get(0)));
+ assertThat(table1Records, Matchers.contains(TestFixtures.FILE1SNAPSHOT1.get(1)));
+ assertThat(table2Records, Matchers.contains(TestFixtures.FILE1SNAPSHOT1.get(2)));
+ }
+
+ /** @param drop if true, tests dropping fields; if false, tests keeping fields. */
+ private void writeToDynamicDestinationsAndFilter(boolean drop) {
+ String salt = Long.toString(UUID.randomUUID().hashCode(), 16);
+
+ Schema nestedSchema =
+ Schema.builder().addNullableStringField("str").addInt64Field("long").build();
+ Schema beamSchema =
+ Schema.builder()
+ .addNullableInt32Field("id")
+ .addStringField("name")
+ .addFloatField("cost")
+ .addRowField("nested", nestedSchema)
+ .build();
+
+ // (drop) we drop these fields from our iceberg table, so we drop them from our input rows
+ // (keep) we want to include only these fields in our iceberg table, so we keep them and drop
+ // everything else
+ List<String> filteredFields = Arrays.asList("id", "nested.long");
+ RowFilter filter = new RowFilter(beamSchema);
+ filter = drop ? filter.dropping(filteredFields) : filter.keeping(filteredFields);
+ org.apache.iceberg.Schema icebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
+
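+ // the destination is resolved from the original (unfiltered) row, so the template may
+ // reference fields that the filter removes from the written records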
+ String identifierTemplate = "default.table_{id}_{nested.str}_" + salt;
+ String identifier0 = "default.table_0_x_" + salt;
+ String identifier1 = "default.table_1_y_" + salt;
+ String identifier2 = "default.table_2_z_" + salt;
+ Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema);
+ Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema);
+ Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema);
+
+ Map<String, Object> writeConfig =
+ ImmutableMap.<String, Object>builder()
+ .put("table", identifierTemplate)
+ .put(drop ? "drop" : "keep", filteredFields)
+ .put("catalog_name", "test-name")
+ .put(
+ "catalog_properties",
+ ImmutableMap.<String, String>builder()
+ .put("type", "hadoop")
+ .put("warehouse", warehouse.location)
+ .build())
+ .build();
+
+ List<Row> rows =
+ Arrays.asList(
+ Row.withSchema(beamSchema)
+ .addValues(0, "a", 1.23f, Row.withSchema(nestedSchema).addValues("x", 1L).build())
+ .build(),
+ Row.withSchema(beamSchema)
+ .addValues(1, "b", 4.56f, Row.withSchema(nestedSchema).addValues("y", 2L).build())
+ .build(),
+ Row.withSchema(beamSchema)
+ .addValues(2, "c", 7.89f, Row.withSchema(nestedSchema).addValues("z", 3L).build())
+ .build());
+
+ PCollection<Row> inputRows =
+ testPipeline.apply("Records To Add", Create.of(rows)).setRowSchema(beamSchema);
+ PCollection<Row> result =
+ inputRows
+ .apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig))
+ .getSinglePCollection();
+
+ PAssert.that(result)
+ .satisfies(
+ new VerifyOutputs(Arrays.asList(identifier0, identifier1, identifier2), "append"));
+
+ testPipeline.run().waitUntilFinish();
+
+ List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
+ List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
+ List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
+
+ assertThat(
+ table0Records,
+ Matchers.contains(
+ IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(0)))));
+ assertThat(
+ table1Records,
+ Matchers.contains(
+ IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(1)))));
+ assertThat(
+ table2Records,
+ Matchers.contains(
+ IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(2)))));
+ }
+
+ @Test
+ public void testWriteToDynamicDestinationsAndDropFields() {
+ writeToDynamicDestinationsAndFilter(true);
+ }
+
+ @Test
+ public void testWriteToDynamicDestinationsAndKeepFields() {
+ writeToDynamicDestinationsAndFilter(false);
+ }
+
private static class VerifyOutputs implements SerializableFunction<Iterable<Row>, Void> {
- private final String tableId;
+ private final List<String> tableIds;
private final String operation;
- public VerifyOutputs(String identifier, String operation) {
- this.tableId = identifier;
+ public VerifyOutputs(List<String> identifiers, String operation) {
+ this.tableIds = identifiers;
this.operation = operation;
}
@@ -160,7 +313,7 @@ public VerifyOutputs(String identifier, String operation) {
public Void apply(Iterable<Row> input) {
Row row = input.iterator().next();
- assertEquals(tableId, row.getString("table"));
+ assertThat(tableIds, Matchers.hasItem(row.getString("table")));
assertEquals(operation, row.getString("operation"));
return null;
}