Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException, InterruptedException;

@Nullable
WriteStream getWriteStream(String writeStream);
TableSchema getWriteStreamSchema(String writeStream);

/**
* Create an append client for a given Storage API write stream. The stream must be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
Expand All @@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -1418,8 +1420,15 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
}

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
public @Nullable TableSchema getWriteStreamSchema(String writeStream) {
@Nullable
WriteStream stream =
newWriteClient.getWriteStream(
GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -475,15 +474,18 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
() -> {
if (autoUpdateSchema) {
@Nullable
WriteStream writeStream =
TableSchema streamSchema =
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
.getWriteStreamSchema(streamName);
if (streamSchema != null) {
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema);
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,28 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
TableSchema streamSchema =
MoreObjects.firstNonNull(
writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
TableSchema.getDefaultInstance());
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,11 @@ public WriteStream createWriteStream(String tableUrn, Type type) throws Interrup

@Override
@Nullable
public WriteStream getWriteStream(String streamName) {
public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) {
synchronized (FakeDatasetService.class) {
@Nullable Stream stream = writeStreams.get(streamName);
if (stream != null) {
return stream.toWriteStream();
return stream.toWriteStream().getTableSchema();
}
}
// TODO(relax): Return the exact error that BigQuery returns.
Expand Down
Loading
Loading