From 0544b5158ce1c81b29279fbb0ecfa57e350a11c4 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 9 May 2016 15:59:58 -0700 Subject: [PATCH 01/11] Add toUnbounded() to get a Unbounded Read from a Bounded Read. --- .../direct/UnboundedReadEvaluatorFactory.java | 6 + .../io/BoundedReadFromUnboundedSource.java | 3 +- .../java/org/apache/beam/sdk/io/Read.java | 9 + .../io/UnboundedReadFromBoundedSource.java | 198 ++++++++++++++++++ .../apache/beam/sdk/io/UnboundedSource.java | 2 +- .../UnboundedReadFromBoundedSourceTest.java | 75 +++++++ 6 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 9a287b769dab..5760d37010f9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -221,12 +221,18 @@ private boolean startReader() throws IOException { } } +<<<<<<< HEAD /** * Checkpoint the current reader, finalize the previous checkpoint, and update the state of this * evaluator. */ private void finishRead() throws IOException { final CheckpointMark oldMark = checkpointMark; +======= + private UnboundedReader createReader( + UnboundedSource source, PipelineOptions options) + throws IOException { +>>>>>>> 71424df... Add toUnbounded() to get a Unbounded Read from a Bounded Read. @SuppressWarnings("unchecked") final CheckpointMarkT mark = (CheckpointMarkT) currentReader.getCheckpointMark(); checkpointMark = mark; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 49b2ad4fab34..ba13f9dbe8bd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -197,7 +197,8 @@ public void validate() { } @Override - public BoundedReader> createReader(PipelineOptions options) { + public BoundedReader> createReader(PipelineOptions options) + throws IOException { return new Reader(source.createReader(options, null)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index c0440f260104..ebba168d31fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -114,6 +114,15 @@ public Bounded named(String name) { return new Bounded(name, source); } + /** + * Returns a new {@link UnboundedReadFromBoundedSourceTest}. + * + * It performs a unbounded read from the given {@link BoundedSource}. + */ + public UnboundedReadFromBoundedSource toUnbounded() { + return new UnboundedReadFromBoundedSource<>(source); + } + @Override protected Coder getDefaultOutputCoder() { return source.getDefaultOutputCoder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java new file mode 100644 index 000000000000..18a046d7f7f4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; + +import com.google.api.client.util.Lists; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; + +import autovalue.shaded.com.google.common.common.annotations.VisibleForTesting; + +/** + * {@link PTransform} that performs a unbounded read from an {@link BoundedSource}. + * + *

Created by {@link Read}. + */ +public class UnboundedReadFromBoundedSource extends PTransform> { + private final BoundedSource source; + + UnboundedReadFromBoundedSource(BoundedSource source) { + this.source = source; + } + + @Override + public PCollection apply(PInput input) { + return Pipeline.applyTransform(input, + Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + } + + @Override + protected Coder getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public String getKindString() { + return "Read(" + approximateSimpleName(source.getClass()) + ")"; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data, instead we use the delegate inner source. + builder + .add(DisplayData.item("source", source.getClass())) + .include(source); + } + + @VisibleForTesting + static class BoundedToUnboundedSourceAdapter + extends UnboundedSource { + + private final BoundedSource boundedSource; + + public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) { + this.boundedSource = boundedSource; + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + List> result = Lists.newArrayList(); + for (BoundedSource split : boundedSource.splitIntoBundles(desiredBundleSize, options)) { + result.add(new BoundedToUnboundedSourceAdapter<>(split)); + } + return result; + } + + @Override + public Reader createReader(PipelineOptions options, Checkpoint checkpoint) throws IOException { + return new Reader(boundedSource.createReader(options), checkpoint); + } + + @Override + public Coder getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + static class Checkpoint implements UnboundedSource.CheckpointMark { + private final boolean done; + + public Checkpoint(boolean done) { + this.done = done; + } + + public boolean isDone() { + return done; + } + + @Override + public void finalizeCheckpoint() {} + } + + @Override + public Coder getCheckpointMarkCoder() { + return AvroCoder.of(Checkpoint.class); + } + + private class Reader extends UnboundedReader { + private final BoundedSource.BoundedReader boundedReader; + private boolean done; + + public Reader(BoundedSource.BoundedReader boundedReader, Checkpoint checkpoint) { + this.done = checkpoint != null && checkpoint.isDone(); + this.boundedReader = boundedReader; + } + + @Override + public boolean start() throws IOException { + if (done) { + return false; + } + boolean result = boundedReader.start(); + if (!result) { + done = true; + boundedReader.close(); + } + return result; + } + + @Override + public boolean advance() throws IOException { + if (done) { + return false; + } + boolean result = boundedReader.advance(); + if (!result) { + done = true; + boundedReader.close(); + } + return result; + } + + @Override + public void close() {} + + @Override + public T getCurrent() throws NoSuchElementException { + return boundedReader.getCurrent(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return boundedReader.getCurrentTimestamp(); + } + + @Override + public Instant getWatermark() { + return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @Override + public Checkpoint getCheckpointMark() { + return new Checkpoint(done); + } + + @Override + public BoundedToUnboundedSourceAdapter getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java index 2c4a32572864..ea3004ebc7ea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java @@ -76,7 +76,7 @@ public abstract List> genera * checkpoint if present. */ public abstract UnboundedReader createReader( - PipelineOptions options, @Nullable CheckpointMarkT checkpointMark); + PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) throws IOException; /** * Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java new file mode 100644 index 000000000000..9c207c46080c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link UnboundedReadFromBoundedSource}. + */ +@RunWith(JUnit4.class) +public class UnboundedReadFromBoundedSourceTest { + + @Test + @Category(RunnableOnService.class) + public void testBoundedToUnboundedSourceAdapter() throws Exception { + long numElements = 100; + BoundedSource boundedSource = CountingSource.upTo(numElements); + UnboundedSource unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + Pipeline p = TestPipeline.create(); + + PCollection output = + p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); + + // Count == numElements + PAssert + .thatSingleton(output.apply("Count", Count.globally())) + .isEqualTo(numElements); + // Unique count == numElements + PAssert + .thatSingleton(output.apply(RemoveDuplicates.create()) + .apply("UniqueCount", Count.globally())) + .isEqualTo(numElements); + // Min == 0 + PAssert + .thatSingleton(output.apply("Min", Min.globally())) + .isEqualTo(0L); + // Max == numElements-1 + PAssert + .thatSingleton(output.apply("Max", Max.globally())) + .isEqualTo(numElements - 1); + p.run(); + } +} From de01db2444a07c73432c2bed4d0a85d9ed41c999 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 16 May 2016 14:38:13 -0700 Subject: [PATCH 02/11] fixup: addressed feedback --- .../beam/runners/dataflow/DataflowRunner.java | 37 +++- .../runners/dataflow/DataflowRunnerTest.java | 23 -- .../java/org/apache/beam/sdk/io/Read.java | 9 - .../io/UnboundedReadFromBoundedSource.java | 206 ++++++++++++++---- .../UnboundedReadFromBoundedSourceTest.java | 51 ++++- 5 files changed, 248 insertions(+), 78 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5818ba536c98..86e7ef6c6d99 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -60,6 +60,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.PubsubUnboundedSink; @@ -67,6 +68,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; @@ -353,11 +355,9 @@ public static DataflowRunner fromOptions(PipelineOptions options) { builder.put(View.AsIterable.class, StreamingViewAsIterable.class); builder.put(Write.Bound.class, StreamingWrite.class); builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); - builder.put(Read.Bounded.class, UnsupportedIO.class); - builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); + builder.put(Read.Bounded.class, StreamingBoundedRead.class); builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); - builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom Pubsub unbounded source/sink or @@ -2623,6 +2623,37 @@ public void processElement(ProcessContext c) { } } + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the + * Dataflow runner in streaming mode. + */ + private static class StreamingBoundedRead extends PTransform> { + private final BoundedSource source; + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public StreamingBoundedRead(DataflowPipelineRunner runner, Read.Bounded transform) { + this.source = transform.getSource(); + } + + @Override + protected Coder getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public final PCollection apply(PInput input) { + source.validate(); + + return Pipeline + .applyTransform(input, new UnboundedReadFromBoundedSource<>(source)) + .setIsBoundedInternal(IsBounded.BOUNDED); + } + } + /** * Specialized implementation for * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index e094d0d3ed38..a37ed27408a3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; @@ -897,34 +896,12 @@ private void testUnsupportedSource(PTransform source, String name, bo p.run(); } - @Test - public void testBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); - } - @Test public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { testUnsupportedSource( BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); } - @Test - public void testAvroIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroIO.Read.from("foo"), "AvroIO.Read", true); - } - - @Test - public void testTextIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); - } - - @Test - public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); - } - @Test public void testReadUnboundedUnsupportedInBatch() throws Exception { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index ebba168d31fd..c0440f260104 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -114,15 +114,6 @@ public Bounded named(String name) { return new Bounded(name, source); } - /** - * Returns a new {@link UnboundedReadFromBoundedSourceTest}. - * - * It performs a unbounded read from the given {@link BoundedSource}. - */ - public UnboundedReadFromBoundedSource toUnbounded() { - return new UnboundedReadFromBoundedSource<>(source); - } - @Override protected Coder getDefaultOutputCoder() { return source.getDefaultOutputCoder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java index 18a046d7f7f4..89032f672596 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java @@ -20,24 +20,38 @@ import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.TimestampedValue; import com.google.api.client.util.Lists; +import com.google.api.client.util.Preconditions; +import com.google.common.annotations.VisibleForTesting; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.Instant; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; -import autovalue.shaded.com.google.common.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; /** * {@link PTransform} that performs a unbounded read from an {@link BoundedSource}. @@ -47,7 +61,7 @@ public class UnboundedReadFromBoundedSource extends PTransform> { private final BoundedSource source; - UnboundedReadFromBoundedSource(BoundedSource source) { + public UnboundedReadFromBoundedSource(BoundedSource source) { this.source = source; } @@ -77,9 +91,9 @@ public void populateDisplayData(DisplayData.Builder builder) { @VisibleForTesting static class BoundedToUnboundedSourceAdapter - extends UnboundedSource { + extends UnboundedSource> { - private final BoundedSource boundedSource; + private BoundedSource boundedSource; public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) { this.boundedSource = boundedSource; @@ -102,8 +116,13 @@ public List> generateInitialSplits( } @Override - public Reader createReader(PipelineOptions options, Checkpoint checkpoint) throws IOException { - return new Reader(boundedSource.createReader(options), checkpoint); + public Reader createReader(PipelineOptions options, Checkpoint checkpoint) + throws IOException { + if (checkpoint == null) { + return new Reader(null /* residualElements */, boundedSource, options); + } else { + return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); + } } @Override @@ -111,72 +130,156 @@ public Coder getDefaultOutputCoder() { return boundedSource.getDefaultOutputCoder(); } - static class Checkpoint implements UnboundedSource.CheckpointMark { - private final boolean done; + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Coder> getCheckpointMarkCoder() { + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + } - public Checkpoint(boolean done) { - this.done = done; - } + @VisibleForTesting + static class Checkpoint implements UnboundedSource.CheckpointMark { + private final List> residualElements; + private final BoundedSource residualSource; - public boolean isDone() { - return done; + public Checkpoint( + List> residualElements, BoundedSource residualSource) { + this.residualElements = residualElements; + this.residualSource = residualSource; } @Override public void finalizeCheckpoint() {} } - @Override - public Coder getCheckpointMarkCoder() { - return AvroCoder.of(Checkpoint.class); + private static class CheckpointCoder extends AtomicCoder> { + private final Coder>> elemsCoder; + private final Coder elemCoder; + + @JsonCreator + public static CheckpointCoder of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> components) { + Preconditions.checkArgument(components.size() == 1, + "Expecting 1 components, got " + components.size()); + return new CheckpointCoder<>(components.get(0)); + } + + @SuppressWarnings("rawtypes") + private final SerializableCoder serializableCoder = + SerializableCoder.of(BoundedSource.class); + + CheckpointCoder(Coder elemCoder) { + this.elemsCoder = ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)); + this.elemCoder = elemCoder; + } + + @Override + public void encode(Checkpoint value, OutputStream outStream, Context context) + throws CoderException, IOException { + elemsCoder.encode(value.residualElements, outStream, context); + serializableCoder.encode(value.residualSource, outStream, context); + } + + @SuppressWarnings("unchecked") + @Override + public Checkpoint decode(InputStream inStream, Context context) + throws CoderException, IOException { + return new Checkpoint<>( + elemsCoder.decode(inStream, context), + serializableCoder.decode(inStream, context)); + } + + @Override + public List> getCoderArguments() { + return Arrays.>asList(elemCoder); + } } - private class Reader extends UnboundedReader { - private final BoundedSource.BoundedReader boundedReader; + @VisibleForTesting + class Reader extends UnboundedReader { + private final PipelineOptions options; + + private @Nullable final List> residualElements; + private @Nullable BoundedSource residualSource; + + private boolean currentElementInList; + private int currentIndexInList; + private BoundedReader reader; private boolean done; - public Reader(BoundedSource.BoundedReader boundedReader, Checkpoint checkpoint) { - this.done = checkpoint != null && checkpoint.isDone(); - this.boundedReader = boundedReader; + public Reader( + @Nullable List> residualElements, + @Nullable BoundedSource residualSource, + PipelineOptions options) { + this.residualElements = residualElements; + this.residualSource = residualSource; + this.options = Preconditions.checkNotNull(options, "options"); + this.currentElementInList = true; + this.currentIndexInList = -1; + this.reader = null; + this.done = false; } @Override public boolean start() throws IOException { - if (done) { - return false; - } - boolean result = boundedReader.start(); - if (!result) { - done = true; - boundedReader.close(); - } - return result; + return advance(); } @Override public boolean advance() throws IOException { - if (done) { + if (advanceInList()) { + done = false; + } else { + done = !advanceInReader(); + } + return !done; + } + + private boolean advanceInList() { + if (residualElements != null && currentIndexInList + 1 < residualElements.size()) { + ++currentIndexInList; + currentElementInList = true; + return true; + } else { + currentElementInList = false; return false; } - boolean result = boundedReader.advance(); - if (!result) { - done = true; - boundedReader.close(); + } + + private boolean advanceInReader() throws IOException { + boolean hasNext; + if (boundedSource == null) { + hasNext = false; + } else if (reader == null) { + reader = residualSource.createReader(options); + hasNext = reader.start(); + } else { + hasNext = reader.advance(); } - return result; + return hasNext; } @Override - public void close() {} + public void close() throws IOException { + reader.close(); + } @Override public T getCurrent() throws NoSuchElementException { - return boundedReader.getCurrent(); + if (currentElementInList) { + return residualElements.get(currentIndexInList).getValue(); + } else { + return reader.getCurrent(); + } } @Override public Instant getCurrentTimestamp() throws NoSuchElementException { - return boundedReader.getCurrentTimestamp(); + if (currentElementInList) { + return residualElements.get(currentIndexInList).getTimestamp(); + } else { + return reader.getCurrentTimestamp(); + } } @Override @@ -185,8 +288,27 @@ public Instant getWatermark() { } @Override - public Checkpoint getCheckpointMark() { - return new Checkpoint(done); + public Checkpoint getCheckpointMark() { + List> newResidualElements = Lists.newArrayList(); + BoundedSource newResidualSource; + if (currentElementInList) { + while (advanceInList()) { + newResidualElements.add(residualElements.get(currentIndexInList)); + } + newResidualSource = residualSource; + } else { + BoundedSource residualSplit = reader.splitAtFraction(reader.getFractionConsumed()); + try { + while (advanceInReader()) { + newResidualElements.add( + TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); + } + } catch (NoSuchElementException | IOException e) { + throw new RuntimeException("Failed to read elements from the bounded reader.", e); + } + newResidualSource = residualSplit; + } + return new Checkpoint(newResidualElements, newResidualSource); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java index 9c207c46080c..ee43d0240436 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java @@ -18,8 +18,11 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -27,13 +30,21 @@ import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; +import com.google.api.client.util.Lists; +import com.google.common.collect.Sets; + +import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.List; +import java.util.Set; + /** * Unit tests for {@link UnboundedReadFromBoundedSource}. */ @@ -45,7 +56,7 @@ public class UnboundedReadFromBoundedSourceTest { public void testBoundedToUnboundedSourceAdapter() throws Exception { long numElements = 100; BoundedSource boundedSource = CountingSource.upTo(numElements); - UnboundedSource unboundedSource = + UnboundedSource> unboundedSource = new BoundedToUnboundedSourceAdapter<>(boundedSource); Pipeline p = TestPipeline.create(); @@ -72,4 +83,42 @@ public void testBoundedToUnboundedSourceAdapter() throws Exception { .isEqualTo(numElements - 1); p.run(); } + + @Test + public void testBoundedToUnboundedSourceAdapterCheckpoint() throws Exception { + long numElements = 100; + BoundedSource boundedSource = CountingSource.upTo(numElements); + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter.Reader reader = + unboundedSource.createReader(options, null); + + Set expected = Sets.newHashSet(); + for (long i = 0; i < numElements; ++i) { + expected.add(i); + } + + List actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext;) { + actual.add(reader.getCurrent()); + if (actual.size() % 9 == 0) { + Coder> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); + Checkpoint decodedCheckpoint = CoderUtils.decodeFromByteArray( + checkpointCoder, + CoderUtils.encodeToByteArray(checkpointCoder, reader.getCheckpointMark())); + + BoundedToUnboundedSourceAdapter.Reader restarted = + unboundedSource.createReader(options, decodedCheckpoint); + reader.close(); + reader = restarted; + hasNext = reader.start(); + } else { + hasNext = reader.advance(); + } + } + Assert.assertEquals(100, actual.size()); + Assert.assertEquals(expected, Sets.newHashSet(actual)); + } } From 0f7ab0b2b24ff6d7497ca7de7f4c1151d5c6e7a3 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 18 May 2016 14:02:29 -0700 Subject: [PATCH 03/11] fixup: addressed feedback --- .../beam/sdk/io/UnboundedReadFromBoundedSource.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java index 89032f672596..fac61f6efdd7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java @@ -199,7 +199,7 @@ public List> getCoderArguments() { class Reader extends UnboundedReader { private final PipelineOptions options; - private @Nullable final List> residualElements; + private final List> residualElements; private @Nullable BoundedSource residualSource; private boolean currentElementInList; @@ -208,10 +208,10 @@ class Reader extends UnboundedReader { private boolean done; public Reader( - @Nullable List> residualElements, + List> residualElements, @Nullable BoundedSource residualSource, PipelineOptions options) { - this.residualElements = residualElements; + this.residualElements = Preconditions.checkNotNull(residualElements, "residualElements"); this.residualSource = residualSource; this.options = Preconditions.checkNotNull(options, "options"); this.currentElementInList = true; @@ -303,7 +303,7 @@ public Checkpoint getCheckpointMark() { newResidualElements.add( TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); } - } catch (NoSuchElementException | IOException e) { + } catch (IOException e) { throw new RuntimeException("Failed to read elements from the bounded reader.", e); } newResidualSource = residualSplit; From e4ccde1291a9e2bf871973a13ce96c89552691af Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 18 May 2016 16:11:30 -0700 Subject: [PATCH 04/11] fixup: addressed feedback --- .../io/UnboundedReadFromBoundedSource.java | 51 +++++++++++++------ 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java index fac61f6efdd7..335e479fe4cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java @@ -20,11 +20,11 @@ import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; @@ -119,7 +119,7 @@ public List> generateInitialSplits( public Reader createReader(PipelineOptions options, Checkpoint checkpoint) throws IOException { if (checkpoint == null) { - return new Reader(null /* residualElements */, boundedSource, options); + return new Reader(Lists.>newArrayList(), boundedSource, options); } else { return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); } @@ -151,9 +151,7 @@ public Checkpoint( public void finalizeCheckpoint() {} } - private static class CheckpointCoder extends AtomicCoder> { - private final Coder>> elemsCoder; - private final Coder elemCoder; + private static class CheckpointCoder extends StandardCoder> { @JsonCreator public static CheckpointCoder of( @@ -164,6 +162,8 @@ public static CheckpointCoder of( return new CheckpointCoder<>(components.get(0)); } + private final Coder>> elemsCoder; + private final Coder elemCoder; @SuppressWarnings("rawtypes") private final SerializableCoder serializableCoder = SerializableCoder.of(BoundedSource.class); @@ -176,23 +176,31 @@ public static CheckpointCoder of( @Override public void encode(Checkpoint value, OutputStream outStream, Context context) throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context); - serializableCoder.encode(value.residualSource, outStream, context); + Context nested = context.nested(); + elemsCoder.encode(value.residualElements, outStream, nested); + serializableCoder.encode(value.residualSource, outStream, nested); } @SuppressWarnings("unchecked") @Override public Checkpoint decode(InputStream inStream, Context context) throws CoderException, IOException { + Context nested = context.nested(); return new Checkpoint<>( - elemsCoder.decode(inStream, context), - serializableCoder.decode(inStream, context)); + elemsCoder.decode(inStream, nested), + serializableCoder.decode(inStream, nested)); } @Override public List> getCoderArguments() { return Arrays.>asList(elemCoder); } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + serializableCoder.verifyDeterministic(); + elemsCoder.verifyDeterministic(); + } } @VisibleForTesting @@ -204,7 +212,7 @@ class Reader extends UnboundedReader { private boolean currentElementInList; private int currentIndexInList; - private BoundedReader reader; + private @Nullable BoundedReader reader; private boolean done; public Reader( @@ -232,6 +240,9 @@ public boolean advance() throws IOException { } else { done = !advanceInReader(); } + if (done) { + close(); + } return !done; } @@ -248,7 +259,7 @@ private boolean advanceInList() { private boolean advanceInReader() throws IOException { boolean hasNext; - if (boundedSource == null) { + if (residualSource == null) { hasNext = false; } else if (reader == null) { reader = residualSource.createReader(options); @@ -261,15 +272,21 @@ private boolean advanceInReader() throws IOException { @Override public void close() throws IOException { - reader.close(); + if (reader != null) { + reader.close(); + residualSource = null; + reader = null; + } } @Override public T getCurrent() throws NoSuchElementException { if (currentElementInList) { return residualElements.get(currentIndexInList).getValue(); - } else { + } else if (reader != null){ return reader.getCurrent(); + } else { + throw new NoSuchElementException(); } } @@ -277,8 +294,10 @@ public T getCurrent() throws NoSuchElementException { public Instant getCurrentTimestamp() throws NoSuchElementException { if (currentElementInList) { return residualElements.get(currentIndexInList).getTimestamp(); - } else { + } else if (reader != null){ return reader.getCurrentTimestamp(); + } else { + throw new NoSuchElementException(); } } @@ -296,7 +315,7 @@ public Checkpoint getCheckpointMark() { newResidualElements.add(residualElements.get(currentIndexInList)); } newResidualSource = residualSource; - } else { + } else if (reader != null) { BoundedSource residualSplit = reader.splitAtFraction(reader.getFractionConsumed()); try { while (advanceInReader()) { @@ -307,6 +326,8 @@ public Checkpoint getCheckpointMark() { throw new RuntimeException("Failed to read elements from the bounded reader.", e); } newResidualSource = residualSplit; + } else { + newResidualSource = residualSource; } return new Checkpoint(newResidualElements, newResidualSource); } From 9cc3412bb44b2d6f69d129834a5109939bac7147 Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 9 Jun 2016 18:31:02 -0700 Subject: [PATCH 05/11] addressed comments --- .../core}/UnboundedReadFromBoundedSource.java | 52 ++++++++++++++----- .../UnboundedReadFromBoundedSourceTest.java | 24 ++++++--- .../beam/runners/dataflow/DataflowRunner.java | 2 +- 3 files changed, 57 insertions(+), 21 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/io => runners/core-java/src/main/java/org/apache/beam/runners/core}/UnboundedReadFromBoundedSource.java (86%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/io => runners/core-java/src/test/java/org/apache/beam/runners/core}/UnboundedReadFromBoundedSourceTest.java (84%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java similarity index 86% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 335e479fe4cb..f44c2dee6981 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.runners.core; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; @@ -25,6 +25,9 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; @@ -35,9 +38,11 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.TimestampedValue; -import com.google.api.client.util.Lists; -import com.google.api.client.util.Preconditions; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -61,6 +66,9 @@ public class UnboundedReadFromBoundedSource extends PTransform> { private final BoundedSource source; + /** + * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. + */ public UnboundedReadFromBoundedSource(BoundedSource source) { this.source = source; } @@ -90,7 +98,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } @VisibleForTesting - static class BoundedToUnboundedSourceAdapter + public static class BoundedToUnboundedSourceAdapter extends UnboundedSource> { private BoundedSource boundedSource; @@ -108,11 +116,19 @@ public void validate() { public List> generateInitialSplits( int desiredNumSplits, PipelineOptions options) throws Exception { long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; - List> result = Lists.newArrayList(); - for (BoundedSource split : boundedSource.splitIntoBundles(desiredBundleSize, options)) { - result.add(new BoundedToUnboundedSourceAdapter<>(split)); + List> splits; + if (desiredBundleSize > 0 + && (splits = boundedSource.splitIntoBundles(desiredBundleSize, options)) != null) { + return Lists.transform( + splits, + new Function, BoundedToUnboundedSourceAdapter>() { + @Override + public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { + return new BoundedToUnboundedSourceAdapter<>(input); + }}); + } else { + return ImmutableList.of(this); } - return result; } @Override @@ -203,6 +219,12 @@ public void verifyDeterministic() throws NonDeterministicException { } } + /** + * An {@code UnboundedReader} that wraps a {@code BoundedSource}. + * + *

It checkpoints by splitting the {@code BoundedSource} and tracking residual + * elements from the current source. + */ @VisibleForTesting class Reader extends UnboundedReader { private final PipelineOptions options; @@ -215,7 +237,7 @@ class Reader extends UnboundedReader { private @Nullable BoundedReader reader; private boolean done; - public Reader( + Reader( List> residualElements, @Nullable BoundedSource residualSource, PipelineOptions options) { @@ -311,11 +333,19 @@ public Checkpoint getCheckpointMark() { List> newResidualElements = Lists.newArrayList(); BoundedSource newResidualSource; if (currentElementInList) { + // 1. Part of residualElements are consumed. + // Checkpoints the remaining elements and residualSource. while (advanceInList()) { newResidualElements.add(residualElements.get(currentIndexInList)); } newResidualSource = residualSource; - } else if (reader != null) { + } else if (reader == null) { + // 2. All of residualElements are consumed. + // Only checkpoints the residualSource. + newResidualSource = residualSource; + } else { + // 3. All of residualElements and part of residualSource are consumed. + // Splits the residualSource and tracks the new residualElements in current source. BoundedSource residualSplit = reader.splitAtFraction(reader.getFractionConsumed()); try { while (advanceInReader()) { @@ -326,8 +356,6 @@ public Checkpoint getCheckpointMark() { throw new RuntimeException("Failed to read elements from the bounded reader.", e); } newResidualSource = residualSplit; - } else { - newResidualSource = residualSource; } return new Checkpoint(newResidualElements, newResidualSource); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java similarity index 84% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index ee43d0240436..a28f0c03210e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -15,12 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.runners.core; +import static org.junit.Assert.assertEquals; + +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; -import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -36,7 +42,6 @@ import com.google.api.client.util.Lists; import com.google.common.collect.Sets; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -103,22 +108,25 @@ public void testBoundedToUnboundedSourceAdapterCheckpoint() throws Exception { List actual = Lists.newArrayList(); for (boolean hasNext = reader.start(); hasNext;) { actual.add(reader.getCurrent()); + // checkpoint every 9 elements if (actual.size() % 9 == 0) { + Checkpoint checkpoint = reader.getCheckpointMark(); Coder> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); Checkpoint decodedCheckpoint = CoderUtils.decodeFromByteArray( checkpointCoder, - CoderUtils.encodeToByteArray(checkpointCoder, reader.getCheckpointMark())); + CoderUtils.encodeToByteArray(checkpointCoder, checkpoint)); + reader.close(); + checkpoint.finalizeCheckpoint(); BoundedToUnboundedSourceAdapter.Reader restarted = unboundedSource.createReader(options, decodedCheckpoint); - reader.close(); reader = restarted; hasNext = reader.start(); } else { hasNext = reader.advance(); } } - Assert.assertEquals(100, actual.size()); - Assert.assertEquals(expected, Sets.newHashSet(actual)); + assertEquals(numElements, actual.size()); + assertEquals(expected, Sets.newHashSet(actual)); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 86e7ef6c6d99..1b1e5beff31e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; @@ -68,7 +69,6 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.UnboundedReadFromBoundedSource; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; From 120d4c7cff9fd9efd12363b13adad683a633524f Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 10 Jun 2016 14:36:59 -0700 Subject: [PATCH 06/11] addressed comments --- .../core/UnboundedReadFromBoundedSource.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index f44c2dee6981..c40c15e36773 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -48,6 +48,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -64,6 +66,9 @@ *

Created by {@link Read}. */ public class UnboundedReadFromBoundedSource extends PTransform> { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class); + private final BoundedSource source; /** @@ -115,10 +120,18 @@ public void validate() { @Override public List> generateInitialSplits( int desiredNumSplits, PipelineOptions options) throws Exception { - long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; - List> splits; - if (desiredBundleSize > 0 - && (splits = boundedSource.splitIntoBundles(desiredBundleSize, options)) != null) { + try { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + if (desiredBundleSize <= 0) { + LOG.warn("BoundedSource cannot estimate its size, skips the initial splits."); + return ImmutableList.of(this); + } + List> splits + = boundedSource.splitIntoBundles(desiredBundleSize, options); + if (splits == null) { + LOG.warn("BoundedSource cannot split, skips the initial splits."); + return ImmutableList.of(this); + } return Lists.transform( splits, new Function, BoundedToUnboundedSourceAdapter>() { @@ -126,7 +139,8 @@ public List> generateInitialSplits( public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { return new BoundedToUnboundedSourceAdapter<>(input); }}); - } else { + } catch (Exception e) { + LOG.warn("Exception while splitting, skips the initial splits.", e); return ImmutableList.of(this); } } @@ -244,7 +258,7 @@ class Reader extends UnboundedReader { this.residualElements = Preconditions.checkNotNull(residualElements, "residualElements"); this.residualSource = residualSource; this.options = Preconditions.checkNotNull(options, "options"); - this.currentElementInList = true; + this.currentElementInList = false; this.currentIndexInList = -1; this.reader = null; this.done = false; From 2908b4a1995cf8351c797fc425880d7596561c94 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 14 Jun 2016 16:16:51 -0700 Subject: [PATCH 07/11] test unsplittable source and refactor the Reader code --- runners/core-java/pom.xml | 32 ++ .../core/UnboundedReadFromBoundedSource.java | 316 +++++++++++++----- .../UnboundedReadFromBoundedSourceTest.java | 277 ++++++++++++++- .../direct/UnboundedReadEvaluatorFactory.java | 6 - 4 files changed, 520 insertions(+), 111 deletions(-) diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 1587a1af36d2..3f8410bb3ee4 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -195,6 +195,38 @@ beam-sdks-java-core + + org.apache.commons + commons-compress + 1.9 + + + + + com.fasterxml.jackson.core + jackson-annotations + + + + joda-time + joda-time + + + + com.google.guava + guava + + + + com.google.code.findbugs + jsr305 + + + + org.slf4j + slf4j-api + + diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index c40c15e36773..b9455257e886 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -19,16 +19,19 @@ import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -55,6 +58,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -102,6 +106,9 @@ public void populateDisplayData(DisplayData.Builder builder) { .include(source); } + /** + * A {@code BoundedSource} to {@code UnboundedSource} adapter. + */ @VisibleForTesting public static class BoundedToUnboundedSourceAdapter extends UnboundedSource> { @@ -149,7 +156,7 @@ public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { public Reader createReader(PipelineOptions options, Checkpoint checkpoint) throws IOException { if (checkpoint == null) { - return new Reader(Lists.>newArrayList(), boundedSource, options); + return new Reader(null /* residualElements */, boundedSource, options); } else { return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); } @@ -168,20 +175,32 @@ public Coder> getCheckpointMarkCoder() { @VisibleForTesting static class Checkpoint implements UnboundedSource.CheckpointMark { - private final List> residualElements; - private final BoundedSource residualSource; + private final @Nullable List> residualElements; + private final @Nullable BoundedSource residualSource; public Checkpoint( - List> residualElements, BoundedSource residualSource) { + @Nullable List> residualElements, + @Nullable BoundedSource residualSource) { this.residualElements = residualElements; this.residualSource = residualSource; } @Override public void finalizeCheckpoint() {} + + @VisibleForTesting + @Nullable List> getResidualElements() { + return residualElements; + } + + @VisibleForTesting + @Nullable BoundedSource getResidualSource() { + return residualSource; + } } - private static class CheckpointCoder extends StandardCoder> { + @VisibleForTesting + static class CheckpointCoder extends StandardCoder> { @JsonCreator public static CheckpointCoder of( @@ -195,11 +214,12 @@ public static CheckpointCoder of( private final Coder>> elemsCoder; private final Coder elemCoder; @SuppressWarnings("rawtypes") - private final SerializableCoder serializableCoder = - SerializableCoder.of(BoundedSource.class); + private final Coder sourceCoder = + NullableCoder.of(SerializableCoder.of(BoundedSource.class)); CheckpointCoder(Coder elemCoder) { - this.elemsCoder = ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)); + this.elemsCoder = NullableCoder.of( + ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); this.elemCoder = elemCoder; } @@ -208,7 +228,7 @@ public void encode(Checkpoint value, OutputStream outStream, Context context) throws CoderException, IOException { Context nested = context.nested(); elemsCoder.encode(value.residualElements, outStream, nested); - serializableCoder.encode(value.residualSource, outStream, nested); + sourceCoder.encode(value.residualSource, outStream, nested); } @SuppressWarnings("unchecked") @@ -218,7 +238,7 @@ public Checkpoint decode(InputStream inStream, Context context) Context nested = context.nested(); return new Checkpoint<>( elemsCoder.decode(inStream, nested), - serializableCoder.decode(inStream, nested)); + sourceCoder.decode(inStream, nested)); } @Override @@ -228,42 +248,45 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws NonDeterministicException { - serializableCoder.verifyDeterministic(); + sourceCoder.verifyDeterministic(); elemsCoder.verifyDeterministic(); } } /** - * An {@code UnboundedReader} that wraps a {@code BoundedSource}. + * An {@code UnboundedReader} that wraps a {@code BoundedSource} into + * {@link ResidualElements} and {@link ResidualSource}. * - *

It checkpoints by splitting the {@code BoundedSource} and tracking residual - * elements from the current source. + *

In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains + * the {@code BoundedSource}. After the first checkpoint, the {@code BoundedSource} will + * be split into {@link ResidualElements} and {@link ResidualSource}. */ @VisibleForTesting class Reader extends UnboundedReader { + private @Nullable ResidualElements residualElements; + private @Nullable ResidualSource residualSource; private final PipelineOptions options; - - private final List> residualElements; - private @Nullable BoundedSource residualSource; - - private boolean currentElementInList; - private int currentIndexInList; - private @Nullable BoundedReader reader; private boolean done; Reader( - List> residualElements, + @Nullable List> residualElementsList, @Nullable BoundedSource residualSource, PipelineOptions options) { - this.residualElements = Preconditions.checkNotNull(residualElements, "residualElements"); - this.residualSource = residualSource; - this.options = Preconditions.checkNotNull(options, "options"); - this.currentElementInList = false; - this.currentIndexInList = -1; - this.reader = null; + init(residualElementsList, residualSource, options); + this.options = checkNotNull(options, "options"); this.done = false; } + private void init( + @Nullable List> residualElementsList, + @Nullable BoundedSource residualSource, + PipelineOptions options) { + this.residualElements = + residualElementsList == null ? null : new ResidualElements(residualElementsList); + this.residualSource = + residualSource == null ? null : new ResidualSource(residualSource, options); + } + @Override public boolean start() throws IOException { return advance(); @@ -271,56 +294,29 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { - if (advanceInList()) { - done = false; - } else { - done = !advanceInReader(); - } - if (done) { - close(); - } - return !done; - } - - private boolean advanceInList() { - if (residualElements != null && currentIndexInList + 1 < residualElements.size()) { - ++currentIndexInList; - currentElementInList = true; + if (residualElements != null && residualElements.advance()) { + return true; + } else if (residualSource != null && residualSource.advance()) { return true; } else { - currentElementInList = false; + done = true; return false; } } - private boolean advanceInReader() throws IOException { - boolean hasNext; - if (residualSource == null) { - hasNext = false; - } else if (reader == null) { - reader = residualSource.createReader(options); - hasNext = reader.start(); - } else { - hasNext = reader.advance(); - } - return hasNext; - } - @Override public void close() throws IOException { - if (reader != null) { - reader.close(); - residualSource = null; - reader = null; + if (residualSource != null) { + residualSource.close(); } } @Override public T getCurrent() throws NoSuchElementException { - if (currentElementInList) { - return residualElements.get(currentIndexInList).getValue(); - } else if (reader != null){ - return reader.getCurrent(); + if (residualElements != null && residualElements.hasCurrent()) { + return residualElements.getCurrent(); + } else if (residualSource != null) { + return residualSource.getCurrent(); } else { throw new NoSuchElementException(); } @@ -328,10 +324,10 @@ public T getCurrent() throws NoSuchElementException { @Override public Instant getCurrentTimestamp() throws NoSuchElementException { - if (currentElementInList) { - return residualElements.get(currentIndexInList).getTimestamp(); - } else if (reader != null){ - return reader.getCurrentTimestamp(); + if (residualElements != null && residualElements.hasCurrent()) { + return residualElements.getCurrentTimestamp(); + } else if (residualSource != null) { + return residualSource.getCurrentTimestamp(); } else { throw new NoSuchElementException(); } @@ -342,41 +338,179 @@ public Instant getWatermark() { return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; } + /** + * {@inheritDoc} + * + *

If only part of the {@link ResidualElements} is consumed, the new + * checkpoint will contain the remaining elements in {@link ResidualElements} and + * the {@link ResidualSource}. + * + *

If all {@link ResidualElements} and part of the + * {@link ResidualSource} are consumed, the new checkpoint is done by splitting + * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. + * {@link ResidualSource} is the source split from the current source, + * and {@link ResidualElements} contains rest elements from the current source after + * the splitting. For unsplittable source, it will put all remaining elements into + * the {@link ResidualElements}. + */ @Override public Checkpoint getCheckpointMark() { - List> newResidualElements = Lists.newArrayList(); - BoundedSource newResidualSource; - if (currentElementInList) { - // 1. Part of residualElements are consumed. + Checkpoint newCheckpoint; + if (residualElements != null && !residualElements.done()) { + // Part of residualElements are consumed. // Checkpoints the remaining elements and residualSource. - while (advanceInList()) { - newResidualElements.add(residualElements.get(currentIndexInList)); + newCheckpoint = new Checkpoint<>( + residualElements.getRestElements(), + residualSource == null ? null : residualSource.getSource()); + } else if (residualSource != null) { + newCheckpoint = residualSource.getCheckpointMark(); + } else { + newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); + } + // Re-initialize since the residualElements and the residualSource might be + // consumed or split by checkpointing. + init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); + return newCheckpoint; + } + + @Override + public BoundedToUnboundedSourceAdapter getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + + private class ResidualElements { + private final List> elementsList; + private @Nullable Iterator> elementsIterator; + private @Nullable TimestampedValue currentT; + private boolean hasCurrent; + private boolean done; + + ResidualElements(List> residualElementsList) { + this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); + this.elementsIterator = null; + this.currentT = null; + this.hasCurrent = false; + this.done = false; + } + + public boolean advance() { + if (elementsIterator == null) { + elementsIterator = elementsList.iterator(); + } + if (elementsIterator.hasNext()) { + currentT = elementsIterator.next(); + hasCurrent = true; + return true; + } else { + done = true; + return false; + } + } + + boolean hasCurrent() { + return hasCurrent; + } + + boolean done() { + return done; + } + + TimestampedValue getCurrentTimestampedValue() { + if (!hasCurrent) { + throw new NoSuchElementException(); + } + return currentT; + } + + T getCurrent() { + return getCurrentTimestampedValue().getValue(); + } + + Instant getCurrentTimestamp() { + return getCurrentTimestampedValue().getTimestamp(); + } + + List> getRestElements() { + if (elementsIterator == null) { + return elementsList; + } else { + List> newResidualElements = Lists.newArrayList(); + while (elementsIterator.hasNext()) { + newResidualElements.add(elementsIterator.next()); } - newResidualSource = residualSource; - } else if (reader == null) { - // 2. All of residualElements are consumed. - // Only checkpoints the residualSource. - newResidualSource = residualSource; + return newResidualElements; + } + } + } + + private class ResidualSource { + private BoundedSource residualSource; + private PipelineOptions options; + private @Nullable BoundedReader reader; + + public ResidualSource(BoundedSource residualSource, PipelineOptions options) { + this.residualSource = checkNotNull(residualSource, "residualSource"); + this.options = checkNotNull(options, "options"); + this.reader = null; + } + + private boolean advance() throws IOException { + if (reader == null) { + reader = residualSource.createReader(options); + return reader.start(); } else { - // 3. All of residualElements and part of residualSource are consumed. + return reader.advance(); + } + } + + T getCurrent() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrent(); + } + + Instant getCurrentTimestamp() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrentTimestamp(); + } + + void close() throws IOException { + if (reader != null) { + reader.close(); + } + } + + BoundedSource getSource() { + return residualSource; + } + + Checkpoint getCheckpointMark() { + if (reader == null) { + // Reader hasn't started, checkpoint the residualSource. + return new Checkpoint<>(null /* residualElements */, residualSource); + } else { + // Part of residualSource are consumed. // Splits the residualSource and tracks the new residualElements in current source. - BoundedSource residualSplit = reader.splitAtFraction(reader.getFractionConsumed()); + BoundedSource residualSplit = null; + Double fractionConsumed = reader.getFractionConsumed(); + if (fractionConsumed != null) { + residualSplit = reader.splitAtFraction(fractionConsumed); + } + List> newResidualElements = Lists.newArrayList(); try { - while (advanceInReader()) { + while (advance()) { newResidualElements.add( TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); } } catch (IOException e) { throw new RuntimeException("Failed to read elements from the bounded reader.", e); } - newResidualSource = residualSplit; + return new Checkpoint<>(newResidualElements, residualSplit); } - return new Checkpoint(newResidualElements, newResidualSource); - } - - @Override - public BoundedToUnboundedSourceAdapter getCurrentSource() { - return BoundedToUnboundedSourceAdapter.this; } } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index a28f0c03210e..d56d79ed4f87 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -18,13 +18,20 @@ package org.apache.beam.runners.core; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CompressedSource; +import org.apache.beam.sdk.io.CompressedSource.CompressionMode; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -38,17 +45,32 @@ import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; -import com.google.api.client.util.Lists; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.List; -import java.util.Set; +import java.util.NoSuchElementException; +import java.util.Random; /** * Unit tests for {@link UnboundedReadFromBoundedSource}. @@ -56,6 +78,23 @@ @RunWith(JUnit4.class) public class UnboundedReadFromBoundedSourceTest { + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testCheckpointCoderNulls() throws Exception { + CheckpointCoder coder = new CheckpointCoder<>(StringUtf8Coder.of()); + Checkpoint emptyCheckpoint = new Checkpoint<>(null, null); + Checkpoint decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray( + coder, + CoderUtils.encodeToByteArray(coder, emptyCheckpoint)); + assertNull(decodedEmptyCheckpoint.getResidualElements()); + assertNull(decodedEmptyCheckpoint.getResidualSource()); + } + @Test @Category(RunnableOnService.class) public void testBoundedToUnboundedSourceAdapter() throws Exception { @@ -90,35 +129,106 @@ public void testBoundedToUnboundedSourceAdapter() throws Exception { } @Test - public void testBoundedToUnboundedSourceAdapterCheckpoint() throws Exception { + public void testCountingSourceToUnboundedCheckpoint() throws Exception { long numElements = 100; - BoundedSource boundedSource = CountingSource.upTo(numElements); - BoundedToUnboundedSourceAdapter unboundedSource = + BoundedSource countingSource = CountingSource.upTo(numElements); + List expected = Lists.newArrayList(); + for (long i = 0; i < numElements; ++i) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected); + } + + @Test + public void testCompressedSourceToUnboundedCheckpoint() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(100); + writeFile(compressedFile, input, CompressionMode.GZIP); + + CompressedSource source = + CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + List expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(source, expected); + } + + private void testBoundedToUnboundedSourceAdapterCheckpoint( + BoundedSource boundedSource, + List expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(boundedSource); PipelineOptions options = PipelineOptionsFactory.create(); - BoundedToUnboundedSourceAdapter.Reader reader = + BoundedToUnboundedSourceAdapter.Reader reader = unboundedSource.createReader(options, null); - Set expected = Sets.newHashSet(); + List actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { + actual.add(reader.getCurrent()); + // checkpoint every 9 elements + if (actual.size() % 9 == 0) { + Checkpoint checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + } + } + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testCountingSourceToUnboundedCheckpointRestart() throws Exception { + long numElements = 100; + BoundedSource countingSource = CountingSource.upTo(numElements); + List expected = Lists.newArrayList(); for (long i = 0; i < numElements; ++i) { expected.add(i); } + testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected); + } + + @Test + public void testCompressedSourceToUnboundedCheckpointRestart() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(100); + writeFile(compressedFile, input, CompressionMode.GZIP); + + CompressedSource source = + CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + List expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected); + } + + private void testBoundedToUnboundedSourceAdapterCheckpointRestart( + BoundedSource boundedSource, + List expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter.Reader reader = + unboundedSource.createReader(options, null); - List actual = Lists.newArrayList(); + List actual = Lists.newArrayList(); for (boolean hasNext = reader.start(); hasNext;) { actual.add(reader.getCurrent()); // checkpoint every 9 elements if (actual.size() % 9 == 0) { - Checkpoint checkpoint = reader.getCheckpointMark(); - Coder> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); - Checkpoint decodedCheckpoint = CoderUtils.decodeFromByteArray( + Checkpoint checkpoint = reader.getCheckpointMark(); + Coder> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); + Checkpoint decodedCheckpoint = CoderUtils.decodeFromByteArray( checkpointCoder, CoderUtils.encodeToByteArray(checkpointCoder, checkpoint)); reader.close(); checkpoint.finalizeCheckpoint(); - BoundedToUnboundedSourceAdapter.Reader restarted = + BoundedToUnboundedSourceAdapter.Reader restarted = unboundedSource.createReader(options, decodedCheckpoint); reader = restarted; hasNext = reader.start(); @@ -126,7 +236,146 @@ public void testBoundedToUnboundedSourceAdapterCheckpoint() throws Exception { hasNext = reader.advance(); } } - assertEquals(numElements, actual.size()); - assertEquals(expected, Sets.newHashSet(actual)); + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testReadBeforeStart() throws Exception { + thrown.expect(NoSuchElementException.class); + + BoundedSource countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + unboundedSource.createReader(options, null).getCurrent(); + } + + @Test + public void testReadFromCheckpointBeforeStart() throws Exception { + thrown.expect(NoSuchElementException.class); + + BoundedSource countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + List> elements = + ImmutableList.of(TimestampedValue.of(1L, new Instant(1L))); + Checkpoint checkpoint = new Checkpoint<>(elements, countingSource); + unboundedSource.createReader(options, checkpoint).getCurrent(); + } + + /** + * Generate byte array of given size. + */ + private static byte[] generateInput(int size) { + // Arbitrary but fixed seed + Random random = new Random(285930); + byte[] buff = new byte[size]; + for (int i = 0; i < size; i++) { + buff[i] = (byte) (random.nextInt() % Byte.MAX_VALUE); + } + return buff; + } + + /** + * Get a compressing stream for a given compression mode. + */ + private static OutputStream getOutputStreamForMode(CompressionMode mode, OutputStream stream) + throws IOException { + switch (mode) { + case GZIP: + return new GzipCompressorOutputStream(stream); + case BZIP2: + return new BZip2CompressorOutputStream(stream); + default: + throw new RuntimeException("Unexpected compression mode"); + } + } + + /** + * Writes a single output file. + */ + private static void writeFile(File file, byte[] input, CompressionMode mode) throws IOException { + try (OutputStream os = getOutputStreamForMode(mode, new FileOutputStream(file))) { + os.write(input); + } + } + + /** + * Dummy source for use in tests. + */ + private static class ByteSource extends FileBasedSource { + public ByteSource(String fileOrPatternSpec, long minBundleSize) { + super(fileOrPatternSpec, minBundleSize); + } + + public ByteSource(String fileName, long minBundleSize, long startOffset, long endOffset) { + super(fileName, minBundleSize, startOffset, endOffset); + } + + @Override + protected FileBasedSource createForSubrangeOfFile(String fileName, long start, long end) { + return new ByteSource(fileName, getMinBundleSize(), start, end); + } + + @Override + protected FileBasedReader createSingleFileReader(PipelineOptions options) { + return new ByteReader(this); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return false; + } + + @Override + public Coder getDefaultOutputCoder() { + return SerializableCoder.of(Byte.class); + } + + private static class ByteReader extends FileBasedReader { + ByteBuffer buff = ByteBuffer.allocate(1); + Byte current; + long offset = -1; + ReadableByteChannel channel; + + public ByteReader(ByteSource source) { + super(source); + } + + @Override + public Byte getCurrent() throws NoSuchElementException { + return current; + } + + @Override + protected boolean isAtSplitPoint() { + return true; + } + + @Override + protected void startReading(ReadableByteChannel channel) throws IOException { + this.channel = channel; + } + + @Override + protected boolean readNextRecord() throws IOException { + buff.clear(); + if (channel.read(buff) != 1) { + return false; + } + current = new Byte(buff.get(0)); + offset += 1; + return true; + } + + @Override + protected long getCurrentOffset() { + return offset; + } + } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 5760d37010f9..9a287b769dab 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -221,18 +221,12 @@ private boolean startReader() throws IOException { } } -<<<<<<< HEAD /** * Checkpoint the current reader, finalize the previous checkpoint, and update the state of this * evaluator. */ private void finishRead() throws IOException { final CheckpointMark oldMark = checkpointMark; -======= - private UnboundedReader createReader( - UnboundedSource source, PipelineOptions options) - throws IOException { ->>>>>>> 71424df... Add toUnbounded() to get a Unbounded Read from a Bounded Read. @SuppressWarnings("unchecked") final CheckpointMarkT mark = (CheckpointMarkT) currentReader.getCheckpointMark(); checkpointMark = mark; From 44b595dc6c193083f37d0209da89e1c4935b3817 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 17 Jun 2016 13:59:27 -0700 Subject: [PATCH 08/11] Addressed comments --- runners/core-java/pom.xml | 7 -- .../core/UnboundedReadFromBoundedSource.java | 85 ++++++++++++------- .../UnboundedReadFromBoundedSourceTest.java | 72 ++++++---------- .../beam/runners/dataflow/DataflowRunner.java | 5 +- .../runners/dataflow/DataflowRunnerTest.java | 7 -- 5 files changed, 86 insertions(+), 90 deletions(-) diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 3f8410bb3ee4..98d80bb1bbe2 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -195,13 +195,6 @@ beam-sdks-java-core - - org.apache.commons - commons-compress - 1.9 - - - com.fasterxml.jackson.core jackson-annotations diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index b9455257e886..ba3c0f03b729 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.core; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; - +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.Pipeline; @@ -43,7 +43,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -58,6 +57,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -65,9 +65,20 @@ import javax.annotation.Nullable; /** - * {@link PTransform} that performs a unbounded read from an {@link BoundedSource}. + * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. + * + *

{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, + * and element timestamps are propagated. While any elements remain, the watermark is the beginning + * of time {@code BoundedWindow.TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@code BoundedWindow.TIMESTAMP_MAX_VALUE}. + * + *

Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner + * {@link BoundedSource}. + * Sources that cannot be split are read entirely into memory, so this transform does not work well + * with large, unsplittable sources. * - *

Created by {@link Read}. + *

This transform is intended to be used by a runner during pipeline translation to convert + * a Read.Bounded into a Read.Unbounded. */ public class UnboundedReadFromBoundedSource extends PTransform> { @@ -130,13 +141,14 @@ public List> generateInitialSplits( try { long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; if (desiredBundleSize <= 0) { - LOG.warn("BoundedSource cannot estimate its size, skips the initial splits."); + LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", + boundedSource); return ImmutableList.of(this); } List> splits = boundedSource.splitIntoBundles(desiredBundleSize, options); if (splits == null) { - LOG.warn("BoundedSource cannot split, skips the initial splits."); + LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); return ImmutableList.of(this); } return Lists.transform( @@ -147,7 +159,7 @@ public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { return new BoundedToUnboundedSourceAdapter<>(input); }}); } catch (Exception e) { - LOG.warn("Exception while splitting, skips the initial splits.", e); + LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); return ImmutableList.of(this); } } @@ -156,7 +168,10 @@ public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { public Reader createReader(PipelineOptions options, Checkpoint checkpoint) throws IOException { if (checkpoint == null) { - return new Reader(null /* residualElements */, boundedSource, options); + return new Reader( + Collections.>emptyList() /* residualElements */, + boundedSource, + options); } else { return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); } @@ -175,11 +190,11 @@ public Coder> getCheckpointMarkCoder() { @VisibleForTesting static class Checkpoint implements UnboundedSource.CheckpointMark { - private final @Nullable List> residualElements; + private final List> residualElements; private final @Nullable BoundedSource residualSource; public Checkpoint( - @Nullable List> residualElements, + List> residualElements, @Nullable BoundedSource residualSource) { this.residualElements = residualElements; this.residualSource = residualSource; @@ -189,7 +204,7 @@ public Checkpoint( public void finalizeCheckpoint() {} @VisibleForTesting - @Nullable List> getResidualElements() { + List> getResidualElements() { return residualElements; } @@ -206,21 +221,24 @@ static class CheckpointCoder extends StandardCoder> { public static CheckpointCoder of( @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 components, got " + components.size()); + checkArgument(components.size() == 1, + "Expecting 1 components, got %s", components.size()); return new CheckpointCoder<>(components.get(0)); } + // The coder for a list of residual elements and their timestamps private final Coder>> elemsCoder; + // The coder from the BoundedReader for coding each element private final Coder elemCoder; + // The nullable and serializable coder for the BoundedSource. @SuppressWarnings("rawtypes") - private final Coder sourceCoder = - NullableCoder.of(SerializableCoder.of(BoundedSource.class)); + private final Coder sourceCoder; CheckpointCoder(Coder elemCoder) { this.elemsCoder = NullableCoder.of( ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); this.elemCoder = elemCoder; + this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); } @Override @@ -248,8 +266,8 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws NonDeterministicException { - sourceCoder.verifyDeterministic(); - elemsCoder.verifyDeterministic(); + throw new NonDeterministicException(this, + "CheckpointCoder uses Java Serialization, which may be non-deterministic."); } } @@ -263,13 +281,13 @@ public void verifyDeterministic() throws NonDeterministicException { */ @VisibleForTesting class Reader extends UnboundedReader { - private @Nullable ResidualElements residualElements; + private ResidualElements residualElements; private @Nullable ResidualSource residualSource; private final PipelineOptions options; private boolean done; Reader( - @Nullable List> residualElementsList, + List> residualElementsList, @Nullable BoundedSource residualSource, PipelineOptions options) { init(residualElementsList, residualSource, options); @@ -278,11 +296,10 @@ class Reader extends UnboundedReader { } private void init( - @Nullable List> residualElementsList, + List> residualElementsList, @Nullable BoundedSource residualSource, PipelineOptions options) { - this.residualElements = - residualElementsList == null ? null : new ResidualElements(residualElementsList); + this.residualElements = new ResidualElements(residualElementsList); this.residualSource = residualSource == null ? null : new ResidualSource(residualSource, options); } @@ -294,7 +311,7 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { - if (residualElements != null && residualElements.advance()) { + if (residualElements.advance()) { return true; } else if (residualSource != null && residualSource.advance()) { return true; @@ -313,7 +330,7 @@ public void close() throws IOException { @Override public T getCurrent() throws NoSuchElementException { - if (residualElements != null && residualElements.hasCurrent()) { + if (residualElements.hasCurrent()) { return residualElements.getCurrent(); } else if (residualSource != null) { return residualSource.getCurrent(); @@ -324,7 +341,7 @@ public T getCurrent() throws NoSuchElementException { @Override public Instant getCurrentTimestamp() throws NoSuchElementException { - if (residualElements != null && residualElements.hasCurrent()) { + if (residualElements.hasCurrent()) { return residualElements.getCurrentTimestamp(); } else if (residualSource != null) { return residualSource.getCurrentTimestamp(); @@ -356,7 +373,7 @@ public Instant getWatermark() { @Override public Checkpoint getCheckpointMark() { Checkpoint newCheckpoint; - if (residualElements != null && !residualElements.done()) { + if (!residualElements.done()) { // Part of residualElements are consumed. // Checkpoints the remaining elements and residualSource. newCheckpoint = new Checkpoint<>( @@ -404,6 +421,7 @@ public boolean advance() { return true; } else { done = true; + hasCurrent = false; return false; } } @@ -448,15 +466,17 @@ private class ResidualSource { private BoundedSource residualSource; private PipelineOptions options; private @Nullable BoundedReader reader; + private boolean closed; public ResidualSource(BoundedSource residualSource, PipelineOptions options) { this.residualSource = checkNotNull(residualSource, "residualSource"); this.options = checkNotNull(options, "options"); this.reader = null; + this.closed = false; } private boolean advance() throws IOException { - if (reader == null) { + if (reader == null && !closed) { reader = residualSource.createReader(options); return reader.start(); } else { @@ -481,7 +501,9 @@ Instant getCurrentTimestamp() throws NoSuchElementException { void close() throws IOException { if (reader != null) { reader.close(); + reader = null; } + closed = true; } BoundedSource getSource() { @@ -497,8 +519,13 @@ Checkpoint getCheckpointMark() { // Splits the residualSource and tracks the new residualElements in current source. BoundedSource residualSplit = null; Double fractionConsumed = reader.getFractionConsumed(); - if (fractionConsumed != null) { - residualSplit = reader.splitAtFraction(fractionConsumed); + if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { + double fractionRest = 1 - fractionConsumed; + int splitAttampts = 8; + for (int i = 0; i < 8 && residualSplit == null; ++i) { + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttampts; + residualSplit = reader.splitAtFraction(fractionToSplit); + } } List> newResidualElements = Lists.newArrayList(); try { diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index d56d79ed4f87..afd0927dbf19 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -28,16 +28,14 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CompressedSource; -import org.apache.beam.sdk.io.CompressedSource.CompressionMode; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Max; @@ -51,8 +49,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -96,7 +92,7 @@ public void testCheckpointCoderNulls() throws Exception { } @Test - @Category(RunnableOnService.class) + @Category(NeedsRunner.class) public void testBoundedToUnboundedSourceAdapter() throws Exception { long numElements = 100; BoundedSource boundedSource = CountingSource.upTo(numElements); @@ -140,14 +136,13 @@ public void testCountingSourceToUnboundedCheckpoint() throws Exception { } @Test - public void testCompressedSourceToUnboundedCheckpoint() throws Exception { + public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception { String baseName = "test-input"; File compressedFile = tmpFolder.newFile(baseName + ".gz"); byte[] input = generateInput(100); - writeFile(compressedFile, input, CompressionMode.GZIP); + writeFile(compressedFile, input); - CompressedSource source = - CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + BoundedSource source = new UnsplittableSource(compressedFile.getPath(), 1); List expected = Lists.newArrayList(); for (byte i : input) { expected.add(i); @@ -190,14 +185,13 @@ public void testCountingSourceToUnboundedCheckpointRestart() throws Exception { } @Test - public void testCompressedSourceToUnboundedCheckpointRestart() throws Exception { + public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception { String baseName = "test-input"; File compressedFile = tmpFolder.newFile(baseName + ".gz"); - byte[] input = generateInput(100); - writeFile(compressedFile, input, CompressionMode.GZIP); + byte[] input = generateInput(1000); + writeFile(compressedFile, input); - CompressedSource source = - CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + BoundedSource source = new UnsplittableSource(compressedFile.getPath(), 1); List expected = Lists.newArrayList(); for (byte i : input) { expected.add(i); @@ -274,56 +268,40 @@ private static byte[] generateInput(int size) { // Arbitrary but fixed seed Random random = new Random(285930); byte[] buff = new byte[size]; - for (int i = 0; i < size; i++) { - buff[i] = (byte) (random.nextInt() % Byte.MAX_VALUE); - } + random.nextBytes(buff); return buff; } - /** - * Get a compressing stream for a given compression mode. - */ - private static OutputStream getOutputStreamForMode(CompressionMode mode, OutputStream stream) - throws IOException { - switch (mode) { - case GZIP: - return new GzipCompressorOutputStream(stream); - case BZIP2: - return new BZip2CompressorOutputStream(stream); - default: - throw new RuntimeException("Unexpected compression mode"); - } - } - /** * Writes a single output file. */ - private static void writeFile(File file, byte[] input, CompressionMode mode) throws IOException { - try (OutputStream os = getOutputStreamForMode(mode, new FileOutputStream(file))) { + private static void writeFile(File file, byte[] input) throws IOException { + try (OutputStream os = new FileOutputStream(file)) { os.write(input); } } /** - * Dummy source for use in tests. + * Unsplittable source for use in tests. */ - private static class ByteSource extends FileBasedSource { - public ByteSource(String fileOrPatternSpec, long minBundleSize) { + private static class UnsplittableSource extends FileBasedSource { + public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) { super(fileOrPatternSpec, minBundleSize); } - public ByteSource(String fileName, long minBundleSize, long startOffset, long endOffset) { + public UnsplittableSource( + String fileName, long minBundleSize, long startOffset, long endOffset) { super(fileName, minBundleSize, startOffset, endOffset); } @Override protected FileBasedSource createForSubrangeOfFile(String fileName, long start, long end) { - return new ByteSource(fileName, getMinBundleSize(), start, end); + return new UnsplittableSource(fileName, getMinBundleSize(), start, end); } @Override protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new ByteReader(this); + return new UnsplittableReader(this); } @Override @@ -336,14 +314,15 @@ public Coder getDefaultOutputCoder() { return SerializableCoder.of(Byte.class); } - private static class ByteReader extends FileBasedReader { + private static class UnsplittableReader extends FileBasedReader { ByteBuffer buff = ByteBuffer.allocate(1); Byte current; - long offset = -1; + long offset; ReadableByteChannel channel; - public ByteReader(ByteSource source) { + public UnsplittableReader(UnsplittableSource source) { super(source); + offset = source.getStartOffset() - 1; } @Override @@ -351,6 +330,11 @@ public Byte getCurrent() throws NoSuchElementException { return current; } + @Override + public boolean allowsDynamicSplitting() { + return false; + } + @Override protected boolean isAtSplitPoint() { return true; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1b1e5beff31e..6f2e4044a07d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -357,7 +357,6 @@ public static DataflowRunner fromOptions(PipelineOptions options) { builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); builder.put(Read.Bounded.class, StreamingBoundedRead.class); builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); - builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom Pubsub unbounded source/sink or @@ -2634,8 +2633,8 @@ private static class StreamingBoundedRead extends PTransform transform) { + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public StreamingBoundedRead(DataflowRunner runner, Read.Bounded transform) { this.source = transform.getSource(); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a37ed27408a3..558efbb34696 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; @@ -896,12 +895,6 @@ private void testUnsupportedSource(PTransform source, String name, bo p.run(); } - @Test - public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); - } - @Test public void testReadUnboundedUnsupportedInBatch() throws Exception { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); From 91527cd462a9f961193eec64d1dae0de0b30d511 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 22 Jun 2016 18:12:05 -0700 Subject: [PATCH 09/11] addressed comments --- .../runners/core/UnboundedReadFromBoundedSource.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index ba3c0f03b729..782af658b936 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -69,8 +69,8 @@ * *

{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, * and element timestamps are propagated. While any elements remain, the watermark is the beginning - * of time {@code BoundedWindow.TIMESTAMP_MIN_VALUE}, and after all elements have been produced - * the watermark goes to the end of time {@code BoundedWindow.TIMESTAMP_MAX_VALUE}. + * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. * *

Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner * {@link BoundedSource}. @@ -95,7 +95,7 @@ public UnboundedReadFromBoundedSource(BoundedSource source) { @Override public PCollection apply(PInput input) { - return Pipeline.applyTransform(input, + return input.getPipeline().apply( Read.from(new BoundedToUnboundedSourceAdapter<>(source))); } @@ -521,9 +521,9 @@ Checkpoint getCheckpointMark() { Double fractionConsumed = reader.getFractionConsumed(); if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { double fractionRest = 1 - fractionConsumed; - int splitAttampts = 8; + int splitAttempts = 8; for (int i = 0; i < 8 && residualSplit == null; ++i) { - double fractionToSplit = fractionConsumed + fractionRest * i / splitAttampts; + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; residualSplit = reader.splitAtFraction(fractionToSplit); } } From 46abb9069a7665be40ed580d4cc848e06e8e147b Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 23 Jun 2016 00:08:38 -0700 Subject: [PATCH 10/11] fix checkstyle and build --- .../apache/beam/runners/core/UnboundedReadFromBoundedSource.java | 1 - runners/google-cloud-dataflow-java/pom.xml | 1 - 2 files changed, 2 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 782af658b936..2b3d1c7105b0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 54084622d231..cd9bb712e4c9 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -299,7 +299,6 @@ org.apache.beam beam-runners-core-java - runtime From 847421a6921171ddede9b8c8882444c0ed07a0c8 Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 23 Jun 2016 16:45:57 -0700 Subject: [PATCH 11/11] Revert Dataflow runner changes --- runners/google-cloud-dataflow-java/pom.xml | 1 + .../beam/runners/dataflow/DataflowRunner.java | 38 ++----------------- .../runners/dataflow/DataflowRunnerTest.java | 30 +++++++++++++++ 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index cd9bb712e4c9..54084622d231 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -299,6 +299,7 @@ org.apache.beam beam-runners-core-java + runtime diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6f2e4044a07d..5818ba536c98 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; @@ -61,7 +60,6 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.BigQueryIO; -import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.PubsubUnboundedSink; @@ -355,8 +353,11 @@ public static DataflowRunner fromOptions(PipelineOptions options) { builder.put(View.AsIterable.class, StreamingViewAsIterable.class); builder.put(Write.Bound.class, StreamingWrite.class); builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); - builder.put(Read.Bounded.class, StreamingBoundedRead.class); + builder.put(Read.Bounded.class, UnsupportedIO.class); + builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); + builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); + builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom Pubsub unbounded source/sink or @@ -2622,37 +2623,6 @@ public void processElement(ProcessContext c) { } } - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the - * Dataflow runner in streaming mode. - */ - private static class StreamingBoundedRead extends PTransform> { - private final BoundedSource source; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingBoundedRead(DataflowRunner runner, Read.Bounded transform) { - this.source = transform.getSource(); - } - - @Override - protected Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); - } - - @Override - public final PCollection apply(PInput input) { - source.validate(); - - return Pipeline - .applyTransform(input, new UnboundedReadFromBoundedSource<>(source)) - .setIsBoundedInternal(IsBounded.BOUNDED); - } - } - /** * Specialized implementation for * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 558efbb34696..e094d0d3ed38 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.AvroSource; +import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; @@ -895,6 +897,34 @@ private void testUnsupportedSource(PTransform source, String name, bo p.run(); } + @Test + public void testBoundedSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); + } + + @Test + public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); + } + + @Test + public void testAvroIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + AvroIO.Read.from("foo"), "AvroIO.Read", true); + } + + @Test + public void testTextIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); + } + + @Test + public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); + } + @Test public void testReadUnboundedUnsupportedInBatch() throws Exception { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);