From a7f55e60c1cfe39112331dd4e2b34bc522a2c77a Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Tue, 24 Mar 2020 15:41:31 +0100 Subject: [PATCH 1/5] [BEAM-9147] Add VideoIntelligence transform --- sdks/java/extensions/ml/build.gradle | 34 +++++ .../beam/sdk/extensions/ml/AnnotateVideo.java | 81 +++++++++++ .../sdk/extensions/ml/VideoIntelligence.java | 126 ++++++++++++++++++ .../sdk/extensions/ml/AnnotateVideoTest.java | 73 ++++++++++ .../org.mockito.plugins.MockMaker | 1 + settings.gradle | 3 + 6 files changed, 318 insertions(+) create mode 100644 sdks/java/extensions/ml/build.gradle create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java create mode 100644 sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java create mode 100644 sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle new file mode 100644 index 000000000000..482decceeb25 --- /dev/null +++ b/sdks/java/extensions/ml/build.gradle @@ -0,0 +1,34 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf') + +description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML' + +dependencies { + compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") + testCompile project(path: ':sdks:java:core', configuration: 'shadowTest') + compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + testCompile library.java.mockito_core + testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + testCompile library.java.junit +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java new file mode 100644 index 000000000000..edb5c049c643 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.videointelligence.v1.*; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; + +public abstract class AnnotateVideo extends DoFn> { + + protected final PCollectionView> contextSideInput; + protected final List featureList; + VideoIntelligenceServiceClient videoIntelligenceServiceClient; + + public AnnotateVideo( + PCollectionView> contextSideInput, List featureList) { + this.contextSideInput = contextSideInput; + this.featureList = featureList; + } + + public AnnotateVideo(List featureList) { + contextSideInput = null; + this.featureList = featureList; + } + + @StartBundle + public void startBundle() throws IOException { + videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); + } + + @Teardown + public void teardown() { + videoIntelligenceServiceClient.close(); + } + + List getVideoAnnotationResults( + String elementURI, ByteString elementContents, VideoContext videoContext) + throws InterruptedException, ExecutionException { + AnnotateVideoRequest.Builder requestBuilder = + AnnotateVideoRequest.newBuilder().addAllFeatures(featureList); + if (elementURI != null) { + requestBuilder.setInputUri(elementURI); + } else if (elementContents != null) { + requestBuilder.setInputContent(elementContents); + } else { + throw new IllegalArgumentException("Either elementURI or elementContents should be non-null"); + } + if (videoContext != null) { + requestBuilder.setVideoContext(videoContext); + } + AnnotateVideoRequest annotateVideoRequest = requestBuilder.build(); + OperationFuture annotateVideoAsync = + videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest); + return annotateVideoAsync.get().getAnnotationResultsList(); + } + + @ProcessElement + public abstract void processElement(ProcessContext context) + throws ExecutionException, InterruptedException; +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java new file mode 100644 index 000000000000..4b01de0b47b2 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.*; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +public class VideoIntelligence { + + public static AnnotateVideoFromURI annotateFromURI( + List featureList, PCollectionView> contextSideInput) { + return new AnnotateVideoFromURI(contextSideInput, featureList); + } + + public static AnnotateVideoFromBytes annotateFromBytes( + PCollectionView> contextSideInput, List featureList) { + return new AnnotateVideoFromBytes(contextSideInput, featureList); + } + + public static AnnotateVideoURIWithContext annotateFromUriWithContext(List featureList) { + return new AnnotateVideoURIWithContext(featureList); + } + + public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( + List featureList) { + return new AnnotateVideoBytesWithContext(featureList); + } + + public static class AnnotateVideoFromURI extends AnnotateVideo { + + public AnnotateVideoFromURI( + PCollectionView> contextSideInput, List featureList) { + super(contextSideInput, featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(elementURI); + } + List annotationResultsList = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(annotationResultsList); + } + } + + public static class AnnotateVideoFromBytes extends AnnotateVideo { + + public AnnotateVideoFromBytes( + PCollectionView> contextSideInput, + List featureList) { + super(contextSideInput, featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(element); + } + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } + } + + public static class AnnotateVideoURIWithContext extends AnnotateVideo> { + + public AnnotateVideoURIWithContext(List featureList) { + super(featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(videoAnnotationResults); + } + } + + public static class AnnotateVideoBytesWithContext + extends AnnotateVideo> { + + public AnnotateVideoBytesWithContext(List featureList) { + super(featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } + } +} diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java new file mode 100644 index 000000000000..57400e48d064 --- /dev/null +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; +import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; +import com.google.protobuf.ByteString; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AnnotateVideoTest { + + private static final String TEST_URI = "fake_uri"; + private static final ByteString TEST_BYTES = ByteString.copyFromUtf8("12345"); + + @Mock private VideoIntelligenceServiceClient serviceClient; + @Mock private OperationFuture future; + @Mock private AnnotateVideoResponse response; + + @Test + public void shouldReturnAListOfAnnotations() throws ExecutionException, InterruptedException { + when(response.getAnnotationResultsList()) + .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build())); + when(future.get()).thenReturn(response); + when(serviceClient.annotateVideoAsync(any())).thenReturn(future); + VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = + VideoIntelligence.annotateFromBytes( + null, Collections.singletonList(Feature.LABEL_DETECTION)); + + annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient; + List videoAnnotationResults = + annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null); + assertEquals(1, videoAnnotationResults.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowErrorWhenBothInputTypesNull() + throws ExecutionException, InterruptedException { + VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = + VideoIntelligence.annotateFromBytes( + null, Collections.singletonList(Feature.LABEL_DETECTION)); + annotateVideoFromBytes.getVideoAnnotationResults(null, null, null); + } +} diff --git a/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000000..1f0955d450f0 --- /dev/null +++ b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline diff --git a/settings.gradle b/settings.gradle index a25f3570df07..b7c832a773c0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -165,3 +165,6 @@ include "beam-test-infra-metrics" project(":beam-test-infra-metrics").dir = file(".test-infra/metrics") include "beam-test-tools" project(":beam-test-tools").dir = file(".test-infra/tools") +include 'sdks:java:extensions:ml' +findProject(':sdks:java:extensions:ml')?.name = 'ml' + From 4d383b78d847755006e193d5481d09ccd1e7d0d0 Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Fri, 27 Mar 2020 15:43:29 +0100 Subject: [PATCH 2/5] [BEAM-9147] Add Video Intelligence integration test --- sdks/java/extensions/ml/build.gradle | 6 ++ .../extensions/ml/VideoIntelligenceIT.java | 79 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle index 482decceeb25..274c0747e00a 100644 --- a/sdks/java/extensions/ml/build.gradle +++ b/sdks/java/extensions/ml/build.gradle @@ -31,4 +31,10 @@ dependencies { testCompile library.java.mockito_core testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' testCompile library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testRuntimeOnly project(":runners:google-cloud-dataflow-java") +} + +project.test { + include "**/**IT.class" } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java new file mode 100644 index 000000000000..3ff18ceff63a --- /dev/null +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI; +import static org.junit.Assert.assertEquals; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class VideoIntelligenceIT { + @Rule public TestPipeline testPipeline = TestPipeline.create(); + private static final String VIDEO_URI = + "gs://apache-beam-samples/advanced_analytics/video/gbikes_dinosaur.mp4"; + private List featureList = Collections.singletonList(Feature.LABEL_DETECTION); + + @Test + public void annotateVideoFromURINoContext() { + PCollection> annotationResults = + testPipeline + .apply(Create.of(VIDEO_URI)) + .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null))); + PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult()); + testPipeline.run().waitUntilFinish(); + } + + private static class VerifyVideoAnnotationResult + implements SerializableFunction>, Void> { + + @Override + public Void apply(Iterable> input) { + List labelEvaluations = new ArrayList<>(); + input.forEach( + videoAnnotationResults -> + labelEvaluations.add( + videoAnnotationResults.stream() + .anyMatch( + result -> + result.getSegmentLabelAnnotationsList().stream() + .anyMatch( + labelAnnotation -> + labelAnnotation + .getEntity() + .getDescription() + .equals("dinosaur"))))); + assertEquals(Boolean.TRUE, labelEvaluations.contains(Boolean.TRUE)); + return null; + } + } +} From d1c23bf7b654d2561ae977fd43122559811af7b2 Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Mon, 30 Mar 2020 12:12:20 +0200 Subject: [PATCH 3/5] [BEAM-9147] Add documentation of VideoIntelligence transforms --- build.gradle | 3 +- .../beam/sdk/extensions/ml/AnnotateVideo.java | 16 ++++++ .../sdk/extensions/ml/VideoIntelligence.java | 56 +++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 2a5d5411b68a..021ba5bc9cc6 100644 --- a/build.gradle +++ b/build.gradle @@ -107,7 +107,8 @@ rat { "learning/katas/*/IO/**/*.txt", // Mockito extensions - "sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker" + "sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker", + "sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker" ] // Add .gitignore excludes to the Apache Rat exclusion list. We re-create the behavior diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java index edb5c049c643..6e7de6f603d3 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -27,6 +27,12 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; +/** + * Base class for Video Intelligence transform. + * + * @param Class of input data being passed in - either ByteString - video data encoded into + * String or String - a GCS URI of the video to be annotated + */ public abstract class AnnotateVideo extends DoFn> { protected final PCollectionView> contextSideInput; @@ -54,6 +60,15 @@ public void teardown() { videoIntelligenceServiceClient.close(); } + /** + * Call the Video Intelligence Cloud AI service and return annotation results + * + * @param elementURI This or elementContents is required. GCS address of video to be annotated + * @param elementContents this or elementURI is required. Hex-encoded contents of video to be + * annotated + * @param videoContext Optional context for video annotation. + * @return + */ List getVideoAnnotationResults( String elementURI, ByteString elementContents, VideoContext videoContext) throws InterruptedException, ExecutionException { @@ -75,6 +90,7 @@ List getVideoAnnotationResults( return annotateVideoAsync.get().getAnnotationResultsList(); } + /** Process element implementation required. */ @ProcessElement public abstract void processElement(ProcessContext context) throws ExecutionException, InterruptedException; diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 4b01de0b47b2..267f65b35ed9 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -25,27 +25,67 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +/** + * Factory class for AnnotateVideo subclasses. allows integration with Google Cloud AI - + * VideoIntelligence service. Converts GCS URIs of videos or ByteStrings with video contents into + * Lists of VideoAnnotationResults. + * + *

Adding a side input of Maps of elements to VideoContext objects is allowed, so is using KVs of + * element and VideoContext as input. + * + *

Service account with proper permissions is required to use these transforms. + */ public class VideoIntelligence { + /** + * Annotates videos from GCS URIs. + * + * @param featureList List of features to be annotated + * @param contextSideInput Optional side input with map of contexts to URIs + * @return DoFn performing the necessary operations + */ public static AnnotateVideoFromURI annotateFromURI( List featureList, PCollectionView> contextSideInput) { return new AnnotateVideoFromURI(contextSideInput, featureList); } + /** + * Annotates videos from ByteStrings of their contents. + * + * @param featureList List of features to be annotated + * @param contextSideInput Optional side input with map of contexts to ByteStrings + * @return DoFn performing the necessary operations + */ public static AnnotateVideoFromBytes annotateFromBytes( PCollectionView> contextSideInput, List featureList) { return new AnnotateVideoFromBytes(contextSideInput, featureList); } + /** + * Annotates videos from key-value pairs of GCS URI and VideoContext + * + * @param featureList List of features to be annotated + * @return DoFn performing the necessary operations + */ public static AnnotateVideoURIWithContext annotateFromUriWithContext(List featureList) { return new AnnotateVideoURIWithContext(featureList); } + /** + * Annotates videos from key-value pairs of ByteStrings and VideoContext + * + * @param featureList List of features to be annotated + * @return DoFn performing the necessary operations + */ public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( List featureList) { return new AnnotateVideoBytesWithContext(featureList); } + /** + * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates + * videos found on GCS based on URIs from input PCollection + */ public static class AnnotateVideoFromURI extends AnnotateVideo { public AnnotateVideoFromURI( @@ -53,6 +93,7 @@ public AnnotateVideoFromURI( super(contextSideInput, featureList); } + /** ProcessElement implementation. */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { @@ -67,6 +108,10 @@ public void processElement(ProcessContext context) } } + /** + * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos + * decoded from the ByteStrings are annotated. + */ public static class AnnotateVideoFromBytes extends AnnotateVideo { public AnnotateVideoFromBytes( @@ -75,6 +120,7 @@ public AnnotateVideoFromBytes( super(contextSideInput, featureList); } + /** Implementation of ProcessElement */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { @@ -89,12 +135,17 @@ public void processElement(ProcessContext context) } } + /** + * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the + * GCS URIs, values - VideoContext objects. + */ public static class AnnotateVideoURIWithContext extends AnnotateVideo> { public AnnotateVideoURIWithContext(List featureList) { super(featureList); } + /** ProcessElement implementation */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { @@ -106,6 +157,10 @@ public void processElement(ProcessContext context) } } + /** + * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the + * ByteString encoded video contents, values - VideoContext objects. + */ public static class AnnotateVideoBytesWithContext extends AnnotateVideo> { @@ -113,6 +168,7 @@ public AnnotateVideoBytesWithContext(List featureList) { super(featureList); } + /** ProcessElement implementation */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { From 86744f5671b2b7b5022dcd775a25ae0175b90b97 Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Mon, 30 Mar 2020 13:36:59 +0200 Subject: [PATCH 4/5] Make checkstyle happy --- .../beam/sdk/extensions/ml/AnnotateVideo.java | 14 ++++++++++---- .../sdk/extensions/ml/VideoIntelligence.java | 16 +++++++++------- .../beam/sdk/extensions/ml/package-info.java | 19 +++++++++++++++++++ 3 files changed, 38 insertions(+), 11 deletions(-) create mode 100644 sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java index 6e7de6f603d3..56e863843eea 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -18,7 +18,13 @@ package org.apache.beam.sdk.extensions.ml; import com.google.api.gax.longrunning.OperationFuture; -import com.google.cloud.videointelligence.v1.*; +import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; +import com.google.cloud.videointelligence.v1.AnnotateVideoRequest; +import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.List; @@ -30,8 +36,8 @@ /** * Base class for Video Intelligence transform. * - * @param Class of input data being passed in - either ByteString - video data encoded into - * String or String - a GCS URI of the video to be annotated + * @param Class of input data being passed in - either ByteString - video data encoded into. + * String or String - a GCS URI of the video to be annotated. */ public abstract class AnnotateVideo extends DoFn> { @@ -61,7 +67,7 @@ public void teardown() { } /** - * Call the Video Intelligence Cloud AI service and return annotation results + * Call the Video Intelligence Cloud AI service and return annotation results. * * @param elementURI This or elementContents is required. GCS address of video to be annotated * @param elementContents this or elementURI is required. Hex-encoded contents of video to be diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 267f65b35ed9..0f447da6498f 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.extensions.ml; -import com.google.cloud.videointelligence.v1.*; +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; import com.google.protobuf.ByteString; import java.util.List; import java.util.Map; @@ -62,7 +64,7 @@ public static AnnotateVideoFromBytes annotateFromBytes( } /** - * Annotates videos from key-value pairs of GCS URI and VideoContext + * Annotates videos from key-value pairs of GCS URI and VideoContext. * * @param featureList List of features to be annotated * @return DoFn performing the necessary operations @@ -72,7 +74,7 @@ public static AnnotateVideoURIWithContext annotateFromUriWithContext(List { @@ -120,7 +122,7 @@ public AnnotateVideoFromBytes( super(contextSideInput, featureList); } - /** Implementation of ProcessElement */ + /** Implementation of ProcessElement. */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { @@ -145,7 +147,7 @@ public AnnotateVideoURIWithContext(List featureList) { super(featureList); } - /** ProcessElement implementation */ + /** ProcessElement implementation. */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { @@ -168,7 +170,7 @@ public AnnotateVideoBytesWithContext(List featureList) { super(featureList); } - /** ProcessElement implementation */ + /** ProcessElement implementation. */ @Override public void processElement(ProcessContext context) throws ExecutionException, InterruptedException { diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java new file mode 100644 index 000000000000..ad5216dd8ccf --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */ +package org.apache.beam.sdk.extensions.ml; From f3383534e90340943d97a3af4ebf690c1d506fa3 Mon Sep 17 00:00:00 2001 From: Michal Walenia Date: Fri, 3 Apr 2020 12:51:11 +0200 Subject: [PATCH 5/5] Refactor test verification --- .../extensions/ml/VideoIntelligenceIT.java | 30 +++++++++++-------- settings.gradle | 4 +-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java index 3ff18ceff63a..b0b74eeccc27 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -59,21 +60,24 @@ private static class VerifyVideoAnnotationResult @Override public Void apply(Iterable> input) { List labelEvaluations = new ArrayList<>(); - input.forEach( - videoAnnotationResults -> - labelEvaluations.add( - videoAnnotationResults.stream() - .anyMatch( - result -> - result.getSegmentLabelAnnotationsList().stream() - .anyMatch( - labelAnnotation -> - labelAnnotation - .getEntity() - .getDescription() - .equals("dinosaur"))))); + input.forEach(findStringMatchesInVideoAnnotationResultList(labelEvaluations, "dinosaur")); assertEquals(Boolean.TRUE, labelEvaluations.contains(Boolean.TRUE)); return null; } + + private Consumer> findStringMatchesInVideoAnnotationResultList( + List labelEvaluations, String toMatch) { + return videoAnnotationResults -> + labelEvaluations.add( + videoAnnotationResults.stream() + .anyMatch(result -> entityWithDescriptionFoundInSegmentLabels(toMatch, result))); + } + + private boolean entityWithDescriptionFoundInSegmentLabels( + String toMatch, VideoAnnotationResults result) { + return result.getSegmentLabelAnnotationsList().stream() + .anyMatch( + labelAnnotation -> labelAnnotation.getEntity().getDescription().equals(toMatch)); + } } } diff --git a/settings.gradle b/settings.gradle index b7c832a773c0..ea18ec490a0a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -71,6 +71,7 @@ include ":sdks:java:extensions:kryo" include ":sdks:java:extensions:google-cloud-platform-core" include ":sdks:java:extensions:jackson" include ":sdks:java:extensions:join-library" +include ":sdks:java:extensions:ml" include ":sdks:java:extensions:protobuf" include ":sdks:java:extensions:sketching" include ":sdks:java:extensions:sorter" @@ -165,6 +166,3 @@ include "beam-test-infra-metrics" project(":beam-test-infra-metrics").dir = file(".test-infra/metrics") include "beam-test-tools" project(":beam-test-tools").dir = file(".test-infra/tools") -include 'sdks:java:extensions:ml' -findProject(':sdks:java:extensions:ml')?.name = 'ml' -