From d7e0a51a44338c4eabcc36afe0d950d1e3bc4e03 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 24 Apr 2024 14:52:38 +0000 Subject: [PATCH 1/5] reader_max_read configuration, old field deprecation --- .../options/DataflowPipelineDebugOptions.java | 15 ++++++++++++++- .../dataflow/worker/WorkerCustomSources.java | 10 +++++++--- .../dataflow/worker/WorkerCustomSourcesTest.java | 6 +++--- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 30496dec2965..eea63e51c82e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -216,14 +216,27 @@ public Dataflow create(PipelineOptions options) { void setReaderCacheTimeoutSec(Integer value); - /** The max amount of time an UnboundedReader is consumed before checkpointing. */ + /** + * The max amount of time an UnboundedReader is consumed before checkpointing. + * + * @deprecated use {@link DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeMs()} instead + */ @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") @Default.Integer(10) + @Deprecated Integer getUnboundedReaderMaxReadTimeSec(); void setUnboundedReaderMaxReadTimeSec(Integer value); + /** The max amount of time an UnboundedReader is consumed before checkpointing. */ + @Description( + "The max amount of time before an UnboundedReader is consumed before checkpointing, in millis.") + @Default.Integer(10 * 1000) + Integer getUnboundedReaderMaxReadTimeMs(); + + void setUnboundedReaderMaxReadTimeMs(Integer value); + /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") @Default.Integer(10 * 1000) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 8c086016ee95..1e4cc62c2b77 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -796,9 +796,13 @@ private UnboundedReaderIterator( this.context = context; this.started = started; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); - this.endTime = - Instant.now() - .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())); + long MaxReadTimeMsDeprecated = debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L; + long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs(); + if (MaxReadTimeMsDeprecated != 10000L && maxReadTimeMs == 10000L) { + // Take old field if UnboundedReaderMaxReadTimeMs is default + maxReadTimeMs = MaxReadTimeMsDeprecated; + } + this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs)); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index d451ec093f77..cc9e6da4a735 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -645,10 +645,10 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); + long maxReadMs = debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( - new Duration(beforeReading, afterReading).getStandardSeconds(), - lessThanOrEqualTo(maxReadSec + 1)); + new Duration(beforeReading, afterReading).getMillis(), + lessThanOrEqualTo(maxReadMs + 1000L)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From 61abe9c8cd110e8507d21750784e47ba74cf3fd4 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 26 Apr 2024 11:08:55 +0000 Subject: [PATCH 2/5] Add factory for detaults to hide complexity related to deprecation --- .../options/DataflowPipelineDebugOptions.java | 17 +++++++++++++++-- .../dataflow/worker/WorkerCustomSources.java | 5 ----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index eea63e51c82e..8b00e313b81c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -223,7 +223,6 @@ public Dataflow create(PipelineOptions options) { */ @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") - @Default.Integer(10) @Deprecated Integer getUnboundedReaderMaxReadTimeSec(); @@ -232,11 +231,25 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, in millis.") - @Default.Integer(10 * 1000) + @Default.InstanceFactory(UnboundedReaderMaxReadTimeFactory.class) Integer getUnboundedReaderMaxReadTimeMs(); void setUnboundedReaderMaxReadTimeMs(Integer value); + /** + * Sets Integer value based on old, deprecated field ({@link + * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}) is set. + */ + class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory { + @Override + public Integer create(PipelineOptions options) { + DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); + if (debugOptions.getUnboundedReaderMaxReadTimeSec() != null) { + return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000; + } else return 10000; + } + } + /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") @Default.Integer(10 * 1000) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 1e4cc62c2b77..b965110b3ef1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -796,12 +796,7 @@ private UnboundedReaderIterator( this.context = context; this.started = started; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); - long MaxReadTimeMsDeprecated = debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L; long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs(); - if (MaxReadTimeMsDeprecated != 10000L && maxReadTimeMs == 10000L) { - // Take old field if UnboundedReaderMaxReadTimeMs is default - maxReadTimeMs = MaxReadTimeMsDeprecated; - } this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs)); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = From a39d62e31902800a6b004881bdb0152196f68e96 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 26 Apr 2024 11:19:06 +0000 Subject: [PATCH 3/5] spotless --- .../dataflow/options/DataflowPipelineDebugOptions.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 8b00e313b81c..fee962e80492 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -246,7 +246,9 @@ public Integer create(PipelineOptions options) { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); if (debugOptions.getUnboundedReaderMaxReadTimeSec() != null) { return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000; - } else return 10000; + } else { + return 10000; + } } } From 6d01bfac28412fbc21ac73ed6de43bd1f8c0b631 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 26 Apr 2024 11:46:19 +0000 Subject: [PATCH 4/5] nits --- .../options/DataflowPipelineDebugOptions.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index fee962e80492..32cb78807058 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -224,6 +224,7 @@ public Dataflow create(PipelineOptions options) { @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") @Deprecated + @Default.Integer(10) Integer getUnboundedReaderMaxReadTimeSec(); void setUnboundedReaderMaxReadTimeSec(Integer value); @@ -238,17 +239,13 @@ public Dataflow create(PipelineOptions options) { /** * Sets Integer value based on old, deprecated field ({@link - * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}) is set. + * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}). */ - class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory { + static final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory { @Override public Integer create(PipelineOptions options) { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); - if (debugOptions.getUnboundedReaderMaxReadTimeSec() != null) { - return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000; - } else { - return 10000; - } + return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000; } } From d704797a0bf368b1fb4ea00db74bf1cf0cc8e915 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 26 Apr 2024 12:07:07 +0000 Subject: [PATCH 5/5] remove static --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 32cb78807058..7a5284151b9b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -241,7 +241,7 @@ public Dataflow create(PipelineOptions options) { * Sets Integer value based on old, deprecated field ({@link * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}). */ - static final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory { + final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory { @Override public Integer create(PipelineOptions options) { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);