@@ -141,6 +141,14 @@ class BatchLoads<DestinationT, ElementT>
// the table, even if there is no data in it.
private final boolean singletonTable;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;

/**
* destinationsWithMatching wraps dynamicDestinations so that the schema, partitioning, etc. of
* an already-existing final destination table are reused when we are appending to it. It is used
* both when writing to temp tables and when updating the final table schema.
*/
private DynamicDestinations<?, DestinationT> destinationsWithMatching;

private final Coder<DestinationT> destinationCoder;
private int maxNumWritersPerBundle;
private long maxFileSize;
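For context on the destinationsWithMatching field documented in the hunk above, the sketch below illustrates, in rough and hypothetical form, what a table-matching wrapper around a user's DynamicDestinations does: delegate everything, but prefer the schema of an already-existing final table. The class name and the lookupExistingTable helper are placeholders for illustration, not the actual DynamicDestinationsHelpers.matchTableDynamicDestinations implementation.

```java
// Illustrative sketch only; not the actual Beam implementation.
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

class MatchExistingTableDestinations<T, DestinationT> extends DynamicDestinations<T, DestinationT> {
  private final DynamicDestinations<T, DestinationT> inner;

  MatchExistingTableDestinations(DynamicDestinations<T, DestinationT> inner) {
    this.inner = inner;
  }

  @Override
  public DestinationT getDestination(ValueInSingleWindow<T> element) {
    return inner.getDestination(element);
  }

  @Override
  public TableDestination getTable(DestinationT destination) {
    return inner.getTable(destination);
  }

  @Override
  public TableSchema getSchema(DestinationT destination) {
    // If the final table already exists (and we're appending), reuse its schema so that
    // temp tables and schema updates line up with what is already in BigQuery.
    Table existing = lookupExistingTable(inner.getTable(destination));
    return existing != null ? existing.getSchema() : inner.getSchema(destination);
  }

  // Placeholder for a metadata lookup against BigQuery; the real helper consults
  // BigQueryServices rather than returning null.
  private Table lookupExistingTable(TableDestination destination) {
    return null;
  }
}
```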
@@ -179,6 +187,9 @@ class BatchLoads<DestinationT, ElementT>
this.createDisposition = createDisposition;
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
this.destinationsWithMatching =
DynamicDestinationsHelpers.matchTableDynamicDestinations(
dynamicDestinations, bigQueryServices);
this.destinationCoder = destinationCoder;
this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
@@ -201,6 +212,15 @@ class BatchLoads<DestinationT, ElementT>

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
this.schemaUpdateOptions = schemaUpdateOptions;
// If schemaUpdateOptions are specified by the user, do not wrap dynamicDestinations, so that
// those options are respected.
if (schemaUpdateOptions != null && !schemaUpdateOptions.isEmpty()) {
this.destinationsWithMatching = dynamicDestinations;
} else {
this.destinationsWithMatching =
DynamicDestinationsHelpers.matchTableDynamicDestinations(
dynamicDestinations, bigQueryServices);
}
}

void setTestServices(BigQueryServices bigQueryServices) {
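As a reminder of where a non-empty set reaches setSchemaUpdateOptions in the hunk above, here is a minimal, assumed example of a pipeline author enabling schema update options on a FILE_LOADS write. The table spec, class name, and wiring are placeholders, not code from this PR.

```java
// Minimal sketch of the user-level setting that makes setSchemaUpdateOptions receive a
// non-empty set; names and table spec are placeholders.
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.EnumSet;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class SchemaUpdateWriteExample {
  static void write(PCollection<TableRow> rows, TableSchema desiredSchema) {
    rows.apply(
        "WriteWithSchemaUpdateOptions",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table spec
            .withSchema(desiredSchema) // schema containing the added/relaxed fields
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            // With these options set, BatchLoads keeps the user's dynamicDestinations
            // unwrapped so the requested schema changes take effect.
            .withSchemaUpdateOptions(
                EnumSet.of(
                    SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                    SchemaUpdateOption.ALLOW_FIELD_RELAXATION)));
  }
}
```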
@@ -287,6 +307,8 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
final PCollectionView<String> tempLoadJobIdPrefixView =
createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
final PCollectionView<String> zeroLoadJobIdPrefixView =
createJobIdPrefixView(p, JobType.SCHEMA_UPDATE);
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -367,7 +389,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
Lists.newArrayList(zeroLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
@@ -385,14 +407,14 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
ParDo.of(
new UpdateSchemaDestination<DestinationT>(
bigQueryServices,
tempLoadJobIdPrefixView,
zeroLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
kmsKey,
schemaUpdateOptions,
dynamicDestinations))
destinationsWithMatching))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameTriggered",
@@ -426,6 +448,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
final PCollectionView<String> tempLoadJobIdPrefixView =
createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
final PCollectionView<String> zeroLoadJobIdPrefixView =
createJobIdPrefixView(p, JobType.SCHEMA_UPDATE);
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -471,7 +495,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input
writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
Lists.newArrayList(zeroLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
Contributor:
Would it make sense to have a GBK after writeTempTables? Re your comment on the issue, we'd end up with a PCollection<KV<DestinationT, Iterable<WriteTables.Result>>>. It would also protect writeTempTables against retries if UpdateSchemaDestination fails.

Contributor:
I see this would need some changes in WriteRename as well; perhaps that could be an improvement in a separate PR.

Contributor Author:
After leaving that comment I found that using a GBK may break the use case where the final destination table gets updated more than once, e.g. in streaming file loads. With careful windowing we could avoid that, but I have not looked deeply into it yet, so for now I am leaving the overall structure of the pipeline unchanged.

Contributor:
> using a GBK may break the use case where the final destination table gets updated more than once, e.g. in streaming file loads

Ideally, this check would prevent that if it worked as intended:

Contributor Author:
Yeah, I agree, ideally. I'm just not confident enough and want to keep the change limited to what is necessary to fix the bug (though the change is already not minor).

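For readers following the thread above, the hypothetical sketch below only illustrates the shape of the reviewer's GBK suggestion (one Iterable<WriteTables.Result> per destination); it is not what this PR implements, and the class and method names are made up for illustration.

```java
// Hypothetical sketch of grouping temp-table results per destination before the schema
// update step; this is the reviewer's suggestion, not what the PR does.
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class GroupTempTableResults {
  // Turns KV<destination, result> into KV<destination, all results for that destination>.
  // GroupByKey requires a deterministic key coder, and windowing/triggering would have to be
  // chosen so that repeated updates to the same final table (e.g. streaming file loads) are
  // not merged incorrectly, which is the concern raised in the thread above.
  static <DestinationT, ResultT>
      PCollection<KV<DestinationT, Iterable<ResultT>>> perDestination(
          PCollection<KV<DestinationT, ResultT>> tempTableResults) {
    return tempTableResults.apply(
        "GroupTempTablesByDestination", GroupByKey.<DestinationT, ResultT>create());
  }
}
```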
@@ -481,14 +505,14 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input
ParDo.of(
new UpdateSchemaDestination<DestinationT>(
bigQueryServices,
tempLoadJobIdPrefixView,
zeroLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
kmsKey,
schemaUpdateOptions,
dynamicDestinations))
destinationsWithMatching))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameUntriggered",
@@ -728,18 +752,6 @@ private PCollection<KV<DestinationT, WriteTables.Result>> writeTempTables(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
WritePartition.ResultCoder.INSTANCE);

// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
// with one that makes this happen.
// In the case schemaUpdateOptions are specified by the user, matching does not occur in order
// to respect those options.
DynamicDestinations<?, DestinationT> destinationsWithMatching = dynamicDestinations;
if (schemaUpdateOptions.isEmpty()) {
destinationsWithMatching =
DynamicDestinationsHelpers.matchTableDynamicDestinations(
dynamicDestinations, bigQueryServices);
}

// If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
// DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
@@ -71,6 +71,7 @@ public enum JobType {
COPY,
EXPORT,
QUERY,
SCHEMA_UPDATE,
}

/**
@@ -27,8 +27,8 @@
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -41,9 +41,23 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Update destination schema based on data that is about to be copied into it.
*
* <p>Unlike load and query jobs, BigQuery copy jobs do not support schema field addition or
* relaxation on the destination table. This DoFn fills that gap by updating the destination table
* schemas to be compatible with the data coming from the source table so that schemaUpdateOptions
* are respected regardless of whether data is loaded directly to the destination table or loaded
* into temporary tables before being copied into the destination.
*
* <p>This transform takes as input a list of KV(destination, WriteTables.Result) and emits a list
* of KV(TableDestination, WriteTables.Result) in which each destination is resolved to a
* TableDestination object.
*/
@SuppressWarnings({"nullness"})
public class UpdateSchemaDestination<DestinationT>
extends DoFn<
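To make the class comment above concrete: a schema-updating "zero load" job is, roughly, a BigQuery load job that carries the desired schema and the schemaUpdateOptions but no data, so only the destination table's schema changes before the copy job runs. The sketch below is illustrative only; the class and method names are invented, and whether the real startZeroLoadJob uses an empty source list or another mechanism is an assumption here.

```java
// Illustrative only; the actual request is assembled in startZeroLoadJob.
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import java.util.List;

class ZeroLoadJobSketch {
  // Builds a load configuration that applies schema changes (field addition/relaxation)
  // to the destination table without loading any rows, so that a subsequent copy job
  // finds a compatible destination schema.
  static JobConfigurationLoad zeroLoadConfig(
      TableReference destination, TableSchema desiredSchema, List<String> schemaUpdateOptions) {
    return new JobConfigurationLoad()
        .setDestinationTable(destination)
        .setSchema(desiredSchema)
        .setSourceUris(Collections.emptyList()) // assumption: no data, only the schema change
        .setWriteDisposition("WRITE_APPEND")
        .setCreateDisposition("CREATE_NEVER")
        .setSchemaUpdateOptions(schemaUpdateOptions); // e.g. "ALLOW_FIELD_ADDITION"
  }
}
```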
@@ -52,8 +66,8 @@ public class UpdateSchemaDestination<DestinationT>

private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class);
private final BigQueryServices bqServices;
private final PCollectionView<String> loadJobIdPrefixView;
private final ValueProvider<String> loadJobProjectId;
private final PCollectionView<String> zeroLoadJobIdPrefixView;
private final @Nullable ValueProvider<String> loadJobProjectId;
private transient @Nullable DatasetService datasetService;
private final int maxRetryJobs;
private final @Nullable String kmsKey;
@@ -78,11 +92,11 @@ public PendingJobData(
}
}

private final List<UpdateSchemaDestination.PendingJobData> pendingJobs = Lists.newArrayList();
private final Map<DestinationT, PendingJobData> pendingJobs = Maps.newHashMap();

public UpdateSchemaDestination(
BigQueryServices bqServices,
PCollectionView<String> loadJobIdPrefixView,
PCollectionView<String> zeroLoadJobIdPrefixView,
@Nullable ValueProvider<String> loadJobProjectId,
BigQueryIO.Write.WriteDisposition writeDisposition,
BigQueryIO.Write.CreateDisposition createDisposition,
@@ -91,7 +105,7 @@ public UpdateSchemaDestination(
Set<BigQueryIO.Write.SchemaUpdateOption> schemaUpdateOptions,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.loadJobProjectId = loadJobProjectId;
this.loadJobIdPrefixView = loadJobIdPrefixView;
this.zeroLoadJobIdPrefixView = zeroLoadJobIdPrefixView;
this.bqServices = bqServices;
this.maxRetryJobs = maxRetryJobs;
this.kmsKey = kmsKey;
@@ -106,7 +120,13 @@ public void startBundle(StartBundleContext c) {
pendingJobs.clear();
}

TableDestination getTableWithDefaultProject(DestinationT destination, BigQueryOptions options) {
TableDestination getTableWithDefaultProject(DestinationT destination) {
if (dynamicDestinations.getPipelineOptions() == null) {
throw new IllegalStateException(
"Unexpected null pipeline option for DynamicDestination object. "
+ "Need to call setSideInputAccessorFromProcessContext(context) before use it.");
}
BigQueryOptions options = dynamicDestinations.getPipelineOptions().as(BigQueryOptions.class);
TableDestination tableDestination = dynamicDestinations.getTable(destination);
TableReference tableReference = tableDestination.getTableReference();

@@ -127,25 +147,24 @@ public void processElement(
ProcessContext context,
BoundedWindow window)
throws IOException {
DestinationT destination = null;
BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
dynamicDestinations.setSideInputAccessorFromProcessContext(context);
List<KV<TableDestination, WriteTables.Result>> outputs = Lists.newArrayList();
for (KV<DestinationT, WriteTables.Result> entry : element) {
destination = entry.getKey();
if (destination != null) {
Contributor Author:
I neglected the handling of a null destination in the original implementation. I do not see what scenario would produce a null destination (unless a user-provided DynamicDestinations.getDestination returns null, but the documentation says it may not). Nevertheless, the upstream WriteTempTables essentially has its processElement call dynamicDestinations.getTable(destination) for every incoming element:

TableDestination tableDestination = dynamicDestinations.getTable(destination);

and expects it to return a non-null tableDestination (the same call is used in UpdateSchemaDestination), so the behavior here is made consistent with WriteTables.

break;
DestinationT destination = entry.getKey();
TableDestination tableDestination = getTableWithDefaultProject(destination);
outputs.add(KV.of(tableDestination, entry.getValue()));
if (pendingJobs.containsKey(destination)) {
// zero load job for this destination is already set
continue;
}
}
if (destination != null) {
TableDestination tableDestination = getTableWithDefaultProject(destination, options);
TableSchema schema = dynamicDestinations.getSchema(destination);
TableReference tableReference = tableDestination.getTableReference();
String jobIdPrefix =
BigQueryResourceNaming.createJobIdWithDestination(
context.sideInput(loadJobIdPrefixView),
context.sideInput(zeroLoadJobIdPrefixView),
tableDestination,
1,
context.pane().getIndex());
jobIdPrefix += "_schemaUpdateDestination";
BigQueryHelpers.PendingJob updateSchemaDestinationJob =
startZeroLoadJob(
getJobService(context.getPipelineOptions().as(BigQueryOptions.class)),
@@ -159,15 +178,17 @@
createDisposition,
schemaUpdateOptions);
if (updateSchemaDestinationJob != null) {
pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window));
pendingJobs.put(
destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window));
}
}
List<KV<TableDestination, WriteTables.Result>> tableDestinations = new ArrayList<>();
for (KV<DestinationT, WriteTables.Result> entry : element) {
tableDestinations.add(
KV.of(getTableWithDefaultProject(destination, options), entry.getValue()));
if (!pendingJobs.isEmpty()) {
LOG.info(
"Added {} pending jobs to update the schema for each destination before copying {} temp tables.",
pendingJobs.size(),
outputs.size());
}
context.output(tableDestinations);
context.output(outputs);
}

@Teardown
Expand All @@ -191,7 +212,7 @@ public void finishBundle(FinishBundleContext context) throws Exception {
DatasetService datasetService =
getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class));
BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager();
for (final PendingJobData pendingJobData : pendingJobs) {
for (final PendingJobData pendingJobData : pendingJobs.values()) {
jobManager =
jobManager.addPendingJob(
pendingJobData.retryJob,
@@ -204,10 +225,10 @@ public void finishBundle(FinishBundleContext context) throws Exception {
.setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
pendingJobData.tableDestination.getTableDescription());
}
return null;
} catch (IOException | InterruptedException e) {
return e;
}
return null;
});
}
jobManager.waitForDone();
@@ -337,15 +358,14 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
return retryJob;
}

private BigQueryServices.JobService getJobService(PipelineOptions pipelineOptions)
throws IOException {
private BigQueryServices.JobService getJobService(PipelineOptions pipelineOptions) {
if (jobService == null) {
jobService = bqServices.getJobService(pipelineOptions.as(BigQueryOptions.class));
}
return jobService;
}

private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
private DatasetService getDatasetService(PipelineOptions pipelineOptions) {
if (datasetService == null) {
datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
}