diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java index f48a23559141..2ed75d7bc7e0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java @@ -24,7 +24,8 @@ import com.google.bigtable.v2.Cell; import com.google.bigtable.v2.Column; import com.google.bigtable.v2.Family; -import java.nio.ByteBuffer; +import com.google.protobuf.ByteString; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -37,11 +38,12 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; /** * An implementation of {@link TypedSchemaTransformProvider} for Bigtable Read jobs configured via @@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider Schema.FieldType.STRING, Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA)))) .build(); + public static final Schema FLATTENED_ROW_SCHEMA = + Schema.builder() + .addByteArrayField("key") + .addStringField("family_name") + .addByteArrayField("column_qualifier") + .addArrayField("cells", Schema.FieldType.row(CELL_SCHEMA)) + .build(); @Override protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) { @@ -88,7 +97,7 @@ public List outputCollectionNames() { /** Configuration for reading from Bigtable. */ @DefaultSchema(AutoValueSchema.class) @AutoValue - public abstract static class BigtableReadSchemaTransformConfiguration { + public abstract static class BigtableReadSchemaTransformConfiguration implements Serializable { /** Instantiates a {@link BigtableReadSchemaTransformConfiguration.Builder} instance. */ public void validate() { String emptyStringMessage = @@ -100,7 +109,8 @@ public void validate() { public static Builder builder() { return new AutoValue_BigtableReadSchemaTransformProvider_BigtableReadSchemaTransformConfiguration - .Builder(); + .Builder() + .setFlatten(true); } public abstract String getTableId(); @@ -109,6 +119,8 @@ public static Builder builder() { public abstract String getProjectId(); + public abstract @Nullable Boolean getFlatten(); + /** Builder for the {@link BigtableReadSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -118,6 +130,8 @@ public abstract static class Builder { public abstract Builder setProjectId(String projectId); + public abstract Builder setFlatten(Boolean flatten); + /** Builds a {@link BigtableReadSchemaTransformConfiguration} instance. 
*/ public abstract BigtableReadSchemaTransformConfiguration build(); } @@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .withInstanceId(configuration.getInstanceId()) .withProjectId(configuration.getProjectId())); + Schema outputSchema = + Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA; + PCollection beamRows = - bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA); + bigtableRows + .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration))) + .setRowSchema(outputSchema); return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); } } - public static class BigtableRowToBeamRow extends SimpleFunction { - @Override - public Row apply(com.google.bigtable.v2.Row bigtableRow) { - // The collection of families is represented as a Map of column families. - // Each column family is represented as a Map of columns. - // Each column is represented as a List of cells - // Each cell is represented as a Beam Row consisting of value and timestamp_micros - Map>> families = new HashMap<>(); - - for (Family fam : bigtableRow.getFamiliesList()) { - // Map of column qualifier to list of cells - Map> columns = new HashMap<>(); - for (Column col : fam.getColumnsList()) { - List cells = new ArrayList<>(); - for (Cell cell : col.getCellsList()) { - Row cellRow = - Row.withSchema(CELL_SCHEMA) - .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray())) - .withFieldValue("timestamp_micros", cell.getTimestampMicros()) + /** + * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link + * Row}. It supports both a nested representation and a flattened representation where each column + * becomes a separate output element. + */ + private static class BigtableRowConverterDoFn extends DoFn { + private final BigtableReadSchemaTransformConfiguration configuration; + + BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) { + this.configuration = configuration; + } + + private List convertCells(List bigtableCells) { + List beamCells = new ArrayList<>(); + for (Cell cell : bigtableCells) { + Row cellRow = + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", cell.getValue().toByteArray()) + .withFieldValue("timestamp_micros", cell.getTimestampMicros()) + .build(); + beamCells.add(cellRow); + } + return beamCells; + } + + @ProcessElement + public void processElement( + @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver out) { + // The builder defaults flatten to true. We check for an explicit false setting to disable it. + + if (Boolean.FALSE.equals(configuration.getFlatten())) { + // Non-flattening logic (original behavior): one output row per Bigtable row. + Map>> families = new HashMap<>(); + for (Family fam : bigtableRow.getFamiliesList()) { + Map> columns = new HashMap<>(); + for (Column col : fam.getColumnsList()) { + + List bigTableCells = col.getCellsList(); + + List cells = convertCells(bigTableCells); + + columns.put(col.getQualifier().toStringUtf8(), cells); + } + families.put(fam.getName(), columns); + } + Row beamRow = + Row.withSchema(ROW_SCHEMA) + .withFieldValue("key", bigtableRow.getKey().toByteArray()) + .withFieldValue("column_families", families) + .build(); + out.output(beamRow); + } else { + // Flattening logic (new behavior): one output row per column qualifier. 
+ byte[] key = bigtableRow.getKey().toByteArray(); + for (Family fam : bigtableRow.getFamiliesList()) { + String familyName = fam.getName(); + for (Column col : fam.getColumnsList()) { + ByteString qualifierName = col.getQualifier(); + List cells = new ArrayList<>(); + for (Cell cell : col.getCellsList()) { + Row cellRow = + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", cell.getValue().toByteArray()) + .withFieldValue("timestamp_micros", cell.getTimestampMicros()) + .build(); + cells.add(cellRow); + } + + Row flattenedRow = + Row.withSchema(FLATTENED_ROW_SCHEMA) + .withFieldValue("key", key) + .withFieldValue("family_name", familyName) + .withFieldValue("column_qualifier", qualifierName.toByteArray()) + .withFieldValue("cells", cells) .build(); - cells.add(cellRow); + out.output(flattenedRow); } - columns.put(col.getQualifier().toStringUtf8(), cells); } - families.put(fam.getName(), columns); } - Row beamRow = - Row.withSchema(ROW_SCHEMA) - .withFieldValue("key", ByteBuffer.wrap(bigtableRow.getKey().toByteArray())) - .withFieldValue("column_families", families) - .build(); - return beamRow; } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index 480d4199c653..455591543898 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -168,7 +168,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { validateField(inputSchema, "column_qualifier", Schema.TypeName.BYTES); } if (inputSchema.hasField("family_name")) { - validateField(inputSchema, "family_name", Schema.TypeName.BYTES); + validateField(inputSchema, "family_name", Schema.TypeName.STRING); } if (inputSchema.hasField("timestamp_micros")) { validateField(inputSchema, "timestamp_micros", Schema.TypeName.INT64); @@ -189,7 +189,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { + "\"type\": String\n" + "\"value\": ByteString\n" + "\"column_qualifier\": ByteString\n" - + "\"family_name\": ByteString\n" + + "\"family_name\": String\n" + "\"timestamp_micros\": Long\n" + "\"start_timestamp_micros\": Long\n" + "\"end_timestamp_micros\": Long\n" @@ -259,11 +259,10 @@ public PCollection>> changeMutationInput( Preconditions.checkStateNotNull( input.getBytes("column_qualifier"), "Encountered SetCell mutation with null 'column_qualifier' property. 
"))) - .setFamilyNameBytes( - ByteString.copyFrom( - Preconditions.checkStateNotNull( - input.getBytes("family_name"), - "Encountered SetCell mutation with null 'family_name' property."))); + .setFamilyName( + Preconditions.checkStateNotNull( + input.getString("family_name"), + "Encountered SetCell mutation with null 'family_name' property.")); // Use timestamp if provided, else default to -1 (current // Bigtable // server time) @@ -284,11 +283,10 @@ public PCollection>> changeMutationInput( Preconditions.checkStateNotNull( input.getBytes("column_qualifier"), "Encountered DeleteFromColumn mutation with null 'column_qualifier' property."))) - .setFamilyNameBytes( - ByteString.copyFrom( - Preconditions.checkStateNotNull( - input.getBytes("family_name"), - "Encountered DeleteFromColumn mutation with null 'family_name' property."))); + .setFamilyName( + Preconditions.checkStateNotNull( + input.getString("family_name"), + "Encountered DeleteFromColumn mutation with null 'family_name' property.")); // if start or end timestamp provided // Timestamp Range (optional, assuming Long type in Row schema) @@ -322,11 +320,10 @@ public PCollection>> changeMutationInput( Mutation.newBuilder() .setDeleteFromFamily( Mutation.DeleteFromFamily.newBuilder() - .setFamilyNameBytes( - ByteString.copyFrom( - Preconditions.checkStateNotNull( - input.getBytes("family_name"), - "Encountered DeleteFromFamily mutation with null 'family_name' property."))) + .setFamilyName( + Preconditions.checkStateNotNull( + input.getString("family_name"), + "Encountered DeleteFromFamily mutation with null 'family_name' property.")) .build()) .build(); break; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java index 81d3103f38bf..65e02c47d353 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigtable; import static org.apache.beam.sdk.io.gcp.bigtable.BigtableReadSchemaTransformProvider.CELL_SCHEMA; +import static org.apache.beam.sdk.io.gcp.bigtable.BigtableReadSchemaTransformProvider.FLATTENED_ROW_SCHEMA; import static org.apache.beam.sdk.io.gcp.bigtable.BigtableReadSchemaTransformProvider.ROW_SCHEMA; import static org.junit.Assert.assertThrows; @@ -28,7 +29,6 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.RowMutation; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -136,95 +136,194 @@ public void tearDown() { tableAdminClient.close(); } - public List writeToTable(int numRows) { + @Test + public void testRead() { + int numRows = 20; List expectedRows = new ArrayList<>(); + for (int i = 1; i <= numRows; i++) { + String key = "key" + i; + byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + String valueA = "value a" + i; + byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8); + String valueB = "value b" + i; + byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8); + String valueC = "value c" + i; + byte[] valueCBytes = 
valueC.getBytes(StandardCharsets.UTF_8); + String valueD = "value d" + i; + byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8); + long timestamp = 1000L * i; - try { - for (int i = 1; i <= numRows; i++) { - String key = "key" + i; - String valueA = "value a" + i; - String valueB = "value b" + i; - String valueC = "value c" + i; - String valueD = "value d" + i; - long timestamp = 1000L * i; - - RowMutation rowMutation = - RowMutation.create(tableId, key) - .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA) - .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB) - .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC) - .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD); - dataClient.mutateRow(rowMutation); - - // Set up expected Beam Row - Map> columns1 = new HashMap<>(); - columns1.put( - "a", - Arrays.asList( - Row.withSchema(CELL_SCHEMA) - .withFieldValue( - "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8))) - .withFieldValue("timestamp_micros", timestamp) - .build())); - columns1.put( - "b", - Arrays.asList( - Row.withSchema(CELL_SCHEMA) - .withFieldValue( - "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8))) - .withFieldValue("timestamp_micros", timestamp) - .build())); - - Map> columns2 = new HashMap<>(); - columns2.put( - "c", - Arrays.asList( - Row.withSchema(CELL_SCHEMA) - .withFieldValue( - "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8))) - .withFieldValue("timestamp_micros", timestamp) - .build())); - columns2.put( - "d", - Arrays.asList( - Row.withSchema(CELL_SCHEMA) - .withFieldValue( - "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8))) - .withFieldValue("timestamp_micros", timestamp) - .build())); - - Map>> families = new HashMap<>(); - families.put(COLUMN_FAMILY_NAME_1, columns1); - families.put(COLUMN_FAMILY_NAME_2, columns2); - - Row expectedRow = - Row.withSchema(ROW_SCHEMA) - .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8))) - .withFieldValue("column_families", families) - .build(); - - expectedRows.add(expectedRow); - } - LOG.info("Finished writing {} rows to table {}", numRows, tableId); - } catch (NotFoundException e) { - throw new RuntimeException("Failed to write to table", e); + RowMutation rowMutation = + RowMutation.create(tableId, key) + .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA) + .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB) + .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC) + .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD); + dataClient.mutateRow(rowMutation); + + // Set up expected Beam Row + Map> columns1 = new HashMap<>(); + columns1.put( + "a", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueABytes) + .withFieldValue("timestamp_micros", timestamp) + .build())); + columns1.put( + "b", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueBBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())); + + Map> columns2 = new HashMap<>(); + columns2.put( + "c", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueCBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())); + columns2.put( + "d", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueDBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())); + + Map>> families = new HashMap<>(); + families.put(COLUMN_FAMILY_NAME_1, columns1); + families.put(COLUMN_FAMILY_NAME_2, columns2); + + Row expectedRow = + 
Row.withSchema(ROW_SCHEMA) + .withFieldValue("key", keyBytes) + .withFieldValue("column_families", families) + .build(); + + expectedRows.add(expectedRow); } - return expectedRows; + LOG.info("Finished writing {} rows to table {}", numRows, tableId); + + BigtableReadSchemaTransformConfiguration config = + BigtableReadSchemaTransformConfiguration.builder() + .setTableId(tableId) + .setInstanceId(instanceId) + .setProjectId(projectId) + .setFlatten(false) + .build(); + + SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); + + PCollection rows = PCollectionRowTuple.empty(p).apply(transform).get("output"); + + PAssert.that(rows).containsInAnyOrder(expectedRows); + p.run().waitUntilFinish(); } @Test - public void testRead() { - List expectedRows = writeToTable(20); + public void testReadFlatten() { + int numRows = 20; + List expectedRows = new ArrayList<>(); + for (int i = 1; i <= numRows; i++) { + String key = "key" + i; + byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + String valueA = "value a" + i; + byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8); + String valueB = "value b" + i; + byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8); + String valueC = "value c" + i; + byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8); + String valueD = "value d" + i; + byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8); + long timestamp = 1000L * i; + // Write a row with four distinct columns to Bigtable + RowMutation rowMutation = + RowMutation.create(tableId, key) + .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA) + .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB) + .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC) + .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD); + dataClient.mutateRow(rowMutation); + + // For each Bigtable row, we expect four flattened Beam Rows as output. + // Each Row corresponds to one column. 
+ expectedRows.add( + Row.withSchema(FLATTENED_ROW_SCHEMA) + .withFieldValue("key", keyBytes) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) + .withFieldValue("column_qualifier", "a".getBytes(StandardCharsets.UTF_8)) + .withFieldValue( + "cells", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueABytes) + .withFieldValue("timestamp_micros", timestamp) + .build())) + .build()); + + expectedRows.add( + Row.withSchema(FLATTENED_ROW_SCHEMA) + .withFieldValue("key", keyBytes) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) + .withFieldValue("column_qualifier", "b".getBytes(StandardCharsets.UTF_8)) + .withFieldValue( + "cells", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueBBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())) + .build()); + + expectedRows.add( + Row.withSchema(FLATTENED_ROW_SCHEMA) + .withFieldValue("key", keyBytes) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_2) + .withFieldValue("column_qualifier", "c".getBytes(StandardCharsets.UTF_8)) + .withFieldValue( + "cells", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueCBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())) + .build()); + + expectedRows.add( + Row.withSchema(FLATTENED_ROW_SCHEMA) + .withFieldValue("key", keyBytes) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_2) + .withFieldValue("column_qualifier", "d".getBytes(StandardCharsets.UTF_8)) + .withFieldValue( + "cells", + Arrays.asList( + Row.withSchema(CELL_SCHEMA) + .withFieldValue("value", valueDBytes) + .withFieldValue("timestamp_micros", timestamp) + .build())) + .build()); + } + LOG.info("Finished writing {} rows to table {} with Flatten state true", numRows, tableId); + + // Configure the transform to use flatten mode (the default). BigtableReadSchemaTransformConfiguration config = BigtableReadSchemaTransformConfiguration.builder() .setTableId(tableId) .setInstanceId(instanceId) .setProjectId(projectId) + .setFlatten(true) .build(); + SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); PCollection rows = PCollectionRowTuple.empty(p).apply(transform).get("output"); + + // Assert that the actual rows match the expected flattened rows. 
PAssert.that(rows).containsInAnyOrder(expectedRows); p.run().waitUntilFinish(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java index 7a5dcdc3e999..eceb1ddff4be 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java @@ -156,7 +156,7 @@ public void testSetMutationsExistingColumn() { .addStringField("type") .addByteArrayField("value") .addByteArrayField("column_qualifier") - .addByteArrayField("family_name") + .addStringField("family_name") .addField("timestamp_micros", FieldType.INT64) // Changed to INT64 .build(); @@ -166,7 +166,7 @@ public void testSetMutationsExistingColumn() { .withFieldValue("type", "SetCell") .withFieldValue("value", "new-val-1-a".getBytes(StandardCharsets.UTF_8)) .withFieldValue("column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("timestamp_micros", 2000L) .build(); Row mutationRow2 = @@ -175,7 +175,7 @@ public void testSetMutationsExistingColumn() { .withFieldValue("type", "SetCell") .withFieldValue("value", "new-val-1-c".getBytes(StandardCharsets.UTF_8)) .withFieldValue("column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_2) .withFieldValue("timestamp_micros", 2000L) .build(); @@ -225,7 +225,7 @@ public void testSetMutationNewColumn() { .addStringField("type") .addByteArrayField("value") .addByteArrayField("column_qualifier") - .addByteArrayField("family_name") + .addStringField("family_name") .addField("timestamp_micros", FieldType.INT64) .build(); Row mutationRow = @@ -234,7 +234,7 @@ public void testSetMutationNewColumn() { .withFieldValue("type", "SetCell") .withFieldValue("value", "new-val-1".getBytes(StandardCharsets.UTF_8)) .withFieldValue("column_qualifier", "new_col".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("timestamp_micros", 999_000L) .build(); @@ -276,14 +276,14 @@ public void testDeleteCellsFromColumn() { .addByteArrayField("key") .addStringField("type") .addByteArrayField("column_qualifier") - .addByteArrayField("family_name") + .addStringField("family_name") .build(); Row mutationRow = Row.withSchema(testSchema) .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromColumn") .withFieldValue("column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .build(); PCollection inputPCollection = p.apply(Create.of(Arrays.asList(mutationRow))); @@ -325,7 +325,7 @@ public void testDeleteCellsFromColumnWithTimestampRange() { .addByteArrayField("key") .addStringField("type") .addByteArrayField("column_qualifier") - .addByteArrayField("family_name") + 
.addStringField("family_name") .addField("start_timestamp_micros", FieldType.INT64) .addField("end_timestamp_micros", FieldType.INT64) .build(); @@ -334,7 +334,7 @@ public void testDeleteCellsFromColumnWithTimestampRange() { .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromColumn") .withFieldValue("column_qualifier", "col".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("start_timestamp_micros", 99_990_000L) .withFieldValue("end_timestamp_micros", 100_000_000L) .build(); @@ -373,13 +373,13 @@ public void testDeleteColumnFamily() { Schema.builder() .addByteArrayField("key") .addStringField("type") - .addByteArrayField("family_name") + .addStringField("family_name") .build(); Row mutationRow = Row.withSchema(testSchema) .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromFamily") - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .build(); PCollection inputPCollection = p.apply(Create.of(Arrays.asList(mutationRow))); @@ -484,7 +484,7 @@ public void testAllMutations() { "column_qualifier", FieldType.BYTES) // Used by SetCell, DeleteFromColumn .addNullableField( "family_name", - FieldType.BYTES) // Used by SetCell, DeleteFromColumn, DeleteFromFamily + FieldType.STRING) // Used by SetCell, DeleteFromColumn, DeleteFromFamily .addNullableField("timestamp_micros", FieldType.INT64) // Optional for SetCell .addNullableField( "start_timestamp_micros", FieldType.INT64) // Used by DeleteFromColumn with range @@ -503,7 +503,7 @@ public void testAllMutations() { .withFieldValue("type", "SetCell") .withFieldValue("value", "updated_val_1".getBytes(StandardCharsets.UTF_8)) .withFieldValue("column_qualifier", "col_initial_1".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("timestamp_micros", 3000L) .build()); // Add new cell to "row-setcell" @@ -513,7 +513,7 @@ public void testAllMutations() { .withFieldValue("type", "SetCell") .withFieldValue("value", "new_col_val".getBytes(StandardCharsets.UTF_8)) .withFieldValue("column_qualifier", "new_col_A".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("timestamp_micros", 4000L) .build()); @@ -524,7 +524,7 @@ public void testAllMutations() { .withFieldValue("key", "row-delete-col".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromColumn") .withFieldValue("column_qualifier", "col_to_delete_A".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .build()); // 3. 
DeleteFromColumn with Timestamp Range @@ -534,7 +534,7 @@ public void testAllMutations() { .withFieldValue("key", "row-delete-col-ts".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromColumn") .withFieldValue("column_qualifier", "ts_col".getBytes(StandardCharsets.UTF_8)) - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .withFieldValue("start_timestamp_micros", 999L) // Inclusive .withFieldValue("end_timestamp_micros", 1001L) // Exclusive .build()); @@ -545,7 +545,7 @@ public void testAllMutations() { Row.withSchema(uberSchema) .withFieldValue("key", "row-delete-family".getBytes(StandardCharsets.UTF_8)) .withFieldValue("type", "DeleteFromFamily") - .withFieldValue("family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)) + .withFieldValue("family_name", COLUMN_FAMILY_NAME_1) .build()); // 5. DeleteFromRow diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index b32433df547a..ff140082a1ef 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -357,7 +357,8 @@ def expand(self, input): rearrange_based_on_discovery=True, table_id=self._table_id, instance_id=self._instance_id, - project_id=self._project_id) + project_id=self._project_id, + flatten=False) return ( input.pipeline diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index cee3b8f2bca2..3865af184b61 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -33,6 +33,7 @@ from typing import Dict from typing import Generic from typing import Iterator +from typing import NamedTuple from typing import Optional from typing import Sequence from typing import TypeVar @@ -675,11 +676,15 @@ def __hash__(self): return hash(self.__dict__.items()) def __eq__(self, other): + if type(self) == type(other): + other_dict = other.__dict__ + elif type(other) == type(NamedTuple): + other_dict = other._asdict() + else: + return False return ( - type(self) == type(other) and - len(self.__dict__) == len(other.__dict__) and all( - s == o - for s, o in zip(self.__dict__.items(), other.__dict__.items()))) + len(self.__dict__) == len(other_dict) and + all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) def __reduce__(self): return _make_Row, tuple(self.__dict__.items()) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 80b2e76ace27..4bf4ba037faf 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -375,15 +375,16 @@ #BigTable - type: renaming transforms: - #'ReadFromBigTable': 'ReadFromBigTable' + 'ReadFromBigTable': 'ReadFromBigTable' 'WriteToBigTable': 'WriteToBigTable' config: mappings: #Temp removing read from bigTable IO -# 'ReadFromBigTable': -# project: 'project_id' -# instance: 'instance_id' -# table: 'table_id' + 'ReadFromBigTable': + project: 'project_id' + instance: 'instance_id' + table: 'table_id' + flatten: "flatten" 'WriteToBigTable': project: 'project_id' instance: 'instance_id' @@ -391,7 +392,7 @@ underlying_provider: type: beamJar transforms: -# 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:bigtable_read:v1' + 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:bigtable_read:v1' 'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1' config: gradle_target: 
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' diff --git a/sdks/python/apache_beam/yaml/tests/bigtable.yaml b/sdks/python/apache_beam/yaml/tests/bigtable.yaml index eae5f1bcbb74..2f97b83c6e92 100644 --- a/sdks/python/apache_beam/yaml/tests/bigtable.yaml +++ b/sdks/python/apache_beam/yaml/tests/bigtable.yaml @@ -37,17 +37,17 @@ pipelines: config: elements: - {key: 'row1', - type: 'SetCell', - family_name: "cf1", - column_qualifier: "cq1", - value: "value1", - timestamp_micros: -1} + type: 'SetCell', + family_name: "cf1", + column_qualifier: "cq1", + value: "value1", + timestamp_micros: 5000} - {key: 'row1', - type: 'SetCell', - family_name: "cf2", - column_qualifier: "cq1", - value: "value2", - timestamp_micros: 1000} + type: 'SetCell', + family_name: "cf1", + column_qualifier: "cq2", + value: "value2", + timestamp_micros: 1000} - type: LogForTesting - type: MapToFields @@ -63,9 +63,7 @@ pipelines: type: type family_name: - callable: | - def convert_to_bytes(row): - return bytes(row.family_name, 'utf-8') if 'family_name' in row._fields else None + family_name column_qualifier: callable: | def convert_to_bytes(row): @@ -85,3 +83,95 @@ pipelines: project: 'apache-beam-testing' instance: "{BT_INSTANCE}" table: 'test-table' + - pipeline: + type: chain + transforms: + - type: ReadFromBigTable + config: + project: 'apache-beam-testing' + instance: "{BT_INSTANCE}" + table: 'test-table' + - type: MapToFields + config: + language: python + fields: + key: + callable: | + def convert_to_string(row): + return row.key.decode("utf-8") if "key" in row._fields else None + family_name: + family_name + column_qualifier: + callable: | + def convert_to_string(row): + return row.column_qualifier.decode("utf-8") if "column_qualifier" in row._fields else None + cells: + callable: | + def convert_to_string(row): + cell_bytes = [] + for (value, timestamp) in row.cells: + value_bytes = value.decode("utf-8") + cell_bytes.append(beam.Row(value=value_bytes, timestamp_micros=timestamp)) + return cell_bytes + - type: AssertEqual + config: + elements: + - { key: 'row1', + family_name: "cf1", + column_qualifier: "cq1", + cells:[{ + value: "value1", + timestamp_micros: 5000}]} + - { key: 'row1', + family_name: "cf1", + column_qualifier: "cq2", + cells: [{ + value: "value2", + timestamp_micros: 1000 } ] } + - type: LogForTesting + + - pipeline: + type: chain + transforms: + - type: ReadFromBigTable + config: + project: 'apache-beam-testing' + instance: "{BT_INSTANCE}" + table: 'test-table' + flatten: False + - type: MapToFields + config: + language: python + fields: + key: + callable: | + def convert_to_bytes(row): + return row.key.decode("utf-8") if "key" in row._fields else None + + column_families: + column_families +# TODO: issue #35790, once fixed we can uncomment this assert +# - type: AssertEqual +# config: +# elements: +# - {key: 'row1', +# # Use explicit map syntax to match the actual output +# column_families: { +# cf1: { +# cq1: [ +# { value: "value1", timestamp_micros: 5000 } +# ], +# cq2: [ +# { value: "value2", timestamp_micros: 1000 } +# ] +# } +# } +# } + # - {'key': 'row1', + # column_families: {cf1: {cq2: + # [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value2', timestamp_micros=1000)], 'cq1': [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value1', timestamp_micros=5000)]}}} + + +# - type: LogForTesting + +
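For context, a minimal usage sketch of the read transform with the new flatten option, written in the same style as the integration test above (same package, provider.from(config), output tag "output"). The project, instance and table identifiers and the downstream DoFn are illustrative placeholders and are not part of the patch. The builder defaults flatten to true, so each output Row carries key, family_name, column_qualifier and cells; calling setFlatten(false) switches back to the nested ROW_SCHEMA with its column_families map.

package org.apache.beam.sdk.io.gcp.bigtable;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableReadSchemaTransformProvider.BigtableReadSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class BigtableFlattenedReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Placeholder identifiers; flatten defaults to true via builder().
    BigtableReadSchemaTransformConfiguration config =
        BigtableReadSchemaTransformConfiguration.builder()
            .setProjectId("my-project")
            .setInstanceId("my-instance")
            .setTableId("my-table")
            // .setFlatten(false) would switch back to the nested ROW_SCHEMA.
            .build();

    SchemaTransform read = new BigtableReadSchemaTransformProvider().from(config);
    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(read).get("output");

    // With flatten enabled, each element describes one (key, family, qualifier) and its cells.
    rows.apply(
        "SummarizeColumns",
        ParDo.of(
            new DoFn<Row, String>() {
              @ProcessElement
              public void processElement(@Element Row row, OutputReceiver<String> out) {
                String key = new String(row.getBytes("key"), StandardCharsets.UTF_8);
                String family = row.getString("family_name");
                String qualifier =
                    new String(row.getBytes("column_qualifier"), StandardCharsets.UTF_8);
                Collection<Row> cells = row.getArray("cells");
                out.output(
                    key + "/" + family + ":" + qualifier + " -> " + cells.size() + " cell(s)");
              }
            }));

    p.run().waitUntilFinish();
  }
}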
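On the write side, a short sketch of the mutation Row shape now that family_name is validated as a STRING rather than bytes, while key, value and column_qualifier remain byte arrays. The schema and literal values are illustrative, patterned on the simple-write integration tests above; they are placeholders rather than code from the patch.

package org.apache.beam.sdk.io.gcp.bigtable;

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class BigtableSetCellRowSketch {
  public static void main(String[] args) {
    // family_name is a plain string field; key, value and column_qualifier stay as byte arrays.
    Schema mutationSchema =
        Schema.builder()
            .addByteArrayField("key")
            .addStringField("type")
            .addByteArrayField("value")
            .addByteArrayField("column_qualifier")
            .addStringField("family_name")
            .addField("timestamp_micros", FieldType.INT64)
            .build();

    Row setCell =
        Row.withSchema(mutationSchema)
            .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
            .withFieldValue("type", "SetCell")
            .withFieldValue("value", "new-value".getBytes(StandardCharsets.UTF_8))
            .withFieldValue("column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8))
            .withFieldValue("family_name", "cf1") // no longer UTF-8 encoded by the caller
            .withFieldValue("timestamp_micros", 2_000L)
            .build();

    System.out.println(setCell);
  }
}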