@@ -17,8 +17,15 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@@ -28,35 +35,111 @@
* StorageApiWritesShardedRecords} to encapsulate a destination {@link TableSchema} along with a
* {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
*/
class AppendClientInfo {
@Nullable BigQueryServices.StreamAppendClient streamAppendClient;
@Nullable TableSchema tableSchema;
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
Descriptors.Descriptor descriptor;
@AutoValue
abstract class AppendClientInfo {
abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

public AppendClientInfo(
abstract TableSchema getTableSchema();

abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();

abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();

abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation();

abstract Descriptors.Descriptor getDescriptor();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);

abstract Builder setTableSchema(TableSchema value);

abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);

abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);

abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);

abstract Builder setDescriptor(Descriptors.Descriptor value);

abstract AppendClientInfo build();
};

abstract Builder toBuilder();

static AppendClientInfo of(
TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
throws Exception {
this.tableSchema = tableSchema;
this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
this.closeAppendClient = closeAppendClient;
return new AutoValue_AppendClientInfo.Builder()
.setTableSchema(tableSchema)
.setCloseAppendClient(closeAppendClient)
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
.setSchemaInformation(
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
.setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true))
.build();
}

public AppendClientInfo createAppendClient(
public AppendClientInfo withNoAppendClient() {
return toBuilder().setStreamAppendClient(null).build();
}

public AppendClientInfo withAppendClient(
BigQueryServices.DatasetService datasetService,
Supplier<String> getStreamName,
boolean useConnectionPool)
throws Exception {
if (streamAppendClient == null) {
this.streamAppendClient =
datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
if (getStreamAppendClient() != null) {
return this;
} else {
return toBuilder()
.setStreamAppendClient(
datasetService.getStreamAppendClient(
getStreamName.get(), getDescriptor(), useConnectionPool))
.build();
}
return this;
}

public void close() {
if (streamAppendClient != null) {
closeAppendClient.accept(streamAppendClient);
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
}
}

boolean hasSchemaChanged(TableSchema updatedTableSchema) {
return updatedTableSchema.hashCode() != getTableSchema().hashCode();
}

public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
Contributor: If ignoreUnknownValues is false, will this be a no-op?

Contributor Author: No - these are fields that are unknown to the prior step. They may actually end up being known to the current step due to the updated schema.

throws TableRowToStorageApiProto.SchemaConversionException {
Message msg =
TableRowToStorageApiProto.messageFromTableRow(
getSchemaInformation(),
getDescriptorIgnoreRequired(),
unknown,
ignoreUnknownValues,
true,
null);
return msg.toByteString();
}

@Memoized
Descriptors.Descriptor getDescriptorIgnoreRequired() {
try {
return TableRowToStorageApiProto.getDescriptorFromTableSchema(getTableSchema(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public TableRow toTableRow(ByteString protoBytes) {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(getDescriptor(), protoBytes));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
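
For orientation, a rough usage sketch of the new immutable AppendClientInfo (a same-package caller is assumed; datasetService, tableSchema, and streamName are placeholder variables, not code from this PR):

AppendClientInfo clientInfo =
    AppendClientInfo.of(
        tableSchema,
        client -> {
          try {
            client.close();
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
// Lazily attach a StreamAppendClient; a new immutable instance is returned only if none was set.
clientInfo =
    clientInfo.withAppendClient(datasetService, () -> streamName, /* useConnectionPool= */ false);
// ... append rows through clientInfo.getStreamAppendClient() ...
clientInfo.close();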
@@ -2878,6 +2878,15 @@ public WriteResult expand(PCollection<T> input) {
"withAutoSchemaUpdate only supported when using storage-api writes.");
}

if (getAutoSchemaUpdate()) {
// TODO(reuvenlax): Remove this restriction once we implement support.
checkArgument(
getIgnoreUnknownValues(),
"Auto schema update currently only supported when ignoreUnknownValues also set.");
checkArgument(
!getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
}

if (method != Write.Method.FILE_LOADS) {
// we only support writing avro for FILE_LOADS
checkArgument(
@@ -3172,11 +3181,12 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getCreateDisposition(),
getIgnoreUnknownValues());
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
}

StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<DestinationT, T>(
new StorageApiLoads<>(
destinationCoder,
storageApiDynamicDestinations,
getCreateDisposition(),
@@ -3185,7 +3195,9 @@
getBigQueryServices(),
getStorageApiNumStreams(bqOptions),
method == Method.STORAGE_API_AT_LEAST_ONCE,
getAutoSharding());
getAutoSharding(),
getAutoSchemaUpdate(),
getIgnoreUnknownValues());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
@@ -41,6 +41,7 @@
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
@@ -204,6 +205,9 @@ Table patchTableDescription(TableReference tableReference, @Nullable String tabl
WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException, InterruptedException;

@Nullable
WriteStream getWriteStream(String writeStream);

/**
* Create an append client for a given Storage API write stream. The stream must be created
* first.
@@ -230,6 +234,10 @@ interface StreamAppendClient extends AutoCloseable {
/** Append rows to a Storage API write stream at the given offset. */
ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;

/** If the table schema has been updated, returns the new schema. Otherwise returns null. */
@Nullable
TableSchema getUpdatedSchema();
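
A hedged caller-side sketch of how this could pair with AppendClientInfo.hasSchemaChanged (appendClient and appendClientInfo are placeholders; the actual wiring in the write transforms is not shown in this diff):

TableSchema updatedSchema = appendClient.getUpdatedSchema();
if (updatedSchema != null && appendClientInfo.hasSchemaChanged(updatedSchema)) {
  // The backend reported a newer table schema: rebuild AppendClientInfo (and its
  // descriptor) against the updated schema before encoding further rows.
}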

/**
* If the previous call to appendRows blocked due to flow control, returns how long the call
* blocked for.
@@ -82,6 +82,7 @@
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -1309,6 +1310,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
.build());
}

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
}

@Override
public StreamAppendClient getStreamAppendClient(
String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception {
@@ -1378,6 +1384,11 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
return streamWriter.append(rows, offset);
}

@Override
public TableSchema getUpdatedSchema() {
return streamWriter.getUpdatedSchema();
}

@Override
public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
@@ -28,13 +28,9 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -74,7 +70,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
@@ -1036,28 +1031,4 @@ private static Object convertAvroNumeric(Object value) {
public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}

/**
* Hashes a schema descriptor using a deterministic hash function.
*
* <p>Warning! These hashes are encoded into messages, so changing this function will cause
* pipelines to get stuck on update!
*/
public static long hashSchemaDescriptorDeterministic(Descriptor descriptor) {
long hashCode = 0;
for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
hashCode +=
Hashing.murmur3_32()
.hashString(fieldDescriptor.getName(), StandardCharsets.UTF_8)
.asInt();
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRepeated() ? 1 : 0).asInt();
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRequired() ? 1 : 0).asInt();
Type type = fieldDescriptor.getType();
hashCode += Hashing.murmur3_32().hashInt(type.ordinal()).asInt();
if (type.equals(Type.MESSAGE)) {
hashCode += hashSchemaDescriptorDeterministic(fieldDescriptor.getMessageType());
}
}
return hashCode;
}
}
@@ -17,23 +17,50 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
* parameter controls how many rows are batched into a single ProtoRows object before we move on to
* the next one.
*/
class SplittingIterable implements Iterable<ProtoRows> {
interface ConvertUnknownFields {
Contributor: I somehow couldn't find the implementation of this?

Contributor Author: This is a Java functional interface - any matching lambda will conform. E.g. you can pass in (tableRow, ignore) -> {} and this will conform to the interface.

ByteString convert(TableRow tableRow, boolean ignoreUnknownValues)
throws TableRowToStorageApiProto.SchemaConversionException;
}
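
As the author notes above, any conforming lambda can be supplied; an illustrative stub (not part of this change) that encodes nothing would be:

ConvertUnknownFields stub = (tableRow, ignoreUnknownValues) -> ByteString.EMPTY;

In the write path this is expected to be wired to AppendClientInfo.encodeUnknownFields, per the later review thread.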

private final Iterable<StorageApiWritePayload> underlying;
private final long splitSize;

public SplittingIterable(Iterable<StorageApiWritePayload> underlying, long splitSize) {
private final ConvertUnknownFields unknownFieldsToMessage;
private final Function<ByteString, TableRow> protoToTableRow;
private final BiConsumer<TableRow, String> failedRowsConsumer;
private final boolean autoUpdateSchema;
private final boolean ignoreUnknownValues;

public SplittingIterable(
Iterable<StorageApiWritePayload> underlying,
long splitSize,
ConvertUnknownFields unknownFieldsToMessage,
Function<ByteString, TableRow> protoToTableRow,
BiConsumer<TableRow, String> failedRowsConsumer,
boolean autoUpdateSchema,
boolean ignoreUnknownValues) {
this.underlying = underlying;
this.splitSize = splitSize;
this.unknownFieldsToMessage = unknownFieldsToMessage;
this.protoToTableRow = protoToTableRow;
this.failedRowsConsumer = failedRowsConsumer;
this.autoUpdateSchema = autoUpdateSchema;
this.ignoreUnknownValues = ignoreUnknownValues;
}

@Override
@@ -57,7 +84,37 @@ public ProtoRows next() {
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
ByteString byteString = ByteString.copyFrom(payload.getPayload());

if (autoUpdateSchema) {
try {
@Nullable TableRow unknownFields = payload.getUnknownFields();
if (unknownFields != null) {
// Protocol buffer serialization format supports concatenation. We serialize any new
// "known" fields
// into a proto and concatenate to the existing proto.
try {
byteString =
byteString.concat(
Contributor: Maybe I missed something - so every time we will concatenate the unknown fields onto the existing byte string? Then what's the difference between this and passing down the entire message? Maybe unknownFieldsToMessage will filter something out? But I don't see that it has a schema.

Contributor Author: The prior conversion step only includes fields known to it in the proto it generates. It can't include fields it doesn't know about, as they would have to be in the proto descriptor (and it can't use the proto's unknown field set, as that requires field ids, which are not known yet).

Therefore the incoming byteString contains only fields that were known to the convert stage, and all other fields are put into the JSON unknownFields object. What we are doing here is taking advantage of the fact that the write step has a more up-to-date view of the schema, so we walk over the unknownFields JSON and extract whatever fields are now known (which might still be only a subset of the remaining fields). We then convert those unknownFields to a proto and concatenate the two protos.

Contributor: Not an expert in modern Java... Where is the implementation of unknownFieldsToMessage.convert? How can it convert only the "known" unknown fields?

Contributor: nvm, found it - it should be encodeUnknownFields.

Contributor: Yes, code looks good here.

unknownFieldsToMessage.convert(unknownFields, ignoreUnknownValues));
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
// This generally implies that ignoreUnknownValues=false and there were still
// unknown values here.
// Reconstitute the TableRow and send it to the failed-rows consumer.
TableRow tableRow = protoToTableRow.apply(byteString);
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
// only execute this
// codepath when ignoreUnknownFields==true, so we should never hit this codepath.
// However once
// 24926 is fixed, we need to merge the unknownFields back into the main row
// before outputting to the
// failed-rows consumer.
failedRowsConsumer.accept(tableRow, e.toString());
continue;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
inserts.addSerializedRows(byteString);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
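
To ground the concatenation trick discussed in the thread above, a minimal sketch of the protocol-buffer property being relied on (descriptor and the field names here are hypothetical, assumed to be string fields, and are not taken from this PR): serializing two partial messages built from the same descriptor and concatenating their bytes parses back as a single merged message.

DynamicMessage knownPart =
    DynamicMessage.newBuilder(descriptor)
        .setField(descriptor.findFieldByName("existing_field"), "a")
        .build();
DynamicMessage newlyKnownPart =
    DynamicMessage.newBuilder(descriptor)
        .setField(descriptor.findFieldByName("newly_added_field"), "b")
        .build();
// The concatenated wire format is equivalent to merging the two messages.
ByteString merged = knownPart.toByteString().concat(newlyKnownPart.toByteString());
DynamicMessage parsed = DynamicMessage.parseFrom(descriptor, merged);
// parsed now carries both fields, mirroring the byteString.concat(...) call above.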
@@ -35,6 +35,8 @@ public interface MessageConverter<T> {

StorageApiWritePayload toMessage(T element) throws Exception;

StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception;
Contributor: How is this relevant to this change?

Contributor Author: We can't simply take the unknownFields and convert them to a proto, as there may be missing required fields (because those fields are in the original proto). We need a way to do the conversion without enforcing nullability.


TableRow toTableRow(T element);
}
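
A brief hedged illustration of the author's point (messageConverter, element, and unknownFields are placeholders): the regular path enforces required fields, while the unknown-fields path must not, since the required fields already live in the original payload.

// Full element: required fields are enforced as usual.
StorageApiWritePayload fullPayload = messageConverter.toMessage(element);
// Leftover unknown fields only: skip required-field enforcement.
StorageApiWritePayload extraPayload =
    messageConverter.toMessage(unknownFields, /* respectRequired= */ false);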
