40 changes: 27 additions & 13 deletions bigquery/google/cloud/bigquery/_helpers.py
@@ -86,20 +86,34 @@ def _string_from_json(value, _):
 }


+def _row_from_json(row, schema):
+    """Convert JSON row data to row with appropriate types.
+
+    :type row: dict
+    :param row: A JSON response row to be converted.
+
+    :type schema: tuple
+    :param schema: A tuple of
+                   :class:`~google.cloud.bigquery.schema.SchemaField`.
+
+    :rtype: tuple
+    :returns: A tuple of data converted to native types.
+    """
+    row_data = []
+    for field, cell in zip(schema, row['f']):
+        converter = _CELLDATA_FROM_JSON[field.field_type]
+        if field.mode == 'REPEATED':
+            row_data.append([converter(item, field)
+                             for item in cell['v']])
+        else:
+            row_data.append(converter(cell['v'], field))
+
+    return tuple(row_data)
+
+
 def _rows_from_json(rows, schema):
-    """Convert JSON row data to rows w/ appropriate types."""
-    rows_data = []
-    for row in rows:
-        row_data = []
-        for field, cell in zip(schema, row['f']):
-            converter = _CELLDATA_FROM_JSON[field.field_type]
-            if field.mode == 'REPEATED':
-                row_data.append([converter(item, field)
-                                 for item in cell['v']])
-            else:
-                row_data.append(converter(cell['v'], field))
-        rows_data.append(tuple(row_data))
-    return rows_data
+    """Convert JSON row data to rows with appropriate types."""
+    return [_row_from_json(row, schema) for row in rows]


 class _ConfigurationProperty(object):
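To make the refactoring concrete, here is a minimal, self-contained sketch of the conversion `_row_from_json` performs. The `SchemaField` namedtuple and tiny `CONVERTERS` map are illustrative stand-ins for the library's real `SchemaField` class and its `_CELLDATA_FROM_JSON` table, which covers every BigQuery field type:

    from collections import namedtuple

    # Illustrative stand-ins -- not the library's actual definitions.
    SchemaField = namedtuple('SchemaField', ['name', 'field_type', 'mode'])
    CONVERTERS = {
        'STRING': lambda value, _field: value,
        'INTEGER': lambda value, _field: int(value),
    }


    def row_from_json(row, schema):
        """Convert one ``{'f': [{'v': ...}, ...]}`` response row to a tuple."""
        return tuple(CONVERTERS[field.field_type](cell['v'], field)
                     for field, cell in zip(schema, row['f']))


    schema = [SchemaField('name', 'STRING', 'NULLABLE'),
              SchemaField('age', 'INTEGER', 'NULLABLE')]
    print(row_from_json({'f': [{'v': 'Ada'}, {'v': '30'}]}, schema))
    # prints: ('Ada', 30)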
55 changes: 30 additions & 25 deletions bigquery/google/cloud/bigquery/dataset.py
@@ -18,6 +18,7 @@
 from google.cloud._helpers import _datetime_from_microseconds
 from google.cloud.exceptions import NotFound
 from google.cloud.bigquery.table import Table
+from google.cloud.iterator import HTTPIterator


 class AccessGrant(object):
@@ -542,35 +543,24 @@ def list_tables(self, max_results=None, page_token=None):
         https://cloud.google.com/bigquery/docs/reference/v2/tables/list

         :type max_results: int
-        :param max_results: maximum number of tables to return, If not
-                            passed, defaults to a value set by the API.
+        :param max_results: (Optional) Maximum number of tables to return.
+                            If not passed, defaults to a value set by the API.

         :type page_token: str
-        :param page_token: opaque marker for the next "page" of datasets. If
-                           not passed, the API will return the first page of
-                           datasets.
-
-        :rtype: tuple, (list, str)
-        :returns: list of :class:`google.cloud.bigquery.table.Table`, plus a
-                  "next page token" string: if not ``None``, indicates that
-                  more tables can be retrieved with another call (pass that
-                  value as ``page_token``).
-        """
-        params = {}
-
-        if max_results is not None:
-            params['maxResults'] = max_results
-
-        if page_token is not None:
-            params['pageToken'] = page_token
-
+        :param page_token: (Optional) Opaque marker for the next "page" of
+                           datasets. If not passed, the API will return the
+                           first page of datasets.
+
+        :rtype: :class:`~google.cloud.iterator.Iterator`
+        :returns: Iterator of :class:`~google.cloud.bigquery.table.Table`
+                  contained within the current dataset.
+        """
         path = '/projects/%s/datasets/%s/tables' % (self.project, self.name)
-        connection = self._client.connection
-        resp = connection.api_request(method='GET', path=path,
-                                      query_params=params)
-        tables = [Table.from_api_repr(resource, self)
-                  for resource in resp.get('tables', ())]
-        return tables, resp.get('nextPageToken')
+        result = HTTPIterator(client=self._client, path=path,
+                              item_to_value=_item_to_table, items_key='tables',
+                              page_token=page_token, max_results=max_results)
+        result.dataset = self
+        return result

     def table(self, name, schema=()):
         """Construct a table bound to this dataset.
@@ -585,3 +575,18 @@ def table(self, name, schema=()):
         :returns: a new ``Table`` instance
         """
         return Table(name, dataset=self, schema=schema)
+
+
+def _item_to_table(iterator, resource):
+    """Convert a JSON table to the native object.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type resource: dict
+    :param resource: An item to be converted to a table.
+
+    :rtype: :class:`~google.cloud.bigquery.table.Table`
+    :returns: The next table in the page.
+    """
+    return Table.from_api_repr(resource, iterator.dataset)
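
With `HTTPIterator` in place, callers no longer manage `pageToken` bookkeeping by hand. A rough usage sketch (the project and dataset names are placeholders, and configured credentials are assumed):

    import six
    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # placeholder project
    dataset = client.dataset('my_dataset')          # placeholder dataset

    # Plain iteration transparently fetches successive pages.
    for table in dataset.list_tables():
        print(table.name)

    # Page-at-a-time access, mirroring the updated unit tests below.
    iterator = dataset.list_tables(max_results=10)
    page = six.next(iterator.pages)
    print(list(page), iterator.next_page_token)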
103 changes: 67 additions & 36 deletions bigquery/google/cloud/bigquery/table.py
@@ -32,7 +32,8 @@
 from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
 from google.cloud.streaming.transfer import Upload
 from google.cloud.bigquery.schema import SchemaField
-from google.cloud.bigquery._helpers import _rows_from_json
+from google.cloud.bigquery._helpers import _row_from_json
+from google.cloud.iterator import HTTPIterator


 _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -653,47 +654,36 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
            up-to-date with the schema as defined on the back-end: if the
            two schemas are not identical, the values returned may be
            incomplete. To ensure that the local copy of the schema is
-           up-to-date, call the table's ``reload`` method.
+           up-to-date, call :meth:`reload`.

         :type max_results: int
-        :param max_results: (Optional) maximum number of rows to return.
+        :param max_results: (Optional) Maximum number of rows to return.

         :type page_token: str
-        :param page_token:
-            (Optional) token representing a cursor into the table's rows.
-
-        :type client: :class:`~google.cloud.bigquery.client.Client` or
-                      ``NoneType``
-        :param client: the client to use. If not passed, falls back to the
-                       ``client`` stored on the current dataset.
-
-        :rtype: tuple
-        :returns: ``(row_data, total_rows, page_token)``, where ``row_data``
-                  is a list of tuples, one per result row, containing only
-                  the values; ``total_rows`` is a count of the total number
-                  of rows in the table; and ``page_token`` is an opaque
-                  string which can be used to fetch the next batch of rows
-                  (``None`` if no further batches can be fetched).
+        :param page_token: (Optional) Token representing a cursor into the
+                           table's rows.
+
+        :type client: :class:`~google.cloud.bigquery.client.Client`
+        :param client: (Optional) The client to use. If not passed, falls
+                       back to the ``client`` stored on the current dataset.
+
+        :rtype: :class:`~google.cloud.iterator.Iterator`
+        :returns: Iterator of row data :class:`tuple`s. During each page, the
+                  iterator will have the ``total_rows`` attribute set,
+                  which counts the total number of rows **in the table**
+                  (this is distinct from the total number of rows in the
+                  current page: ``iterator.page.num_items``).
         """
         client = self._require_client(client)
-        params = {}
-
-        if max_results is not None:
-            params['maxResults'] = max_results
-
-        if page_token is not None:
-            params['pageToken'] = page_token
-
-        response = client.connection.api_request(method='GET',
-                                                 path='%s/data' % self.path,
-                                                 query_params=params)
-        total_rows = response.get('totalRows')
-        if total_rows is not None:
-            total_rows = int(total_rows)
-        page_token = response.get('pageToken')
-        rows_data = _rows_from_json(response.get('rows', ()), self._schema)
-
-        return rows_data, total_rows, page_token
+        path = '%s/data' % (self.path,)
+        iterator = HTTPIterator(client=client, path=path,
+                                item_to_value=_item_to_row, items_key='rows',
+                                page_token=page_token, max_results=max_results,
+                                page_start=_rows_page_start)
+        iterator.schema = self._schema
+        # Over-ride the key used to retrieve the next page token.
+        iterator._NEXT_TOKEN = 'pageToken'
+        return iterator

     def insert_data(self,
                     rows,
@@ -1083,6 +1073,47 @@ def _build_schema_resource(fields):
     return infos


+def _item_to_row(iterator, resource):
+    """Convert a JSON row to the native object.
+
+    .. note::
+
+        This assumes that the ``schema`` attribute has been
+        added to the iterator after being created, which
+        should be done by the caller.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type resource: dict
+    :param resource: An item to be converted to a row.
+
+    :rtype: tuple
+    :returns: The next row in the page.
+    """
+    return _row_from_json(resource, iterator.schema)
+
+
+# pylint: disable=unused-argument
+def _rows_page_start(iterator, page, response):
+    """Grab total rows after a :class:`~google.cloud.iterator.Page` started.
+
+    :type iterator: :class:`~google.cloud.iterator.Iterator`
+    :param iterator: The iterator that is currently in use.
+
+    :type page: :class:`~google.cloud.iterator.Page`
+    :param page: The page that was just created.
+
+    :type response: dict
+    :param response: The JSON API response for a page of rows in a table.
+    """
+    total_rows = response.get('totalRows')
+    if total_rows is not None:
+        total_rows = int(total_rows)
+    iterator.total_rows = total_rows
+# pylint: enable=unused-argument
+
+
 class _UploadConfig(object):
     """Faux message FBO apitools' 'configure_request'."""
     accept = ['*/*']
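The same pattern applies to row fetching: `fetch_data` now returns an iterator whose `total_rows` attribute is populated once the first page arrives, rather than a `(rows, total_rows, token)` tuple. A rough sketch (names are placeholders, credentials assumed):

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')           # placeholder
    table = client.dataset('my_dataset').table('my_table')   # placeholder
    table.reload()  # keep the local schema in sync with the back-end

    iterator = table.fetch_data(max_results=100)
    for row in iterator:  # each row is a tuple of converted values
        print(row)
    print('Rows in table:', iterator.total_rows)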
24 changes: 21 additions & 3 deletions bigquery/unit_tests/test_dataset.py
@@ -636,10 +636,18 @@ def test_delete_w_alternate_client(self):
         self.assertEqual(req['path'], '/%s' % PATH)

     def test_list_tables_empty(self):
+        import six
+
         conn = _Connection({})
         client = _Client(project=self.PROJECT, connection=conn)
         dataset = self._makeOne(self.DS_NAME, client=client)
-        tables, token = dataset.list_tables()
+
+        iterator = dataset.list_tables()
+        self.assertIs(iterator.dataset, dataset)
+        page = six.next(iterator.pages)
+        tables = list(page)
+        token = iterator.next_page_token
+
         self.assertEqual(tables, [])
         self.assertIsNone(token)
         self.assertEqual(len(conn._requested), 1)
@@ -649,6 +657,7 @@ def test_list_tables_empty(self):
         self.assertEqual(req['path'], '/%s' % PATH)

     def test_list_tables_defaults(self):
+        import six
         from google.cloud.bigquery.table import Table

         TABLE_1 = 'table_one'
@@ -677,7 +686,11 @@
         client = _Client(project=self.PROJECT, connection=conn)
         dataset = self._makeOne(self.DS_NAME, client=client)

-        tables, token = dataset.list_tables()
+        iterator = dataset.list_tables()
+        self.assertIs(iterator.dataset, dataset)
+        page = six.next(iterator.pages)
+        tables = list(page)
+        token = iterator.next_page_token

         self.assertEqual(len(tables), len(DATA['tables']))
         for found, expected in zip(tables, DATA['tables']):
@@ -692,6 +705,7 @@ def test_list_tables_defaults(self):
         self.assertEqual(req['path'], '/%s' % PATH)

     def test_list_tables_explicit(self):
+        import six
         from google.cloud.bigquery.table import Table

         TABLE_1 = 'table_one'
@@ -719,7 +733,11 @@
         client = _Client(project=self.PROJECT, connection=conn)
         dataset = self._makeOne(self.DS_NAME, client=client)

-        tables, token = dataset.list_tables(max_results=3, page_token=TOKEN)
+        iterator = dataset.list_tables(max_results=3, page_token=TOKEN)
+        self.assertIs(iterator.dataset, dataset)
+        page = six.next(iterator.pages)
+        tables = list(page)
+        token = iterator.next_page_token

         self.assertEqual(len(tables), len(DATA['tables']))
         for found, expected in zip(tables, DATA['tables']):