diff --git a/CHANGES.md b/CHANGES.md
index b95b0da008d1..00ce7b1b0c25 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -34,7 +34,8 @@
 ### New Features / Improvements

 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New AnnotateVideo & AnnotateVideoWithContext PTransform's that integrates GCP Video Intelligence functionality ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
+* New AnnotateVideo & AnnotateVideoWithContext PTransforms that integrate GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
+* New AnnotateImage & AnnotateImageWithContext PTransforms for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
 * Added a PTransform for inspection and deidentification of text using Google Cloud DLP. (Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258))

 ### Breaking Changes
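For context, a minimal sketch of how the new image-annotation transform announced above might be wired into a pipeline. It is illustrative only: the bucket path is hypothetical and the feature choice is an assumption, not part of this change.

```python
# Sketch (not part of this diff): annotating image URIs with the new
# AnnotateImage transform. Path and feature choice are illustrative.
import apache_beam as beam
from apache_beam.ml.gcp import visionml
from google.cloud import vision

features = [
    vision.types.Feature(type=vision.enums.Feature.Type.LABEL_DETECTION)
]

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['gs://my-bucket/image.jpg'])  # hypothetical URI
      | visionml.AnnotateImage(features))
```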
diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py
new file mode 100644
index 000000000000..13884a7bf0f0
--- /dev/null
+++ b/sdks/python/apache_beam/ml/gcp/visionml.py
@@ -0,0 +1,292 @@
+# pylint: skip-file
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A connector for sending API requests to the GCP Vision API.
+"""
+
+from __future__ import absolute_import
+
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from future.utils import binary_type
+from future.utils import text_type
+
+from apache_beam import typehints
+from apache_beam.metrics import Metrics
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
+from apache_beam.transforms import util
+from cachetools.func import ttl_cache
+
+try:
+  from google.cloud import vision
+except ImportError:
+  raise ImportError(
+      'Google Cloud Vision not supported for this execution environment '
+      '(could not import google.cloud.vision).')
+
+__all__ = [
+    'AnnotateImage',
+    'AnnotateImageWithContext',
+]
+
+
+@ttl_cache(maxsize=128, ttl=3600)
+def get_vision_client(client_options=None):
+  """Returns a Cloud Vision API client."""
+  _client = vision.ImageAnnotatorClient(client_options=client_options)
+  return _client
+
+
+class AnnotateImage(PTransform):
+  """A ``PTransform`` for annotating images using the GCP Vision API.
+  ref: https://cloud.google.com/vision/docs/
+
+  Batches elements together using the ``util.BatchElements`` PTransform and
+  sends each batch of elements to the GCP Vision API.
+  Element is a Union[text_type, binary_type] of either a URI
+  (e.g. a GCS URI) or base64-encoded image data (binary_type).
+  Accepts an `AsDict` side input that maps each image to an image context.
+  """
+
+  MAX_BATCH_SIZE = 5
+  MIN_BATCH_SIZE = 1
+
+  def __init__(
+      self,
+      features,
+      retry=None,
+      timeout=120,
+      max_batch_size=None,
+      min_batch_size=None,
+      client_options=None,
+      context_side_input=None,
+      metadata=None):
+    """
+    Args:
+      features: (List[``vision.types.Feature.enums.Feature``]) Required.
+        The Vision API features to detect.
+      retry: (google.api_core.retry.Retry) Optional.
+        A retry object used to retry requests.
+        If None is specified (default), requests will not be retried.
+      timeout: (float) Optional.
+        The time in seconds to wait for the response from the Vision API.
+        Default is 120.
+      max_batch_size: (int) Optional.
+        Maximum number of images to batch in the same request to the
+        Vision API. Default is 5 (which is also the Vision API max).
+        This parameter is primarily intended for testing.
+      min_batch_size: (int) Optional.
+        Minimum number of images to batch in the same request to the
+        Vision API. Default is None. This parameter is primarily intended
+        for testing.
+      client_options:
+        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
+        Client options used to set user options on the client.
+        The API endpoint should be set through client_options.
+      context_side_input: (beam.pvalue.AsDict) Optional.
+        An ``AsDict`` of a PCollection to be passed to the
+        _ImageAnnotateFn as the image context mapping containing additional
+        image context and/or feature-specific parameters.
+        Example usage::
+
+          image_contexts =
+            [('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
+              ``vision.types.ImageContext()``]),
+             ('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
+              ``vision.types.ImageContext()``]),]
+
+          context_side_input =
+            (
+                p
+                | "Image contexts" >> beam.Create(image_contexts)
+            )
+
+          visionml.AnnotateImage(features,
+            context_side_input=beam.pvalue.AsDict(context_side_input))
+      metadata: (Optional[Sequence[Tuple[str, str]]]) Optional.
+        Additional metadata that is provided to the method.
+    """
+    super(AnnotateImage, self).__init__()
+    self.features = features
+    self.retry = retry
+    self.timeout = timeout
+    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
+    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
+      raise ValueError(
+          'Max batch_size exceeded. '
+          'Batch size needs to be smaller than or equal to {}.'.format(
+              AnnotateImage.MAX_BATCH_SIZE))
+    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
+    self.client_options = client_options
+    self.context_side_input = context_side_input
+    self.metadata = metadata
+
+  def expand(self, pvalue):
+    return (
+        pvalue
+        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
+        | util.BatchElements(
+            min_batch_size=self.min_batch_size,
+            max_batch_size=self.max_batch_size)
+        | ParDo(
+            _ImageAnnotateFn(
+                features=self.features,
+                retry=self.retry,
+                timeout=self.timeout,
+                client_options=self.client_options,
+                metadata=self.metadata)))
+
+  @typehints.with_input_types(
+      Union[text_type, binary_type], Optional[vision.types.ImageContext])
+  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
+  def _create_image_annotation_pairs(self, element, context_side_input):
+    if context_side_input:  # If we have a side input image context, use that
+      image_context = context_side_input.get(element)
+    else:
+      image_context = None
+
+    if isinstance(element, text_type):
+      image = vision.types.Image(
+          source=vision.types.ImageSource(image_uri=element))
+    else:  # Typehint checks only allow text_type or binary_type
+      image = vision.types.Image(content=element)
+
+    request = vision.types.AnnotateImageRequest(
+        image=image, features=self.features, image_context=image_context)
+    yield request
+
+
+class AnnotateImageWithContext(AnnotateImage):
+  """A ``PTransform`` for annotating images using the GCP Vision API.
+  ref: https://cloud.google.com/vision/docs/
+  Batches elements together using the ``util.BatchElements`` PTransform and
+  sends each batch of elements to the GCP Vision API.
+
+  Element is a tuple of::
+
+    (Union[text_type, binary_type],
+     Optional[``vision.types.ImageContext``])
+
+  where the former is either a URI (e.g. a GCS URI) or base64-encoded
+  image data (binary_type).
+  """
+  def __init__(
+      self,
+      features,
+      retry=None,
+      timeout=120,
+      max_batch_size=None,
+      min_batch_size=None,
+      client_options=None,
+      metadata=None):
+    """
+    Args:
+      features: (List[``vision.types.Feature.enums.Feature``]) Required.
+        The Vision API features to detect.
+      retry: (google.api_core.retry.Retry) Optional.
+        A retry object used to retry requests.
+        If None is specified (default), requests will not be retried.
+      timeout: (float) Optional.
+        The time in seconds to wait for the response from the Vision API.
+        Default is 120.
+      max_batch_size: (int) Optional.
+        Maximum number of images to batch in the same request to the
+        Vision API. Default is 5 (which is also the Vision API max).
+        This parameter is primarily intended for testing.
+      min_batch_size: (int) Optional.
+        Minimum number of images to batch in the same request to the
+        Vision API. Default is None. This parameter is primarily intended
+        for testing.
+      client_options:
+        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
+        Client options used to set user options on the client.
+        The API endpoint should be set through client_options.
+      metadata: (Optional[Sequence[Tuple[str, str]]]) Optional.
+        Additional metadata that is provided to the method.
+ """ + super(AnnotateImageWithContext, self).__init__( + features=features, + retry=retry, + timeout=timeout, + max_batch_size=max_batch_size, + min_batch_size=min_batch_size, + client_options=client_options, + metadata=metadata) + + def expand(self, pvalue): + return ( + pvalue + | FlatMap(self._create_image_annotation_pairs) + | util.BatchElements( + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size) + | ParDo( + _ImageAnnotateFn( + features=self.features, + retry=self.retry, + timeout=self.timeout, + client_options=self.client_options, + metadata=self.metadata))) + + @typehints.with_input_types( + Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]]) + @typehints.with_output_types(List[vision.types.AnnotateImageRequest]) + def _create_image_annotation_pairs(self, element, **kwargs): + element, image_context = element # Unpack (image, image_context) tuple + if isinstance(element, text_type): + image = vision.types.Image( + source=vision.types.ImageSource(image_uri=element)) + else: # Typehint checks only allows text_type or binary_type + image = vision.types.Image(content=element) + + request = vision.types.AnnotateImageRequest( + image=image, features=self.features, image_context=image_context) + yield request + + +@typehints.with_input_types(List[vision.types.AnnotateImageRequest]) +class _ImageAnnotateFn(DoFn): + """A DoFn that sends each input element to the GCP Vision API. + Returns ``google.cloud.vision.types.BatchAnnotateImagesResponse``. + """ + def __init__(self, features, retry, timeout, client_options, metadata): + super(_ImageAnnotateFn, self).__init__() + self._client = None + self.features = features + self.retry = retry + self.timeout = timeout + self.client_options = client_options + self.metadata = metadata + self.counter = Metrics.counter(self.__class__, "API Calls") + + def setup(self): + self._client = get_vision_client(self.client_options) + + def process(self, element, *args, **kwargs): + response = self._client.batch_annotate_images( + requests=element, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata) + self.counter.inc() + yield response diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py new file mode 100644 index 000000000000..d4c6c203feaa --- /dev/null +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -0,0 +1,251 @@ +# pylint: skip-file +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py
new file mode 100644
index 000000000000..d4c6c203feaa
--- /dev/null
+++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py
@@ -0,0 +1,251 @@
+# pylint: skip-file
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for visionml."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+from __future__ import unicode_literals
+
+import logging
+import unittest
+
+import mock
+
+import apache_beam as beam
+from apache_beam.metrics import MetricsFilter
+from apache_beam.typehints.decorators import TypeCheckError
+
+# Protect against environments where vision lib is not available.
+try:
+  from google.cloud.vision import ImageAnnotatorClient
+  from google.cloud import vision
+  from apache_beam.ml.gcp import visionml
+except ImportError:
+  ImageAnnotatorClient = None
+
+
+@unittest.skipIf(
+    ImageAnnotatorClient is None, 'Vision dependencies are not installed')
+class VisionTest(unittest.TestCase):
+  def setUp(self):
+    self._mock_client = mock.Mock()
+    self._mock_client.batch_annotate_images.return_value = None
+
+    feature_type = vision.enums.Feature.Type.TEXT_DETECTION
+    self.features = [
+        vision.types.Feature(
+            type=feature_type, max_results=3, model="builtin/stable")
+    ]
+    self.img_ctx = vision.types.ImageContext()
+    self.min_batch_size = 1
+    self.max_batch_size = 1
+
+  def test_AnnotateImage_URIs(self):
+    images_to_annotate = [
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg'
+    ]
+
+    expected_counter = len(images_to_annotate)
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      _ = (
+          p
+          | "Create data" >> beam.Create(images_to_annotate)
+          | "Annotate image" >> visionml.AnnotateImage(
+              self.features,
+              min_batch_size=self.min_batch_size,
+              max_batch_size=self.max_batch_size))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.result == expected_counter)
+
+  def test_AnnotateImage_URI_with_side_input_context(self):
+    images_to_annotate = [
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg'
+    ]
+    image_contexts = [
+        ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx),
+        ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx),
+    ]
+
+    expected_counter = len(images_to_annotate)
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      context_side_input = (
+          p | "Image contexts" >> beam.Create(image_contexts))
+
+      _ = (
+          p
+          | "Create data" >> beam.Create(images_to_annotate)
+          | "Annotate image" >> visionml.AnnotateImage(
+              self.features,
+              min_batch_size=self.min_batch_size,
+              max_batch_size=self.max_batch_size,
+              context_side_input=beam.pvalue.AsDict(context_side_input)))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.result == expected_counter)
+
+  def test_AnnotateImage_b64_content(self):
+    base_64_encoded_image = \
+        b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb'
+    images_to_annotate = [
+        base_64_encoded_image,
+        base_64_encoded_image,
+        base_64_encoded_image,
+    ]
+    expected_counter = len(images_to_annotate)
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      _ = (
+          p
+          | "Create data" >> beam.Create(images_to_annotate)
+          | "Annotate image" >> visionml.AnnotateImage(
+              self.features,
+              min_batch_size=self.min_batch_size,
+              max_batch_size=self.max_batch_size))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.result == expected_counter)
+
+  def test_AnnotateImageWithContext_URIs(self):
+    images_to_annotate = [
+        ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx),
+        ('gs://cloud-samples-data/vision/ocr/sign.jpg', None),
+        ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx),
+    ]
+    batch_size = 5
+    expected_counter = 1  # All images should fit in the same batch
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      _ = (
+          p
+          | "Create data" >> beam.Create(images_to_annotate)
+          | "Annotate image" >> visionml.AnnotateImageWithContext(
+              self.features,
+              min_batch_size=batch_size,
+              max_batch_size=batch_size))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.result == expected_counter)
+
+  def test_AnnotateImageWithContext_bad_input(self):
+    """AnnotateImageWithContext should not accept images without context"""
+    images_to_annotate = [
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg'
+    ]
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      with self.assertRaises(TypeCheckError):
+        p = beam.Pipeline()
+        _ = (
+            p
+            | "Create data" >> beam.Create(images_to_annotate)
+            | "Annotate image" >> visionml.AnnotateImageWithContext(
+                self.features))
+        result = p.run()
+        result.wait_until_finish()
+
+  def test_AnnotateImage_bad_input(self):
+    images_to_annotate = [123456789, 123456789, 123456789]
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      with self.assertRaises(TypeCheckError):
+        p = beam.Pipeline()
+        _ = (
+            p
+            | "Create data" >> beam.Create(images_to_annotate)
+            | "Annotate image" >> visionml.AnnotateImage(self.features))
+        result = p.run()
+        result.wait_until_finish()
+
+  def test_AnnotateImage_URIs_large_batch(self):
+    images_to_annotate = [
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+        'gs://cloud-samples-data/vision/ocr/sign.jpg',
+    ]
+
+    batch_size = 5
+    expected_counter = 3  # All 11 images should fit in 3 batches
+    with mock.patch.object(visionml,
+                           'get_vision_client',
+                           return_value=self._mock_client):
+      p = beam.Pipeline()
+      _ = (
+          p
+          | "Create data" >> beam.Create(images_to_annotate)
+          | "Annotate image" >> visionml.AnnotateImage(
+              self.features,
+              max_batch_size=batch_size,
+              min_batch_size=batch_size))
+      result = p.run()
+      result.wait_until_finish()
+
+      read_filter = MetricsFilter().with_name('API Calls')
+      query_result = result.metrics().query(read_filter)
+      if query_result['counters']:
+        read_counter = query_result['counters'][0]
+        self.assertTrue(read_counter.result == expected_counter)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1cfcb2e8e7b9..c976b6f083c9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -213,7 +213,8 @@ def get_version():
     'google-cloud-spanner>=1.13.0,<1.14.0',
     'grpcio-gcp>=0.2.2,<1',
     # GCP Packages required by ML functionality
-    'google-cloud-videointelligence>=1.8.0<1.14.0',
+    'google-cloud-videointelligence>=1.8.0,<1.14.0',
+    'google-cloud-vision>=0.38.0,<0.43.0',
 ]

 INTERACTIVE_BEAM = [
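For reference, the API-call counts asserted in the tests above follow from one `batch_annotate_images` call per batch produced by `util.BatchElements`. A quick sanity check of that arithmetic (a sketch, not part of this diff):

```python
import math

# One API call is made per batch of images:
assert math.ceil(11 / 5) == 3  # test_AnnotateImage_URIs_large_batch
assert math.ceil(3 / 5) == 1   # test_AnnotateImageWithContext_URIs
assert math.ceil(2 / 1) == 2   # test_AnnotateImage_URIs with batch size 1
```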