diff --git a/CHANGES.md b/CHANGES.md index cb0f4e89bb6c..2b159cedb373 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -121,6 +121,9 @@ conversion to beam schema options. *Remark: Schema aware is still experimental.* values as strings) into Python native types that are written to Avro (Python's date, datetime types, decimal, etc). For more information see https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions. +* Added integration of Java SDK with Google Cloud AI VideoIntelligence service +([BEAM-9147](https://issues.apache.org/jira/browse/BEAM-9147)) + ## Breaking Changes diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java new file mode 100644 index 000000000000..c8edcd106cdb --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * ByteString encoded video contents, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn> { + + public AnnotateVideoBytesWithContextFn(List featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java similarity index 93% rename from sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java rename to sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java index d74829d36990..a954ff330b88 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java @@ -35,31 +35,31 @@ import org.apache.beam.sdk.values.PCollectionView; /** - * Base class for Video Intelligence transform. + * Base class for DoFns used in VideoIntelligence transforms. * * @param Class of input data being passed in - either ByteString - video data encoded into. * String or String - a GCS URI of the video to be annotated. */ @Experimental -public abstract class AnnotateVideo extends DoFn> { +abstract class AnnotateVideoFn extends DoFn> { protected final PCollectionView> contextSideInput; protected final List featureList; VideoIntelligenceServiceClient videoIntelligenceServiceClient; - public AnnotateVideo( + public AnnotateVideoFn( PCollectionView> contextSideInput, List featureList) { this.contextSideInput = contextSideInput; this.featureList = featureList; } - public AnnotateVideo(List featureList) { + public AnnotateVideoFn(List featureList) { contextSideInput = null; this.featureList = featureList; } - @StartBundle - public void startBundle() throws IOException { + @Setup + public void setup() throws IOException { videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java new file mode 100644 index 000000000000..20aa083e9509 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos + * decoded from the ByteStrings are annotated. + */ +@Experimental +class AnnotateVideoFromBytesFn extends AnnotateVideoFn { + + public AnnotateVideoFromBytesFn( + PCollectionView> contextSideInput, List featureList) { + super(contextSideInput, featureList); + } + + /** Implementation of ProcessElement. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(element); + } + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java new file mode 100644 index 000000000000..5dfea0cc1710 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates + * videos found on GCS based on URIs from input PCollection. + */ +@Experimental +class AnnotateVideoFromURIFn extends AnnotateVideoFn { + + public AnnotateVideoFromURIFn( + PCollectionView> contextSideInput, List featureList) { + super(contextSideInput, featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(elementURI); + } + List annotationResultsList = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(annotationResultsList); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java new file mode 100644 index 000000000000..a165d5ac9073 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * GCS URIs, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoURIWithContextFn extends AnnotateVideoFn> { + + public AnnotateVideoURIWithContextFn(List featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 9278e542dcb9..fcee65a38ce5 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -23,15 +23,17 @@ import com.google.protobuf.ByteString; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * Factory class for AnnotateVideo subclasses. allows integration with Google Cloud AI - - * VideoIntelligence service. Converts GCS URIs of videos or ByteStrings with video contents into - * Lists of VideoAnnotationResults. + * Factory class for PTransforms integrating with Google Cloud AI - VideoIntelligence service. + * Converts GCS URIs of videos or ByteStrings with video contents into Lists of + * VideoAnnotationResults. * *

Adding a side input of Maps of elements to VideoContext objects is allowed, so is using KVs of * element and VideoContext as input. @@ -46,11 +48,11 @@ public class VideoIntelligence { * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to URIs - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoFromURI annotateFromURI( + public static AnnotateVideoFromUri annotateFromURI( List featureList, PCollectionView> contextSideInput) { - return new AnnotateVideoFromURI(contextSideInput, featureList); + return new AnnotateVideoFromUri(contextSideInput, featureList); } /** @@ -58,7 +60,7 @@ public static AnnotateVideoFromURI annotateFromURI( * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to ByteStrings - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ public static AnnotateVideoFromBytes annotateFromBytes( PCollectionView> contextSideInput, List featureList) { @@ -69,122 +71,116 @@ public static AnnotateVideoFromBytes annotateFromBytes( * Annotates videos from key-value pairs of GCS URI and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoURIWithContext annotateFromUriWithContext(List featureList) { - return new AnnotateVideoURIWithContext(featureList); + public static AnnotateVideoFromURIWithContext annotateFromUriWithContext( + List featureList) { + return new AnnotateVideoFromURIWithContext(featureList); } /** * Annotates videos from key-value pairs of ByteStrings and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( + public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext( List featureList) { - return new AnnotateVideoBytesWithContext(featureList); + return new AnnotateVideoFromBytesWithContext(featureList); } /** - * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates - * videos found on GCS based on URIs from input PCollection. + * A PTransform taking a PCollection of {@link String} and an optional side input with a context + * map and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental - public static class AnnotateVideoFromURI extends AnnotateVideo { + public static class AnnotateVideoFromUri + extends PTransform, PCollection>> { + + private final PCollectionView> contextSideInput; + private final List featureList; - public AnnotateVideoFromURI( + protected AnnotateVideoFromUri( PCollectionView> contextSideInput, List featureList) { - super(contextSideInput, featureList); + this.contextSideInput = contextSideInput; + this.featureList = featureList; } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(elementURI); - } - List annotationResultsList = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(annotationResultsList); + public PCollection> expand(PCollection input) { + return input.apply(ParDo.of(new AnnotateVideoFromURIFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos - * decoded from the ByteStrings are annotated. + * A PTransform taking a PCollection of {@link ByteString} and an optional side input with a + * context map and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud + * AI VideoIntelligence. */ @Experimental - public static class AnnotateVideoFromBytes extends AnnotateVideo { + public static class AnnotateVideoFromBytes + extends PTransform, PCollection>> { + + private final PCollectionView> contextSideInput; + private final List featureList; - public AnnotateVideoFromBytes( + protected AnnotateVideoFromBytes( PCollectionView> contextSideInput, List featureList) { - super(contextSideInput, featureList); + this.contextSideInput = contextSideInput; + this.featureList = featureList; } - /** Implementation of ProcessElement. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(element); - } - List videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand(PCollection input) { + return input.apply(ParDo.of(new AnnotateVideoFromBytesFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * GCS URIs, values - VideoContext objects. + * A PTransform taking a PCollection of {@link KV} of {@link String} and {@link VideoContext} and + * emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental - public static class AnnotateVideoURIWithContext extends AnnotateVideo> { + public static class AnnotateVideoFromURIWithContext + extends PTransform< + PCollection>, PCollection>> { - public AnnotateVideoURIWithContext(List featureList) { - super(featureList); + private final List featureList; + + protected AnnotateVideoFromURIWithContext(List featureList) { + this.featureList = featureList; } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List videoAnnotationResults = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand( + PCollection> input) { + return input.apply(ParDo.of(new AnnotateVideoURIWithContextFn(featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * ByteString encoded video contents, values - VideoContext objects. + * A PTransform taking a PCollection of {@link KV} of {@link ByteString} and {@link VideoContext} + * and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental - public static class AnnotateVideoBytesWithContext - extends AnnotateVideo> { + public static class AnnotateVideoFromBytesWithContext + extends PTransform< + PCollection>, PCollection>> { + + private final List featureList; - public AnnotateVideoBytesWithContext(List featureList) { - super(featureList); + protected AnnotateVideoFromBytesWithContext(List featureList) { + this.featureList = featureList; } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand( + PCollection> input) { + return input.apply(ParDo.of(new AnnotateVideoBytesWithContextFn(featureList))); } } } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java index 57400e48d064..56473aae1ae2 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java @@ -52,22 +52,20 @@ public void shouldReturnAListOfAnnotations() throws ExecutionException, Interrup .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build())); when(future.get()).thenReturn(response); when(serviceClient.annotateVideoAsync(any())).thenReturn(future); - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient; + annotateVideoFromBytesFn.videoIntelligenceServiceClient = serviceClient; List videoAnnotationResults = - annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null); + annotateVideoFromBytesFn.getVideoAnnotationResults(TEST_URI, null, null); assertEquals(1, videoAnnotationResults.size()); } @Test(expected = IllegalArgumentException.class) public void shouldThrowErrorWhenBothInputTypesNull() throws ExecutionException, InterruptedException { - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.getVideoAnnotationResults(null, null, null); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); + annotateVideoFromBytesFn.getVideoAnnotationResults(null, null, null); } } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java index b0b74eeccc27..642722562384 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.ml; -import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI; import static org.junit.Assert.assertEquals; import com.google.cloud.videointelligence.v1.Feature; @@ -29,7 +28,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; @@ -49,7 +47,7 @@ public void annotateVideoFromURINoContext() { PCollection> annotationResults = testPipeline .apply(Create.of(VIDEO_URI)) - .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null))); + .apply("Annotate video", VideoIntelligence.annotateFromURI(featureList, null)); PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult()); testPipeline.run().waitUntilFinish(); }