From 13bad6b3f9fd3f4c1aad35e1a325df5babe5cc72 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 29 Mar 2016 18:37:30 -0700 Subject: [PATCH 1/3] Add TextIO and AvroIO withNumShards tests --- .../cloud/dataflow/sdk/io/AvroIOTest.java | 33 +++++++++++++- .../cloud/dataflow/sdk/io/TextIOTest.java | 44 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index f0a9b1a1512d..691032c561a1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -222,6 +222,37 @@ public void testAvroSinkWrite() throws Exception { } } - // TODO: for Write only, test withSuffix, withNumShards, + @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. + @Test + public void testAvroSinkShardedWrite() throws Exception { + String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath(); + String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"}; + + TestPipeline p = TestPipeline.create(); + int numShards = 4; + p.apply(Create.of(expectedElements)) + .apply( + AvroIO.Write.to(outputFilePrefix) + .withSchema(String.class) + .withNumShards(numShards) + .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX)); + p.run(); + + // Validate that the data written matches the expected elements in the expected order + List actualElements = new ArrayList<>(); + for (int i = 0; i < numShards; i++) { + String expectedName = + IOChannelUtils.constructName( + outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, i, numShards); + File outputFile = new File(expectedName); + assertTrue("Expected output file " + expectedName, outputFile.exists()); + try (DataFileReader reader = + new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) { + Iterators.addAll(actualElements, reader); + } + } + assertThat(actualElements, containsInAnyOrder(expectedElements)); + } + // TODO: for Write only, test withSuffix, // withShardNameTemplate and withoutSharding. } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 0bd6ce76274e..a1bb7f71dd7f 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -43,6 +43,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -285,6 +286,49 @@ public void testWriteNamed() { } } + @Test + public void testShardedWrite() throws Exception { + File tmpFile = tmpFolder.newFile("file"); + String filename = tmpFile.getPath(); + + Pipeline p = TestPipeline.create(); + + PCollection input = + p.apply(Create.of(Arrays.asList(LINES_ARRAY)).withCoder(StringUtf8Coder.of())); + + int numShards = 5; + String suffix = ".txt"; + TextIO.Write.Bound write = + TextIO.Write.to(filename) + .withNumShards(numShards) + .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX) + .withSuffix(suffix); + + input.apply(write); + + p.run(); + + List actual = new ArrayList<>(); + for (int i = 0; i < numShards; i++) { + String shardName = + IOChannelUtils.constructName( + filename, ShardNameTemplate.INDEX_OF_MAX, suffix, i, numShards); + try (BufferedReader reader = new BufferedReader(new FileReader(shardName))) { + for (;;) { + String line = reader.readLine(); + if (line == null) { + break; + } + actual.add(line); + } + } + } + + String[] expected = LINES_ARRAY; + + assertThat(actual, containsInAnyOrder(expected)); + } + @Test public void testUnsupportedFilePattern() throws IOException { File outFolder = tmpFolder.newFolder(); From ea289fb9a0fb9b1d710f2fb467bf948407ada74e Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 6 Apr 2016 19:08:54 -0700 Subject: [PATCH 2/3] fixup! Add TextIO and AvroIO withNumShards tests --- .../cloud/dataflow/sdk/io/AvroIOTest.java | 84 ++++++++--------- .../cloud/dataflow/sdk/io/TextIOTest.java | 90 ++++++++----------- 2 files changed, 79 insertions(+), 95 deletions(-) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index 691032c561a1..15d00abab2da 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound; import com.google.cloud.dataflow.sdk.runners.DirectPipeline; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; @@ -45,6 +46,7 @@ import org.junit.runners.JUnit4; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -198,54 +200,40 @@ public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable { } @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. - @Test - public void testAvroSinkWrite() throws Exception { - String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath(); - String[] expectedElements = new String[] {"first", "second", "third"}; - + private void runTestWrite(String[] expectedElements, int numShards) throws IOException { + File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); + String outputFilePrefix = baseOutputFile.getAbsolutePath(); TestPipeline p = TestPipeline.create(); - p.apply(Create.of(expectedElements)) - .apply(AvroIO.Write.to(outputFilePrefix).withSchema(String.class)); + Bound write = AvroIO.Write.to(outputFilePrefix) + .withSchema(String.class); + if (numShards > 1) { + write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX); + } else { + write = write.withoutSharding(); + } + p.apply(Create.of(expectedElements)).apply(write); p.run(); // Validate that the data written matches the expected elements in the expected order - String expectedName = - IOChannelUtils.constructName( - outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, 0, 1); - File outputFile = new File(expectedName); - assertTrue("Expected output file " + expectedName, outputFile.exists()); - try (DataFileReader reader = - new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) { - List actualElements = new ArrayList<>(); - Iterators.addAll(actualElements, reader); - assertThat(actualElements, containsInAnyOrder(expectedElements)); + List expectedFiles = new ArrayList<>(); + if (numShards == 1) { + expectedFiles.add(baseOutputFile); + } else { + for (int i = 0; i < numShards; i++) { + expectedFiles.add( + new File( + IOChannelUtils.constructName( + outputFilePrefix, + ShardNameTemplate.INDEX_OF_MAX, + "" /* no suffix */, + i, + numShards))); + } } - } - - @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. - @Test - public void testAvroSinkShardedWrite() throws Exception { - String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath(); - String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"}; - - TestPipeline p = TestPipeline.create(); - int numShards = 4; - p.apply(Create.of(expectedElements)) - .apply( - AvroIO.Write.to(outputFilePrefix) - .withSchema(String.class) - .withNumShards(numShards) - .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX)); - p.run(); - // Validate that the data written matches the expected elements in the expected order List actualElements = new ArrayList<>(); - for (int i = 0; i < numShards; i++) { - String expectedName = - IOChannelUtils.constructName( - outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, i, numShards); - File outputFile = new File(expectedName); - assertTrue("Expected output file " + expectedName, outputFile.exists()); + for (File outputFile : expectedFiles) { + assertTrue("Expected output file " + outputFile.getName(), outputFile.exists()); try (DataFileReader reader = new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) { Iterators.addAll(actualElements, reader); @@ -253,6 +241,20 @@ public void testAvroSinkShardedWrite() throws Exception { } assertThat(actualElements, containsInAnyOrder(expectedElements)); } + + @Test + public void testAvroSinkWrite() throws Exception { + String[] expectedElements = new String[] {"first", "second", "third"}; + + runTestWrite(expectedElements, 1); + } + + @Test + public void testAvroSinkShardedWrite() throws Exception { + String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"}; + + runTestWrite(expectedElements, 4); + } // TODO: for Write only, test withSuffix, // withShardNameTemplate and withoutSharding. } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index a1bb7f71dd7f..6d4d6e7efe65 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -200,36 +200,57 @@ public void testReadNamed() throws Exception { } void runTestWrite(T[] elems, Coder coder) throws Exception { - File tmpFile = tmpFolder.newFile("file.txt"); - String filename = tmpFile.getPath(); + runTestWrite(elems, coder, 1); + } + + void runTestWrite(T[] elems, Coder coder, int numShards) throws Exception { + String filename = tmpFolder.newFile("file.txt").getPath(); Pipeline p = TestPipeline.create(); - PCollection input = - p.apply(Create.of(Arrays.asList(elems)).withCoder(coder)); + PCollection input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder)); TextIO.Write.Bound write; if (coder.equals(StringUtf8Coder.of())) { - TextIO.Write.Bound writeStrings = - TextIO.Write.to(filename).withoutSharding(); + TextIO.Write.Bound writeStrings = TextIO.Write.to(filename); // T==String write = (TextIO.Write.Bound) writeStrings; } else { - write = TextIO.Write.to(filename).withCoder(coder).withoutSharding(); + write = TextIO.Write.to(filename).withCoder(coder); + } + if (numShards == 1) { + write = write.withoutSharding(); + } else { + write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX); } input.apply(write); p.run(); + List expectedFiles = new ArrayList<>(); + if (numShards == 1) { + expectedFiles.add(new File(filename)); + } else { + for (int i = 0; i < numShards; i++) { + expectedFiles.add( + new File( + tmpFolder.getRoot(), + IOChannelUtils.constructName( + "file.txt", ShardNameTemplate.INDEX_OF_MAX, "", i, numShards))); + } + } + List actual = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) { - for (;;) { - String line = reader.readLine(); - if (line == null) { - break; + for (File tmpFile : expectedFiles) { + try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) { + for (;;) { + String line = reader.readLine(); + if (line == null) { + break; + } + actual.add(line); } - actual.add(line); } } @@ -241,8 +262,7 @@ void runTestWrite(T[] elems, Coder coder) throws Exception { expected[i] = line; } - assertThat(actual, - containsInAnyOrder(expected)); + assertThat(actual, containsInAnyOrder(expected)); } @Test @@ -288,45 +308,7 @@ public void testWriteNamed() { @Test public void testShardedWrite() throws Exception { - File tmpFile = tmpFolder.newFile("file"); - String filename = tmpFile.getPath(); - - Pipeline p = TestPipeline.create(); - - PCollection input = - p.apply(Create.of(Arrays.asList(LINES_ARRAY)).withCoder(StringUtf8Coder.of())); - - int numShards = 5; - String suffix = ".txt"; - TextIO.Write.Bound write = - TextIO.Write.to(filename) - .withNumShards(numShards) - .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX) - .withSuffix(suffix); - - input.apply(write); - - p.run(); - - List actual = new ArrayList<>(); - for (int i = 0; i < numShards; i++) { - String shardName = - IOChannelUtils.constructName( - filename, ShardNameTemplate.INDEX_OF_MAX, suffix, i, numShards); - try (BufferedReader reader = new BufferedReader(new FileReader(shardName))) { - for (;;) { - String line = reader.readLine(); - if (line == null) { - break; - } - actual.add(line); - } - } - } - - String[] expected = LINES_ARRAY; - - assertThat(actual, containsInAnyOrder(expected)); + runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5); } @Test From c3854536e392290b53b37cccdce4730b8bdb3422 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 7 Apr 2016 08:51:12 -0700 Subject: [PATCH 3/3] fixup! Add TextIO and AvroIO withNumShards tests --- .../test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index 15d00abab2da..ccc24d563faa 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -204,8 +204,7 @@ private void runTestWrite(String[] expectedElements, int numShards) throws IOExc File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); String outputFilePrefix = baseOutputFile.getAbsolutePath(); TestPipeline p = TestPipeline.create(); - Bound write = AvroIO.Write.to(outputFilePrefix) - .withSchema(String.class); + Bound write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class); if (numShards > 1) { write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX); } else {