diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 76717ad6b9f5..9295981744d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.transforms.Contextful.fn; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; import java.io.IOException; @@ -1147,6 +1148,52 @@ public Write withIgnoreWindowing() { return toBuilder().setIgnoreWindowing(true).build(); } + @VisibleForTesting + Contextful> resolveFileNamingFn() { + if (getDynamic()) { + checkArgument( + getConstantFileNaming() == null, + "when using writeDynamic(), must use versions of .withNaming() " + + "that take functions from DestinationT"); + checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()"); + checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()"); + checkArgument( + getFileNamingFn() != null, + "when using writeDynamic(), must specify " + + ".withNaming() taking a function form DestinationT"); + return fn( + (element, c) -> { + FileNaming naming = getFileNamingFn().getClosure().apply(element, c); + return getOutputDirectory() == null + ? naming + : relativeFileNaming(getOutputDirectory(), naming); + }, + getFileNamingFn().getRequirements()); + } else { + checkArgument(getFileNamingFn() == null, + ".withNaming() taking a function from DestinationT requires writeDynamic()"); + FileNaming constantFileNaming; + if (getConstantFileNaming() == null) { + constantFileNaming = defaultNaming( + MoreObjects.firstNonNull( + getFilenamePrefix(), StaticValueProvider.of("output")), + MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of(""))); + } else { + checkArgument( + getFilenamePrefix() == null, + ".to(FileNaming) is incompatible with .withSuffix()"); + checkArgument( + getFilenameSuffix() == null, + ".to(FileNaming) is incompatible with .withPrefix()"); + constantFileNaming = getConstantFileNaming(); + } + if (getOutputDirectory() != null) { + constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming); + } + return fn(SerializableFunctions.constant(constantFileNaming)); + } + } + @Override public WriteFilesResult expand(PCollection input) { Write.Builder resolvedSpec = new AutoValue_FileIO_Write.Builder<>(); @@ -1172,52 +1219,7 @@ public WriteFilesResult expand(PCollection input) { resolvedSpec.setDestinationCoder((Coder) VoidCoder.of()); } - // Resolve fileNamingFn - Contextful> fileNamingFn; - if (getDynamic()) { - checkArgument( - getConstantFileNaming() == null, - "when using writeDynamic(), must use versions of .withNaming() " - + "that take functions from DestinationT"); - checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()"); - checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()"); - checkArgument( - getFileNamingFn() != null, - "when using writeDynamic(), must specify " - + ".withNaming() taking a function form DestinationT"); - fileNamingFn = - Contextful.fn( - (element, c) -> { - FileNaming naming = getFileNamingFn().getClosure().apply(element, c); - return getOutputDirectory() == null - ? naming - : relativeFileNaming(getOutputDirectory(), naming); - }, - getFileNamingFn().getRequirements()); - } else { - checkArgument(getFileNamingFn() == null, - ".withNaming() taking a function from DestinationT requires writeDynamic()"); - FileNaming constantFileNaming; - if (getConstantFileNaming() == null) { - constantFileNaming = defaultNaming( - MoreObjects.firstNonNull( - getFilenamePrefix(), StaticValueProvider.of("output")), - MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of(""))); - if (getOutputDirectory() != null) { - constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming); - } - } else { - checkArgument( - getFilenamePrefix() == null, ".to(FileNaming) is incompatible with .withSuffix()"); - checkArgument( - getFilenameSuffix() == null, ".to(FileNaming) is incompatible with .withPrefix()"); - constantFileNaming = getConstantFileNaming(); - } - fileNamingFn = - fn(SerializableFunctions.constant(constantFileNaming)); - } - - resolvedSpec.setFileNamingFn(fileNamingFn); + resolvedSpec.setFileNamingFn(resolveFileNamingFn()); resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination()); if (getTempDirectory() == null) { checkArgument( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index 608fd0a8b1c6..15989d622a87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,6 +43,8 @@ import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.Rule; @@ -301,4 +304,106 @@ private static MatchResult.Metadata metadata(Path path, int size) { .setSizeBytes(size) .build(); } + + @Test + public void testFilenameFnResolution() throws Exception { + FileIO.Write.FileNaming testFileNaming = FileIO.writeDynamic() + .to("test") + .withNaming(o -> (window, pane, numShards, shardIndex, compression) -> "foo") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + FileSystems.matchNewResource("test", true) + .resolve("foo", RESOLVE_FILE) + .toString(), + testFileNaming.getFilename(null, null, 0, 0, null) + ); + + testFileNaming = FileIO.write() + .to("test") + .withNaming((window, pane, numShards, shardIndex, compression) -> "foo") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + FileSystems.matchNewResource("test", true) + .resolve("foo", RESOLVE_FILE) + .toString(), + testFileNaming.getFilename(null, null, 0, 0, null) + ); + + testFileNaming = FileIO.writeDynamic() + .withNaming(o -> (window, pane, numShards, shardIndex, compression) -> "foo") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + testFileNaming.getFilename(null, null, 0, 0, null) + ); + + testFileNaming = FileIO.write() + .withNaming((window, pane, numShards, shardIndex, compression) -> "foo") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + testFileNaming.getFilename(null, null, 0, 0, null) + ); + + testFileNaming = FileIO.write() + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write", + "output-00000-of-00000", + testFileNaming.getFilename(GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, 0, + 0, Compression.UNCOMPRESSED) + ); + + testFileNaming = FileIO.write() + .withPrefix("foo") + .withSuffix(".bar") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + assertEquals( + "Default Naming should take prefix and suffix into account if provided", + "foo-00000-of-00000.bar", + testFileNaming.getFilename(GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, 0, + 0, Compression.UNCOMPRESSED) + ); + + testFileNaming = FileIO.write() + .to("test") + .resolveFileNamingFn() + .getClosure() + .apply(null, null); + + + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked, " + + "even with default naming", + FileSystems.matchNewResource("test", true) + .resolve("output-00000-of-00000", RESOLVE_FILE) + .toString(), + testFileNaming.getFilename(GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, 0, + 0, Compression.UNCOMPRESSED) + ); + } }