From 4b80f8f6c278dd518e0e7b00ee50d21da0dcb52b Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 30 Mar 2022 03:57:06 +1100 Subject: [PATCH] Merge pull request #17202 from [BEAM-14194]: Disallow autoscaling for SpannerIO.readChangeStream SpannerIO.readChangeStream does not support autoscaling at the moment. In order to avoid customer confusion, we decided to throw an error if any algorithm is specified other than NONE. (cherry picked from commit a090e901b9a6335b2bdc82d62f8fb999d0010e3a) --- .../dataflow/DataflowPipelineTranslator.java | 27 ++++++++ .../DataflowPipelineTranslatorTest.java | 67 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index b46cedb4297a..41134adc3d03 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -63,6 +63,7 @@ import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -73,6 +74,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -405,6 +407,8 @@ public Job translate(List packages) { workerPool.setDiskSizeGb(options.getDiskSizeGb()); } AutoscalingSettings settings = new AutoscalingSettings(); + // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream + assertSpannerChangeStreamsNoAutoScaling(options); if (options.getAutoscalingAlgorithm() != null) { settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm()); } @@ -604,6 +608,29 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { return parents.peekFirst().toAppliedPTransform(getPipeline()); } } + + // TODO: Remove this once the autoscaling is supported for Spanner change streams + private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) { + if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) { + throw new IllegalArgumentException( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE."); + } + } + + private boolean isSpannerChangeStream(DataflowPipelineOptions options) { + try { + final SpannerChangeStreamOptions spannerOptions = + options.as(SpannerChangeStreamOptions.class); + final String metadataTable = spannerOptions.getMetadataTable(); + return metadataTable != null && !metadataTable.isEmpty(); + } catch (Exception e) { + return false; + } + } + + private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) { + return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm()); + } } static class StepTranslator implements StepTranslationContext { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 82297b9784ae..58ad2a49dde3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -67,6 +67,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; @@ -84,6 +85,7 @@ import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -426,6 +428,71 @@ public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException { .intValue()); } + @Test + public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + final JobSpecification jobSpecification = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + assertNotNull(jobSpecification); + } + + @Test + public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone() + throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE"); + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + } + + @Test + public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified() + throws IOException { + final DataflowPipelineOptions options = buildPipelineOptions(); + options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable"); + final Pipeline p = buildPipeline(options); + final SdkComponents sdkComponents = createSdkComponents(options); + final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE"); + final JobSpecification jobSpecification = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, + pipelineProto, + sdkComponents, + DataflowRunner.fromOptions(options), + Collections.emptyList()); + assertNotNull(jobSpecification); + } + @Test public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException { DataflowPipelineOptions options = buildPipelineOptions();