diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6b9f0100f034..5b3020441997 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollection; @@ -330,9 +331,12 @@ public PCollection apply(PInput input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("filePattern", filepattern)) - .addIfNotDefault(DisplayData.item("validation", validate), true); + if (filepattern != null) { + builder.add(DisplayData.item("filePattern", filepattern) + .withLinkUrl(getBrowseUrl(filepattern, validate))); + } + + builder.addIfNotDefault(DisplayData.item("validation", validate), true); } @Override @@ -693,12 +697,13 @@ public PDone apply(PCollection input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + builder .add(DisplayData.item("schema", type)) - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault( DisplayData.item("shardNameTemplate", shardTemplate), DEFAULT_SHARD_TEMPLATE) + .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)) .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "") .addIfNotDefault(DisplayData.item("numShards", numShards), 0) .addIfNotDefault(DisplayData.item("validation", validate), true); @@ -760,6 +765,36 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } + /** + * Retrieve the browse URL for a file pattern. + * + * @param validate Whether validation errors should cause an exception to throw. + * @return The browse URL, or null if the filePattern is invalid and validation is disabled. + */ + @Nullable + private static String getBrowseUrl(String filePattern, boolean validate) { + if (!IOChannelUtils.hasFactory(filePattern)) { + // Browse URLs are only used for display data and shouldn't throw unless validation is + // enabled. + if (validate) { + throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern)); + } else { + return null; + } + } + + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(filePattern); + } catch (IOException e) { + // hasFactory checked above, should not throw + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", filePattern), e); + } + + return factory.getBrowseUrl(filePattern); + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 10e93f5aeb2e..b10525622344 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -140,9 +140,33 @@ public void validate(PipelineOptions options) {} public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String fileNamePattern = String.format("%s%s%s", - baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); - builder.add(DisplayData.item("fileNamePattern", fileNamePattern)); + String browseUrl = null; + String browseFilePattern = formatFilePattern( + baseOutputFilename, globTemplate(fileNamingTemplate), extension); + if (IOChannelUtils.hasFactory(browseFilePattern)) { + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(browseFilePattern); + } catch (IOException e) { + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", browseFilePattern), + e); + } + + browseUrl = factory.getBrowseUrl(browseFilePattern); + } + + String fileNamePattern = formatFilePattern(baseOutputFilename, fileNamingTemplate, extension); + builder.add(DisplayData.item("fileNamePattern", fileNamePattern) + .withLinkUrl(browseUrl)); + } + + private static String formatFilePattern(String base, String template, String extension) { + return String.format("%s%s%s", base, template, getFileExtension(extension)); + } + + private static String globTemplate(String fileNamingTemplate) { + return fileNamingTemplate.replace('N', '*').replace('S', '*'); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 954877f6ec73..7f19d3a50cc0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -277,7 +277,22 @@ private static long getEstimatedSizeOfFilesBySampling( @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())); + + + String fileOrPatternSpec = getFileOrPatternSpec(); + String browseUrl = null; + if (IOChannelUtils.hasFactory(fileOrPatternSpec)) { + try { + browseUrl = IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec); + } catch (IOException e) { + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", fileOrPatternSpec), + e); + } + } + + builder.add(DisplayData.item("filePattern", fileOrPatternSpec) + .withLinkUrl(browseUrl)); } private ListenableFuture>> createFutureForFileSplit( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 7f69c0a2131a..89a1ac129cc2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollection; @@ -343,10 +344,14 @@ public PCollection apply(PInput input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + if (filepattern != null) { + builder.add(DisplayData.item("filePattern", filepattern) + .withLinkUrl(getBrowseUrl(filepattern, validate))); + } + builder .add(DisplayData.item("compressionType", compressionType.toString())) - .addIfNotDefault(DisplayData.item("validation", validate), true) - .addIfNotNull(DisplayData.item("filePattern", filepattern)); + .addIfNotDefault(DisplayData.item("validation", validate), true); } @Override @@ -744,6 +749,35 @@ private static void validateOutputComponent(String partialFilePattern) { + partialFilePattern); } + /** + * Retrieve the browse URL for a file pattern. + * + * @param validate Whether validation errors should cause an exception to throw. + * @return The browse URL, or null if the filePattern is invalid and validation is disabled. + */ + private static String getBrowseUrl(String filePattern, boolean validate) { + if (!IOChannelUtils.hasFactory(filePattern)) { + // Browse URLs are only used for display data and shouldn't throw unless validation is + // enabled. + if (validate) { + throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern)); + } else { + return null; + } + } + + IOChannelFactory factory; + try { + factory = IOChannelUtils.getFactory(filePattern); + } catch (IOException e) { + // hasFactory checked above, should not throw + throw new AssertionError(String.format( + "Unexpected error while retrieving browse url for file pattern: %s", filePattern), e); + } + + return factory.getBrowseUrl(filePattern); + } + ////////////////////////////////////////////////////////////////////////////// /** Disable construction of utility class. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 7bc09e9c95ef..6c2e54093b26 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.regex.Matcher; +import javax.annotation.Nullable; + /** * Implements IOChannelFactory for local files. */ @@ -133,4 +135,10 @@ public boolean isReadSeekEfficient(String spec) throws IOException { public String resolve(String path, String other) throws IOException { return Paths.get(path).resolve(other).toString(); } + @Nullable + @Override + public String getBrowseUrl(String path) { + File file = new File(path); + return file.toURI().toString(); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index 2122c6427bd0..ba04e125fd17 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -27,6 +27,8 @@ import java.util.LinkedList; import java.util.List; +import javax.annotation.Nullable; + /** * Implements IOChannelFactory for GCS. */ @@ -84,4 +86,9 @@ public boolean isReadSeekEfficient(String spec) throws IOException { public String resolve(String path, String other) throws IOException { return GcsPath.fromUri(path).resolve(other).toString(); } + @Nullable + @Override + public String getBrowseUrl(String path) { + return GcsPath.fromUri(path).getBrowseUrl(); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java index c89c7ad56fd4..7170c3e26dca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java @@ -23,6 +23,8 @@ import java.nio.channels.WritableByteChannel; import java.util.Collection; +import javax.annotation.Nullable; + /** * Defines a factory for working with read and write channels. * @@ -98,5 +100,21 @@ public interface IOChannelFactory { * Where the {@code other} path has a root component then resolution is highly implementation * dependent and therefore unspecified. */ - public String resolve(String path, String other) throws IOException; + String resolve(String path, String other) throws IOException; + + /** + * Retrieve a URL where the given {@code path} can be viewed and browsed, or null if browse URLs + * are not supported. + * + *

The returned URL should be suitable for a user to enter into a web browser and browse + * interactively. If the {@code path} refers to a file or data resource, the URL should + * point to a location where the resource can be viewed. If the {@code path} points to a + * directory or contains wildcards, the URL should point to a location where the inner resources + * can be browsed. + * + *

This method does not validate that a resource exists or is accessible for the given + * {@code path}. + */ + @Nullable + String getBrowseUrl(String path); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index 05443fb3fe50..c03e53c7938e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -37,6 +37,9 @@ * Provides utilities for creating read and write channels. */ public class IOChannelUtils { + // Prevent instances + private IOChannelUtils() {} + // TODO: add registration mechanism for adding new schemas. private static final Map FACTORY_MAP = Collections.synchronizedMap(new HashMap()); @@ -160,13 +163,33 @@ public static String constructName(String prefix, return sb.toString(); } + /** + * Query whether an {@link IOChannelFactory} is associated with an input specification. + * + *

To register a new {@link IOChannelFactory}, call {@link #setIOFactory}. + */ + public static boolean hasFactory(String spec) { + return tryGetFactory(spec) != null; + } + private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( "(?[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); /** - * Returns the IOChannelFactory associated with an input specification. + * Returns the {@link IOChannelFactory} associated with an input specification. + * + * @throws IOException if no {@link IOChannelFactory} is registered to handle the given file spec. */ public static IOChannelFactory getFactory(String spec) throws IOException { + IOChannelFactory ioFactory = tryGetFactory(spec); + if (ioFactory != null) { + return ioFactory; + } + + throw new IOException("Unable to find handler for " + spec); + } + + private static IOChannelFactory tryGetFactory(String spec) { // The spec is almost, but not quite, a URI. In particular, // the reserved characters '[', ']', and '?' have meanings that differ // from their use in the URI spec. ('*' is not reserved). @@ -179,12 +202,7 @@ public static IOChannelFactory getFactory(String spec) throws IOException { } String scheme = matcher.group("scheme"); - IOChannelFactory ioFactory = FACTORY_MAP.get(scheme); - if (ioFactory != null) { - return ioFactory; - } - - throw new IOException("Unable to find handler for " + spec); + return FACTORY_MAP.get(scheme); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java index 3e4b7914d10f..d80c72761c65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java @@ -478,7 +478,6 @@ public WatchKey register(WatchService watcher, WatchEvent.Kind... events) public Iterator iterator() { return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject()); } - private static class NameIterator implements Iterator { private final FileSystem fs; private boolean fullPath; @@ -601,6 +600,40 @@ public String toResourceName() { return sb.toString(); } + private static final Pattern HAS_GLOB = Pattern.compile(".*[\\*\\?\\[\\]].*"); + /** + * @inheritDoc + * + * @see + * Accessing Google Cloud Platform Console + */ + public String getBrowseUrl() { + // GCS uses different URLs for browsing buckets vs. objects. Object "subdirectories" can + // be treated as buckets for browsing. + final String bucketPrefix = "https://console.cloud.google.com/storage/browser/"; + final String objectPrefix = "https://storage.cloud.google.com/"; + + // Iterate through path to remove any glob pattern suffix. + StringBuilder resourceBuilder = new StringBuilder(bucket).append("/"); + String[] components = object.split("/"); + for (int i = 0; i < components.length; i++) { + String component = components[i]; + if (HAS_GLOB.matcher(component).matches()) { + break; + } + + resourceBuilder.append(component); + if (i + 1 < components.length || object.endsWith("/")) { + resourceBuilder.append("/"); + } + } + + final boolean isDirectoryPath = resourceBuilder.charAt(resourceBuilder.length() - 1) == '/'; + String prefix = isDirectoryPath ? bucketPrefix : objectPrefix; + resourceBuilder.insert(0, prefix); + return resourceBuilder.toString(); + } + @Override public URI toUri() { try { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 43b1219d9473..6f715d5a576a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -18,8 +18,13 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -28,11 +33,14 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.MoreObjects; @@ -42,8 +50,10 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -59,9 +69,17 @@ */ @RunWith(JUnit4.class) public class AvroIOTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + @Test public void testReadWithoutValidationFlag() throws Exception { AvroIO.Read.Bound read = AvroIO.Read.from("gs://bucket/foo*/baz"); @@ -262,18 +280,24 @@ public void testAvroSinkShardedWrite() throws Exception { @Test public void testReadDisplayData() { - AvroIO.Read.Bound read = AvroIO.Read.from("foo.*") + String filePattern = "gs://bucket/foo/bar"; + AvroIO.Read.Bound read = AvroIO.Read.from(filePattern) .withoutValidation(); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); assertThat(displayData, hasDisplayItem("validation", false)); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); } @Test public void testWriteDisplayData() { + String filePattern = "gs://bucket/foo/bar"; + AvroIO.Write.Bound write = AvroIO.Write - .to("foo") + .to(filePattern) .withShardNameTemplate("-SS-of-NN-") .withSuffix("bar") .withSchema(GenericClass.class) @@ -282,7 +306,7 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("filePrefix", "foo")); + assertThat(displayData, hasDisplayItem("filePrefix", filePattern)); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 0e434fccd45e..d9a6c882ea83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,7 +17,14 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; + +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -26,25 +33,33 @@ import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention; import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; +import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -70,6 +85,11 @@ private String getBaseTempFilename() { return appendToTempFolder(baseTemporaryFilename); } + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + /** * FileBasedWriter opens the correct file, writes the header, footer, and elements in the * correct order, and returns the correct filename. @@ -397,6 +417,73 @@ public void testGenerateOutputFilenamesWithoutExtension() { assertEquals(expected, actual); } + @Test + public void testDisplayData() { + FileBasedSink sink = new SimpleSink("gs://bucket/foo/", "xml", "bar-NNN"); + assertThat(DisplayData.from(sink), hasDisplayItem(allOf( + hasKey("fileNamePattern"), + hasValue("gs://bucket/foo/bar-NNN.xml"), + hasLinkUrl(GcsPath.fromUri("gs://bucket/foo/").getBrowseUrl()) + ))); + } + + @RunWith(Parameterized.class) + public static class DisplayDataValidLinkUrl { + static class TestCase { + String prefix; + String shardTemplate; + String suffix; + + TestCase(String prefix, String shardTemplate, String suffix) { + this.prefix = prefix; + this.shardTemplate = shardTemplate; + this.suffix = suffix; + } + } + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{ + {new TestCase("gs://bucket/foo/", "bar", ".xml"), "gs://bucket/foo/bar.xml"}, + {new TestCase("gs://bucket/foo/", "object-NNN", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo/", "object-SSS", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo", "/object-NNN", ".xml"), "gs://bucket/foo/"}, + {new TestCase("gs://bucket/foo", "object-NNN", ".xml"), "gs://bucket/"}, + {new TestCase("gs://bucket/", "object-NNN", ".xml"), "gs://bucket/"}, + {new TestCase("gs://bucket", "/object-NNN", ".xml"), "gs://bucket/"}}); + } + + @BeforeClass + public static void setUpClass() { + FileBasedSinkTest.setUpClass(); + } + + private final TestCase input; + private final String expectedBrowsePath; + + public DisplayDataValidLinkUrl(TestCase input, String expectedBrowsePath) { + this.input = input; + this.expectedBrowsePath = expectedBrowsePath; + } + + @Test + public void test() throws IOException { + FileBasedSink sink = new SimpleSink(input.prefix, input.suffix, input.shardTemplate); + String expectedLinkUrl = IOChannelUtils.getFactory(expectedBrowsePath) + .getBrowseUrl(expectedBrowsePath); + + assertThat(DisplayData.from(sink), hasDisplayItem(hasLinkUrl(expectedLinkUrl))); + } + } + + @Test + public void testDisplayDataUnknownFileScheme() { + FileBasedSink sink = new SimpleSink("unknown://foo/", "xml", "bar-NNN"); + assertThat(DisplayData.from(sink), hasDisplayItem(allOf( + hasKey("fileNamePattern"), + not(hasLinkUrl())))); + } + /** * A simple FileBasedSink that writes String values as lines with header and footer lines. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index bedbc9977844..7b5f90813f7e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -21,8 +21,14 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -34,17 +40,21 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableList; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -76,6 +86,11 @@ public class FileBasedSourceTest { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + /** * If {@code splitHeader} is null, this is just a simple line-based reader. Otherwise, the file is * considered to consist of blocks beginning with {@code splitHeader}. The header itself is not @@ -916,4 +931,23 @@ public void testSplitAtFractionExhaustive() throws Exception { TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null); assertSplitAtFractionExhaustive(source, options); } + + @Test + public void testDisplayData() { + String filePattern = "gs://bucket/foo/bar"; + FileBasedSource source = new TestFileBasedSource(filePattern, 1, null); + assertThat(DisplayData.from(source), hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); + } + + + @Test + public void testDisplayDataUnknownFileScheme() { + FileBasedSource source = new TestFileBasedSource("unknown://foo.txt", 1, null); + assertThat(DisplayData.from(source), hasDisplayItem(allOf( + hasKey("filePattern"), + not(hasLinkUrl())))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 4d6d8dd4ac0b..2117e9b18461 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,8 +22,13 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.AllOf.allOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -36,6 +41,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.TextIO.TextSource; +import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -45,11 +51,14 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import com.google.common.collect.ImmutableList; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -79,6 +88,11 @@ public class TextIOTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @BeforeClass + public static void setUpClass() { + IOChannelUtils.registerStandardIOFactories(PipelineOptionsFactory.as(GcsOptions.class)); + } + void runTestRead(T[] expected, Coder coder) throws Exception { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); @@ -159,18 +173,44 @@ public void testReadNamed() throws Exception { @Test public void testReadDisplayData() { + String filePattern = "gs://bucket/foo/bar*"; + TextIO.Read.Bound read = TextIO.Read - .from("foo.*") + .from(filePattern) .withCompressionType(CompressionType.BZIP2) .withoutValidation(); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("filePattern"), + hasValue(filePattern), + hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl())))); assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString())); assertThat(displayData, hasDisplayItem("validation", false)); } + /** + * A known file scheme is necessary to construct a link URL for display data. + * Verify that a bad file scheme doesn't throw an exception in display data unless + * validation is enabled. + */ + @Test + public void testDisplayDataResilientToUnknownFileScheme() { + DisplayData unvalidatedDisplayData = DisplayData.from(TextIO.Read + .from("foo://bar/baz") + .withoutValidation()); + assertThat(unvalidatedDisplayData, hasDisplayItem(Matchers.allOf( + hasKey("filePattern"), + not(hasLinkUrl()) + ))); + + String validatedScheme = "omg"; + expectedException.expectMessage(validatedScheme); + + DisplayData.from(TextIO.Read.from(validatedScheme + "://foo/bar")); + } + void runTestWrite(T[] elems, Coder coder) throws Exception { runTestWrite(elems, coder, 1); } @@ -292,8 +332,9 @@ public void testShardedWrite() throws Exception { @Test public void testWriteDisplayData() { + String filePrefix = "gs://bucket/foo/thing"; TextIO.Write.Bound write = TextIO.Write - .to("foo") + .to(filePrefix) .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") .withNumShards(100) @@ -301,7 +342,7 @@ public void testWriteDisplayData() { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("filePrefix", "foo")); + assertThat(displayData, hasDisplayItem("filePrefix", filePrefix)); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("numShards", 100)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index e3721b8f6ef4..ee8a334a676f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -332,11 +332,38 @@ protected DisplayData.Type featureValueOf(DisplayData.Item actual) { * Creates a matcher that matches if the examined {@link DisplayData.Item} has the specified * value. */ - public static Matcher> hasValue(Object value) { return hasValue(Matchers.is(value)); } + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} contains any link url. + */ + public static Matcher> hasLinkUrl() { + return hasLinkUrl(Matchers.not(Matchers.isEmptyOrNullString())); + } + + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} has the specified + * link url. + */ + public static Matcher> hasLinkUrl(String linkUrl) { + return hasLinkUrl(Matchers.is(linkUrl)); + } + + /** + * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a link url + * matching the specified value matcher. + */ + public static Matcher> hasLinkUrl(Matcher urlMatcher) { + return new FeatureMatcher, String>(urlMatcher, "with link url", "url") { + @Override + protected String featureValueOf(Item actual) { + return actual.getLinkUrl(); + } + }; + } + /** * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a value * matching the specified value matcher. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java index f848c5ed3082..50f57c5e026e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -97,6 +98,21 @@ public void testHasValue() { assertThat(createDisplayDataWithItem("foo", "bar"), matcher); } + @Test + public void testHasLinkUrl() { + DisplayData displayData = DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("foo", "bar").withLinkUrl("http://go")); + } + }); + + Matcher matcher = hasDisplayItem(hasLinkUrl("http://gooooo")); + assertFalse(matcher.matches(displayData)); + + assertThat(displayData, hasDisplayItem(hasLinkUrl("http://go"))); + } + @Test public void testHasNamespace() { Matcher matcher = hasDisplayItem(hasNamespace(SampleTransform.class)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 851769a03e60..b88cc12ece39 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -198,7 +199,7 @@ public void populateDisplayData(DisplayData.Builder builder) { hasValue(ISO_FORMATTER.print(value)), hasShortValue(nullValue(String.class)), hasLabel(is("the current instant")), - hasUrl(is("http://time.gov"))); + hasLinkUrl("http://time.gov")); assertThat(item, matchesAllOf); } @@ -216,7 +217,9 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat( data, - hasDisplayItem(allOf(hasLabel(nullValue(String.class)), hasUrl(nullValue(String.class))))); + hasDisplayItem(allOf( + hasLabel(nullValue(String.class)), + hasLinkUrl(nullValue(String.class))))); } @Test @@ -1011,16 +1014,6 @@ protected String featureValueOf(DisplayData.Item actual) { }; } - private static Matcher> hasUrl(Matcher urlMatcher) { - return new FeatureMatcher, String>( - urlMatcher, "display item with url", "URL") { - @Override - protected String featureValueOf(DisplayData.Item actual) { - return actual.getLinkUrl(); - } - }; - } - private static Matcher> hasShortValue(Matcher valueStringMatcher) { return new FeatureMatcher, T>( valueStringMatcher, "display item with short value", "short value") { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index b510408fe960..27f076d95c4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -36,6 +36,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.nio.channels.Channels; @@ -224,4 +225,32 @@ public void testGetSizeBytesForNonExistentFile() throws Exception { factory.getSizeBytes( factory.resolve(temporaryFolder.getRoot().getPath(), "non-existent-file")); } + + @Test + public void testBrowseUrl() throws IOException { + class TestData { + String description; + String path; + String expectedBrowseUrl; + + TestData(String description, String path, String expectedBrowseUrl) { + this.description = description; + this.path = path; + this.expectedBrowseUrl = expectedBrowseUrl; + } + } + + File file = temporaryFolder.newFile(); + List tests = ImmutableList.builder() + .add(new TestData("absolute", "/a/b/c", "file:/a/b/c")) + .add(new TestData("relative", "a/b/c", new File("").toURI().toString() + "a/b/c")) + .add(new TestData("temp path", file.getPath(), file.toURI().toString())) + .build(); + + for (TestData test : tests) { + String actual = factory.getBrowseUrl(test.path); + assertEquals(String.format("Browse URL for %s: %s", test.description, test.path), + test.expectedBrowseUrl, actual); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java index c79f1a1faf86..2afade4fbfd4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java @@ -18,8 +18,11 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; import com.google.common.io.Files; import org.junit.Assert; @@ -30,7 +33,14 @@ import org.junit.runners.JUnit4; import java.io.File; +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.UUID; + +import javax.annotation.Nullable; /** * Tests for IOChannelUtils. @@ -92,4 +102,54 @@ public void testResolve() throws Exception { String expected = tmpFolder.getRoot().toPath().resolve("aa").toString(); assertEquals(expected, IOChannelUtils.resolve(tmpFolder.getRoot().toString(), "aa")); } + + @Test + public void testHasFactory() { + assertTrue(IOChannelUtils.hasFactory("no/scheme.txt")); + + String randomScheme = "test" + UUID.randomUUID(); + String prefixedFile = randomScheme + "://foo/bar.txt"; + assertFalse(IOChannelUtils.hasFactory(prefixedFile)); + + IOChannelUtils.setIOFactory(randomScheme, new StubIOChannelFactory()); + assertTrue(IOChannelUtils.hasFactory(prefixedFile)); + } + + private static class StubIOChannelFactory implements IOChannelFactory { + @Override + public Collection match(String spec) throws IOException { + return Lists.newArrayList(); + } + + @Override + public ReadableByteChannel open(String spec) throws IOException { + return null; + } + + @Override + public WritableByteChannel create(String spec, String mimeType) throws IOException { + return null; + } + + @Override + public long getSizeBytes(String spec) throws IOException { + return 0; + } + + @Override + public boolean isReadSeekEfficient(String spec) throws IOException { + return false; + } + + @Override + public String resolve(String path, String other) throws IOException { + return null; + } + + @Nullable + @Override + public String getBrowseUrl(String path) { + return null; + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java index fdd1dfd6e7aa..f4288ee43ee1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java @@ -28,11 +28,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -331,4 +333,55 @@ public void testSubPathError() { a.subpath(1, 1); // throws IllegalArgumentException Assert.fail(); } + + @RunWith(Parameterized.class) + public static class DisplayDataBrowseUrl { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{ + { + "gs://bucket/a/b/c/object", + "https://storage.cloud.google.com/bucket/a/b/c/object"}, + { + "gs://bucket/", + "https://console.cloud.google.com/storage/browser/bucket/"}, + { + "gs://bucket/subdir/", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/foo*", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/**/b", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/b?", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir/[bcd]", + "https://console.cloud.google.com/storage/browser/bucket/subdir/"}, + { + "gs://bucket/subdir*/foo", + "https://console.cloud.google.com/storage/browser/bucket/"}}); + } + + private final String input; + private final String expected; + + public DisplayDataBrowseUrl(String input, String expected) { + this.input = input; + this.expected = expected; + } + + @Test + public void test() { + GcsPath path = GcsPath.fromUri(input); + String actual = path.getBrowseUrl(); + assertEquals(String.format("Browse URL for %s", path), expected, actual); + } + } }