27 changes: 25 additions & 2 deletions bigquery/google/cloud/bigquery/_helpers.py
@@ -224,21 +224,44 @@ def _row_tuple_from_json(row, schema):

Args:
row (Dict): A JSON response row to be converted.
schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]): Specification of the field types in ``row``.

Returns:
Tuple: A tuple of data converted to native types.
"""
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)

row_data = []
for field, cell in zip(schema, row["f"]):
row_data.append(_field_from_json(cell["v"], field))
return tuple(row_data)


def _rows_from_json(values, schema):
"""Convert JSON row data to rows with appropriate types."""
"""Convert JSON row data to rows with appropriate types.

Args:
values (Sequence[Dict]): The list of responses (JSON rows) to convert.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
List[:class:`~google.cloud.bigquery.Row`]
"""
from google.cloud.bigquery import Row
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)
field_to_index = _field_to_index_mapping(schema)
return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]

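For illustration, a minimal sketch of what these helpers now accept: schema items given as plain mappings instead of SchemaField instances. Both functions are private, so this is illustrative only; the row shape follows the tabledata.list response format, and the field names are hypothetical.

from google.cloud.bigquery._helpers import _rows_from_json

# Schema given as mappings compatible with SchemaField.from_api_repr().
schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]

# Rows in the JSON shape returned by tabledata.list: {"f": [{"v": ...}, ...]}.
rows = [
    {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
    {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
]

converted = _rows_from_json(rows, schema)
print(converted[0]["full_name"], converted[0]["age"])  # Phred Phlyntstone 32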
58 changes: 46 additions & 12 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.

@@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
any column cannot be determined.
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
@@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. The number of columns must match the
number of columns in the DataFrame.

Returns:
pyarrow.Table:
@@ -310,6 +317,8 @@
column_and_index_names = set(
name for name, _ in list_columns_and_indexes(dataframe)
)

bq_schema = schema._to_schema_fields(bq_schema)
bq_field_names = set(field.name for field in bq_schema)

extra_fields = bq_field_names - column_and_index_names
@@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
filepath (str):
@@ -368,6 +380,7 @@
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)

@@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, schema):
def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A description of the fields in result pages.
Yields:
:class:`pyarrow.RecordBatch`
The next page of records as a ``pyarrow`` record batch.
"""
column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
arrow_types = [bq_to_arrow_data_type(field) for field in schema]
bq_schema = schema._to_schema_fields(bq_schema)
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
@@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in schema]
def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A description of the fields in result pages.
dtypes (Mapping[str, numpy.dtype]):
The types of columns in the result data, given as hints for
constructing the resulting DataFrame. Not all column types have
to be specified.
Yields:
:class:`pandas.DataFrame`
The next page of records as a ``pandas.DataFrame``.
"""
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)

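A hedged sketch of the same idea for the pandas helpers: a bq_schema mixing SchemaField instances with mapping representations, assuming pandas and pyarrow are installed. These functions are module-internal, so public entry points such as load_table_from_dataframe remain the supported surface; the column names here are hypothetical.

import pandas

from google.cloud.bigquery import SchemaField
from google.cloud.bigquery import _pandas_helpers

dataframe = pandas.DataFrame({"full_name": ["Phred", "Wylma"], "age": [32, 29]})

# Mix SchemaField instances with mapping representations of fields.
bq_schema = [
    SchemaField("full_name", "STRING", mode="REQUIRED"),
    {"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]

# The mapping entry is coerced by schema._to_schema_fields before conversion.
arrow_table = _pandas_helpers.dataframe_to_arrow(dataframe, bq_schema)
print(arrow_table.schema)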
11 changes: 7 additions & 4 deletions bigquery/google/cloud/bigquery/job.py
@@ -38,6 +38,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1225,8 +1226,10 @@ def range_partitioning(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Schema of the
destination table.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]: Schema of the destination table.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
Expand All @@ -1242,8 +1245,8 @@ def schema(self, value):
self._del_sub_prop("schema")
return

if not all(hasattr(field, "to_api_repr") for field in value):
raise ValueError("Schema items must be fields")
value = _to_schema_fields(value)

_helpers._set_sub_prop(
self._properties,
["load", "schema", "fields"],
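On the public side, the LoadJobConfig.schema setter should now accept the same mixed form; a minimal sketch (field names are hypothetical):

from google.cloud import bigquery

job_config = bigquery.LoadJobConfig()

# Mappings are coerced to SchemaField instances by the property setter.
job_config.schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]

# Reading the property back returns SchemaField instances parsed from the
# stored API representation.
print(job_config.schema)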
35 changes: 35 additions & 0 deletions bigquery/google/cloud/bigquery/schema.py
@@ -14,6 +14,8 @@

"""Schemas for BigQuery tables / queries."""

import collections

from google.cloud.bigquery_v2 import types


@@ -256,3 +258,36 @@ def _build_schema_resource(fields):
Sequence[Dict]: Mappings describing the schema of the supplied fields.
"""
return [field.to_api_repr() for field in fields]


def _to_schema_fields(schema):
"""Coerce `schema` to a list of schema field instances.

Args:
schema(Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Table schema to convert. If some items are passed as mappings,
their content must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]

Raises:
Exception: If ``schema`` is not a sequence, or if any item in the
sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
for field in schema:
if not isinstance(field, (SchemaField, collections.Mapping)):
raise ValueError(
"Schema items must either be fields or compatible "
"mapping representations."
)

return [
field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
for field in schema
]
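For reference, a small sketch of the contract this new private helper implements: SchemaField instances pass through, mappings are parsed via from_api_repr, and anything else is rejected. The field names are hypothetical.

from google.cloud.bigquery.schema import SchemaField, _to_schema_fields

mixed = [
    SchemaField("full_name", "STRING", mode="REQUIRED"),
    # Mapping form, compatible with SchemaField.from_api_repr().
    {
        "name": "address",
        "type": "RECORD",
        "mode": "NULLABLE",
        "fields": [{"name": "zip", "type": "STRING", "mode": "NULLABLE"}],
    },
]

fields = _to_schema_fields(mixed)
assert all(isinstance(field, SchemaField) for field in fields)

# Items that are neither SchemaField instances nor mappings are rejected.
try:
    _to_schema_fields(["not-a-field"])
except ValueError as exc:
    print(exc)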
36 changes: 26 additions & 10 deletions bigquery/google/cloud/bigquery/table.py
@@ -51,9 +51,9 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

@@ -305,8 +305,8 @@ class Table(object):
A pointer to a table. If ``table_ref`` is a string, it must
include a project ID, dataset ID, and table ID, each separated
by ``.``.
schema (List[google.cloud.bigquery.schema.SchemaField]):
The table's schema
schema (Optional[Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
"""

_PROPERTY_TO_API_FIELD = {
@@ -369,13 +374,17 @@ def require_partition_filter(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]:
Table's schema.

Raises:
TypeError: If 'value' is not a sequence
ValueError:
If any item in the sequence is not a
:class:`~google.cloud.bigquery.schema.SchemaField`
Exception:
If ``schema`` is not a sequence, or if any item in the sequence
is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
prop = self._properties.get("schema")
if not prop:
Expand All @@ -387,9 +396,8 @@ def schema(self):
def schema(self, value):
if value is None:
self._properties["schema"] = None
elif not all(isinstance(field, SchemaField) for field in value):
raise ValueError("Schema items must be fields")
else:
value = _to_schema_fields(value)
self._properties["schema"] = {"fields": _build_schema_resource(value)}

@property
@@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator):
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
The function to use to make API requests.
path (str): The method path to query for the list of items.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
page_token (str): A token identifying a page in a result set to start
fetching results from.
max_results (int, optional): The maximum number of results to fetch.
@@ -1328,6 +1343,7 @@ def __init__(
page_start=_rows_page_start,
next_token="pageToken",
)
schema = _to_schema_fields(schema)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
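And the corresponding sketch for Table, the other public entry point touched here (project, dataset, and table IDs are hypothetical):

from google.cloud import bigquery

table = bigquery.Table(
    "my-project.my_dataset.my_table",
    schema=[
        bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
        {"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
    ],
)

# The getter returns SchemaField instances regardless of how the schema
# was supplied.
print(table.schema)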
4 changes: 2 additions & 2 deletions bigquery/samples/query_external_sheets_permanent_table.py
@@ -52,8 +52,8 @@ def query_external_sheets_permanent_table(dataset_id):
external_config.source_uris = [sheet_url]
external_config.options.skip_leading_rows = 1 # Optionally skip header row.
external_config.options.range = (
"us-states!A20:B49"
) # Optionally set range of the sheet to query from.
"us-states!A20:B49" # Optionally set range of the sheet to query from.
)
table.external_data_configuration = external_config

# Create a permanent table linked to the Sheets file.
4 changes: 2 additions & 2 deletions bigquery/samples/query_external_sheets_temporary_table.py
@@ -49,8 +49,8 @@ def query_external_sheets_temporary_table():
]
external_config.options.skip_leading_rows = 1 # Optionally skip header row.
external_config.options.range = (
"us-states!A20:B49"
) # Optionally set range of the sheet to query from.
"us-states!A20:B49" # Optionally set range of the sheet to query from.
)
table_id = "us_states"
job_config = bigquery.QueryJobConfig()
job_config.table_definitions = {table_id: external_config}