-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add BigTableRead connector and new feature implemented #35696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ad54a58
0edf81d
18c9395
a25033e
0e4fb14
c048dcf
4ebc7c8
3bb3dfc
a8b8196
cf8bd8f
a06d7c6
5760278
5cb0dd7
f2640ae
69467c5
121ddf6
82d3612
14ca717
c36d864
d6366dd
fcb5d03
38ff386
a15e4ff
c759a07
8e19a81
b6d0157
7ea3f76
a35ced7
50bb5a3
426519d
3152094
84a3cfd
1ca2527
954355b
ab18e18
80a732e
16f1064
3c9c582
b842ac9
0ed5da1
cea5987
b6498c8
5f6992d
bdc9cff
37abe22
5cb46df
b1fae9c
4866acc
8ac0fda
32bfbe8
b217de2
1ad3a32
364a761
5338470
c1bc8c6
fad8ae8
4315c4f
9e4514c
64a7303
1fc5366
680678b
6fa20ab
2b2af72
8c96d22
373b87f
9bd071c
74b6dc3
01f84da
54b6ad1
c46ef26
b4fab07
c600ea0
2d30e08
221e558
9cb6c32
a9f77eb
c0596f3
1db6821
80544d0
5753f18
6cd69d5
a53045c
7874226
92a0ff9
54b9900
5b58815
ca12b07
81aa2ed
417bfea
fbf74a5
8aad18a
d3f17bd
d0d12ae
16030c6
15a8bd2
85c1392
2cdd808
636df03
0ab4db4
204ff4d
a712320
2e09dd7
f28eea9
d26b45d
0b6e855
bfa8431
78afb0d
df653fc
ff8bb26
80509a2
98642a3
9a7c16e
a12cdbd
de860a4
5293f96
bc1d637
159a9ac
4ba3104
ff6449d
dda6544
215587d
b140513
9fd3658
0651ec8
764d51b
b423787
2b0806d
77e4dd3
b4ad9e4
4b9ce38
f56e50d
3386f94
2fd82ea
3059504
e0a2bd5
af0e80b
188222f
ef8b856
669b80d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,8 @@ | |
| import com.google.bigtable.v2.Cell; | ||
| import com.google.bigtable.v2.Column; | ||
| import com.google.bigtable.v2.Family; | ||
| import java.nio.ByteBuffer; | ||
| import com.google.protobuf.ByteString; | ||
| import java.io.Serializable; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -37,11 +38,12 @@ | |
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||
| import org.apache.beam.sdk.transforms.MapElements; | ||
| import org.apache.beam.sdk.transforms.SimpleFunction; | ||
| import org.apache.beam.sdk.transforms.DoFn; | ||
| import org.apache.beam.sdk.transforms.ParDo; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||
| import org.apache.beam.sdk.values.Row; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| /** | ||
| * An implementation of {@link TypedSchemaTransformProvider} for Bigtable Read jobs configured via | ||
|
|
@@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider | |
| Schema.FieldType.STRING, | ||
| Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA)))) | ||
| .build(); | ||
| public static final Schema FLATTENED_ROW_SCHEMA = | ||
| Schema.builder() | ||
| .addByteArrayField("key") | ||
| .addStringField("family_name") | ||
arnavarora2004 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .addByteArrayField("column_qualifier") | ||
| .addArrayField("cells", Schema.FieldType.row(CELL_SCHEMA)) | ||
| .build(); | ||
|
|
||
| @Override | ||
| protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) { | ||
|
|
@@ -88,7 +97,7 @@ public List<String> outputCollectionNames() { | |
| /** Configuration for reading from Bigtable. */ | ||
| @DefaultSchema(AutoValueSchema.class) | ||
| @AutoValue | ||
| public abstract static class BigtableReadSchemaTransformConfiguration { | ||
| public abstract static class BigtableReadSchemaTransformConfiguration implements Serializable { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We shouldn't need to implement `Serializable` here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For some reason, this is the only way I can set it up so that it works correctly. I can try to remove it, but I don't believe it will work.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this still not working? What error are you seeing? |
||
| /** Instantiates a {@link BigtableReadSchemaTransformConfiguration.Builder} instance. */ | ||
| public void validate() { | ||
| String emptyStringMessage = | ||
|
|
@@ -100,7 +109,8 @@ public void validate() { | |
|
|
||
| public static Builder builder() { | ||
| return new AutoValue_BigtableReadSchemaTransformProvider_BigtableReadSchemaTransformConfiguration | ||
| .Builder(); | ||
| .Builder() | ||
| .setFlatten(true); | ||
| } | ||
|
|
||
| public abstract String getTableId(); | ||
|
|
@@ -109,6 +119,8 @@ public static Builder builder() { | |
|
|
||
| public abstract String getProjectId(); | ||
|
|
||
| public abstract @Nullable Boolean getFlatten(); | ||
arnavarora2004 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** Builder for the {@link BigtableReadSchemaTransformConfiguration}. */ | ||
| @AutoValue.Builder | ||
| public abstract static class Builder { | ||
|
|
@@ -118,6 +130,8 @@ public abstract static class Builder { | |
|
|
||
| public abstract Builder setProjectId(String projectId); | ||
|
|
||
| public abstract Builder setFlatten(Boolean flatten); | ||
|
|
||
| /** Builds a {@link BigtableReadSchemaTransformConfiguration} instance. */ | ||
| public abstract BigtableReadSchemaTransformConfiguration build(); | ||
| } | ||
|
|
@@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { | |
| .withInstanceId(configuration.getInstanceId()) | ||
| .withProjectId(configuration.getProjectId())); | ||
|
|
||
| Schema outputSchema = | ||
| Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA; | ||
|
|
||
| PCollection<Row> beamRows = | ||
| bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA); | ||
| bigtableRows | ||
| .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration))) | ||
| .setRowSchema(outputSchema); | ||
|
|
||
| return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); | ||
| } | ||
| } | ||
|
|
||
| public static class BigtableRowToBeamRow extends SimpleFunction<com.google.bigtable.v2.Row, Row> { | ||
| @Override | ||
| public Row apply(com.google.bigtable.v2.Row bigtableRow) { | ||
| // The collection of families is represented as a Map of column families. | ||
| // Each column family is represented as a Map of columns. | ||
| // Each column is represented as a List of cells | ||
| // Each cell is represented as a Beam Row consisting of value and timestamp_micros | ||
| Map<String, Map<String, List<Row>>> families = new HashMap<>(); | ||
|
|
||
| for (Family fam : bigtableRow.getFamiliesList()) { | ||
| // Map of column qualifier to list of cells | ||
| Map<String, List<Row>> columns = new HashMap<>(); | ||
| for (Column col : fam.getColumnsList()) { | ||
| List<Row> cells = new ArrayList<>(); | ||
| for (Cell cell : col.getCellsList()) { | ||
| Row cellRow = | ||
| Row.withSchema(CELL_SCHEMA) | ||
| .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray())) | ||
| .withFieldValue("timestamp_micros", cell.getTimestampMicros()) | ||
| /** | ||
| * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link | ||
| * Row}. It supports both a nested representation and a flattened representation where each column | ||
| * becomes a separate output element. | ||
| */ | ||
| private static class BigtableRowConverterDoFn extends DoFn<com.google.bigtable.v2.Row, Row> { | ||
| private final BigtableReadSchemaTransformConfiguration configuration; | ||
|
|
||
| BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) { | ||
| this.configuration = configuration; | ||
| } | ||
|
|
||
| private List<Row> convertCells(List<Cell> bigtableCells) { | ||
| List<Row> beamCells = new ArrayList<>(); | ||
| for (Cell cell : bigtableCells) { | ||
| Row cellRow = | ||
| Row.withSchema(CELL_SCHEMA) | ||
| .withFieldValue("value", cell.getValue().toByteArray()) | ||
| .withFieldValue("timestamp_micros", cell.getTimestampMicros()) | ||
| .build(); | ||
| beamCells.add(cellRow); | ||
| } | ||
| return beamCells; | ||
| } | ||
|
|
||
| @ProcessElement | ||
| public void processElement( | ||
| @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> out) { | ||
arnavarora2004 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // The builder defaults flatten to true. We check for an explicit false setting to disable it. | ||
|
|
||
| if (Boolean.FALSE.equals(configuration.getFlatten())) { | ||
| // Non-flattening logic (original behavior): one output row per Bigtable row. | ||
| Map<String, Map<String, List<Row>>> families = new HashMap<>(); | ||
| for (Family fam : bigtableRow.getFamiliesList()) { | ||
| Map<String, List<Row>> columns = new HashMap<>(); | ||
| for (Column col : fam.getColumnsList()) { | ||
|
|
||
| List<Cell> bigTableCells = col.getCellsList(); | ||
|
|
||
| List<Row> cells = convertCells(bigTableCells); | ||
|
|
||
| columns.put(col.getQualifier().toStringUtf8(), cells); | ||
| } | ||
| families.put(fam.getName(), columns); | ||
| } | ||
| Row beamRow = | ||
| Row.withSchema(ROW_SCHEMA) | ||
| .withFieldValue("key", bigtableRow.getKey().toByteArray()) | ||
| .withFieldValue("column_families", families) | ||
| .build(); | ||
| out.output(beamRow); | ||
| } else { | ||
| // Flattening logic (new behavior): one output row per column qualifier. | ||
| byte[] key = bigtableRow.getKey().toByteArray(); | ||
| for (Family fam : bigtableRow.getFamiliesList()) { | ||
| String familyName = fam.getName(); | ||
| for (Column col : fam.getColumnsList()) { | ||
| ByteString qualifierName = col.getQualifier(); | ||
| List<Row> cells = new ArrayList<>(); | ||
| for (Cell cell : col.getCellsList()) { | ||
arnavarora2004 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Row cellRow = | ||
| Row.withSchema(CELL_SCHEMA) | ||
| .withFieldValue("value", cell.getValue().toByteArray()) | ||
| .withFieldValue("timestamp_micros", cell.getTimestampMicros()) | ||
| .build(); | ||
| cells.add(cellRow); | ||
| } | ||
|
|
||
| Row flattenedRow = | ||
| Row.withSchema(FLATTENED_ROW_SCHEMA) | ||
| .withFieldValue("key", key) | ||
| .withFieldValue("family_name", familyName) | ||
| .withFieldValue("column_qualifier", qualifierName.toByteArray()) | ||
| .withFieldValue("cells", cells) | ||
| .build(); | ||
| cells.add(cellRow); | ||
| out.output(flattenedRow); | ||
| } | ||
| columns.put(col.getQualifier().toStringUtf8(), cells); | ||
| } | ||
| families.put(fam.getName(), columns); | ||
| } | ||
| Row beamRow = | ||
| Row.withSchema(ROW_SCHEMA) | ||
| .withFieldValue("key", ByteBuffer.wrap(bigtableRow.getKey().toByteArray())) | ||
| .withFieldValue("column_families", families) | ||
| .build(); | ||
| return beamRow; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -168,7 +168,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { | |
| validateField(inputSchema, "column_qualifier", Schema.TypeName.BYTES); | ||
| } | ||
| if (inputSchema.hasField("family_name")) { | ||
| validateField(inputSchema, "family_name", Schema.TypeName.BYTES); | ||
| validateField(inputSchema, "family_name", Schema.TypeName.STRING); | ||
| } | ||
| if (inputSchema.hasField("timestamp_micros")) { | ||
| validateField(inputSchema, "timestamp_micros", Schema.TypeName.INT64); | ||
|
|
@@ -189,7 +189,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { | |
| + "\"type\": String\n" | ||
| + "\"value\": ByteString\n" | ||
| + "\"column_qualifier\": ByteString\n" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The type for the qualifier is a byte array, but here it is described as a ByteString. Is this accurate? The same applies to key and value.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thank you for that catch!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bump on this ^ |
||
| + "\"family_name\": ByteString\n" | ||
| + "\"family_name\": String\n" | ||
| + "\"timestamp_micros\": Long\n" | ||
| + "\"start_timestamp_micros\": Long\n" | ||
| + "\"end_timestamp_micros\": Long\n" | ||
|
|
@@ -259,11 +259,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput( | |
| Preconditions.checkStateNotNull( | ||
| input.getBytes("column_qualifier"), | ||
| "Encountered SetCell mutation with null 'column_qualifier' property. "))) | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| Preconditions.checkStateNotNull( | ||
| input.getBytes("family_name"), | ||
| "Encountered SetCell mutation with null 'family_name' property."))); | ||
| .setFamilyName( | ||
| Preconditions.checkStateNotNull( | ||
| input.getString("family_name"), | ||
| "Encountered SetCell mutation with null 'family_name' property.")); | ||
| // Use timestamp if provided, else default to -1 (current | ||
| // Bigtable | ||
| // server time) | ||
|
|
@@ -284,11 +283,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput( | |
| Preconditions.checkStateNotNull( | ||
| input.getBytes("column_qualifier"), | ||
| "Encountered DeleteFromColumn mutation with null 'column_qualifier' property."))) | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| Preconditions.checkStateNotNull( | ||
| input.getBytes("family_name"), | ||
| "Encountered DeleteFromColumn mutation with null 'family_name' property."))); | ||
| .setFamilyName( | ||
| Preconditions.checkStateNotNull( | ||
| input.getString("family_name"), | ||
| "Encountered DeleteFromColumn mutation with null 'family_name' property.")); | ||
|
|
||
| // if start or end timestamp provided | ||
| // Timestamp Range (optional, assuming Long type in Row schema) | ||
|
|
@@ -322,11 +320,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput( | |
| Mutation.newBuilder() | ||
| .setDeleteFromFamily( | ||
| Mutation.DeleteFromFamily.newBuilder() | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| Preconditions.checkStateNotNull( | ||
| input.getBytes("family_name"), | ||
| "Encountered DeleteFromFamily mutation with null 'family_name' property."))) | ||
| .setFamilyName( | ||
| Preconditions.checkStateNotNull( | ||
| input.getString("family_name"), | ||
| "Encountered DeleteFromFamily mutation with null 'family_name' property.")) | ||
| .build()) | ||
| .build(); | ||
| break; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.