pom.xml: 4 changes (2 additions & 2 deletions)
@@ -69,7 +69,7 @@
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
</dependency>

<!-- Google client dependencies -->
@@ -114,7 +114,7 @@
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-genomics</artifactId>
<version>v1beta2-rev25-1.19.1</version>
<version>v1beta2-rev87-1.20.0</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by the transitive dependency google-api-client 1.19.0 -->
@@ -20,11 +20,16 @@
import com.google.api.client.googleapis.util.Utils;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.JsonFactory;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderProvider;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import java.io.IOException;
import java.io.Serializable;

/**
* Can be used as a coder for any object that extends GenericJson.
@@ -67,4 +72,27 @@ public CloudObject asCloudObject() {
@Override protected String to(T object) throws IOException {
return JSON_FACTORY.toString(object);
}
}

/**
* Coder provider for all objects in the Google Genomics Java client library.
*/
public static final CoderProvider PROVIDER = new CoderProvider() {
@Override
@SuppressWarnings("unchecked")
public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
throws CannotProvideCoderException {
Class<T> rawType = (Class<T>) typeDescriptor.getRawType();
if (!GenericJson.class.isAssignableFrom(rawType)) {
if (Serializable.class.isAssignableFrom(rawType)) {
// Fall back to SerializableCoder here because when this provider is installed as the
// fallback coder provider, it replaces the default fallback of SerializableCoder.PROVIDER.
return (Coder<T>) SerializableCoder.of((Class<? extends Serializable>) rawType);
} else {
throw new CannotProvideCoderException("Class " + rawType
+ " does not extend GenericJson or implement Serializable");
}
}
return (Coder<T>) GenericJsonCoder.of((Class<? extends GenericJson>) rawType);
}
};
}
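
The new PROVIDER is meant to be installed on a pipeline's CoderRegistry as the fallback coder provider, which is exactly what the pipelines changed below do. A minimal usage sketch, assuming a Dataflow SDK 1.x pipeline and a hypothetical GenericJson subclass MyRecord that is not part of this change:

// Install the fallback: element types extending GenericJson resolve to GenericJsonCoder,
// other Serializable types fall back to SerializableCoder, and anything else fails with
// CannotProvideCoderException.
Pipeline p = Pipeline.create(options);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);

// A PCollection whose element type has no explicitly registered coder now resolves
// through the fallback provider (MyRecord is a hypothetical GenericJson subclass).
PCollection<MyRecord> records = p.apply(Create.of(new MyRecord()));

Explicit registrations, such as the registerCoder(Contig.class, CONTIG_CODER) call further down, still take precedence over the fallback provider.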
@@ -42,9 +42,9 @@
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils.VariantEffect;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.GenomicsFactory;
@@ -277,7 +277,8 @@ public static void main(String[] args) throws Exception {
ShardUtils.getPaginatedVariantRequests(opts.getDatasetId(), opts.getReferences(), opts.getBasesPerShard());

Pipeline p = Pipeline.create(opts);
DataflowWorkarounds.registerGenomicsCoders(p);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);

p.begin()
.apply(Create.of(requests))
.apply(ParDo.of(new AnnotateVariants(auth, callSetIds, transcriptSetIds, variantAnnotationSetIds)))
@@ -26,7 +26,6 @@
import com.google.api.services.genomics.model.Position;
import com.google.api.services.genomics.model.RangePosition;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -43,7 +42,6 @@
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.model.PosRgsMq;
import com.google.cloud.genomics.dataflow.readers.ReadGroupStreamer;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
@@ -147,9 +145,7 @@ public static void main(String[] args) throws GeneralSecurityException, IOExcept
auth = GenomicsOptions.Methods.getGenomicsAuth(options);

p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);
DataflowWorkarounds.registerCoder(p, PosRgsMq.class, GenericJsonCoder.of(PosRgsMq.class));
DataflowWorkarounds.registerCoder(p, Read.class, SerializableCoder.of(Read.class));
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);

if (options.getInputDatasetId().isEmpty() && options.getReadGroupSetIds().isEmpty()) {
throw new IllegalArgumentException("InputDatasetId or ReadGroupSetIds must be specified");
@@ -37,12 +37,12 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.readers.ReadReader;
import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform;
import com.google.cloud.genomics.dataflow.readers.bam.Reader;
import com.google.cloud.genomics.dataflow.readers.bam.ReaderOptions;
import com.google.cloud.genomics.dataflow.readers.bam.ShardingPolicy;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
@@ -101,7 +101,7 @@ public static void main(String[] args) throws GeneralSecurityException, IOExcept
GenomicsDatasetOptions.Methods.validateOptions(options);
auth = GenomicsOptions.Methods.getGenomicsAuth(options);
p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);

// ensure data is accessible
String BAMFilePath = options.getBAMFilePath();
@@ -27,13 +27,13 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.functions.AlleleSimilarityCalculator;
import com.google.cloud.genomics.dataflow.functions.CallSimilarityCalculatorFactory;
import com.google.cloud.genomics.dataflow.functions.FormatIBSData;
import com.google.cloud.genomics.dataflow.functions.IBSCalculator;
import com.google.cloud.genomics.dataflow.functions.JoinNonVariantSegmentsWithVariants;
import com.google.cloud.genomics.dataflow.readers.VariantReader;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.IdentityByStateOptions;
@@ -68,7 +68,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
ShardUtils.getPaginatedVariantRequests(options.getDatasetId(), options.getReferences(), options.getBasesPerShard());

Pipeline p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
PCollection<SearchVariantsRequest> input = p.begin().apply(Create.of(requests));

PCollection<Variant> variants =
@@ -15,7 +15,6 @@

import com.google.api.services.genomics.model.Read;
import com.google.api.services.storage.Storage;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
@@ -27,11 +26,11 @@
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform;
import com.google.cloud.genomics.dataflow.readers.bam.ReaderOptions;
import com.google.cloud.genomics.dataflow.readers.bam.ShardingPolicy;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
@@ -85,8 +84,8 @@ public static void main(String[] args) throws GeneralSecurityException, IOExcept
auth = GenomicsOptions.Methods.getGenomicsAuth(options);
pipeline = Pipeline.create(options);
// Register coders.
DataflowWorkarounds.registerGenomicsCoders(pipeline);
DataflowWorkarounds.registerCoder(pipeline, Contig.class, CONTIG_CODER);
pipeline.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
pipeline.getCoderRegistry().registerCoder(Contig.class, CONTIG_CODER);
// Process options.
contigs = Contig.parseContigsFromCommandLine(options.getReferences());
// Get header info.
@@ -28,11 +28,11 @@
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.functions.CalculateTransmissionProbability;
import com.google.cloud.genomics.dataflow.functions.ExtractAlleleTransmissionStatus;
import com.google.cloud.genomics.dataflow.model.Allele;
import com.google.cloud.genomics.dataflow.readers.VariantReader;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.GenomicsFactory;
@@ -62,7 +62,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
ShardUtils.getPaginatedVariantRequests(options.getDatasetId(), options.getReferences(), options.getBasesPerShard());

Pipeline p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);

// The below pipeline works as follows:
// - Fetch the variants,
@@ -31,11 +31,11 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.functions.ExtractSimilarCallsets;
import com.google.cloud.genomics.dataflow.functions.OutputPCoAFile;
import com.google.cloud.genomics.dataflow.readers.VariantReader;
import com.google.cloud.genomics.dataflow.readers.VariantStreamer;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
@@ -84,11 +84,6 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
}

Pipeline p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);
DataflowWorkarounds.registerCoder(p, Variant.class,
SerializableCoder.of(Variant.class));
DataflowWorkarounds.registerCoder(p, StreamVariantsRequest.class,
Proto2Coder.of(StreamVariantsRequest.class));

p.begin();
PCollection<KV<KV<String, String>, Long>> similarCallsets = null;
@@ -103,6 +98,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
.apply(new VariantStreamer(auth, ShardBoundary.Requirement.STRICT, VARIANT_FIELDS))
.apply(ParDo.of(new ExtractSimilarCallsets.v1()));
} else {
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
List<SearchVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getPaginatedVariantRequests(options.getDatasetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :

This file was deleted.
