Merged

Changes from all commits (36 commits)
ce14b96
managed bigqueryio
ahmedabu98 Jun 3, 2024
550c1b4
spotless
ahmedabu98 Jun 4, 2024
c94de3c
move managed dependency to test only
ahmedabu98 Jun 4, 2024
912dc08
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Jun 5, 2024
f436e62
cleanup after merging snake_case PR
ahmedabu98 Jun 5, 2024
fe60904
choose write method based on boundedness and pipeline options
ahmedabu98 Jul 9, 2024
7d405cf
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Jul 9, 2024
d45159f
rename bigquery write config class
ahmedabu98 Jul 9, 2024
989ad0f
spotless
ahmedabu98 Jul 9, 2024
b9b49e7
change read output tag to 'output'
ahmedabu98 Jul 9, 2024
a119bbc
spotless
ahmedabu98 Jul 9, 2024
74bc178
revert logic that depends on DataflowServiceOptions. switching BQ met…
ahmedabu98 Jul 16, 2024
528b504
spotless
ahmedabu98 Jul 16, 2024
dcc398a
fix typo
ahmedabu98 Jul 29, 2024
36edc38
separate BQ write config to a new class
ahmedabu98 Aug 6, 2024
f9be86c
fix doc
ahmedabu98 Aug 6, 2024
bd1e534
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Oct 25, 2024
a26765e
resolve after syncing to HEAD
ahmedabu98 Oct 25, 2024
725f7bd
spotless
ahmedabu98 Oct 26, 2024
2631104
fork on batch/streaming
ahmedabu98 Nov 5, 2024
770cf50
cleanup
ahmedabu98 Nov 5, 2024
0a70466
spotless
ahmedabu98 Nov 5, 2024
255c9de
portable bigquery destinations
ahmedabu98 Nov 5, 2024
01a01f7
move forking logic to BQ schematransform side
ahmedabu98 Nov 6, 2024
697c0b8
add file loads translation and tests; add test checks that the correc…
ahmedabu98 Nov 7, 2024
105474b
set top-level wrapper to be the underlying managed BQ transform urn; …
ahmedabu98 Nov 8, 2024
d6b9e69
move unit tests to respective schematransform test classes
ahmedabu98 Nov 8, 2024
c0767d7
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Nov 8, 2024
a600f62
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Nov 8, 2024
ad4dcd9
expose to Python SDK as well
ahmedabu98 Nov 11, 2024
698879e
Merge branch 'managed_bigquery' of https://github.com/ahmedabu98/beam…
ahmedabu98 Nov 11, 2024
c8ad51b
Merge branch 'master' of https://github.com/ahmedabu98/beam into mana…
ahmedabu98 Nov 12, 2024
025a999
cleanup
ahmedabu98 Nov 12, 2024
0c968f0
address comment
ahmedabu98 Nov 13, 2024
0760180
set enable_streaming_engine option; add to CHANGES
ahmedabu98 Nov 13, 2024
432a070
Merge branch 'master' into managed_bq_portable_dest
ahmedabu98 Nov 13, 2024
1 change: 1 addition & 0 deletions CHANGES.md
@@ -70,6 +70,7 @@
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
* Added BigQueryIO as a Managed IO ([#31486](https://github.com/apache/beam/pull/31486))
* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).

## New Features / Improvements
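The CHANGES entry above announces BigQueryIO as a Managed IO. A minimal usage sketch follows; it assumes the `Managed.BIGQUERY` identifier and the `"table"` config key introduced in this PR, and the `"input"` tuple tag name is also an assumption.

```java
// Minimal sketch of writing to BigQuery through the Managed API.
// Assumptions: the Managed.BIGQUERY identifier and "table" config key added in this PR,
// and the "input" tuple tag name.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class ManagedBigQueryWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Schema schema = Schema.builder().addStringField("name").addInt64Field("count").build();
    PCollection<Row> rows =
        pipeline.apply(
            Create.of(Row.withSchema(schema).addValues("a", 1L).build()).withRowSchema(schema));

    Map<String, Object> config = new HashMap<>();
    config.put("table", "my-project:my_dataset.my_table");

    // The managed transform picks the underlying write method (file loads vs. Storage Write API)
    // based on boundedness and pipeline options, per the commits above.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}
```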
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -198,6 +198,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--tempLocation=${gcpTempRoot}",
"--firestoreDb=${firestoreDb}",
"--firestoreHost=${firestoreHost}",
"--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}",
BigQueryFileLoadsSchemaTransformProvider.java
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -89,24 +90,27 @@ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rowPCollection = input.getSinglePCollection();
BigQueryIO.Write<Row> write = toWrite(input.getPipeline().getOptions());
BigQueryIO.Write<Row> write =
toWrite(rowPCollection.getSchema(), input.getPipeline().getOptions());
rowPCollection.apply(write);

return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryIO.Write<Row> toWrite(PipelineOptions options) {
BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(schema, configuration);
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.to(configuration.getTable())
.to(dynamicDestinations)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
// TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
// createTempFilePrefixView() doesn't pick up the pipeline option
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.useBeamSchema();
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
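The hunk above replaces the fixed `.to(configuration.getTable())` destination with a `PortableBigQueryDestinations` instance, a class that is not part of this diff excerpt. Below is a hypothetical sketch of the shape such a `DynamicDestinations` could take, reusing the `destination`/`record` field names from this PR; the constructor signature and method bodies are illustrative only, and the keep/drop/only field filtering the real class performs is omitted.

```java
// Hypothetical sketch only: the real PortableBigQueryDestinations takes the write
// configuration and also applies keep/drop/only filtering, which is omitted here.
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;

class PortableDestinationsSketch extends DynamicDestinations<Row, String> {
  static final String DESTINATION = "destination";
  static final String RECORD = "record";

  private final Schema recordSchema;
  private final String fixedTable; // null means resolve the table from each element

  PortableDestinationsSketch(Schema recordSchema, String fixedTable) {
    this.recordSchema = recordSchema;
    this.fixedTable = fixedTable;
  }

  @Override
  public String getDestination(ValueInSingleWindow<Row> element) {
    // Dynamic destinations: each element names its own table in the "destination" field.
    return fixedTable != null ? fixedTable : element.getValue().getString(DESTINATION);
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(destination, null);
  }

  @Override
  public TableSchema getSchema(String destination) {
    return BigQueryUtils.toTableSchema(recordSchema);
  }

  // Mirrors getFilterFormatFunction(fetchNestedRecord): optionally unwrap the nested
  // "record" row before converting it to a TableRow.
  SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetchNestedRecord) {
    return row -> {
      Row payload = fetchNestedRecord ? row.getRow(RECORD) : row;
      return BigQueryUtils.toTableRow(payload);
    };
  }
}
```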
BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -18,25 +18,22 @@
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.DESTINATION;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.RECORD;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -54,7 +51,6 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.joda.time.Duration;
@@ -80,6 +76,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
private static final String FAILED_ROWS_TAG = "FailedRows";
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
// magic string that tells us to write to dynamic destinations
protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
@@ -176,52 +173,6 @@ private static class NoOutputDoFn<T> extends DoFn<T, Row> {
public void process(ProcessContext c) {}
}

private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
final Schema schema;
final String fixedDestination;
final List<String> primaryKey;

RowDynamicDestinations(Schema schema) {
this.schema = schema;
this.fixedDestination = null;
this.primaryKey = null;
}

public RowDynamicDestinations(
Schema schema, String fixedDestination, List<String> primaryKey) {
this.schema = schema;
this.fixedDestination = fixedDestination;
this.primaryKey = primaryKey;
}

@Override
public String getDestination(ValueInSingleWindow<Row> element) {
return Optional.ofNullable(fixedDestination)
.orElseGet(() -> element.getValue().getString("destination"));
}

@Override
public TableDestination getTable(String destination) {
return new TableDestination(destination, null);
}

@Override
public TableSchema getSchema(String destination) {
return BigQueryUtils.toTableSchema(schema);
}

@Override
public TableConstraints getTableConstraints(String destination) {
return Optional.ofNullable(this.primaryKey)
.filter(pk -> !pk.isEmpty())
.map(
pk ->
new TableConstraints()
.setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
.orElse(null);
}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Check that the input exists
@@ -309,13 +260,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

void validateDynamicDestinationsExpectedSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field and a \"record\" Row field.");
}

BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
Method writeMethod =
configuration.getUseAtLeastOnceSemantics() != null
@@ -326,21 +270,37 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.withMethod(writeMethod)
.withFormatFunction(BigQueryUtils.toTableRow())
.withWriteDisposition(WriteDisposition.WRITE_APPEND);

// in case CDC writes are configured we validate and include them in the configuration
if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
write = validateAndIncludeCDCInformation(write, schema);
} else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
Schema rowSchema = schema;
boolean fetchNestedRecord = false;
if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsSchema(schema);
rowSchema = schema.getField(RECORD).getType().getRowSchema();
fetchNestedRecord = true;
}
if (Boolean.TRUE.equals(configuration.getUseCdcWrites())) {
validateCdcSchema(schema);
rowSchema = schema.getField(RECORD).getType().getRowSchema();
fetchNestedRecord = true;
write =
write
.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
} else {
write = write.to(configuration.getTable()).useBeamSchema();
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_SQN)));
}
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(rowSchema, configuration);
write =
write
.to(dynamicDestinations)
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(fetchNestedRecord));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
@@ -363,19 +323,27 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
return write;
}

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
void validateDynamicDestinationsSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList(DESTINATION, RECORD)),
String.format(
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"%s\" string field and a \"%s\" Row field.",
DESTINATION, RECORD));
}

private void validateCdcSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, RECORD)),
"When writing using CDC functionality, we expect Row Schema with a "
+ "\""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" Row field and a \"record\" Row field.");

Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();
Schema mutationSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();

checkArgument(
rowSchema.equals(ROW_SCHEMA_MUTATION_INFO),
mutationSchema != null && mutationSchema.equals(ROW_SCHEMA_MUTATION_INFO),
"When writing using CDC functionality, we expect a \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field of Row type with schema:\n"
@@ -384,31 +352,7 @@ BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
+ "Received \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field with schema:\n"
+ rowSchema.toString());

String tableDestination = null;

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
} else {
tableDestination = configuration.getTable();
}

return write
.to(
new RowDynamicDestinations(
schema.getField("record").getType().getRowSchema(),
tableDestination,
configuration.getPrimaryKey()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
+ mutationSchema);
}
}
}
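To make the new fork concrete: when writing with CDC to dynamic destinations, each input element carries a `destination` table spec, a `row_mutation_info` row (with `mutation_type` and `change_sequence_number`), and the payload under `record`. The sketch below illustrates that shape; the field names mirror the constants in the hunks above, while the payload schema and values are invented.

```java
// Illustrative only: field names come from the constants in the hunks above
// (DESTINATION, RECORD, row_mutation_info, mutation_type, change_sequence_number);
// the payload schema and values are made up.
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class CdcInputRowSketch {
  static final Schema MUTATION_INFO_SCHEMA =
      Schema.builder()
          .addStringField("mutation_type")
          .addStringField("change_sequence_number")
          .build();

  // Hypothetical user payload.
  static final Schema RECORD_SCHEMA =
      Schema.builder().addInt64Field("id").addStringField("name").build();

  static final Schema INPUT_SCHEMA =
      Schema.builder()
          .addStringField("destination")
          .addRowField("row_mutation_info", MUTATION_INFO_SCHEMA)
          .addRowField("record", RECORD_SCHEMA)
          .build();

  static Row example() {
    return Row.withSchema(INPUT_SCHEMA)
        .withFieldValue("destination", "my-project:my_dataset.customers")
        .withFieldValue(
            "row_mutation_info",
            Row.withSchema(MUTATION_INFO_SCHEMA)
                .withFieldValue("mutation_type", "UPSERT")
                .withFieldValue("change_sequence_number", "AAA/1")
                .build())
        .withFieldValue(
            "record",
            Row.withSchema(RECORD_SCHEMA)
                .withFieldValue("id", 1L)
                .withFieldValue("name", "alice")
                .build())
        .build();
  }
}
```

When the table is the `DYNAMIC_DESTINATIONS` magic string, `validateDynamicDestinationsSchema` requires the `destination` and `record` fields; when CDC writes are enabled, `validateCdcSchema` additionally requires `row_mutation_info` to match `ROW_SCHEMA_MUTATION_INFO`.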
BigQueryWriteConfiguration.java
@@ -18,20 +18,18 @@
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
@@ -68,11 +66,6 @@ public void validate() {
!Strings.isNullOrEmpty(this.getTable()),
invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");

// if we have an input table spec, validate it
if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
}

// validate create and write dispositions
String createDisposition = getCreateDisposition();
if (createDisposition != null && !createDisposition.isEmpty()) {
@@ -186,6 +179,21 @@ public static Builder builder() {
@Nullable
public abstract List<String> getPrimaryKey();

@SchemaFieldDescription(
"A list of field names to keep in the input record. All other fields are dropped before writing. "
+ "Is mutually exclusive with 'drop' and 'only'.")
public abstract @Nullable List<String> getKeep();

@SchemaFieldDescription(
"A list of field names to drop from the input record before writing. "
+ "Is mutually exclusive with 'keep' and 'only'.")
public abstract @Nullable List<String> getDrop();

@SchemaFieldDescription(
"The name of a single record field that should be written. "
+ "Is mutually exclusive with 'keep' and 'drop'.")
public abstract @Nullable String getOnly();

/** Builder for {@link BigQueryWriteConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -212,6 +220,12 @@ public abstract static class Builder {

public abstract Builder setPrimaryKey(List<String> pkColumns);

public abstract Builder setKeep(List<String> keep);

public abstract Builder setDrop(List<String> drop);

public abstract Builder setOnly(String only);

/** Builds a {@link BigQueryWriteConfiguration} instance. */
public abstract BigQueryWriteConfiguration build();
}
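A short sketch of building the configuration with the new filtering option is below; `setTable` and `setCreateDisposition` are assumed builder methods not shown in this excerpt (their getters appear in the diff), while `setKeep` comes from the hunk above.

```java
// Sketch under assumptions: setTable and setCreateDisposition are inferred from the
// getters in the diff; setKeep is shown above. 'keep', 'drop', and 'only' are
// mutually exclusive.
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration;

class WriteConfigurationSketch {
  static BigQueryWriteConfiguration filteredWriteConfig() {
    return BigQueryWriteConfiguration.builder()
        .setTable("my-project:my_dataset.events")
        .setCreateDisposition("CREATE_IF_NEEDED")
        // Keep only these input fields; everything else is dropped before writing.
        .setKeep(Arrays.asList("user_id", "event_time"))
        .build();
  }
}
```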