diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 1587a1af36d2..98d80bb1bbe2 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -195,6 +195,31 @@ beam-sdks-java-core + + com.fasterxml.jackson.core + jackson-annotations + + + + joda-time + joda-time + + + + com.google.guava + guava + + + + com.google.code.findbugs + jsr305 + + + + org.slf4j + slf4j-api + + diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java new file mode 100644 index 000000000000..2b3d1c7105b0 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.TimestampedValue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. + * + *

The {@link BoundedSource} may first be divided with {@link BoundedSource#splitIntoBundles} when + * initial splits are generated; each resulting source is then read directly, + * and element timestamps are propagated. While any elements remain, the watermark is the beginning + * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + *

Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the reader of the + * inner {@link BoundedSource}. + * Sources that cannot be split are read entirely into memory, so this transform does not work well + * with large, unsplittable sources. + * + *

This transform is intended to be used by a runner during pipeline translation to convert + * a Read.Bounded into a Read.Unbounded. + */ +public class UnboundedReadFromBoundedSource extends PTransform> { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class); + + private final BoundedSource source; + + /** + * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. + */ + public UnboundedReadFromBoundedSource(BoundedSource source) { + this.source = source; + } + + @Override + public PCollection apply(PInput input) { + return input.getPipeline().apply( + Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + } + + @Override + protected Coder getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public String getKindString() { + return "Read(" + approximateSimpleName(source.getClass()) + ")"; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data, instead we use the delegate inner source. + builder + .add(DisplayData.item("source", source.getClass())) + .include(source); + } + + /** + * A {@code BoundedSource} to {@code UnboundedSource} adapter. 
+ */ + @VisibleForTesting + public static class BoundedToUnboundedSourceAdapter + extends UnboundedSource> { + + private BoundedSource boundedSource; + + public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) { + this.boundedSource = boundedSource; + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + try { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + if (desiredBundleSize <= 0) { + LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", + boundedSource); + return ImmutableList.of(this); + } + List> splits + = boundedSource.splitIntoBundles(desiredBundleSize, options); + if (splits == null) { + LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); + return ImmutableList.of(this); + } + return Lists.transform( + splits, + new Function, BoundedToUnboundedSourceAdapter>() { + @Override + public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { + return new BoundedToUnboundedSourceAdapter<>(input); + }}); + } catch (Exception e) { + LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); + return ImmutableList.of(this); + } + } + + @Override + public Reader createReader(PipelineOptions options, Checkpoint checkpoint) + throws IOException { + if (checkpoint == null) { + return new Reader( + Collections.>emptyList() /* residualElements */, + boundedSource, + options); + } else { + return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); + } + } + + @Override + public Coder getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Coder> getCheckpointMarkCoder() { + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + } + + 
@VisibleForTesting + static class Checkpoint implements UnboundedSource.CheckpointMark { + private final List> residualElements; + private final @Nullable BoundedSource residualSource; + + public Checkpoint( + List> residualElements, + @Nullable BoundedSource residualSource) { + this.residualElements = residualElements; + this.residualSource = residualSource; + } + + @Override + public void finalizeCheckpoint() {} + + @VisibleForTesting + List> getResidualElements() { + return residualElements; + } + + @VisibleForTesting + @Nullable BoundedSource getResidualSource() { + return residualSource; + } + } + + @VisibleForTesting + static class CheckpointCoder extends StandardCoder> { + + @JsonCreator + public static CheckpointCoder of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> components) { + checkArgument(components.size() == 1, + "Expecting 1 components, got %s", components.size()); + return new CheckpointCoder<>(components.get(0)); + } + + // The coder for a list of residual elements and their timestamps + private final Coder>> elemsCoder; + // The coder from the BoundedReader for coding each element + private final Coder elemCoder; + // The nullable and serializable coder for the BoundedSource. 
+ @SuppressWarnings("rawtypes") + private final Coder sourceCoder; + + CheckpointCoder(Coder elemCoder) { + this.elemsCoder = NullableCoder.of( + ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); + this.elemCoder = elemCoder; + this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); + } + + @Override + public void encode(Checkpoint value, OutputStream outStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + elemsCoder.encode(value.residualElements, outStream, nested); + sourceCoder.encode(value.residualSource, outStream, nested); + } + + @SuppressWarnings("unchecked") + @Override + public Checkpoint decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + return new Checkpoint<>( + elemsCoder.decode(inStream, nested), + sourceCoder.decode(inStream, nested)); + } + + @Override + public List> getCoderArguments() { + return Arrays.>asList(elemCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "CheckpointCoder uses Java Serialization, which may be non-deterministic."); + } + } + + /** + * An {@code UnboundedReader} that wraps a {@code BoundedSource} into + * {@link ResidualElements} and {@link ResidualSource}. + * + *

In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains + * the {@code BoundedSource}. After the first checkpoint, the {@code BoundedSource} will + * be split into {@link ResidualElements} and {@link ResidualSource}. + */ + @VisibleForTesting + class Reader extends UnboundedReader { + private ResidualElements residualElements; + private @Nullable ResidualSource residualSource; + private final PipelineOptions options; + private boolean done; + + Reader( + List> residualElementsList, + @Nullable BoundedSource residualSource, + PipelineOptions options) { + init(residualElementsList, residualSource, options); + this.options = checkNotNull(options, "options"); + this.done = false; + } + + private void init( + List> residualElementsList, + @Nullable BoundedSource residualSource, + PipelineOptions options) { + this.residualElements = new ResidualElements(residualElementsList); + this.residualSource = + residualSource == null ? null : new ResidualSource(residualSource, options); + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (residualElements.advance()) { + return true; + } else if (residualSource != null && residualSource.advance()) { + return true; + } else { + done = true; + return false; + } + } + + @Override + public void close() throws IOException { + if (residualSource != null) { + residualSource.close(); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrent(); + } else if (residualSource != null) { + return residualSource.getCurrent(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrentTimestamp(); + } else if (residualSource != null) { + return 
residualSource.getCurrentTimestamp(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getWatermark() { + return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * {@inheritDoc} + * + *

If only part of the {@link ResidualElements} is consumed, the new + * checkpoint will contain the remaining elements in {@link ResidualElements} and + * the {@link ResidualSource}. + * + *

If all {@link ResidualElements} and part of the + * {@link ResidualSource} are consumed, the new checkpoint is done by splitting + * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. + * {@link ResidualSource} is the source split from the current source, + * and {@link ResidualElements} contains rest elements from the current source after + * the splitting. For unsplittable source, it will put all remaining elements into + * the {@link ResidualElements}. + */ + @Override + public Checkpoint getCheckpointMark() { + Checkpoint newCheckpoint; + if (!residualElements.done()) { + // Part of residualElements are consumed. + // Checkpoints the remaining elements and residualSource. + newCheckpoint = new Checkpoint<>( + residualElements.getRestElements(), + residualSource == null ? null : residualSource.getSource()); + } else if (residualSource != null) { + newCheckpoint = residualSource.getCheckpointMark(); + } else { + newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); + } + // Re-initialize since the residualElements and the residualSource might be + // consumed or split by checkpointing. 
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); + return newCheckpoint; + } + + @Override + public BoundedToUnboundedSourceAdapter getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + + private class ResidualElements { + private final List> elementsList; + private @Nullable Iterator> elementsIterator; + private @Nullable TimestampedValue currentT; + private boolean hasCurrent; + private boolean done; + + ResidualElements(List> residualElementsList) { + this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); + this.elementsIterator = null; + this.currentT = null; + this.hasCurrent = false; + this.done = false; + } + + public boolean advance() { + if (elementsIterator == null) { + elementsIterator = elementsList.iterator(); + } + if (elementsIterator.hasNext()) { + currentT = elementsIterator.next(); + hasCurrent = true; + return true; + } else { + done = true; + hasCurrent = false; + return false; + } + } + + boolean hasCurrent() { + return hasCurrent; + } + + boolean done() { + return done; + } + + TimestampedValue getCurrentTimestampedValue() { + if (!hasCurrent) { + throw new NoSuchElementException(); + } + return currentT; + } + + T getCurrent() { + return getCurrentTimestampedValue().getValue(); + } + + Instant getCurrentTimestamp() { + return getCurrentTimestampedValue().getTimestamp(); + } + + List> getRestElements() { + if (elementsIterator == null) { + return elementsList; + } else { + List> newResidualElements = Lists.newArrayList(); + while (elementsIterator.hasNext()) { + newResidualElements.add(elementsIterator.next()); + } + return newResidualElements; + } + } + } + + private class ResidualSource { + private BoundedSource residualSource; + private PipelineOptions options; + private @Nullable BoundedReader reader; + private boolean closed; + + public ResidualSource(BoundedSource residualSource, PipelineOptions options) { + this.residualSource = checkNotNull(residualSource, 
"residualSource"); + this.options = checkNotNull(options, "options"); + this.reader = null; + this.closed = false; + } + + private boolean advance() throws IOException { + if (reader == null && !closed) { + reader = residualSource.createReader(options); + return reader.start(); + } else { + return reader.advance(); + } + } + + T getCurrent() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrent(); + } + + Instant getCurrentTimestamp() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrentTimestamp(); + } + + void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + closed = true; + } + + BoundedSource getSource() { + return residualSource; + } + + Checkpoint getCheckpointMark() { + if (reader == null) { + // Reader hasn't started, checkpoint the residualSource. + return new Checkpoint<>(null /* residualElements */, residualSource); + } else { + // Part of residualSource are consumed. + // Splits the residualSource and tracks the new residualElements in current source. 
+ BoundedSource residualSplit = null; + Double fractionConsumed = reader.getFractionConsumed(); + if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { + double fractionRest = 1 - fractionConsumed; + int splitAttempts = 8; + for (int i = 0; i < 8 && residualSplit == null; ++i) { + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; + residualSplit = reader.splitAtFraction(fractionToSplit); + } + } + List> newResidualElements = Lists.newArrayList(); + try { + while (advance()) { + newResidualElements.add( + TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("Failed to read elements from the bounded reader.", e); + } + return new Checkpoint<>(newResidualElements, residualSplit); + } + } + } + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java new file mode 100644 index 000000000000..afd0927dbf19 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + 
+import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; + +/** + * Unit tests for {@link UnboundedReadFromBoundedSource}. + */ +@RunWith(JUnit4.class) +public class UnboundedReadFromBoundedSourceTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testCheckpointCoderNulls() throws Exception { + CheckpointCoder coder = new CheckpointCoder<>(StringUtf8Coder.of()); + Checkpoint emptyCheckpoint = new Checkpoint<>(null, null); + Checkpoint decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray( + coder, + CoderUtils.encodeToByteArray(coder, emptyCheckpoint)); + assertNull(decodedEmptyCheckpoint.getResidualElements()); + assertNull(decodedEmptyCheckpoint.getResidualSource()); + } + + @Test + @Category(NeedsRunner.class) + public void testBoundedToUnboundedSourceAdapter() throws Exception { + long numElements = 100; + BoundedSource boundedSource = CountingSource.upTo(numElements); + UnboundedSource> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + Pipeline p = TestPipeline.create(); + + PCollection output = + p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); + + // Count == numElements + PAssert + .thatSingleton(output.apply("Count", Count.globally())) + .isEqualTo(numElements); + // Unique count == numElements + PAssert + .thatSingleton(output.apply(RemoveDuplicates.create()) + .apply("UniqueCount", Count.globally())) + .isEqualTo(numElements); + // Min == 0 + PAssert + .thatSingleton(output.apply("Min", Min.globally())) + .isEqualTo(0L); + // Max == numElements-1 + PAssert + .thatSingleton(output.apply("Max", Max.globally())) + .isEqualTo(numElements - 1); + p.run(); + } + 
+ @Test + public void testCountingSourceToUnboundedCheckpoint() throws Exception { + long numElements = 100; + BoundedSource countingSource = CountingSource.upTo(numElements); + List expected = Lists.newArrayList(); + for (long i = 0; i < numElements; ++i) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected); + } + + @Test + public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(100); + writeFile(compressedFile, input); + + BoundedSource source = new UnsplittableSource(compressedFile.getPath(), 1); + List expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(source, expected); + } + + private void testBoundedToUnboundedSourceAdapterCheckpoint( + BoundedSource boundedSource, + List expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter.Reader reader = + unboundedSource.createReader(options, null); + + List actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { + actual.add(reader.getCurrent()); + // checkpoint every 9 elements + if (actual.size() % 9 == 0) { + Checkpoint checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + } + } + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testCountingSourceToUnboundedCheckpointRestart() throws Exception { + long numElements = 100; + BoundedSource countingSource = CountingSource.upTo(numElements); + List expected = Lists.newArrayList(); + for (long i = 0; i < numElements; ++i) { + 
expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected); + } + + @Test + public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(1000); + writeFile(compressedFile, input); + + BoundedSource source = new UnsplittableSource(compressedFile.getPath(), 1); + List expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected); + } + + private void testBoundedToUnboundedSourceAdapterCheckpointRestart( + BoundedSource boundedSource, + List expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter.Reader reader = + unboundedSource.createReader(options, null); + + List actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext;) { + actual.add(reader.getCurrent()); + // checkpoint every 9 elements + if (actual.size() % 9 == 0) { + Checkpoint checkpoint = reader.getCheckpointMark(); + Coder> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); + Checkpoint decodedCheckpoint = CoderUtils.decodeFromByteArray( + checkpointCoder, + CoderUtils.encodeToByteArray(checkpointCoder, checkpoint)); + reader.close(); + checkpoint.finalizeCheckpoint(); + + BoundedToUnboundedSourceAdapter.Reader restarted = + unboundedSource.createReader(options, decodedCheckpoint); + reader = restarted; + hasNext = reader.start(); + } else { + hasNext = reader.advance(); + } + } + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testReadBeforeStart() throws Exception { + 
thrown.expect(NoSuchElementException.class); + + BoundedSource countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + unboundedSource.createReader(options, null).getCurrent(); + } + + @Test + public void testReadFromCheckpointBeforeStart() throws Exception { + thrown.expect(NoSuchElementException.class); + + BoundedSource countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + List> elements = + ImmutableList.of(TimestampedValue.of(1L, new Instant(1L))); + Checkpoint checkpoint = new Checkpoint<>(elements, countingSource); + unboundedSource.createReader(options, checkpoint).getCurrent(); + } + + /** + * Generate byte array of given size. + */ + private static byte[] generateInput(int size) { + // Arbitrary but fixed seed + Random random = new Random(285930); + byte[] buff = new byte[size]; + random.nextBytes(buff); + return buff; + } + + /** + * Writes a single output file. + */ + private static void writeFile(File file, byte[] input) throws IOException { + try (OutputStream os = new FileOutputStream(file)) { + os.write(input); + } + } + + /** + * Unsplittable source for use in tests. 
+ */ + private static class UnsplittableSource extends FileBasedSource { + public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) { + super(fileOrPatternSpec, minBundleSize); + } + + public UnsplittableSource( + String fileName, long minBundleSize, long startOffset, long endOffset) { + super(fileName, minBundleSize, startOffset, endOffset); + } + + @Override + protected FileBasedSource createForSubrangeOfFile(String fileName, long start, long end) { + return new UnsplittableSource(fileName, getMinBundleSize(), start, end); + } + + @Override + protected FileBasedReader createSingleFileReader(PipelineOptions options) { + return new UnsplittableReader(this); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return false; + } + + @Override + public Coder getDefaultOutputCoder() { + return SerializableCoder.of(Byte.class); + } + + private static class UnsplittableReader extends FileBasedReader { + ByteBuffer buff = ByteBuffer.allocate(1); + Byte current; + long offset; + ReadableByteChannel channel; + + public UnsplittableReader(UnsplittableSource source) { + super(source); + offset = source.getStartOffset() - 1; + } + + @Override + public Byte getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public boolean allowsDynamicSplitting() { + return false; + } + + @Override + protected boolean isAtSplitPoint() { + return true; + } + + @Override + protected void startReading(ReadableByteChannel channel) throws IOException { + this.channel = channel; + } + + @Override + protected boolean readNextRecord() throws IOException { + buff.clear(); + if (channel.read(buff) != 1) { + return false; + } + current = new Byte(buff.get(0)); + offset += 1; + return true; + } + + @Override + protected long getCurrentOffset() { + return offset; + } + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 49b2ad4fab34..ba13f9dbe8bd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -197,7 +197,8 @@ public void validate() { } @Override - public BoundedReader> createReader(PipelineOptions options) { + public BoundedReader> createReader(PipelineOptions options) + throws IOException { return new Reader(source.createReader(options, null)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java index 2c4a32572864..ea3004ebc7ea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java @@ -76,7 +76,7 @@ public abstract List> genera * checkpoint if present. */ public abstract UnboundedReader createReader( - PipelineOptions options, @Nullable CheckpointMarkT checkpointMark); + PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) throws IOException; /** * Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or