From c8831cbcc13de6e701269d7ed7721e7aee70e482 Mon Sep 17 00:00:00 2001
From: Shoaib Zafar
Date: Thu, 30 Jan 2020 12:44:41 +0500
Subject: [PATCH 1/6] Added Spanner Write Transform

---
 .../io/gcp/experimental/spannerio.py          | 431 +++++++++++++++++-
 .../io/gcp/experimental/spannerio_test.py     | 155 ++++++-
 2 files changed, 581 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
index 21a2f8f8d306..42329db998b8 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py
@@ -22,6 +22,8 @@
 This is an experimental module for reading and writing data from Google Cloud
 Spanner. Visit: https://cloud.google.com/spanner for more details.
 
+Reading Data from Cloud Spanner.
+
 To read from Cloud Spanner apply ReadFromSpanner transformation. It will
 return a PCollection, where each element represents an individual row returned
 from the read operation. Both Query and Read APIs are supported.
@@ -109,20 +111,74 @@
 ReadFromSpanner takes this transform in the constructor and pass this to the
 read pipeline as the singleton side input.
+
+Writing Data to Cloud Spanner.
+
+The WriteToSpanner transform writes to Cloud Spanner by applying a collection
+of input rows (WriteMutation). The mutations are grouped into batches for
+efficiency.
+
+The WriteToSpanner transform relies on the WriteMutation objects exposed by
+the SpannerIO API. WriteMutation has five static methods (insert, update,
+insert_or_update, replace, delete). Each of these methods returns an instance
+of the _Mutator object, which contains the mutation type and the Spanner
+Mutation object. For more details, review the docs of the class
+SpannerIO.WriteMutation. For example::
+
+  mutations = [
+    WriteMutation.insert(table='user', columns=('name', 'email'),
+                         values=[('sara', 'sara@dev.com')])
+  ]
+  _ = (p
+       | beam.Create(mutations)
+       | WriteToSpanner(
+          project_id=SPANNER_PROJECT_ID,
+          instance_id=SPANNER_INSTANCE_ID,
+          database_id=SPANNER_DATABASE_NAME)
+       )
+
+You can also create a WriteMutation by calling its constructor. For example::
+
+  mutations = [
+    WriteMutation(insert='users', columns=('name', 'email'),
+                  values=[('sara', 'sara@example.com')])
+  ]
+
+For more information, review the docs available on the WriteMutation class.
+
+The WriteToSpanner transform also takes a 'max_batch_size_bytes' param, which
+is set to 1MB (1048576 bytes) by default. This parameter is used to reduce the
+number of transactions sent to Spanner by grouping the mutations into batches.
+Setting this to a smaller value, or to zero, disables batching.
+
+The WriteToSpanner transform starts by grouping mutations into batches. The
+first step in this process is to wrap the WriteMutation objects into mutation
+groups and then split them into batchable and unbatchable mutation
+groups by checking the mutation size reported by
+`google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize`; if a group's
+size is larger than the value of the "max_batch_size_bytes" param, it is
+tagged as "unbatchable". After this, all the batchable mutations are merged
+into mutation groups whose size is not larger than "max_batch_size_bytes";
+finally, all the mutation groups are flattened together for processing.
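+
+Related mutations can be kept together in the same write batch by wrapping
+them in a MutationGroup. An illustrative sketch (the table and column names
+below are placeholders, not fixed names)::
+
+  mutation_groups = [
+    MutationGroup([
+      WriteMutation.insert(table='users', columns=('name', 'email'),
+                           values=[('sara', 'sara@example.com')]),
+      WriteMutation.insert(table='roles', columns=('name', 'role'),
+                           values=[('sara', 'admin')])
+    ])
+  ]
+  _ = (p
+       | beam.Create(mutation_groups)
+       | WriteToSpanner(
+          project_id=SPANNER_PROJECT_ID,
+          instance_id=SPANNER_INSTANCE_ID,
+          database_id=SPANNER_DATABASE_NAME)
+       )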
""" from __future__ import absolute_import import typing +from collections import deque from collections import namedtuple from apache_beam import Create from apache_beam import DoFn +from apache_beam import Flatten from apache_beam import ParDo from apache_beam import Reshuffle +from apache_beam.metrics import Metrics from apache_beam.pvalue import AsSingleton from apache_beam.pvalue import PBegin +from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import PTransform from apache_beam.transforms import ptransform_fn +from apache_beam.transforms import window from apache_beam.transforms.display import DisplayDataItem from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types @@ -131,13 +187,18 @@ try: from google.cloud.spanner import Client from google.cloud.spanner import KeySet + from google.cloud.spanner_v1 import batch from google.cloud.spanner_v1.database import BatchSnapshot + from google.cloud.spanner_v1.proto.mutation_pb2 import Mutation except ImportError: Client = None KeySet = None BatchSnapshot = None -__all__ = ['create_transaction', 'ReadFromSpanner', 'ReadOperation'] +__all__ = [ + 'create_transaction', 'ReadFromSpanner', 'ReadOperation', 'WriteToSpanner', + 'WriteMutation', 'MutationGroup' +] class _SPANNER_TRANSACTION(namedtuple("SPANNER_TRANSACTION", ["transaction"])): @@ -224,6 +285,7 @@ def snapshot_options(self): snapshot_options['read_timestamp'] = self.snapshot_read_timestamp return snapshot_options + @with_input_types(ReadOperation, typing.Dict[typing.Any, typing.Any]) @with_output_types(typing.List[typing.Any]) class _NaiveSpannerReadDoFn(DoFn): @@ -414,6 +476,7 @@ def create_transaction(pbegin, project_id, instance_id, database_id, pool, read_timestamp, exact_staleness))) + @with_input_types(typing.Dict[typing.Any, typing.Any]) @with_output_types(typing.List[typing.Any]) class _ReadFromPartitionFn(DoFn): @@ -581,3 +644,369 @@ def display_data(self): label='transaction') return res + + +@experimental(extra_message="No backwards-compatibility guarantees.") +class WriteToSpanner(PTransform): + + def __init__(self, project_id, instance_id, database_id, pool=None, + credentials=None, max_batch_size_bytes=1048576): + """ + A PTransform to write onto Google Cloud Spanner. + + Args: + project_id: Cloud spanner project id. Be sure to use the Project ID, + not the Project Number. + instance_id: Cloud spanner instance id. + database_id: Cloud spanner database id. + max_batch_size_bytes: (optional) Split the mutation into batches to + reduce the number of transaction sent to Spanner. By default it is + set to 1 MB (1048576 Bytes). 
+ """ + self._configuration = _BeamSpannerConfiguration( + project=project_id, instance=instance_id, database=database_id, + credentials=credentials, pool=pool, snapshot_read_timestamp=None, + snapshot_exact_staleness=None + ) + self._max_batch_size_bytes = max_batch_size_bytes + self._database_id = database_id + self._project_id = project_id + self._instance_id = instance_id + self._pool = pool + + def display_data(self): + res = { + 'project_id': DisplayDataItem(self._project_id, label='Project Id'), + 'instance_id': DisplayDataItem(self._instance_id, label='Instance Id'), + 'pool': DisplayDataItem(str(self._pool), label='Pool'), + 'database': DisplayDataItem(self._database_id, label='Database'), + 'batch_size': DisplayDataItem(self._max_batch_size_bytes, + label="Batch Size"), + } + return res + + def expand(self, pcoll): + return (pcoll + | "make batches" >> + _WriteGroup(max_batch_size_bytes=self._max_batch_size_bytes) + | 'Writing to spanner' >> ParDo( + _WriteToSpannerDoFn(self._configuration))) + + +class _Mutator(namedtuple('_Mutator', ["mutation", "operation", "kwargs"])): + __slots__ = () + + @property + def byte_size(self): + return self.mutation.ByteSize() + + +class MutationGroup(deque): + """ + A Bundle of Spanner Mutations (_Mutator). + """ + + @property + def byte_size(self): + s = 0 + for m in self.__iter__(): + s += m.byte_size + return s + + def primary(self): + return next(self.__iter__()) + + +class WriteMutation(object): + + _OPERATION_DELETE = "delete" + _OPERATION_INSERT = "insert" + _OPERATION_INSERT_OR_UPDATE = "insert_or_update" + _OPERATION_REPLACE = "replace" + _OPERATION_UPDATE = "update" + + def __init__(self, + insert=None, + update=None, + insert_or_update=None, + replace=None, + delete=None, + columns=None, + values=None, + keyset=None): + """ + A convenient class to create Spanner Mutations for Write. User can provide + the operation via constructor or via static methods. + + Note: If a user passing the operation via construction, make sure that it + will only accept one operation at a time. For example, if a user passing + a table name in the `insert` parameter, and he also passes the `update` + parameter value, this will cause an error. + + Args: + insert: (Optional) Name of the table in which rows will be inserted. + update: (Optional) Name of the table in which existing rows will be + updated. + insert_or_update: (Optional) Table name in which rows will be written. + Like insert, except that if the row already exists, then its column + values are overwritten with the ones provided. Any column values not + explicitly written are preserved. + replace: (Optional) Table name in which rows will be replaced. Like + insert, except that if the row already exists, it is deleted, and the + column values provided are inserted instead. Unlike `insert_or_update`, + this means any values not explicitly written become `NULL`. + delete: (Optional) Table name from which rows will be deleted. Succeeds + whether or not the named rows were present. + columns: The names of the columns in table to be written. The list of + columns must contain enough columns to allow Cloud Spanner to derive + values for all primary key columns in the row(s) to be modified. + values: The values to be written. `values` can contain more than one + list of values. If it does, then multiple rows are written, one for + each entry in `values`. Each list in `values` must have exactly as + many entries as there are entries in columns above. 
Sending multiple + lists is equivalent to sending multiple Mutations, each containing one + `values` entry and repeating table and columns. + keyset: (Optional) The primary keys of the rows within table to delete. + Delete is idempotent. The transaction will succeed even if some or + all rows do not exist. + """ + self._columns = columns + self._values = values + self._keyset = keyset + + self._insert = insert + self._update = update + self._insert_or_update = insert_or_update + self._replace = replace + self._delete = delete + + if sum([ + 1 for x in [self._insert, self._update, self._insert_or_update, + self._replace, self._delete] + if x is not None + ]) != 1: + raise ValueError("No or more than one write mutation operation " + "provided: <%s: %s>" % (self.__class__.__name__, + str(self.__dict__))) + + def __call__(self, *args, **kwargs): + if self._insert is not None: + return WriteMutation.insert( + table=self._insert, columns=self._columns, values=self._values) + elif self._update is not None: + return WriteMutation.update( + table=self._update, columns=self._columns, values=self._values) + elif self._insert_or_update is not None: + return WriteMutation.insert_or_update( + table=self._insert_or_update, + columns=self._columns, + values=self._values) + elif self._replace is not None: + return WriteMutation.replace( + table=self._replace, columns=self._columns, values=self._values) + elif self._delete is not None: + return WriteMutation.delete(table=self._delete, keyset=self._keyset) + + @staticmethod + def insert(table, columns, values): + """Insert one or more new table rows. + + Args: + table: Name of the table to be modified. + columns: Name of the table columns to be modified. + values: Values to be modified. + """ + return _Mutator( + mutation=Mutation(insert=batch._make_write_pb(table, columns, values)), + operation=WriteMutation._OPERATION_INSERT, kwargs={ + "table": table, "columns": columns, "values": values}) + + @staticmethod + def update(table, columns, values): + """Update one or more existing table rows. + + Args: + table: Name of the table to be modified. + columns: Name of the table columns to be modified. + values: Values to be modified. + """ + return _Mutator( + mutation=Mutation(update=batch._make_write_pb(table, columns, values)), + operation=WriteMutation._OPERATION_UPDATE, kwargs={ + "table": table, "columns": columns, "values": values}) + @staticmethod + def insert_or_update(table, columns, values): + """Insert/update one or more table rows. + Args: + table: Name of the table to be modified. + columns: Name of the table columns to be modified. + values: Values to be modified. + """ + return _Mutator( + mutation=Mutation( + insert_or_update=batch._make_write_pb(table, columns, values)), + operation=WriteMutation._OPERATION_INSERT_OR_UPDATE, kwargs={ + "table": table, "columns": columns, "values": values}) + + @staticmethod + def replace(table, columns, values): + """Replace one or more table rows. + + Args: + table: Name of the table to be modified. + columns: Name of the table columns to be modified. + values: Values to be modified. + """ + return _Mutator( + mutation=Mutation(replace=batch._make_write_pb(table, columns, values)), + operation=WriteMutation._OPERATION_REPLACE, kwargs={ + "table": table, "columns": columns, "values": values}) + + @staticmethod + def delete(table, keyset): + """Delete one or more table rows. + + Args: + table: Name of the table to be modified. + keyset: Keys/ranges identifying rows to delete. 
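+
+    Example (an illustrative sketch; the table name and keys are
+    placeholders)::
+
+      keyset = KeySet(keys=[[1233], [1234]])
+      mutation = WriteMutation.delete('roles', keyset)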
+ """ + delete = Mutation.Delete(table=table, key_set=keyset._to_pb()) + return _Mutator(mutation=Mutation(delete=delete), + operation=WriteMutation._OPERATION_DELETE, + kwargs={"table": table, "keyset": keyset}) + + +@with_input_types(typing.Union[MutationGroup, TaggedOutput]) +@with_output_types(MutationGroup) +class _BatchFn(DoFn): + """ + Batches mutations together. + """ + + def __init__(self, max_batch_size_bytes): + self._max_batch_size_bytes = max_batch_size_bytes + + def start_bundle(self): + self._batch = MutationGroup() + self._size_in_bytes = 0 + + def process(self, element): + _max_bytes = self._max_batch_size_bytes + mg_size = element.byte_size # total size of the mutation group. + + if mg_size + self._size_in_bytes > _max_bytes: + # Batch is full, output the batch and resetting the count. + yield self._batch + self._size_in_bytes = 0 + self._batch = MutationGroup() + + self._batch.extend(element) + self._size_in_bytes += mg_size + + def finish_bundle(self): + if self._batch is not None: + yield window.GlobalWindows.windowed_value(self._batch) + self._batch = None + + +@with_input_types(MutationGroup) +@with_output_types(MutationGroup) +class _BatchableFilterFn(DoFn): + """ + Filters MutationGroups larger than the batch size to the output tagged with + OUTPUT_TAG_UNBATCHABLE. + """ + OUTPUT_TAG_UNBATCHABLE = 'unbatchable' + + def __init__(self, max_batch_size_bytes): + self._max_batch_size_bytes = max_batch_size_bytes + self._batchable = None + self._unbatchable = None + + def process(self, element): + if element.primary().operation == 'delete': + # As delete mutations are not batchable. + yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element) + else: + _max_bytes = self._max_batch_size_bytes + mg = element + mg_size = mg.byte_size + if mg_size > _max_bytes: + yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element) + else: + yield element + + +class _WriteToSpannerDoFn(DoFn): + + def __init__(self, spanner_configuration): + self._spanner_configuration = spanner_configuration + self._db_instance = None + self.batches = Metrics.counter(self.__class__, 'SpannerBatches') + + def setup(self): + spanner_client = Client(self._spanner_configuration.project) + instance = spanner_client.instance(self._spanner_configuration.instance) + self._db_instance = instance.database( + self._spanner_configuration.database, + pool=self._spanner_configuration.pool) + + def process(self, element): + self.batches.inc() + with self._db_instance.batch() as b: + for m in element: + if m.operation == WriteMutation._OPERATION_DELETE: + batch_func = b.delete + elif m.operation == WriteMutation._OPERATION_REPLACE: + batch_func = b.replace + elif m.operation == WriteMutation._OPERATION_INSERT_OR_UPDATE: + batch_func = b.insert_or_update + elif m.operation == WriteMutation._OPERATION_INSERT: + batch_func = b.insert + elif m.operation == WriteMutation._OPERATION_UPDATE: + batch_func = b.update + else: + raise ValueError("Unknown operation action: %s" % m.operation) + + batch_func(**m.kwargs) + + +@with_input_types(typing.Union[MutationGroup, _Mutator]) +@with_output_types(MutationGroup) +class _MakeMutationGroupsFn(DoFn): + """ + Make Mutation group object if the element is the instance of _Mutator. + """ + + def process(self, element): + if isinstance(element, MutationGroup): + yield element + elif isinstance(element, _Mutator): + yield MutationGroup([element]) + else: + raise ValueError( + "Invalid object type: %s. 
Object must be an instance of " + "MutationGroup or WriteMutations" % str(element)) + + +class _WriteGroup(PTransform): + + def __init__(self, max_batch_size_bytes=1024): + self._max_batch_size_bytes = max_batch_size_bytes + + def expand(self, pcoll): + filter_batchable_mutations = ( + pcoll + | 'Making mutation groups' >> ParDo(_MakeMutationGroupsFn()) + | 'Filtering Batchable Mutations' >> ParDo( + _BatchableFilterFn(self._max_batch_size_bytes)).with_outputs( + _BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, main='batchable')) + + batching_batchables = ( + filter_batchable_mutations['batchable'] + | ParDo(_BatchFn(self._max_batch_size_bytes))) + + return ( + (batching_batchables, + filter_batchable_mutations[_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE]) + | 'Merging batchable and unbatchable' >> Flatten()) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index be838f4e97a9..116e9eb46e21 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -26,21 +26,27 @@ import mock import apache_beam as beam +from apache_beam.metrics.metric import MetricsFilter from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to # Protect against environments where spanner library is not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +# pylint: disable=unused-import try: from google.cloud import spanner - from apache_beam.io.gcp.experimental.spannerio import (create_transaction, - ReadOperation, - ReadFromSpanner) # pylint: disable=unused-import - # disable=unused-import + from apache_beam.io.gcp.experimental.spannerio import create_transaction + from apache_beam.io.gcp.experimental.spannerio import ReadOperation + from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner + from apache_beam.io.gcp.experimental.spannerio import WriteMutation + from apache_beam.io.gcp.experimental.spannerio import MutationGroup + from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner + from apache_beam.io.gcp.experimental.spannerio import _BatchFn except ImportError: spanner = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports +# pylint: enable=unused-import MAX_DB_NAME_LENGTH = 30 @@ -333,6 +339,147 @@ def test_display_data(self, *args): self.assertTrue("transaction" in dd_transaction) +@unittest.skipIf(spanner is None, 'GCP dependencies are not installed.') +@mock.patch('apache_beam.io.gcp.experimental.spannerio.Client') +@mock.patch('google.cloud.spanner_v1.database.BatchCheckout') +class SpannerWriteTest(unittest.TestCase): + + def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout): + ks = spanner.KeySet(keys=[[1233], [1234]]) + + mutations = [ + WriteMutation.delete("roles", ks), + WriteMutation.insert("roles", ("key", "rolename"), + [('1233', "mutations-inset-1233")]), + WriteMutation.insert("roles", ("key", "rolename"), + [('1234', "mutations-inset-1234")]), + WriteMutation.update("roles", ("key", "rolename"), + [('1234', "mutations-inset-1233-updated")]), + ] + + p = TestPipeline() + _ = ( + p + | beam.Create(mutations) + | WriteToSpanner( + project_id=TEST_PROJECT_ID, + instance_id=TEST_INSTANCE_ID, + database_id=_generate_database_name(), + max_batch_size_bytes=1024) + ) + res = p.run() + res.wait_until_finish() + + metric_results = 
res.metrics().query( + MetricsFilter().with_name("SpannerBatches")) + batches_counter = metric_results['counters'][0] + + self.assertEqual(batches_counter.committed, 2) + self.assertEqual(batches_counter.attempted, 2) + + def test_spanner_bundles_size(self, mock_batch_snapshot_class, + mock_batch_checkout): + ks = spanner.KeySet(keys=[[1233], [1234]]) + mutations = [ + WriteMutation.delete("roles", ks), + WriteMutation.insert("roles", ("key", "rolename"), + [('1234', "mutations-inset-1234")]) + ] * 50 + p = TestPipeline() + _ = ( + p + | beam.Create(mutations) + | WriteToSpanner( + project_id=TEST_PROJECT_ID, + instance_id=TEST_INSTANCE_ID, + database_id=_generate_database_name(), + max_batch_size_bytes=1024) + ) + res = p.run() + res.wait_until_finish() + + metric_results = res.metrics().query( + MetricsFilter().with_name('SpannerBatches')) + batches_counter = metric_results['counters'][0] + + self.assertEqual(batches_counter.committed, 53) + self.assertEqual(batches_counter.attempted, 53) + + def test_spanner_write_mutation_groups(self, mock_batch_snapshot_class, + mock_batch_checkout): + ks = spanner.KeySet(keys=[[1233], [1234]]) + mutation_groups = [ + MutationGroup([ + WriteMutation.insert("roles", ("key", "rolename"), + [('9001233', "mutations-inset-1233")]), + WriteMutation.insert("roles", ("key", "rolename"), + [('9001234', "mutations-inset-1234")]) + ]), + MutationGroup([ + WriteMutation.update( + "roles", ("key", "rolename"), + [('9001234', "mutations-inset-9001233-updated")]) + ]), + MutationGroup([WriteMutation.delete("roles", ks)]) + ] + + p = TestPipeline() + _ = ( + p + | beam.Create(mutation_groups) + | WriteToSpanner( + project_id=TEST_PROJECT_ID, + instance_id=TEST_INSTANCE_ID, + database_id=_generate_database_name(), + max_batch_size_bytes=100) + ) + res = p.run() + res.wait_until_finish() + + metric_results = res.metrics().query( + MetricsFilter().with_name('SpannerBatches')) + batches_counter = metric_results['counters'][0] + + self.assertEqual(batches_counter.committed, 3) + self.assertEqual(batches_counter.attempted, 3) + + def test_mutation_group_batching(self, mock_batch_snapshot_class, + mock_batch_checkout): + + # each mutation group byte size is 58 bytes. + mutation_group = [MutationGroup([ + WriteMutation.insert("roles", ("key", "rolename"), + [('1234', "mutations-inset-1234")])])] * 50 + + with TestPipeline() as p: + # the total 50 mutation gorup size will be 2900 (58 * 50) + # if we want to make two batches, so batch size should be 1450 (2900 / 2) + # and each bach should contains 25 mutations. + res = ( + p | beam.Create(mutation_group) | beam.ParDo(_BatchFn(1450)) + | beam.Map(lambda x: len(x)) + ) + assert_that(res, equal_to([25, 25])) + + def test_write_mutation_error(self, *args): + with self.assertRaises(ValueError): + # since `WriteMutation` only accept one operation. 
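+      # Exactly one of insert, update, insert_or_update, replace or delete
+      # may be set; passing both `insert` and `update` together raises the
+      # ValueError asserted above.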
+ WriteMutation(insert="table-name", update="table-name") + + def test_display_data(self, *args): + data = WriteToSpanner( + project_id=TEST_PROJECT_ID, + instance_id=TEST_INSTANCE_ID, + database_id=_generate_database_name(), + max_batch_size_bytes=1024 + ).display_data() + self.assertTrue("project_id" in data) + self.assertTrue("instance_id" in data) + self.assertTrue("pool" in data) + self.assertTrue("database" in data) + self.assertTrue("batch_size" in data) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From fc0c5cfdd734e3f2215e353e4078339e36dff659 Mon Sep 17 00:00:00 2001 From: Shoaib Zafar Date: Thu, 30 Jan 2020 15:59:19 +0500 Subject: [PATCH 2/6] Fix typo in _WriteGroup class. --- sdks/python/apache_beam/io/gcp/experimental/spannerio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 42329db998b8..e293278d512f 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -991,7 +991,7 @@ def process(self, element): class _WriteGroup(PTransform): - def __init__(self, max_batch_size_bytes=1024): + def __init__(self, max_batch_size_bytes=1048576): self._max_batch_size_bytes = max_batch_size_bytes def expand(self, pcoll): From 2e31bb0114a96ee27a1545e007eb4d44266ea4d9 Mon Sep 17 00:00:00 2001 From: Shoaib Date: Mon, 10 Feb 2020 21:31:33 +0500 Subject: [PATCH 3/6] added two batching parameters (max_number_rows, max_number_cells) --- .../io/gcp/experimental/spannerio.py | 272 ++++++++++++------ .../io/gcp/experimental/spannerio_test.py | 165 ++++++++--- 2 files changed, 313 insertions(+), 124 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 3a7c1e727913..5b2ee588186f 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -146,20 +146,25 @@ For more information, review the docs available on WriteMutation class. -WriteToSpanner transform also takes 'max_batch_size_bytes' param which is set -to 1MB (1048576 bytes) by default. This parameter used to reduce the number of +WriteToSpanner transform also takes three batching parameters (max_number_rows, +max_number_cells and max_batch_size_bytes). By default, max_number_rows is set +to 50 rows, max_number_cells is set to 500 cells and max_batch_size_bytes is +set to 1MB (1048576 bytes). These parameter used to reduce the number of transactions sent to spanner by grouping the mutation into batches. Setting -this either to smaller value or zero to disable batching. +these param values either to smaller value or zero to disable batching. WriteToSpanner transforms starts with the grouping into batches. The first step in this process is to make the make the mutation groups of the WriteMutation objects and then filtering them into batchable and unbatchable mutation -groups by getting the mutation size from the method available in the -`google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize`, if the size is -smaller than value of "max_batch_size_bytes" param, it will be tagged as -"unbatchable" mutation. After this all the batchable mutation are merged into a -single mutation group whos size is not larger than the "max_batch_size_bytes", -after this process, all the mutation groups flatten together to process. +groups. 
+(max_number_cells, max_number_rows & max_batch_size_bytes). The mutation byte
+size is calculated with the method available at
+`google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize`.
+If a mutation's rows, cells or byte size is larger than the value of any of
+the batching parameters, it is tagged as an "unbatchable" mutation. After
+this, all the batchable mutations are merged into mutation groups whose size
+is not larger than "max_batch_size_bytes"; after this process, all the
+mutation groups are flattened together for processing. If a Mutation
+references a table or column that does not exist, it will cause an exception
+and fail the entire pipeline.
 """
 
 from __future__ import absolute_import
@@ -196,8 +201,12 @@
 BatchSnapshot = None
 
 __all__ = [
-    'create_transaction', 'ReadFromSpanner', 'ReadOperation', 'WriteToSpanner',
-    'WriteMutation', 'MutationGroup'
+    'create_transaction',
+    'ReadFromSpanner',
+    'ReadOperation',
+    'WriteToSpanner',
+    'WriteMutation',
+    'MutationGroup'
 ]
 
@@ -506,7 +515,6 @@ def create_transaction(
       exact_staleness)))
 
-
 @with_input_types(typing.Dict[typing.Any, typing.Any])
 @with_output_types(typing.List[typing.Any])
 class _ReadFromPartitionFn(DoFn):
@@ -685,9 +693,16 @@ def display_data(self):
 
 @experimental(extra_message="No backwards-compatibility guarantees.")
 class WriteToSpanner(PTransform):
-
-  def __init__(self, project_id, instance_id, database_id, pool=None,
-               credentials=None, max_batch_size_bytes=1048576):
+  def __init__(
+      self,
+      project_id,
+      instance_id,
+      database_id,
+      pool=None,
+      credentials=None,
+      max_batch_size_bytes=1048576,
+      max_number_rows=50,
+      max_number_cells=500):
     """
     A PTransform to write onto Google Cloud Spanner.
 
@@ -696,16 +711,27 @@ def __init__(self, project_id, instance_id, database_id, pool=None,
       not the Project Number.
       instance_id: Cloud spanner instance id.
       database_id: Cloud spanner database id.
-      max_batch_size_bytes: (optional) Split the mutation into batches to
+      max_batch_size_bytes: (optional) Split the mutations into batches to
        reduce the number of transaction sent to Spanner. By default it is
        set to 1 MB (1048576 Bytes).
+      max_number_rows: (optional) Split the mutations into batches to
+        reduce the number of transactions sent to Spanner. By default it is
+        set to 50 rows per batch.
+      max_number_cells: (optional) Split the mutations into batches to
+        reduce the number of transactions sent to Spanner. By default it is
+        set to 500 cells per batch.
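+
+    Example (an illustrative sketch; `mutations_pcoll` and the ids are
+    placeholders, and, as exercised in the tests, setting any batching
+    parameter to zero effectively disables batching)::
+
+      _ = (mutations_pcoll
+           | WriteToSpanner(
+               project_id='project-id',
+               instance_id='instance-id',
+               database_id='database-id',
+               max_number_rows=0))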
""" self._configuration = _BeamSpannerConfiguration( - project=project_id, instance=instance_id, database=database_id, - credentials=credentials, pool=pool, snapshot_read_timestamp=None, - snapshot_exact_staleness=None - ) + project=project_id, + instance=instance_id, + database=database_id, + credentials=credentials, + pool=pool, + snapshot_read_timestamp=None, + snapshot_exact_staleness=None) self._max_batch_size_bytes = max_batch_size_bytes + self._max_number_rows = max_number_rows + self._max_number_cells = max_number_cells self._database_id = database_id self._project_id = project_id self._instance_id = instance_id @@ -717,20 +743,29 @@ def display_data(self): 'instance_id': DisplayDataItem(self._instance_id, label='Instance Id'), 'pool': DisplayDataItem(str(self._pool), label='Pool'), 'database': DisplayDataItem(self._database_id, label='Database'), - 'batch_size': DisplayDataItem(self._max_batch_size_bytes, - label="Batch Size"), + 'batch_size': DisplayDataItem( + self._max_batch_size_bytes, label="Batch Size"), + 'max_number_rows': DisplayDataItem( + self._max_number_rows, label="Max Rows"), + 'max_number_cells': DisplayDataItem( + self._max_number_cells, label="Max Cells"), } return res def expand(self, pcoll): - return (pcoll - | "make batches" >> - _WriteGroup(max_batch_size_bytes=self._max_batch_size_bytes) - | 'Writing to spanner' >> ParDo( - _WriteToSpannerDoFn(self._configuration))) + return ( + pcoll + | "make batches" >> _WriteGroup( + max_batch_size_bytes=self._max_batch_size_bytes, + max_number_rows=self._max_number_rows, + max_number_cells=self._max_number_cells) + | + 'Writing to spanner' >> ParDo(_WriteToSpannerDoFn(self._configuration))) -class _Mutator(namedtuple('_Mutator', ["mutation", "operation", "kwargs"])): +class _Mutator(namedtuple('_Mutator', + ["mutation", "operation", "kwargs", "rows", "cells"]) + ): __slots__ = () @property @@ -742,13 +777,16 @@ class MutationGroup(deque): """ A Bundle of Spanner Mutations (_Mutator). """ - @property - def byte_size(self): - s = 0 + def info(self): + cells = 0 + rows = 0 + bytes = 0 for m in self.__iter__(): - s += m.byte_size - return s + bytes += m.byte_size + rows += m.rows + cells += m.cells + return {"rows": rows, "cells": cells, "byte_size": bytes} def primary(self): return next(self.__iter__()) @@ -762,15 +800,16 @@ class WriteMutation(object): _OPERATION_REPLACE = "replace" _OPERATION_UPDATE = "update" - def __init__(self, - insert=None, - update=None, - insert_or_update=None, - replace=None, - delete=None, - columns=None, - values=None, - keyset=None): + def __init__( + self, + insert=None, + update=None, + insert_or_update=None, + replace=None, + delete=None, + columns=None, + values=None, + keyset=None): """ A convenient class to create Spanner Mutations for Write. User can provide the operation via constructor or via static methods. 
@@ -817,14 +856,14 @@ def __init__(self, self._replace = replace self._delete = delete - if sum([ - 1 for x in [self._insert, self._update, self._insert_or_update, - self._replace, self._delete] - if x is not None - ]) != 1: - raise ValueError("No or more than one write mutation operation " - "provided: <%s: %s>" % (self.__class__.__name__, - str(self.__dict__))) + if sum([1 for x in [self._insert, + self._update, + self._insert_or_update, + self._replace, + self._delete] if x is not None]) != 1: + raise ValueError( + "No or more than one write mutation operation " + "provided: <%s: %s>" % (self.__class__.__name__, str(self.__dict__))) def __call__(self, *args, **kwargs): if self._insert is not None: @@ -853,10 +892,16 @@ def insert(table, columns, values): columns: Name of the table columns to be modified. values: Values to be modified. """ + rows = len(values) + cells = len(columns) * len(values) return _Mutator( mutation=Mutation(insert=batch._make_write_pb(table, columns, values)), - operation=WriteMutation._OPERATION_INSERT, kwargs={ - "table": table, "columns": columns, "values": values}) + operation=WriteMutation._OPERATION_INSERT, + rows=rows, + cells=cells, + kwargs={ + "table": table, "columns": columns, "values": values + }) @staticmethod def update(table, columns, values): @@ -867,10 +912,17 @@ def update(table, columns, values): columns: Name of the table columns to be modified. values: Values to be modified. """ + rows = len(values) + cells = len(columns) * len(values) return _Mutator( mutation=Mutation(update=batch._make_write_pb(table, columns, values)), - operation=WriteMutation._OPERATION_UPDATE, kwargs={ - "table": table, "columns": columns, "values": values}) + operation=WriteMutation._OPERATION_UPDATE, + rows=rows, + cells=cells, + kwargs={ + "table": table, "columns": columns, "values": values + }) + @staticmethod def insert_or_update(table, columns, values): """Insert/update one or more table rows. @@ -879,11 +931,17 @@ def insert_or_update(table, columns, values): columns: Name of the table columns to be modified. values: Values to be modified. """ + rows = len(values) + cells = len(columns) * len(values) return _Mutator( mutation=Mutation( insert_or_update=batch._make_write_pb(table, columns, values)), - operation=WriteMutation._OPERATION_INSERT_OR_UPDATE, kwargs={ - "table": table, "columns": columns, "values": values}) + operation=WriteMutation._OPERATION_INSERT_OR_UPDATE, + rows=rows, + cells=cells, + kwargs={ + "table": table, "columns": columns, "values": values + }) @staticmethod def replace(table, columns, values): @@ -894,10 +952,16 @@ def replace(table, columns, values): columns: Name of the table columns to be modified. values: Values to be modified. """ + rows = len(values) + cells = len(columns) * len(values) return _Mutator( mutation=Mutation(replace=batch._make_write_pb(table, columns, values)), - operation=WriteMutation._OPERATION_REPLACE, kwargs={ - "table": table, "columns": columns, "values": values}) + operation=WriteMutation._OPERATION_REPLACE, + rows=rows, + cells=cells, + kwargs={ + "table": table, "columns": columns, "values": values + }) @staticmethod def delete(table, keyset): @@ -908,9 +972,14 @@ def delete(table, keyset): keyset: Keys/ranges identifying rows to delete. 
""" delete = Mutation.Delete(table=table, key_set=keyset._to_pb()) - return _Mutator(mutation=Mutation(delete=delete), - operation=WriteMutation._OPERATION_DELETE, - kwargs={"table": table, "keyset": keyset}) + return _Mutator( + mutation=Mutation(delete=delete), + rows=0, + cells=0, + operation=WriteMutation._OPERATION_DELETE, + kwargs={ + "table": table, "keyset": keyset + }) @with_input_types(typing.Union[MutationGroup, TaggedOutput]) @@ -919,26 +988,44 @@ class _BatchFn(DoFn): """ Batches mutations together. """ - - def __init__(self, max_batch_size_bytes): + def __init__(self, max_batch_size_bytes, max_number_rows, max_number_cells): self._max_batch_size_bytes = max_batch_size_bytes + self._max_number_rows = max_number_rows + self._max_number_cells = max_number_cells def start_bundle(self): self._batch = MutationGroup() self._size_in_bytes = 0 + self._rows = 0 + self._cells = 0 + + def _reset_count(self): + self._batch = MutationGroup() + self._size_in_bytes = 0 + self._rows = 0 + self._cells = 0 def process(self, element): - _max_bytes = self._max_batch_size_bytes - mg_size = element.byte_size # total size of the mutation group. + mg_info = element.info - if mg_size + self._size_in_bytes > _max_bytes: + if mg_info['byte_size'] + self._size_in_bytes > self._max_batch_size_bytes \ + or mg_info['cells'] + self._cells > self._max_number_cells \ + or mg_info['rows'] + self._rows > self._max_number_rows: # Batch is full, output the batch and resetting the count. - yield self._batch - self._size_in_bytes = 0 - self._batch = MutationGroup() + if self._batch: + yield self._batch + self._reset_count() self._batch.extend(element) - self._size_in_bytes += mg_size + + # total byte size of the mutation group. + self._size_in_bytes += mg_info['byte_size'] + + # total rows in the mutation group. + self._rows += mg_info['rows'] + + # total cells in the mutation group. + self._cells += mg_info['cells'] def finish_bundle(self): if self._batch is not None: @@ -955,27 +1042,28 @@ class _BatchableFilterFn(DoFn): """ OUTPUT_TAG_UNBATCHABLE = 'unbatchable' - def __init__(self, max_batch_size_bytes): + def __init__(self, max_batch_size_bytes, max_number_rows, max_number_cells): self._max_batch_size_bytes = max_batch_size_bytes + self._max_number_rows = max_number_rows + self._max_number_cells = max_number_cells self._batchable = None self._unbatchable = None def process(self, element): - if element.primary().operation == 'delete': + if element.primary().operation == WriteMutation._OPERATION_DELETE: # As delete mutations are not batchable. yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element) else: - _max_bytes = self._max_batch_size_bytes - mg = element - mg_size = mg.byte_size - if mg_size > _max_bytes: + mg_info = element.info + if mg_info['byte_size'] > self._max_batch_size_bytes \ + or mg_info['cells'] > self._max_number_cells \ + or mg_info['rows'] > self._max_number_rows: yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element) else: yield element class _WriteToSpannerDoFn(DoFn): - def __init__(self, spanner_configuration): self._spanner_configuration = spanner_configuration self._db_instance = None @@ -1014,7 +1102,6 @@ class _MakeMutationGroupsFn(DoFn): """ Make Mutation group object if the element is the instance of _Mutator. 
""" - def process(self, element): if isinstance(element, MutationGroup): yield element @@ -1027,23 +1114,32 @@ def process(self, element): class _WriteGroup(PTransform): - - def __init__(self, max_batch_size_bytes=1048576): + def __init__(self, max_batch_size_bytes, max_number_rows, max_number_cells): self._max_batch_size_bytes = max_batch_size_bytes + self._max_number_rows = max_number_rows + self._max_number_cells = max_number_cells def expand(self, pcoll): filter_batchable_mutations = ( pcoll | 'Making mutation groups' >> ParDo(_MakeMutationGroupsFn()) | 'Filtering Batchable Mutations' >> ParDo( - _BatchableFilterFn(self._max_batch_size_bytes)).with_outputs( - _BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, main='batchable')) + _BatchableFilterFn( + max_batch_size_bytes=self._max_batch_size_bytes, + max_number_rows=self._max_number_rows, + max_number_cells=self._max_number_cells)).with_outputs( + _BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, main='batchable') + ) batching_batchables = ( filter_batchable_mutations['batchable'] - | ParDo(_BatchFn(self._max_batch_size_bytes))) - - return ( - (batching_batchables, - filter_batchable_mutations[_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE]) - | 'Merging batchable and unbatchable' >> Flatten()) + | ParDo( + _BatchFn( + max_batch_size_bytes=self._max_batch_size_bytes, + max_number_rows=self._max_number_rows, + max_number_cells=self._max_number_cells))) + + return (( + batching_batchables, + filter_batchable_mutations[_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE]) + | 'Merging batchable and unbatchable' >> Flatten()) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index 4e2f90015795..001e1c75a860 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -382,18 +382,18 @@ def test_display_data(self, *args): @mock.patch('apache_beam.io.gcp.experimental.spannerio.Client') @mock.patch('google.cloud.spanner_v1.database.BatchCheckout') class SpannerWriteTest(unittest.TestCase): - def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout): ks = spanner.KeySet(keys=[[1233], [1234]]) mutations = [ WriteMutation.delete("roles", ks), - WriteMutation.insert("roles", ("key", "rolename"), - [('1233', "mutations-inset-1233")]), - WriteMutation.insert("roles", ("key", "rolename"), - [('1234', "mutations-inset-1234")]), - WriteMutation.update("roles", ("key", "rolename"), - [('1234', "mutations-inset-1233-updated")]), + WriteMutation.insert( + "roles", ("key", "rolename"), [('1233', "mutations-inset-1233")]), + WriteMutation.insert( + "roles", ("key", "rolename"), [('1234', "mutations-inset-1234")]), + WriteMutation.update( + "roles", ("key", "rolename"), + [('1234', "mutations-inset-1233-updated")]), ] p = TestPipeline() @@ -404,8 +404,7 @@ def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout): project_id=TEST_PROJECT_ID, instance_id=TEST_INSTANCE_ID, database_id=_generate_database_name(), - max_batch_size_bytes=1024) - ) + max_batch_size_bytes=1024)) res = p.run() res.wait_until_finish() @@ -416,13 +415,13 @@ def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout): self.assertEqual(batches_counter.committed, 2) self.assertEqual(batches_counter.attempted, 2) - def test_spanner_bundles_size(self, mock_batch_snapshot_class, - mock_batch_checkout): + def test_spanner_bundles_size( + self, mock_batch_snapshot_class, mock_batch_checkout): ks 
= spanner.KeySet(keys=[[1233], [1234]]) mutations = [ WriteMutation.delete("roles", ks), - WriteMutation.insert("roles", ("key", "rolename"), - [('1234', "mutations-inset-1234")]) + WriteMutation.insert( + "roles", ("key", "rolename"), [('1234', "mutations-inset-1234")]) ] * 50 p = TestPipeline() _ = ( @@ -432,8 +431,7 @@ def test_spanner_bundles_size(self, mock_batch_snapshot_class, project_id=TEST_PROJECT_ID, instance_id=TEST_INSTANCE_ID, database_id=_generate_database_name(), - max_batch_size_bytes=1024) - ) + max_batch_size_bytes=1024)) res = p.run() res.wait_until_finish() @@ -444,15 +442,17 @@ def test_spanner_bundles_size(self, mock_batch_snapshot_class, self.assertEqual(batches_counter.committed, 53) self.assertEqual(batches_counter.attempted, 53) - def test_spanner_write_mutation_groups(self, mock_batch_snapshot_class, - mock_batch_checkout): + def test_spanner_write_mutation_groups( + self, mock_batch_snapshot_class, mock_batch_checkout): ks = spanner.KeySet(keys=[[1233], [1234]]) mutation_groups = [ MutationGroup([ - WriteMutation.insert("roles", ("key", "rolename"), - [('9001233', "mutations-inset-1233")]), - WriteMutation.insert("roles", ("key", "rolename"), - [('9001234', "mutations-inset-1234")]) + WriteMutation.insert( + "roles", ("key", "rolename"), + [('9001233', "mutations-inset-1233")]), + WriteMutation.insert( + "roles", ("key", "rolename"), + [('9001234', "mutations-inset-1234")]) ]), MutationGroup([ WriteMutation.update( @@ -470,8 +470,7 @@ def test_spanner_write_mutation_groups(self, mock_batch_snapshot_class, project_id=TEST_PROJECT_ID, instance_id=TEST_INSTANCE_ID, database_id=_generate_database_name(), - max_batch_size_bytes=100) - ) + max_batch_size_bytes=100)) res = p.run() res.wait_until_finish() @@ -482,23 +481,116 @@ def test_spanner_write_mutation_groups(self, mock_batch_snapshot_class, self.assertEqual(batches_counter.committed, 3) self.assertEqual(batches_counter.attempted, 3) - def test_mutation_group_batching(self, mock_batch_snapshot_class, - mock_batch_checkout): + def test_batch_byte_size( + self, mock_batch_snapshot_class, mock_batch_checkout): - # each mutation group byte size is 58 bytes. - mutation_group = [MutationGroup([ - WriteMutation.insert("roles", ("key", "rolename"), - [('1234', "mutations-inset-1234")])])] * 50 + # each mutation group byte size is 58 bytes. + mutation_group = [ + MutationGroup([ + WriteMutation.insert( + "roles", + ("key", "rolename"), [('1234', "mutations-inset-1234")]) + ]) + ] * 50 with TestPipeline() as p: - # the total 50 mutation gorup size will be 2900 (58 * 50) + # the total 50 mutation group size will be 2900 (58 * 50) # if we want to make two batches, so batch size should be 1450 (2900 / 2) # and each bach should contains 25 mutations. 
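+      # (58 bytes per mutation group * 25 groups = 1450 bytes, one full batch)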
res = ( - p | beam.Create(mutation_group) | beam.ParDo(_BatchFn(1450)) - | beam.Map(lambda x: len(x)) - ) - assert_that(res, equal_to([25, 25])) + p | beam.Create(mutation_group) + | beam.ParDo( + _BatchFn( + max_batch_size_bytes=1450, + max_number_rows=50, + max_number_cells=500)) + | beam.Map(lambda x: len(x))) + assert_that(res, equal_to([25] * 2)) + + def test_batch_disable( + self, mock_batch_snapshot_class, mock_batch_checkout): + + mutation_group = [ + MutationGroup([ + WriteMutation.insert( + "roles", + ("key", "rolename"), [('1234', "mutations-inset-1234")]) + ]) + ] * 4 + + with TestPipeline() as p: + # to disable to batching, we need to set any of the batching parameters + # either to lower value or zero + res = ( + p | beam.Create(mutation_group) + | beam.ParDo( + _BatchFn( + max_batch_size_bytes=1450, + max_number_rows=0, + max_number_cells=500)) + | beam.Map(lambda x: len(x))) + assert_that(res, equal_to([1] * 4)) + + def test_batch_max_rows( + self, mock_batch_snapshot_class, mock_batch_checkout): + + mutation_group = [ + MutationGroup([ + WriteMutation.insert( + "roles", + ("key", "rolename"), [ + ('1234', "mutations-inset-1234"), + ('1235', "mutations-inset-1235"), + ] + )] + )] * 50 + + with TestPipeline() as p: + # There are total 50 mutation groups, each contains two rows. + # The total number of rows will be 100 (50 * 2). + # If each batch contains 10 rows max then batch count should be 10 + # (contains 5 mutation groups each). + res = ( + p | beam.Create(mutation_group) + | beam.ParDo( + _BatchFn( + max_batch_size_bytes=1048576, + max_number_rows=10, + max_number_cells=500)) + | beam.Map(lambda x: len(x))) + assert_that(res, equal_to([5] * 10)) + + def test_batch_max_cells( + self, mock_batch_snapshot_class, mock_batch_checkout): + + mutation_group = [ + MutationGroup([ + WriteMutation.insert( + "roles", + ("key", "rolename"), [ + ('1234', "mutations-inset-1234"), + ('1235', "mutations-inset-1235"), + ] + )] + )] * 50 + + with TestPipeline() as p: + # There are total 50 mutation groups, each contains two rows (or 4 cells). + # The total number of cells will be 200 (50 groups * 4 cells). + # If each batch contains 50 cells max then batch count should be 5. + # 4 batches contains 12 mutations groups and the fifth batch should be + # consists of 2 mutation group element. + # No. 
of mutations groups per batch = Max Cells / Cells per mutation group + # total_batches = Total Number of Cells / Max Cells + res = ( + p | beam.Create(mutation_group) + | beam.ParDo( + _BatchFn( + max_batch_size_bytes=1048576, + max_number_rows=500, + max_number_cells=50)) + | beam.Map(lambda x: len(x))) + assert_that(res, equal_to([12, 12, 12, 12, 2])) def test_write_mutation_error(self, *args): with self.assertRaises(ValueError): @@ -510,13 +602,14 @@ def test_display_data(self, *args): project_id=TEST_PROJECT_ID, instance_id=TEST_INSTANCE_ID, database_id=_generate_database_name(), - max_batch_size_bytes=1024 - ).display_data() + max_batch_size_bytes=1024).display_data() self.assertTrue("project_id" in data) self.assertTrue("instance_id" in data) self.assertTrue("pool" in data) self.assertTrue("database" in data) self.assertTrue("batch_size" in data) + self.assertTrue("max_number_rows" in data) + self.assertTrue("max_number_cells" in data) if __name__ == '__main__': From c51d9a76f8b5c614aeafa41ac55f1d441eed5359 Mon Sep 17 00:00:00 2001 From: Shoaib Zafar Date: Mon, 10 Feb 2020 23:32:16 +0500 Subject: [PATCH 4/6] adds io note on CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index ce39f8303853..17206608f679 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -25,7 +25,7 @@ * New highly anticipated feature Y added to JavaSDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)). ### I/Os -* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Support for Google Cloud Spanner added for Python SDK. This is an experimental module for reading and writing data from Google Cloud Spanner ([BEAM-7246](https://issues.apache.org/jira/browse/BEAM-7246)). 
### New Features / Improvements From fc3e056df25320cc608249b262bbf0df38a2882f Mon Sep 17 00:00:00 2001 From: Shoaib Date: Tue, 11 Feb 2020 19:13:14 +0500 Subject: [PATCH 5/6] fix python code formate --- .../io/gcp/experimental/spannerio_test.py | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index 001e1c75a860..672bf8b7d242 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -507,8 +507,7 @@ def test_batch_byte_size( | beam.Map(lambda x: len(x))) assert_that(res, equal_to([25] * 2)) - def test_batch_disable( - self, mock_batch_snapshot_class, mock_batch_checkout): + def test_batch_disable(self, mock_batch_snapshot_class, mock_batch_checkout): mutation_group = [ MutationGroup([ @@ -524,26 +523,25 @@ def test_batch_disable( res = ( p | beam.Create(mutation_group) | beam.ParDo( - _BatchFn( - max_batch_size_bytes=1450, - max_number_rows=0, - max_number_cells=500)) + _BatchFn( + max_batch_size_bytes=1450, + max_number_rows=0, + max_number_cells=500)) | beam.Map(lambda x: len(x))) assert_that(res, equal_to([1] * 4)) - def test_batch_max_rows( - self, mock_batch_snapshot_class, mock_batch_checkout): + def test_batch_max_rows(self, mock_batch_snapshot_class, mock_batch_checkout): mutation_group = [ MutationGroup([ WriteMutation.insert( - "roles", - ("key", "rolename"), [ + "roles", ("key", "rolename"), + [ ('1234', "mutations-inset-1234"), ('1235', "mutations-inset-1235"), - ] - )] - )] * 50 + ]) + ]) + ] * 50 with TestPipeline() as p: # There are total 50 mutation groups, each contains two rows. @@ -553,10 +551,10 @@ def test_batch_max_rows( res = ( p | beam.Create(mutation_group) | beam.ParDo( - _BatchFn( - max_batch_size_bytes=1048576, - max_number_rows=10, - max_number_cells=500)) + _BatchFn( + max_batch_size_bytes=1048576, + max_number_rows=10, + max_number_cells=500)) | beam.Map(lambda x: len(x))) assert_that(res, equal_to([5] * 10)) @@ -566,13 +564,13 @@ def test_batch_max_cells( mutation_group = [ MutationGroup([ WriteMutation.insert( - "roles", - ("key", "rolename"), [ + "roles", ("key", "rolename"), + [ ('1234', "mutations-inset-1234"), ('1235', "mutations-inset-1235"), - ] - )] - )] * 50 + ]) + ]) + ] * 50 with TestPipeline() as p: # There are total 50 mutation groups, each contains two rows (or 4 cells). @@ -585,10 +583,10 @@ def test_batch_max_cells( res = ( p | beam.Create(mutation_group) | beam.ParDo( - _BatchFn( - max_batch_size_bytes=1048576, - max_number_rows=500, - max_number_cells=50)) + _BatchFn( + max_batch_size_bytes=1048576, + max_number_rows=500, + max_number_cells=50)) | beam.Map(lambda x: len(x))) assert_that(res, equal_to([12, 12, 12, 12, 2])) From 989cf4a1cb5423070674b1606358a9d67bf723ea Mon Sep 17 00:00:00 2001 From: Shoaib Date: Thu, 13 Feb 2020 19:14:03 +0500 Subject: [PATCH 6/6] Added docs for spanner write. --- sdks/python/apache_beam/io/gcp/experimental/spannerio.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index 5b2ee588186f..b575f6ee4d74 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -152,6 +152,8 @@ set to 1MB (1048576 bytes). 
These parameters are used
 to reduce the number of transactions sent to Spanner by grouping the mutations
 into batches. Setting these parameters to smaller values, or to zero, disables
 batching.
+Unlike the Java connector, this connector does not create batches of
+transactions sorted by table and primary key.
 
 The WriteToSpanner transform starts by grouping mutations into batches. The
 first step in this process is to wrap the WriteMutation objects into mutation