From e2f9688ac8fdb6ef4d5947dbc21b5610e76b87d1 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 29 Feb 2016 13:48:09 -0800 Subject: [PATCH 1/5] Add UnboundedCountingInput#withRate The rate controls the speed at which UnboundedCountingInput outputs elements. This is an aggregate rate across all instances of the source, and thus elements will not necessarily be output "smoothly", or within the first period. The aggregate rate, however, will be approximately equal to the provided rate. Add package-private CountingSource#createUnbounded() to expose the UnboundedCountingSource type. Make UnboundedCountingSource package-private. --- .../cloud/dataflow/sdk/io/CountingInput.java | 45 +++++- .../cloud/dataflow/sdk/io/CountingSource.java | 129 ++++++++++++++++-- .../dataflow/sdk/io/CountingInputTest.java | 25 ++++ .../dataflow/sdk/io/CountingSourceTest.java | 73 ++++++++++ 4 files changed, 259 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java index 07609ba716eb..d5ee92b92f79 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java @@ -89,7 +89,15 @@ public static BoundedCountingInput upTo(long numElements) { */ public static UnboundedCountingInput unbounded() { return new UnboundedCountingInput( - new NowTimestampFn(), Optional.absent(), Optional.absent()); + new NowTimestampFn(), + // Elements per period + 1L, + // period length + Duration.ZERO, + // max num records + Optional.absent(), + // max read duration + Optional.absent()); } /** @@ -125,14 +133,20 @@ public PCollection apply(PBegin begin) { */ public static class UnboundedCountingInput extends PTransform> { private final SerializableFunction timestampFn; + private final long elementsPerPeriod; + private final Duration period; private final Optional maxNumRecords; private final Optional maxReadTime; private UnboundedCountingInput( SerializableFunction timestampFn, + long elementsPerPeriod, + Duration period, Optional maxNumRecords, Optional maxReadTime) { this.timestampFn = timestampFn; + this.elementsPerPeriod = elementsPerPeriod; + this.period = period; this.maxNumRecords = maxNumRecords; this.maxReadTime = maxReadTime; } @@ -144,7 +158,8 @@ private UnboundedCountingInput( *

Note that the timestamps produced by {@code timestampFn} may not decrease. */ public UnboundedCountingInput withTimestampFn(SerializableFunction timestampFn) { - return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime); + return new UnboundedCountingInput( + timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime); } /** @@ -157,7 +172,22 @@ public UnboundedCountingInput withTimestampFn(SerializableFunction 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords); - return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime); + return new UnboundedCountingInput( + timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime); + } + + /** + * Returns an {@link UnboundedCountingInput} like this one, but with output production limited + * to an aggregate rate of no more than the number of elements per the period length. + * + *

Note that this period is taken in aggregate across all instances of the + * {@link PTransform}, which may cause elements to be produced in bunches. + * + *

A duration of {@link Duration#ZERO} will produce output as fast as possbile. + */ + public UnboundedCountingInput withRate(long numElements, Duration periodLength) { + return new UnboundedCountingInput( + timestampFn, numElements, periodLength, maxNumRecords, maxReadTime); } /** @@ -169,13 +199,18 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) { */ public UnboundedCountingInput withMaxReadTime(Duration readTime) { checkNotNull(readTime, "ReadTime cannot be null"); - return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime)); + return new UnboundedCountingInput( + timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime)); } @SuppressWarnings("deprecation") @Override public PCollection apply(PBegin begin) { - Unbounded read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn)); + Unbounded read = + Read.from( + CountingSource.createUnbounded() + .withTimestampFn(timestampFn) + .withRate(elementsPerPeriod, period)); if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { return begin.apply(read); } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index 412f3a7ec9c6..a6ff0adab4eb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -17,6 +17,7 @@ package com.google.cloud.dataflow.sdk.io; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -28,7 +29,9 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; +import com.google.common.math.LongMath; +import org.joda.time.Duration; import org.joda.time.Instant; import java.io.IOException; @@ -81,6 +84,14 @@ public static BoundedSource upTo(long numElements) { return new BoundedCountingSource(0, numElements); } + /** + * Create a new {@link UnboundedCountingSource}. + */ + // package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type. + static UnboundedCountingSource createUnbounded() { + return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn()); + } + /** * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to * {@link Long#MAX_VALUE}. @@ -113,7 +124,7 @@ public static UnboundedSource unbounded() { @Deprecated public static UnboundedSource unboundedWithTimestampFn( SerializableFunction timestampFn) { - return new UnboundedCountingSource(0, 1, timestampFn); + return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -226,11 +237,15 @@ public void close() throws IOException {} /** * An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}. */ - private static class UnboundedCountingSource extends UnboundedSource { + static class UnboundedCountingSource extends UnboundedSource { /** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */ private final long start; /** The interval between numbers generated by this {@link UnboundedCountingSource}. */ private final long stride; + /** The number of elements to produce each period. */ + private final long elementsPerPeriod; + /** The time between producing numbers from this {@link UnboundedCountingSource}. */ + private final Duration period; /** The function used to produce timestamps for the generated elements. */ private final SerializableFunction timestampFn; @@ -243,13 +258,47 @@ private static class UnboundedCountingSource extends UnboundedSourceNote that the timestamps produced by {@code timestampFn} may not decrease. */ - public UnboundedCountingSource( - long start, long stride, SerializableFunction timestampFn) { + private UnboundedCountingSource( + long start, + long stride, + long elementsPerPeriod, + Duration period, + SerializableFunction timestampFn) { this.start = start; this.stride = stride; + this.elementsPerPeriod = elementsPerPeriod; + this.period = period; this.timestampFn = timestampFn; } + /** + * Returns an {@link UnboundedCountingSource} like this one with the specified period. Elements + * will be produced with an interval between them equal to the period. + */ + public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) { + checkArgument( + elementsPerPeriod > 0, + "elements produced per period must be a positive value, got %s", + elementsPerPeriod); + checkArgument( + period.getMillis() >= 0, + "Period must be a non-negative value, got %s", + period.getMillis()); + return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn); + } + + /** + * Returns an {@link UnboundedCountingSource} like this one where the timestamp of output + * elements are supplied by the specified function. + * + *

Note that timestamps produced by {@code timestampFn} may not decrease. + */ + public UnboundedCountingSource withTimestampFn( + SerializableFunction timestampFn) { + checkNotNull(timestampFn); + return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn); + } + /** * Splits an unbounded source {@code desiredNumSplits} ways by giving each split every * {@code desiredNumSplits}th element that this {@link UnboundedCountingSource} @@ -265,12 +314,15 @@ public List> generat int desiredNumSplits, PipelineOptions options) throws Exception { // Using Javadoc example, stride 2 with 3 splits becomes stride 6. long newStride = stride * desiredNumSplits; + Duration newPeriod = period.multipliedBy(desiredNumSplits); ImmutableList.Builder splits = ImmutableList.builder(); for (int i = 0; i < desiredNumSplits; ++i) { // Starts offset by the original stride. Using Javadoc example, this generates starts of // 0, 2, and 4. - splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn)); + splits.add( + new UnboundedCountingSource( + start + i * stride, newStride, elementsPerPeriod, newPeriod, timestampFn)); } return splits.build(); } @@ -287,7 +339,9 @@ public Coder getCheckpointMarkCoder() { } @Override - public void validate() {} + public void validate() { + checkArgument(period.getMillis() >= 0L); + } @Override public Coder getDefaultOutputCoder() { @@ -302,17 +356,27 @@ public Coder getDefaultOutputCoder() { */ private static class UnboundedCountingReader extends UnboundedReader { private UnboundedCountingSource source; + private long current; private Instant currentTimestamp; + private Instant started; + private long totalProduced; + public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) { this.source = source; if (mark == null) { // Because we have not emitted an element yet, and start() calls advance, we need to // "un-advance" so that start() produces the correct output. this.current = source.start - source.stride; + // Do not produce an element immediately, to ensure that we do not exceed the requested + // period due to splits. + this.started = Instant.now(); + this.totalProduced = 0L; } else { this.current = mark.getLastEmitted(); + this.started = mark.getStartTime(); + this.totalProduced = mark.getElementsProduced(); } } @@ -327,11 +391,43 @@ public boolean advance() throws IOException { if (Long.MAX_VALUE - source.stride < current) { return false; } + if (getSplitBacklogElements() <= 0) { + return false; + } current += source.stride; currentTimestamp = source.timestampFn.apply(current); + totalProduced++; return true; } + /** + * Gets the size of the split backlog in number of elements. + */ + private long getSplitBacklogElements() { + if (source.period.getMillis() == 0) { + return Long.MAX_VALUE; + } + long msElapsed = Instant.now().getMillis() - started.getMillis(); + long expectedOutputs = + LongMath.checkedMultiply(source.elementsPerPeriod, msElapsed) / source.period.getMillis(); + return expectedOutputs - totalProduced; + } + + @Override + public long getSplitBacklogBytes() { + long backlogElems = getSplitBacklogElements(); + if (backlogElems == Long.MAX_VALUE) { + return BACKLOG_UNKNOWN; + } + // 8 == Long.BYTES + long backlogBytes = backlogElems * 8; + // overflow protection + if (backlogBytes < backlogElems) { + return Long.MAX_VALUE; + } + return backlogBytes; + } + @Override public Instant getWatermark() { return source.timestampFn.apply(current); @@ -339,7 +435,7 @@ public Instant getWatermark() { @Override public CounterMark getCheckpointMark() { - return new CounterMark(current); + return new CounterMark(current, started, totalProduced); } @Override @@ -369,12 +465,16 @@ public void close() throws IOException {} public static class CounterMark implements UnboundedSource.CheckpointMark { /** The last value emitted. */ private final long lastEmitted; + private final Instant startTime; + private final long totalProduced; /** * Creates a checkpoint mark reflecting the last emitted value. */ - public CounterMark(long lastEmitted) { + public CounterMark(long lastEmitted, Instant startTime, long totalProduced) { this.lastEmitted = lastEmitted; + this.startTime = startTime; + this.totalProduced = totalProduced; } /** @@ -384,11 +484,24 @@ public long getLastEmitted() { return lastEmitted; } + /** + * Returns the total number of elements already produced by this source. + */ + public long getElementsProduced() { + return totalProduced; + } + + public Instant getStartTime() { + return startTime; + } + ///////////////////////////////////////////////////////////////////////////////////// @SuppressWarnings("unused") // For AvroCoder private CounterMark() { this.lastEmitted = 0L; + this.startTime = Instant.now(); + this.totalProduced = 0L; } @Override diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index cc609532ba76..14824ceb2362 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -17,6 +17,9 @@ package com.google.cloud.dataflow.sdk.io; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; @@ -31,6 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.values.PCollection; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -82,6 +86,27 @@ public void testUnboundedInput() { p.run(); } + @Test + public void testUnboundedInputRate() { + Pipeline p = TestPipeline.create(); + long numElements = 5000; + + long elemsPerPeriod = 10L; + Duration periodLength = Duration.millis(8); + PCollection input = + p.apply( + CountingInput.unbounded() + .withRate(elemsPerPeriod, periodLength) + .withMaxNumRecords(numElements)); + + addCountingAsserts(input, numElements); + long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; + Instant startTime = Instant.now(); + p.run(); + Instant endTime = Instant.now(); + assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); + } + private static class ElementValueDiff extends DoFn { @Override public void processElement(ProcessContext c) throws Exception { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java index cc9db7978f78..315417049b50 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java @@ -16,11 +16,15 @@ package com.google.cloud.dataflow.sdk.io; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.io.CountingSource.CounterMark; +import com.google.cloud.dataflow.sdk.io.CountingSource.UnboundedCountingSource; import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; @@ -37,6 +41,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionList; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -149,6 +154,40 @@ public void testUnboundedSourceTimestamps() { p.run(); } + @Test + public void testUnboundedSourceWithRate() { + Pipeline p = TestPipeline.create(); + + Duration period = Duration.millis(5); + long numElements = 1000L; + + PCollection input = + p.apply( + Read.from( + CountingSource.createUnbounded() + .withTimestampFn(new ValueAsTimestampFn()) + .withRate(1, period)) + .withMaxNumRecords(numElements)); + addCountingAsserts(input, numElements); + + PCollection diffs = + input + .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) + .apply("RemoveDuplicateTimestamps", RemoveDuplicates.create()); + // This assert also confirms that diffs only has one unique value. + DataflowAssert.thatSingleton(diffs).isEqualTo(0L); + + Instant started = Instant.now(); + p.run(); + Instant finished = Instant.now(); + Duration expectedDuration = period.multipliedBy((int) numElements); + assertThat( + started + .plus(expectedDuration) + .isBefore(finished), + is(true)); + } + @Test @Category(RunnableOnService.class) public void testUnboundedSourceSplits() throws Exception { @@ -174,6 +213,40 @@ public void testUnboundedSourceSplits() throws Exception { p.run(); } + @Test + public void testUnboundedSourceRateSplits() throws Exception { + Pipeline p = TestPipeline.create(); + int elementsPerPeriod = 10; + Duration period = Duration.millis(5); + + long numElements = 1000; + int numSplits = 10; + + UnboundedCountingSource initial = + CountingSource.createUnbounded().withRate(elementsPerPeriod, period); + List> splits = + initial.generateInitialSplits(numSplits, p.getOptions()); + assertEquals("Expected exact splitting", numSplits, splits.size()); + + long elementsPerSplit = numElements / numSplits; + assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits); + PCollectionList pcollections = PCollectionList.empty(p); + for (int i = 0; i < splits.size(); ++i) { + pcollections = + pcollections.and( + p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))); + } + PCollection input = pcollections.apply(Flatten.pCollections()); + + addCountingAsserts(input, numElements); + Instant startTime = Instant.now(); + p.run(); + Instant endTime = Instant.now(); + // 500 ms if the readers are all initialized in parallel; 5000 ms if they are evaluated serially + long expectedMinimumMillis = (numElements * period.getMillis()) / elementsPerPeriod; + assertThat(expectedMinimumMillis, lessThan(endTime.getMillis() - startTime.getMillis())); + } + /** * A timestamp function that uses the given value as the timestamp. Because the input values will * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out From 9470cf209e325bcc6a9bba365427d07091be8016 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 30 Mar 2016 13:20:26 -0700 Subject: [PATCH 2/5] fixup! Add UnboundedCountingInput#withRate Use the expected produced value by the current time to decide if we should output elements. --- .../cloud/dataflow/sdk/io/CountingInput.java | 19 ++-- .../cloud/dataflow/sdk/io/CountingSource.java | 87 ++++++------------- 2 files changed, 36 insertions(+), 70 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java index d5ee92b92f79..b1c2c9939d64 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java @@ -90,14 +90,10 @@ public static BoundedCountingInput upTo(long numElements) { public static UnboundedCountingInput unbounded() { return new UnboundedCountingInput( new NowTimestampFn(), - // Elements per period - 1L, - // period length - Duration.ZERO, - // max num records - Optional.absent(), - // max read duration - Optional.absent()); + 1L /* Elements per period */, + Duration.ZERO /* Period length */, + Optional.absent() /* Maximum number of records */, + Optional.absent() /* Maximum read duration */); } /** @@ -180,10 +176,11 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) { * Returns an {@link UnboundedCountingInput} like this one, but with output production limited * to an aggregate rate of no more than the number of elements per the period length. * - *

Note that this period is taken in aggregate across all instances of the - * {@link PTransform}, which may cause elements to be produced in bunches. + *

Note that when there are multiple splits, each split outputs independently. This may lead + * to elements not being produced evenly across time, though the aggregate rate will still + * approach the specified rate. * - *

A duration of {@link Duration#ZERO} will produce output as fast as possbile. + *

A duration of {@link Duration#ZERO} will produce output as fast as possible. */ public UnboundedCountingInput withRate(long numElements, Duration periodLength) { return new UnboundedCountingInput( diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index a6ff0adab4eb..5d20e1dca66a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -29,7 +29,6 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; -import com.google.common.math.LongMath; import org.joda.time.Duration; import org.joda.time.Instant; @@ -266,7 +265,13 @@ private UnboundedCountingSource( SerializableFunction timestampFn) { this.start = start; this.stride = stride; + checkArgument( + elementsPerPeriod > 0L, + "Must produce at least one element per period, got %s", + elementsPerPeriod); this.elementsPerPeriod = elementsPerPeriod; + checkArgument( + period.getMillis() >= 0L, "Must have a non-negative period length, got %s", period); this.period = period; this.timestampFn = timestampFn; } @@ -276,14 +281,6 @@ private UnboundedCountingSource( * will be produced with an interval between them equal to the period. */ public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) { - checkArgument( - elementsPerPeriod > 0, - "elements produced per period must be a positive value, got %s", - elementsPerPeriod); - checkArgument( - period.getMillis() >= 0, - "Period must be a non-negative value, got %s", - period.getMillis()); return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn); } @@ -314,7 +311,6 @@ public List> generat int desiredNumSplits, PipelineOptions options) throws Exception { // Using Javadoc example, stride 2 with 3 splits becomes stride 6. long newStride = stride * desiredNumSplits; - Duration newPeriod = period.multipliedBy(desiredNumSplits); ImmutableList.Builder splits = ImmutableList.builder(); for (int i = 0; i < desiredNumSplits; ++i) { @@ -322,7 +318,7 @@ public List> generat // 0, 2, and 4. splits.add( new UnboundedCountingSource( - start + i * stride, newStride, elementsPerPeriod, newPeriod, timestampFn)); + start + i * stride, newStride, elementsPerPeriod, period, timestampFn)); } return splits.build(); } @@ -340,7 +336,6 @@ public Coder getCheckpointMarkCoder() { @Override public void validate() { - checkArgument(period.getMillis() >= 0L); } @Override @@ -360,8 +355,7 @@ private static class UnboundedCountingReader extends UnboundedReader { private long current; private Instant currentTimestamp; - private Instant started; - private long totalProduced; + private Instant firstStarted; public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) { this.source = source; @@ -369,19 +363,17 @@ public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) // Because we have not emitted an element yet, and start() calls advance, we need to // "un-advance" so that start() produces the correct output. this.current = source.start - source.stride; - // Do not produce an element immediately, to ensure that we do not exceed the requested - // period due to splits. - this.started = Instant.now(); - this.totalProduced = 0L; } else { this.current = mark.getLastEmitted(); - this.started = mark.getStartTime(); - this.totalProduced = mark.getElementsProduced(); + this.firstStarted = mark.getStartTime(); } } @Override public boolean start() throws IOException { + if (firstStarted == null) { + this.firstStarted = Instant.now(); + } return advance(); } @@ -391,41 +383,18 @@ public boolean advance() throws IOException { if (Long.MAX_VALUE - source.stride < current) { return false; } - if (getSplitBacklogElements() <= 0) { + long nextValue = current + source.stride; + if (timeToEmit(nextValue).isAfter(Instant.now())) { return false; } - current += source.stride; + current = nextValue; currentTimestamp = source.timestampFn.apply(current); - totalProduced++; return true; } - /** - * Gets the size of the split backlog in number of elements. - */ - private long getSplitBacklogElements() { - if (source.period.getMillis() == 0) { - return Long.MAX_VALUE; - } - long msElapsed = Instant.now().getMillis() - started.getMillis(); - long expectedOutputs = - LongMath.checkedMultiply(source.elementsPerPeriod, msElapsed) / source.period.getMillis(); - return expectedOutputs - totalProduced; - } - - @Override - public long getSplitBacklogBytes() { - long backlogElems = getSplitBacklogElements(); - if (backlogElems == Long.MAX_VALUE) { - return BACKLOG_UNKNOWN; - } - // 8 == Long.BYTES - long backlogBytes = backlogElems * 8; - // overflow protection - if (backlogBytes < backlogElems) { - return Long.MAX_VALUE; - } - return backlogBytes; + private Instant timeToEmit(long value) { + long periodForValue = value / source.elementsPerPeriod; + return firstStarted.plus(source.period.multipliedBy(periodForValue)); } @Override @@ -435,7 +404,7 @@ public Instant getWatermark() { @Override public CounterMark getCheckpointMark() { - return new CounterMark(current, started, totalProduced); + return new CounterMark(current, firstStarted); } @Override @@ -455,6 +424,13 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { @Override public void close() throws IOException {} + + @Override + public long getSplitBacklogBytes() { + long elapsedMillis = Instant.now().getMillis() - firstStarted.getMillis(); + long expected = source.elementsPerPeriod * (elapsedMillis / source.period.getMillis()); + return Math.max(0L, 8 * (expected - current) / source.stride); + } } /** @@ -466,15 +442,13 @@ public static class CounterMark implements UnboundedSource.CheckpointMark { /** The last value emitted. */ private final long lastEmitted; private final Instant startTime; - private final long totalProduced; /** * Creates a checkpoint mark reflecting the last emitted value. */ - public CounterMark(long lastEmitted, Instant startTime, long totalProduced) { + public CounterMark(long lastEmitted, Instant startTime) { this.lastEmitted = lastEmitted; this.startTime = startTime; - this.totalProduced = totalProduced; } /** @@ -485,12 +459,8 @@ public long getLastEmitted() { } /** - * Returns the total number of elements already produced by this source. + * Returns the time the reader was started. */ - public long getElementsProduced() { - return totalProduced; - } - public Instant getStartTime() { return startTime; } @@ -501,7 +471,6 @@ public Instant getStartTime() { private CounterMark() { this.lastEmitted = 0L; this.startTime = Instant.now(); - this.totalProduced = 0L; } @Override From 2dc8d5ab3ae2fe3ddc419c432d9167d1b48bdb75 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 30 Mar 2016 13:52:48 -0700 Subject: [PATCH 3/5] fixup! Add UnboundedCountingInput#withRate Use the expected value (with doubles for partial period precision) for determining whether to output elements and the size of the split backlog --- .../cloud/dataflow/sdk/io/CountingSource.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index 5d20e1dca66a..cc6febc2288e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -384,7 +384,7 @@ public boolean advance() throws IOException { return false; } long nextValue = current + source.stride; - if (timeToEmit(nextValue).isAfter(Instant.now())) { + if (expectedValue() < nextValue) { return false; } current = nextValue; @@ -392,9 +392,11 @@ public boolean advance() throws IOException { return true; } - private Instant timeToEmit(long value) { - long periodForValue = value / source.elementsPerPeriod; - return firstStarted.plus(source.period.multipliedBy(periodForValue)); + private long expectedValue() { + double periodsElapsed = + (Instant.now().getMillis() - firstStarted.getMillis()) + / (double) source.period.getMillis(); + return (long) (source.elementsPerPeriod * periodsElapsed); } @Override @@ -427,8 +429,7 @@ public void close() throws IOException {} @Override public long getSplitBacklogBytes() { - long elapsedMillis = Instant.now().getMillis() - firstStarted.getMillis(); - long expected = source.elementsPerPeriod * (elapsedMillis / source.period.getMillis()); + long expected = expectedValue(); return Math.max(0L, 8 * (expected - current) / source.stride); } } From 3708e032321682a38d01e1476deb5bf0c12659c2 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 30 Mar 2016 15:01:47 -0700 Subject: [PATCH 4/5] fixup! Add UnboundedCountingInput#withRate Short-circuit expected value in UnboundedCountingReader with zero duration --- .../java/com/google/cloud/dataflow/sdk/io/CountingSource.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index cc6febc2288e..b5c349ce1b12 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -393,6 +393,9 @@ public boolean advance() throws IOException { } private long expectedValue() { + if (source.period.getMillis() == 0L) { + return Long.MAX_VALUE; + } double periodsElapsed = (Instant.now().getMillis() - firstStarted.getMillis()) / (double) source.period.getMillis(); From ec37b48fc4fc0ff64f6202dab393df9d7aa1b23b Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 30 Mar 2016 15:25:51 -0700 Subject: [PATCH 5/5] fixup! Add UnboundedCountingInput#withRate whitespace --- .../com/google/cloud/dataflow/sdk/io/CountingSource.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index b5c349ce1b12..070a1c70d5de 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -335,8 +335,7 @@ public Coder getCheckpointMarkCoder() { } @Override - public void validate() { - } + public void validate() {} @Override public Coder getDefaultOutputCoder() { @@ -351,10 +350,8 @@ public Coder getDefaultOutputCoder() { */ private static class UnboundedCountingReader extends UnboundedReader { private UnboundedCountingSource source; - private long current; private Instant currentTimestamp; - private Instant firstStarted; public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {