From c9ac05bc86ce2727300215095a1f59101789b986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Thu, 18 Apr 2024 10:43:54 +0200 Subject: [PATCH 01/11] add ms part for UnboundedReader checkpointing --- .../dataflow/options/DataflowPipelineDebugOptions.java | 10 +++++++++- .../runners/dataflow/worker/WorkerCustomSources.java | 3 ++- .../dataflow/worker/WorkerCustomSourcesTest.java | 6 +++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 30496dec2965..29225d005eca 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,12 +218,20 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") + "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, seconds part.") @Default.Integer(10) Integer getUnboundedReaderMaxReadTimeSec(); void setUnboundedReaderMaxReadTimeSec(Integer value); + /** The max amount of time an UnboundedReader is consumed before checkpointing. */ + @Description( + "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, millis part.") + @Default.Integer(0) + Integer getUnboundedReaderMaxReadTimeMs(); + + void setUnboundedReaderMaxReadTimeMs(Integer value); + /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") @Default.Integer(10 * 1000) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 8c086016ee95..3a36b7573610 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -798,7 +798,8 @@ private UnboundedReaderIterator( DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); this.endTime = Instant.now() - .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())); + .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())) + .plus(Duration.millis(debugOptions.getUnboundedReaderMaxReadTimeMs())); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index d451ec093f77..e302aafc69b8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -645,10 +645,10 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); + long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec()*1000+debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( - new Duration(beforeReading, afterReading).getStandardSeconds(), - lessThanOrEqualTo(maxReadSec + 1)); + new Duration(beforeReading, afterReading).millis(), + lessThanOrEqualTo(maxReadSec + 100)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From a36f178d3925229d4c7d397ed880f4e7383bd530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Thu, 18 Apr 2024 10:51:46 +0200 Subject: [PATCH 02/11] typo --- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index e302aafc69b8..69a520b0c2d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -645,10 +645,10 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec()*1000+debugOptions.getUnboundedReaderMaxReadTimeMs(); + long maxReadMillis = debugOptions.getUnboundedReaderMaxReadTimeSec()*1000+debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( new Duration(beforeReading, afterReading).millis(), - lessThanOrEqualTo(maxReadSec + 100)); + lessThanOrEqualTo(maxReadMillis + 1000)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From cf5192a8662efedf31e8b712bbb1bf5aa330d233 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 09:18:06 +0000 Subject: [PATCH 03/11] spotless --- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 69a520b0c2d0..939cf1ae76be 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -645,7 +645,9 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadMillis = debugOptions.getUnboundedReaderMaxReadTimeSec()*1000+debugOptions.getUnboundedReaderMaxReadTimeMs(); + long maxReadMillis = + debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000 + + debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( new Duration(beforeReading, afterReading).millis(), lessThanOrEqualTo(maxReadMillis + 1000)); From a001d69d652be990ad86cb53b78147b984d6b271 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 09:36:32 +0000 Subject: [PATCH 04/11] spotless --- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 939cf1ae76be..8c11fc96f69b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -649,7 +649,7 @@ public void testReadUnboundedReader() throws Exception { debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000 + debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( - new Duration(beforeReading, afterReading).millis(), + new Duration(beforeReading, afterReading).getMillis(), lessThanOrEqualTo(maxReadMillis + 1000)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From dff48be77c5b4dc8074300c65924105c38607404 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 09:39:47 +0000 Subject: [PATCH 05/11] spotless --- .../options/DataflowPipelineDebugOptions.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 29225d005eca..f6f36dd412f6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -224,13 +224,13 @@ public Dataflow create(PipelineOptions options) { void setUnboundedReaderMaxReadTimeSec(Integer value); - /** The max amount of time an UnboundedReader is consumed before checkpointing. */ - @Description( - "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, millis part.") - @Default.Integer(0) - Integer getUnboundedReaderMaxReadTimeMs(); - - void setUnboundedReaderMaxReadTimeMs(Integer value); + /** The max amount of time an UnboundedReader is consumed before checkpointing. */ + @Description( + "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, millis part.") + @Default.Integer(0) + Integer getUnboundedReaderMaxReadTimeMs(); + + void setUnboundedReaderMaxReadTimeMs(Integer value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") From 8bb39f9f39a4bfd3db75f9bb6832e6491e20b39a Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 09:50:48 +0000 Subject: [PATCH 06/11] [IntLongMath] Expression of type int may overflow before being assigned to a long --- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 8c11fc96f69b..72138744a778 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -646,11 +646,11 @@ public void testReadUnboundedReader() throws Exception { } Instant afterReading = Instant.now(); long maxReadMillis = - debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000 + debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L + debugOptions.getUnboundedReaderMaxReadTimeMs(); assertThat( new Duration(beforeReading, afterReading).getMillis(), - lessThanOrEqualTo(maxReadMillis + 1000)); + lessThanOrEqualTo(maxReadMillis + 1000L)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From 852efebff946a922f973fc1bcbbba654e260f660 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 13:21:54 +0000 Subject: [PATCH 07/11] readerMaxReadTime sec as double --- .../options/DataflowPipelineDebugOptions.java | 16 ++++------------ .../dataflow/worker/WorkerCustomSources.java | 5 +++-- .../dataflow/worker/WorkerCustomSourcesTest.java | 6 ++---- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index f6f36dd412f6..7dad8fc4b365 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,19 +218,11 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, seconds part.") - @Default.Integer(10) - Integer getUnboundedReaderMaxReadTimeSec(); + "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") + @Default.Double(10.0) + Double getUnboundedReaderMaxReadTimeSec(); - void setUnboundedReaderMaxReadTimeSec(Integer value); - - /** The max amount of time an UnboundedReader is consumed before checkpointing. */ - @Description( - "The max amount of time (UnboundedReaderMaxReadTimeSec+UnboundedReaderMaxReadTimeMs) before an UnboundedReader is consumed before checkpointing, millis part.") - @Default.Integer(0) - Integer getUnboundedReaderMaxReadTimeMs(); - - void setUnboundedReaderMaxReadTimeMs(Integer value); + void setUnboundedReaderMaxReadTimeSec(Double value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 3a36b7573610..a8e358f19e07 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -798,8 +798,9 @@ private UnboundedReaderIterator( DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); this.endTime = Instant.now() - .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())) - .plus(Duration.millis(debugOptions.getUnboundedReaderMaxReadTimeMs())); + .plus( + Duration.millis( + (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L))); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 72138744a778..7e0cc721f3dc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -645,12 +645,10 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadMillis = - debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L - + debugOptions.getUnboundedReaderMaxReadTimeMs(); + double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); assertThat( new Duration(beforeReading, afterReading).getMillis(), - lessThanOrEqualTo(maxReadMillis + 1000L)); + lessThanOrEqualTo((long) (maxReadSec * 1000L))); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From 8fc8062fa11b336dd29d58893d4d49011cd3c1da Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 13:31:47 +0000 Subject: [PATCH 08/11] readerMaxReadTime sec as double --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 2 +- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 7dad8fc4b365..3bfd971fe900 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -222,7 +222,7 @@ public Dataflow create(PipelineOptions options) { @Default.Double(10.0) Double getUnboundedReaderMaxReadTimeSec(); - void setUnboundedReaderMaxReadTimeSec(Double value); + void setUnboundedReaderMaxReadTimeSec(double value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 7e0cc721f3dc..1d7faa794a62 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -598,6 +598,7 @@ public void testReadUnboundedReader() throws Exception { int maxElements = 10; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); debugOptions.setUnboundedReaderMaxElements(maxElements); + debugOptions.setUnboundedReaderMaxReadTimeSec(10); ByteString state = ByteString.EMPTY; for (int i = 0; i < 10 * maxElements; From 87fca4c282adc07e8df9fafaba8358fa78d5c818 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 13:43:47 +0000 Subject: [PATCH 09/11] readerMaxReadTime sec as double --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 3bfd971fe900..c3dd3bfbe1ff 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -220,7 +220,7 @@ public Dataflow create(PipelineOptions options) { @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") @Default.Double(10.0) - Double getUnboundedReaderMaxReadTimeSec(); + double getUnboundedReaderMaxReadTimeSec(); void setUnboundedReaderMaxReadTimeSec(double value); From 5ec8f0b436125f44507df97596880de00c0d382d Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 15:31:02 +0000 Subject: [PATCH 10/11] readerMaxReadTime sec as double --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 3 ++- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index c3dd3bfbe1ff..902282d8d601 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,7 +218,8 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") + "The max amount of time before an UnboundedReader is consumed before checkpointing, " + + "in seconds. Duration can be set to fractions of seconds. ") @Default.Double(10.0) double getUnboundedReaderMaxReadTimeSec(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 1d7faa794a62..261567930fe6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -649,7 +649,7 @@ public void testReadUnboundedReader() throws Exception { double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); assertThat( new Duration(beforeReading, afterReading).getMillis(), - lessThanOrEqualTo((long) (maxReadSec * 1000L))); + lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L))); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); From 445d97077507cd34609caa474bee48363777953f Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 18 Apr 2024 15:59:00 +0000 Subject: [PATCH 11/11] spotless --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 902282d8d601..3f6c47ece68a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -219,7 +219,7 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( "The max amount of time before an UnboundedReader is consumed before checkpointing, " - + "in seconds. Duration can be set to fractions of seconds. ") + + "in seconds. Duration can be set to fractions of seconds. ") @Default.Double(10.0) double getUnboundedReaderMaxReadTimeSec();