From 65621f37dfb79c4454c0110545cc07052dedcf87 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Wed, 7 Jan 2015 21:53:48 -0500
Subject: [PATCH 1/2] Add a 'Batch': collects non-transactional saves /
 deletes.

Closes #421.
---
 docs/datastore-batches.rst          |   7 +
 docs/index.rst                      |   3 +-
 gcloud/datastore/batch.py           | 170 +++++++++++++++++++++++
 gcloud/datastore/connection.py      |  34 ++++-
 gcloud/datastore/test_batch.py      | 203 ++++++++++++++++++++++++++++
 gcloud/datastore/test_connection.py |  34 +++++
 6 files changed, 444 insertions(+), 7 deletions(-)
 create mode 100644 docs/datastore-batches.rst
 create mode 100644 gcloud/datastore/batch.py
 create mode 100644 gcloud/datastore/test_batch.py

diff --git a/docs/datastore-batches.rst b/docs/datastore-batches.rst
new file mode 100644
index 000000000000..49527a9fe495
--- /dev/null
+++ b/docs/datastore-batches.rst
@@ -0,0 +1,7 @@
+Batches
+~~~~~~~
+
+.. automodule:: gcloud.datastore.batch
+  :members:
+  :undoc-members:
+  :show-inheritance:
diff --git a/docs/index.rst b/docs/index.rst
index 179ab099389a..fa65bc41e4e3 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -5,8 +5,9 @@
   datastore-api
   datastore-entities
   datastore-keys
-  datastore-transactions
   datastore-queries
+  datastore-transactions
+  datastore-batches
   storage-api
   storage-buckets
   storage-keys
diff --git a/gcloud/datastore/batch.py b/gcloud/datastore/batch.py
new file mode 100644
index 000000000000..d2bcc96ca082
--- /dev/null
+++ b/gcloud/datastore/batch.py
@@ -0,0 +1,170 @@
+# Copyright 2014 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Create / interact with a batch of updates / deletes."""
+
+from gcloud.datastore import _implicit_environ
+from gcloud.datastore import datastore_v1_pb2 as datastore_pb
+
+
+class Batch(object):
+    """An abstraction representing a collected group of updates / deletes.
+
+    Used to build up a bulk mutation.
+
+    For example, the following snippet of code will put the two ``put``
+    operations and the delete operation into the same mutation, and send
+    them to the server in a single API request::
+
+      >>> from gcloud import datastore
+      >>> batch = Batch()
+      >>> batch.put(entity1)
+      >>> batch.put(entity2)
+      >>> batch.delete(key3)
+      >>> batch.commit()
+
+    You can also use a batch as a context manager, in which case
+    ``commit`` will be called automatically if its block exits without
+    raising an exception::
+
+      >>> with Batch() as batch:
+      ...   batch.put(entity1)
+      ...   batch.put(entity2)
+      ...   batch.delete(key3)
+
+    By default, no updates will be sent if the block exits with an error::
+
+      >>> from gcloud import datastore
+      >>> dataset = datastore.get_dataset('dataset-id')
+      >>> with Batch() as batch:
+      ...   do_some_work(batch)
+      ...   raise Exception()  # rolls back
+    """
+
+    def __init__(self, dataset_id=None, connection=None):
+        """Construct a batch.
+
+        :type dataset_id: :class:`str`
+        :param dataset_id: The ID of the dataset.
+
+        :type connection: :class:`gcloud.datastore.connection.Connection`
+        :param connection: The connection used to connect to the datastore.
+
+        :raises: :class:`ValueError` if either the connection or the
+                 dataset ID is not set.
+        """
+        self._connection = connection or _implicit_environ.CONNECTION
+        self._dataset_id = dataset_id or _implicit_environ.DATASET_ID
+
+        if self._connection is None or self._dataset_id is None:
+            raise ValueError('A batch must have a connection and '
+                             'a dataset ID set.')
+
+        self._id = None
+        self._mutation = datastore_pb.Mutation()
+
+    @property
+    def dataset_id(self):
+        """Getter for the dataset ID in which the batch will run.
+
+        :rtype: :class:`str`
+        :returns: The dataset ID in which the batch will run.
+        """
+        return self._dataset_id
+
+    @property
+    def connection(self):
+        """Getter for the connection over which the batch will run.
+
+        :rtype: :class:`gcloud.datastore.connection.Connection`
+        :returns: The connection over which the batch will run.
+        """
+        return self._connection
+
+    @property
+    def mutation(self):
+        """Getter for the current mutation.
+
+        Every batch is committed with a single Mutation
+        representing the 'work' to be done as part of the batch.
+        Inside a batch, calling ``batch.put()`` with an entity, or
+        ``batch.delete`` with a key, builds up the mutation.
+        This getter returns the Mutation protobuf that
+        has been built up so far.
+
+        :rtype: :class:`gcloud.datastore.datastore_v1_pb2.Mutation`
+        :returns: The Mutation protobuf to be sent in the commit request.
+        """
+        return self._mutation
+
+    def put(self, entity):
+        """Remember an entity's state to be saved during ``commit``.
+
+        .. note::
+            Any existing properties for the entity will be replaced by those
+            currently set on this instance. Already-stored properties which do
+            not correspond to keys set on this instance will be removed from
+            the datastore.
+
+        .. note::
+            Property values which are "text" ('unicode' in Python2, 'str' in
+            Python3) map to 'string_value' in the datastore; values which are
+            "bytes" ('str' in Python2, 'bytes' in Python3) map to 'blob_value'.
+
+        :type entity: :class:`gcloud.datastore.entity.Entity`
+        :param entity: the entity to be saved.
+
+        :raises: ValueError if the entity has no key assigned.
+        """
+        if entity.key is None:
+            raise ValueError("Entity must have a key")
+
+        key_pb = entity.key.to_protobuf()
+        properties = dict(entity)
+        exclude = tuple(entity.exclude_from_indexes)
+
+        self.connection.save_entity(
+            self.dataset_id, key_pb, properties,
+            exclude_from_indexes=exclude, mutation=self.mutation)
+
+    def delete(self, key):
+        """Remember a key to be deleted during ``commit``.
+
+        :type key: :class:`gcloud.datastore.key.Key`
+        :param key: the key to be deleted.
+
+        :raises: ValueError if the key is not complete.
+        """
+        if key.is_partial:
+            raise ValueError("Key must be complete")
+
+        key_pb = key.to_protobuf()
+        self.connection.delete_entities(
+            self.dataset_id, [key_pb], mutation=self.mutation)
+
+    def commit(self):
+        """Commit the batch.
+
+        This is called automatically upon exiting a ``with`` statement;
+        however, it can be called explicitly if you don't want to use a
+        context manager.
+ """ + self.connection.commit(self._dataset_id, self.mutation) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + self.commit() diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index d8d95de64b79..a483f00e6e5c 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -421,7 +421,7 @@ def allocate_ids(self, dataset_id, key_pbs): return list(response.key) def save_entity(self, dataset_id, key_pb, properties, - exclude_from_indexes=()): + exclude_from_indexes=(), mutation=None): """Save an entity to the Cloud Datastore with the provided properties. .. note:: @@ -441,13 +441,24 @@ def save_entity(self, dataset_id, key_pb, properties, :type exclude_from_indexes: sequence of string :param exclude_from_indexes: Names of properties *not* to be indexed. + :type mutation: :class:`gcloud.datastore.datastore_v1_pb2.Mutation` + or None. + :param mutation: If passed, the mutation protobuf into which the + entity will be saved. If None, use th result + of calling ``self.mutation()`` + :rtype: tuple :returns: The pair (``assigned``, ``new_id``) where ``assigned`` is a boolean indicating if a new ID has been assigned and ``new_id`` is either ``None`` or an integer that has been assigned. """ - mutation = self.mutation() + if mutation is not None: + in_batch = True + else: + in_batch = False + mutation = self.mutation() + key_pb = helpers._prepare_key_for_request(key_pb) # If the Key is complete, we should upsert @@ -479,7 +490,7 @@ def save_entity(self, dataset_id, key_pb, properties, # If this is in a transaction, we should just return True. The # transaction will handle assigning any keys as necessary. - if self.transaction(): + if in_batch or self.transaction(): return False, None result = self.commit(dataset_id, mutation) @@ -493,7 +504,7 @@ def save_entity(self, dataset_id, key_pb, properties, return False, None - def delete_entities(self, dataset_id, key_pbs): + def delete_entities(self, dataset_id, key_pbs, mutation=None): """Delete keys from a dataset in the Cloud Datastore. This method deals only with @@ -508,13 +519,24 @@ def delete_entities(self, dataset_id, key_pbs): :type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key` :param key_pbs: The keys to delete from the datastore. + :type mutation: :class:`gcloud.datastore.datastore_v1_pb2.Mutation` + or None. + :param mutation: If passed, the mutation protobuf into which the + deletion will be saved. If None, use th result + of calling ``self.mutation()`` + :rtype: boolean :returns: ``True`` """ - mutation = self.mutation() + if mutation is not None: + in_batch = True + else: + in_batch = False + mutation = self.mutation() + helpers._add_keys_to_request(mutation.delete, key_pbs) - if not self.transaction(): + if not in_batch and not self.transaction(): self.commit(dataset_id, mutation) return True diff --git a/gcloud/datastore/test_batch.py b/gcloud/datastore/test_batch.py new file mode 100644 index 000000000000..4b3414e50eab --- /dev/null +++ b/gcloud/datastore/test_batch.py @@ -0,0 +1,203 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestBatch(unittest2.TestCase):
+
+    def _getTargetClass(self):
+        from gcloud.datastore.batch import Batch
+
+        return Batch
+
+    def _makeOne(self, dataset_id=None, connection=None):
+        return self._getTargetClass()(dataset_id=dataset_id,
+                                      connection=connection)
+
+    def test_ctor_missing_required(self):
+        from gcloud._testing import _Monkey
+        from gcloud.datastore import _implicit_environ
+
+        with _Monkey(_implicit_environ,
+                     DATASET_ID=None,
+                     CONNECTION=None):
+            self.assertRaises(ValueError, self._makeOne)
+            self.assertRaises(ValueError, self._makeOne, dataset_id=object())
+            self.assertRaises(ValueError, self._makeOne, connection=object())
+
+    def test_ctor_explicit(self):
+        from gcloud.datastore.datastore_v1_pb2 import Mutation
+        _DATASET = 'DATASET'
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+
+        self.assertEqual(batch.dataset_id, _DATASET)
+        self.assertEqual(batch.connection, connection)
+        self.assertTrue(isinstance(batch.mutation, Mutation))
+
+    def test_ctor_implicit(self):
+        from gcloud._testing import _Monkey
+        from gcloud.datastore import _implicit_environ
+        from gcloud.datastore.datastore_v1_pb2 import Mutation
+        DATASET_ID = 'DATASET'
+        CONNECTION = _Connection()
+
+        with _Monkey(_implicit_environ,
+                     DATASET_ID=DATASET_ID,
+                     CONNECTION=CONNECTION):
+            batch = self._makeOne()
+
+        self.assertEqual(batch.dataset_id, DATASET_ID)
+        self.assertEqual(batch.connection, CONNECTION)
+        self.assertTrue(isinstance(batch.mutation, Mutation))
+
+    def test_put_entity_wo_key(self):
+        _DATASET = 'DATASET'
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+
+        self.assertRaises(ValueError, batch.put, _Entity())
+
+    def test_put_entity_w_key(self):
+        _DATASET = 'DATASET'
+        _PROPERTIES = {'foo': 'bar'}
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+        entity = _Entity(_PROPERTIES)
+        key = entity.key = _Key(_DATASET)
+
+        batch.put(entity)
+
+        self.assertEqual(
+            connection._saved,
+            (_DATASET, key._key, _PROPERTIES, (), batch.mutation))
+
+    def test_delete_w_partial_key(self):
+        _DATASET = 'DATASET'
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+        key = _Key(_DATASET)
+        key._partial = True
+
+        self.assertRaises(ValueError, batch.delete, key)
+
+    def test_delete_w_completed_key(self):
+        _DATASET = 'DATASET'
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+        key = _Key(_DATASET)
+
+        batch.delete(key)
+
+        self.assertEqual(
+            connection._deleted,
+            (_DATASET, [key._key], batch.mutation))
+
+    def test_commit(self):
+        _DATASET = 'DATASET'
+        connection = _Connection()
+        batch = self._makeOne(dataset_id=_DATASET, connection=connection)
+
+        batch.commit()
+
+        self.assertEqual(connection._committed, (_DATASET, batch.mutation))
+
+    def test_as_context_mgr_wo_error(self):
+        _DATASET = 'DATASET'
+        _PROPERTIES = {'foo': 'bar'}
+        connection = _Connection()
+        entity = _Entity(_PROPERTIES)
+        key = entity.key = _Key(_DATASET)
+
+        with self._makeOne(dataset_id=_DATASET,
+                           connection=connection) as batch:
+            batch.put(entity)
+
+        self.assertEqual(
+            connection._saved,
+            (_DATASET, key._key, _PROPERTIES, (), batch.mutation))
+        self.assertEqual(connection._committed, (_DATASET, batch.mutation))
+
+    def test_as_context_mgr_w_error(self):
+        _DATASET = 'DATASET'
+        _PROPERTIES = {'foo': 'bar'}
+        connection = _Connection()
+        entity = _Entity(_PROPERTIES)
+        key = entity.key = _Key(_DATASET)
+
+        try:
+            with self._makeOne(dataset_id=_DATASET,
+                               connection=connection) as batch:
+                batch.put(entity)
+                raise ValueError("testing")
+        except ValueError:
+            pass
+
+        self.assertEqual(
+            connection._saved,
+            (_DATASET, key._key, _PROPERTIES, (), batch.mutation))
+        self.assertEqual(connection._committed, None)
+
+
+class _CommitResult(object):
+
+    def __init__(self, *new_keys):
+        self.insert_auto_id_key = new_keys
+
+
+class _Connection(object):
+    _marker = object()
+    _committed = _saved = _deleted = None
+    _save_result = (False, None)
+
+    def __init__(self):
+        self._commit_result = _CommitResult()
+
+    def save_entity(self, dataset_id, key_pb, properties,
+                    exclude_from_indexes=(), mutation=None):
+        self._saved = (dataset_id, key_pb, properties,
+                       tuple(exclude_from_indexes), mutation)
+        return self._save_result
+
+    def delete_entities(self, dataset_id, key_pbs, mutation=None):
+        self._deleted = (dataset_id, key_pbs, mutation)
+
+    def commit(self, dataset_id, mutation):
+        self._committed = (dataset_id, mutation)
+        return self._commit_result
+
+
+class _Entity(dict):
+    key = None
+    exclude_from_indexes = ()
+
+
+class _Key(object):
+    _MARKER = object()
+    _key = 'KEY'
+    _partial = False
+    _path = None
+    _id = None
+    _stored = None
+
+    def __init__(self, dataset_id):
+        self.dataset_id = dataset_id
+
+    @property
+    def is_partial(self):
+        return self._partial
+
+    def to_protobuf(self):
+        return self._key
diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py
index 32246a6776de..d91c64555920 100644
--- a/gcloud/datastore/test_connection.py
+++ b/gcloud/datastore/test_connection.py
@@ -1075,6 +1075,25 @@ class Xact(object):
         mutation = conn.mutation()
         self.assertEqual(len(mutation.upsert), 1)
 
+    def test_save_entity_w_mutation_passed(self):
+        from gcloud.datastore import datastore_v1_pb2 as datastore_pb
+        from gcloud.datastore.entity import Entity
+        DATASET_ID = 'DATASET'
+        nested = Entity()
+        nested['bar'] = u'Bar'
+        key_pb = self._make_key_pb(DATASET_ID)
+        rsp_pb = datastore_pb.CommitResponse()
+        conn = self._makeOne()
+        http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
+        mutation = datastore_pb.Mutation()
+        result = conn.save_entity(DATASET_ID, key_pb, {'foo': nested},
+                                  mutation=mutation)
+        self.assertEqual(result, (False, None))
+        self.assertEqual(http._called_with, None)
+        conn_mutation = conn.mutation()
+        self.assertEqual(len(mutation.upsert), 1)
+        self.assertEqual(len(conn_mutation.upsert), 0)
+
     def test_delete_entities_wo_transaction(self):
         from gcloud.datastore import datastore_v1_pb2 as datastore_pb
@@ -1126,6 +1145,21 @@ class Xact(object):
         mutation = conn.mutation()
         self.assertEqual(len(mutation.delete), 1)
 
+    def test_delete_entities_w_mutation_passed(self):
+        from gcloud.datastore import datastore_v1_pb2 as datastore_pb
+        DATASET_ID = 'DATASET'
+        key_pb = self._make_key_pb(DATASET_ID)
+        rsp_pb = datastore_pb.CommitResponse()
+        conn = self._makeOne()
+        http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
+        mutation = datastore_pb.Mutation()
+        result = conn.delete_entities(DATASET_ID, [key_pb], mutation=mutation)
+        self.assertEqual(result, True)
+        self.assertEqual(http._called_with, None)
+        conn_mutation = conn.mutation()
+        self.assertEqual(len(mutation.delete), 1)
+        self.assertEqual(len(conn_mutation.delete), 0)
+
 
 class Http(object):
 

From cf6873bd94d26061667f91e86abc59080e76068a Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Thu, 8 Jan 2015 13:08:58 -0500
Subject: [PATCH 2/2] Drop fossil.

See:
https://github.com/GoogleCloudPlatform/gcloud-python/pull/509#discussion_r22667341
---
 gcloud/datastore/batch.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/gcloud/datastore/batch.py b/gcloud/datastore/batch.py
index d2bcc96ca082..3a9ed176ca78 100644
--- a/gcloud/datastore/batch.py
+++ b/gcloud/datastore/batch.py
@@ -71,7 +71,6 @@ def __init__(self, dataset_id=None, connection=None):
             raise ValueError('A batch must have a connection and '
                              'a dataset ID set.')
 
-        self._id = None
         self._mutation = datastore_pb.Mutation()
 
     @property
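
A minimal usage sketch of the new API, distilled from the ``Batch`` docstring
in the first patch. It assumes a ``connection``
(a ``gcloud.datastore.connection.Connection``) has already been constructed;
``entity1``, ``entity2``, ``key3``, and ``'my-dataset-id'`` are hypothetical
placeholders, not names defined by this change::

    from gcloud.datastore.batch import Batch

    # Puts and deletes are queued on the batch's shared Mutation protobuf;
    # nothing is sent to the server inside the block.
    with Batch(dataset_id='my-dataset-id', connection=connection) as batch:
        batch.put(entity1)   # upsert queued via Connection.save_entity
        batch.put(entity2)
        batch.delete(key3)   # delete queued via Connection.delete_entities

    # On a clean exit, __exit__ calls batch.commit(), which issues a single
    # commit request carrying the accumulated Mutation; if the block raises,
    # no commit is sent.

Because ``Batch`` passes its own ``mutation`` into ``Connection.save_entity``
and ``Connection.delete_entities``, those methods skip their per-call commit,
mirroring the existing in-transaction behavior.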