diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 30496dec2965..3f6c47ece68a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,11 +218,12 @@ public Dataflow create(PipelineOptions options) { /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") - @Default.Integer(10) - Integer getUnboundedReaderMaxReadTimeSec(); + "The max amount of time before an UnboundedReader is consumed before checkpointing, " + + "in seconds. Duration can be set to fractions of seconds. ") + @Default.Double(10.0) + double getUnboundedReaderMaxReadTimeSec(); - void setUnboundedReaderMaxReadTimeSec(Integer value); + void setUnboundedReaderMaxReadTimeSec(double value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 8c086016ee95..a8e358f19e07 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -798,7 +798,9 @@ private UnboundedReaderIterator( DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); this.endTime = Instant.now() - .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())); + .plus( + Duration.millis( + (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L))); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index d451ec093f77..261567930fe6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -598,6 +598,7 @@ public void testReadUnboundedReader() throws Exception { int maxElements = 10; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); debugOptions.setUnboundedReaderMaxElements(maxElements); + debugOptions.setUnboundedReaderMaxReadTimeSec(10); ByteString state = ByteString.EMPTY; for (int i = 0; i < 10 * maxElements; @@ -645,10 +646,10 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); + double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); assertThat( - new Duration(beforeReading, afterReading).getStandardSeconds(), - lessThanOrEqualTo(maxReadSec + 1)); + new Duration(beforeReading, afterReading).getMillis(), + lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L))); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));