diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 51bb83ea9cc0..bdf10c47eef3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -20,31 +20,66 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; +import java.util.Collections; +import java.util.List; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; -/** A {@link Coder} for {@link FileIO.ReadableFile}. */ -public class ReadableFileCoder extends AtomicCoder { - private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); +/** A {@link Coder} for {@link org.apache.beam.sdk.io.FileIO.ReadableFile}. */ +public class ReadableFileCoder extends StructuredCoder { + + private final Coder metadataCoder; + + public static ReadableFileCoder of(Coder metadataCoder) { + return new ReadableFileCoder(metadataCoder); + } - /** Returns the instance of {@link ReadableFileCoder}. */ public static ReadableFileCoder of() { - return INSTANCE; + return new ReadableFileCoder(MetadataCoder.of()); + } + + public Coder getMetadataCoder() { + return metadataCoder; + } + + private ReadableFileCoder(Coder metadataCoder) { + this.metadataCoder = metadataCoder; } @Override - public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - MetadataCoder.of().encode(value.getMetadata(), os); - VarIntCoder.of().encode(value.getCompression().ordinal(), os); + public void encode( + FileIO.ReadableFile value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws CoderException, IOException { + getMetadataCoder().encode(value.getMetadata(), outStream); + VarIntCoder.of().encode(value.getCompression().ordinal(), outStream); } @Override - public FileIO.ReadableFile decode(InputStream is) throws IOException { - MatchResult.Metadata metadata = MetadataCoder.of().decode(is); - Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + public FileIO.ReadableFile decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws CoderException, IOException { + MatchResult.Metadata metadata = getMetadataCoder().decode(inStream); + Compression compression = Compression.values()[VarIntCoder.of().decode(inStream)]; return new FileIO.ReadableFile(metadata, compression); } + + @Override + public @UnknownKeyFor @NonNull @Initialized List> getCoderArguments() { + return Collections.singletonList(metadataCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + // ignore the default Metadata coder for backward compatible + if (!getMetadataCoder().equals(MetadataCoder.of())) { + verifyDeterministic(this, "Metadata coder must be deterministic", getMetadataCoder()); + } + } }