From c95a61a0df01f93ebeeb897249535f160afdf9eb Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 16:18:27 -0700 Subject: [PATCH 01/12] Add browse URL to GcsPath --- .../beam/sdk/util/FileIOChannelFactory.java | 7 +++ .../beam/sdk/util/GcsIOChannelFactory.java | 7 +++ .../beam/sdk/util/IOChannelFactory.java | 20 +++++++- .../apache/beam/sdk/util/gcsfs/GcsPath.java | 35 ++++++++++++- .../beam/sdk/util/gcsfs/GcsPathTest.java | 51 +++++++++++++++++++ 5 files changed, 118 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 7bc09e9c95ef..6e15ba60627b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.regex.Matcher; +import javax.annotation.Nullable; + /** * Implements IOChannelFactory for local files. */ @@ -133,4 +135,9 @@ public boolean isReadSeekEfficient(String spec) throws IOException { public String resolve(String path, String other) throws IOException { return Paths.get(path).resolve(other).toString(); } + @Nullable + @Override + public String getBrowseUrl(String path) { + return null; // TODO + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index 2122c6427bd0..ba04e125fd17 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -27,6 +27,8 @@ import java.util.LinkedList; import java.util.List; +import javax.annotation.Nullable; + /** * Implements IOChannelFactory for GCS. */ @@ -84,4 +86,9 @@ public boolean isReadSeekEfficient(String spec) throws IOException { public String resolve(String path, String other) throws IOException { return GcsPath.fromUri(path).resolve(other).toString(); } + @Nullable + @Override + public String getBrowseUrl(String path) { + return GcsPath.fromUri(path).getBrowseUrl(); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java index c89c7ad56fd4..7170c3e26dca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java @@ -23,6 +23,8 @@ import java.nio.channels.WritableByteChannel; import java.util.Collection; +import javax.annotation.Nullable; + /** * Defines a factory for working with read and write channels. * @@ -98,5 +100,21 @@ public interface IOChannelFactory { * Where the {@code other} path has a root component then resolution is highly implementation * dependent and therefore unspecified. */ - public String resolve(String path, String other) throws IOException; + String resolve(String path, String other) throws IOException; + + /** + * Retrieve a URL where the given {@code path} can be viewed and browsed, or null if browse URLs + * are not supported. + * + *

The returned URL should be suitable for a user to enter into a web browser and browse + * interactively. If the {@code path} refers to a file or data resource, the URL should + * point to a location where the resource can be viewed. If the {@code path} points to a + * directory or contains wildcards, the URL should point to a location where the inner resources + * can be browsed. + * + *

This method does not validate that a resource exists or is accessible for the given + * {@code path}. + */ + @Nullable + String getBrowseUrl(String path); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java index 3e4b7914d10f..d80c72761c65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java @@ -478,7 +478,6 @@ public WatchKey register(WatchService watcher, WatchEvent.Kind... events) public Iterator iterator() { return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject()); } - private static class NameIterator implements Iterator { private final FileSystem fs; private boolean fullPath; @@ -601,6 +600,40 @@ public String toResourceName() { return sb.toString(); } + private static final Pattern HAS_GLOB = Pattern.compile(".*[\\*\\?\\[\\]].*"); + /** + * @inheritDoc + * + * @see + * Accessing Google Cloud Platform Console + */ + public String getBrowseUrl() { + // GCS uses different URLs for browsing buckets vs. objects. Object "subdirectories" can + // be treated as buckets for browsing. + final String bucketPrefix = "https://console.cloud.google.com/storage/browser/"; + final String objectPrefix = "https://storage.cloud.google.com/"; + + // Iterate through path to remove any glob pattern suffix. + StringBuilder resourceBuilder = new StringBuilder(bucket).append("/"); + String[] components = object.split("/"); + for (int i = 0; i < components.length; i++) { + String component = components[i]; + if (HAS_GLOB.matcher(component).matches()) { + break; + } + + resourceBuilder.append(component); + if (i + 1 < components.length || object.endsWith("/")) { + resourceBuilder.append("/"); + } + } + + final boolean isDirectoryPath = resourceBuilder.charAt(resourceBuilder.length() - 1) == '/'; + String prefix = isDirectoryPath ? bucketPrefix : objectPrefix; + resourceBuilder.insert(0, prefix); + return resourceBuilder.toString(); + } + @Override public URI toUri() { try { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java index fdd1dfd6e7aa..29dc4302d4dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java @@ -36,6 +36,8 @@ import java.util.Iterator; import java.util.List; +import autovalue.shaded.com.google.common.common.collect.ImmutableList; + /** * Tests of GcsPath. */ @@ -331,4 +333,53 @@ public void testSubPathError() { a.subpath(1, 1); // throws IllegalArgumentException Assert.fail(); } + + @Test + public void testBrowseUrl() { + class TestData { + String path; + String expectedBrowseUrl; + + TestData(String path, String expectedBrowseUrl) { + this.path = path; + this.expectedBrowseUrl = expectedBrowseUrl; + } + } + + List tests = ImmutableList.builder() + .add(new TestData( + "gs://bucket/a/b/c/object", + "https://storage.cloud.google.com/bucket/a/b/c/object")) + .add(new TestData( + "gs://bucket/", + "https://console.cloud.google.com/storage/browser/bucket/")) + .add(new TestData( + "gs://bucket/subdir/", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir/*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir/foo*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir/**/b", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir/b?", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir/[bcd]", + "https://console.cloud.google.com/storage/browser/bucket/subdir/")) + .add(new TestData( + "gs://bucket/subdir*/foo", + "https://console.cloud.google.com/storage/browser/bucket/")) + .build(); + + for (TestData test : tests) { + GcsPath path = GcsPath.fromUri(test.path); + String actual = path.getBrowseUrl(); + assertEquals(String.format("Browse URL for %s", path), test.expectedBrowseUrl, actual); + } + } } From 6ff5bae0e5bc1cdc2d10ef9e65b44039c8363407 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 16:32:21 -0700 Subject: [PATCH 02/12] Add browse URL to File IO --- .../beam/sdk/util/FileIOChannelFactory.java | 3 +- .../sdk/util/FileIOChannelFactoryTest.java | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 6e15ba60627b..6c2e54093b26 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -138,6 +138,7 @@ public String resolve(String path, String other) throws IOException { @Nullable @Override public String getBrowseUrl(String path) { - return null; // TODO + File file = new File(path); + return file.toURI().toString(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index b510408fe960..27f076d95c4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -36,6 +36,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.nio.channels.Channels; @@ -224,4 +225,32 @@ public void testGetSizeBytesForNonExistentFile() throws Exception { factory.getSizeBytes( factory.resolve(temporaryFolder.getRoot().getPath(), "non-existent-file")); } + + @Test + public void testBrowseUrl() throws IOException { + class TestData { + String description; + String path; + String expectedBrowseUrl; + + TestData(String description, String path, String expectedBrowseUrl) { + this.description = description; + this.path = path; + this.expectedBrowseUrl = expectedBrowseUrl; + } + } + + File file = temporaryFolder.newFile(); + List tests = ImmutableList.builder() + .add(new TestData("absolute", "/a/b/c", "file:/a/b/c")) + .add(new TestData("relative", "a/b/c", new File("").toURI().toString() + "a/b/c")) + .add(new TestData("temp path", file.getPath(), file.toURI().toString())) + .build(); + + for (TestData test : tests) { + String actual = factory.getBrowseUrl(test.path); + assertEquals(String.format("Browse URL for %s: %s", test.description, test.path), + test.expectedBrowseUrl, actual); + } + } } From ca40e25c87ddde4693b17169e40c00c0b62be170 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 4 May 2016 09:38:18 -0700 Subject: [PATCH 03/12] Add DisplayDataMatcher for linkUrl --- .../transforms/display/DisplayDataMatchers.java | 13 +++++++++++++ .../display/DisplayDataMatchersTest.java | 16 ++++++++++++++++ .../sdk/transforms/display/DisplayDataTest.java | 17 +++++------------ 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index e3721b8f6ef4..a86ab050e212 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -337,6 +337,19 @@ public static Matcher> hasValue(Object value) { return hasValue(Matchers.is(value)); } + public static Matcher> hasLinkUrl(String linkUrl) { + return hasLinkUrl(Matchers.is(linkUrl)); + } + + public static Matcher> hasLinkUrl(Matcher urlMatcher) { + return new FeatureMatcher, String>(urlMatcher, "with link url", "url") { + @Override + protected String featureValueOf(Item actual) { + return actual.getLinkUrl(); + } + }; + } + /** * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a value * matching the specified value matcher. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java index f848c5ed3082..50f57c5e026e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -97,6 +98,21 @@ public void testHasValue() { assertThat(createDisplayDataWithItem("foo", "bar"), matcher); } + @Test + public void testHasLinkUrl() { + DisplayData displayData = DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("foo", "bar").withLinkUrl("http://go")); + } + }); + + Matcher matcher = hasDisplayItem(hasLinkUrl("http://gooooo")); + assertFalse(matcher.matches(displayData)); + + assertThat(displayData, hasDisplayItem(hasLinkUrl("http://go"))); + } + @Test public void testHasNamespace() { Matcher matcher = hasDisplayItem(hasNamespace(SampleTransform.class)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 851769a03e60..b88cc12ece39 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -198,7 +199,7 @@ public void populateDisplayData(DisplayData.Builder builder) { hasValue(ISO_FORMATTER.print(value)), hasShortValue(nullValue(String.class)), hasLabel(is("the current instant")), - hasUrl(is("http://time.gov"))); + hasLinkUrl("http://time.gov")); assertThat(item, matchesAllOf); } @@ -216,7 +217,9 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat( data, - hasDisplayItem(allOf(hasLabel(nullValue(String.class)), hasUrl(nullValue(String.class))))); + hasDisplayItem(allOf( + hasLabel(nullValue(String.class)), + hasLinkUrl(nullValue(String.class))))); } @Test @@ -1011,16 +1014,6 @@ protected String featureValueOf(DisplayData.Item actual) { }; } - private static Matcher> hasUrl(Matcher urlMatcher) { - return new FeatureMatcher, String>( - urlMatcher, "display item with url", "URL") { - @Override - protected String featureValueOf(DisplayData.Item actual) { - return actual.getLinkUrl(); - } - }; - } - private static Matcher> hasShortValue(Matcher valueStringMatcher) { return new FeatureMatcher, T>( valueStringMatcher, "display item with short value", "short value") { From 28739a73ec2510fc9b78fc878346f121e5bd7636 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 4 May 2016 09:39:04 -0700 Subject: [PATCH 04/12] Add linkUrl to AvroIO DisplayData --- .../java/org/apache/beam/sdk/io/AvroIO.java | 31 ++++++++++++++++--- .../org/apache/beam/sdk/io/AvroIOTest.java | 31 ++++++++++++++++--- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6b9f0100f034..12fc15fb8fe7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollection; @@ -330,9 +331,12 @@ public PCollection apply(PInput input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("filePattern", filepattern)) - .addIfNotDefault(DisplayData.item("validation", validate), true); + if (filepattern != null) { + builder.add(DisplayData.item("filePattern", filepattern) + .withLinkUrl(getBrowseUrl(filepattern))); + } + + builder.addIfNotDefault(DisplayData.item("validation", validate), true); } @Override @@ -693,9 +697,16 @@ public PDone apply(PCollection input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + if (filenamePrefix != null) { + // Append wildcard to browseUrl input since this is a prefix + // for shardNameTemplate + fileSuffix. + String browseUrl = getBrowseUrl(filenamePrefix + "*"); + builder.add(DisplayData.item("filePrefix", filenamePrefix) + .withLinkUrl(browseUrl)); + } + builder .add(DisplayData.item("schema", type)) - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault( DisplayData.item("shardNameTemplate", shardTemplate), DEFAULT_SHARD_TEMPLATE) @@ -760,6 +771,18 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } + private static String getBrowseUrl(String filePattern) { + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(filePattern); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Invalid filePattern: %s", filePattern), e); + } + + return factory.getBrowseUrl(filePattern); + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 43b1219d9473..c82ceb237c34 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -18,7 +18,11 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -28,11 +32,14 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.MoreObjects; @@ -42,6 +49,7 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -62,6 +70,11 @@ public class AvroIOTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + @Test public void testReadWithoutValidationFlag() throws Exception { AvroIO.Read.Bound read = AvroIO.Read.from("gs://bucket/foo*/baz"); @@ -262,18 +275,24 @@ public void testAvroSinkShardedWrite() throws Exception { @Test public void testReadDisplayData() { - AvroIO.Read.Bound read = AvroIO.Read.from("foo.*") + String filePattern = "gs://bucket/foo/bar"; + AvroIO.Read.Bound read = AvroIO.Read.from(filePattern) .withoutValidation(); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); assertThat(displayData, hasDisplayItem("validation", false)); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); } @Test public void testWriteDisplayData() { + String filePattern = "gs://bucket/foo/bar"; + AvroIO.Write.Bound write = AvroIO.Write - .to("foo") + .to(filePattern) .withShardNameTemplate("-SS-of-NN-") .withSuffix("bar") .withSchema(GenericClass.class) @@ -282,7 +301,11 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("filePrefix", "foo")); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePrefix"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern + "*").getBrowseUrl())))); + assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); From fefc3ef5b2a3b7c0178a216c7fdaaa4885893144 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 4 May 2016 09:51:20 -0700 Subject: [PATCH 05/12] Add linkUrl to FileBasedSource DisplayData --- .../apache/beam/sdk/io/FileBasedSource.java | 9 ++++++- .../beam/sdk/io/FileBasedSourceTest.java | 24 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 954877f6ec73..6f1ac4cc568e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -277,7 +277,14 @@ private static long getEstimatedSizeOfFilesBySampling( @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())); + String fileOrPatternSpec = getFileOrPatternSpec(); + try { + builder.add(DisplayData.item("filePattern", fileOrPatternSpec) + .withLinkUrl(IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec))); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Invalid file pattern: %s", fileOrPatternSpec), e); + } } private ListenableFuture>> createFutureForFileSplit( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index bedbc9977844..6092bfd8c69b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -21,7 +21,12 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -34,17 +39,21 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableList; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -76,6 +85,11 @@ public class FileBasedSourceTest { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + /** * If {@code splitHeader} is null, this is just a simple line-based reader. Otherwise, the file is * considered to consist of blocks beginning with {@code splitHeader}. The header itself is not @@ -916,4 +930,14 @@ public void testSplitAtFractionExhaustive() throws Exception { TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null); assertSplitAtFractionExhaustive(source, options); } + + @Test + public void testDisplayData() { + String filePattern = "gs://bucket/foo/bar"; + FileBasedSource source = new TestFileBasedSource(filePattern, 1, null); + assertThat(DisplayData.from(source), hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); + } } From c28c4470c93b77096ea182ddc29813f7fe69db03 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 4 May 2016 10:01:39 -0700 Subject: [PATCH 06/12] Add linkUrl to FileBasedSink DisplayData --- .../org/apache/beam/sdk/io/FileBasedSink.java | 15 ++++++++++- .../apache/beam/sdk/io/FileBasedSinkTest.java | 26 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 10e93f5aeb2e..87c60c76c679 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -140,9 +140,22 @@ public void validate(PipelineOptions options) {} public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(baseOutputFilename); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Unrecognized file name format: %s", baseOutputFilename), e); + } + + // Append wildcard to browseUrl input since this is a filename prefix + String browseUrl = factory.getBrowseUrl(baseOutputFilename + "*"); + String fileNamePattern = String.format("%s%s%s", baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); - builder.add(DisplayData.item("fileNamePattern", fileNamePattern)); + + builder.add(DisplayData.item("fileNamePattern", fileNamePattern) + .withLinkUrl(browseUrl)); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 0e434fccd45e..3dd4c83724bf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,6 +17,12 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; + +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -26,9 +32,14 @@ import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention; import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -70,6 +81,11 @@ private String getBaseTempFilename() { return appendToTempFolder(baseTemporaryFilename); } + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + /** * FileBasedWriter opens the correct file, writes the header, footer, and elements in the * correct order, and returns the correct filename. @@ -397,6 +413,16 @@ public void testGenerateOutputFilenamesWithoutExtension() { assertEquals(expected, actual); } + @Test + public void testDisplayData() { + FileBasedSink sink = new SimpleSink("gs://bucket/foo/", "xml", "bar-NNN"); + assertThat(DisplayData.from(sink), hasDisplayItem(allOf( + hasKey("fileNamePattern"), + hasValue("gs://bucket/foo/bar-NNN.xml"), + hasLinkUrl(GcsPath.fromUri("gs://bucket/foo/").getBrowseUrl()) + ))); + } + /** * A simple FileBasedSink that writes String values as lines with header and footer lines. */ From f369a3324a260346bd45c786bccfa8a2a1c15e64 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 4 May 2016 10:19:15 -0700 Subject: [PATCH 07/12] Add linkUrl to TextIO DisplayData --- .../java/org/apache/beam/sdk/io/TextIO.java | 30 +++++++++++++++++-- .../org/apache/beam/sdk/io/TextIOTest.java | 29 +++++++++++++++--- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 7f69c0a2131a..66db1d6b24da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollection; @@ -343,10 +344,14 @@ public PCollection apply(PInput input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + if (filepattern != null) { + builder.add(DisplayData.item("filePattern", filepattern) + .withLinkUrl(getBrowseUrl(filepattern))); + } + builder .add(DisplayData.item("compressionType", compressionType.toString())) - .addIfNotDefault(DisplayData.item("validation", validate), true) - .addIfNotNull(DisplayData.item("filePattern", filepattern)); + .addIfNotDefault(DisplayData.item("validation", validate), true); } @Override @@ -648,8 +653,15 @@ public PDone apply(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + if (filenamePrefix != null) { + // Append wildcard to browseUrl input since this is a filename prefix + String browseUrl = getBrowseUrl(filenamePrefix + "*"); + + builder.add(DisplayData.item("filePrefix", filenamePrefix) + .withLinkUrl(browseUrl)); + } + builder - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "") .addIfNotDefault( DisplayData.item("shardNameTemplate", shardTemplate), @@ -744,6 +756,18 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } + private static String getBrowseUrl(String filePattern) { + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(filePattern); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Invalid filePattern: %s", filePattern), e); + } + + return factory.getBrowseUrl(filePattern); + } + ////////////////////////////////////////////////////////////////////////////// /** Disable construction of utility class. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 4d6d8dd4ac0b..888e4c8fd382 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,8 +22,12 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.core.AllOf.allOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -36,6 +40,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.TextIO.TextSource; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -45,11 +50,13 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import com.google.common.collect.ImmutableList; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -79,6 +86,11 @@ public class TextIOTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + void runTestRead(T[] expected, Coder coder) throws Exception { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); @@ -159,14 +171,19 @@ public void testReadNamed() throws Exception { @Test public void testReadDisplayData() { + String filePattern = "gs://bucket/foo/bar*"; + TextIO.Read.Bound read = TextIO.Read - .from("foo.*") + .from(filePattern) .withCompressionType(CompressionType.BZIP2) .withoutValidation(); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString())); assertThat(displayData, hasDisplayItem("validation", false)); } @@ -292,8 +309,9 @@ public void testShardedWrite() throws Exception { @Test public void testWriteDisplayData() { + String filePrefix = "gs://bucket/foo/thing"; TextIO.Write.Bound write = TextIO.Write - .to("foo") + .to(filePrefix) .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") .withNumShards(100) @@ -301,7 +319,10 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("filePrefix", "foo")); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePrefix"), + hasValue(filePrefix), + hasLinkUrl(GcsPath.fromUri(filePrefix + "*").getBrowseUrl())))); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("numShards", 100)); From 0c4e94d0da173bd4af82bb2e20e153938047e351 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 10 May 2016 14:09:01 -0700 Subject: [PATCH 08/12] Check file pattern before using in display data IO transforms internally use IOChannelUtils to retrieve an IOChannelFactory for file operations. It's not safe to assume that this is safe at construction time because a custom IOChannelFactory may not be registered, or the transform implementation may be replaced altogether. --- .../java/org/apache/beam/sdk/io/AvroIO.java | 28 +++++++-- .../org/apache/beam/sdk/io/FileBasedSink.java | 23 ++++--- .../apache/beam/sdk/io/FileBasedSource.java | 20 +++++-- .../java/org/apache/beam/sdk/io/TextIO.java | 27 +++++++-- .../apache/beam/sdk/util/IOChannelUtils.java | 32 +++++++--- .../org/apache/beam/sdk/io/AvroIOTest.java | 28 +++++++++ .../apache/beam/sdk/io/FileBasedSinkTest.java | 9 +++ .../beam/sdk/io/FileBasedSourceTest.java | 10 ++++ .../org/apache/beam/sdk/io/TextIOTest.java | 23 +++++++ .../display/DisplayDataMatchers.java | 16 ++++- .../beam/sdk/util/IOChannelUtilsTest.java | 60 +++++++++++++++++++ 11 files changed, 243 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 12fc15fb8fe7..22b5b0512d1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -333,7 +333,7 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); if (filepattern != null) { builder.add(DisplayData.item("filePattern", filepattern) - .withLinkUrl(getBrowseUrl(filepattern))); + .withLinkUrl(getBrowseUrl(filepattern, validate))); } builder.addIfNotDefault(DisplayData.item("validation", validate), true); @@ -700,7 +700,7 @@ public void populateDisplayData(DisplayData.Builder builder) { if (filenamePrefix != null) { // Append wildcard to browseUrl input since this is a prefix // for shardNameTemplate + fileSuffix. - String browseUrl = getBrowseUrl(filenamePrefix + "*"); + String browseUrl = getBrowseUrl(filenamePrefix + "*", validate); builder.add(DisplayData.item("filePrefix", filenamePrefix) .withLinkUrl(browseUrl)); } @@ -771,13 +771,31 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } - private static String getBrowseUrl(String filePattern) { + /** + * Retrieve the browse URL for a file pattern. + * + * @param validate Whether validation errors should cause an exception to throw. + * @return The browse URL, or null if the filePattern is invalid and validation is disabled. + */ + @Nullable + private static String getBrowseUrl(String filePattern, boolean validate) { + if (!IOChannelUtils.hasFactory(filePattern)) { + // Browse URLs are only used for display data and shouldn't throw unless validation is + // enabled. + if (validate) { + throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern)); + } else { + return null; + } + } + IOChannelFactory factory; try { factory = IOChannelUtils.getFactory(filePattern); } catch (IOException e) { - throw new IllegalStateException( - String.format("Invalid filePattern: %s", filePattern), e); + // hasFactory checked above, should not throw + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", filePattern), e); } return factory.getBrowseUrl(filePattern); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 87c60c76c679..fbb06921a599 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -140,16 +140,21 @@ public void validate(PipelineOptions options) {} public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - IOChannelFactory factory; - try { - factory = IOChannelUtils.getFactory(baseOutputFilename); - } catch (IOException e) { - throw new IllegalStateException( - String.format("Unrecognized file name format: %s", baseOutputFilename), e); - } - // Append wildcard to browseUrl input since this is a filename prefix - String browseUrl = factory.getBrowseUrl(baseOutputFilename + "*"); + String browseUrl = null; + String browseFilePattern = baseOutputFilename + "*"; + if (IOChannelUtils.hasFactory(browseFilePattern)) { + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(browseFilePattern); + } catch (IOException e) { + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", browseFilePattern), + e); + } + + browseUrl = factory.getBrowseUrl(browseFilePattern); + } String fileNamePattern = String.format("%s%s%s", baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 6f1ac4cc568e..7f19d3a50cc0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -277,14 +277,22 @@ private static long getEstimatedSizeOfFilesBySampling( @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + + String fileOrPatternSpec = getFileOrPatternSpec(); - try { - builder.add(DisplayData.item("filePattern", fileOrPatternSpec) - .withLinkUrl(IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec))); - } catch (IOException e) { - throw new IllegalStateException( - String.format("Invalid file pattern: %s", fileOrPatternSpec), e); + String browseUrl = null; + if (IOChannelUtils.hasFactory(fileOrPatternSpec)) { + try { + browseUrl = IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec); + } catch (IOException e) { + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", fileOrPatternSpec), + e); + } } + + builder.add(DisplayData.item("filePattern", fileOrPatternSpec) + .withLinkUrl(browseUrl)); } private ListenableFuture>> createFutureForFileSplit( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 66db1d6b24da..0eb318deea0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -346,7 +346,7 @@ public void populateDisplayData(DisplayData.Builder builder) { if (filepattern != null) { builder.add(DisplayData.item("filePattern", filepattern) - .withLinkUrl(getBrowseUrl(filepattern))); + .withLinkUrl(getBrowseUrl(filepattern, validate))); } builder @@ -655,7 +655,7 @@ public void populateDisplayData(DisplayData.Builder builder) { if (filenamePrefix != null) { // Append wildcard to browseUrl input since this is a filename prefix - String browseUrl = getBrowseUrl(filenamePrefix + "*"); + String browseUrl = getBrowseUrl(filenamePrefix + "*", validate); builder.add(DisplayData.item("filePrefix", filenamePrefix) .withLinkUrl(browseUrl)); @@ -756,13 +756,30 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } - private static String getBrowseUrl(String filePattern) { + /** + * Retrieve the browse URL for a file pattern. + * + * @param validate Whether validation errors should cause an exception to throw. + * @return The browse URL, or null if the filePattern is invalid and validation is disabled. + */ + private static String getBrowseUrl(String filePattern, boolean validate) { + if (!IOChannelUtils.hasFactory(filePattern)) { + // Browse URLs are only used for display data and shouldn't throw unless validation is + // enabled. + if (validate) { + throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern)); + } else { + return null; + } + } + IOChannelFactory factory; try { factory = IOChannelUtils.getFactory(filePattern); } catch (IOException e) { - throw new IllegalStateException( - String.format("Invalid filePattern: %s", filePattern), e); + // hasFactory checked above, should not throw + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", filePattern), e); } return factory.getBrowseUrl(filePattern); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index 05443fb3fe50..c03e53c7938e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -37,6 +37,9 @@ * Provides utilities for creating read and write channels. */ public class IOChannelUtils { + // Prevent instances + private IOChannelUtils() {} + // TODO: add registration mechanism for adding new schemas. private static final Map FACTORY_MAP = Collections.synchronizedMap(new HashMap()); @@ -160,13 +163,33 @@ public static String constructName(String prefix, return sb.toString(); } + /** + * Query whether an {@link IOChannelFactory} is associated with an input specification. + * + *

To register a new {@link IOChannelFactory}, call {@link #setIOFactory}. + */ + public static boolean hasFactory(String spec) { + return tryGetFactory(spec) != null; + } + private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( "(?[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); /** - * Returns the IOChannelFactory associated with an input specification. + * Returns the {@link IOChannelFactory} associated with an input specification. + * + * @throws IOException if no {@link IOChannelFactory} is registered to handle the given file spec. */ public static IOChannelFactory getFactory(String spec) throws IOException { + IOChannelFactory ioFactory = tryGetFactory(spec); + if (ioFactory != null) { + return ioFactory; + } + + throw new IOException("Unable to find handler for " + spec); + } + + private static IOChannelFactory tryGetFactory(String spec) { // The spec is almost, but not quite, a URI. In particular, // the reserved characters '[', ']', and '?' have meanings that differ // from their use in the URI spec. ('*' is not reserved). @@ -179,12 +202,7 @@ public static IOChannelFactory getFactory(String spec) throws IOException { } String scheme = matcher.group("scheme"); - IOChannelFactory ioFactory = FACTORY_MAP.get(scheme); - if (ioFactory != null) { - return ioFactory; - } - - throw new IOException("Unable to find handler for " + spec); + return FACTORY_MAP.get(scheme); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index c82ceb237c34..359500840a1c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -52,6 +53,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -62,11 +64,16 @@ import java.util.List; import java.util.Objects; +import autovalue.shaded.com.google.common.common.collect.Lists; + /** * Tests for AvroIO Read and Write transforms. */ @RunWith(JUnit4.class) public class AvroIOTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -312,4 +319,25 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); } + + /** + * A known file scheme is necessary to construct a link URL for display data. + * Verify that a bad file scheme doesn't throw an exception in display data unless + * validation is enabled. + */ + @Test + public void testDisplayDataResilientToUnknownFileScheme() { + DisplayData unvalidatedDisplayData = DisplayData.from(AvroIO.Write + .to("foo://bar/baz") + .withoutValidation()); + assertThat(unvalidatedDisplayData, hasDisplayItem(allOf( + hasKey("filePrefix"), + not(hasLinkUrl()) + ))); + + String validatedScheme = "omg"; + thrown.expectMessage(validatedScheme); + + DisplayData.from(AvroIO.Write.to(validatedScheme + "://foo/bar")); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 3dd4c83724bf..70fc18897a09 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -423,6 +424,14 @@ public void testDisplayData() { ))); } + @Test + public void testDisplayDataUnknownFileScheme() { + FileBasedSink sink = new SimpleSink("unknown://foo/", "xml", "bar-NNN"); + assertThat(DisplayData.from(sink), hasDisplayItem(allOf( + hasKey("fileNamePattern"), + not(hasLinkUrl())))); + } + /** * A simple FileBasedSink that writes String values as lines with header and footer lines. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index 6092bfd8c69b..7b5f90813f7e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -940,4 +941,13 @@ public void testDisplayData() { hasValue(filePattern), hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); } + + + @Test + public void testDisplayDataUnknownFileScheme() { + FileBasedSource source = new TestFileBasedSource("unknown://foo.txt", 1, null); + assertThat(DisplayData.from(source), hasDisplayItem(allOf( + hasKey("filePattern"), + not(hasLinkUrl())))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 888e4c8fd382..49e3858dfe83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -27,6 +27,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.hamcrest.core.AllOf.allOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -56,6 +57,7 @@ import com.google.common.collect.ImmutableList; +import org.hamcrest.Matchers; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -188,6 +190,27 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("validation", false)); } + /** + * A known file scheme is necessary to construct a link URL for display data. + * Verify that a bad file scheme doesn't throw an exception in display data unless + * validation is enabled. + */ + @Test + public void testDisplayDataResilientToUnknownFileScheme() { + DisplayData unvalidatedDisplayData = DisplayData.from(TextIO.Read + .from("foo://bar/baz") + .withoutValidation()); + assertThat(unvalidatedDisplayData, hasDisplayItem(Matchers.allOf( + hasKey("filePattern"), + not(hasLinkUrl()) + ))); + + String validatedScheme = "omg"; + expectedException.expectMessage(validatedScheme); + + DisplayData.from(TextIO.Read.from(validatedScheme + "://foo/bar")); + } + void runTestWrite(T[] elems, Coder coder) throws Exception { runTestWrite(elems, coder, 1); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index a86ab050e212..ee8a334a676f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -332,15 +332,29 @@ protected DisplayData.Type featureValueOf(DisplayData.Item actual) { * Creates a matcher that matches if the examined {@link DisplayData.Item} has the specified * value. */ - public static Matcher> hasValue(Object value) { return hasValue(Matchers.is(value)); } + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} contains any link url. + */ + public static Matcher> hasLinkUrl() { + return hasLinkUrl(Matchers.not(Matchers.isEmptyOrNullString())); + } + + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} has the specified + * link url. + */ public static Matcher> hasLinkUrl(String linkUrl) { return hasLinkUrl(Matchers.is(linkUrl)); } + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a link url + * matching the specified value matcher. + */ public static Matcher> hasLinkUrl(Matcher urlMatcher) { return new FeatureMatcher, String>(urlMatcher, "with link url", "url") { @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java index c79f1a1faf86..2afade4fbfd4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java @@ -18,8 +18,11 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; import com.google.common.io.Files; import org.junit.Assert; @@ -30,7 +33,14 @@ import org.junit.runners.JUnit4; import java.io.File; +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.UUID; + +import javax.annotation.Nullable; /** * Tests for IOChannelUtils. @@ -92,4 +102,54 @@ public void testResolve() throws Exception { String expected = tmpFolder.getRoot().toPath().resolve("aa").toString(); assertEquals(expected, IOChannelUtils.resolve(tmpFolder.getRoot().toString(), "aa")); } + + @Test + public void testHasFactory() { + assertTrue(IOChannelUtils.hasFactory("no/scheme.txt")); + + String randomScheme = "test" + UUID.randomUUID(); + String prefixedFile = randomScheme + "://foo/bar.txt"; + assertFalse(IOChannelUtils.hasFactory(prefixedFile)); + + IOChannelUtils.setIOFactory(randomScheme, new StubIOChannelFactory()); + assertTrue(IOChannelUtils.hasFactory(prefixedFile)); + } + + private static class StubIOChannelFactory implements IOChannelFactory { + @Override + public Collection match(String spec) throws IOException { + return Lists.newArrayList(); + } + + @Override + public ReadableByteChannel open(String spec) throws IOException { + return null; + } + + @Override + public WritableByteChannel create(String spec, String mimeType) throws IOException { + return null; + } + + @Override + public long getSizeBytes(String spec) throws IOException { + return 0; + } + + @Override + public boolean isReadSeekEfficient(String spec) throws IOException { + return false; + } + + @Override + public String resolve(String path, String other) throws IOException { + return null; + } + + @Nullable + @Override + public String getBrowseUrl(String path) { + return null; + } + } } From b3e23eaefa077e1b7ba150038881112370793f97 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 10 May 2016 16:26:41 -0700 Subject: [PATCH 09/12] fixup! Add linkUrl to AvroIO DisplayData --- .../core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 359500840a1c..457d4a41738b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -64,8 +64,6 @@ import java.util.List; import java.util.Objects; -import autovalue.shaded.com.google.common.common.collect.Lists; - /** * Tests for AvroIO Read and Write transforms. */ From a453d71cb5c82f26576f8eae3e41b99f291d329a Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 12 May 2016 13:57:06 -0700 Subject: [PATCH 10/12] Only add linkUrl when we can safely construct path --- .../java/org/apache/beam/sdk/io/AvroIO.java | 8 +--- .../org/apache/beam/sdk/io/FileBasedSink.java | 16 +++++--- .../java/org/apache/beam/sdk/io/TextIO.java | 9 +---- .../org/apache/beam/sdk/io/AvroIOTest.java | 6 +-- .../apache/beam/sdk/io/FileBasedSinkTest.java | 38 +++++++++++++++++++ .../org/apache/beam/sdk/io/TextIOTest.java | 5 +-- 6 files changed, 53 insertions(+), 29 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 22b5b0512d1c..5b3020441997 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -697,19 +697,13 @@ public PDone apply(PCollection input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - if (filenamePrefix != null) { - // Append wildcard to browseUrl input since this is a prefix - // for shardNameTemplate + fileSuffix. - String browseUrl = getBrowseUrl(filenamePrefix + "*", validate); - builder.add(DisplayData.item("filePrefix", filenamePrefix) - .withLinkUrl(browseUrl)); - } builder .add(DisplayData.item("schema", type)) .addIfNotDefault( DisplayData.item("shardNameTemplate", shardTemplate), DEFAULT_SHARD_TEMPLATE) + .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "") .addIfNotDefault(DisplayData.item("numShards", numShards), 0) .addIfNotDefault(DisplayData.item("validation", validate), true); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index fbb06921a599..b10525622344 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -140,9 +140,9 @@ public void validate(PipelineOptions options) {} public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - // Append wildcard to browseUrl input since this is a filename prefix String browseUrl = null; - String browseFilePattern = baseOutputFilename + "*"; + String browseFilePattern = formatFilePattern( + baseOutputFilename, globTemplate(fileNamingTemplate), extension); if (IOChannelUtils.hasFactory(browseFilePattern)) { IOChannelFactory factory; try { @@ -156,13 +156,19 @@ public void populateDisplayData(DisplayData.Builder builder) { browseUrl = factory.getBrowseUrl(browseFilePattern); } - String fileNamePattern = String.format("%s%s%s", - baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); - + String fileNamePattern = formatFilePattern(baseOutputFilename, fileNamingTemplate, extension); builder.add(DisplayData.item("fileNamePattern", fileNamePattern) .withLinkUrl(browseUrl)); } + private static String formatFilePattern(String base, String template, String extension) { + return String.format("%s%s%s", base, template, getFileExtension(extension)); + } + + private static String globTemplate(String fileNamingTemplate) { + return fileNamingTemplate.replace('N', '*').replace('S', '*'); + } + /** * Returns the file extension to be used. If the user did not request a file * extension then this method returns the empty string. Otherwise this method diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 0eb318deea0b..89a1ac129cc2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -653,15 +653,8 @@ public PDone apply(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - if (filenamePrefix != null) { - // Append wildcard to browseUrl input since this is a filename prefix - String browseUrl = getBrowseUrl(filenamePrefix + "*", validate); - - builder.add(DisplayData.item("filePrefix", filenamePrefix) - .withLinkUrl(browseUrl)); - } - builder + .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "") .addIfNotDefault( DisplayData.item("shardNameTemplate", shardTemplate), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 457d4a41738b..9c6b48c748d5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -306,11 +306,7 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem(allOf( - hasKey("filePrefix"), - hasValue(filePattern), - hasLinkUrl(GcsPath.fromUri(filePattern + "*").getBrowseUrl())))); - + assertThat(displayData, hasDisplayItem("filePrefix", filePattern)); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 70fc18897a09..9975481e53dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import com.google.common.collect.ImmutableList; + import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -51,6 +53,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; +import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -424,6 +427,41 @@ public void testDisplayData() { ))); } + @Test + public void testDisplayDataLinkUrl() throws IOException { + class TestCase { + String prefix; + String shardTemplate; + String suffix; + String expectedBrowsePath; + + TestCase(String prefix, String shardTemplate, String suffix, String expectedBrowsePath) { + this.prefix = prefix; + this.shardTemplate = shardTemplate; + this.suffix = suffix; + this.expectedBrowsePath = expectedBrowsePath; + } + } + + Iterable tests = ImmutableList.builder() + .add(new TestCase("gs://bucket/foo/", "bar", ".xml", "gs://bucket/foo/bar.xml")) + .add(new TestCase("gs://bucket/foo/", "object-NNN", ".xml", "gs://bucket/foo/")) + .add(new TestCase("gs://bucket/foo/", "object-SSS", ".xml", "gs://bucket/foo/")) + .add(new TestCase("gs://bucket/foo", "/object-NNN", ".xml", "gs://bucket/foo/")) + .add(new TestCase("gs://bucket/foo", "object-NNN", ".xml", "gs://bucket/")) + .add(new TestCase("gs://bucket/", "object-NNN", ".xml", "gs://bucket/")) + .add(new TestCase("gs://bucket", "/object-NNN", ".xml", "gs://bucket/")) + .build(); + + for (TestCase test : tests) { + FileBasedSink sink = new SimpleSink(test.prefix, test.suffix, test.shardTemplate); + String expectedLinkUrl = IOChannelUtils.getFactory(test.expectedBrowsePath) + .getBrowseUrl(test.expectedBrowsePath); + + assertThat(DisplayData.from(sink), hasDisplayItem(hasLinkUrl(expectedLinkUrl))); + } + } + @Test public void testDisplayDataUnknownFileScheme() { FileBasedSink sink = new SimpleSink("unknown://foo/", "xml", "bar-NNN"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 49e3858dfe83..2117e9b18461 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -342,10 +342,7 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem(allOf( - hasKey("filePrefix"), - hasValue(filePrefix), - hasLinkUrl(GcsPath.fromUri(filePrefix + "*").getBrowseUrl())))); + assertThat(displayData, hasDisplayItem("filePrefix", filePrefix)); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("numShards", 100)); From 6c8356618b281c85ae7863aff59987903923a530 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 13 May 2016 14:27:58 -0700 Subject: [PATCH 11/12] fixup! Add linkUrl to AvroIO DisplayData --- .../org/apache/beam/sdk/io/AvroIOTest.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 9c6b48c748d5..6f715d5a576a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -313,25 +313,4 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); } - - /** - * A known file scheme is necessary to construct a link URL for display data. - * Verify that a bad file scheme doesn't throw an exception in display data unless - * validation is enabled. - */ - @Test - public void testDisplayDataResilientToUnknownFileScheme() { - DisplayData unvalidatedDisplayData = DisplayData.from(AvroIO.Write - .to("foo://bar/baz") - .withoutValidation()); - assertThat(unvalidatedDisplayData, hasDisplayItem(allOf( - hasKey("filePrefix"), - not(hasLinkUrl()) - ))); - - String validatedScheme = "omg"; - thrown.expectMessage(validatedScheme); - - DisplayData.from(AvroIO.Write.to(validatedScheme + "://foo/bar")); - } } From abcfce4d458e4027ba09482f5e1b6ed4516c959c Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 13 May 2016 14:28:27 -0700 Subject: [PATCH 12/12] Use parametrized tests for FileBasedSinkTest --- .../apache/beam/sdk/io/FileBasedSinkTest.java | 58 +++++++----- .../beam/sdk/util/gcsfs/GcsPathTest.java | 92 ++++++++++--------- 2 files changed, 83 insertions(+), 67 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 9975481e53dc..d9a6c882ea83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -40,14 +40,13 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; -import com.google.common.collect.ImmutableList; - import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import java.io.BufferedReader; import java.io.File; @@ -60,6 +59,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -427,36 +427,50 @@ public void testDisplayData() { ))); } - @Test - public void testDisplayDataLinkUrl() throws IOException { - class TestCase { + @RunWith(Parameterized.class) + public static class DisplayDataValidLinkUrl { + static class TestCase { String prefix; String shardTemplate; String suffix; - String expectedBrowsePath; - TestCase(String prefix, String shardTemplate, String suffix, String expectedBrowsePath) { + TestCase(String prefix, String shardTemplate, String suffix) { this.prefix = prefix; this.shardTemplate = shardTemplate; this.suffix = suffix; - this.expectedBrowsePath = expectedBrowsePath; } } - Iterable tests = ImmutableList.builder() - .add(new TestCase("gs://bucket/foo/", "bar", ".xml", "gs://bucket/foo/bar.xml")) - .add(new TestCase("gs://bucket/foo/", "object-NNN", ".xml", "gs://bucket/foo/")) - .add(new TestCase("gs://bucket/foo/", "object-SSS", ".xml", "gs://bucket/foo/")) - .add(new TestCase("gs://bucket/foo", "/object-NNN", ".xml", "gs://bucket/foo/")) - .add(new TestCase("gs://bucket/foo", "object-NNN", ".xml", "gs://bucket/")) - .add(new TestCase("gs://bucket/", "object-NNN", ".xml", "gs://bucket/")) - .add(new TestCase("gs://bucket", "/object-NNN", ".xml", "gs://bucket/")) - .build(); - - for (TestCase test : tests) { - FileBasedSink sink = new SimpleSink(test.prefix, test.suffix, test.shardTemplate); - String expectedLinkUrl = IOChannelUtils.getFactory(test.expectedBrowsePath) - .getBrowseUrl(test.expectedBrowsePath); + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{ + {new TestCase("gs://bucket/foo/", "bar", ".xml"), "gs://bucket/foo/bar.xml"}, + {new TestCase("gs://bucket/foo/", "object-NNN", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo/", "object-SSS", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo", "/object-NNN", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo", "object-NNN", ".xml"), "gs://bucket/"}, + {new TestCase("gs://bucket/", "object-NNN", ".xml"), "gs://bucket/"}, + {new TestCase("gs://bucket", "/object-NNN", ".xml"), "gs://bucket/"}}); + } + + @BeforeClass + public static void setUpClass() { + FileBasedSinkTest.setUpClass(); + } + + private final TestCase input; + private final String expectedBrowsePath; + + public DisplayDataValidLinkUrl(TestCase input, String expectedBrowsePath) { + this.input = input; + this.expectedBrowsePath = expectedBrowsePath; + } + + @Test + public void test() throws IOException { + FileBasedSink sink = new SimpleSink(input.prefix, input.suffix, input.shardTemplate); + String expectedLinkUrl = IOChannelUtils.getFactory(expectedBrowsePath) + .getBrowseUrl(expectedBrowsePath); assertThat(DisplayData.from(sink), hasDisplayItem(hasLinkUrl(expectedLinkUrl))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java index 29dc4302d4dc..f4288ee43ee1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java @@ -28,16 +28,16 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; -import autovalue.shaded.com.google.common.common.collect.ImmutableList; - /** * Tests of GcsPath. */ @@ -334,52 +334,54 @@ public void testSubPathError() { Assert.fail(); } - @Test - public void testBrowseUrl() { - class TestData { - String path; - String expectedBrowseUrl; - - TestData(String path, String expectedBrowseUrl) { - this.path = path; - this.expectedBrowseUrl = expectedBrowseUrl; - } + @RunWith(Parameterized.class) + public static class DisplayDataBrowseUrl { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{ + { + "gs://bucket/a/b/c/object", + "https://storage.cloud.google.com/bucket/a/b/c/object"}, + { + "gs://bucket/", + "https://console.cloud.google.com/storage/browser/bucket/"}, + { + "gs://bucket/subdir/", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/foo*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/**/b", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/b?", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/[bcd]", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir*/foo", + "https://console.cloud.google.com/storage/browser/bucket/"}}); + } + + private final String input; + private final String expected; + + public DisplayDataBrowseUrl(String input, String expected) { + this.input = input; + this.expected = expected; } - List tests = ImmutableList.builder() - .add(new TestData( - "gs://bucket/a/b/c/object", - "https://storage.cloud.google.com/bucket/a/b/c/object")) - .add(new TestData( - "gs://bucket/", - "https://console.cloud.google.com/storage/browser/bucket/")) - .add(new TestData( - "gs://bucket/subdir/", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir/*", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir/foo*", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir/**/b", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir/b?", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir/[bcd]", - "https://console.cloud.google.com/storage/browser/bucket/subdir/")) - .add(new TestData( - "gs://bucket/subdir*/foo", - "https://console.cloud.google.com/storage/browser/bucket/")) - .build(); - - for (TestData test : tests) { - GcsPath path = GcsPath.fromUri(test.path); + @Test + public void test() { + GcsPath path = GcsPath.fromUri(input); String actual = path.getBrowseUrl(); - assertEquals(String.format("Browse URL for %s", path), test.expectedBrowseUrl, actual); + assertEquals(String.format("Browse URL for %s", path), expected, actual); } } }