[BEAM-7246] Add Google Spanner IO Read on Python SDK #9606
Conversation
Run Python PreCommit

(force-pushed a48c033 to 66d0e49)
R: @aaltay @chamikaramj

@chamikaramj @aaltay please take a look. Thanks.
chamikaramj left a comment:

Thanks.
    Encapsulates a spanner read operation.
    """

    __slots__ = ()
Is this needed ?
This prevents the creation of instance dictionaries and minimizes memory usage.
ref: https://docs.python.org/3/library/collections.html#collections.somenamedtuple._field_defaults
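A minimal sketch of the effect (the field names here are illustrative, not the connector's actual ones):

    import collections

    class ReadOperation(collections.namedtuple("ReadOperation",
                                                ["is_sql", "kwargs"])):
      # An empty __slots__ stops Python from attaching a per-instance __dict__
      # to the subclass, so each instance stays as small as the tuple itself.
      __slots__ = ()

    op = ReadOperation(is_sql=True, kwargs={})
    # op.extra = 1  # raises AttributeError with __slots__ = (); without it,
    #               # this would silently create a per-instance dict instead.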
        snapshot_exact_staleness=exact_staleness
    )

    def with_query(self, sql, params=None, param_types=None):
We usually use keyword arguments instead of the builder pattern for Beam Python SDK connectors (see textio, bigqueryio, etc.).
Sure, I'll look into this.
I've updated the code and removed the builder pattern.
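For reference, here is the keyword-argument convention being pointed to, as textio uses it (a sketch with an illustrative file pattern, not code from this PR):

    import apache_beam as beam
    from apache_beam.io import ReadFromText

    # All configuration is passed to the constructor as keyword arguments;
    # there is no with_*() builder chaining on the transform itself.
    p = beam.Pipeline()
    lines = p | 'ReadLines' >> ReadFromText(
        file_pattern='/tmp/input*.txt',  # illustrative path
        validate=False,                  # skip the existence check in this sketch
        skip_header_lines=1)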
    __all__ = ['ReadFromSpanner', 'ReadOperation',]


    class ReadOperation(collections.namedtuple("ReadOperation",
Can you document why this has to be a part of the public API ?
As in Java, we have a ReadOperation which executes multiple reads (via SQL or via table). In Python, we also have the read_all method, which executes read operations in the same manner.
Unfortunately, I forgot to add the test case for read_all (I will add it now).
Example:

    reads = [
        ReadOperation.with_query('SELECT * FROM users'),
        ReadOperation.with_table("roles", ['key', 'rolename'])
    ]
    records = pipeline | ReadFromSpanner(...).read_all(reads)
      return self.read_all(read_operation)

    def read_all(self, read_operations):
      if self._transaction is None:
Please document why we need to fork here.
Are you asking about read_all or about the transaction?
    @staticmethod
    @experimental(extra_message="(ReadFromSpanner)")
    def create_transaction(project_id, instance_id, database_id, credentials=None,
Does this method have to be public? (Please precede all methods/classes that should not be a part of the public API with _.)
Yes, it is a public method. Users can create a transaction with this method and pass it to the constructor of the Spanner read/write transforms (the same is available in Java).
I will add some docs to make this clearer.
Example:

    transaction = ReadFromSpanner.create_transaction(
        project_id=TEST_PROJECT_ID, instance_id=TEST_INSTANCE_ID,
        database_id=TEST_DATABASE_NAME,
        exact_staleness=datetime.timedelta(seconds=10))

    records = (pipeline
               | ReadFromSpanner(project_id=TEST_PROJECT_ID,
                                 instance_id=TEST_INSTANCE_ID,
                                 database_id=TEST_DATABASE_NAME)
                 .with_transaction(transaction)
                 .with_query('SELECT * FROM users'))
    class _BatchRead(PTransform):
      """
      This transform uses the Cloud Spanner BatchSnapshot to perform reads from
`BatchSnapshot`
Also please describe what BatchSnapshot is.
Sure, I will.
| "google.cloud.spanner_v1.keyset.KeySet") | ||
| return cls( | ||
| read_operation="process_read_batch", | ||
| batch_action="generate_read_batches", transaction_action="read", |
Storing the name of the attribute to execute in string form is pretty brittle. Please update the code to directly invoke the method from the class instead.
Okay, good idea! I'll update the code. Thanks!
Was this addressed? Looks like we are still creating magic strings like "process_read_batch" and "generate_read_batches". If you need to enable certain properties, please use booleans instead.
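A rough sketch of the boolean-based dispatch being asked for (the helper names are illustrative):

    def _generate_batches(snapshot, is_sql, kwargs):
      # Call the BatchSnapshot methods directly instead of looking them up
      # with getattr(snapshot, "generate_read_batches") and a magic string.
      if is_sql:
        return snapshot.generate_query_batches(**kwargs)
      return snapshot.generate_read_batches(**kwargs)

    def _process_batch(snapshot, is_sql, batch):
      if is_sql:
        return snapshot.process_query_batch(batch)
      return snapshot.process_read_batch(batch)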
        .snapshot_options)

    reads = [
        {"read_operation": ro.read_operation, "partitions": p}
Ditto regarding not storing method names in string form. Please fork here instead.
Also, we cannot do critical IO operations during job construction. For example, (1) the node that submits the job might not have access to Spanner, and (2) some jobs (for example, Dataflow templates) will not invoke the constructor in repeated executions. So this logic for initial splitting has to be moved to a DoFn.
Sure, thanks for pointing it out!
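A minimal sketch of what moving the initial splitting into a DoFn could look like, so it runs on a worker at execution time rather than at job construction (class, field, and parameter names are illustrative, not the PR's):

    import apache_beam as beam

    class _CreateReadPartitionsFn(beam.DoFn):
      """Generates Spanner read partitions on a worker, not at submission."""

      def __init__(self, project_id, instance_id, database_id):
        self._project_id = project_id
        self._instance_id = instance_id
        self._database_id = database_id

      def setup(self):
        # The Spanner client and snapshot are created when the DoFn starts on
        # a worker, so the submitting node never needs access to Spanner.
        from google.cloud import spanner
        client = spanner.Client(project=self._project_id)
        self._snapshot = client.instance(self._instance_id).database(
            self._database_id).batch_snapshot()

      def process(self, read_operation):
        # read_operation is assumed to carry is_sql and kwargs fields.
        if read_operation.is_sql:
          batches = self._snapshot.generate_query_batches(**read_operation.kwargs)
        else:
          batches = self._snapshot.generate_read_batches(**read_operation.kwargs)
        for batch in batches:
          yield {'read_operation': read_operation, 'partition': batch}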
    reads = [
        {"read_operation": ro.read_operation, "partitions": p}
        for ro in self._read_operations
        for p in getattr(snapshot, ro.batch_action)(**ro.kwargs)
Please make sure that the way you generate partitions here is compatible with the Java source.
    @mock.patch('apache_beam.io.gcp.spannerio.BatchSnapshot')
    def test_read_with_table_batch(self, mock_batch_snapshot_class,
                                   mock_client_class):
      mock_client = mock.MagicMock()
Also, please consider adding an integration test similar to the one we have for BigQuery.
Sure! I'll add some more tests as per the reference.
I just saw the ticket (https://issues.apache.org/jira/browse/BEAM-7246); it says the integration test is not included in this ticket.
Please advise!
(force-pushed 544b3e4 to 470bddf)
Run Python PreCommit
(force-pushed 8daaefc to ed90437)

Run Python PreCommit
Hi @chamikaramj. I've made the changes you requested: removed the builder pattern and changed it to keyword arguments. Also, all the IO operations now happen in the pipeline instead of in the constructor. Thanks!
R: @udim, will you be able to do a review round on this?
Thanks @mszb for the updates.

@mszb also please remove the "DO NOT MERGE" tag and add a JIRA, assuming this is ready for review.

Sure!
udim left a comment:

I've made an initial pass. I've tried to understand the Spanner Python API, but it's not clear to me how you're supposed to run the partitions generated by snapshot.generate_read_batches on remote machines (https://cloud.google.com/spanner/docs/reads#read_data_in_parallel).
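For context, the pattern the client library provides for this is roughly: serialize the BatchSnapshot with to_dict(), ship that mapping plus the generated batches to the workers, and rebuild the snapshot there. A sketch against the google-cloud-spanner API, with illustrative project/instance/database ids:

    from google.cloud import spanner
    from google.cloud.spanner_v1.database import BatchSnapshot

    client = spanner.Client(project='my-project')
    database = client.instance('my-instance').database('my-db')

    # "Coordinator" side: create a batch snapshot and partition the query.
    snapshot = database.batch_snapshot()
    batches = list(snapshot.generate_query_batches(sql='SELECT * FROM users'))
    snapshot_dict = snapshot.to_dict()  # carries the session and transaction ids

    # "Worker" side: rebuild the snapshot from the mapping, process one batch.
    worker_snapshot = BatchSnapshot.from_dict(database, snapshot_dict)
    for row in worker_snapshot.process_query_batch(batches[0]):
      print(row)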
sdks/python/setup.py (outdated)
Could this be moved to GCP_REQUIREMENTS?
Also, what is this dependency used for?
This dependency is required by the Spanner client (ref: https://googleapis.dev/python/spanner/latest/_modules/google/cloud/spanner_v1/client.html#Client).
It is a good idea to move it to GCP_REQUIREMENTS.
s/And/An/
Please decorate all DoFns with input and output type hints.
This makes the code easier to read and allows Beam to do type checks.
For example:
    # current:
    class _NaiveSpannerReadDoFn(DoFn):

    # suggested:
    from typing import Any, Dict
    from apache_beam import typehints

    SerializedBatchSnapshot = Dict[Any, Any]

    @typehints.with_input_types(SerializedBatchSnapshot)
    @typehints.with_output_types(<row type>)
    class _NaiveSpannerReadDoFn(DoFn):
Sure, I'll work on this.
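A concrete version of the suggestion, assuming the element is the serialized snapshot mapping and (purely for illustration) that rows are emitted as lists of values:

    from typing import Any, Dict, List

    from apache_beam import DoFn, typehints

    SerializedBatchSnapshot = Dict[Any, Any]

    @typehints.with_input_types(SerializedBatchSnapshot)
    @typehints.with_output_types(List[Any])  # assumed row type, for illustration
    class _NaiveSpannerReadDoFn(DoFn):

      def process(self, element):
        # Body elided; the point here is only where the decorators go.
        return []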
Was this addressed in all relevant locations? Please reply to addressed comments with "Done" or resolve addressed comments.
Done
Please add element['read_operation'] to this error message.
Sure, I'll update this.
Looks like this wasn't addressed ?
Done
Should this be a second query example? Perhaps make this an example that uses params?
Sure, I'll add the param example here.
What I intended to convey here is that the user can run ReadOperations with both SQL and table reads at the same time.
    - ReadOperation.sql('Select name, email from customers'),
    + ReadOperation.query('Select name, email from customers'),
Thanks for the change. It looks better; I'll update the example and test cases as well.
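Roughly what a parameterized example could look like (the query/table factory names follow the suggestion above, the .table() arguments are assumptions, and the module path is the one used in this PR):

    from apache_beam.io.gcp.experimental.spannerio import ReadOperation
    from google.cloud.spanner_v1 import param_types

    reads = [
        ReadOperation.query(
            sql='SELECT * FROM users WHERE user_id = @user_id',
            params={'user_id': 'u-123'},
            param_types={'user_id': param_types.STRING}),
        ReadOperation.table(table='roles', columns=['key', 'rolename']),
    ]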
This needs an integration test before we can be sure it works as expected.
For example, I'm not sure if closing the BatchSnapshot here closes the read transaction.
The JIRA mentions that the integration and performance testing will be on a separate ticket:
https://issues.apache.org/jira/browse/BEAM-7246
Please change the classes here to private (for example, _ReadFromSpanner) until we have proper integration tests in place, to prevent users from running an untested solution in production. Also create a separate ticket for integration tests and refer to it here.
Having some integration tests in place is a must before making this publicly available, IMHO. Performance tests are also helpful but optional.
Hi @chamikaramj. Thanks for the feedback. Yes, integration tests should be there to make sure it's working properly, and it's a good idea to make this transform private while some tests are still pending.
I have made the changes you mentioned and also created a ticket for the integration tests: BEAM-8949.
(force-pushed a5e299f to 243e04e)
retest this please

(force-pushed 0d8c4ba to d1326d1)
Run Portable_Python PreCommit

Run Python PreCommit

Hi @udim @chamikaramj. I've made the changes you mentioned. Please review them.
Sorry about the delay here. Still reviewing.
chamikaramj left a comment:

Thanks. Added some more comments.
BTW, to make the review process easier, either resolve already-addressed comments or add a comment "Done". Also, no need to reply to comments that you hope to address in the future :).
| "google.cloud.spanner_v1.keyset.KeySet") | ||
| return cls( | ||
| read_operation="process_read_batch", | ||
| batch_action="generate_read_batches", transaction_action="read", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this addressed ? Looks like we are string creating magic strings like "process_read_batch" and "generate_read_batches". If you need to enable certain properties, please use booleans instead.
Ditto. Can we use booleans to configure parameters instead of introducing magic strings?
Done.
Was this addressed in all relevant locations? Please reply to addressed comments with "Done" or resolve addressed comments.
Why do we need this in the DoFn implementation?
Done.
There is no need for this code; I used it for local testing. Thanks for pointing this out!
Do we need to close/shutdown the spanner_client as well?
There are no close/shutdown methods on the Spanner client object.
I'm not sure why we are forking here based on whether a transaction is provided or not. It seems like, in the Java version, we use transactions for both the native and batch versions of the read transform: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java#L420
If the user does not provide a transaction, we use BatchSnapshot to generate query batches. The reason we cannot use the transaction here is that google.cloud.spanner_v1.database.BatchSnapshot.generate_read_batches requires a snapshot with multi_use=True and uses a private method to create the snapshot instance (google.cloud.spanner_v1.database.BatchSnapshot._get_snapshot).
On the other hand, create_transaction simply returns the transaction_id and session_id, which we reconstruct in the naive read using google.cloud.spanner_v1.database.BatchSnapshot.from_dict, with no way to set multi_use=True. Ref: https://github.com/googleapis/google-cloud-python/blob/master/spanner/google/cloud/spanner_v1/database.py#L651
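In client-API terms, the fork described above is roughly the following (the overall structure is illustrative; only the client method names are the library's):

    from google.cloud.spanner_v1.database import BatchSnapshot
    from google.cloud.spanner_v1.keyset import KeySet

    def _read_rows(database, table, columns, transaction=None):
      keyset = KeySet(all_=True)  # read every row; illustrative
      if transaction is None:
        # Batch path: database.batch_snapshot() builds a multi_use snapshot
        # internally, so the read can be partitioned.
        snapshot = database.batch_snapshot()
        for batch in snapshot.generate_read_batches(table, columns, keyset):
          for row in snapshot.process_read_batch(batch):
            yield row
      else:
        # Naive path: rebuild a snapshot from the {session_id, transaction_id}
        # mapping returned by create_transaction(); from_dict() gives no way
        # to request multi_use, so the read cannot be partitioned here.
        snapshot = BatchSnapshot.from_dict(database, transaction)
        for row in snapshot.read(table, columns, keyset):
          yield row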
So not using a transaction may offer better performance? We should clarify this in the documentation.
Didn't review the unit tests in detail yet, but please make sure that we at least have the same set of unit tests as Java.
Yes, I took references from org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest and am implementing them in a Pythonic way.
cc: @nielm @nithinsujir, who are more familiar with Cloud Spanner, in case they have additional comments.
What are "naive" reads? Do they mean single reads (point reads)?
Yes, that's right. In the naive read transform we do not use Spanner's partitioning query.
Don't we need an error here if both table and sql are empty?
I implemented the checks in the expand method. https://github.com/apache/beam/pull/9606/files#diff-fbff531869ead87113e7c97e085d7013R513
+1 Also, is there a better name than Naive? I'm not able to deduce what it's trying to convey.
Run Python PreCommit

(force-pushed 88bc4fe to 3502025)
@chamikaramj is this ready to be merged? Are all the open comments resolved?

Still reviewing the latest round of updates. Thanks.
    def process(self, element, transaction_info):
      # We used batch snapshot to reuse the same transaction passed through the
      # side input
      self._snapshot = BatchSnapshot.from_dict(self._database, transaction_info)
How can we make sure that what's passed in transaction_info is consistent with what BatchSnapshot.from_dict() expects (for example, index)? Can we introduce some sort of validation before this call?
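One lightweight option, assuming the mapping carries the session and transaction ids mentioned earlier in this thread (the key names follow what BatchSnapshot.to_dict() is described to return; worth verifying against the client version in use):

    def _validate_transaction_info(transaction_info):
      """Fail fast if the side input is not shaped like BatchSnapshot.to_dict()."""
      expected = {'session_id', 'transaction_id'}
      missing = expected - set(transaction_info or {})
      if missing:
        raise ValueError(
            'Transaction side input is missing key(s) required by '
            'BatchSnapshot.from_dict(): %s' % ', '.join(sorted(missing)))

Calling this at the top of process(), before BatchSnapshot.from_dict(self._database, transaction_info), would turn a malformed side input into an immediate, readable error instead of a failure deep inside the client.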
    @mock.patch('apache_beam.io.gcp.experimental.spannerio.BatchSnapshot')
    class SpannerReadTest(unittest.TestCase):

      def test_read_with_query_batch(self, mock_batch_snapshot_class,
How about runReadUsingIndex ?
Thanks. Mostly looks good. Added a few more comments.

Any updates?

retest this please
Squashed commits:
- added spanne read io onto python sdk
- refactor code
- fix docstrings
- fix linting issue
- fix linting issue
- added display data method and its test cases.
- fix lint
- Update sdks/python/apache_beam/io/gcp/spannerio.py (Co-Authored-By: Udi Meiri <udim@users.noreply.github.com>) (repeated 5 times)
- adds typehints and refactor code
- fix pylint
- fix import issues
- fix type hints
- changed the classes to private to prevent users from using the functionality while integration tests are in development.
- moved spannerio files to gcp experimental folder
- refactor code
- add docs
(force-pushed f5e1098 to f556f22)
Retest this please
LGTM. Thanks. We can get this in when tests pass.

Thanks @chamikaramj for your support! @aaltay looks like the tests are passing. Would you be able to merge this?

Thank you.
This is the read implementation for Spanner.