Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,66 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MetadataCoder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/** A {@link Coder} for {@link FileIO.ReadableFile}. */
public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> {
private static final ReadableFileCoder INSTANCE = new ReadableFileCoder();
/** A {@link Coder} for {@link org.apache.beam.sdk.io.FileIO.ReadableFile}. */
public class ReadableFileCoder extends StructuredCoder<FileIO.ReadableFile> {

private final Coder<Metadata> metadataCoder;

public static ReadableFileCoder of(Coder<Metadata> metadataCoder) {
return new ReadableFileCoder(metadataCoder);
}

/** Returns the instance of {@link ReadableFileCoder}. */
public static ReadableFileCoder of() {
return INSTANCE;
return new ReadableFileCoder(MetadataCoder.of());
}

public Coder<Metadata> getMetadataCoder() {
return metadataCoder;
}

private ReadableFileCoder(Coder<Metadata> metadataCoder) {
this.metadataCoder = metadataCoder;
}

@Override
public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException {
MetadataCoder.of().encode(value.getMetadata(), os);
VarIntCoder.of().encode(value.getCompression().ordinal(), os);
public void encode(
FileIO.ReadableFile value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws CoderException, IOException {
getMetadataCoder().encode(value.getMetadata(), outStream);
VarIntCoder.of().encode(value.getCompression().ordinal(), outStream);
}

@Override
public FileIO.ReadableFile decode(InputStream is) throws IOException {
MatchResult.Metadata metadata = MetadataCoder.of().decode(is);
Compression compression = Compression.values()[VarIntCoder.of().decode(is)];
public FileIO.ReadableFile decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws CoderException, IOException {
MatchResult.Metadata metadata = getMetadataCoder().decode(inStream);
Compression compression = Compression.values()[VarIntCoder.of().decode(inStream)];
return new FileIO.ReadableFile(metadata, compression);
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<? extends Coder<?>> getCoderArguments() {
return Collections.singletonList(metadataCoder);
}

@Override
public void verifyDeterministic() throws NonDeterministicException {
// ignore the default Metadata coder for backward compatible
if (!getMetadataCoder().equals(MetadataCoder.of())) {
verifyDeterministic(this, "Metadata coder must be deterministic", getMetadataCoder());
}
}
}