
Conversation

@EDjur (Contributor) commented Feb 25, 2020

This PR refers to https://issues.apache.org/jira/browse/BEAM-9247: [Python] PTransform that integrates Cloud Vision functionality.

Small comments:
The synchronous annotation is very similar to the videointelligence implementation.

I opted to only support async (offline) batch annotation:

  • Synchronous batch annotation only supports batching <=5 elements at a time, so is not very useful (https://cloud.google.com/vision/docs/batch).
  • Synchronous batch annotation also doesn't make a lot of sense for Beam, as the CPU/workers would just sit around and wait until a response is received from the Vision API, which seems inefficient.

I'm still working on a few kinks and issues with the batch annotation, as well as making sure all tests run properly, so consider this a draft PR for now. (Update: the PR is now finalised.)


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch)

(Jenkins build-status badge matrix for the Go, Java, Python and XLang SDKs across the Apex, Dataflow, Flink, Gearpump, Samza and Spark runners; badge images not reproduced here.)

Pre-Commit Tests Status (on master branch)

(Jenkins build-status badge matrix for the Java, Python, Go and Website pre-commit jobs, non-portable and portable; badge images not reproduced here.)

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@EDjur (Contributor, Author) commented Feb 25, 2020

R: @aaltay @kamilwu

@robertwb (Contributor) commented:

Has there been a discussion of whether this (rather specialized) kind of transform belongs in beam vs. as a separate library built on top of beam?

Also, is the synchronous/batching pattern something that would be generally useful, and could be pulled out, or is it really integral to these transforms and contexts?

@EDjur (Contributor, Author) commented Feb 25, 2020

Most of the discussion happened in PR #10764, and the overarching Jira ticket is https://issues.apache.org/jira/browse/BEAM-9145

I agree that these transforms are slightly different, as they call an external API and let it do most of the heavy-lifting processing.

The batching pattern implemented here is quite specific to these transforms; I'm not sure how useful it would be if pulled out.

@kamilwu (Contributor) commented Feb 25, 2020

Has there been a discussion of whether this (rather specialized) kind of transform belongs in beam vs. as a separate library built on top of beam?

There was a discussion on the dev list some time ago: here. The general conclusion was that it's fine to put Cloud AI transforms directly into Beam. Also, we already have similar transforms implemented: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/ml/gcp

@kamilwu (Contributor) commented Feb 25, 2020

I wonder whether it makes sense to support async (offline) annotation from Beam's perspective. Let's suppose that we don't return anything and AsyncBatchAnnotateImage essentially becomes a sink. In that case, Beam, as a data processing framework, doesn't provide much value. If all the transform does is send a request, there is no point in executing it on multiple Dataflow or Flink workers; we'd rather use a task orchestration tool, like Apache Airflow.

On the other hand, the main advantage of sync (online) annotation is that it returns results relatively fast for further processing.

@kamilwu (Contributor) commented Feb 25, 2020

Synchronous batch annotation only supports batching <=5 elements at a time, so is not very useful

From my perspective, there should be no problem in sending a request containing up to 5 files to be annotated, then waiting for the result and sending another request.

@EDjur (Contributor, Author) commented Feb 25, 2020

I wonder whether it makes sense to support async (offline) annotation from Beam's perspective. Let's suppose that we don't return anything and AsyncBatchAnnotateImage essentially becomes a sink. In that case, Beam, as a data processing framework, doesn't provide much value. If all the transform does is send a request, there is no point in executing it on multiple Dataflow or Flink workers; we'd rather use a task orchestration tool, like Apache Airflow.

On the other hand, the main advantage of sync (online) annotation is that it returns results relatively fast for further processing.

I agree that those sorts of tasks are better suited to e.g. Airflow; that crossed my mind too.

If you think that batches of <=5 items are useful, then perhaps we should go with sync (online) batch annotation rather than async. But I guess the question then is how much more useful (or efficient) a batch of 5 image annotations is compared to 5 separate annotations.

@kamilwu (Contributor) commented Feb 25, 2020

If you think that batches of <=5 items are useful, then perhaps we should go with sync (online) batch annotation rather than async.

I don't see any obstacles.

Also take a look at the built-in BatchElements PTransform (found in sdks/python/apache_beam/transforms/util.py), which could be useful in our case. This transform consumes a PCollection of elements of type T and produces a PCollection of type List[T]. The maximum size of each list can be configured with the max_batch_size parameter. This would speed things up a bit: it's much better to send one request with 5 files than 5 separate requests.
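
For illustration, a minimal sketch (not code from this PR) of how BatchElements could group image URIs into lists of up to 5 elements before one batched Vision API call per list; the AnnotateImagesFn name and pipeline shape are assumptions:

import apache_beam as beam
from apache_beam.transforms.util import BatchElements

class AnnotateImagesFn(beam.DoFn):
  """Hypothetical DoFn: one batched Vision API request per list of URIs."""

  def process(self, image_uri_batch):
    # A real implementation would call something like
    # client.batch_annotate_images(...) here and yield each response,
    # so downstream steps see individual results rather than lists.
    for uri in image_uri_batch:
      yield {'uri': uri, 'annotations': None}

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['gs://bucket/img1.jpg', 'gs://bucket/img2.jpg'])
      | BatchElements(min_batch_size=1, max_batch_size=5)
      | beam.ParDo(AnnotateImagesFn()))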

@EDjur (Contributor, Author) commented Feb 25, 2020

Cheers for the tip about BatchElements. Using that reduces code duplication quite a bit as we can offload the creation of the AnnotateImageRequest to an earlier step in the PTransform, leaving us with only one DoFn for the two Batch transforms.
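
For example, that earlier step might build the request with a simple map function along these lines (a sketch assuming the google-cloud-vision 0.x client and a label-detection feature; the actual PR may construct requests differently):

from google.cloud import vision

def to_annotate_image_request(image_uri):
  # Build one AnnotateImageRequest per GCS image URI, before batching.
  return vision.types.AnnotateImageRequest(
      image=vision.types.Image(
          source=vision.types.ImageSource(image_uri=image_uri)),
      features=[vision.types.Feature(
          type=vision.enums.Feature.Type.LABEL_DETECTION)])

# e.g. ... | beam.Map(to_annotate_image_request) | BatchElements(...) | ...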

@kamilwu (Contributor) commented Feb 25, 2020

Cool, nice to hear that!

One more thing that came to my mind.
Would it be possible to merge the AnnotateImage and BatchAnnotateImage transforms? I think the user has little interest in configuring min_batch_size and max_batch_size because, in the end, responses are flattened (the output PCollection is of type AnnotateImageResponse, not List[AnnotateImageResponse]). Also, I expect that batch_annotate_images with len(requests) == 1 works the same way as annotate_images. But maybe I'm wrong.

@aaltay (Member) commented Feb 25, 2020

Overall direction LGTM. One comment on the setup.py:

  1. Do you need a lower limit?
  2. <0.43.0? If we don't want to automatically pick up the next version.
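
For context, the kind of dependency pin being discussed would look roughly like this in setup.py; the lower bound shown is illustrative and the exact constraint the PR settled on may differ:

REQUIRED_VISION_PACKAGE = [
    # Lower bound: oldest client version actually tested against.
    # Upper bound: avoid automatically picking up the next, potentially
    # breaking, release.
    'google-cloud-vision>=0.38.0,<0.43.0',
]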

@EDjur (Contributor, Author) commented Feb 25, 2020

Overall direction LGTM. One comment on the setup.py:

  1. Do you need a lower limit?
  2. <0.43.0? If we don't want to automatically pick up the next version.

1. Yes, sorry, I'm meaning to test out a few different versions below 0.42.0 before finalising the PR, but I wanted to make sure that the PR was heading in the correct general direction before doing that.

2. Good catch, that's a typo, thanks!

@EDjur (Contributor, Author) commented Feb 26, 2020

Cool, nice to hear that!

One more thing that came to my mind.
Would it be possible to merge the AnnotateImage and BatchAnnotateImage transforms? I think the user has little interest in configuring min_batch_size and max_batch_size because, in the end, responses are flattened (the output PCollection is of type AnnotateImageResponse, not List[AnnotateImageResponse]). Also, I expect that batch_annotate_images with len(requests) == 1 works the same way as annotate_images. But maybe I'm wrong.

Good guess! Indeed, looking at the source code for annotate_image, it is essentially just a wrapper around batch_annotate_images, like so: r = self.batch_annotate_images([request], retry=retry, timeout=timeout).

I'll need to keep the min_batch_size and max_batch_size parameters for testing purposes, in order to get a valid count of the API calls; otherwise the number of calls depends on how BatchElements decides to batch. I'll make sure to document these two parameters as such.
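
To make the shape concrete, a rough sketch (not the PR's actual code; names and signatures here are assumptions) of a merged transform that batches requests, issues one batch_annotate_images call per batch, and flattens the responses, with the batch-size parameters exposed mainly so tests can pin the number of API calls:

import apache_beam as beam
from apache_beam.transforms.util import BatchElements

class _BatchAnnotateImagesFn(beam.DoFn):
  def __init__(self, client_factory):
    # client_factory is injected so tests can pass a mock client.
    self._client_factory = client_factory
    self._client = None

  def start_bundle(self):
    self._client = self._client_factory()

  def process(self, request_batch):
    response = self._client.batch_annotate_images(request_batch)
    # Flatten: yield one AnnotateImageResponse per input request.
    for r in response.responses:
      yield r

class AnnotateImage(beam.PTransform):
  def __init__(self, client_factory, min_batch_size=1, max_batch_size=5):
    self._client_factory = client_factory
    self._min_batch_size = min_batch_size
    self._max_batch_size = max_batch_size

  def expand(self, pcoll):
    return (
        pcoll
        | BatchElements(
            min_batch_size=self._min_batch_size,
            max_batch_size=self._max_batch_size)
        | beam.ParDo(_BatchAnnotateImagesFn(self._client_factory)))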

@EDjur (Contributor, Author) commented Feb 26, 2020

@aaltay I noticed an issue in setup.py for the videointelligence library: it was missing a comma. I added that fix in this PR if that's okay.

I also updated CHANGES.md to specify that the videointelligence features are for the Python SDK.

class VisionTest(unittest.TestCase):
  def setUp(self):
    self._mock_client = mock.Mock()
    self.m2 = mock.Mock()
Review comment (Contributor):

This mock seems to be unused

query_result = result.metrics().query(read_filter)
if query_result['counters']:
  read_counter = query_result['counters'][0]
  self.assertTrue(read_counter.committed == expected_counter)
Review comment (Contributor):

It's safer to use the result property rather than committed, because when committed is empty, attempted is used instead. See the code:

@property
def result(self):
  """Short-hand for falling back to attempted metrics if it seems that
  committed was not populated (e.g. due to not being supported on a given
  runner"""
  return self.committed if self.committed else self.attempted
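
Applied to the snippet under review, the assertion could then read, for example:

query_result = result.metrics().query(read_filter)
if query_result['counters']:
  read_counter = query_result['counters'][0]
  self.assertEqual(read_counter.result, expected_counter)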

@kamilwu (Contributor) commented Feb 26, 2020

LGTM. I've left two small comments, but consider them non-blockers.

Thanks for merging those PTransforms I mentioned and for documenting the batch_size parameters.

@EDjur (Contributor, Author) commented Feb 26, 2020

Thanks for the thorough review!

@mwalenia (Member) commented:

retest this please

@mwalenia merged commit cf3ca68 into apache:master on Feb 26, 2020
