[BEAM-7246] Added Google Spanner Write Transform #10712
ping for tests
R: @chamikaramj
9bcaf69 to c8831cb
retest this please
Run Python Precommit
mszb
left a comment
Here I've added some explanations to help the review process. Feedback is always welcome!
  def process(self, element):
    if element.primary().operation == 'delete':
      # As delete mutations are not batchable.
As in the Java implementation, we cannot determine the size of a delete mutation, so we simply mark delete mutations as unbatchable.
          "provided: <%s: %s>" % (self.__class__.__name__,
                                  str(self.__dict__)))

  def __call__(self, *args, **kwargs):
Since we don't use the builder pattern in Python, I use this magic method to construct the mutation object. This class also has static methods, which I thought would be more convenient for users.
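To illustrate the design choice described in this comment, here is a minimal sketch of a callable object that offers both `__call__` construction and static-method shortcuts. The class `SketchMutation`, its fields, and the dictionary it returns are all hypothetical and are not the code in this PR; they only show the general shape of the idea.

```python
class SketchMutation:
    """Hypothetical stand-in for a mutation helper; not the class from this PR."""

    _OPERATIONS = ("insert", "update", "delete")

    def __init__(self, operation=None, table=None, rows=None):
        self.operation = operation
        self.table = table
        self.rows = rows

    def __call__(self, *args, **kwargs):
        # Calling the instance validates its fields and returns the built
        # record, which plays the role a builder's build() would play in Java.
        if self.operation not in self._OPERATIONS:
            raise ValueError(
                "No valid operation provided: <%s: %s>" %
                (self.__class__.__name__, str(self.__dict__)))
        return {
            "operation": self.operation,
            "table": self.table,
            "rows": list(self.rows or []),
        }

    @staticmethod
    def insert(table, rows):
        # Static shortcut: construct and build in a single call.
        return SketchMutation("insert", table, rows)()

    @staticmethod
    def update(table, rows):
        return SketchMutation("update", table, rows)()
```

With this shape, `SketchMutation.insert("users", [(1, "a")])` and `SketchMutation("insert", "users", [(1, "a")])()` produce the same record; the static form just saves the caller from naming the operation as a string.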
from google.cloud.spanner import KeySet
from google.cloud.spanner_v1 import batch
from google.cloud.spanner_v1.database import BatchSnapshot
from google.cloud.spanner_v1.proto.mutation_pb2 import Mutation
The spanner package does not expose the Mutation and batch objects, so this is the only way to import them.
      values: Values to be modified.
    """
    return _Mutator(
        mutation=Mutation(insert=batch._make_write_pb(table, columns, values)),
Since we need to calculate the mutation size to create the batches, we create a Mutation object, which has the method ByteSize(). We use its value in the _BatchFn DoFn to determine how many mutations fit in one batch. We are not passing these Mutation objects to Spanner or anywhere else; we only use them to calculate the byte size for the write transform.
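The size-based batching described in this comment can be sketched in plain Python. This is an illustration only, not the _BatchFn code from the PR: `SizedMutation`, `batch_by_size`, and the `byte_size` field (standing in for the value of ByteSize()) are hypothetical names, and the sketch assumes oversized mutations were already filtered out upstream.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class SizedMutation:
    """Hypothetical stand-in; byte_size plays the role of Mutation.ByteSize()."""
    name: str
    byte_size: int


def batch_by_size(
    mutations: Iterable[SizedMutation],
    max_batch_size_bytes: int) -> Iterator[List[SizedMutation]]:
    """Greedily groups mutations so each batch stays within the byte limit."""
    batch: List[SizedMutation] = []
    batch_bytes = 0
    for mutation in mutations:
        # Flush the current batch if adding this mutation would exceed the cap.
        if batch and batch_bytes + mutation.byte_size > max_batch_size_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(mutation)
        batch_bytes += mutation.byte_size
    if batch:
        # Emit whatever is left once the input is exhausted.
        yield batch


sample = [
    SizedMutation("a", 40),
    SizedMutation("b", 50),
    SizedMutation("c", 30),
    SizedMutation("d", 90),
]
batches = [[m.name for m in b] for b in batch_by_size(sample, 100)]
```

Note that a single mutation larger than the limit would still be emitted in a batch of its own here, which is why a separate filtering step for unbatchable mutations is useful.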
@with_input_types(MutationGroup)
@with_output_types(MutationGroup)
class _BatchableFilterFn(DoFn):
In this DoFn, we just filter out the objects whose mutation size is greater than the max_batch_size provided by the user. This lets us focus only on batchable mutations, which are then passed on to the _BatchFn DoFn to make batches.
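As a rough illustration of this filtering step, the sketch below splits mutations into batchable and unbatchable lists. It is not the _BatchableFilterFn from the PR: `FilterCandidate` and `split_batchable` are hypothetical names, the `byte_size` field is an assumed stand-in for the measured mutation size, and the strict greater-than comparison simply follows the wording of the comment above.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class FilterCandidate:
    """Hypothetical stand-in for a mutation with a known operation and size."""
    operation: str
    byte_size: int


def split_batchable(
    candidates: Iterable[FilterCandidate],
    max_batch_size_bytes: int
) -> Tuple[List[FilterCandidate], List[FilterCandidate]]:
    """Returns (batchable, unbatchable) lists, preserving input order."""
    batchable: List[FilterCandidate] = []
    unbatchable: List[FilterCandidate] = []
    for candidate in candidates:
        if candidate.operation == "delete":
            # Delete sizes cannot be determined, so deletes are never batched.
            unbatchable.append(candidate)
        elif candidate.byte_size > max_batch_size_bytes:
            # Too large to share a batch with anything else.
            unbatchable.append(candidate)
        else:
            batchable.append(candidate)
    return batchable, unbatchable


ok, skipped = split_batchable(
    [
        FilterCandidate("insert", 10),
        FilterCandidate("delete", 5),
        FilterCandidate("update", 500),
    ],
    max_batch_size_bytes=100,
)
```

In a pipeline, the two lists would correspond to two outputs: the batchable one feeding the batching step and the unbatchable one being written individually.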
Retest this please
ping for test
cc: @nithinsujir and @nielm
Just to let you know that we've just introduced a Python autoformatter. Your merge conflict might be a result of this.
No worries @kamilwu, I'll resolve the conflicts in my next commit. Thanks for the heads up :)
ping for test
Trigger tests.
Could you also add a new feature note in https://github.com/apache/beam/blob/master/CHANGES.md
Done. :)
ping for test
Trigger tests.
retest this please
retest this please
/cc @markflyhigh - Tests seem not to be triggering?
@aaltay @markflyhigh It seems the jobs were triggered and completed successfully, but they show no activity on GitHub: https://builds.apache.org/job/beam_PreCommit_Python_Commit/11080/
retest this please
@aaltay All three tests failed due to Portable_Python PreCommit, PythonFormatter PreCommit. Could you please rerun these tests? That may fix the issue.
I believe this is fixed with #10844, you may need to rebase.
@aaltay I've rebased my branch. Could you please trigger the tests?
retest this please
retest this please
@chamikaramj @nielm Could you please verify the changes?
nielm
left a comment
LGTM
Thanks @nielm. @chamikaramj, is there anything you would like to add, or are we ready to merge?
retest this please
retest this please
LGTM. Thanks.
In addition to ReadFromSpanner (PR), this transform allows users to write to Google Cloud Spanner by using the WriteToSpanner transform.