Description
What happened?
Code:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery, ReadFromBigQueryRequest
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.transforms.periodicsequence import PeriodicImpulse

# 1. Define a reference to a dataset in your BigQuery project.
temp_dataset_ref = DatasetReference(
    projectId='foo',
    datasetId='my_temp_beam_datasets')

# 2. Create your read requests as before.
read_requests = (
    p
    | 'PeriodicImpulse' >> PeriodicImpulse(...)
    | 'MapToReadRequest' >> beam.Map(
        lambda x: ReadFromBigQueryRequest(
            query='SELECT * FROM `foo.your_dataset.your_table`')))

# 3. Pass the temp_dataset reference to the ReadAllFromBigQuery transform.
# Run the pipeline in the baz project.
results = read_requests | "ReadAllFromBigQuery" >> ReadAllFromBigQuery(
    temp_dataset=temp_dataset_ref)
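For context, a sketch of how p is set up for this repro; the option values are hypothetical but match the log message below (the job itself runs in project baz, while temp_dataset_ref points at project foo):

from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical repro setup: the pipeline runs in project 'baz',
# while temp_dataset_ref above points at project 'foo'.
options = PipelineOptions(project='baz', runner='DataflowRunner', streaming=True)
p = beam.Pipeline(options=options)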
What happens:

Dataset baz:my_temp_beam_datasets does not exist so we will create it as temporary with location=US

What was expected:

The BigQuery client should create the temporary dataset in the foo project (the project set on temp_dataset_ref), not in baz (the project the pipeline runs in).
Issue:

beam/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
Lines 300 to 306 in c2c9275

def _setup_temporary_dataset(
    self,
    bq: bigquery_tools.BigQueryWrapper,
    element: 'ReadFromBigQueryRequest'):
  location = bq.get_query_location(
      self._get_project(), element.query, not element.use_standard_sql)
  bq.create_temporary_dataset(self._get_project(), location)
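In the repro, self._get_project() here resolves to the project the job runs in, so the call effectively becomes (illustrative values taken from the log message, not from stepping through the code):

# Illustrative effective call: the pipeline project 'baz' is used, while
# temp_dataset_ref.projectId == 'foo' is never consulted on this path.
bq.create_temporary_dataset('baz', 'US')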
beam/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
Lines 231 to 241 in c2c9275

def _get_temp_dataset_id(self):
  if self.temp_dataset is None:
    return None
  elif isinstance(self.temp_dataset, DatasetReference):
    return self.temp_dataset.datasetId
  elif isinstance(self.temp_dataset, str):
    return self.temp_dataset
  else:
    raise ValueError("temp_dataset has to be either str or DatasetReference")
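Note that only the datasetId is ever returned here; the projectId carried by a DatasetReference is dropped. A hypothetical counterpart helper (a sketch of what is missing, not existing code) might look like:

# Hypothetical helper: surface the project carried by a DatasetReference,
# which _get_temp_dataset_id currently discards.
def _get_temp_dataset_project(self):
  if isinstance(self.temp_dataset, DatasetReference):
    return self.temp_dataset.projectId
  return None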
The project used by other transforms does take temp_dataset into account:
beam/sdks/python/apache_beam/io/gcp/bigquery.py
Lines 773 to 783 in a742b90

def _get_project(self):
  """Returns the project that queries and exports will be billed to."""
  project = self.options.view_as(GoogleCloudOptions).project
  if isinstance(project, vp.ValueProvider):
    project = project.get()
  if self.temp_dataset:
    return self.temp_dataset.projectId
  if not project:
    project = self.project
  return project
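With the repro's values this resolution order would pick 'foo'. A standalone sketch of the same logic (hypothetical helper, for illustration only):

# Hypothetical standalone form of the resolution order in _get_project():
def resolve_project(options_project, temp_dataset):
  # A temp_dataset, when given, wins over the pipeline project.
  if temp_dataset is not None:
    return temp_dataset.projectId
  return options_project

# With the repro's values: resolve_project('baz', temp_dataset_ref) == 'foo'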
But here, ReadAllFromBigQuery only uses the project the Dataflow job runs in:
beam/sdks/python/apache_beam/io/gcp/bigquery.py
Lines 3159 to 3183 in a742b90

project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
unique_id = str(uuid.uuid4())[0:10]
try:
  step_name = self.label
except AttributeError:
  step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER
ReadAllFromBigQuery.COUNTER += 1
sources_to_read, cleanup_locations = (
    pcoll
    | beam.ParDo(
        _BigQueryReadSplit(
            options=pcoll.pipeline.options,
            gcs_location=self.gcs_location,
            validate=self.validate,
            bigquery_job_labels=self.bigquery_job_labels,
            job_name=job_name,
            step_name=step_name,
            unique_id=unique_id,
            kms_key=self.kms_key,
            project=project,
            temp_dataset=self.temp_dataset,
            query_priority=self.query_priority)).with_outputs(
                "location_to_cleanup", main="files_to_read"))
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner