From 610e18fd76e6399ccf7a0a38ff480de1a3e63aab Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Mon, 20 Apr 2020 13:14:48 +0200 Subject: [PATCH 1/3] [BEAM-9147] Make VideoIntelligence use PTransform on user-facing API --- .../beam/sdk/extensions/ml/AnnotateVideo.java | 71 +++--------- .../ml/AnnotateVideoBytesWithContextFn.java | 50 +++++++++ .../sdk/extensions/ml/AnnotateVideoFn.java | 105 ++++++++++++++++++ .../ml/AnnotateVideoFromBytesFn.java | 55 +++++++++ .../extensions/ml/AnnotateVideoFromURIFn.java | 54 +++++++++ .../ml/AnnotateVideoURIWithContextFn.java | 49 ++++++++ .../sdk/extensions/ml/VideoIntelligence.java | 103 +++++++---------- .../sdk/extensions/ml/AnnotateVideoTest.java | 16 ++- .../extensions/ml/VideoIntelligenceIT.java | 4 +- 9 files changed, 373 insertions(+), 134 deletions(-) create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java index d74829d36990..3cedb625840f 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -17,89 +17,44 @@ */ package org.apache.beam.sdk.extensions.ml; -import com.google.api.gax.longrunning.OperationFuture; -import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; -import com.google.cloud.videointelligence.v1.AnnotateVideoRequest; -import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; import com.google.cloud.videointelligence.v1.Feature; import com.google.cloud.videointelligence.v1.VideoAnnotationResults; import com.google.cloud.videointelligence.v1.VideoContext; -import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; -import com.google.protobuf.ByteString; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * Base class for Video Intelligence transform. + * Base class for VideoIntelligence PTransform. * - * @param Class of input data being passed in - either ByteString - video data encoded into. - * String or String - a GCS URI of the video to be annotated. + * @param Type of input PCollection contents. */ @Experimental -public abstract class AnnotateVideo extends DoFn> { - +public abstract class AnnotateVideo + extends PTransform, PCollection>> { protected final PCollectionView> contextSideInput; protected final List featureList; - VideoIntelligenceServiceClient videoIntelligenceServiceClient; - public AnnotateVideo( + protected AnnotateVideo( PCollectionView> contextSideInput, List featureList) { this.contextSideInput = contextSideInput; this.featureList = featureList; } - public AnnotateVideo(List featureList) { - contextSideInput = null; + protected AnnotateVideo(List featureList) { + this.contextSideInput = null; this.featureList = featureList; } - @StartBundle - public void startBundle() throws IOException { - videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); - } - - @Teardown - public void teardown() { - videoIntelligenceServiceClient.close(); - } - /** - * Call the Video Intelligence Cloud AI service and return annotation results. + * To be implemented based on input PCollection's content type. * - * @param elementURI This or elementContents is required. GCS address of video to be annotated - * @param elementContents this or elementURI is required. Hex-encoded contents of video to be - * annotated - * @param videoContext Optional context for video annotation. + * @param input * @return */ - List getVideoAnnotationResults( - String elementURI, ByteString elementContents, VideoContext videoContext) - throws InterruptedException, ExecutionException { - AnnotateVideoRequest.Builder requestBuilder = - AnnotateVideoRequest.newBuilder().addAllFeatures(featureList); - if (elementURI != null) { - requestBuilder.setInputUri(elementURI); - } else if (elementContents != null) { - requestBuilder.setInputContent(elementContents); - } else { - throw new IllegalArgumentException("Either elementURI or elementContents should be non-null"); - } - if (videoContext != null) { - requestBuilder.setVideoContext(videoContext); - } - AnnotateVideoRequest annotateVideoRequest = requestBuilder.build(); - OperationFuture annotateVideoAsync = - videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest); - return annotateVideoAsync.get().getAnnotationResultsList(); - } - - /** Process element implementation required. */ - @ProcessElement - public abstract void processElement(ProcessContext context) - throws ExecutionException, InterruptedException; + @Override + public abstract PCollection> expand(PCollection input); } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java new file mode 100644 index 000000000000..c8edcd106cdb --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * ByteString encoded video contents, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn> { + + public AnnotateVideoBytesWithContextFn(List featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java new file mode 100644 index 000000000000..3cf0c30025b1 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; +import com.google.cloud.videointelligence.v1.AnnotateVideoRequest; +import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Base class for DoFns used in VideoIntelligence transforms. + * + * @param Class of input data being passed in - either ByteString - video data encoded into. + * String or String - a GCS URI of the video to be annotated. + */ +@Experimental +abstract class AnnotateVideoFn extends DoFn> { + + protected final PCollectionView> contextSideInput; + protected final List featureList; + VideoIntelligenceServiceClient videoIntelligenceServiceClient; + + public AnnotateVideoFn( + PCollectionView> contextSideInput, List featureList) { + this.contextSideInput = contextSideInput; + this.featureList = featureList; + } + + public AnnotateVideoFn(List featureList) { + contextSideInput = null; + this.featureList = featureList; + } + + @StartBundle + public void startBundle() throws IOException { + videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); + } + + @Teardown + public void teardown() { + videoIntelligenceServiceClient.close(); + } + + /** + * Call the Video Intelligence Cloud AI service and return annotation results. + * + * @param elementURI This or elementContents is required. GCS address of video to be annotated + * @param elementContents this or elementURI is required. Hex-encoded contents of video to be + * annotated + * @param videoContext Optional context for video annotation. + * @return + */ + List getVideoAnnotationResults( + String elementURI, ByteString elementContents, VideoContext videoContext) + throws InterruptedException, ExecutionException { + AnnotateVideoRequest.Builder requestBuilder = + AnnotateVideoRequest.newBuilder().addAllFeatures(featureList); + if (elementURI != null) { + requestBuilder.setInputUri(elementURI); + } else if (elementContents != null) { + requestBuilder.setInputContent(elementContents); + } else { + throw new IllegalArgumentException("Either elementURI or elementContents should be non-null"); + } + if (videoContext != null) { + requestBuilder.setVideoContext(videoContext); + } + AnnotateVideoRequest annotateVideoRequest = requestBuilder.build(); + OperationFuture annotateVideoAsync = + videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest); + return annotateVideoAsync.get().getAnnotationResultsList(); + } + + /** Process element implementation required. */ + @ProcessElement + public abstract void processElement(ProcessContext context) + throws ExecutionException, InterruptedException; +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java new file mode 100644 index 000000000000..20aa083e9509 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos + * decoded from the ByteStrings are annotated. + */ +@Experimental +class AnnotateVideoFromBytesFn extends AnnotateVideoFn { + + public AnnotateVideoFromBytesFn( + PCollectionView> contextSideInput, List featureList) { + super(contextSideInput, featureList); + } + + /** Implementation of ProcessElement. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(element); + } + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java new file mode 100644 index 000000000000..5dfea0cc1710 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates + * videos found on GCS based on URIs from input PCollection. + */ +@Experimental +class AnnotateVideoFromURIFn extends AnnotateVideoFn { + + public AnnotateVideoFromURIFn( + PCollectionView> contextSideInput, List featureList) { + super(contextSideInput, featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(elementURI); + } + List annotationResultsList = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(annotationResultsList); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java new file mode 100644 index 000000000000..a165d5ac9073 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * GCS URIs, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoURIWithContextFn extends AnnotateVideoFn> { + + public AnnotateVideoURIWithContextFn(List featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 9278e542dcb9..5fa25d6737dc 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -23,9 +23,10 @@ import com.google.protobuf.ByteString; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** @@ -46,11 +47,11 @@ public class VideoIntelligence { * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to URIs - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoFromURI annotateFromURI( + public static AnnotateVideoFromUri annotateFromURI( List featureList, PCollectionView> contextSideInput) { - return new AnnotateVideoFromURI(contextSideInput, featureList); + return new AnnotateVideoFromUri(contextSideInput, featureList); } /** @@ -58,7 +59,7 @@ public static AnnotateVideoFromURI annotateFromURI( * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to ByteStrings - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ public static AnnotateVideoFromBytes annotateFromBytes( PCollectionView> contextSideInput, List featureList) { @@ -69,122 +70,96 @@ public static AnnotateVideoFromBytes annotateFromBytes( * Annotates videos from key-value pairs of GCS URI and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoURIWithContext annotateFromUriWithContext(List featureList) { - return new AnnotateVideoURIWithContext(featureList); + public static AnnotateVideoFromURIWithContext annotateFromUriWithContext( + List featureList) { + return new AnnotateVideoFromURIWithContext(featureList); } /** * Annotates videos from key-value pairs of ByteStrings and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( + public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext( List featureList) { - return new AnnotateVideoBytesWithContext(featureList); + return new AnnotateVideoFromBytesWithContext(featureList); } /** - * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates - * videos found on GCS based on URIs from input PCollection. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link String} and an optional + * side input with a context map. */ @Experimental - public static class AnnotateVideoFromURI extends AnnotateVideo { + public static class AnnotateVideoFromUri extends AnnotateVideo { - public AnnotateVideoFromURI( + protected AnnotateVideoFromUri( PCollectionView> contextSideInput, List featureList) { super(contextSideInput, featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(elementURI); - } - List annotationResultsList = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(annotationResultsList); + public PCollection> expand(PCollection input) { + return input.apply(ParDo.of(new AnnotateVideoFromURIFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos - * decoded from the ByteStrings are annotated. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link ByteString} and an + * optional side input with a context map. */ @Experimental public static class AnnotateVideoFromBytes extends AnnotateVideo { - public AnnotateVideoFromBytes( + protected AnnotateVideoFromBytes( PCollectionView> contextSideInput, List featureList) { super(contextSideInput, featureList); } - /** Implementation of ProcessElement. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(element); - } - List videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand(PCollection input) { + return input.apply(ParDo.of(new AnnotateVideoFromBytesFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * GCS URIs, values - VideoContext objects. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link String} + * and {@link VideoContext}. */ @Experimental - public static class AnnotateVideoURIWithContext extends AnnotateVideo> { + public static class AnnotateVideoFromURIWithContext + extends AnnotateVideo> { - public AnnotateVideoURIWithContext(List featureList) { + protected AnnotateVideoFromURIWithContext(List featureList) { super(featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List videoAnnotationResults = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand( + PCollection> input) { + return input.apply(ParDo.of(new AnnotateVideoURIWithContextFn(featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * ByteString encoded video contents, values - VideoContext objects. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link + * ByteString} and {@link VideoContext}. */ @Experimental - public static class AnnotateVideoBytesWithContext + public static class AnnotateVideoFromBytesWithContext extends AnnotateVideo> { - public AnnotateVideoBytesWithContext(List featureList) { + protected AnnotateVideoFromBytesWithContext(List featureList) { super(featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection> expand( + PCollection> input) { + return input.apply(ParDo.of(new AnnotateVideoBytesWithContextFn(featureList))); } } } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java index 57400e48d064..56473aae1ae2 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java @@ -52,22 +52,20 @@ public void shouldReturnAListOfAnnotations() throws ExecutionException, Interrup .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build())); when(future.get()).thenReturn(response); when(serviceClient.annotateVideoAsync(any())).thenReturn(future); - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient; + annotateVideoFromBytesFn.videoIntelligenceServiceClient = serviceClient; List videoAnnotationResults = - annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null); + annotateVideoFromBytesFn.getVideoAnnotationResults(TEST_URI, null, null); assertEquals(1, videoAnnotationResults.size()); } @Test(expected = IllegalArgumentException.class) public void shouldThrowErrorWhenBothInputTypesNull() throws ExecutionException, InterruptedException { - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.getVideoAnnotationResults(null, null, null); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); + annotateVideoFromBytesFn.getVideoAnnotationResults(null, null, null); } } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java index b0b74eeccc27..642722562384 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.ml; -import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI; import static org.junit.Assert.assertEquals; import com.google.cloud.videointelligence.v1.Feature; @@ -29,7 +28,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; @@ -49,7 +47,7 @@ public void annotateVideoFromURINoContext() { PCollection> annotationResults = testPipeline .apply(Create.of(VIDEO_URI)) - .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null))); + .apply("Annotate video", VideoIntelligence.annotateFromURI(featureList, null)); PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult()); testPipeline.run().waitUntilFinish(); } From abe777c202df0173889fbfc0687e577876e9756e Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Mon, 20 Apr 2020 13:19:29 +0200 Subject: [PATCH 2/3] Add position to CHANGES.md --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index cb0f4e89bb6c..2b159cedb373 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -121,6 +121,9 @@ conversion to beam schema options. *Remark: Schema aware is still experimental.* values as strings) into Python native types that are written to Avro (Python's date, datetime types, decimal, etc). For more information see https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions. +* Added integration of Java SDK with Google Cloud AI VideoIntelligence service +([BEAM-9147](https://issues.apache.org/jira/browse/BEAM-9147)) + ## Breaking Changes From d8baca4a31544deac46068d4fbc3a08c03dc8eeb Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Tue, 21 Apr 2020 14:39:59 +0200 Subject: [PATCH 3/3] Remove abstract AnnotateVideo PTransform --- .../beam/sdk/extensions/ml/AnnotateVideo.java | 60 ------------------- .../sdk/extensions/ml/AnnotateVideoFn.java | 4 +- .../sdk/extensions/ml/VideoIntelligence.java | 59 ++++++++++++------ 3 files changed, 42 insertions(+), 81 deletions(-) delete mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java deleted file mode 100644 index 3cedb625840f..000000000000 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.ml; - -import com.google.cloud.videointelligence.v1.Feature; -import com.google.cloud.videointelligence.v1.VideoAnnotationResults; -import com.google.cloud.videointelligence.v1.VideoContext; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Base class for VideoIntelligence PTransform. - * - * @param Type of input PCollection contents. - */ -@Experimental -public abstract class AnnotateVideo - extends PTransform, PCollection>> { - protected final PCollectionView> contextSideInput; - protected final List featureList; - - protected AnnotateVideo( - PCollectionView> contextSideInput, List featureList) { - this.contextSideInput = contextSideInput; - this.featureList = featureList; - } - - protected AnnotateVideo(List featureList) { - this.contextSideInput = null; - this.featureList = featureList; - } - - /** - * To be implemented based on input PCollection's content type. - * - * @param input - * @return - */ - @Override - public abstract PCollection> expand(PCollection input); -} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java index 3cf0c30025b1..a954ff330b88 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java @@ -58,8 +58,8 @@ public AnnotateVideoFn(List featureList) { this.featureList = featureList; } - @StartBundle - public void startBundle() throws IOException { + @Setup + public void setup() throws IOException { videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 5fa25d6737dc..fcee65a38ce5 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -24,15 +24,16 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * Factory class for AnnotateVideo subclasses. allows integration with Google Cloud AI - - * VideoIntelligence service. Converts GCS URIs of videos or ByteStrings with video contents into - * Lists of VideoAnnotationResults. + * Factory class for PTransforms integrating with Google Cloud AI - VideoIntelligence service. + * Converts GCS URIs of videos or ByteStrings with video contents into Lists of + * VideoAnnotationResults. * *

Adding a side input of Maps of elements to VideoContext objects is allowed, so is using KVs of * element and VideoContext as input. @@ -89,15 +90,21 @@ public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext( } /** - * Implementation of {@link AnnotateVideo} taking a PCollection of {@link String} and an optional - * side input with a context map. + * A PTransform taking a PCollection of {@link String} and an optional side input with a context + * map and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental - public static class AnnotateVideoFromUri extends AnnotateVideo { + public static class AnnotateVideoFromUri + extends PTransform, PCollection>> { + + private final PCollectionView> contextSideInput; + private final List featureList; protected AnnotateVideoFromUri( PCollectionView> contextSideInput, List featureList) { - super(contextSideInput, featureList); + this.contextSideInput = contextSideInput; + this.featureList = featureList; } @Override @@ -107,16 +114,22 @@ public PCollection> expand(PCollection inpu } /** - * Implementation of {@link AnnotateVideo} taking a PCollection of {@link ByteString} and an - * optional side input with a context map. + * A PTransform taking a PCollection of {@link ByteString} and an optional side input with a + * context map and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud + * AI VideoIntelligence. */ @Experimental - public static class AnnotateVideoFromBytes extends AnnotateVideo { + public static class AnnotateVideoFromBytes + extends PTransform, PCollection>> { + + private final PCollectionView> contextSideInput; + private final List featureList; protected AnnotateVideoFromBytes( PCollectionView> contextSideInput, List featureList) { - super(contextSideInput, featureList); + this.contextSideInput = contextSideInput; + this.featureList = featureList; } @Override @@ -126,15 +139,19 @@ public PCollection> expand(PCollection } /** - * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link String} - * and {@link VideoContext}. + * A PTransform taking a PCollection of {@link KV} of {@link String} and {@link VideoContext} and + * emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental public static class AnnotateVideoFromURIWithContext - extends AnnotateVideo> { + extends PTransform< + PCollection>, PCollection>> { + + private final List featureList; protected AnnotateVideoFromURIWithContext(List featureList) { - super(featureList); + this.featureList = featureList; } @Override @@ -145,15 +162,19 @@ public PCollection> expand( } /** - * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link - * ByteString} and {@link VideoContext}. + * A PTransform taking a PCollection of {@link KV} of {@link ByteString} and {@link VideoContext} + * and emitting lists of {@link VideoAnnotationResults} for each element. Calls Cloud AI + * VideoIntelligence. */ @Experimental public static class AnnotateVideoFromBytesWithContext - extends AnnotateVideo> { + extends PTransform< + PCollection>, PCollection>> { + + private final List featureList; protected AnnotateVideoFromBytesWithContext(List featureList) { - super(featureList); + this.featureList = featureList; } @Override