@@ -63,6 +63,7 @@
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -73,6 +74,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -405,6 +407,8 @@ public Job translate(List<DataflowPackage> packages) {
        workerPool.setDiskSizeGb(options.getDiskSizeGb());
      }
      AutoscalingSettings settings = new AutoscalingSettings();
      // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream
      assertSpannerChangeStreamsNoAutoScaling(options);
      if (options.getAutoscalingAlgorithm() != null) {
        settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
      }
@@ -604,6 +608,29 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {
        return parents.peekFirst().toAppliedPTransform(getPipeline());
      }
    }

    // TODO: Remove this once autoscaling is supported for Spanner change streams
    private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) {
      if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) {
        throw new IllegalArgumentException(
            "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE.");
      }
    }

    private boolean isSpannerChangeStream(DataflowPipelineOptions options) {
      try {
        final SpannerChangeStreamOptions spannerOptions =
            options.as(SpannerChangeStreamOptions.class);
        final String metadataTable = spannerOptions.getMetadataTable();
        return metadataTable != null && !metadataTable.isEmpty();
      } catch (Exception e) {
        // If the Spanner change stream options cannot be resolved, treat the pipeline as not
        // reading change streams.
        return false;
      }
    }

    private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) {
      return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm());
    }
  }

  static class StepTranslator implements StepTranslationContext {
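For context (not part of this diff): the translator above only treats a job as a Spanner change streams pipeline when SpannerChangeStreamOptions carries a non-empty metadata table, and in that case it requires the autoscaling algorithm to be explicitly set to NONE (leaving it unset also fails, as the tests in the next file show). Below is a minimal sketch of a pipeline configuration that satisfies the new check; the class name is hypothetical and the SpannerIO.readChangeStream configuration is elided.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SpannerChangeStreamPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Required while autoscaling is unsupported for SpannerIO.readChangeStream: without this,
    // translation fails with the IllegalArgumentException introduced in this change.
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply SpannerIO.readChangeStream() and downstream transforms here ...
    pipeline.run();
  }
}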
@@ -67,6 +67,7 @@
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
@@ -84,6 +85,7 @@
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -426,6 +428,71 @@ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
            .intValue());
  }

  @Test
  public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException {
    final DataflowPipelineOptions options = buildPipelineOptions();
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
    final Pipeline p = buildPipeline(options);
    final SdkComponents sdkComponents = createSdkComponents(options);
    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

    final JobSpecification jobSpecification =
        DataflowPipelineTranslator.fromOptions(options)
            .translate(
                p,
                pipelineProto,
                sdkComponents,
                DataflowRunner.fromOptions(options),
                Collections.emptyList());
    assertNotNull(jobSpecification);
  }

  @Test
  public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone()
      throws IOException {
    final DataflowPipelineOptions options = buildPipelineOptions();
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
    final Pipeline p = buildPipeline(options);
    final SdkComponents sdkComponents = createSdkComponents(options);
    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(
        "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
    DataflowPipelineTranslator.fromOptions(options)
        .translate(
            p,
            pipelineProto,
            sdkComponents,
            DataflowRunner.fromOptions(options),
            Collections.emptyList());
  }

  @Test
  public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified()
      throws IOException {
    final DataflowPipelineOptions options = buildPipelineOptions();
    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
    final Pipeline p = buildPipeline(options);
    final SdkComponents sdkComponents = createSdkComponents(options);
    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(
        "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
    // The translate call is expected to throw, so there is nothing to assert on afterwards.
    DataflowPipelineTranslator.fromOptions(options)
        .translate(
            p,
            pipelineProto,
            sdkComponents,
            DataflowRunner.fromOptions(options),
            Collections.emptyList());
  }

  @Test
  public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException {
    DataflowPipelineOptions options = buildPipelineOptions();