diff --git a/CHANGES.md b/CHANGES.md
index 294ab4e3df82..03ea89e71bd1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -31,6 +31,7 @@
 ### New Features / Improvements
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Added AnnotateVideo and AnnotateVideoWithContext PTransforms that integrate GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
 * Added a PTransform for inspection and deidentification of text using Google Cloud DLP. (Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258))
 ### Breaking Changes
diff --git a/sdks/python/apache_beam/ml/__init__.py b/sdks/python/apache_beam/ml/__init__.py
index a4f2cfbf76c7..cce3acad34a4 100644
--- a/sdks/python/apache_beam/ml/__init__.py
+++ b/sdks/python/apache_beam/ml/__init__.py
@@ -1,17 +1,16 @@
-# /*
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements. See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership. The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License. You may obtain a copy of the License at
-# *
-# * http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/ml/gcp/__init__.py b/sdks/python/apache_beam/ml/gcp/__init__.py
index a4f2cfbf76c7..cce3acad34a4 100644
--- a/sdks/python/apache_beam/ml/gcp/__init__.py
+++ b/sdks/python/apache_beam/ml/gcp/__init__.py
@@ -1,17 +1,16 @@
-# /*
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements. See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership. The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License. You may obtain a copy of the License at
-# *
-# * http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py
new file mode 100644
index 000000000000..67ff4969f624
--- /dev/null
+++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py
@@ -0,0 +1,230 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A connector for sending API requests to the GCP Video Intelligence API."""
+
+from __future__ import absolute_import
+
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from future.utils import binary_type
+from future.utils import text_type
+
+from apache_beam import typehints
+from apache_beam.metrics import Metrics
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
+from cachetools.func import ttl_cache
+
+try:
+  from google.cloud import videointelligence
+except ImportError:
+  raise ImportError(
+      'Google Cloud Video Intelligence not supported for this execution '
+      'environment (could not import google.cloud.videointelligence).')
+
+__all__ = ['AnnotateVideo', 'AnnotateVideoWithContext']
+
+
+@ttl_cache(maxsize=128, ttl=3600)
+def get_videointelligence_client():
+  """Returns a Cloud Video Intelligence client."""
+  _client = videointelligence.VideoIntelligenceServiceClient()
+  return _client
+
+
+class AnnotateVideo(PTransform):
+  """A ``PTransform`` for annotating video using the GCP Video Intelligence API,
+  ref: https://cloud.google.com/video-intelligence/docs
+
+  Sends each element to the GCP Video Intelligence API. Each element is a
+  Union[text_type, binary_type]: either a URI (e.g. a GCS URI) or
+  binary_type base64-encoded video data.
+  Accepts an ``AsDict`` side input that maps each video to a video context.
+  """
+  def __init__(
+      self,
+      features,
+      location_id=None,
+      metadata=None,
+      timeout=120,
+      context_side_input=None):
+    """
+    Args:
+      features: (List[``videointelligence_v1.enums.Feature``]) Required.
+        The Video Intelligence API features to detect.
+      location_id: (str) Optional.
+        Cloud region where annotation should take place.
+        If no region is specified, a region will be determined
+        based on video file location.
+      metadata: (Sequence[Tuple[str, str]]) Optional.
+        Additional metadata that is provided to the method.
+      timeout: (int) Optional.
+        The time in seconds to wait for the response from the
+        Video Intelligence API.
+      context_side_input: (beam.pvalue.AsDict) Optional.
+        An ``AsDict`` of a PCollection to be passed to the
+        _VideoAnnotateFn as the video context mapping containing additional
+        video context and/or feature-specific parameters.
+        Example usage::
+
+          # Each video URI maps to a dict or a
+          # ``videointelligence_v1.types.VideoContext``.
+          video_contexts = [
+              ('gs://cloud-samples-data/video/cat.mp4', video_context),
+              ('gs://some-other-video/sample.mp4', video_context),
+          ]
+
+          context_side_input = (
+              p
+              | "Video contexts" >> beam.Create(video_contexts)
+          )
+
+          videointelligenceml.AnnotateVideo(
+              features,
+              context_side_input=beam.pvalue.AsDict(context_side_input))
+    """
+    super(AnnotateVideo, self).__init__()
+    self.features = features
+    self.location_id = location_id
+    self.metadata = metadata
+    self.timeout = timeout
+    self.context_side_input = context_side_input
+
+  def expand(self, pvalue):
+    return pvalue | ParDo(
+        _VideoAnnotateFn(
+            features=self.features,
+            location_id=self.location_id,
+            metadata=self.metadata,
+            timeout=self.timeout),
+        context_side_input=self.context_side_input)
+
+
+@typehints.with_input_types(
+    Union[text_type, binary_type],
+    Optional[videointelligence.types.VideoContext])
+class _VideoAnnotateFn(DoFn):
+  """A DoFn that sends each input element to the GCP Video Intelligence API
+  service and outputs an element with the return result of the API
+  (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``).
+  """
+  def __init__(self, features, location_id, metadata, timeout):
+    super(_VideoAnnotateFn, self).__init__()
+    self._client = None
+    self.features = features
+    self.location_id = location_id
+    self.metadata = metadata
+    self.timeout = timeout
+    self.counter = Metrics.counter(self.__class__, "API Calls")
+
+  def start_bundle(self):
+    self._client = get_videointelligence_client()
+
+  def _annotate_video(self, element, video_context):
+    if isinstance(element, text_type):  # element is a URI to a GCS object
+      response = self._client.annotate_video(
+          input_uri=element,
+          features=self.features,
+          video_context=video_context,
+          location_id=self.location_id,
+          metadata=self.metadata)
+    else:  # element is raw video bytes
+      response = self._client.annotate_video(
+          input_content=element,
+          features=self.features,
+          video_context=video_context,
+          location_id=self.location_id,
+          metadata=self.metadata)
+    return response
+
+  def process(self, element, context_side_input=None, *args, **kwargs):
+    if context_side_input:  # If we have a side input video context, use that
+      video_context = context_side_input.get(element)
+    else:
+      video_context = None
+    response = self._annotate_video(element, video_context)
+    self.counter.inc()
+    yield response.result(timeout=self.timeout)
+
+
+class AnnotateVideoWithContext(AnnotateVideo):
+  """A ``PTransform`` for annotating video using the GCP Video Intelligence API,
+  ref: https://cloud.google.com/video-intelligence/docs
+
+  Sends each element to the GCP Video Intelligence API.
+  Each element is a tuple of
+
+    (Union[text_type, binary_type],
+    Optional[videointelligence.types.VideoContext])
+
+  where the former is either a URI (e.g. a GCS URI) or
+  binary_type base64-encoded video data.
+  """
+  def __init__(self, features, location_id=None, metadata=None, timeout=120):
+    """
+    Args:
+      features: (List[``videointelligence_v1.enums.Feature``]) Required.
+        The Video Intelligence API features to detect.
+      location_id: (str) Optional.
+        Cloud region where annotation should take place.
+        If no region is specified, a region will be determined
+        based on video file location.
+      metadata: (Sequence[Tuple[str, str]]) Optional.
+        Additional metadata that is provided to the method.
+      timeout: (int) Optional.
+        The time in seconds to wait for the response from the
+        Video Intelligence API.
+    """
+    super(AnnotateVideoWithContext, self).__init__(
+        features=features,
+        location_id=location_id,
+        metadata=metadata,
+        timeout=timeout)
+
+  def expand(self, pvalue):
+    return pvalue | ParDo(
+        _VideoAnnotateFnWithContext(
+            features=self.features,
+            location_id=self.location_id,
+            metadata=self.metadata,
+            timeout=self.timeout))
+
+
+@typehints.with_input_types(
+    Tuple[Union[text_type, binary_type],
+          Optional[videointelligence.types.VideoContext]])
+class _VideoAnnotateFnWithContext(_VideoAnnotateFn):
+  """A DoFn that unpacks each input tuple to element, video_context variables
+  and sends these to the GCP Video Intelligence API service and outputs
+  an element with the return result of the API
+  (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``).
+ """ + def __init__(self, features, location_id, metadata, timeout): + super(_VideoAnnotateFnWithContext, self).__init__( + features=features, + location_id=location_id, + metadata=metadata, + timeout=timeout) + + def process(self, element, *args, **kwargs): + element, video_context = element # Unpack (video, video_context) tuple + response = self._annotate_video(element, video_context) + self.counter.inc() + yield response.result(timeout=self.timeout) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py new file mode 100644 index 000000000000..b0bb441c5bba --- /dev/null +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -0,0 +1,208 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for videointelligenceml.""" + +# pytype: skip-file + +from __future__ import absolute_import +from __future__ import unicode_literals + +import logging +import unittest + +import mock + +import apache_beam as beam +from apache_beam.metrics import MetricsFilter +from apache_beam.typehints.decorators import TypeCheckError + +# Protect against environments where video intelligence lib is not available. 
+# pylint: disable=ungrouped-imports +try: + from google.cloud.videointelligence import VideoIntelligenceServiceClient + from google.cloud import videointelligence + from apache_beam.ml.gcp import videointelligenceml +except ImportError: + VideoIntelligenceServiceClient = None + + +@unittest.skipIf( + VideoIntelligenceServiceClient is None, + 'Video intelligence dependencies are not installed') +class VideoIntelligenceTest(unittest.TestCase): + def setUp(self): + self._mock_client = mock.Mock() + self.m2 = mock.Mock() + self.m2.result.return_value = None + self._mock_client.annotate_video.return_value = self.m2 + self.features = [videointelligence.enums.Feature.LABEL_DETECTION] + self.location_id = 'us-west1' + config = videointelligence.types.SpeechTranscriptionConfig( + language_code='en-US', enable_automatic_punctuation=True) + self.video_ctx = videointelligence.types.VideoContext( + speech_transcription_config=config) + + def test_AnnotateVideo_with_side_input_context(self): + videos_to_annotate = [ + 'gs://cloud-samples-data/video/cat.mp4', + 'gs://some-other-video/sample.mp4', + 'gs://some-other-video/sample_2.mp4' + ] + video_contexts = [ + ('gs://cloud-samples-data/video/cat.mp4', self.video_ctx), + ('gs://some-other-video/sample.mp4', self.video_ctx), + ] + + expected_counter = len(videos_to_annotate) + with mock.patch.object(videointelligenceml, + 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + context_side_input = (p | "Video contexts" >> beam.Create(video_contexts)) + + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> videointelligenceml.AnnotateVideo( + self.features, + context_side_input=beam.pvalue.AsDict(context_side_input))) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateVideo_URIs(self): + videos_to_annotate = [ + 'gs://cloud-samples-data/video/cat.mp4', + 'gs://cloud-samples-data/video/cat.mp4' + ] + expected_counter = len(videos_to_annotate) + with mock.patch.object(videointelligenceml, + 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | + "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateVideoWithContext_b64_content(self): + base_64_encoded_video = \ + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + videos_to_annotate = [ + (base_64_encoded_video, self.video_ctx), + (base_64_encoded_video, None), + (base_64_encoded_video, self.video_ctx), + ] + expected_counter = len(videos_to_annotate) + with mock.patch.object(videointelligenceml, + 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> videointelligenceml.AnnotateVideoWithContext( + self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = 
MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.committed == expected_counter)
+
+  def test_AnnotateVideo_b64_content(self):
+    base_64_encoded_video = \
+      b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb'
+    videos_to_annotate = [
+        base_64_encoded_video,
+        base_64_encoded_video,
+        base_64_encoded_video,
+    ]
+    expected_counter = len(videos_to_annotate)
+    with mock.patch.object(videointelligenceml,
+                           'get_videointelligence_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      _ = (
+          p
+          | "Create data" >> beam.Create(videos_to_annotate)
+          |
+          "Annotate video" >> videointelligenceml.AnnotateVideo(self.features))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.committed == expected_counter)
+
+  def test_AnnotateVideoWithContext_bad_input(self):
+    """AnnotateVideoWithContext should not accept videos without context."""
+    videos_to_annotate = [
+        'gs://cloud-samples-data/video/cat.mp4',
+        'gs://cloud-samples-data/video/cat.mp4'
+    ]
+    with mock.patch.object(videointelligenceml,
+                           'get_videointelligence_client',
+                           return_value=self._mock_client):
+      with self.assertRaises(TypeCheckError):
+        p = beam.Pipeline()
+        _ = (
+            p
+            | "Create data" >> beam.Create(videos_to_annotate)
+            | "Annotate video" >> videointelligenceml.AnnotateVideoWithContext(
+                self.features))
+        result = p.run()
+        result.wait_until_finish()
+
+  def test_AnnotateVideo_bad_input(self):
+    videos_to_annotate = [123456789, 123456789, 123456789]
+    with mock.patch.object(videointelligenceml,
+                           'get_videointelligence_client',
+                           return_value=self._mock_client):
+      with self.assertRaises(TypeCheckError):
+        p = beam.Pipeline()
+        _ = (
+            p
+            | "Create data" >> beam.Create(videos_to_annotate)
+            | "Annotate video" >> videointelligenceml.AnnotateVideo(
+                self.features))
+        result = p.run()
+        result.wait_until_finish()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 6b9ea50348cf..0761c47e3ba8 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -212,6 +212,8 @@ def get_version():
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4; python_version < "3.0"',
     'google-cloud-spanner>=1.13.0,<1.14.0',
     'grpcio-gcp>=0.2.2,<1',
+    # GCP packages required by ML functionality
+    'google-cloud-videointelligence>=1.8.0,<=1.12.1',
 ]

 INTERACTIVE_BEAM = [
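
For reviewers, here is a minimal sketch (not part of the diff) of how the new `AnnotateVideo` transform is meant to be wired up with a video-context side input. It follows the transform's docstring and the unit tests above; the GCS path and the `SpeechTranscriptionConfig` context are taken from the tests, while the `with beam.Pipeline()` scaffolding and the `beam.Map(print)` sink are illustrative assumptions, and running it requires the `gcp` extras plus Google Cloud credentials:

```python
import apache_beam as beam
from apache_beam.ml.gcp import videointelligenceml
from google.cloud import videointelligence

features = [videointelligence.enums.Feature.LABEL_DETECTION]
config = videointelligence.types.SpeechTranscriptionConfig(
    language_code='en-US', enable_automatic_punctuation=True)
video_context = videointelligence.types.VideoContext(
    speech_transcription_config=config)

with beam.Pipeline() as p:
  # Side input mapping each video URI to its VideoContext.
  context_side_input = (
      p
      | 'Video contexts' >> beam.Create(
          [('gs://cloud-samples-data/video/cat.mp4', video_context)]))
  _ = (
      p
      | 'Videos' >> beam.Create(['gs://cloud-samples-data/video/cat.mp4'])
      | 'Annotate video' >> videointelligenceml.AnnotateVideo(
          features,
          context_side_input=beam.pvalue.AsDict(context_side_input))
      # Each output element is an AnnotateVideoResponse result.
      | 'Print response' >> beam.Map(print))  # illustrative sink
```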
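
And a matching sketch for `AnnotateVideoWithContext`, which consumes (video, context) tuples directly instead of taking a side input; per the transform's type hints, the context member of each tuple may be `None`. Paths, context, and sink are again illustrative assumptions:

```python
import apache_beam as beam
from apache_beam.ml.gcp import videointelligenceml
from google.cloud import videointelligence

features = [videointelligence.enums.Feature.LABEL_DETECTION]
config = videointelligence.types.SpeechTranscriptionConfig(
    language_code='en-US', enable_automatic_punctuation=True)
video_context = videointelligence.types.VideoContext(
    speech_transcription_config=config)

with beam.Pipeline() as p:
  _ = (
      p
      | 'Videos with contexts' >> beam.Create([
          ('gs://cloud-samples-data/video/cat.mp4', video_context),
          ('gs://cloud-samples-data/video/cat.mp4', None),  # context optional
      ])
      | 'Annotate video' >> videointelligenceml.AnnotateVideoWithContext(
          features)
      | 'Print response' >> beam.Map(print))  # illustrative sink
```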