From 6ab5013a3e3e703a2734fab60fbfc6ed61c0a9de Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 4 Feb 2020 15:06:00 +0100 Subject: [PATCH 01/20] [BEAM-9146] PTransform that integrates Video Intelligence functionality --- sdks/python/apache_beam/io/gcp/ai/__init__.py | 17 +++ sdks/python/apache_beam/io/gcp/ai/helper.py | 34 ++++++ .../io/gcp/ai/video_intelligence.py | 75 ++++++++++++ .../io/gcp/ai/video_intelligence_test.py | 108 ++++++++++++++++++ 4 files changed, 234 insertions(+) create mode 100644 sdks/python/apache_beam/io/gcp/ai/__init__.py create mode 100644 sdks/python/apache_beam/io/gcp/ai/helper.py create mode 100644 sdks/python/apache_beam/io/gcp/ai/video_intelligence.py create mode 100644 sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py diff --git a/sdks/python/apache_beam/io/gcp/ai/__init__.py b/sdks/python/apache_beam/io/gcp/ai/__init__.py new file mode 100644 index 000000000000..f4f43cbb1236 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/ai/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/io/gcp/ai/helper.py b/sdks/python/apache_beam/io/gcp/ai/helper.py new file mode 100644 index 000000000000..f591384f54f2 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/ai/helper.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Cloud Video Intelligence client + +For internal use only; no backwards-compatibility guarantees. +""" + +from __future__ import absolute_import + +from cachetools.func import ttl_cache +from google.cloud import videointelligence + + +@ttl_cache(maxsize=128, ttl=3600) +def get_videointelligence_client(): + """Returns a Cloud Video Intelligence client.""" + _client = videointelligence.VideoIntelligenceServiceClient() + return _client diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py b/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py new file mode 100644 index 000000000000..986d3cdf34f5 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +A connector for sending API requests to the GCP Video Intelligence API. +""" +from __future__ import absolute_import + +from typing import Union + +from apache_beam import typehints +from apache_beam.io.gcp.ai import helper +from apache_beam.metrics import Metrics +from apache_beam.transforms import DoFn, ParDo, PTransform + + +class AnnotateVideo(PTransform): + """A ``PTransform`` for annotating video using the GCP Video Intelligence API + ref: https://cloud.google.com/video-intelligence/docs + """ + + def __init__(self, features): + """ + Args: + features: (List[``videointelligence.enums.Feature``]) + the Video Intelligence API features to detect + """ + super(AnnotateVideo).__init__() + self.features = features + + def expand(self, pvalue): + return pvalue | ParDo(self._VideoAnnotateFn(features=self.features)) + + @typehints.with_input_types(Union[str, bytes]) + class _VideoAnnotateFn(DoFn): + """ A ``DoFn`` that sends every element to the GCP Video Intelligence API + and returns a PCollection of + ``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``. + """ + + def __init__(self, features): + super(AnnotateVideo._VideoAnnotateFn, self).__init__() + self._client = None + self.features = features + self.counter = Metrics.counter(self.__class__, "API Calls") + + def start_bundle(self): + self._client = helper.get_videointelligence_client() + + def process(self, element, *args, **kwargs): + if isinstance(element, str): # Is element an URI to a GCS bucket + response = self._client.annotate_video(input_uri=element, + features=self.features) + elif isinstance(element, bytes): # Is element raw bytes + response = self._client.annotate_video(input_content=element, + features=self.features) + else: + raise TypeError( + "{}: input element needs to be either str or b64-encoded bytes" + " got {} instead".format(self.__class__.__name__, type(element))) + self.counter.inc() + yield response.result(timeout=120) diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py new file mode 100644 index 000000000000..a2d1e4bb2d85 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for video_intelligence.""" + +# pytype: skip-file + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +from base64 import b64encode + +import mock + +import apache_beam as beam +from apache_beam.io.gcp.ai import helper, video_intelligence +from apache_beam.metrics import MetricsFilter +from apache_beam.typehints.decorators import TypeCheckError + +# Protect against environments where video intelligence lib is not available. +try: + from google.cloud.videointelligence import VideoIntelligenceServiceClient + from google.cloud import videointelligence +except ImportError: + VideoIntelligenceServiceClient = None + + +@unittest.skipIf(VideoIntelligenceServiceClient is None, + 'Video intelligence dependencies are not installed') +class VideoIntelligenceTest(unittest.TestCase): + def setUp(self): + self._mock_client = mock.Mock() + self.m2 = mock.Mock() + self.m2.result.return_value = None + self._mock_client.annotate_video.return_value = self.m2 + self.features = [videointelligence.enums.Feature.LABEL_DETECTION] + + def test_AnnotateVideo_URIs(self): + videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4', + 'gs://cloud-samples-data/video/cat.mp4'] + expected_counter = len(videos_to_annotate) + with mock.patch.object(helper, 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = (p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo(self.features) + ) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateVideo_b64_content(self): + base_64_encoded_video = b64encode( + b'begin 644 cat-video.mp4M (&9T>7!M<#0R..fake_video_content') + videos_to_annotate = [base_64_encoded_video, base_64_encoded_video, + base_64_encoded_video] + expected_counter = len(videos_to_annotate) + with mock.patch.object(helper, 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = (p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo(self.features) + ) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateVideo_bad_input(self): + videos_to_annotate = [123456789, 123456789, 123456789] + with mock.patch.object(helper, 'get_videointelligence_client', + return_value=self._mock_client): + with self.assertRaises(TypeCheckError): + p = beam.Pipeline() + _ = (p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo( + self.features) + ) + result = p.run() + result.wait_until_finish() From 0e54a6423bb5b220a6d3e1d7bc7e937af2541033 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 4 Feb 2020 15:14:55 +0100 Subject: [PATCH 02/20] Removed unused imports --- sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py index a2d1e4bb2d85..cdeee7fd4a3c 100644 --- a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py +++ b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py @@ -20,8 +20,6 @@ # pytype: skip-file from __future__ import absolute_import -from __future__ import division -from __future__ import print_function import unittest from base64 import b64encode From 86aeff86aaa37f538ffcafa92fff78b4647c5c6a Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 6 Feb 2020 14:55:29 +0100 Subject: [PATCH 03/20] byte and string comparison now supporting py23. Added extra args to annotate_video. Added requirement in setup.py --- sdks/python/apache_beam/io/gcp/ai/helper.py | 2 + .../io/gcp/ai/video_intelligence.py | 54 ++++++++++++++----- .../io/gcp/ai/video_intelligence_test.py | 35 ++++++++++-- sdks/python/setup.py | 2 + 4 files changed, 77 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/ai/helper.py b/sdks/python/apache_beam/io/gcp/ai/helper.py index f591384f54f2..0100a6f42d8c 100644 --- a/sdks/python/apache_beam/io/gcp/ai/helper.py +++ b/sdks/python/apache_beam/io/gcp/ai/helper.py @@ -21,6 +21,8 @@ For internal use only; no backwards-compatibility guarantees. """ +# pytype: skip-file + from __future__ import absolute_import from cachetools.func import ttl_cache diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py b/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py index 986d3cdf34f5..e998625e6ee9 100644 --- a/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py +++ b/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py @@ -19,6 +19,7 @@ """ from __future__ import absolute_import +from future.utils import binary_type, text_type from typing import Union from apache_beam import typehints @@ -26,50 +27,79 @@ from apache_beam.metrics import Metrics from apache_beam.transforms import DoFn, ParDo, PTransform +__all__ = ['AnnotateVideo'] + class AnnotateVideo(PTransform): """A ``PTransform`` for annotating video using the GCP Video Intelligence API ref: https://cloud.google.com/video-intelligence/docs """ - def __init__(self, features): + def __init__(self, features, video_context=None, location_id=None, + metadata=None): """ Args: - features: (List[``videointelligence.enums.Feature``]) + features: (List[``videointelligence_v1.enums.Feature``]) Required. the Video Intelligence API features to detect + video_context: (dict, ``videointelligence_v1.types.VideoContext``) + Optional. + Additional video context and/or feature-specific parameters. + location_id: (str) Optional. + Cloud region where annotation should take place. + If no region is specified, a region will be determined + based on video file location. + metadata: (Sequence[Tuple[str, str]]) Optional. + Additional metadata that is provided to the method. """ - super(AnnotateVideo).__init__() + super(AnnotateVideo, self).__init__() self.features = features + self.video_context = video_context + self.location_id = location_id + self.metadata = metadata def expand(self, pvalue): - return pvalue | ParDo(self._VideoAnnotateFn(features=self.features)) + return pvalue | ParDo(self._VideoAnnotateFn( + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata)) - @typehints.with_input_types(Union[str, bytes]) + @typehints.with_input_types(Union[text_type, binary_type]) class _VideoAnnotateFn(DoFn): """ A ``DoFn`` that sends every element to the GCP Video Intelligence API and returns a PCollection of ``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``. """ - def __init__(self, features): + def __init__(self, features, video_context, location_id, metadata): super(AnnotateVideo._VideoAnnotateFn, self).__init__() self._client = None self.features = features + self.video_context = video_context + self.location_id = location_id + self.metadata = metadata self.counter = Metrics.counter(self.__class__, "API Calls") def start_bundle(self): self._client = helper.get_videointelligence_client() def process(self, element, *args, **kwargs): - if isinstance(element, str): # Is element an URI to a GCS bucket + if isinstance(element, text_type): # Is element an URI to a GCS bucket response = self._client.annotate_video(input_uri=element, - features=self.features) - elif isinstance(element, bytes): # Is element raw bytes + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata) + elif isinstance(element, binary_type): # Is element raw bytes response = self._client.annotate_video(input_content=element, - features=self.features) + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata) else: raise TypeError( - "{}: input element needs to be either str or b64-encoded bytes" - " got {} instead".format(self.__class__.__name__, type(element))) + "{}: input element needs to be either {} or {}" + " got {} instead".format(self.__class__.__name__, text_type, + binary_type, type(element))) self.counter.inc() yield response.result(timeout=120) diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py index cdeee7fd4a3c..251e526a7017 100644 --- a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py +++ b/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py @@ -19,10 +19,9 @@ # pytype: skip-file -from __future__ import absolute_import +from __future__ import absolute_import, unicode_literals import unittest -from base64 import b64encode import mock @@ -48,6 +47,12 @@ def setUp(self): self.m2.result.return_value = None self._mock_client.annotate_video.return_value = self.m2 self.features = [videointelligence.enums.Feature.LABEL_DETECTION] + config = videointelligence.types.SpeechTranscriptionConfig( + language_code='en-US', + enable_automatic_punctuation=True) + self.video_context = videointelligence.types.VideoContext( + speech_transcription_config=config) + self.location_id = 'us-west1' def test_AnnotateVideo_URIs(self): videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4', @@ -70,8 +75,8 @@ def test_AnnotateVideo_URIs(self): self.assertTrue(read_counter.committed == expected_counter) def test_AnnotateVideo_b64_content(self): - base_64_encoded_video = b64encode( - b'begin 644 cat-video.mp4M (&9T>7!M<#0R..fake_video_content') + base_64_encoded_video = \ + b'begin 644 cat-video.mp4M (&9T>7!M<#0R..fake_video_content' videos_to_annotate = [base_64_encoded_video, base_64_encoded_video, base_64_encoded_video] expected_counter = len(videos_to_annotate) @@ -104,3 +109,25 @@ def test_AnnotateVideo_bad_input(self): ) result = p.run() result.wait_until_finish() + + def test_AnnotateVideo_video_context(self): + videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4'] + expected_counter = len(videos_to_annotate) + with mock.patch.object(helper, 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = (p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo( + self.features, + video_context=self.video_context, + location_id=self.location_id) + ) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index a396d17a5e52..8959ba4415d1 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -206,6 +206,8 @@ def get_version(): 'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4; python_version < "3.0"', 'google-cloud-spanner>=1.7.1<1.8.0', 'grpcio-gcp>=0.2.2,<1', + # GCP Packages required by ML functionality + 'google-cloud-videointelligence>=1.8.0<=1.12.1', ] INTERACTIVE_BEAM = [ From d2d3fc4ba18bd551f8a2156c675cba6545e39d7a Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 6 Feb 2020 14:57:04 +0100 Subject: [PATCH 04/20] Folder restructuring --- .../apache_beam/{io/gcp/ai => ml}/__init__.py | 1 - sdks/python/apache_beam/ml/gcp/__init__.py | 16 ++++++++++++++++ .../{io/gcp/ai => ml/gcp}/video_intelligence.py | 2 +- .../gcp/video_intelligence_helper.py} | 0 .../gcp/ai => ml/gcp}/video_intelligence_test.py | 3 ++- 5 files changed, 19 insertions(+), 3 deletions(-) rename sdks/python/apache_beam/{io/gcp/ai => ml}/__init__.py (95%) create mode 100644 sdks/python/apache_beam/ml/gcp/__init__.py rename sdks/python/apache_beam/{io/gcp/ai => ml/gcp}/video_intelligence.py (98%) rename sdks/python/apache_beam/{io/gcp/ai/helper.py => ml/gcp/video_intelligence_helper.py} (100%) rename sdks/python/apache_beam/{io/gcp/ai => ml/gcp}/video_intelligence_test.py (98%) diff --git a/sdks/python/apache_beam/io/gcp/ai/__init__.py b/sdks/python/apache_beam/ml/__init__.py similarity index 95% rename from sdks/python/apache_beam/io/gcp/ai/__init__.py rename to sdks/python/apache_beam/ml/__init__.py index f4f43cbb1236..cce3acad34a4 100644 --- a/sdks/python/apache_beam/io/gcp/ai/__init__.py +++ b/sdks/python/apache_beam/ml/__init__.py @@ -14,4 +14,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from __future__ import absolute_import diff --git a/sdks/python/apache_beam/ml/gcp/__init__.py b/sdks/python/apache_beam/ml/gcp/__init__.py new file mode 100644 index 000000000000..cce3acad34a4 --- /dev/null +++ b/sdks/python/apache_beam/ml/gcp/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py b/sdks/python/apache_beam/ml/gcp/video_intelligence.py similarity index 98% rename from sdks/python/apache_beam/io/gcp/ai/video_intelligence.py rename to sdks/python/apache_beam/ml/gcp/video_intelligence.py index e998625e6ee9..b6571809f6fe 100644 --- a/sdks/python/apache_beam/io/gcp/ai/video_intelligence.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence.py @@ -23,7 +23,7 @@ from typing import Union from apache_beam import typehints -from apache_beam.io.gcp.ai import helper +from apache_beam.ml.gcp import video_intelligence_helper as helper from apache_beam.metrics import Metrics from apache_beam.transforms import DoFn, ParDo, PTransform diff --git a/sdks/python/apache_beam/io/gcp/ai/helper.py b/sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py similarity index 100% rename from sdks/python/apache_beam/io/gcp/ai/helper.py rename to sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py diff --git a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py similarity index 98% rename from sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py rename to sdks/python/apache_beam/ml/gcp/video_intelligence_test.py index 251e526a7017..921b51139ccf 100644 --- a/sdks/python/apache_beam/io/gcp/ai/video_intelligence_test.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py @@ -26,8 +26,9 @@ import mock import apache_beam as beam -from apache_beam.io.gcp.ai import helper, video_intelligence from apache_beam.metrics import MetricsFilter +from apache_beam.ml.gcp import video_intelligence_helper as helper, \ + video_intelligence from apache_beam.typehints.decorators import TypeCheckError # Protect against environments where video intelligence lib is not available. From c1abf00aff393c3e72e8837bc23e45c865ed0520 Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 6 Feb 2020 15:30:25 +0100 Subject: [PATCH 05/20] Yapf formatting --- .../apache_beam/ml/gcp/video_intelligence.py | 44 ++++++------ .../ml/gcp/video_intelligence_test.py | 71 ++++++++++--------- 2 files changed, 62 insertions(+), 53 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence.py b/sdks/python/apache_beam/ml/gcp/video_intelligence.py index b6571809f6fe..59f1d282b00e 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # + """ A connector for sending API requests to the GCP Video Intelligence API. """ @@ -34,9 +35,8 @@ class AnnotateVideo(PTransform): """A ``PTransform`` for annotating video using the GCP Video Intelligence API ref: https://cloud.google.com/video-intelligence/docs """ - - def __init__(self, features, video_context=None, location_id=None, - metadata=None): + def __init__( + self, features, video_context=None, location_id=None, metadata=None): """ Args: features: (List[``videointelligence_v1.enums.Feature``]) Required. @@ -58,11 +58,12 @@ def __init__(self, features, video_context=None, location_id=None, self.metadata = metadata def expand(self, pvalue): - return pvalue | ParDo(self._VideoAnnotateFn( - features=self.features, - video_context=self.video_context, - location_id=self.location_id, - metadata=self.metadata)) + return pvalue | ParDo( + self._VideoAnnotateFn( + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata)) @typehints.with_input_types(Union[text_type, binary_type]) class _VideoAnnotateFn(DoFn): @@ -70,7 +71,6 @@ class _VideoAnnotateFn(DoFn): and returns a PCollection of ``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``. """ - def __init__(self, features, video_context, location_id, metadata): super(AnnotateVideo._VideoAnnotateFn, self).__init__() self._client = None @@ -85,21 +85,23 @@ def start_bundle(self): def process(self, element, *args, **kwargs): if isinstance(element, text_type): # Is element an URI to a GCS bucket - response = self._client.annotate_video(input_uri=element, - features=self.features, - video_context=self.video_context, - location_id=self.location_id, - metadata=self.metadata) + response = self._client.annotate_video( + input_uri=element, + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata) elif isinstance(element, binary_type): # Is element raw bytes - response = self._client.annotate_video(input_content=element, - features=self.features, - video_context=self.video_context, - location_id=self.location_id, - metadata=self.metadata) + response = self._client.annotate_video( + input_content=element, + features=self.features, + video_context=self.video_context, + location_id=self.location_id, + metadata=self.metadata) else: raise TypeError( "{}: input element needs to be either {} or {}" - " got {} instead".format(self.__class__.__name__, text_type, - binary_type, type(element))) + " got {} instead".format( + self.__class__.__name__, text_type, binary_type, type(element))) self.counter.inc() yield response.result(timeout=120) diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py index 921b51139ccf..7fe00ee05e2c 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py @@ -39,8 +39,9 @@ VideoIntelligenceServiceClient = None -@unittest.skipIf(VideoIntelligenceServiceClient is None, - 'Video intelligence dependencies are not installed') +@unittest.skipIf( + VideoIntelligenceServiceClient is None, + 'Video intelligence dependencies are not installed') class VideoIntelligenceTest(unittest.TestCase): def setUp(self): self._mock_client = mock.Mock() @@ -49,23 +50,25 @@ def setUp(self): self._mock_client.annotate_video.return_value = self.m2 self.features = [videointelligence.enums.Feature.LABEL_DETECTION] config = videointelligence.types.SpeechTranscriptionConfig( - language_code='en-US', - enable_automatic_punctuation=True) + language_code='en-US', enable_automatic_punctuation=True) self.video_context = videointelligence.types.VideoContext( speech_transcription_config=config) self.location_id = 'us-west1' def test_AnnotateVideo_URIs(self): - videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4', - 'gs://cloud-samples-data/video/cat.mp4'] + videos_to_annotate = [ + 'gs://cloud-samples-data/video/cat.mp4', + 'gs://cloud-samples-data/video/cat.mp4' + ] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, 'get_videointelligence_client', + with mock.patch.object(helper, + 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() - _ = (p - | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo(self.features) - ) + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -78,16 +81,18 @@ def test_AnnotateVideo_URIs(self): def test_AnnotateVideo_b64_content(self): base_64_encoded_video = \ b'begin 644 cat-video.mp4M (&9T>7!M<#0R..fake_video_content' - videos_to_annotate = [base_64_encoded_video, base_64_encoded_video, - base_64_encoded_video] + videos_to_annotate = [ + base_64_encoded_video, base_64_encoded_video, base_64_encoded_video + ] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, 'get_videointelligence_client', + with mock.patch.object(helper, + 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() - _ = (p - | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo(self.features) - ) + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -99,31 +104,33 @@ def test_AnnotateVideo_b64_content(self): def test_AnnotateVideo_bad_input(self): videos_to_annotate = [123456789, 123456789, 123456789] - with mock.patch.object(helper, 'get_videointelligence_client', + with mock.patch.object(helper, + 'get_videointelligence_client', return_value=self._mock_client): with self.assertRaises(TypeCheckError): p = beam.Pipeline() - _ = (p - | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo( - self.features) - ) + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | + "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() def test_AnnotateVideo_video_context(self): videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4'] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, 'get_videointelligence_client', + with mock.patch.object(helper, + 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() - _ = (p - | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo( - self.features, - video_context=self.video_context, - location_id=self.location_id) - ) + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> video_intelligence.AnnotateVideo( + self.features, + video_context=self.video_context, + location_id=self.location_id)) result = p.run() result.wait_until_finish() From bfe848a4ac0d31ca83058cb197cae6a816ab9b6b Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 6 Feb 2020 19:53:02 +0100 Subject: [PATCH 06/20] Merged helper into main file. Added timeout default arg. Doc fixes. --- .../apache_beam/ml/gcp/video_intelligence.py | 42 +++++++++++++++---- .../ml/gcp/video_intelligence_helper.py | 36 ---------------- .../ml/gcp/video_intelligence_test.py | 11 +++-- 3 files changed, 38 insertions(+), 51 deletions(-) delete mode 100644 sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence.py b/sdks/python/apache_beam/ml/gcp/video_intelligence.py index 59f1d282b00e..8b48bcc42771 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence.py @@ -24,19 +24,38 @@ from typing import Union from apache_beam import typehints -from apache_beam.ml.gcp import video_intelligence_helper as helper from apache_beam.metrics import Metrics from apache_beam.transforms import DoFn, ParDo, PTransform +try: + from google.cloud import videointelligence +except ImportError: + raise ImportError( + 'Google Cloud Video Intelligence not supported for this execution environment ' + '(could not import google.cloud.videointelligence).') +from cachetools.func import ttl_cache + __all__ = ['AnnotateVideo'] +@ttl_cache(maxsize=128, ttl=3600) +def get_videointelligence_client(): + """Returns a Cloud Video Intelligence client.""" + _client = videointelligence.VideoIntelligenceServiceClient() + return _client + + class AnnotateVideo(PTransform): """A ``PTransform`` for annotating video using the GCP Video Intelligence API ref: https://cloud.google.com/video-intelligence/docs """ def __init__( - self, features, video_context=None, location_id=None, metadata=None): + self, + features, + video_context=None, + location_id=None, + metadata=None, + timeout=120): """ Args: features: (List[``videointelligence_v1.enums.Feature``]) Required. @@ -50,12 +69,15 @@ def __init__( based on video file location. metadata: (Sequence[Tuple[str, str]]) Optional. Additional metadata that is provided to the method. + timeout: (int) Optional. + The time in seconds to wait for the response from the Video Intelligence API """ super(AnnotateVideo, self).__init__() self.features = features self.video_context = video_context self.location_id = location_id self.metadata = metadata + self.timeout = timeout def expand(self, pvalue): return pvalue | ParDo( @@ -63,25 +85,27 @@ def expand(self, pvalue): features=self.features, video_context=self.video_context, location_id=self.location_id, - metadata=self.metadata)) + metadata=self.metadata, + timeout=self.timeout)) @typehints.with_input_types(Union[text_type, binary_type]) class _VideoAnnotateFn(DoFn): - """ A ``DoFn`` that sends every element to the GCP Video Intelligence API - and returns a PCollection of - ``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``. + """ A DoFn that sends each input element to the GCP Video Intelligence API + service and outputs an element with the return result of the API + (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). """ - def __init__(self, features, video_context, location_id, metadata): + def __init__(self, features, video_context, location_id, metadata, timeout): super(AnnotateVideo._VideoAnnotateFn, self).__init__() self._client = None self.features = features self.video_context = video_context self.location_id = location_id self.metadata = metadata + self.timeout = timeout self.counter = Metrics.counter(self.__class__, "API Calls") def start_bundle(self): - self._client = helper.get_videointelligence_client() + self._client = get_videointelligence_client() def process(self, element, *args, **kwargs): if isinstance(element, text_type): # Is element an URI to a GCS bucket @@ -104,4 +128,4 @@ def process(self, element, *args, **kwargs): " got {} instead".format( self.__class__.__name__, text_type, binary_type, type(element))) self.counter.inc() - yield response.result(timeout=120) + yield response.result(timeout=self.timeout) diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py b/sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py deleted file mode 100644 index 0100a6f42d8c..000000000000 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence_helper.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Cloud Video Intelligence client - -For internal use only; no backwards-compatibility guarantees. -""" - -# pytype: skip-file - -from __future__ import absolute_import - -from cachetools.func import ttl_cache -from google.cloud import videointelligence - - -@ttl_cache(maxsize=128, ttl=3600) -def get_videointelligence_client(): - """Returns a Cloud Video Intelligence client.""" - _client = videointelligence.VideoIntelligenceServiceClient() - return _client diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py index 7fe00ee05e2c..458e1ed03087 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py @@ -27,14 +27,13 @@ import apache_beam as beam from apache_beam.metrics import MetricsFilter -from apache_beam.ml.gcp import video_intelligence_helper as helper, \ - video_intelligence from apache_beam.typehints.decorators import TypeCheckError # Protect against environments where video intelligence lib is not available. try: from google.cloud.videointelligence import VideoIntelligenceServiceClient from google.cloud import videointelligence + from apache_beam.ml.gcp import video_intelligence except ImportError: VideoIntelligenceServiceClient = None @@ -61,7 +60,7 @@ def test_AnnotateVideo_URIs(self): 'gs://cloud-samples-data/video/cat.mp4' ] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, + with mock.patch.object(video_intelligence, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() @@ -85,7 +84,7 @@ def test_AnnotateVideo_b64_content(self): base_64_encoded_video, base_64_encoded_video, base_64_encoded_video ] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, + with mock.patch.object(video_intelligence, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() @@ -104,7 +103,7 @@ def test_AnnotateVideo_b64_content(self): def test_AnnotateVideo_bad_input(self): videos_to_annotate = [123456789, 123456789, 123456789] - with mock.patch.object(helper, + with mock.patch.object(video_intelligence, 'get_videointelligence_client', return_value=self._mock_client): with self.assertRaises(TypeCheckError): @@ -120,7 +119,7 @@ def test_AnnotateVideo_bad_input(self): def test_AnnotateVideo_video_context(self): videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4'] expected_counter = len(videos_to_annotate) - with mock.patch.object(helper, + with mock.patch.object(video_intelligence, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() From ebb701c6202ac1ac0b3d5d5992e78807bd868b2e Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 6 Feb 2020 19:56:52 +0100 Subject: [PATCH 07/20] Optimise imports --- sdks/python/apache_beam/ml/gcp/video_intelligence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence.py b/sdks/python/apache_beam/ml/gcp/video_intelligence.py index 8b48bcc42771..e10c9072a116 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence.py +++ b/sdks/python/apache_beam/ml/gcp/video_intelligence.py @@ -20,6 +20,7 @@ """ from __future__ import absolute_import +from cachetools.func import ttl_cache from future.utils import binary_type, text_type from typing import Union @@ -33,7 +34,6 @@ raise ImportError( 'Google Cloud Video Intelligence not supported for this execution environment ' '(could not import google.cloud.videointelligence).') -from cachetools.func import ttl_cache __all__ = ['AnnotateVideo'] From 50e5be0ac80fccaa499732ce68e147370b303d3e Mon Sep 17 00:00:00 2001 From: EDjur Date: Mon, 10 Feb 2020 09:17:15 +0100 Subject: [PATCH 08/20] Rename to make more consistent with the rest of BEAM --- ..._intelligence.py => videointelligenceml.py} | 0 ...nce_test.py => videointelligenceml_test.py} | 18 +++++++++--------- 2 files changed, 9 insertions(+), 9 deletions(-) rename sdks/python/apache_beam/ml/gcp/{video_intelligence.py => videointelligenceml.py} (100%) rename sdks/python/apache_beam/ml/gcp/{video_intelligence_test.py => videointelligenceml_test.py} (89%) diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py similarity index 100% rename from sdks/python/apache_beam/ml/gcp/video_intelligence.py rename to sdks/python/apache_beam/ml/gcp/videointelligenceml.py diff --git a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py similarity index 89% rename from sdks/python/apache_beam/ml/gcp/video_intelligence_test.py rename to sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 458e1ed03087..75dd3ddc8ebc 100644 --- a/sdks/python/apache_beam/ml/gcp/video_intelligence_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -33,7 +33,7 @@ try: from google.cloud.videointelligence import VideoIntelligenceServiceClient from google.cloud import videointelligence - from apache_beam.ml.gcp import video_intelligence + from apache_beam.ml.gcp import videointelligenceml except ImportError: VideoIntelligenceServiceClient = None @@ -60,14 +60,14 @@ def test_AnnotateVideo_URIs(self): 'gs://cloud-samples-data/video/cat.mp4' ] expected_counter = len(videos_to_annotate) - with mock.patch.object(video_intelligence, + with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) + | "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -84,14 +84,14 @@ def test_AnnotateVideo_b64_content(self): base_64_encoded_video, base_64_encoded_video, base_64_encoded_video ] expected_counter = len(videos_to_annotate) - with mock.patch.object(video_intelligence, + with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) + | "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -103,7 +103,7 @@ def test_AnnotateVideo_b64_content(self): def test_AnnotateVideo_bad_input(self): videos_to_annotate = [123456789, 123456789, 123456789] - with mock.patch.object(video_intelligence, + with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): with self.assertRaises(TypeCheckError): @@ -112,21 +112,21 @@ def test_AnnotateVideo_bad_input(self): p | "Create data" >> beam.Create(videos_to_annotate) | - "Annotate video" >> video_intelligence.AnnotateVideo(self.features)) + "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() def test_AnnotateVideo_video_context(self): videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4'] expected_counter = len(videos_to_annotate) - with mock.patch.object(video_intelligence, + with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): p = beam.Pipeline() _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> video_intelligence.AnnotateVideo( + | "Annotate video" >> videointelligenceml.AnnotateVideo( self.features, video_context=self.video_context, location_id=self.location_id)) From a3d596188e1d4548d5f8f9c464bcccf5137e8a33 Mon Sep 17 00:00:00 2001 From: EDjur Date: Mon, 10 Feb 2020 09:20:45 +0100 Subject: [PATCH 09/20] Docstring 'video_intelligence' -> 'videointelligenceml' --- sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 75dd3ddc8ebc..9cd4871aa264 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""Unit tests for video_intelligence.""" +"""Unit tests for videointelligenceml.""" # pytype: skip-file From 73e14a6bee706c330e5365acd90cf053e2cd6509 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 11 Feb 2020 09:14:44 +0100 Subject: [PATCH 10/20] yapf formatting --- .../apache_beam/ml/gcp/videointelligenceml_test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 9cd4871aa264..969eb62bdbbf 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -67,7 +67,8 @@ def test_AnnotateVideo_URIs(self): _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) + | + "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -91,7 +92,8 @@ def test_AnnotateVideo_b64_content(self): _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) + | + "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) result = p.run() result.wait_until_finish() @@ -111,8 +113,8 @@ def test_AnnotateVideo_bad_input(self): _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | - "Annotate video" >> videointelligenceml.AnnotateVideo(self.features)) + | "Annotate video" >> videointelligenceml.AnnotateVideo( + self.features)) result = p.run() result.wait_until_finish() From 7450cea79c4ef4bd5ef3196c096039681b227472 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 11 Feb 2020 20:24:54 +0100 Subject: [PATCH 11/20] Added new feature note in changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index ce39f8303853..149acf0cd147 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -30,6 +30,7 @@ ### New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* New AnnotateVideo PTransform that integrates GCP Video Intelligence functionality ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146)) ### Breaking Changes From 2723b7193f25deca7f7f1677f173f7b5e489022d Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 12 Feb 2020 22:14:01 +0100 Subject: [PATCH 12/20] Lint fixes --- sdks/python/apache_beam/ml/gcp/videointelligenceml.py | 7 ++++--- sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py index e10c9072a116..79940f78e434 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py @@ -32,8 +32,8 @@ from google.cloud import videointelligence except ImportError: raise ImportError( - 'Google Cloud Video Intelligence not supported for this execution environment ' - '(could not import google.cloud.videointelligence).') + 'Google Cloud Video Intelligence not supported for this execution ' + 'environment (could not import google.cloud.videointelligence).') __all__ = ['AnnotateVideo'] @@ -70,7 +70,8 @@ def __init__( metadata: (Sequence[Tuple[str, str]]) Optional. Additional metadata that is provided to the method. timeout: (int) Optional. - The time in seconds to wait for the response from the Video Intelligence API + The time in seconds to wait for the response from the + Video Intelligence API """ super(AnnotateVideo, self).__init__() self.features = features diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 969eb62bdbbf..89a30e5a22f2 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -30,6 +30,7 @@ from apache_beam.typehints.decorators import TypeCheckError # Protect against environments where video intelligence lib is not available. +# pylint: disable=wrong-import-order, wrong-import-position try: from google.cloud.videointelligence import VideoIntelligenceServiceClient from google.cloud import videointelligence From 4a3b8dbf5c5d0c021bbe9b7d12bf42ed4c30bf88 Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 12 Feb 2020 22:34:20 +0100 Subject: [PATCH 13/20] Disabled correct pylint metric --- sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 89a30e5a22f2..974126af937d 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -30,7 +30,7 @@ from apache_beam.typehints.decorators import TypeCheckError # Protect against environments where video intelligence lib is not available. -# pylint: disable=wrong-import-order, wrong-import-position +# pylint: disable=ungrouped-imports try: from google.cloud.videointelligence import VideoIntelligenceServiceClient from google.cloud import videointelligence From 03135c515d3ddb810ddc5883e22b19f2115708f3 Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 13 Feb 2020 09:10:30 +0100 Subject: [PATCH 14/20] Re-ordered imports --- sdks/python/apache_beam/ml/gcp/videointelligenceml.py | 9 ++++++--- .../apache_beam/ml/gcp/videointelligenceml_test.py | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py index 79940f78e434..c37263c98cc6 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py @@ -20,13 +20,16 @@ """ from __future__ import absolute_import -from cachetools.func import ttl_cache -from future.utils import binary_type, text_type +from future.utils import binary_type +from future.utils import text_type from typing import Union from apache_beam import typehints from apache_beam.metrics import Metrics -from apache_beam.transforms import DoFn, ParDo, PTransform +from apache_beam.transforms import DoFn +from apache_beam.transforms import ParDo +from apache_beam.transforms import PTransform +from cachetools.func import ttl_cache try: from google.cloud import videointelligence diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 974126af937d..359a8a4d97e2 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -19,7 +19,8 @@ # pytype: skip-file -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import unicode_literals import unittest From ababbf6c48d843f63bb3e32e8299c55eae75d9d4 Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 13 Feb 2020 11:06:53 +0100 Subject: [PATCH 15/20] Re-ordered imports --- sdks/python/apache_beam/ml/gcp/videointelligenceml.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py index c37263c98cc6..776eb3e47e90 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py @@ -20,9 +20,10 @@ """ from __future__ import absolute_import +from typing import Union + from future.utils import binary_type from future.utils import text_type -from typing import Union from apache_beam import typehints from apache_beam.metrics import Metrics From 6818f354db5dd943daa981b8b7041ce7dfd5fd8f Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 13 Feb 2020 11:56:31 +0100 Subject: [PATCH 16/20] Added call to unittest.main() in tests --- sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index 359a8a4d97e2..ffb37f764096 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -22,6 +22,7 @@ from __future__ import absolute_import from __future__ import unicode_literals +import logging import unittest import mock @@ -142,3 +143,8 @@ def test_AnnotateVideo_video_context(self): if query_result['counters']: read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() From 4c9a150575c29f67a8126929605d059adba3d470 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 18 Feb 2020 14:23:52 +0100 Subject: [PATCH 17/20] Split into 2 PTransforms -> 1. Video context as side input. 2. Video context as part of element tuple --- .../apache_beam/ml/gcp/videointelligenceml.py | 193 +++++++++++++----- .../ml/gcp/videointelligenceml_test.py | 110 +++++++--- 2 files changed, 228 insertions(+), 75 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py index 776eb3e47e90..d070dc3d0632 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py @@ -20,6 +20,8 @@ """ from __future__ import absolute_import +from typing import Optional +from typing import Tuple from typing import Union from future.utils import binary_type @@ -39,7 +41,7 @@ 'Google Cloud Video Intelligence not supported for this execution ' 'environment (could not import google.cloud.videointelligence).') -__all__ = ['AnnotateVideo'] +__all__ = ['AnnotateVideo', 'AnnotateVideoWithContext'] @ttl_cache(maxsize=128, ttl=3600) @@ -52,21 +54,23 @@ def get_videointelligence_client(): class AnnotateVideo(PTransform): """A ``PTransform`` for annotating video using the GCP Video Intelligence API ref: https://cloud.google.com/video-intelligence/docs + + Sends each element to the GCP Video Intelligence API. Element is a + Union[text_type, binary_type] of either an URI (e.g. a GCS URI) or + binary_type base64-encoded video data. + Accepts an `AsDict` side input that maps each video to a video context. """ def __init__( self, features, - video_context=None, location_id=None, metadata=None, - timeout=120): + timeout=120, + context_side_input=None): """ Args: features: (List[``videointelligence_v1.enums.Feature``]) Required. the Video Intelligence API features to detect - video_context: (dict, ``videointelligence_v1.types.VideoContext``) - Optional. - Additional video context and/or feature-specific parameters. location_id: (str) Optional. Cloud region where annotation should take place. If no region is specified, a region will be determined @@ -76,61 +80,152 @@ def __init__( timeout: (int) Optional. The time in seconds to wait for the response from the Video Intelligence API + context_side_input: (beam.pvalue.AsDict) Optional. + An ``AsDict`` of a PCollection to be passed to the + _VideoAnnotateFn as the video context mapping containing additional + video context and/or feature-specific parameters. + Example usage: + video_contexts = + [ + ('gs://cloud-samples-data/video/cat.mp4', + Union[dict, ``videointelligence_v1.types.VideoContext``]), + + ('gs://some-other-video/sample.mp4', + Union[dict, ``videointelligence_v1.types.VideoContext``]), + ] + context_side_input = + ( + p + | "Video contexts" >> beam.Create(video_contexts) + ) + videointelligenceml.AnnotateVideo(features, + context_side_input=beam.pvalue.AsDict(context_side_input))) """ super(AnnotateVideo, self).__init__() self.features = features - self.video_context = video_context self.location_id = location_id self.metadata = metadata self.timeout = timeout + self.context_side_input = context_side_input def expand(self, pvalue): return pvalue | ParDo( - self._VideoAnnotateFn( + _VideoAnnotateFn( features=self.features, - video_context=self.video_context, location_id=self.location_id, metadata=self.metadata, - timeout=self.timeout)) + timeout=self.timeout), + context_side_input=self.context_side_input) - @typehints.with_input_types(Union[text_type, binary_type]) - class _VideoAnnotateFn(DoFn): - """ A DoFn that sends each input element to the GCP Video Intelligence API - service and outputs an element with the return result of the API - (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). - """ - def __init__(self, features, video_context, location_id, metadata, timeout): - super(AnnotateVideo._VideoAnnotateFn, self).__init__() - self._client = None - self.features = features - self.video_context = video_context - self.location_id = location_id - self.metadata = metadata - self.timeout = timeout - self.counter = Metrics.counter(self.__class__, "API Calls") - - def start_bundle(self): - self._client = get_videointelligence_client() - - def process(self, element, *args, **kwargs): - if isinstance(element, text_type): # Is element an URI to a GCS bucket - response = self._client.annotate_video( - input_uri=element, - features=self.features, - video_context=self.video_context, - location_id=self.location_id, - metadata=self.metadata) - elif isinstance(element, binary_type): # Is element raw bytes - response = self._client.annotate_video( - input_content=element, + +@typehints.with_input_types( + Union[text_type, binary_type], + Optional[videointelligence.types.VideoContext]) +class _VideoAnnotateFn(DoFn): + """ A DoFn that sends each input element to the GCP Video Intelligence API + service and outputs an element with the return result of the API + (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). + """ + def __init__(self, features, location_id, metadata, timeout): + super(_VideoAnnotateFn, self).__init__() + self._client = None + self.features = features + self.location_id = location_id + self.metadata = metadata + self.timeout = timeout + self.counter = Metrics.counter(self.__class__, "API Calls") + + def start_bundle(self): + self._client = get_videointelligence_client() + + def _annotate_video(self, element, video_context): + if isinstance(element, text_type): # Is element an URI to a GCS bucket + response = self._client.annotate_video( + input_uri=element, + features=self.features, + video_context=video_context, + location_id=self.location_id, + metadata=self.metadata) + else: # Is element raw bytes + response = self._client.annotate_video( + input_content=element, + features=self.features, + video_context=video_context, + location_id=self.location_id, + metadata=self.metadata) + return response + + def process(self, element, context_side_input=None, *args, **kwargs): + if context_side_input: # If we have a side input video context, use that + video_context = context_side_input.get(element) + else: + video_context = None + response = self._annotate_video(element, video_context) + self.counter.inc() + yield response.result(timeout=self.timeout) + + +class AnnotateVideoWithContext(AnnotateVideo): + """A ``PTransform`` for annotating video using the GCP Video Intelligence API + ref: https://cloud.google.com/video-intelligence/docs + + Sends each element to the GCP Video Intelligence API. + Element is a tuple of + + (Union[text_type, binary_type], + Optional[videointelligence.types.VideoContext]) + + where the former is either an URI (e.g. a GCS URI) or + binary_type base64-encoded video data + """ + def __init__(self, features, location_id=None, metadata=None, timeout=120): + """ + Args: + features: (List[``videointelligence_v1.enums.Feature``]) Required. + the Video Intelligence API features to detect + location_id: (str) Optional. + Cloud region where annotation should take place. + If no region is specified, a region will be determined + based on video file location. + metadata: (Sequence[Tuple[str, str]]) Optional. + Additional metadata that is provided to the method. + timeout: (int) Optional. + The time in seconds to wait for the response from the + Video Intelligence API + """ + super(AnnotateVideoWithContext, self).__init__( + features=features, + location_id=location_id, + metadata=metadata, + timeout=timeout) + + def expand(self, pvalue): + return pvalue | ParDo( + _VideoAnnotateFnWithContext( features=self.features, - video_context=self.video_context, location_id=self.location_id, - metadata=self.metadata) - else: - raise TypeError( - "{}: input element needs to be either {} or {}" - " got {} instead".format( - self.__class__.__name__, text_type, binary_type, type(element))) - self.counter.inc() - yield response.result(timeout=self.timeout) + metadata=self.metadata, + timeout=self.timeout)) + + +@typehints.with_input_types( + Tuple[Union[text_type, binary_type], + Optional[videointelligence.types.VideoContext]]) +class _VideoAnnotateFnWithContext(_VideoAnnotateFn): + """ A DoFn that unpacks each input tuple to element, video_context variables + and sends these to the GCP Video Intelligence API service and outputs + an element with the return result of the API + (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). + """ + def __init__(self, features, location_id, metadata, timeout): + super(_VideoAnnotateFnWithContext, self).__init__( + features=features, + location_id=location_id, + metadata=metadata, + timeout=timeout) + + def process(self, element, *args, **kwargs): + element, video_context = element # Unpack (video, video_context) tuple + response = self._annotate_video(element, video_context) + self.counter.inc() + yield response.result(timeout=self.timeout) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py index ffb37f764096..b0bb441c5bba 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test.py @@ -51,11 +51,44 @@ def setUp(self): self.m2.result.return_value = None self._mock_client.annotate_video.return_value = self.m2 self.features = [videointelligence.enums.Feature.LABEL_DETECTION] + self.location_id = 'us-west1' config = videointelligence.types.SpeechTranscriptionConfig( language_code='en-US', enable_automatic_punctuation=True) - self.video_context = videointelligence.types.VideoContext( + self.video_ctx = videointelligence.types.VideoContext( speech_transcription_config=config) - self.location_id = 'us-west1' + + def test_AnnotateVideo_with_side_input_context(self): + videos_to_annotate = [ + 'gs://cloud-samples-data/video/cat.mp4', + 'gs://some-other-video/sample.mp4', + 'gs://some-other-video/sample_2.mp4' + ] + video_contexts = [ + ('gs://cloud-samples-data/video/cat.mp4', self.video_ctx), + ('gs://some-other-video/sample.mp4', self.video_ctx), + ] + + expected_counter = len(videos_to_annotate) + with mock.patch.object(videointelligenceml, + 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + context_side_input = (p | "Video contexts" >> beam.Create(video_contexts)) + + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> videointelligenceml.AnnotateVideo( + self.features, + context_side_input=beam.pvalue.AsDict(context_side_input))) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) def test_AnnotateVideo_URIs(self): videos_to_annotate = [ @@ -81,11 +114,40 @@ def test_AnnotateVideo_URIs(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) + def test_AnnotateVideoWithContext_b64_content(self): + base_64_encoded_video = \ + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + videos_to_annotate = [ + (base_64_encoded_video, self.video_ctx), + (base_64_encoded_video, None), + (base_64_encoded_video, self.video_ctx), + ] + expected_counter = len(videos_to_annotate) + with mock.patch.object(videointelligenceml, + 'get_videointelligence_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> videointelligenceml.AnnotateVideoWithContext( + self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + def test_AnnotateVideo_b64_content(self): base_64_encoded_video = \ - b'begin 644 cat-video.mp4M (&9T>7!M<#0R..fake_video_content' + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' videos_to_annotate = [ - base_64_encoded_video, base_64_encoded_video, base_64_encoded_video + base_64_encoded_video, + base_64_encoded_video, + base_64_encoded_video, ] expected_counter = len(videos_to_annotate) with mock.patch.object(videointelligenceml, @@ -106,8 +168,12 @@ def test_AnnotateVideo_b64_content(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateVideo_bad_input(self): - videos_to_annotate = [123456789, 123456789, 123456789] + def test_AnnotateVideoWithContext_bad_input(self): + """AnnotateVideoWithContext should not accept videos without context""" + videos_to_annotate = [ + 'gs://cloud-samples-data/video/cat.mp4', + 'gs://cloud-samples-data/video/cat.mp4' + ] with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): @@ -116,33 +182,25 @@ def test_AnnotateVideo_bad_input(self): _ = ( p | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> videointelligenceml.AnnotateVideo( + | "Annotate video" >> videointelligenceml.AnnotateVideoWithContext( self.features)) result = p.run() result.wait_until_finish() - def test_AnnotateVideo_video_context(self): - videos_to_annotate = ['gs://cloud-samples-data/video/cat.mp4'] - expected_counter = len(videos_to_annotate) + def test_AnnotateVideo_bad_input(self): + videos_to_annotate = [123456789, 123456789, 123456789] with mock.patch.object(videointelligenceml, 'get_videointelligence_client', return_value=self._mock_client): - p = beam.Pipeline() - _ = ( - p - | "Create data" >> beam.Create(videos_to_annotate) - | "Annotate video" >> videointelligenceml.AnnotateVideo( - self.features, - video_context=self.video_context, - location_id=self.location_id)) - result = p.run() - result.wait_until_finish() - - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + with self.assertRaises(TypeCheckError): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(videos_to_annotate) + | "Annotate video" >> videointelligenceml.AnnotateVideo( + self.features)) + result = p.run() + result.wait_until_finish() if __name__ == '__main__': From 702e3d7732ad37c1bd3a693be86f6ae3b8d81b75 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 18 Feb 2020 14:29:21 +0100 Subject: [PATCH 18/20] Added AnnotateVideoWithContext to CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6bed78f46b02..e54229a8a71a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -31,7 +31,7 @@ ### New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). -* New AnnotateVideo PTransform that integrates GCP Video Intelligence functionality ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146)) +* New AnnotateVideo & AnnotateVideoWithContext PTransform's that integrates GCP Video Intelligence functionality ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146)) ### Breaking Changes From 4c3af025815a5468c0e76e9127f460f1e5d4fea1 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Tue, 18 Feb 2020 11:28:14 -0800 Subject: [PATCH 19/20] Add final new line. --- sdks/python/apache_beam/ml/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/__init__.py b/sdks/python/apache_beam/ml/__init__.py index ecb1860df848..cce3acad34a4 100644 --- a/sdks/python/apache_beam/ml/__init__.py +++ b/sdks/python/apache_beam/ml/__init__.py @@ -13,4 +13,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# \ No newline at end of file +# From 26a19b0c951fe4e0f54a595f028006105e50f12f Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 19 Feb 2020 12:22:38 +0100 Subject: [PATCH 20/20] Fixed pycommon:docs issue --- .../apache_beam/ml/gcp/videointelligenceml.py | 87 +++++++++---------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py index d070dc3d0632..67ff4969f624 100644 --- a/sdks/python/apache_beam/ml/gcp/videointelligenceml.py +++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml.py @@ -15,9 +15,8 @@ # limitations under the License. # -""" -A connector for sending API requests to the GCP Video Intelligence API. -""" +"""A connector for sending API requests to the GCP Video Intelligence API.""" + from __future__ import absolute_import from typing import Optional @@ -68,38 +67,38 @@ def __init__( timeout=120, context_side_input=None): """ - Args: - features: (List[``videointelligence_v1.enums.Feature``]) Required. - the Video Intelligence API features to detect - location_id: (str) Optional. - Cloud region where annotation should take place. - If no region is specified, a region will be determined - based on video file location. - metadata: (Sequence[Tuple[str, str]]) Optional. - Additional metadata that is provided to the method. - timeout: (int) Optional. - The time in seconds to wait for the response from the - Video Intelligence API - context_side_input: (beam.pvalue.AsDict) Optional. - An ``AsDict`` of a PCollection to be passed to the - _VideoAnnotateFn as the video context mapping containing additional - video context and/or feature-specific parameters. - Example usage: - video_contexts = - [ - ('gs://cloud-samples-data/video/cat.mp4', - Union[dict, ``videointelligence_v1.types.VideoContext``]), - - ('gs://some-other-video/sample.mp4', - Union[dict, ``videointelligence_v1.types.VideoContext``]), - ] - context_side_input = - ( - p - | "Video contexts" >> beam.Create(video_contexts) - ) - videointelligenceml.AnnotateVideo(features, - context_side_input=beam.pvalue.AsDict(context_side_input))) + Args: + features: (List[``videointelligence_v1.enums.Feature``]) Required. + The Video Intelligence API features to detect + location_id: (str) Optional. + Cloud region where annotation should take place. + If no region is specified, a region will be determined + based on video file location. + metadata: (Sequence[Tuple[str, str]]) Optional. + Additional metadata that is provided to the method. + timeout: (int) Optional. + The time in seconds to wait for the response from the + Video Intelligence API + context_side_input: (beam.pvalue.AsDict) Optional. + An ``AsDict`` of a PCollection to be passed to the + _VideoAnnotateFn as the video context mapping containing additional + video context and/or feature-specific parameters. + Example usage:: + + video_contexts = + [('gs://cloud-samples-data/video/cat.mp4', Union[dict, + ``videointelligence_v1.types.VideoContext``]), + ('gs://some-other-video/sample.mp4', Union[dict, + ``videointelligence_v1.types.VideoContext``]),] + + context_side_input = + ( + p + | "Video contexts" >> beam.Create(video_contexts) + ) + + videointelligenceml.AnnotateVideo(features, + context_side_input=beam.pvalue.AsDict(context_side_input))) """ super(AnnotateVideo, self).__init__() self.features = features @@ -122,10 +121,10 @@ def expand(self, pvalue): Union[text_type, binary_type], Optional[videointelligence.types.VideoContext]) class _VideoAnnotateFn(DoFn): - """ A DoFn that sends each input element to the GCP Video Intelligence API - service and outputs an element with the return result of the API - (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). - """ + """A DoFn that sends each input element to the GCP Video Intelligence API + service and outputs an element with the return result of the API + (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). + """ def __init__(self, features, location_id, metadata, timeout): super(_VideoAnnotateFn, self).__init__() self._client = None @@ -212,11 +211,11 @@ def expand(self, pvalue): Tuple[Union[text_type, binary_type], Optional[videointelligence.types.VideoContext]]) class _VideoAnnotateFnWithContext(_VideoAnnotateFn): - """ A DoFn that unpacks each input tuple to element, video_context variables - and sends these to the GCP Video Intelligence API service and outputs - an element with the return result of the API - (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). - """ + """A DoFn that unpacks each input tuple to element, video_context variables + and sends these to the GCP Video Intelligence API service and outputs + an element with the return result of the API + (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``). + """ def __init__(self, features, location_id, metadata, timeout): super(_VideoAnnotateFnWithContext, self).__init__( features=features,