Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;

import org.joda.time.Instant;

import java.util.UUID;

/**
Expand Down Expand Up @@ -154,6 +152,7 @@ public void processElement(ProcessContext c) throws Exception {
// There is a dependency between this ParDo and the first (the WriteOperation PCollection
// as a side input), so this will happen after the initial ParDo.
PCollection<WriteT> results = input
.apply(Window.<T>into(new GlobalWindows()))
.apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
// Writer that will write the records in this bundle. Lazily
// initialized in processElement.
Expand Down Expand Up @@ -184,13 +183,11 @@ public void processElement(ProcessContext c) throws Exception {
public void finishBundle(Context c) throws Exception {
if (writer != null) {
WriteT result = writer.close();
// Output the result of the write.
c.outputWithTimestamp(result, Instant.now());
c.output(result);
}
}
}).withSideInputs(writeOperationView))
.setCoder(writeOperation.getWriterResultCoder())
.apply(Window.<WriteT>into(new GlobalWindows()));
.setCoder(writeOperation.getWriterResultCoder());

final PCollectionView<Iterable<WriteT>> resultsView =
results.apply(View.<WriteT>asIterable());
Expand Down
95 changes: 77 additions & 18 deletions sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import com.google.common.base.MoreObjects;
Expand All @@ -54,14 +62,55 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
* Tests for the Write PTransform.
*/
@RunWith(JUnit4.class)
public class WriteTest {
// Static store that can be accessed within the writer
static List<String> sinkContents = new ArrayList<>();
private static List<String> sinkContents = new ArrayList<>();

// Pass-through transform: hands each element to the sink unchanged, so tests can
// run the Write pipeline with no windowing or reshuffling applied.
private static final MapElements<String, String> IDENTITY_MAP =
    MapElements.via(new SimpleFunction<String, String>() {
      @Override
      public String apply(String value) {
        return value;
      }
    });

private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final Window.Bound<T> window;
public WindowAndReshuffle(Window.Bound<T> window) {
this.window = window;
}

private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
}
}

private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
@Override
public void processElement(ProcessContext c) throws Exception {
for (T s : c.element().getValue()) {
c.output(s);
}
}
}

@Override
public PCollection<T> apply(PCollection<T> input) {
return input
.apply(window)
.apply(ParDo.of(new AddArbitraryKey<T>()))
.apply(GroupByKey.<Integer, T>create())
.apply(ParDo.of(new RemoveArbitraryKey<T>()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you, but you can use Values.create() and Flatten.flattenIterables() instead of RemoveArbitraryKey

}
}

/**
* Test a Write transform with a PCollection of elements.
Expand All @@ -70,7 +119,7 @@ public class WriteTest {
public void testWrite() {
  List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
      "Intimidating pigeon", "Pedantic gull", "Frisky finch");
  // IDENTITY_MAP leaves the input untouched: exercises Write with no windowing.
  runWrite(inputs, IDENTITY_MAP);
}

/**
Expand All @@ -79,7 +128,7 @@ public void testWrite() {
@Test
public void testWriteWithEmptyPCollection() {
  List<String> inputs = new ArrayList<>();
  // An empty input must still initialize and finalize the sink cleanly.
  runWrite(inputs, IDENTITY_MAP);
}

/**
Expand All @@ -89,15 +138,30 @@ public void testWriteWithEmptyPCollection() {
public void testWriteWindowed() {
  List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
      "Intimidating pigeon", "Pedantic gull", "Frisky finch");
  // Small fixed windows plus a reshuffle ensure bundles span window boundaries
  // before the Write transform runs.
  runWrite(
      inputs,
      new WindowAndReshuffle<String>(
          Window.<String>into(FixedWindows.of(Duration.millis(2)))));
}

/**
 * Test a Write with sessions.
 */
@Test
public void testWriteWithSessions() {
  List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
      "Intimidating pigeon", "Pedantic gull", "Frisky finch");

  // Session windows with a 1 ms gap exercise merging windows upstream of Write.
  runWrite(
      inputs,
      new WindowAndReshuffle<String>(
          Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
}

/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
* written to the sink.
*/
public void runWrite(List<String> inputs, boolean windowed) {
private static void runWrite(
List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
// Flag to validate that the pipeline options are passed to the Sink
String[] args = {"--testFlag=test_value"};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class);
Expand All @@ -106,21 +170,16 @@ public void runWrite(List<String> inputs, boolean windowed) {
// Clear the sink's contents.
sinkContents.clear();

// Construct the input PCollection and test Sink.
PCollection<String> input;
if (windowed) {
List<Long> timestamps = new ArrayList<>();
for (long i = 0; i < inputs.size(); i++) {
timestamps.add(i + 1);
}
input = p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(Window.<String>into(FixedWindows.of(new Duration(2))));
} else {
input = p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()));
// Prepare timestamps for the elements.
List<Long> timestamps = new ArrayList<>();
for (long i = 0; i < inputs.size(); i++) {
timestamps.add(i + 1);
}
TestSink sink = new TestSink();

input.apply(Write.to(sink));
TestSink sink = new TestSink();
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(transform)
.apply(Write.to(sink));

p.run();
assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));
Expand Down