From 07c571254a3596b6200282b9718361f6b8960f25 Mon Sep 17 00:00:00 2001 From: Jacob Geiger Date: Tue, 26 Apr 2016 13:07:14 -0700 Subject: [PATCH 1/3] Change page-streaming behavior Now return an iterable over pages of the response resource field, rather than the return type, for page stream-able calls where page streaming is disabled. Note that this removes the ability to get a standard gRPC/proto response from any call that is capable of page streaming. --- google/gax/__init__.py | 18 ++++++++++++------ google/gax/api_callable.py | 33 +++++++++++++++++++++++---------- test/test_api_callable.py | 19 ++++++++++++++++--- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 1413313..5bc8442 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -33,7 +33,7 @@ import collections -__version__ = '0.10.1' +__version__ = '0.10.2' OPTION_INHERIT = object() @@ -48,7 +48,7 @@ class CallSettings(object): """Encapsulates the call settings for an API call.""" # pylint: disable=too-few-public-methods def __init__(self, timeout=30, retry=None, page_descriptor=None, - bundler=None, bundle_descriptor=None): + flatten_pages=None, bundler=None, bundle_descriptor=None): """Constructor. Args: @@ -59,6 +59,10 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, page_descriptor (:class:`PageDescriptor`): indicates the structure of page streaming to be performed. If set to None, page streaming is disabled. + flatten_pages (bool): If there is no page_descriptor, this attrbute + has no meaning. Otherwise, determines whether a page streamed + response should make the page structure transparent to the user by + flattening the repeated field in the returned generator. bundler (:class:`gax.bundling.Executor`): orchestrates bundling. If None, bundling is not performed. 
bundle_descriptor (:class:`BundleDescriptor`): indicates the @@ -67,6 +71,7 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, self.timeout = timeout self.retry = retry self.page_descriptor = page_descriptor + self.flatten_pages = flatten_pages self.bundler = bundler self.bundle_descriptor = bundle_descriptor @@ -97,10 +102,10 @@ def merge(self, options): else: retry = options.retry - if options.is_page_streaming: - page_descriptor = self.page_descriptor + if options.is_page_streaming == OPTION_INHERIT: + flatten_pages = self.flatten_pages else: - page_descriptor = None + flatten_pages = options.is_page_streaming if options.is_bundling: bundler = self.bundler @@ -109,7 +114,8 @@ def merge(self, options): return CallSettings( timeout=timeout, retry=retry, - page_descriptor=page_descriptor, bundler=bundler, + page_descriptor=self.page_descriptor, + flatten_pages=flatten_pages, bundler=bundler, bundle_descriptor=self.bundle_descriptor) diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index dc497f1..9b2cec2 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -164,18 +164,23 @@ def inner(*args, **kwargs): def _page_streamable(a_func, request_page_token_field, response_page_token_field, - resource_field): + resource_field, + flatten_pages=True): """Creates a function that yields an iterable to performs page-streaming. Args: - a_func: an API call that is page streaming. - request_page_token_field: The field of the page token in the request. - response_page_token_field: The field of the next page token in the - response. - resource_field: The field to be streamed. + a_func (callable[[req], resp]): an API call that is page streaming. + request_page_token_field (str): The name of the field of the page token + in the request. + response_page_token_field (str): The name of the field of the next page + token in the response. + resource_field (str): The name of the field to be streamed. 
+ flatten_pages (bool): Optional. If set, the returned iterable is over + ``resource_field``; otherwise the returned iterable is over the pages + of the response, each of which is an iterable over ``resource_field``. Returns: - A function that returns an iterable over the specified field. + A function that returns an iterable. """ def inner(*args, **kwargs): @@ -183,8 +188,11 @@ def inner(*args, **kwargs): request = args[0] while True: response = a_func(request, **kwargs) - for obj in getattr(response, resource_field): - yield obj + if flatten_pages: + for obj in getattr(response, resource_field): + yield obj + else: + yield getattr(response, resource_field) next_page_token = getattr(response, response_page_token_field) if not next_page_token: break @@ -459,11 +467,16 @@ def create_api_call(func, settings): if settings.bundler and settings.bundle_descriptor: raise ValueError('The API call has incompatible settings: ' 'bundling and page streaming') + if settings.flatten_pages is None: + flatten_pages = True + else: + flatten_pages = settings.flatten_pages return _page_streamable( api_call, settings.page_descriptor.request_page_token_field, settings.page_descriptor.response_page_token_field, - settings.page_descriptor.resource_field) + settings.page_descriptor.resource_field, + flatten_pages=flatten_pages) if settings.bundler and settings.bundle_descriptor: return _bundleable(api_call, settings.bundle_descriptor, diff --git a/test/test_api_callable.py b/test/test_api_callable.py index 3329a05..3c6f260 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -287,13 +287,13 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): if (request.page_token > 0 and request.page_token < page_size * pages_to_stream): return PageStreamingResponse( - nums=iter(range(request.page_token, + nums=list(range(request.page_token, request.page_token + page_size)), next_page_token=request.page_token + page_size) elif request.page_token >= page_size * 
pages_to_stream: return PageStreamingResponse() else: - return PageStreamingResponse(nums=iter(range(page_size)), + return PageStreamingResponse(nums=list(range(page_size)), next_page_token=page_size) with mock.patch('grpc.framework.crust.implementations.' @@ -301,10 +301,23 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): mock_grpc.side_effect = grpc_return_value settings = CallSettings( page_descriptor=fake_grpc_func_descriptor, timeout=0) - my_callable = api_callable.create_api_call(mock_grpc, settings=settings) + my_callable = api_callable.create_api_call( + mock_grpc, settings=settings) self.assertEqual(list(my_callable(PageStreamingRequest())), list(range(page_size * pages_to_stream))) + unflattened_settings = CallSettings( + page_descriptor=fake_grpc_func_descriptor, timeout=0, + flatten_pages=False) + unflattened_callable = api_callable.create_api_call( + mock_grpc, settings=unflattened_settings) + # Expect a list of pages_to_stream pages, each of size page_size, + # plus one empty page + expected = [list(range(page_size * n, page_size * (n + 1))) + for n in range(pages_to_stream)] + [()] + self.assertEqual(list(unflattened_callable(PageStreamingRequest())), + expected) + def test_bundling_page_streaming_error(self): settings = CallSettings( page_descriptor=object(), bundle_descriptor=object(), From 51e679f0f2bd8812081d53b633870f3057d6b2ff Mon Sep 17 00:00:00 2001 From: Jacob Geiger Date: Fri, 29 Apr 2016 12:42:58 -0700 Subject: [PATCH 2/3] Permit passing page token for page streaming. Now, if page streaming is set to per-page (rather than per-resource), it is possible to specify a page token to the request. It is also possible to retrieve the full response message, as well as the next page token, from the per-page iterator. 
--- google/gax/__init__.py | 101 ++++++++++++++++++++++++++++++++----- google/gax/api_callable.py | 45 +++++++++-------- test/test_api_callable.py | 33 +++++++++--- test/test_gax.py | 15 ++++-- 4 files changed, 147 insertions(+), 47 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 5bc8442..90c9c5a 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -36,6 +36,10 @@ __version__ = '0.10.2' +INITIAL_PAGE = '' +"""The page token passed into an initial paginated request.""" + + OPTION_INHERIT = object() """Global constant. @@ -48,7 +52,8 @@ class CallSettings(object): """Encapsulates the call settings for an API call.""" # pylint: disable=too-few-public-methods def __init__(self, timeout=30, retry=None, page_descriptor=None, - flatten_pages=None, bundler=None, bundle_descriptor=None): + flatten_pages=None, page_token=None, bundler=None, + bundle_descriptor=None): """Constructor. Args: @@ -59,10 +64,14 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, page_descriptor (:class:`PageDescriptor`): indicates the structure of page streaming to be performed. If set to None, page streaming is disabled. - flatten_pages (bool): If there is no page_descriptor, this attrbute - has no meaning. Otherwise, determines whether a page streamed - response should make the page structure transparent to the user by - flattening the repeated field in the returned generator. + flatten_pages (bool): If there is no ``page_descriptor``, this + attribute has no meaning. Otherwise, determines whether a page + streamed response should make the page structure transparent to + the user by flattening the repeated field in the returned + generator. + page_token (str): If there is no ``page_descriptor``, this attribute + has no meaning. Otherwise, determines the page token used in the + page streaming request. bundler (:class:`gax.bundling.Executor`): orchestrates bundling. If None, bundling is not performed. 
bundle_descriptor (:class:`BundleDescriptor`): indicates the @@ -72,12 +81,17 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, self.retry = retry self.page_descriptor = page_descriptor self.flatten_pages = flatten_pages + self.page_token = page_token self.bundler = bundler self.bundle_descriptor = bundle_descriptor def merge(self, options): """Returns a new CallSettings merged from this and a CallOptions object. + Note that if the CallOptions instance specifies a page_token, + the merged CallSettings will have ``flatten_pages`` disabled. This + permits toggling per-resource/per-page page streaming. + Args: options (:class:`CallOptions`): an instance whose values override those in this object. If None, ``merge`` returns a copy of this @@ -102,10 +116,12 @@ def merge(self, options): else: retry = options.retry - if options.is_page_streaming == OPTION_INHERIT: + if options.page_token == OPTION_INHERIT: flatten_pages = self.flatten_pages + page_token = self.page_token else: - flatten_pages = options.is_page_streaming + flatten_pages = False + page_token = options.page_token if options.is_bundling: bundler = self.bundler @@ -114,7 +130,7 @@ def merge(self, options): return CallSettings( timeout=timeout, retry=retry, - page_descriptor=self.page_descriptor, + page_descriptor=self.page_descriptor, page_token=page_token, flatten_pages=flatten_pages, bundler=bundler, bundle_descriptor=self.bundle_descriptor) @@ -130,7 +146,7 @@ class CallOptions(object): """ # pylint: disable=too-few-public-methods def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT, - is_page_streaming=OPTION_INHERIT, is_bundling=False): + page_token=OPTION_INHERIT, is_bundling=False): """Constructor. Example: @@ -150,14 +166,17 @@ def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT, timeout (int): The client-side timeout for API calls. retry (:class:`RetryOptions`): determines whether and how to retry on transient errors. 
When set to None, the call will not retry. - is_page_streaming (bool): If set and the call is configured for page - streaming, page streaming is performed. + page_token (str): If set and the call is configured for page + streaming, page streaming is performed per-page, starting with + this page_token. Use ``INITIAL_PAGE`` for the first request. + If unset and the call is configured for page streaming, page + streaming is performed per-resource. is_bundling (bool): If set and the call is configured for bundling, bundling is performed. Bundling is always disabled by default. """ self.timeout = timeout self.retry = retry - self.is_page_streaming = is_page_streaming + self.page_token = page_token self.is_bundling = is_bundling @@ -346,3 +365,61 @@ def __new__(cls, request_byte_threshold, request_byte_limit, delay_threshold) + + +class PageIterator(object): + """An iterator over the pages of a page streaming API call. + + Provides access to the individual pages of the call, as well as the page + token. + + Attributes: + response: The full response message for the call most recently made, or + None if a call has not yet been made. + page_token: The page token to be passed in the request for the next call + to be made. + """ + # pylint: disable=too-few-public-methods + def __init__(self, api_call, page_descriptor, page_token, request, **kwargs): + """Constructor. + + Args: + api_call (callable[[req], resp]): an API call that is page + streaming. + page_descriptor (:class:`PageDescriptor`): indicates the structure + of page streaming to be performed. + page_token (str): The page token to be passed to API call request. + If no page token has yet been acquired, this field should be set + to ``INITIAL_PAGE``. + request (object): The request to be passed to the API call. The page + token field of the request is overwritten by the ``page_token`` + passed to the constructor. + **kwargs: Arbitrary keyword arguments to be passed to the API call. 
+ + Returns: + A PageIterator object. + """ + self.response = None + self.page_token = page_token + self._func = api_call + self._page_descriptor = page_descriptor + self._request = request + self._kwargs = kwargs + self._done = False + + def __iter__(self): + return self + + def next(self): + """Retrieves the next page.""" + if self._done: + raise StopIteration + setattr(self._request, + self._page_descriptor.request_page_token_field, + self.page_token) + response = self._func(self._request, **self._kwargs) + self.page_token = getattr( + response, self._page_descriptor.response_page_token_field) + if not self.page_token: + self._done = True + return getattr(response, self._page_descriptor.resource_field) diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index 9b2cec2..d3a5594 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -35,7 +35,7 @@ import time from . import (BackoffSettings, BundleOptions, bundling, CallSettings, config, - OPTION_INHERIT, RetryOptions) + OPTION_INHERIT, PageIterator, RetryOptions) from .errors import GaxError, RetryError _MILLIS_PER_SECOND = 1000 @@ -161,20 +161,16 @@ def inner(*args, **kwargs): return inner -def _page_streamable(a_func, - request_page_token_field, - response_page_token_field, - resource_field, +def _page_streamable(a_func, page_descriptor, page_token=None, flatten_pages=True): """Creates a function that yields an iterable to performs page-streaming. Args: a_func (callable[[req], resp]): an API call that is page streaming. - request_page_token_field (str): The name of the field of the page token - in the request. - response_page_token_field (str): The name of the field of the next page - token in the response. - resource_field (str): The name of the field to be streamed. + page_descriptor (:class:`PageDescriptor`): indicates the structure + of page streaming to be performed. + page_token (str): Optional. 
If set and page streaming is over pages of + the response, indicates the page_token to be passed to the API call. flatten_pages (bool): Optional. If set, the returned iterable is over ``resource_field``; otherwise the returned iterable is over the pages of the response, each of which is an iterable over ``resource_field``. @@ -183,22 +179,28 @@ def _page_streamable(a_func, A function that returns an iterable. """ - def inner(*args, **kwargs): + def flattened(*args, **kwargs): """A generator that yields all the paged responses.""" request = args[0] while True: response = a_func(request, **kwargs) - if flatten_pages: - for obj in getattr(response, resource_field): - yield obj - else: - yield getattr(response, resource_field) - next_page_token = getattr(response, response_page_token_field) + for obj in getattr(response, page_descriptor.resource_field): + yield obj + next_page_token = getattr( + response, page_descriptor.response_page_token_field) if not next_page_token: break - setattr(request, request_page_token_field, next_page_token) + setattr(request, + page_descriptor.request_page_token_field, + next_page_token) - return inner + def unflattened(*args, **kwargs): + """A generator that yields individual pages.""" + request = args[0] + return PageIterator( + a_func, page_descriptor, page_token, request, **kwargs) + + return flattened if flatten_pages else unflattened def _construct_bundling(method_config, method_bundling_override, @@ -473,9 +475,8 @@ def create_api_call(func, settings): flatten_pages = settings.flatten_pages return _page_streamable( api_call, - settings.page_descriptor.request_page_token_field, - settings.page_descriptor.response_page_token_field, - settings.page_descriptor.resource_field, + settings.page_descriptor, + page_token=settings.page_token, flatten_pages=flatten_pages) if settings.bundler and settings.bundle_descriptor: diff --git a/test/test_api_callable.py b/test/test_api_callable.py index 3c6f260..bb395cb 100644 --- 
a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -36,7 +36,7 @@ from google.gax import ( api_callable, bundling, BackoffSettings, BundleDescriptor, BundleOptions, - CallSettings, PageDescriptor, RetryOptions) + CallSettings, INITIAL_PAGE, PageDescriptor, RetryOptions) from google.gax.errors import GaxError, RetryError @@ -267,12 +267,13 @@ def test_page_streaming(self): # integers, returning `page_size` integers with each call and using # the next integer to return as the page token, until `pages_to_stream` # pages have been returned. + # pylint:disable=too-many-locals page_size = 3 pages_to_stream = 5 # pylint: disable=abstract-method, too-few-public-methods class PageStreamingRequest(object): - def __init__(self, page_token=0): + def __init__(self, page_token=INITIAL_PAGE): self.page_token = page_token class PageStreamingResponse(object): @@ -284,13 +285,13 @@ def __init__(self, nums=(), next_page_token=0): 'page_token', 'next_page_token', 'nums') def grpc_return_value(request, *dummy_args, **dummy_kwargs): - if (request.page_token > 0 and - request.page_token < page_size * pages_to_stream): + start = int(request.page_token) if request.page_token else 0 + if start > 0 and start < page_size * pages_to_stream: return PageStreamingResponse( - nums=list(range(request.page_token, - request.page_token + page_size)), - next_page_token=request.page_token + page_size) - elif request.page_token >= page_size * pages_to_stream: + nums=list(range(start, + start + page_size)), + next_page_token=start + page_size) + elif start >= page_size * pages_to_stream: return PageStreamingResponse() else: return PageStreamingResponse(nums=list(range(page_size)), @@ -318,6 +319,22 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): self.assertEqual(list(unflattened_callable(PageStreamingRequest())), expected) + pages_already_read = 2 + explicit_page_token_settings = CallSettings( + page_descriptor=fake_grpc_func_descriptor, timeout=0, + flatten_pages=False, + 
page_token=str(page_size * pages_already_read)) + explicit_page_token_callable = api_callable.create_api_call( + mock_grpc, settings=explicit_page_token_settings) + # Expect a list of pages_to_stream pages, each of size page_size, + # plus one empty page, minus the pages_already_read + expected = [list(range(page_size * n, page_size * (n + 1))) + for n in range(pages_already_read, pages_to_stream)] + expected += [()] + self.assertEqual( + list(explicit_page_token_callable(PageStreamingRequest())), + expected) + def test_bundling_page_streaming_error(self): settings = CallSettings( page_descriptor=object(), bundle_descriptor=object(), diff --git a/test/test_gax.py b/test/test_gax.py index 4ff84da..2f1e211 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -35,7 +35,8 @@ import unittest2 from google.gax import ( - BundleOptions, CallOptions, CallSettings, OPTION_INHERIT, RetryOptions) + BundleOptions, CallOptions, CallSettings, INITIAL_PAGE, OPTION_INHERIT, + RetryOptions) class TestBundleOptions(unittest2.TestCase): @@ -63,7 +64,7 @@ def test_call_options_simple(self): options = CallOptions(timeout=23) self.assertEqual(options.timeout, 23) self.assertEqual(options.retry, OPTION_INHERIT) - self.assertEqual(options.is_page_streaming, OPTION_INHERIT) + self.assertEqual(options.page_token, OPTION_INHERIT) def test_settings_merge_options1(self): retry = RetryOptions(None, None) @@ -85,11 +86,15 @@ def test_settings_merge_options2(self): def test_settings_merge_options_page_streaming(self): retry = RetryOptions(None, None) - options = CallOptions(timeout=46, is_page_streaming=False) - settings = CallSettings(timeout=9, retry=retry) + page_descriptor = object() + options = CallOptions(timeout=46, page_token=INITIAL_PAGE) + settings = CallSettings(timeout=9, retry=retry, + page_descriptor=page_descriptor) final = settings.merge(options) self.assertEqual(final.timeout, 46) - self.assertIsNone(final.page_descriptor) + self.assertEqual(final.page_descriptor, 
page_descriptor) + self.assertEqual(final.page_token, INITIAL_PAGE) + self.assertFalse(final.flatten_pages) self.assertEqual(final.retry, retry) def test_settings_merge_none(self): From 7123ce6e48fc3c0748173e9f24b77f105de1d5e9 Mon Sep 17 00:00:00 2001 From: Jacob Geiger Date: Fri, 29 Apr 2016 14:18:34 -0700 Subject: [PATCH 3/3] Make page_token agnostic to type Previously, assumed page_token was a string. Now it can have any type. --- google/gax/__init__.py | 14 ++++++++------ test/test_api_callable.py | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 90c9c5a..450fb55 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -36,8 +36,8 @@ __version__ = '0.10.2' -INITIAL_PAGE = '' -"""The page token passed into an initial paginated request.""" +INITIAL_PAGE = object() +"""A placeholder for the page token passed into an initial paginated request.""" OPTION_INHERIT = object() @@ -393,7 +393,8 @@ def __init__(self, api_call, page_descriptor, page_token, request, **kwargs): to ``INITIAL_PAGE``. request (object): The request to be passed to the API call. The page token field of the request is overwritten by the ``page_token`` - passed to the constructor. + passed to the constructor, unless ``page_token`` is + ``INITIAL_PAGE``. **kwargs: Arbitrary keyword arguments to be passed to the API call. 
Returns: @@ -414,9 +415,10 @@ def next(self): """Retrieves the next page.""" if self._done: raise StopIteration - setattr(self._request, - self._page_descriptor.request_page_token_field, - self.page_token) + if self.page_token != INITIAL_PAGE: + setattr(self._request, + self._page_descriptor.request_page_token_field, + self.page_token) response = self._func(self._request, **self._kwargs) self.page_token = getattr( response, self._page_descriptor.response_page_token_field) diff --git a/test/test_api_callable.py b/test/test_api_callable.py index bb395cb..db8c4ea 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -273,7 +273,7 @@ def test_page_streaming(self): # pylint: disable=abstract-method, too-few-public-methods class PageStreamingRequest(object): - def __init__(self, page_token=INITIAL_PAGE): + def __init__(self, page_token=0): self.page_token = page_token class PageStreamingResponse(object): @@ -309,7 +309,7 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): unflattened_settings = CallSettings( page_descriptor=fake_grpc_func_descriptor, timeout=0, - flatten_pages=False) + flatten_pages=False, page_token=INITIAL_PAGE) unflattened_callable = api_callable.create_api_call( mock_grpc, settings=unflattened_settings) # Expect a list of pages_to_stream pages, each of size page_size,