From 5b0f9c83eef62ada2ef90d8d8f75769248ce04ef Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 18 Oct 2016 16:56:09 -0700 Subject: [PATCH 1/5] Making BigQuery dataset.list_tables() into an iterator. --- bigquery/google/cloud/bigquery/dataset.py | 55 ++++++++++++----------- bigquery/unit_tests/test_dataset.py | 20 +++++++-- docs/bigquery_snippets.py | 5 +-- system_tests/bigquery.py | 10 +++-- 4 files changed, 55 insertions(+), 35 deletions(-) diff --git a/bigquery/google/cloud/bigquery/dataset.py b/bigquery/google/cloud/bigquery/dataset.py index 423484c68d51..a9a7aecdc31c 100644 --- a/bigquery/google/cloud/bigquery/dataset.py +++ b/bigquery/google/cloud/bigquery/dataset.py @@ -18,6 +18,7 @@ from google.cloud._helpers import _datetime_from_microseconds from google.cloud.exceptions import NotFound from google.cloud.bigquery.table import Table +from google.cloud.iterator import Iterator class AccessGrant(object): @@ -542,35 +543,24 @@ def list_tables(self, max_results=None, page_token=None): https://cloud.google.com/bigquery/docs/reference/v2/tables/list :type max_results: int - :param max_results: maximum number of tables to return, If not - passed, defaults to a value set by the API. + :param max_results: (Optional) Maximum number of tables to return. + If not passed, defaults to a value set by the API. :type page_token: str - :param page_token: opaque marker for the next "page" of datasets. If - not passed, the API will return the first page of - datasets. - - :rtype: tuple, (list, str) - :returns: list of :class:`google.cloud.bigquery.table.Table`, plus a - "next page token" string: if not ``None``, indicates that - more tables can be retrieved with another call (pass that - value as ``page_token``). - """ - params = {} - - if max_results is not None: - params['maxResults'] = max_results - - if page_token is not None: - params['pageToken'] = page_token + :param page_token: (Optional) Opaque marker for the next "page" of + datasets. If not passed, the API will return the + first page of datasets. + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of :class:`~google.cloud.bigquery.table.Table` + contained within the current dataset. + """ path = '/projects/%s/datasets/%s/tables' % (self.project, self.name) - connection = self._client.connection - resp = connection.api_request(method='GET', path=path, - query_params=params) - tables = [Table.from_api_repr(resource, self) - for resource in resp.get('tables', ())] - return tables, resp.get('nextPageToken') + result = Iterator(client=self._client, path=path, + item_to_value=_item_to_table, items_key='tables', + page_token=page_token, max_results=max_results) + result.dataset = self + return result def table(self, name, schema=()): """Construct a table bound to this dataset. @@ -585,3 +575,18 @@ def table(self, name, schema=()): :returns: a new ``Table`` instance """ return Table(name, dataset=self, schema=schema) + + +def _item_to_table(iterator, resource): + """Convert a JSON table to the native object. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type resource: dict + :param resource: An item to be converted to a table. + + :rtype: :class:`~google.cloud.bigquery.table.Table` + :returns: The next table in the page. 
+ """ + return Table.from_api_repr(resource, iterator.dataset) diff --git a/bigquery/unit_tests/test_dataset.py b/bigquery/unit_tests/test_dataset.py index d9b85cf5fad3..7252349e0d03 100644 --- a/bigquery/unit_tests/test_dataset.py +++ b/bigquery/unit_tests/test_dataset.py @@ -639,7 +639,13 @@ def test_list_tables_empty(self): conn = _Connection({}) client = _Client(project=self.PROJECT, connection=conn) dataset = self._makeOne(self.DS_NAME, client=client) - tables, token = dataset.list_tables() + + iterator = dataset.list_tables() + self.assertIs(iterator.dataset, dataset) + iterator.update_page() + tables = list(iterator.page) + token = iterator.next_page_token + self.assertEqual(tables, []) self.assertIsNone(token) self.assertEqual(len(conn._requested), 1) @@ -677,7 +683,11 @@ def test_list_tables_defaults(self): client = _Client(project=self.PROJECT, connection=conn) dataset = self._makeOne(self.DS_NAME, client=client) - tables, token = dataset.list_tables() + iterator = dataset.list_tables() + self.assertIs(iterator.dataset, dataset) + iterator.update_page() + tables = list(iterator.page) + token = iterator.next_page_token self.assertEqual(len(tables), len(DATA['tables'])) for found, expected in zip(tables, DATA['tables']): @@ -719,7 +729,11 @@ def test_list_tables_explicit(self): client = _Client(project=self.PROJECT, connection=conn) dataset = self._makeOne(self.DS_NAME, client=client) - tables, token = dataset.list_tables(max_results=3, page_token=TOKEN) + iterator = dataset.list_tables(max_results=3, page_token=TOKEN) + self.assertIs(iterator.dataset, dataset) + iterator.update_page() + tables = list(iterator.page) + token = iterator.next_page_token self.assertEqual(len(tables), len(DATA['tables'])) for found, expected in zip(tables, DATA['tables']): diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py index ec39d2667cfa..3343338de9b7 100644 --- a/docs/bigquery_snippets.py +++ b/docs/bigquery_snippets.py @@ -200,13 +200,12 @@ def dataset_list_tables(client, to_delete): to_delete.append(dataset) # [START dataset_list_tables] - tables, token = dataset.list_tables() # API request + tables = list(dataset.list_tables()) # API request(s) assert len(tables) == 0 - assert token is None table = dataset.table(TABLE_NAME) table.view_query = QUERY table.create() # API request - tables, token = dataset.list_tables() # API request + tables = list(dataset.list_tables()) # API request(s) assert len(tables) == 1 assert tables[0].name == TABLE_NAME # [END dataset_list_tables] diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index 5df4078fce87..db4a000b770d 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -186,9 +186,10 @@ def test_list_tables(self): self.to_delete.append(dataset) # Retrieve tables before any are created for the dataset. - all_tables, token = dataset.list_tables() + iterator = dataset.list_tables() + all_tables = list(iterator) self.assertEqual(all_tables, []) - self.assertIsNone(token) + self.assertIsNone(iterator.next_page_token) # Insert some tables to be listed. tables_to_create = [ @@ -205,8 +206,9 @@ def test_list_tables(self): self.to_delete.insert(0, table) # Retrieve the tables. 
- all_tables, token = dataset.list_tables() - self.assertIsNone(token) + iterator = dataset.list_tables() + all_tables = list(iterator) + self.assertIsNone(iterator.next_page_token) created = [table for table in all_tables if (table.name in tables_to_create and table.dataset_name == DATASET_NAME)] From 092dd09581be458399903200064c0c29201f3808 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 18 Oct 2016 16:57:02 -0700 Subject: [PATCH 2/5] Refactoring _rows_from_json BigQuery helper. In particular, isolating the logic useful to work on a single row. --- bigquery/google/cloud/bigquery/_helpers.py | 38 +++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index d22e1592a68e..6d31eea3c03e 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -86,20 +86,34 @@ def _string_from_json(value, _): } +def _row_from_json(row, schema): + """Convert JSON row data to row w/ appropriate types. + + :type row: dict + :param row: + + :type schema: tuple + :param schema: A tuple of + :class:`~google.cloud.bigquery.schema.SchemaField`. + + :rtype: tuple + :returns: A tuple of data converted to native types. + """ + row_data = [] + for field, cell in zip(schema, row['f']): + converter = _CELLDATA_FROM_JSON[field.field_type] + if field.mode == 'REPEATED': + row_data.append([converter(item, field) + for item in cell['v']]) + else: + row_data.append(converter(cell['v'], field)) + + return tuple(row_data) + + def _rows_from_json(rows, schema): """Convert JSON row data to rows w/ appropriate types.""" - rows_data = [] - for row in rows: - row_data = [] - for field, cell in zip(schema, row['f']): - converter = _CELLDATA_FROM_JSON[field.field_type] - if field.mode == 'REPEATED': - row_data.append([converter(item, field) - for item in cell['v']]) - else: - row_data.append(converter(cell['v'], field)) - rows_data.append(tuple(row_data)) - return rows_data + return [_row_from_json(row, schema) for row in rows] class _ConfigurationProperty(object): From 0e7b01f4203bd3f801442787902abdb5906293a8 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 18 Oct 2016 17:21:42 -0700 Subject: [PATCH 3/5] Making BigQuery table.fetch_data() into an iterator. --- bigquery/google/cloud/bigquery/table.py | 102 +++++++++++++++--------- bigquery/unit_tests/test_table.py | 27 +++++-- core/google/cloud/iterator.py | 1 + docs/bigquery-usage.rst | 7 +- docs/bigquery_snippets.py | 19 ++--- scripts/run_pylint.py | 2 +- system_tests/bigquery.py | 14 +++- 7 files changed, 113 insertions(+), 59 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 8467c8f23210..6d73e538cf1d 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -32,7 +32,8 @@ from google.cloud.streaming.transfer import RESUMABLE_UPLOAD from google.cloud.streaming.transfer import Upload from google.cloud.bigquery.schema import SchemaField -from google.cloud.bigquery._helpers import _rows_from_json +from google.cloud.bigquery._helpers import _row_from_json +from google.cloud.iterator import Iterator _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'" @@ -653,47 +654,36 @@ def fetch_data(self, max_results=None, page_token=None, client=None): up-to-date with the schema as defined on the back-end: if the two schemas are not identical, the values returned may be incomplete. 
To ensure that the local copy of the schema is - up-to-date, call the table's ``reload`` method. + up-to-date, call :meth:`reload`. :type max_results: int - :param max_results: (Optional) maximum number of rows to return. + :param max_results: (Optional) Maximum number of rows to return. :type page_token: str - :param page_token: - (Optional) token representing a cursor into the table's rows. - - :type client: :class:`~google.cloud.bigquery.client.Client` or - ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - - :rtype: tuple - :returns: ``(row_data, total_rows, page_token)``, where ``row_data`` - is a list of tuples, one per result row, containing only - the values; ``total_rows`` is a count of the total number - of rows in the table; and ``page_token`` is an opaque - string which can be used to fetch the next batch of rows - (``None`` if no further batches can be fetched). + :param page_token: (Optional) Token representing a cursor into the + table's rows. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: (Optional) The client to use. If not passed, falls + back to the ``client`` stored on the current dataset. + + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of row data :class:`tuple`s. Each page in the + iterator will have the ``total_rows`` attribute set, + which counts the total number of rows **in the table** + (this is distinct from the total number of rows in the + current page: ``iterator.page.num_items``). """ client = self._require_client(client) - params = {} - - if max_results is not None: - params['maxResults'] = max_results - - if page_token is not None: - params['pageToken'] = page_token - - response = client.connection.api_request(method='GET', - path='%s/data' % self.path, - query_params=params) - total_rows = response.get('totalRows') - if total_rows is not None: - total_rows = int(total_rows) - page_token = response.get('pageToken') - rows_data = _rows_from_json(response.get('rows', ()), self._schema) - - return rows_data, total_rows, page_token + path = '%s/data' % (self.path,) + iterator = Iterator(client=client, path=path, + item_to_value=_item_to_row, items_key='rows', + page_token=page_token, max_results=max_results, + page_start=_rows_page_start) + iterator.schema = self._schema + # Over-ride the key used to retrieve the next page token. + iterator._NEXT_TOKEN = 'pageToken' + return iterator def insert_data(self, rows, @@ -1083,6 +1073,46 @@ def _build_schema_resource(fields): return infos +def _item_to_row(iterator, resource): + """Convert a JSON row to the native object. + + .. note:: + + This assumes that the ``schema`` attribute has been + added to the iterator after being created, which + should be done by the caller. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type resource: dict + :param resource: An item to be converted to a row. + + :rtype: tuple + :returns: The next row in the page. + """ + return _row_from_json(resource, iterator.schema) + + +# pylint: disable=unused-argument +def _rows_page_start(iterator, page, response): + """Grab total rows after a :class:`~google.cloud.iterator.Page` started. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type page: :class:`~google.cloud.iterator.Page` + :param page: The page that was just created. 
+ + :type response: dict + :param response: The JSON API response for a page of rows in a table. + """ + total_rows = response.get('totalRows') + if total_rows is not None: + page.total_rows = int(total_rows) +# pylint: enable=unused-argument + + class _UploadConfig(object): """Faux message FBO apitools' 'configure_request'.""" accept = ['*/*'] diff --git a/bigquery/unit_tests/test_table.py b/bigquery/unit_tests/test_table.py index 8ddcafab556b..48741172f732 100644 --- a/bigquery/unit_tests/test_table.py +++ b/bigquery/unit_tests/test_table.py @@ -1068,7 +1068,11 @@ def _bigquery_timestamp_float_repr(ts_float): table = self._makeOne(self.TABLE_NAME, dataset=dataset, schema=[full_name, age, joined]) - rows, total_rows, page_token = table.fetch_data() + iterator = table.fetch_data() + iterator.update_page() + rows = list(iterator.page) + total_rows = iterator.page.total_rows + page_token = iterator.next_page_token self.assertEqual(len(rows), 4) self.assertEqual(rows[0], ('Phred Phlyntstone', 32, WHEN)) @@ -1129,9 +1133,12 @@ def test_fetch_data_w_alternate_client(self): table = self._makeOne(self.TABLE_NAME, dataset=dataset, schema=[full_name, age, voter, score]) - rows, total_rows, page_token = table.fetch_data(client=client2, - max_results=MAX, - page_token=TOKEN) + iterator = table.fetch_data( + client=client2, max_results=MAX, page_token=TOKEN) + iterator.update_page() + rows = list(iterator.page) + total_rows = getattr(iterator.page, 'total_rows', None) + page_token = iterator.next_page_token self.assertEqual(len(rows), 4) self.assertEqual(rows[0], ('Phred Phlyntstone', 32, True, 3.1415926)) @@ -1177,7 +1184,11 @@ def test_fetch_data_w_repeated_fields(self): table = self._makeOne(self.TABLE_NAME, dataset=dataset, schema=[full_name, struct]) - rows, total_rows, page_token = table.fetch_data() + iterator = table.fetch_data() + iterator.update_page() + rows = list(iterator.page) + total_rows = iterator.page.total_rows + page_token = iterator.next_page_token self.assertEqual(len(rows), 1) self.assertEqual(rows[0][0], ['red', 'green']) @@ -1227,7 +1238,11 @@ def test_fetch_data_w_record_schema(self): table = self._makeOne(self.TABLE_NAME, dataset=dataset, schema=[full_name, phone]) - rows, total_rows, page_token = table.fetch_data() + iterator = table.fetch_data() + iterator.update_page() + rows = list(iterator.page) + total_rows = iterator.page.total_rows + page_token = iterator.next_page_token self.assertEqual(len(rows), 3) self.assertEqual(rows[0][0], 'Phred Phlyntstone') diff --git a/core/google/cloud/iterator.py b/core/google/cloud/iterator.py index 021fab0c1653..49f70fb54a59 100644 --- a/core/google/cloud/iterator.py +++ b/core/google/cloud/iterator.py @@ -297,6 +297,7 @@ class HTTPIterator(Iterator): _PAGE_TOKEN = 'pageToken' _MAX_RESULTS = 'maxResults' + _NEXT_TOKEN = 'nextPageToken' _RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS]) _HTTP_METHOD = 'GET' diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst index 91e59003dc94..6c9f0b33192d 100644 --- a/docs/bigquery-usage.rst +++ b/docs/bigquery-usage.rst @@ -209,8 +209,9 @@ Run a query which can be expected to complete within bounded time: :start-after: [START client_run_sync_query] :end-before: [END client_run_sync_query] -If the rows returned by the query do not fit into the inital response, -then we need to fetch the remaining rows via ``fetch_data``: +If the rows returned by the query do not fit into the initial response, +then we need to fetch the remaining rows via 
+:meth:`~google.cloud.bigquery.query.QueryResults.fetch_data`: .. literalinclude:: bigquery_snippets.py :start-after: [START client_run_sync_query_paged] @@ -218,7 +219,7 @@ then we need to fetch the remaining rows via ``fetch_data``: If the query takes longer than the timeout allowed, ``query.complete`` will be ``False``. In that case, we need to poll the associated job until -it is done, and then fetch the reuslts: +it is done, and then fetch the results: .. literalinclude:: bigquery_snippets.py :start-after: [START client_run_sync_query_timeout] diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py index 3343338de9b7..5c2593196a58 100644 --- a/docs/bigquery_snippets.py +++ b/docs/bigquery_snippets.py @@ -341,7 +341,9 @@ def _warm_up_inserted_table_data(table): while len(rows) == 0 and counter > 0: counter -= 1 - rows, _, _ = table.fetch_data() + iterator = table.fetch_data() + iterator.update_page() + rows = list(iterator.page) if len(rows) == 0: time.sleep(5) @@ -376,13 +378,8 @@ def do_something(row): found_rows.append(row) # [START table_fetch_data] - rows, _, token = table.fetch_data() - while True: - for row in rows: - do_something(row) - if token is None: - break - rows, _, token = table.fetch_data(page_token=token) + for row in table.fetch_data(): + do_something(row) # [END table_fetch_data] assert len(found_rows) == len(ROWS_TO_INSERT) @@ -424,7 +421,11 @@ def table_upload_from_file(client, to_delete): _warm_up_inserted_table_data(table) - rows, total, token = table.fetch_data() + iterator = table.fetch_data() + iterator.update_page() + rows = list(iterator.page) + total = iterator.page.total_rows + token = iterator.next_page_token assert len(rows) == total == 2 assert token is None diff --git a/scripts/run_pylint.py b/scripts/run_pylint.py index 9b4c77883a57..1ef918e850e5 100644 --- a/scripts/run_pylint.py +++ b/scripts/run_pylint.py @@ -72,7 +72,7 @@ } TEST_RC_REPLACEMENTS = { 'FORMAT': { - 'max-module-lines': 1960, + 'max-module-lines': 2000, }, } diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index db4a000b770d..666b91c4aef7 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -263,6 +263,12 @@ def test_update_table(self): self.assertEqual(found.field_type, expected.field_type) self.assertEqual(found.mode, expected.mode) + @staticmethod + def _fetch_single_page(table): + iterator = table.fetch_data() + iterator.update_page() + return list(iterator.page) + def test_insert_data_then_dump_table(self): import datetime from google.cloud._helpers import UTC @@ -303,11 +309,11 @@ def test_insert_data_then_dump_table(self): def _has_rows(result): return len(result[0]) > 0 - # Allow for 90 seconds of "warm up" before rows visible. See: + # Allow for "warm up" before rows visible. 
See: # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds retry = RetryResult(_has_rows, max_tries=8) - rows, _, _ = retry(table.fetch_data)() + rows = retry(self._fetch_single_page)(table) by_age = operator.itemgetter(1) self.assertEqual(sorted(rows, key=by_age), @@ -361,7 +367,7 @@ def _job_done(instance): self.assertEqual(job.output_rows, len(ROWS)) - rows, _, _ = table.fetch_data() + rows = self._fetch_single_page(table) by_age = operator.itemgetter(1) self.assertEqual(sorted(rows, key=by_age), sorted(ROWS, key=by_age)) @@ -431,7 +437,7 @@ def _job_done(instance): retry = RetryInstanceState(_job_done, max_tries=8) retry(job.reload)() - rows, _, _ = table.fetch_data() + rows = self._fetch_single_page(table) by_age = operator.itemgetter(1) self.assertEqual(sorted(rows, key=by_age), sorted(ROWS, key=by_age)) From d3a6dffffa1825d8a0d94d25bae7ee782e7d76ea Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Mon, 31 Oct 2016 22:53:05 -0700 Subject: [PATCH 4/5] Rebase fixes. --- bigquery/google/cloud/bigquery/dataset.py | 8 +++--- bigquery/google/cloud/bigquery/table.py | 10 ++++---- bigquery/unit_tests/test_dataset.py | 16 +++++++----- bigquery/unit_tests/test_table.py | 31 ++++++++++++++--------- core/google/cloud/iterator.py | 2 +- docs/bigquery_snippets.py | 12 +++++---- system_tests/bigquery.py | 6 +++-- 7 files changed, 50 insertions(+), 35 deletions(-) diff --git a/bigquery/google/cloud/bigquery/dataset.py b/bigquery/google/cloud/bigquery/dataset.py index a9a7aecdc31c..f29fdbc8a243 100644 --- a/bigquery/google/cloud/bigquery/dataset.py +++ b/bigquery/google/cloud/bigquery/dataset.py @@ -18,7 +18,7 @@ from google.cloud._helpers import _datetime_from_microseconds from google.cloud.exceptions import NotFound from google.cloud.bigquery.table import Table -from google.cloud.iterator import Iterator +from google.cloud.iterator import HTTPIterator class AccessGrant(object): @@ -556,9 +556,9 @@ def list_tables(self, max_results=None, page_token=None): contained within the current dataset. 
""" path = '/projects/%s/datasets/%s/tables' % (self.project, self.name) - result = Iterator(client=self._client, path=path, - item_to_value=_item_to_table, items_key='tables', - page_token=page_token, max_results=max_results) + result = HTTPIterator(client=self._client, path=path, + item_to_value=_item_to_table, items_key='tables', + page_token=page_token, max_results=max_results) result.dataset = self return result diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 6d73e538cf1d..26bb80584be6 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -33,7 +33,7 @@ from google.cloud.streaming.transfer import Upload from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery._helpers import _row_from_json -from google.cloud.iterator import Iterator +from google.cloud.iterator import HTTPIterator _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'" @@ -676,10 +676,10 @@ def fetch_data(self, max_results=None, page_token=None, client=None): """ client = self._require_client(client) path = '%s/data' % (self.path,) - iterator = Iterator(client=client, path=path, - item_to_value=_item_to_row, items_key='rows', - page_token=page_token, max_results=max_results, - page_start=_rows_page_start) + iterator = HTTPIterator(client=client, path=path, + item_to_value=_item_to_row, items_key='rows', + page_token=page_token, max_results=max_results, + page_start=_rows_page_start) iterator.schema = self._schema # Over-ride the key used to retrieve the next page token. iterator._NEXT_TOKEN = 'pageToken' diff --git a/bigquery/unit_tests/test_dataset.py b/bigquery/unit_tests/test_dataset.py index 7252349e0d03..24c270b5005b 100644 --- a/bigquery/unit_tests/test_dataset.py +++ b/bigquery/unit_tests/test_dataset.py @@ -636,14 +636,16 @@ def test_delete_w_alternate_client(self): self.assertEqual(req['path'], '/%s' % PATH) def test_list_tables_empty(self): + import six + conn = _Connection({}) client = _Client(project=self.PROJECT, connection=conn) dataset = self._makeOne(self.DS_NAME, client=client) iterator = dataset.list_tables() self.assertIs(iterator.dataset, dataset) - iterator.update_page() - tables = list(iterator.page) + page = six.next(iterator.pages) + tables = list(page) token = iterator.next_page_token self.assertEqual(tables, []) @@ -655,6 +657,7 @@ def test_list_tables_empty(self): self.assertEqual(req['path'], '/%s' % PATH) def test_list_tables_defaults(self): + import six from google.cloud.bigquery.table import Table TABLE_1 = 'table_one' @@ -685,8 +688,8 @@ def test_list_tables_defaults(self): iterator = dataset.list_tables() self.assertIs(iterator.dataset, dataset) - iterator.update_page() - tables = list(iterator.page) + page = six.next(iterator.pages) + tables = list(page) token = iterator.next_page_token self.assertEqual(len(tables), len(DATA['tables'])) @@ -702,6 +705,7 @@ def test_list_tables_defaults(self): self.assertEqual(req['path'], '/%s' % PATH) def test_list_tables_explicit(self): + import six from google.cloud.bigquery.table import Table TABLE_1 = 'table_one' @@ -731,8 +735,8 @@ def test_list_tables_explicit(self): iterator = dataset.list_tables(max_results=3, page_token=TOKEN) self.assertIs(iterator.dataset, dataset) - iterator.update_page() - tables = list(iterator.page) + page = six.next(iterator.pages) + tables = list(page) token = iterator.next_page_token self.assertEqual(len(tables), len(DATA['tables'])) diff --git a/bigquery/unit_tests/test_table.py 
b/bigquery/unit_tests/test_table.py index 48741172f732..354821e4a149 100644 --- a/bigquery/unit_tests/test_table.py +++ b/bigquery/unit_tests/test_table.py @@ -1015,6 +1015,7 @@ def test_delete_w_alternate_client(self): def test_fetch_data_w_bound_client(self): import datetime + import six from google.cloud._helpers import UTC from google.cloud.bigquery.table import SchemaField @@ -1069,9 +1070,9 @@ def _bigquery_timestamp_float_repr(ts_float): schema=[full_name, age, joined]) iterator = table.fetch_data() - iterator.update_page() - rows = list(iterator.page) - total_rows = iterator.page.total_rows + page = six.next(iterator.pages) + rows = list(page) + total_rows = page.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 4) @@ -1088,7 +1089,9 @@ def _bigquery_timestamp_float_repr(ts_float): self.assertEqual(req['path'], '/%s' % PATH) def test_fetch_data_w_alternate_client(self): + import six from google.cloud.bigquery.table import SchemaField + PATH = 'projects/%s/datasets/%s/tables/%s/data' % ( self.PROJECT, self.DS_NAME, self.TABLE_NAME) MAX = 10 @@ -1135,9 +1138,9 @@ def test_fetch_data_w_alternate_client(self): iterator = table.fetch_data( client=client2, max_results=MAX, page_token=TOKEN) - iterator.update_page() - rows = list(iterator.page) - total_rows = getattr(iterator.page, 'total_rows', None) + page = six.next(iterator.pages) + rows = list(page) + total_rows = getattr(page, 'total_rows', None) page_token = iterator.next_page_token self.assertEqual(len(rows), 4) @@ -1157,7 +1160,9 @@ def test_fetch_data_w_alternate_client(self): {'maxResults': MAX, 'pageToken': TOKEN}) def test_fetch_data_w_repeated_fields(self): + import six from google.cloud.bigquery.table import SchemaField + PATH = 'projects/%s/datasets/%s/tables/%s/data' % ( self.PROJECT, self.DS_NAME, self.TABLE_NAME) ROWS = 1234 @@ -1185,9 +1190,9 @@ def test_fetch_data_w_repeated_fields(self): schema=[full_name, struct]) iterator = table.fetch_data() - iterator.update_page() - rows = list(iterator.page) - total_rows = iterator.page.total_rows + page = six.next(iterator.pages) + rows = list(page) + total_rows = page.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 1) @@ -1203,7 +1208,9 @@ def test_fetch_data_w_repeated_fields(self): self.assertEqual(req['path'], '/%s' % PATH) def test_fetch_data_w_record_schema(self): + import six from google.cloud.bigquery.table import SchemaField + PATH = 'projects/%s/datasets/%s/tables/%s/data' % ( self.PROJECT, self.DS_NAME, self.TABLE_NAME) ROWS = 1234 @@ -1239,9 +1246,9 @@ def test_fetch_data_w_record_schema(self): schema=[full_name, phone]) iterator = table.fetch_data() - iterator.update_page() - rows = list(iterator.page) - total_rows = iterator.page.total_rows + page = six.next(iterator.pages) + rows = list(page) + total_rows = page.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 3) diff --git a/core/google/cloud/iterator.py b/core/google/cloud/iterator.py index 49f70fb54a59..5f774aa4a846 100644 --- a/core/google/cloud/iterator.py +++ b/core/google/cloud/iterator.py @@ -340,7 +340,7 @@ def _next_page(self): items = response.get(self._items_key, ()) page = Page(self, items, self._item_to_value) self._page_start(self, page, response) - self.next_page_token = response.get('nextPageToken') + self.next_page_token = response.get(self._NEXT_TOKEN) return page else: return None diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py index 5c2593196a58..1e7f7a7084ce 100644 --- 
a/docs/bigquery_snippets.py +++ b/docs/bigquery_snippets.py @@ -26,6 +26,8 @@ import operator import time +import six + from google.cloud.bigquery import SchemaField from google.cloud.bigquery.client import Client @@ -342,8 +344,8 @@ def _warm_up_inserted_table_data(table): while len(rows) == 0 and counter > 0: counter -= 1 iterator = table.fetch_data() - iterator.update_page() - rows = list(iterator.page) + page = six.next(iterator.pages) + rows = list(page) if len(rows) == 0: time.sleep(5) @@ -422,9 +424,9 @@ def table_upload_from_file(client, to_delete): _warm_up_inserted_table_data(table) iterator = table.fetch_data() - iterator.update_page() - rows = list(iterator.page) - total = iterator.page.total_rows + page = six.next(iterator.pages) + rows = list(page) + total = page.total_rows token = iterator.next_page_token assert len(rows) == total == 2 diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index 666b91c4aef7..746feac9edcb 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -265,9 +265,11 @@ def test_update_table(self): @staticmethod def _fetch_single_page(table): + import six + iterator = table.fetch_data() - iterator.update_page() - return list(iterator.page) + page = six.next(iterator.pages) + return list(page) def test_insert_data_then_dump_table(self): import datetime From 76fa05c0c9e3458834f6d562f2c3d317827dcf18 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Mon, 31 Oct 2016 23:12:46 -0700 Subject: [PATCH 5/5] Review feedback: moving total_rows from Page to Iterator. Also fixing BigQuery system test in the process. --- bigquery/google/cloud/bigquery/_helpers.py | 6 +++--- bigquery/google/cloud/bigquery/table.py | 5 +++-- bigquery/unit_tests/test_table.py | 8 ++++---- docs/bigquery_snippets.py | 2 +- system_tests/bigquery.py | 7 ++++--- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 6d31eea3c03e..89eb390993c6 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -87,10 +87,10 @@ def _string_from_json(value, _): def _row_from_json(row, schema): - """Convert JSON row data to row w/ appropriate types. + """Convert JSON row data to row with appropriate types. :type row: dict - :param row: + :param row: A JSON response row to be converted. :type schema: tuple :param schema: A tuple of @@ -112,7 +112,7 @@ def _row_from_json(row, schema): def _rows_from_json(rows, schema): - """Convert JSON row data to rows w/ appropriate types.""" + """Convert JSON row data to rows with appropriate types.""" return [_row_from_json(row, schema) for row in rows] diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 26bb80584be6..870d8520159e 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -668,7 +668,7 @@ def fetch_data(self, max_results=None, page_token=None, client=None): back to the ``client`` stored on the current dataset. :rtype: :class:`~google.cloud.iterator.Iterator` - :returns: Iterator of row data :class:`tuple`s. Each page in the + :returns: Iterator of row data :class:`tuple`s. 
During each page, the iterator will have the ``total_rows`` attribute set, which counts the total number of rows **in the table** (this is distinct from the total number of rows in the @@ -1109,7 +1109,8 @@ def _rows_page_start(iterator, page, response): """ total_rows = response.get('totalRows') if total_rows is not None: - page.total_rows = int(total_rows) + total_rows = int(total_rows) + iterator.total_rows = total_rows # pylint: enable=unused-argument diff --git a/bigquery/unit_tests/test_table.py b/bigquery/unit_tests/test_table.py index 354821e4a149..b46e3a2974df 100644 --- a/bigquery/unit_tests/test_table.py +++ b/bigquery/unit_tests/test_table.py @@ -1072,7 +1072,7 @@ def _bigquery_timestamp_float_repr(ts_float): iterator = table.fetch_data() page = six.next(iterator.pages) rows = list(page) - total_rows = page.total_rows + total_rows = iterator.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 4) @@ -1140,7 +1140,7 @@ def test_fetch_data_w_alternate_client(self): client=client2, max_results=MAX, page_token=TOKEN) page = six.next(iterator.pages) rows = list(page) - total_rows = getattr(page, 'total_rows', None) + total_rows = iterator.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 4) @@ -1192,7 +1192,7 @@ def test_fetch_data_w_repeated_fields(self): iterator = table.fetch_data() page = six.next(iterator.pages) rows = list(page) - total_rows = page.total_rows + total_rows = iterator.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 1) @@ -1248,7 +1248,7 @@ def test_fetch_data_w_record_schema(self): iterator = table.fetch_data() page = six.next(iterator.pages) rows = list(page) - total_rows = page.total_rows + total_rows = iterator.total_rows page_token = iterator.next_page_token self.assertEqual(len(rows), 3) diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py index 1e7f7a7084ce..f63a26c87db6 100644 --- a/docs/bigquery_snippets.py +++ b/docs/bigquery_snippets.py @@ -426,7 +426,7 @@ def table_upload_from_file(client, to_delete): iterator = table.fetch_data() page = six.next(iterator.pages) rows = list(page) - total = page.total_rows + total = iterator.total_rows token = iterator.next_page_token assert len(rows) == total == 2 diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py index 746feac9edcb..0020672c63a8 100644 --- a/system_tests/bigquery.py +++ b/system_tests/bigquery.py @@ -25,6 +25,10 @@ from system_test_utils import unique_resource_id +def _has_rows(result): + return len(result) > 0 + + def _make_dataset_name(prefix): return '%s%s' % (prefix, unique_resource_id()) @@ -308,9 +312,6 @@ def test_insert_data_then_dump_table(self): rows = () - def _has_rows(result): - return len(result[0]) > 0 - # Allow for "warm up" before rows visible. See: # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
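
Taken together, these patches replace the old ``(items, token)`` tuple return values of ``Dataset.list_tables()`` and ``Table.fetch_data()`` with ``HTTPIterator`` objects. The sketch below is an illustrative summary of how a caller consumes the new API; it mirrors the usage already shown in ``docs/bigquery_snippets.py`` and the unit tests in this series, and the dataset and table names are placeholders rather than values taken from the patch.

    # Hedged usage sketch of the iterator-based API introduced above; it
    # mirrors docs/bigquery_snippets.py and the unit tests in this series.
    # The dataset and table names are placeholders, not values from the patch.
    import six

    from google.cloud.bigquery.client import Client

    client = Client()
    dataset = client.dataset('my_dataset')   # placeholder dataset name

    # list_tables() now returns an HTTPIterator rather than (tables, token);
    # wrapping it in list() transparently fetches every page.
    tables = list(dataset.list_tables())

    table = dataset.table('my_table')        # placeholder table name
    table.reload()  # fetch_data() converts cells using the local schema copy

    # Plain iteration walks all rows, issuing API requests page by page.
    for row in table.fetch_data():
        print(row)

    # Page-level access exposes next_page_token, and (after patch 5/5)
    # total_rows lives on the iterator rather than on the Page.
    iterator = table.fetch_data(max_results=10)
    page = six.next(iterator.pages)
    rows = list(page)
    total_rows = iterator.total_rows
    token = iterator.next_page_token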