diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index b46cedb4297a..41134adc3d03 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -63,6 +63,7 @@ import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -73,6 +74,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -405,6 +407,8 @@ public Job translate(List packages) { workerPool.setDiskSizeGb(options.getDiskSizeGb()); } AutoscalingSettings settings = new AutoscalingSettings(); + // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream + assertSpannerChangeStreamsNoAutoScaling(options); if (options.getAutoscalingAlgorithm() != null) { settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm()); } @@ -604,6 +608,29 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { 
return parents.peekFirst().toAppliedPTransform(getPipeline()); } } + + // TODO: Remove this once autoscaling is supported for Spanner change streams + private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) { + if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) { + throw new IllegalArgumentException( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE."); + } + } + + private boolean isSpannerChangeStream(DataflowPipelineOptions options) { + try { + final SpannerChangeStreamOptions spannerOptions = + options.as(SpannerChangeStreamOptions.class); + final String metadataTable = spannerOptions.getMetadataTable(); + return metadataTable != null && !metadataTable.isEmpty(); + } catch (Exception e) { + return false; + } + } + + private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) { + return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm()); + } } static class StepTranslator implements StepTranslationContext { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 82297b9784ae..58ad2a49dde3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -67,6 +67,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; @@ -84,6 +85,7 @@ import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -426,6 +428,71 @@ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException { .intValue()); } + @Test + public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + final JobSpecification jobSpecification = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + assertNotNull(jobSpecification); + } + + @Test + public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone() + throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final 
SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE"); + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + } + + @Test + public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified() + throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE"); + final JobSpecification jobSpecification = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + assertNotNull(jobSpecification); + } + @Test public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException { DataflowPipelineOptions options = buildPipelineOptions();