From cae4f82957017df31a68de36288b74db2616fa78 Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 13 Feb 2020 14:08:04 +0100 Subject: [PATCH 01/11] Initial visionapi integration --- sdks/python/apache_beam/ml/__init__.py | 0 sdks/python/apache_beam/ml/gcp/__init__.py | 0 sdks/python/apache_beam/ml/gcp/visionml.py | 108 ++++++++++++ .../apache_beam/ml/gcp/visionml_test.py | 158 ++++++++++++++++++ 4 files changed, 266 insertions(+) create mode 100644 sdks/python/apache_beam/ml/__init__.py create mode 100644 sdks/python/apache_beam/ml/gcp/__init__.py create mode 100644 sdks/python/apache_beam/ml/gcp/visionml.py create mode 100644 sdks/python/apache_beam/ml/gcp/visionml_test.py diff --git a/sdks/python/apache_beam/ml/__init__.py b/sdks/python/apache_beam/ml/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/python/apache_beam/ml/gcp/__init__.py b/sdks/python/apache_beam/ml/gcp/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py new file mode 100644 index 000000000000..f9bdf5eb3dbf --- /dev/null +++ b/sdks/python/apache_beam/ml/gcp/visionml.py @@ -0,0 +1,108 @@ +# pylint: skip-file +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A connector for sending API requests to the GCP Vision API. 
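+
+Example usage (an illustrative sketch only; assumes an existing pipeline ``p``
+and ``import apache_beam as beam``)::
+
+  from apache_beam.ml.gcp import visionml
+  from google.cloud import vision
+
+  features = [vision.types.Feature(
+      type=vision.enums.Feature.Type.TEXT_DETECTION)]
+  _ = (
+      p
+      | beam.Create(['gs://cloud-samples-data/vision/ocr/sign.jpg'])
+      | visionml.AnnotateImage(features))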
+""" + +# TODO: Add Batching support https://google-cloud-python.readthedocs.io/en/0.32.0/vision/gapic/v1/api.html#google.cloud.vision_v1.ImageAnnotatorClient.batch_annotate_images + +from __future__ import absolute_import, print_function + +from cachetools.func import ttl_cache +from future.utils import binary_type, text_type +from typing import Tuple +from typing import Union + +from apache_beam import typehints +from apache_beam.metrics import Metrics +from apache_beam.transforms import DoFn, ParDo, PTransform + +try: + from google.cloud import vision +except ImportError: + raise ImportError( + 'Google Cloud Vision not supported for this execution environment ' + '(could not import google.cloud.vision).') + +__all__ = ['AnnotateImage'] + + +@ttl_cache(maxsize=128, ttl=3600) +def get_vision_client(client_options=None): + """Returns a Cloud Vision API client.""" + _client = vision.ImageAnnotatorClient(client_options=client_options) + return _client + + +class AnnotateImage(PTransform): + """A ``PTransform`` for annotating images using the GCP Vision API + ref: https://cloud.google.com/vision/docs/ + """ + def __init__(self, features, retry=None, timeout=120, client_options=None): + super(AnnotateImage, self).__init__() + self.features = features + self.retry = retry + self.timeout = timeout + self.client_options = client_options + + def expand(self, pvalue): + return pvalue | ParDo( + self._ImageAnnotateFn( + features=self.features, + retry=self.retry, + timeout=self.timeout, + client_options=self.client_options)) + + @typehints.with_input_types( + Union[Tuple[Union[text_type, binary_type], vision.types.ImageContext], + Union[text_type, binary_type]]) + class _ImageAnnotateFn(DoFn): + """ A DoFn that sends each input element to the GCP Vision API + service and outputs an element with the return result of the API + (``google.cloud.vision_v1.types.BatchAnnotateImagesResponse``). + """ + def __init__(self, features, retry, timeout, client_options): + super(AnnotateImage._ImageAnnotateFn, self).__init__() + self._client = None + self.features = features + self.retry = retry + self.timeout = timeout + self.client_options = client_options + self.image_context = None + self.counter = Metrics.counter(self.__class__, "API Calls") + + def start_bundle(self): + self._client = get_vision_client(self.client_options) + + def process(self, element, *args, **kwargs): + if isinstance(element, Tuple): # Unpack (element, ImageContext) tuple + element, self.image_context = element + if isinstance(element, text_type): # Is element an URI + image = vision.types.Image( + source=vision.types.ImageSource(image_uri=element)) + elif isinstance(element, binary_type): # Is element raw bytes + image = vision.types.Image(content=element) + + request = vision.types.AnnotateImageRequest( + image=image, features=self.features, image_context=self.image_context) + response = self._client.annotate_image( + request=request, retry=self.retry, timeout=self.timeout) + + self.counter.inc() + yield response diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py new file mode 100644 index 000000000000..d42f58b1a099 --- /dev/null +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -0,0 +1,158 @@ +# pylint: skip-file +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for visionml.""" + +# pytype: skip-file + +from __future__ import absolute_import, unicode_literals + +import unittest + +import mock + +import apache_beam as beam +from apache_beam.metrics import MetricsFilter +from apache_beam.typehints.decorators import TypeCheckError + +# Protect against environments where vision lib is not available. +try: + from google.cloud.vision import ImageAnnotatorClient + from google.cloud import vision + from apache_beam.ml.gcp import visionml +except ImportError: + ImageAnnotatorClient = None + + +@unittest.skipIf( + ImageAnnotatorClient is None, 'Vision dependencies are not installed') +class VisionTest(unittest.TestCase): + def setUp(self): + self._mock_client = mock.Mock() + self._mock_client.annotate_image.return_value = None + feature_type = vision.enums.Feature.Type.TEXT_DETECTION + self.features = [ + vision.types.Feature( + type=feature_type, max_results=3, model="builtin/stable") + ] + + def test_AnnotateImage_URI_with_ImageContext(self): + img_ctx = vision.types.ImageContext() + images_to_annotate = [ + ('gs://cloud-samples-data/vision/ocr/sign.jpg', img_ctx) + ] + expected_counter = len(images_to_annotate) + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImage(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImage_URI(self): + images_to_annotate = ['gs://cloud-samples-data/vision/ocr/sign.jpg'] + expected_counter = len(images_to_annotate) + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImage(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImage_byte_content(self): + img_ctx = vision.types.ImageContext() + base_64_encoded_bytes = \ + ( + b'begin 644 sign.jpg _]C_X 02D9)1@ ! 
0$ 2 !( #_X2EZ fake_image_content', + img_ctx) + + images_to_annotate = [base_64_encoded_bytes] + expected_counter = len(images_to_annotate) + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImage(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImage_byte_content_with_ImageContext(self): + base_64_encoded_bytes = \ + b'begin 644 sign.jpg _]C_X 02D9)1@ ! 0$ 2 !( #_X2EZ fake_image_content' + + images_to_annotate = [base_64_encoded_bytes] + expected_counter = len(images_to_annotate) + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImage(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImage_bad_input(self): + images_to_annotate = [123456789, 123456789, 123456789] + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + with self.assertRaises(TypeCheckError): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImage(self.features)) + result = p.run() + result.wait_until_finish() From 29f3498cdf93fe5f5de838815a85822ed6395665 Mon Sep 17 00:00:00 2001 From: EDjur Date: Thu, 20 Feb 2020 13:43:38 +0100 Subject: [PATCH 02/11] AnnotateImage and AnnotateImageWithContext PTransforms --- sdks/python/apache_beam/ml/gcp/visionml.py | 209 ++++++++++++++---- .../apache_beam/ml/gcp/visionml_test.py | 120 +++++++--- 2 files changed, 259 insertions(+), 70 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py index f9bdf5eb3dbf..dbd3f0fbd6c6 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml.py +++ b/sdks/python/apache_beam/ml/gcp/visionml.py @@ -21,17 +21,23 @@ """ # TODO: Add Batching support https://google-cloud-python.readthedocs.io/en/0.32.0/vision/gapic/v1/api.html#google.cloud.vision_v1.ImageAnnotatorClient.batch_annotate_images +# TODO: Add more types https://googleapis.dev/python/vision/0.41.0/gapic/v1/types.html -from __future__ import absolute_import, print_function +from __future__ import absolute_import -from cachetools.func import ttl_cache -from future.utils import binary_type, text_type +from typing import Optional from typing import Tuple from typing import Union +from future.utils import binary_type +from future.utils import text_type + from apache_beam import typehints from apache_beam.metrics import Metrics -from apache_beam.transforms import DoFn, ParDo, PTransform +from apache_beam.transforms import DoFn +from apache_beam.transforms import ParDo +from apache_beam.transforms import PTransform +from cachetools.func import ttl_cache try: from google.cloud import vision @@ -40,7 +46,7 @@ 'Google Cloud Vision not supported for 
this execution environment '
      '(could not import google.cloud.vision).')

-__all__ = ['AnnotateImage']
+__all__ = ['AnnotateImage', 'AnnotateImageWithContext']


 @ttl_cache(maxsize=128, ttl=3600)
@@ -53,56 +59,171 @@ def get_vision_client(client_options=None):
 class AnnotateImage(PTransform):
   """A ``PTransform`` for annotating images using the GCP Vision API
   ref: https://cloud.google.com/vision/docs/
+
+  Sends each element to the GCP Vision API. Element is a
+  Union[text_type, binary_type] of either a URI (e.g. a GCS URI) or
+  base64-encoded image data (binary_type).
+  Accepts an `AsDict` side input that maps each image to an image context.
   """
-  def __init__(self, features, retry=None, timeout=120, client_options=None):
+  def __init__(
+      self,
+      features,
+      retry=None,
+      timeout=120,
+      client_options=None,
+      context_side_input=None):
+    """
+    Args:
+      features: (List[``vision.types.Feature.enums.Feature``]) Required.
+        The Vision API features to detect
+      retry: (google.api_core.retry.Retry) Optional.
+        A retry object used to retry requests.
+        If None is specified (default), requests will not be retried.
+      timeout: (float) Optional.
+        The time in seconds to wait for the response from the
+        Vision API
+      client_options: (Union[dict, google.api_core.client_options.ClientOptions])
+        Client options used to set user options on the client.
+        API Endpoint should be set through client_options.
+      context_side_input: (beam.pvalue.AsDict) Optional.
+        An ``AsDict`` of a PCollection to be passed to the
+        _ImageAnnotateFn as the image context mapping containing additional
+        image context and/or feature-specific parameters.
+        Example usage::

+          image_contexts =
+            [('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
+            ``vision.types.ImageContext()``]),
+            ('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
+            ``vision.types.ImageContext()``]),]
+
+          context_side_input =
+            (
+              p
+              | "Image contexts" >> beam.Create(image_contexts)
+            )
+
+          visionml.AnnotateImage(features,
+            context_side_input=beam.pvalue.AsDict(context_side_input))
+    """
     super(AnnotateImage, self).__init__()
     self.features = features
     self.retry = retry
     self.timeout = timeout
     self.client_options = client_options
+    self.context_side_input = context_side_input

   def expand(self, pvalue):
     return pvalue | ParDo(
-        self._ImageAnnotateFn(
+        _ImageAnnotateFn(
            features=self.features,
            retry=self.retry,
            timeout=self.timeout,
            client_options=self.client_options))

-  @typehints.with_input_types(
-      Union[Tuple[Union[text_type, binary_type], vision.types.ImageContext],
-            Union[text_type, binary_type]])
-  class _ImageAnnotateFn(DoFn):
-    """ A DoFn that sends each input element to the GCP Vision API
-    service and outputs an element with the return result of the API
-    (``google.cloud.vision_v1.types.BatchAnnotateImagesResponse``).
- """ - def __init__(self, features, retry, timeout, client_options): - super(AnnotateImage._ImageAnnotateFn, self).__init__() - self._client = None - self.features = features - self.retry = retry - self.timeout = timeout - self.client_options = client_options - self.image_context = None - self.counter = Metrics.counter(self.__class__, "API Calls") - - def start_bundle(self): - self._client = get_vision_client(self.client_options) - - def process(self, element, *args, **kwargs): - if isinstance(element, Tuple): # Unpack (element, ImageContext) tuple - element, self.image_context = element - if isinstance(element, text_type): # Is element an URI - image = vision.types.Image( - source=vision.types.ImageSource(image_uri=element)) - elif isinstance(element, binary_type): # Is element raw bytes - image = vision.types.Image(content=element) - - request = vision.types.AnnotateImageRequest( - image=image, features=self.features, image_context=self.image_context) - response = self._client.annotate_image( - request=request, retry=self.retry, timeout=self.timeout) - - self.counter.inc() - yield response + +@typehints.with_input_types( + Union[text_type, binary_type], Optional[vision.types.ImageContext]) +class _ImageAnnotateFn(DoFn): + """A DoFn that sends each input element to the GCP Vision API + service and outputs an element with the return result of the API + (``google.cloud.vision_v1.types.AnnotateImageResponse``). + """ + def __init__(self, features, retry, timeout, client_options): + super(_ImageAnnotateFn, self).__init__() + self._client = None + self.features = features + self.retry = retry + self.timeout = timeout + self.client_options = client_options + self.counter = Metrics.counter(self.__class__, "API Calls") + + def start_bundle(self): + self._client = get_vision_client(self.client_options) + + def _annotate_image(self, element, image_context): + if isinstance(element, text_type): + image = vision.types.Image( + source=vision.types.ImageSource(image_uri=element)) + else: # Typehint checks only allows text_type or binary_type + image = vision.types.Image(content=element) + + request = vision.types.AnnotateImageRequest( + image=image, features=self.features, image_context=image_context) + response = self._client.annotate_image( + request=request, retry=self.retry, timeout=self.timeout) + + return response + + def process(self, element, context_side_input=None, *args, **kwargs): + if context_side_input: # If we have a side input image context, use that + image_context = context_side_input.get(element) + else: + image_context = None + response = self._annotate_image(element, image_context) + self.counter.inc() + yield response + + +class AnnotateImageWithContext(AnnotateImage): + """A ``PTransform`` for annotating images using the GCP Vision API + ref: https://cloud.google.com/vision/docs/ + + Sends each element to the GCP Vision API. Element is a tuple of + + (Union[text_type, binary_type], + Optional[``vision.types.ImageContext``]) + + where the former is either an URI (e.g. a GCS URI) or binary_type + base64-encoded image data + """ + def __init__(self, features, retry=None, timeout=120, client_options=None): + """ + Args: + features: (List[``vision.types.Feature.enums.Feature``]) Required. + The Vision API features to detect + retry: (google.api_core.retry.Retry) Optional. + A retry object used to retry requests. + If None is specified (default), requests will not be retried. + timeout: (float) Optional. 
+ The time in seconds to wait for the response from the + Vision API + client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + Client options used to set user options on the client. + API Endpoint should be set through client_options. + """ + super(AnnotateImageWithContext, self).__init__( + features=features, + retry=retry, + timeout=timeout, + client_options=client_options) + + def expand(self, pvalue): + return pvalue | ParDo( + _ImageAnnotateFnWithContext( + features=self.features, + retry=self.retry, + timeout=self.timeout, + client_options=self.client_options)) + + +@typehints.with_input_types( + Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]]) +class _ImageAnnotateFnWithContext(_ImageAnnotateFn): + """A DoFn that unpacks each input tuple to element, image_context variables + and sends these to the GCP Vision API service and outputs an element with the + return result of the API + (``google.cloud.vision_v1.types.AnnotateImageResponse``). + """ + def __init__(self, features, retry, timeout, client_options): + super(_ImageAnnotateFnWithContext, self).__init__( + features=features, + retry=retry, + timeout=timeout, + client_options=client_options) + + def process(self, element, *args, **kwargs): + element, image_context = element # Unpack (image, image_context) tuple + response = self._annotate_image(element, image_context) + self.counter.inc() + yield response diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py index d42f58b1a099..7292689c1740 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml_test.py +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -20,8 +20,10 @@ # pytype: skip-file -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import +from __future__ import unicode_literals +import logging import unittest import mock @@ -50,21 +52,34 @@ def setUp(self): vision.types.Feature( type=feature_type, max_results=3, model="builtin/stable") ] + self.img_ctx = vision.types.ImageContext() - def test_AnnotateImage_URI_with_ImageContext(self): - img_ctx = vision.types.ImageContext() + def test_AnnotateImage_URI_with_side_input_context(self): images_to_annotate = [ - ('gs://cloud-samples-data/vision/ocr/sign.jpg', img_ctx) + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg' ] + + image_contexts = [( + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + self.img_ctx, + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + self.img_ctx, + )] + expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): p = beam.Pipeline() + context_side_input = (p | "Image contexts" >> beam.Create(image_contexts)) + _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | "Annotate image" >> visionml.AnnotateImage( + self.features, + context_side_input=beam.pvalue.AsDict(context_side_input))) result = p.run() result.wait_until_finish() @@ -74,8 +89,12 @@ def test_AnnotateImage_URI_with_ImageContext(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImage_URI(self): - images_to_annotate = ['gs://cloud-samples-data/vision/ocr/sign.jpg'] + def test_AnnotateImage_URIs(self): + images_to_annotate = [ + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg' + ] + 
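+    # The Vision client is mocked out in setUp, so no real requests are made;
+    # each element should still increment the "API Calls" metric exactly once.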
expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', @@ -94,14 +113,14 @@ def test_AnnotateImage_URI(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImage_byte_content(self): - img_ctx = vision.types.ImageContext() - base_64_encoded_bytes = \ - ( - b'begin 644 sign.jpg _]C_X 02D9)1@ ! 0$ 2 !( #_X2EZ fake_image_content', - img_ctx) - - images_to_annotate = [base_64_encoded_bytes] + def test_AnnotateImageWithContext_b64_content(self): + base_64_encoded_image = \ + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + images_to_annotate = [ + (base_64_encoded_image, self.img_ctx), + (base_64_encoded_image, None), + (base_64_encoded_image, self.img_ctx), + ] expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', @@ -110,21 +129,25 @@ def test_AnnotateImage_byte_content(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | + "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) result = p.run() result.wait_until_finish() - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) - - def test_AnnotateImage_byte_content_with_ImageContext(self): - base_64_encoded_bytes = \ - b'begin 644 sign.jpg _]C_X 02D9)1@ ! 0$ 2 !( #_X2EZ fake_image_content' + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) - images_to_annotate = [base_64_encoded_bytes] + def test_AnnotateImage_b64_content(self): + base_64_encoded_image = \ + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + images_to_annotate = [ + (base_64_encoded_image), + (base_64_encoded_image), + (base_64_encoded_image), + ] expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', @@ -143,6 +166,25 @@ def test_AnnotateImage_byte_content_with_ImageContext(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) + def test_AnnotateImageWithContext_bad_input(self): + """AnnotateImageWithContext should not accept images without context""" + images_to_annotate = [ + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg' + ] + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + with self.assertRaises(TypeCheckError): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImageWithContext( + self.features)) + result = p.run() + result.wait_until_finish() + def test_AnnotateImage_bad_input(self): images_to_annotate = [123456789, 123456789, 123456789] with mock.patch.object(visionml, @@ -156,3 +198,29 @@ def test_AnnotateImage_bad_input(self): | "Annotate image" >> visionml.AnnotateImage(self.features)) result = p.run() result.wait_until_finish() + + def test_AnnotateImageWithContext_URI_with_ImageContext(self): + images_to_annotate = [ + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx) + ] + expected_counter = 
len(images_to_annotate) + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() \ No newline at end of file From 096d02e6210dd5be6598036408f77269035e9e74 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 25 Feb 2020 09:31:15 +0100 Subject: [PATCH 03/11] AnnotateImage and AsyncBatchAnnotateImage functionality using GCP Vision API --- sdks/python/apache_beam/ml/gcp/visionml.py | 311 +++++++++++++++++- .../apache_beam/ml/gcp/visionml_test.py | 121 ++++++- sdks/python/setup.py | 1 + 3 files changed, 423 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py index dbd3f0fbd6c6..fa4c68ab9c8f 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml.py +++ b/sdks/python/apache_beam/ml/gcp/visionml.py @@ -20,9 +20,6 @@ A connector for sending API requests to the GCP Vision API. """ -# TODO: Add Batching support https://google-cloud-python.readthedocs.io/en/0.32.0/vision/gapic/v1/api.html#google.cloud.vision_v1.ImageAnnotatorClient.batch_annotate_images -# TODO: Add more types https://googleapis.dev/python/vision/0.41.0/gapic/v1/types.html - from __future__ import absolute_import from typing import Optional @@ -46,7 +43,12 @@ 'Google Cloud Vision not supported for this execution environment ' '(could not import google.cloud.vision).') -__all__ = ['AnnotateImage', 'AnnotateImageWithContext'] +__all__ = [ + 'AnnotateImage', + 'AnnotateImageWithContext', + 'AsyncBatchAnnotateImage', + 'AsyncBatchAnnotateImageWithContext' +] @ttl_cache(maxsize=128, ttl=3600) @@ -81,7 +83,7 @@ def __init__( If None is specified (default), requests will not be retried. timeout: (float) Optional. The time in seconds to wait for the response from the - Vision API + Vision API. client_options: (Union[dict, google.api_core.client_options.ClientOptions]) Client options used to set user options on the client. API Endpoint should be set through client_options. @@ -119,7 +121,8 @@ def expand(self, pvalue): features=self.features, retry=self.retry, timeout=self.timeout, - client_options=self.client_options)) + client_options=self.client_options), + context_side_input=self.context_side_input) @typehints.with_input_types( @@ -175,7 +178,7 @@ class AnnotateImageWithContext(AnnotateImage): Optional[``vision.types.ImageContext``]) where the former is either an URI (e.g. a GCS URI) or binary_type - base64-encoded image data + base64-encoded image data. """ def __init__(self, features, retry=None, timeout=120, client_options=None): """ @@ -227,3 +230,297 @@ def process(self, element, *args, **kwargs): response = self._annotate_image(element, image_context) self.counter.inc() yield response + + +class AsyncBatchAnnotateImage(PTransform): + """A ``PTransform`` for batch (offline) annotating images using the + GCP Vision API. ref: https://cloud.google.com/vision/docs/ + + Sends each batch of elements to the GCP Vision API which then stores the + results in GCS. 
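+  (Since this uses the asynchronous batch API, each flushed batch yields a
+  long-running operation; the annotation results themselves are written by
+  the service to the requested GCS destination rather than returned inline.)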
+ Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI) + or binary_type base64-encoded image data. + Accepts an `AsDict` side input that maps each image to an image context. + """ + + MAX_BATCH_SIZE = 5000 + + def __init__( + self, + features, + output_config=None, + gcs_destination=None, + retry=None, + timeout=120, + batch_size=None, + client_options=None, + context_side_input=None, + metadata=None): + """ + Args: + features: (List[``vision.types.Feature.enums.Feature``]) Required. + The Vision API features to detect + output_config: + (Union[dict, ~google.cloud.vision.types.OutputConfig]) Optional. + The desired output location and metadata (e.g. format). + If a dict is provided, it must be of the same form as the protobuf + message :class:`~google.cloud.vision.types.OutputConfig` + gcs_destination: (str) Optional. The desired output location. + Either output_config or gcs_destination needs to be set. + output_config takes precedence. + retry: (google.api_core.retry.Retry) Optional. + A retry object used to retry requests. + If None is specified (default), requests will not be retried. + timeout: (float) Optional. + The time in seconds to wait for the response from the Vision API. + Default is 120 for single-element requests and 300 for batch annotation. + batch_size: (int) Number of images to batch in the same request to the + Vision API. Default is 5000. + client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + Client options used to set user options on the client. + API Endpoint should be set through client_options. + context_side_input: (beam.pvalue.AsDict) Optional. + An ``AsDict`` of a PCollection to be passed to the + _ImageAnnotateFn as the image context mapping containing additional + image context and/or feature-specific parameters. + Example usage:: + + image_contexts = + [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict, + ``vision.types.ImageContext()``]), + (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict, + ``vision.types.ImageContext()``]),] + + context_side_input = + ( + p + | "Image contexts" >> beam.Create(image_contexts) + ) + + visionml.AsyncBatchAnnotateImage(features, output_config, + context_side_input=beam.pvalue.AsDict(context_side_input))) + metadata: (Optional[Sequence[Tuple[str, str]]]): Optional. + Additional metadata that is provided to the method. + + """ + super(AsyncBatchAnnotateImage, self).__init__() + self.features = features + self.output_config = output_config + if output_config is None and gcs_destination is None: + raise ValueError('output_config or gcs_destination must be specified') + if output_config is None: + self.output_config = self._generate_output_config(gcs_destination) + + self.retry = retry + self.timeout = timeout + self.batch_size = batch_size or AsyncBatchAnnotateImage.MAX_BATCH_SIZE + if self.batch_size > AsyncBatchAnnotateImage.MAX_BATCH_SIZE: + raise ValueError( + 'Max batch_size exceeded. 
' + 'Batch size needs to be smaller than {}'.format( + AsyncBatchAnnotateImage.MAX_BATCH_SIZE)) + self.client_options = client_options + self.context_side_input = context_side_input + self.metadata = metadata + + @staticmethod + def _generate_output_config(output_uri): + gcs_destination = {"uri": output_uri} + output_config = {"gcs_destination": gcs_destination} + return output_config + + def expand(self, pvalue): + return pvalue | ParDo( + _AsyncBatchImageAnnotateFn( + features=self.features, + output_config=self.output_config, + retry=self.retry, + timeout=self.timeout, + batch_size=self.batch_size, + client_options=self.client_options, + metadata=self.metadata), + context_side_input=self.context_side_input) + + +class AsyncBatchAnnotateImageWithContext(AsyncBatchAnnotateImage): + """A ``PTransform`` for batch (offline) annotating images using the + GCP Vision API. ref: https://cloud.google.com/vision/docs/batch + + Sends each batch of elements to the GCP Vision API which then stores the + results in GCS. + Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI) + or binary_type base64-encoded image data. + Accepts an `AsDict` side input that maps each image to an image context. + """ + def __init__( + self, + features, + output_config=None, + gcs_destination=None, + retry=None, + timeout=120, + batch_size=None, + client_options=None, + metadata=None): + """ + Args: + features: (List[``vision.types.Feature.enums.Feature``]) Required. + The Vision API features to detect + output_config: + (Union[dict, ~google.cloud.vision.types.OutputConfig]) Optional. + The desired output location and metadata (e.g. format). + If a dict is provided, it must be of the same form as the protobuf + message :class:`~google.cloud.vision.types.OutputConfig` + gcs_destination: (str) Optional. The desired output location. + Either output_config or gcs_destination needs to be set. + output_config takes precedence. + retry: (google.api_core.retry.Retry) Optional. + A retry object used to retry requests. + If None is specified (default), requests will not be retried. + timeout: (float) Optional. + The time in seconds to wait for the response from the Vision API. + Default is 120 for single-element requests and 300 for batch annotation. + batch_size: (int) Number of images to batch in the same request to the + Vision API. Default is 5000. + client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + Client options used to set user options on the client. + API Endpoint should be set through client_options. + metadata: (Optional[Sequence[Tuple[str, str]]]): Optional. + Additional metadata that is provided to the method. + """ + super(AsyncBatchAnnotateImageWithContext, self).__init__( + features=features, + output_config=output_config, + gcs_destination=gcs_destination, + retry=retry, + timeout=timeout, + batch_size=batch_size, + client_options=client_options, + metadata=metadata) + + def expand(self, pvalue): + return pvalue | ParDo( + _AsyncBatchImageAnnotateFnWithContext( + features=self.features, + output_config=self.output_config, + retry=self.retry, + timeout=self.timeout, + batch_size=self.batch_size, + client_options=self.client_options, + metadata=self.metadata)) + + +@typehints.with_input_types( + Union[text_type, binary_type], Optional[vision.types.ImageContext]) +class _AsyncBatchImageAnnotateFn(DoFn): + """A DoFn that sends each input element to the GCP Vision API + service in batches and stores the results in GCS. 
Returns a
+  ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse`` containing
+  the output location and metadata from AsyncBatchAnnotateImagesRequest.
+  """
+  def __init__(
+      self,
+      features,
+      output_config,
+      retry,
+      timeout,
+      client_options,
+      batch_size,
+      metadata):
+
+    super(_AsyncBatchImageAnnotateFn, self).__init__()
+    self._client = None
+    self.features = features
+    self.output_config = output_config
+    self.retry = retry
+    self.timeout = timeout
+    self.client_options = client_options
+    self.batch_size = batch_size
+    self._batch_elements = None
+    self.metadata = metadata
+    self.counter = Metrics.counter(self.__class__, "API Calls")
+
+  def start_bundle(self):
+    self._client = get_vision_client(self.client_options)
+    self._batch_elements = []
+
+  def finish_bundle(self):
+    if self._batch_elements:
+      response = self._flush_batch()
+      self.counter.inc()
+      return response
+
+  def create_annotate_image_request(self, element, image_context):
+    if isinstance(element, text_type):
+      image = vision.types.Image(
+          source=vision.types.ImageSource(image_uri=element))
+    else:  # Typehint checks only allow text_type or binary_type
+      image = vision.types.Image(content=element)
+
+    request = vision.types.AnnotateImageRequest(
+        image=image, features=self.features, image_context=image_context)
+    return request
+
+  def process(self, element, context_side_input=None, *args, **kwargs):
+    if context_side_input:  # If we have a side input image context, use that
+      image_context = context_side_input.get(element)
+    else:
+      image_context = None
+
+    # Evaluate batches
+    request = self.create_annotate_image_request(element, image_context)
+    self._batch_elements.append(request)
+    if len(self._batch_elements) >= self.batch_size or \
+        len(self._batch_elements) >= AsyncBatchAnnotateImage.MAX_BATCH_SIZE:
+      response = self._flush_batch()
+      self.counter.inc()
+      yield response
+
+  def _flush_batch(self):
+    operation = self._client.async_batch_annotate_images(
+        requests=self._batch_elements,
+        output_config=self.output_config,
+        retry=self.retry,
+        timeout=self.timeout,
+        metadata=self.metadata)
+    self._batch_elements = []
+    return operation
+
+
+@typehints.with_input_types(
+    Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]])
+class _AsyncBatchImageAnnotateFnWithContext(_AsyncBatchImageAnnotateFn):
+  """A DoFn that sends each input element and its context to the GCP Vision API
+  service in batches. Returns a
+  ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse`` containing
+  the output location and metadata from AsyncBatchAnnotateImagesRequest.
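+  Unlike ``_AsyncBatchImageAnnotateFn``, the image context arrives as part of
+  each (image, image_context) element rather than through a side input.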
+ """ + def __init__( + self, + features, + output_config, + retry, + timeout, + batch_size, + client_options, + metadata): + super(_AsyncBatchImageAnnotateFnWithContext, self).__init__( + features=features, + output_config=output_config, + retry=retry, + timeout=timeout, + client_options=client_options, + batch_size=batch_size, + metadata=metadata) + + def process(self, element, context_side_input=None, *args, **kwargs): + element, image_context = element # Unpack (image, image_context) tuple + + # Evaluate batches + request = self.create_annotate_image_request(element, image_context) + self._batch_elements.append(request) + if len(self._batch_elements) >= self.batch_size or \ + len(self._batch_elements) >= self.MAX_BATCH_SIZE: + response = self._flush_batch() + self.counter.inc() + yield response diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py index 7292689c1740..9ad626919e0b 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml_test.py +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -46,13 +46,19 @@ class VisionTest(unittest.TestCase): def setUp(self): self._mock_client = mock.Mock() - self._mock_client.annotate_image.return_value = None + self.m2 = mock.Mock() + self.m2.result.return_value = None + self._mock_client.annotate_image.return_value = self.m2 + self._mock_client.batch_annotate_images.return_value = self.m2 feature_type = vision.enums.Feature.Type.TEXT_DETECTION self.features = [ vision.types.Feature( type=feature_type, max_results=3, model="builtin/stable") ] self.img_ctx = vision.types.ImageContext() + self.gcs_destination = "gs://example-bucket/prefix/" + # TODO: Add output config test + # self.output_config = vision.types.OutputConfig(gcs_destination=gcs_dest) def test_AnnotateImage_URI_with_side_input_context(self): images_to_annotate = [ @@ -211,7 +217,40 @@ def test_AnnotateImageWithContext_URI_with_ImageContext(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + | + "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImage_batch_request(self): + images_to_annotate = [ + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + ] + + batch_size = 5 + expected_counter = 2 # All 6 images should fit in two batches + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AsyncBatchAnnotateImage( + self.features, + gcs_destination=self.gcs_destination, + batch_size=batch_size)) result = p.run() result.wait_until_finish() @@ -219,8 +258,84 @@ def test_AnnotateImageWithContext_URI_with_ImageContext(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] + print(read_counter.committed) self.assertTrue(read_counter.committed == 
expected_counter) + def test_AnnotateImageWithContext_batch_request(self): + images_to_annotate = [ + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ] + batch_size = 3 + expected_counter = 3 # All 8 images should fit in 3 batches + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImageWithContext( + self.features, batch_size=batch_size)) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + print(read_counter.committed) + self.assertTrue(read_counter.committed == expected_counter) + + def test_AnnotateImageWithContext_batch_bad_input(self): + """AnnotateImageWithContext should not accept images without context""" + images_to_annotate = [ + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg' + ] + batch_size = 5 + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + with self.assertRaises(TypeCheckError): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImageWithContext( + self.features, batch_size=batch_size)) + result = p.run() + result.wait_until_finish() + + def test_AnnotateImage_batch_bad_batch_size(self): + """AnnotateImageWithContext should raise ValueError when + batch size exceeds the limit + """ + images_to_annotate = [ + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg' + ] + batch_size = 50 + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + with self.assertRaises(ValueError): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.AnnotateImageWithContext( + self.features, batch_size=batch_size)) + result = p.run() + result.wait_until_finish() + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) - unittest.main() \ No newline at end of file + unittest.main() diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 6c2a06369a92..239f9fbd1050 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -214,6 +214,7 @@ def get_version(): 'grpcio-gcp>=0.2.2,<1', # GCP Packages required by ML functionality 'google-cloud-videointelligence>=1.8.0<=1.12.1', + 'google-cloud-vision==0.42.0' ] INTERACTIVE_BEAM = [ From 3691df25420e719b163effcab36268ff9a761e7e Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 25 Feb 2020 09:39:16 +0100 Subject: [PATCH 04/11] Remove typoed videointelligence dependency --- sdks/python/setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 126be673458c..eefd0cd4518b 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -213,7 +213,6 @@ def 
get_version():
     'google-cloud-spanner>=1.13.0,<1.14.0',
     'grpcio-gcp>=0.2.2,<1',
     # GCP Packages required by ML functionality
-    'google-cloud-videointelligence>=1.8.0<=1.12.1',
     'google-cloud-videointelligence>=1.8.0<1.14.0',
     'google-cloud-vision<=0.43.0'
 ]

From 803576694de207d53e6e1003af35688888a77665 Mon Sep 17 00:00:00 2001
From: EDjur
Date: Tue, 25 Feb 2020 15:57:26 +0100
Subject: [PATCH 05/11] Using util.BatchElements. Changed async batching to
 sync (online)

---
 sdks/python/apache_beam/ml/gcp/visionml.py    | 287 +++++++-----------
 .../apache_beam/ml/gcp/visionml_test.py       | 179 ++++++-----
 2 files changed, 207 insertions(+), 259 deletions(-)

diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py
index fa4c68ab9c8f..70747133a13d 100644
--- a/sdks/python/apache_beam/ml/gcp/visionml.py
+++ b/sdks/python/apache_beam/ml/gcp/visionml.py
@@ -22,6 +22,7 @@

 from __future__ import absolute_import

+from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import Union
@@ -32,8 +33,10 @@
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
 from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
+from apache_beam.transforms import util
 from cachetools.func import ttl_cache

 try:
@@ -46,8 +49,8 @@
 __all__ = [
     'AnnotateImage',
     'AnnotateImageWithContext',
-    'AsyncBatchAnnotateImage',
-    'AsyncBatchAnnotateImageWithContext'
+    'BatchAnnotateImage',
+    'BatchAnnotateImageWithContext'
 ]

@@ -141,7 +144,7 @@ def __init__(self, features, retry, timeout, client_options):
     self.counter = Metrics.counter(self.__class__, "API Calls")

-  def start_bundle(self):
+  def setup(self):
     self._client = get_vision_client(self.client_options)

   def _annotate_image(self, element, image_context):
@@ -232,27 +235,25 @@ def process(self, element, *args, **kwargs):
     yield response


-class AsyncBatchAnnotateImage(PTransform):
-  """A ``PTransform`` for batch (offline) annotating images using the
+class BatchAnnotateImage(PTransform):
+  """A ``PTransform`` for batch (online) annotating images using the
   GCP Vision API. ref: https://cloud.google.com/vision/docs/

-  Sends each batch of elements to the GCP Vision API which then stores the
-  results in GCS.
-  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
-  or binary_type base64-encoded image data.
+  Sends each batch of elements to the GCP Vision API.
+  Element is a Union[text_type, binary_type] of either a URI (e.g. a GCS URI)
+  or base64-encoded image data (binary_type).
   Accepts an `AsDict` side input that maps each image to an image context.
   """

-  MAX_BATCH_SIZE = 5000
+  MAX_BATCH_SIZE = 5

   def __init__(
       self,
       features,
-      output_config=None,
-      gcs_destination=None,
       retry=None,
       timeout=120,
-      batch_size=None,
+      max_batch_size=None,
+      min_batch_size=1,
       client_options=None,
       context_side_input=None,
       metadata=None):
     """
     Args:
       features: (List[``vision.types.Feature.enums.Feature``]) Required.
         The Vision API features to detect
       retry: (google.api_core.retry.Retry) Optional.
         A retry object used to retry requests.
         If None is specified (default), requests will not be retried.
timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Maximum number of images to batch in the same
        request to the Vision API.
        Default is 5 (which is also the Vision API max).
      min_batch_size: (int) Minimum number of images to batch in the same
        request to the Vision API. Default is 1.
      client_options: (Union[dict, google.api_core.client_options.ClientOptions])
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.
        Example usage::

          image_contexts =
            [('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
            ``vision.types.ImageContext()``]),
            ('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
            ``vision.types.ImageContext()``]),]

          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )

          visionml.BatchAnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(BatchAnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or BatchAnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > BatchAnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than {}'.format(
              BatchAnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata

  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _BatchImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))

  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None

    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allow text_type or binary_type
      image = vision.types.Image(content=element)

    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features,
image_context=image_context)
+    yield request
+
+
+class BatchAnnotateImageWithContext(BatchAnnotateImage):
+  """A ``PTransform`` for batch (online) annotating images using the
+  GCP Vision API. ref: https://cloud.google.com/vision/docs/batch
+
+  Sends each element to the GCP Vision API. Element is a tuple of
+
+     (Union[text_type, binary_type],
+      Optional[``vision.types.ImageContext``])
+
+  where the former is either a URI (e.g. a GCS URI) or
+  base64-encoded image data (binary_type).
+  """
+  def __init__(
+      self,
+      features,
+      retry=None,
+      timeout=120,
+      max_batch_size=None,
+      min_batch_size=1,
+      client_options=None,
+      metadata=None):
+    """
+    Args:
+      features: (List[``vision.types.Feature.enums.Feature``]) Required.
+        The Vision API features to detect
+      retry: (google.api_core.retry.Retry) Optional.
+        A retry object used to retry requests.
+        If None is specified (default), requests will not be retried.
+      timeout: (float) Optional.
+        The time in seconds to wait for the response from the Vision API.
+        Default is 120.
+      max_batch_size: (int) Maximum number of images to batch in the same
+        request to the Vision API.
+        Default is 5 (which is also the Vision API max).
+      min_batch_size: (int) Minimum number of images to batch in the same
+        request to the Vision API. Default is 1.
+      client_options: (Union[dict, google.api_core.client_options.ClientOptions])
+        Client options used to set user options on the client.
+        API Endpoint should be set through client_options.
+      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
+        Additional metadata that is provided to the method.
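+
+    Example usage (an illustrative sketch; the tuple layout mirrors the one
+    exercised in visionml_test.py)::
+
+      image_pairs = [('gs://cloud-samples-data/vision/ocr/sign.jpg',
+                      vision.types.ImageContext())]
+
+      _ = (
+          p
+          | beam.Create(image_pairs)
+          | visionml.BatchAnnotateImageWithContext(features))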
""" - super(AsyncBatchAnnotateImageWithContext, self).__init__( + super(BatchAnnotateImageWithContext, self).__init__( features=features, - output_config=output_config, - gcs_destination=gcs_destination, retry=retry, timeout=timeout, - batch_size=batch_size, + max_batch_size=max_batch_size, + min_batch_size=min_batch_size, client_options=client_options, metadata=metadata) def expand(self, pvalue): - return pvalue | ParDo( - _AsyncBatchImageAnnotateFnWithContext( - features=self.features, - output_config=self.output_config, - retry=self.retry, - timeout=self.timeout, - batch_size=self.batch_size, - client_options=self.client_options, - metadata=self.metadata)) + return ( + pvalue + | FlatMap(self._create_image_annotation_pairs) + | util.BatchElements( + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size) + | ParDo( + _BatchImageAnnotateFn( + features=self.features, + retry=self.retry, + timeout=self.timeout, + client_options=self.client_options, + metadata=self.metadata))) + + @typehints.with_input_types( + Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]]) + @typehints.with_output_types(List[vision.types.AnnotateImageRequest]) + def _create_image_annotation_pairs(self, element, **kwargs): + element, image_context = element # Unpack (image, image_context) tuple + if isinstance(element, text_type): + image = vision.types.Image( + source=vision.types.ImageSource(image_uri=element)) + else: # Typehint checks only allows text_type or binary_type + image = vision.types.Image(content=element) + + request = vision.types.AnnotateImageRequest( + image=image, features=self.features, image_context=image_context) + yield request -@typehints.with_input_types( - Union[text_type, binary_type], Optional[vision.types.ImageContext]) -class _AsyncBatchImageAnnotateFn(DoFn): +@typehints.with_input_types(List[vision.types.AnnotateImageRequest]) +class _BatchImageAnnotateFn(DoFn): """A DoFn that sends each input element to the GCP Vision API - service in batches and stores the results in GCS. Returns a - ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse`` containing the - the output location and metadata from AsyncBatchAnnotateImagesRequest. + service in batches. + Returns a ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse``. 
""" - def __init__( - self, - features, - output_config, - retry, - timeout, - client_options, - batch_size, - metadata): - - super(_AsyncBatchImageAnnotateFn, self).__init__() + def __init__(self, features, retry, timeout, client_options, metadata): + super(_BatchImageAnnotateFn, self).__init__() self._client = None self.features = features - self.output_config = output_config self.retry = retry self.timeout = timeout self.client_options = client_options - self.batch_size = batch_size - self._batch_elements = None self.metadata = metadata self.counter = Metrics.counter(self.__class__, "API Calls") def start_bundle(self): self._client = get_vision_client(self.client_options) - self._batch_elements = [] - - def finish_bundle(self): - if self._batch_elements: - response = self._flush_batch() - self.counter.inc() - return response - - def create_annotate_image_request(self, element, image_context): - if isinstance(element, text_type): - image = vision.types.Image( - source=vision.types.ImageSource(image_uri=element)) - else: # Typehint checks only allows text_type or binary_type - image = vision.types.Image(content=element) - - request = vision.types.AnnotateImageRequest( - image=image, features=self.features, image_context=image_context) - return request - - def process(self, element, context_side_input=None, *args, **kwargs): - if context_side_input: # If we have a side input image context, use that - image_context = context_side_input.get(element) - else: - image_context = None - # Evaluate batches - request = self.create_annotate_image_request(element, image_context) - self._batch_elements.append(request) - if len(self._batch_elements) >= self.batch_size or \ - len(self._batch_elements) >= AsyncBatchAnnotateImage.MAX_BATCH_SIZE: - response = self._flush_batch() - self.counter.inc() - yield response - - def _flush_batch(self): - operation = self._client.async_batch_annotate_images( - requests=self._batch_elements, - output_config=self.output_config, + def process(self, element, *args, **kwargs): + response = self._client.batch_annotate_images( + requests=element, retry=self.retry, timeout=self.timeout, metadata=self.metadata) - self._batch_elements = [] - return operation - - -@typehints.with_input_types( - Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]]) -class _AsyncBatchImageAnnotateFnWithContext(_AsyncBatchImageAnnotateFn): - """A DoFn that sends each input element and its context to the GCP Vision API - service in batches. Returns a - ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse`` containing the - the output location and metadata from AsyncBatchAnnotateImagesRequest. 
- """ - def __init__( - self, - features, - output_config, - retry, - timeout, - batch_size, - client_options, - metadata): - super(_AsyncBatchImageAnnotateFnWithContext, self).__init__( - features=features, - output_config=output_config, - retry=retry, - timeout=timeout, - client_options=client_options, - batch_size=batch_size, - metadata=metadata) - - def process(self, element, context_side_input=None, *args, **kwargs): - element, image_context = element # Unpack (image, image_context) tuple - - # Evaluate batches - request = self.create_annotate_image_request(element, image_context) - self._batch_elements.append(request) - if len(self._batch_elements) >= self.batch_size or \ - len(self._batch_elements) >= self.MAX_BATCH_SIZE: - response = self._flush_batch() - self.counter.inc() - yield response + self.counter.inc() + yield response diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py index 9ad626919e0b..e36e5c23787e 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml_test.py +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -47,45 +47,32 @@ class VisionTest(unittest.TestCase): def setUp(self): self._mock_client = mock.Mock() self.m2 = mock.Mock() - self.m2.result.return_value = None - self._mock_client.annotate_image.return_value = self.m2 - self._mock_client.batch_annotate_images.return_value = self.m2 + self._mock_client.annotate_image.return_value = None + self._mock_client.batch_annotate_images.return_value = None + feature_type = vision.enums.Feature.Type.TEXT_DETECTION self.features = [ vision.types.Feature( type=feature_type, max_results=3, model="builtin/stable") ] self.img_ctx = vision.types.ImageContext() - self.gcs_destination = "gs://example-bucket/prefix/" - # TODO: Add output config test - # self.output_config = vision.types.OutputConfig(gcs_destination=gcs_dest) + self.gcs_destination = "gs://mock-example-bucket/prefix/" - def test_AnnotateImage_URI_with_side_input_context(self): + def test_AnnotateImage_URIs(self): images_to_annotate = [ 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg' ] - image_contexts = [( - 'gs://cloud-samples-data/vision/ocr/sign.jpg', - self.img_ctx, - 'gs://cloud-samples-data/vision/ocr/sign.jpg', - self.img_ctx, - )] - expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): p = beam.Pipeline() - context_side_input = (p | "Image contexts" >> beam.Create(image_contexts)) - _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage( - self.features, - context_side_input=beam.pvalue.AsDict(context_side_input))) + | "Annotate image" >> visionml.AnnotateImage(self.features)) result = p.run() result.wait_until_finish() @@ -95,21 +82,29 @@ def test_AnnotateImage_URI_with_side_input_context(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImage_URIs(self): + def test_AnnotateImage_URI_with_side_input_context(self): images_to_annotate = [ 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg' ] + image_contexts = [ + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ] expected_counter = len(images_to_annotate) with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): p = beam.Pipeline() + 
context_side_input = (p | "Image contexts" >> beam.Create(image_contexts)) + _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | "Annotate image" >> visionml.AnnotateImage( + self.features, + context_side_input=beam.pvalue.AsDict(context_side_input))) result = p.run() result.wait_until_finish() @@ -119,13 +114,13 @@ def test_AnnotateImage_URIs(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImageWithContext_b64_content(self): + def test_AnnotateImage_b64_content(self): base_64_encoded_image = \ b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' images_to_annotate = [ - (base_64_encoded_image, self.img_ctx), - (base_64_encoded_image, None), - (base_64_encoded_image, self.img_ctx), + base_64_encoded_image, + base_64_encoded_image, + base_64_encoded_image, ] expected_counter = len(images_to_annotate) with mock.patch.object(visionml, @@ -135,24 +130,21 @@ def test_AnnotateImageWithContext_b64_content(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | - "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + | "Annotate image" >> visionml.AnnotateImage(self.features)) result = p.run() result.wait_until_finish() - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImage_b64_content(self): - base_64_encoded_image = \ - b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + def test_AnnotateImageWithContext_URIs(self): images_to_annotate = [ - (base_64_encoded_image), - (base_64_encoded_image), - (base_64_encoded_image), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', None), + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), ] expected_counter = len(images_to_annotate) with mock.patch.object(visionml, @@ -162,7 +154,8 @@ def test_AnnotateImage_b64_content(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | + "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) result = p.run() result.wait_until_finish() @@ -205,11 +198,18 @@ def test_AnnotateImage_bad_input(self): result = p.run() result.wait_until_finish() - def test_AnnotateImageWithContext_URI_with_ImageContext(self): + def test_BatchAnnotateImage_URIs(self): images_to_annotate = [ - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx) + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', + 'gs://cloud-samples-data/vision/ocr/sign.jpg', ] - expected_counter = len(images_to_annotate) + + batch_size = 5 + expected_counter = 2 # All 6 images should fit in two batches with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): @@ -217,8 +217,9 @@ def 
test_AnnotateImageWithContext_URI_with_ImageContext(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | - "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + | "Annotate image" >> visionml.BatchAnnotateImage( + self.features, max_batch_size=batch_size, min_batch_size=batch_size) + ) result = p.run() result.wait_until_finish() @@ -228,29 +229,66 @@ def test_AnnotateImageWithContext_URI_with_ImageContext(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImage_batch_request(self): + def test_BatchAnnotateImage_URIs_with_side_input_context(self): images_to_annotate = [ 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', - 'gs://cloud-samples-data/vision/ocr/sign.jpg', + ] + image_contexts = [ + ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), ] batch_size = 5 - expected_counter = 2 # All 6 images should fit in two batches + expected_counter = 1 # All 5 images should fit in one batch with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): p = beam.Pipeline() + context_side_input = (p | "Image contexts" >> beam.Create(image_contexts)) _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AsyncBatchAnnotateImage( + | "Annotate image" >> visionml.BatchAnnotateImage( self.features, - gcs_destination=self.gcs_destination, - batch_size=batch_size)) + max_batch_size=batch_size, + min_batch_size=batch_size, + context_side_input=beam.pvalue.AsDict(context_side_input))) + result = p.run() + result.wait_until_finish() + + read_filter = MetricsFilter().with_name('API Calls') + query_result = result.metrics().query(read_filter) + if query_result['counters']: + read_counter = query_result['counters'][0] + self.assertTrue(read_counter.committed == expected_counter) + + def test_BatchAnnotateImage_b64_content(self): + base_64_encoded_image = \ + b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' + images_to_annotate = [ + base_64_encoded_image, + base_64_encoded_image, + base_64_encoded_image, + base_64_encoded_image, + base_64_encoded_image, + base_64_encoded_image, + ] + + batch_size = 1 + expected_counter = 6 # All 6 images should fit in 6 batches + with mock.patch.object(visionml, + 'get_vision_client', + return_value=self._mock_client): + p = beam.Pipeline() + _ = ( + p + | "Create data" >> beam.Create(images_to_annotate) + | "Annotate image" >> visionml.BatchAnnotateImage( + self.features, max_batch_size=batch_size, min_batch_size=batch_size) + ) result = p.run() result.wait_until_finish() @@ -258,10 +296,9 @@ def test_AnnotateImage_batch_request(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - print(read_counter.committed) self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImageWithContext_batch_request(self): + def test_BatchAnnotateImageWithContext(self): images_to_annotate = [ ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), @@ -281,8 +318,10 @@ def test_AnnotateImageWithContext_batch_request(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImageWithContext( - 
self.features, batch_size=batch_size)) + | "Annotate image" >> visionml.BatchAnnotateImageWithContext( + self.features, + max_batch_size=batch_size, + min_batch_size=batch_size)) result = p.run() result.wait_until_finish() @@ -290,16 +329,14 @@ def test_AnnotateImageWithContext_batch_request(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - print(read_counter.committed) self.assertTrue(read_counter.committed == expected_counter) - def test_AnnotateImageWithContext_batch_bad_input(self): - """AnnotateImageWithContext should not accept images without context""" + def test_BatchAnnotateImageWithContext_bad_input(self): + """BatchAnnotateImageWithContext should not accept images without context""" images_to_annotate = [ 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg' ] - batch_size = 5 with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): @@ -308,30 +345,8 @@ def test_AnnotateImageWithContext_batch_bad_input(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImageWithContext( - self.features, batch_size=batch_size)) - result = p.run() - result.wait_until_finish() - - def test_AnnotateImage_batch_bad_batch_size(self): - """AnnotateImageWithContext should raise ValueError when - batch size exceeds the limit - """ - images_to_annotate = [ - 'gs://cloud-samples-data/vision/ocr/sign.jpg', - 'gs://cloud-samples-data/vision/ocr/sign.jpg' - ] - batch_size = 50 - with mock.patch.object(visionml, - 'get_vision_client', - return_value=self._mock_client): - with self.assertRaises(ValueError): - p = beam.Pipeline() - _ = ( - p - | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImageWithContext( - self.features, batch_size=batch_size)) + | "Annotate image" >> visionml.BatchAnnotateImageWithContext( + self.features)) result = p.run() result.wait_until_finish() From 44f53605090fce5632e3ed75a654773f08ab8820 Mon Sep 17 00:00:00 2001 From: EDjur Date: Tue, 25 Feb 2020 16:37:37 +0100 Subject: [PATCH 06/11] Minor fixes --- sdks/python/apache_beam/ml/gcp/visionml.py | 4 +++- sdks/python/apache_beam/ml/gcp/visionml_test.py | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py index 70747133a13d..4cb1e8e5b631 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml.py +++ b/sdks/python/apache_beam/ml/gcp/visionml.py @@ -88,6 +88,7 @@ def __init__( The time in seconds to wait for the response from the Vision API. client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + Optional. Client options used to set user options on the client. API Endpoint should be set through client_options. context_side_input: (beam.pvalue.AsDict) Optional. @@ -195,6 +196,7 @@ def __init__(self, features, retry=None, timeout=120, client_options=None): The time in seconds to wait for the response from the Vision API client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + Optional. Client options used to set user options on the client. API Endpoint should be set through client_options. """ @@ -434,7 +436,7 @@ def _create_image_annotation_pairs(self, element, **kwargs): class _BatchImageAnnotateFn(DoFn): """A DoFn that sends each input element to the GCP Vision API service in batches. 
-  Returns a ``google.cloud.vision.types.AsyncBatchAnnotateImagesResponse``.
+  Returns ``google.cloud.vision.types.BatchAnnotateImagesResponse``.
   """
   def __init__(self, features, retry, timeout, client_options, metadata):
     super(_BatchImageAnnotateFn, self).__init__()
diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py
index e36e5c23787e..f3273b5debbf 100644
--- a/sdks/python/apache_beam/ml/gcp/visionml_test.py
+++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py
@@ -56,7 +56,6 @@ def setUp(self):
         type=feature_type, max_results=3, model="builtin/stable")
     ]
     self.img_ctx = vision.types.ImageContext()
-    self.gcs_destination = "gs://mock-example-bucket/prefix/"

   def test_AnnotateImage_URIs(self):
     images_to_annotate = [

From 12f01f34f3b862176f3df25769e4e55b8bdc3940 Mon Sep 17 00:00:00 2001
From: EDjur
Date: Tue, 25 Feb 2020 16:46:14 +0100
Subject: [PATCH 07/11] Added changelog

---
 CHANGES.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index b95b0da008d1..dae1eca60db1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -34,7 +34,8 @@
 ### New Features / Improvements

 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New AnnotateVideo & AnnotateVideoWithContext PTransform's that integrates GCP Video Intelligence functionality ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
+* New AnnotateVideo & AnnotateVideoWithContext PTransforms that integrate GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
+* Added four new PTransforms for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
 * Added a PTransform for inspection and deidentification of text using Google Cloud DLP. (Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258))

 ### Breaking Changes

From b91b9b8781097b1024788322ac75a33029d1d150 Mon Sep 17 00:00:00 2001
From: EDjur
Date: Wed, 26 Feb 2020 10:14:24 +0100
Subject: [PATCH 08/11] Merged single-element and batch image annotation into
 the same PTransforms

---
 CHANGES.md | 2 +-
 sdks/python/apache_beam/ml/gcp/visionml.py | 255 +++---------------
 .../apache_beam/ml/gcp/visionml_test.py | 145 ++--------
 3 files changed, 65 insertions(+), 337 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index dae1eca60db1..00ce7b1b0c25 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -35,7 +35,7 @@
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * New AnnotateVideo & AnnotateVideoWithContext PTransforms that integrate GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
-* Added four new PTransforms for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
+* New AnnotateImage & AnnotateImageWithContext PTransforms for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
 * Added a PTransform for inspection and deidentification of text using Google Cloud DLP.
(Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258)) ### Breaking Changes diff --git a/sdks/python/apache_beam/ml/gcp/visionml.py b/sdks/python/apache_beam/ml/gcp/visionml.py index 4cb1e8e5b631..13884a7bf0f0 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml.py +++ b/sdks/python/apache_beam/ml/gcp/visionml.py @@ -49,8 +49,6 @@ __all__ = [ 'AnnotateImage', 'AnnotateImageWithContext', - 'BatchAnnotateImage', - 'BatchAnnotateImageWithContext' ] @@ -62,192 +60,18 @@ def get_vision_client(client_options=None): class AnnotateImage(PTransform): - """A ``PTransform`` for annotating images using the GCP Vision API + """A ``PTransform`` for annotating images using the GCP Vision API. ref: https://cloud.google.com/vision/docs/ - Sends each element to the GCP Vision API. Element is a - Union[text_type, binary_type] of either an URI (e.g. a GCS URI) or binary_type - base64-encoded image data. - Accepts an `AsDict` side input that maps each image to an image context. - """ - def __init__( - self, - features, - retry=None, - timeout=120, - client_options=None, - context_side_input=None): - """ - Args: - features: (List[``vision.types.Feature.enums.Feature``]) Required. - The Vision API features to detect - retry: (google.api_core.retry.Retry) Optional. - A retry object used to retry requests. - If None is specified (default), requests will not be retried. - timeout: (float) Optional. - The time in seconds to wait for the response from the - Vision API. - client_options: (Union[dict, google.api_core.client_options.ClientOptions]) - Optional. - Client options used to set user options on the client. - API Endpoint should be set through client_options. - context_side_input: (beam.pvalue.AsDict) Optional. - An ``AsDict`` of a PCollection to be passed to the - _ImageAnnotateFn as the image context mapping containing additional - image context and/or feature-specific parameters. - Example usage:: - - image_contexts = - [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict, - ``vision.types.ImageContext()``]), - (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict, - ``vision.types.ImageContext()``]),] - - context_side_input = - ( - p - | "Image contexts" >> beam.Create(image_contexts) - ) - - visionml.AnnotateImage(features, - context_side_input=beam.pvalue.AsDict(context_side_input))) - """ - super(AnnotateImage, self).__init__() - self.features = features - self.retry = retry - self.timeout = timeout - self.client_options = client_options - self.context_side_input = context_side_input - - def expand(self, pvalue): - return pvalue | ParDo( - _ImageAnnotateFn( - features=self.features, - retry=self.retry, - timeout=self.timeout, - client_options=self.client_options), - context_side_input=self.context_side_input) - - -@typehints.with_input_types( - Union[text_type, binary_type], Optional[vision.types.ImageContext]) -class _ImageAnnotateFn(DoFn): - """A DoFn that sends each input element to the GCP Vision API - service and outputs an element with the return result of the API - (``google.cloud.vision_v1.types.AnnotateImageResponse``). 
- """ - def __init__(self, features, retry, timeout, client_options): - super(_ImageAnnotateFn, self).__init__() - self._client = None - self.features = features - self.retry = retry - self.timeout = timeout - self.client_options = client_options - self.counter = Metrics.counter(self.__class__, "API Calls") - - def setup(self): - self._client = get_vision_client(self.client_options) - - def _annotate_image(self, element, image_context): - if isinstance(element, text_type): - image = vision.types.Image( - source=vision.types.ImageSource(image_uri=element)) - else: # Typehint checks only allows text_type or binary_type - image = vision.types.Image(content=element) - - request = vision.types.AnnotateImageRequest( - image=image, features=self.features, image_context=image_context) - response = self._client.annotate_image( - request=request, retry=self.retry, timeout=self.timeout) - - return response - - def process(self, element, context_side_input=None, *args, **kwargs): - if context_side_input: # If we have a side input image context, use that - image_context = context_side_input.get(element) - else: - image_context = None - response = self._annotate_image(element, image_context) - self.counter.inc() - yield response - - -class AnnotateImageWithContext(AnnotateImage): - """A ``PTransform`` for annotating images using the GCP Vision API - ref: https://cloud.google.com/vision/docs/ - - Sends each element to the GCP Vision API. Element is a tuple of - - (Union[text_type, binary_type], - Optional[``vision.types.ImageContext``]) - - where the former is either an URI (e.g. a GCS URI) or binary_type - base64-encoded image data. - """ - def __init__(self, features, retry=None, timeout=120, client_options=None): - """ - Args: - features: (List[``vision.types.Feature.enums.Feature``]) Required. - The Vision API features to detect - retry: (google.api_core.retry.Retry) Optional. - A retry object used to retry requests. - If None is specified (default), requests will not be retried. - timeout: (float) Optional. - The time in seconds to wait for the response from the - Vision API - client_options: (Union[dict, google.api_core.client_options.ClientOptions]) - Optional. - Client options used to set user options on the client. - API Endpoint should be set through client_options. - """ - super(AnnotateImageWithContext, self).__init__( - features=features, - retry=retry, - timeout=timeout, - client_options=client_options) - - def expand(self, pvalue): - return pvalue | ParDo( - _ImageAnnotateFnWithContext( - features=self.features, - retry=self.retry, - timeout=self.timeout, - client_options=self.client_options)) - - -@typehints.with_input_types( - Tuple[Union[text_type, binary_type], Optional[vision.types.ImageContext]]) -class _ImageAnnotateFnWithContext(_ImageAnnotateFn): - """A DoFn that unpacks each input tuple to element, image_context variables - and sends these to the GCP Vision API service and outputs an element with the - return result of the API - (``google.cloud.vision_v1.types.AnnotateImageResponse``). 
- """ - def __init__(self, features, retry, timeout, client_options): - super(_ImageAnnotateFnWithContext, self).__init__( - features=features, - retry=retry, - timeout=timeout, - client_options=client_options) - - def process(self, element, *args, **kwargs): - element, image_context = element # Unpack (image, image_context) tuple - response = self._annotate_image(element, image_context) - self.counter.inc() - yield response - - -class BatchAnnotateImage(PTransform): - """A ``PTransform`` for batch (offline) annotating images using the - GCP Vision API. ref: https://cloud.google.com/vision/docs/ - - Sends each batch of elements to the GCP Vision API. + Batches elements together using ``util.BatchElements`` PTransform and sends + each batch of elements to the GCP Vision API. Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI) or binary_type base64-encoded image data. Accepts an `AsDict` side input that maps each image to an image context. """ MAX_BATCH_SIZE = 5 + MIN_BATCH_SIZE = 1 def __init__( self, @@ -255,7 +79,7 @@ def __init__( retry=None, timeout=120, max_batch_size=None, - min_batch_size=1, + min_batch_size=None, client_options=None, context_side_input=None, metadata=None): @@ -268,13 +92,16 @@ def __init__( If None is specified (default), requests will not be retried. timeout: (float) Optional. The time in seconds to wait for the response from the Vision API. - Default is 120 for single-element requests and 300 for batch annotation. - max_batch_size: (int) Maximum number of images to batch in the same - request to the Vision API. + Default is 120. + max_batch_size: (int) Optional. + Maximum number of images to batch in the same request to the Vision API. Default is 5 (which is also the Vision API max). - min_batch_size: (int) Minimum number of images to batch in the same - request to the Vision API. Default is 1. - client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + This parameter is primarily intended for testing. + min_batch_size: (int) Optional. + Minimum number of images to batch in the same request to the Vision API. + Default is None. This parameter is primarily intended for testing. + client_options: + (Union[dict, google.api_core.client_options.ClientOptions]) Optional. Client options used to set user options on the client. API Endpoint should be set through client_options. context_side_input: (beam.pvalue.AsDict) Optional. @@ -295,22 +122,22 @@ def __init__( | "Image contexts" >> beam.Create(image_contexts) ) - visionml.AsyncBatchAnnotateImage(features, output_config, + visionml.AnnotateImage(features, context_side_input=beam.pvalue.AsDict(context_side_input))) metadata: (Optional[Sequence[Tuple[str, str]]]): Optional. Additional metadata that is provided to the method. """ - super(BatchAnnotateImage, self).__init__() + super(AnnotateImage, self).__init__() self.features = features self.retry = retry self.timeout = timeout - self.max_batch_size = max_batch_size or BatchAnnotateImage.MAX_BATCH_SIZE - if self.max_batch_size > BatchAnnotateImage.MAX_BATCH_SIZE: + self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE + if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE: raise ValueError( 'Max batch_size exceeded. 
' 'Batch size needs to be smaller than {}'.format( - BatchAnnotateImage.MAX_BATCH_SIZE)) - self.min_batch_size = min_batch_size + AnnotateImage.MAX_BATCH_SIZE)) + self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE self.client_options = client_options self.context_side_input = context_side_input self.metadata = metadata @@ -323,7 +150,7 @@ def expand(self, pvalue): min_batch_size=self.min_batch_size, max_batch_size=self.max_batch_size) | ParDo( - _BatchImageAnnotateFn( + _ImageAnnotateFn( features=self.features, retry=self.retry, timeout=self.timeout, @@ -350,11 +177,13 @@ def _create_image_annotation_pairs(self, element, context_side_input): yield request -class BatchAnnotateImageWithContext(BatchAnnotateImage): - """A ``PTransform`` for batch (offline) annotating images using the - GCP Vision API. ref: https://cloud.google.com/vision/docs/batch +class AnnotateImageWithContext(AnnotateImage): + """A ``PTransform`` for annotating images using the GCP Vision API. + ref: https://cloud.google.com/vision/docs/ + Batches elements together using ``util.BatchElements`` PTransform and sends + each batch of elements to the GCP Vision API. - Sends each element to the GCP Vision API. Element is a tuple of + Element is a tuple of:: (Union[text_type, binary_type], Optional[``vision.types.ImageContext``]) @@ -368,7 +197,7 @@ def __init__( retry=None, timeout=120, max_batch_size=None, - min_batch_size=1, + min_batch_size=None, client_options=None, metadata=None): """ @@ -380,19 +209,22 @@ def __init__( If None is specified (default), requests will not be retried. timeout: (float) Optional. The time in seconds to wait for the response from the Vision API. - Default is 120 for single-element requests and 300 for batch annotation. - max_batch_size: (int) Maximum number of images to batch in the same - request to the Vision API. + Default is 120. + max_batch_size: (int) Optional. + Maximum number of images to batch in the same request to the Vision API. Default is 5 (which is also the Vision API max). - min_batch_size: (int) Minimum number of images to batch in the same - request to the Vision API. Default is 1. - client_options: (Union[dict, google.api_core.client_options.ClientOptions]) + This parameter is primarily intended for testing. + min_batch_size: (int) Optional. + Minimum number of images to batch in the same request to the Vision API. + Default is None. This parameter is primarily intended for testing. + client_options: + (Union[dict, google.api_core.client_options.ClientOptions]) Optional. Client options used to set user options on the client. API Endpoint should be set through client_options. metadata: (Optional[Sequence[Tuple[str, str]]]): Optional. Additional metadata that is provided to the method. """ - super(BatchAnnotateImageWithContext, self).__init__( + super(AnnotateImageWithContext, self).__init__( features=features, retry=retry, timeout=timeout, @@ -409,7 +241,7 @@ def expand(self, pvalue): min_batch_size=self.min_batch_size, max_batch_size=self.max_batch_size) | ParDo( - _BatchImageAnnotateFn( + _ImageAnnotateFn( features=self.features, retry=self.retry, timeout=self.timeout, @@ -433,13 +265,12 @@ def _create_image_annotation_pairs(self, element, **kwargs): @typehints.with_input_types(List[vision.types.AnnotateImageRequest]) -class _BatchImageAnnotateFn(DoFn): - """A DoFn that sends each input element to the GCP Vision API - service in batches. +class _ImageAnnotateFn(DoFn): + """A DoFn that sends each input element to the GCP Vision API. 
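  Each input element is a ``List[vision.types.AnnotateImageRequest]``
  assembled by the upstream ``util.BatchElements`` step, per the typehint
  declared above.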
Returns ``google.cloud.vision.types.BatchAnnotateImagesResponse``. """ def __init__(self, features, retry, timeout, client_options, metadata): - super(_BatchImageAnnotateFn, self).__init__() + super(_ImageAnnotateFn, self).__init__() self._client = None self.features = features self.retry = retry @@ -448,7 +279,7 @@ def __init__(self, features, retry, timeout, client_options, metadata): self.metadata = metadata self.counter = Metrics.counter(self.__class__, "API Calls") - def start_bundle(self): + def setup(self): self._client = get_vision_client(self.client_options) def process(self, element, *args, **kwargs): diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py index f3273b5debbf..d943da794f4c 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml_test.py +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -47,7 +47,6 @@ class VisionTest(unittest.TestCase): def setUp(self): self._mock_client = mock.Mock() self.m2 = mock.Mock() - self._mock_client.annotate_image.return_value = None self._mock_client.batch_annotate_images.return_value = None feature_type = vision.enums.Feature.Type.TEXT_DETECTION @@ -56,6 +55,8 @@ def setUp(self): type=feature_type, max_results=3, model="builtin/stable") ] self.img_ctx = vision.types.ImageContext() + self.min_batch_size = 1 + self.max_batch_size = 1 def test_AnnotateImage_URIs(self): images_to_annotate = [ @@ -71,7 +72,10 @@ def test_AnnotateImage_URIs(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | "Annotate image" >> visionml.AnnotateImage( + self.features, + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size)) result = p.run() result.wait_until_finish() @@ -103,6 +107,8 @@ def test_AnnotateImage_URI_with_side_input_context(self): | "Create data" >> beam.Create(images_to_annotate) | "Annotate image" >> visionml.AnnotateImage( self.features, + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size, context_side_input=beam.pvalue.AsDict(context_side_input))) result = p.run() result.wait_until_finish() @@ -129,7 +135,10 @@ def test_AnnotateImage_b64_content(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.AnnotateImage(self.features)) + | "Annotate image" >> visionml.AnnotateImage( + self.features, + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size)) result = p.run() result.wait_until_finish() @@ -145,7 +154,8 @@ def test_AnnotateImageWithContext_URIs(self): ('gs://cloud-samples-data/vision/ocr/sign.jpg', None), ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), ] - expected_counter = len(images_to_annotate) + batch_size = 5 + expected_counter = 1 # All images should fit in the same batch with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): @@ -153,8 +163,10 @@ def test_AnnotateImageWithContext_URIs(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | - "Annotate image" >> visionml.AnnotateImageWithContext(self.features)) + | "Annotate image" >> visionml.AnnotateImageWithContext( + self.features, + min_batch_size=batch_size, + max_batch_size=batch_size)) result = p.run() result.wait_until_finish() @@ -197,7 +209,7 @@ def test_AnnotateImage_bad_input(self): result = p.run() result.wait_until_finish() - def test_BatchAnnotateImage_URIs(self): + def test_AnnotateImage_URIs_large_batch(self): images_to_annotate = [ 
'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', @@ -205,111 +217,15 @@ def test_BatchAnnotateImage_URIs(self): 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', - ] - - batch_size = 5 - expected_counter = 2 # All 6 images should fit in two batches - with mock.patch.object(visionml, - 'get_vision_client', - return_value=self._mock_client): - p = beam.Pipeline() - _ = ( - p - | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.BatchAnnotateImage( - self.features, max_batch_size=batch_size, min_batch_size=batch_size) - ) - result = p.run() - result.wait_until_finish() - - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) - - def test_BatchAnnotateImage_URIs_with_side_input_context(self): - images_to_annotate = [ 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', 'gs://cloud-samples-data/vision/ocr/sign.jpg', ] - image_contexts = [ - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ] batch_size = 5 - expected_counter = 1 # All 5 images should fit in one batch - with mock.patch.object(visionml, - 'get_vision_client', - return_value=self._mock_client): - p = beam.Pipeline() - context_side_input = (p | "Image contexts" >> beam.Create(image_contexts)) - _ = ( - p - | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.BatchAnnotateImage( - self.features, - max_batch_size=batch_size, - min_batch_size=batch_size, - context_side_input=beam.pvalue.AsDict(context_side_input))) - result = p.run() - result.wait_until_finish() - - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) - - def test_BatchAnnotateImage_b64_content(self): - base_64_encoded_image = \ - b'YmVnaW4gNjQ0IGNhdC12aWRlby5tcDRNICAgICgmOVQ+NyFNPCMwUi4uZmFrZV92aWRlb' - images_to_annotate = [ - base_64_encoded_image, - base_64_encoded_image, - base_64_encoded_image, - base_64_encoded_image, - base_64_encoded_image, - base_64_encoded_image, - ] - - batch_size = 1 - expected_counter = 6 # All 6 images should fit in 6 batches - with mock.patch.object(visionml, - 'get_vision_client', - return_value=self._mock_client): - p = beam.Pipeline() - _ = ( - p - | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.BatchAnnotateImage( - self.features, max_batch_size=batch_size, min_batch_size=batch_size) - ) - result = p.run() - result.wait_until_finish() - - read_filter = MetricsFilter().with_name('API Calls') - query_result = result.metrics().query(read_filter) - if query_result['counters']: - read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) - - def test_BatchAnnotateImageWithContext(self): - images_to_annotate = [ - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - 
('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ('gs://cloud-samples-data/vision/ocr/sign.jpg', self.img_ctx), - ] - batch_size = 3 - expected_counter = 3 # All 8 images should fit in 3 batches + expected_counter = 3 # All 11 images should fit in 3 batches with mock.patch.object(visionml, 'get_vision_client', return_value=self._mock_client): @@ -317,7 +233,7 @@ def test_BatchAnnotateImageWithContext(self): _ = ( p | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.BatchAnnotateImageWithContext( + | "Annotate image" >> visionml.AnnotateImage( self.features, max_batch_size=batch_size, min_batch_size=batch_size)) @@ -330,25 +246,6 @@ def test_BatchAnnotateImageWithContext(self): read_counter = query_result['counters'][0] self.assertTrue(read_counter.committed == expected_counter) - def test_BatchAnnotateImageWithContext_bad_input(self): - """BatchAnnotateImageWithContext should not accept images without context""" - images_to_annotate = [ - 'gs://cloud-samples-data/vision/ocr/sign.jpg', - 'gs://cloud-samples-data/vision/ocr/sign.jpg' - ] - with mock.patch.object(visionml, - 'get_vision_client', - return_value=self._mock_client): - with self.assertRaises(TypeCheckError): - p = beam.Pipeline() - _ = ( - p - | "Create data" >> beam.Create(images_to_annotate) - | "Annotate image" >> visionml.BatchAnnotateImageWithContext( - self.features)) - result = p.run() - result.wait_until_finish() - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From c16356e92904908061257c65ceab2a4da020688c Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 26 Feb 2020 10:26:20 +0100 Subject: [PATCH 09/11] Pinned required versions in setup.py --- sdks/python/setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index eefd0cd4518b..bdbadc5e6c13 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -213,8 +213,8 @@ def get_version(): 'google-cloud-spanner>=1.13.0,<1.14.0', 'grpcio-gcp>=0.2.2,<1', # GCP Packages required by ML functionality - 'google-cloud-videointelligence>=1.8.0<1.14.0', - 'google-cloud-vision<=0.43.0' + 'google-cloud-videointelligence>=1.8.0,<1.14.0', + 'google-cloud-vision>=0.38.0,<0.43.0' ] INTERACTIVE_BEAM = [ From 54fd6c14e8ac6639fccc74b62cbb8c0e38241111 Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 26 Feb 2020 10:31:28 +0100 Subject: [PATCH 10/11] Minor style fix in setup.py: Add trailing comma. --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index bdbadc5e6c13..c976b6f083c9 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -214,7 +214,7 @@ def get_version(): 'grpcio-gcp>=0.2.2,<1', # GCP Packages required by ML functionality 'google-cloud-videointelligence>=1.8.0,<1.14.0', - 'google-cloud-vision>=0.38.0,<0.43.0' + 'google-cloud-vision>=0.38.0,<0.43.0', ] INTERACTIVE_BEAM = [ From d2f01fb62d54b5fca47ab21c5bef40a443f88b23 Mon Sep 17 00:00:00 2001 From: EDjur Date: Wed, 26 Feb 2020 14:36:26 +0100 Subject: [PATCH 11/11] read_counter.commited -> read_counter.result. 
Remove unused mock --- sdks/python/apache_beam/ml/gcp/visionml_test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test.py b/sdks/python/apache_beam/ml/gcp/visionml_test.py index d943da794f4c..d4c6c203feaa 100644 --- a/sdks/python/apache_beam/ml/gcp/visionml_test.py +++ b/sdks/python/apache_beam/ml/gcp/visionml_test.py @@ -46,7 +46,6 @@ class VisionTest(unittest.TestCase): def setUp(self): self._mock_client = mock.Mock() - self.m2 = mock.Mock() self._mock_client.batch_annotate_images.return_value = None feature_type = vision.enums.Feature.Type.TEXT_DETECTION @@ -83,7 +82,7 @@ def test_AnnotateImage_URIs(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + self.assertTrue(read_counter.result == expected_counter) def test_AnnotateImage_URI_with_side_input_context(self): images_to_annotate = [ @@ -117,7 +116,7 @@ def test_AnnotateImage_URI_with_side_input_context(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + self.assertTrue(read_counter.result == expected_counter) def test_AnnotateImage_b64_content(self): base_64_encoded_image = \ @@ -146,7 +145,7 @@ def test_AnnotateImage_b64_content(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + self.assertTrue(read_counter.result == expected_counter) def test_AnnotateImageWithContext_URIs(self): images_to_annotate = [ @@ -174,7 +173,7 @@ def test_AnnotateImageWithContext_URIs(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + self.assertTrue(read_counter.result == expected_counter) def test_AnnotateImageWithContext_bad_input(self): """AnnotateImageWithContext should not accept images without context""" @@ -244,7 +243,7 @@ def test_AnnotateImage_URIs_large_batch(self): query_result = result.metrics().query(read_filter) if query_result['counters']: read_counter = query_result['counters'][0] - self.assertTrue(read_counter.committed == expected_counter) + self.assertTrue(read_counter.result == expected_counter) if __name__ == '__main__':
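
For reference, a minimal end-to-end sketch of the API as it lands after
PATCH 11 (the bucket path is a placeholder and the feature choice is
illustrative; neither is taken from the patches):

    import apache_beam as beam
    from apache_beam.ml.gcp import visionml
    from google.cloud import vision

    features = [
        vision.types.Feature(
            type=vision.enums.Feature.Type.TEXT_DETECTION, max_results=3)
    ]

    with beam.Pipeline() as p:
      _ = (
          p
          # Elements may be URIs (e.g. GCS URIs) or base64-encoded image data.
          | beam.Create(['gs://my-bucket/sign.jpg'])
          | visionml.AnnotateImage(features))
    # Each output element is one vision.types.BatchAnnotateImagesResponse
    # covering up to MAX_BATCH_SIZE (5) images.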