Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ rat {
"learning/katas/*/IO/**/*.txt",

// Mockito extensions
"sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
"sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker",
"sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker"
]

// Add .gitignore excludes to the Apache Rat exclusion list. We re-create the behavior
Expand Down
40 changes: 40 additions & 0 deletions sdks/java/extensions/ml/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
// The automatic module name mirrors this module's Java package
// (org.apache.beam.sdk.extensions.ml); it previously carried the protobuf
// extension's name, which would make two artifacts claim the same module name.
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ml')

description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'

dependencies {
  // Production dependencies.
  compile project(path: ":sdks:java:core", configuration: "shadow")
  compile project(":sdks:java:expansion-service")
  compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'

  // Test-only dependencies. The Video Intelligence client is not repeated here:
  // testCompile already extends compile, so it is on the test classpath.
  testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
  testCompile library.java.mockito_core
  testCompile library.java.junit

  // Runners needed only when executing the tests.
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
  testRuntimeOnly project(":runners:google-cloud-dataflow-java")
}

// Adds the *IT classes to the default test task's include filter.
// NOTE(review): an explicit include limits the task to matching classes only -
// confirm that plain *Test classes are meant to be left out here.
project.test {
  include "**/**IT.class"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
import com.google.cloud.videointelligence.v1.AnnotateVideoRequest;
import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;

/**
 * Base class for Video Intelligence transforms.
 *
 * <p>Holds the list of features to annotate, an optional side input mapping elements to {@link
 * VideoContext}, and the service client used to issue annotation requests.
 *
 * @param <T> Class of input data being passed in - for example a {@link ByteString} holding the
 *     video contents, or a String holding the GCS URI of the video to be annotated.
 */
public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {

  /** Optional side input with per-element video contexts; {@code null} when not used. */
  protected final PCollectionView<Map<T, VideoContext>> contextSideInput;

  /** Features requested from the Video Intelligence service for every element. */
  protected final List<Feature> featureList;

  /**
   * Client used to call the service. Created lazily in {@link #startBundle()} and released in
   * {@link #teardown()}; marked transient so it never takes part in serialization of this DoFn.
   */
  transient VideoIntelligenceServiceClient videoIntelligenceServiceClient;

  /**
   * Creates the transform with a side input of video contexts.
   *
   * @param contextSideInput side input mapping elements to their {@link VideoContext}; may be
   *     {@code null}
   * @param featureList features to be annotated
   */
  public AnnotateVideo(
      PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
    this.contextSideInput = contextSideInput;
    this.featureList = featureList;
  }

  /**
   * Creates the transform without a side input of video contexts.
   *
   * @param featureList features to be annotated
   */
  public AnnotateVideo(List<Feature> featureList) {
    this(null, featureList);
  }

  /**
   * Makes sure a service client is available before elements are processed.
   *
   * <p>The client is created only when none exists yet, so that a client from an earlier bundle
   * is reused instead of being replaced without ever being closed.
   *
   * @throws IOException if the client cannot be created
   */
  @StartBundle
  public void startBundle() throws IOException {
    if (videoIntelligenceServiceClient == null) {
      videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
    }
  }

  /** Closes the service client, if one was ever created. */
  @Teardown
  public void teardown() {
    // Teardown may run without any bundle having been started; guard against a null client.
    if (videoIntelligenceServiceClient != null) {
      videoIntelligenceServiceClient.close();
      videoIntelligenceServiceClient = null;
    }
  }

  /**
   * Call the Video Intelligence Cloud AI service and return annotation results.
   *
   * <p>When both {@code elementURI} and {@code elementContents} are given, the URI takes
   * precedence and the contents are ignored.
   *
   * @param elementURI This or elementContents is required. GCS address of video to be annotated
   * @param elementContents This or elementURI is required. Contents of the video to be annotated
   * @param videoContext Optional context for video annotation; may be {@code null}
   * @return list of annotation results contained in the service response
   * @throws IllegalArgumentException if both elementURI and elementContents are {@code null}
   * @throws InterruptedException if the thread is interrupted while waiting for the response
   * @throws ExecutionException if the annotation operation fails
   */
  List<VideoAnnotationResults> getVideoAnnotationResults(
      String elementURI, ByteString elementContents, VideoContext videoContext)
      throws InterruptedException, ExecutionException {
    AnnotateVideoRequest.Builder requestBuilder =
        AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
    if (elementURI != null) {
      requestBuilder.setInputUri(elementURI);
    } else if (elementContents != null) {
      requestBuilder.setInputContent(elementContents);
    } else {
      throw new IllegalArgumentException("Either elementURI or elementContents should be non-null");
    }
    if (videoContext != null) {
      requestBuilder.setVideoContext(videoContext);
    }
    AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
    OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync =
        videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
    // Blocks until the long-running annotation operation completes.
    return annotateVideoAsync.get().getAnnotationResultsList();
  }

  /**
   * Process element implementation required.
   *
   * @param context process context giving access to the element, side inputs and output
   * @throws ExecutionException if the annotation operation fails
   * @throws InterruptedException if the thread is interrupted while waiting for the response
   */
  @ProcessElement
  public abstract void processElement(ProcessContext context)
      throws ExecutionException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

/**
 * Factory for {@link AnnotateVideo} implementations that integrate with the Google Cloud AI
 * VideoIntelligence service. The produced DoFns turn GCS URIs of videos, or ByteStrings holding
 * video contents, into Lists of VideoAnnotationResults.
 *
 * <p>A {@link VideoContext} can be supplied per element either through a side input (a Map from
 * element to VideoContext) or by passing KVs of element and VideoContext as the input.
 *
 * <p>Service account with proper permissions is required to use these transforms.
 */
public class VideoIntelligence {

  /**
   * Builds a DoFn annotating videos referenced by GCS URIs.
   *
   * @param featureList List of features to be annotated
   * @param contextSideInput Optional side input with a map of URIs to contexts
   * @return DoFn performing the necessary operations
   */
  public static AnnotateVideoFromURI annotateFromURI(
      List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) {
    AnnotateVideoFromURI annotateFn = new AnnotateVideoFromURI(contextSideInput, featureList);
    return annotateFn;
  }

  /**
   * Builds a DoFn annotating videos passed in as ByteStrings of their contents.
   *
   * @param contextSideInput Optional side input with a map of ByteStrings to contexts
   * @param featureList List of features to be annotated
   * @return DoFn performing the necessary operations
   */
  public static AnnotateVideoFromBytes annotateFromBytes(
      PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
    AnnotateVideoFromBytes annotateFn = new AnnotateVideoFromBytes(contextSideInput, featureList);
    return annotateFn;
  }

  /**
   * Builds a DoFn annotating videos from key-value pairs of GCS URI and VideoContext.
   *
   * @param featureList List of features to be annotated
   * @return DoFn performing the necessary operations
   */
  public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) {
    AnnotateVideoURIWithContext annotateFn = new AnnotateVideoURIWithContext(featureList);
    return annotateFn;
  }

  /**
   * Builds a DoFn annotating videos from key-value pairs of ByteStrings and VideoContext.
   *
   * @param featureList List of features to be annotated
   * @return DoFn performing the necessary operations
   */
  public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
      List<Feature> featureList) {
    AnnotateVideoBytesWithContext annotateFn = new AnnotateVideoBytesWithContext(featureList);
    return annotateFn;
  }

  /**
   * AnnotateVideo variant whose input PCollection holds Strings. Each String is treated as the
   * GCS URI of a video to annotate.
   */
  public static class AnnotateVideoFromURI extends AnnotateVideo<String> {

    public AnnotateVideoFromURI(
        PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
      super(contextSideInput, featureList);
    }

    /** Annotates the video at the element's URI, using the side-input context when present. */
    @Override
    public void processElement(ProcessContext context)
        throws ExecutionException, InterruptedException {
      String uri = context.element();
      // The side input is optional; without it the request carries no video context.
      VideoContext ctxForUri =
          contextSideInput == null ? null : context.sideInput(contextSideInput).get(uri);
      context.output(getVideoAnnotationResults(uri, null, ctxForUri));
    }
  }

  /**
   * AnnotateVideo variant whose input PCollection holds ByteStrings. Each ByteString is sent to
   * the service as the contents of the video to annotate.
   */
  public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> {

    public AnnotateVideoFromBytes(
        PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
        List<Feature> featureList) {
      super(contextSideInput, featureList);
    }

    /** Annotates the element's video contents, using the side-input context when present. */
    @Override
    public void processElement(ProcessContext context)
        throws ExecutionException, InterruptedException {
      ByteString contents = context.element();
      // The side input is optional; without it the request carries no video context.
      VideoContext ctxForContents =
          contextSideInput == null ? null : context.sideInput(contextSideInput).get(contents);
      context.output(getVideoAnnotationResults(null, contents, ctxForContents));
    }
  }

  /**
   * AnnotateVideo variant whose input PCollection holds KVs. Keys are the GCS URIs, values are
   * the VideoContext objects to use for them.
   */
  public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> {

    public AnnotateVideoURIWithContext(List<Feature> featureList) {
      super(featureList);
    }

    /** Annotates the video at the key's URI with the context carried as the value. */
    @Override
    public void processElement(ProcessContext context)
        throws ExecutionException, InterruptedException {
      KV<String, VideoContext> uriAndContext = context.element();
      List<VideoAnnotationResults> results =
          getVideoAnnotationResults(uriAndContext.getKey(), null, uriAndContext.getValue());
      context.output(results);
    }
  }

  /**
   * AnnotateVideo variant whose input PCollection holds KVs. Keys are ByteStrings with the video
   * contents, values are the VideoContext objects to use for them.
   */
  public static class AnnotateVideoBytesWithContext
      extends AnnotateVideo<KV<ByteString, VideoContext>> {

    public AnnotateVideoBytesWithContext(List<Feature> featureList) {
      super(featureList);
    }

    /** Annotates the key's video contents with the context carried as the value. */
    @Override
    public void processElement(ProcessContext context)
        throws ExecutionException, InterruptedException {
      KV<ByteString, VideoContext> contentsAndContext = context.element();
      List<VideoAnnotationResults> results =
          getVideoAnnotationResults(
              null, contentsAndContext.getKey(), contentsAndContext.getValue());
      context.output(results);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */
package org.apache.beam.sdk.extensions.ml;
Loading