diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py
index 3fbd198d6b65..8dbf8c1ce6fb 100644
--- a/bigtable/google/cloud/bigtable/table.py
+++ b/bigtable/google/cloud/bigtable/table.py
@@ -12,7 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""User friendly container for Google Cloud Bigtable Table."""
+"""User-friendly container for Google Cloud Bigtable Table."""
+
+
+import six
 
 from google.cloud._helpers import _to_bytes
 from google.cloud.bigtable._generated import (
@@ -29,6 +32,19 @@
 from google.cloud.bigtable.row_data import PartialRowsData
 
 
+# Maximum number of mutations in bulk (MutateRowsRequest message):
+# https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest
+_MAX_BULK_MUTATIONS = 100000
+
+
+class TableMismatchError(ValueError):
+    """Row from another table."""
+
+
+class TooManyMutationsError(ValueError):
+    """The number of mutations for a bulk request is too large."""
+
+
 class Table(object):
     """Representation of a Google Cloud Bigtable Table.
 
@@ -276,6 +292,35 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
         # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
         return PartialRowsData(response_iterator)
 
+    def mutate_rows(self, rows):
+        """Mutates multiple rows in bulk.
+
+        The method tries to update all specified rows.
+        If some of the rows were not updated, their mutations are not
+        cleared, and those rows can be retried individually.
+        Mutations of rows that were updated successfully are cleared.
+
+        :type rows: list
+        :param rows: List or other iterable of :class:`.DirectRow` instances.
+
+        :rtype: list
+        :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
+                  corresponding to success or failure of each row mutation
+                  sent. These will be in the same order as the `rows`.
+        """
+        mutate_rows_request = _mutate_rows_request(self.name, rows)
+        client = self._instance._client
+        responses = client._data_stub.MutateRows(mutate_rows_request)
+
+        responses_statuses = [
+            None for _ in six.moves.xrange(len(mutate_rows_request.entries))]
+        for response in responses:
+            for entry in response.entries:
+                responses_statuses[entry.index] = entry.status
+                if entry.status.code == 0:
+                    rows[entry.index].clear()
+        return responses_statuses
+
     def sample_row_keys(self):
         """Read a sample of row keys in the table.
 
@@ -373,3 +418,67 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
         message.rows.row_ranges.add(**range_kwargs)
 
     return message
+
+
+def _mutate_rows_request(table_name, rows):
+    """Creates a request to mutate rows in a table.
+
+    :type table_name: str
+    :param table_name: The name of the table to write to.
+
+    :type rows: list
+    :param rows: List or other iterable of :class:`.DirectRow` instances.
+
+    :rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
+    :returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
+    :raises: :exc:`~.table.TooManyMutationsError` if the number of mutations
+             is greater than 100,000.
+    """
+    request_pb = data_messages_v2_pb2.MutateRowsRequest(table_name=table_name)
+    mutations_count = 0
+    for row in rows:
+        _check_row_table_name(table_name, row)
+        _check_row_type(row)
+        entry = request_pb.entries.add()
+        entry.row_key = row.row_key
+        # NOTE: Since `_check_row_type` has verified `row` is a `DirectRow`,
+        #       the mutations have no state.
+        for mutation in row._get_mutations(None):
+            mutations_count += 1
+            entry.mutations.add().CopyFrom(mutation)
+    if mutations_count > _MAX_BULK_MUTATIONS:
+        raise TooManyMutationsError('Maximum number of mutations is %s' %
+                                    (_MAX_BULK_MUTATIONS,))
+    return request_pb
+
+
+def _check_row_table_name(table_name, row):
+    """Checks that a row belongs to a table.
+
+    :type table_name: str
+    :param table_name: The name of the table.
+
+    :type row: :class:`.Row`
+    :param row: An instance of a :class:`.Row` subclass.
+
+    :raises: :exc:`~.table.TableMismatchError` if the row does not belong to
+             the table.
+    """
+    if row.table.name != table_name:
+        raise TableMismatchError(
+            'Row %s is a part of %s table. Current table: %s' %
+            (row.row_key, row.table.name, table_name))
+
+
+def _check_row_type(row):
+    """Checks that a row is an instance of :class:`.DirectRow`.
+
+    :type row: :class:`.Row`
+    :param row: An instance of a :class:`.Row` subclass.
+
+    :raises: :class:`TypeError` if the row is not an
+             instance of DirectRow.
+    """
+    if not isinstance(row, DirectRow):
+        raise TypeError('Bulk processing cannot be applied to '
+                        'conditional or append mutations.')
diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py
index faed85fdb302..1fcda808db39 100644
--- a/bigtable/tests/system.py
+++ b/bigtable/tests/system.py
@@ -356,6 +356,33 @@ def _write_to_row(self, row1=None, row2=None, row3=None, row4=None):
         cell4 = Cell(CELL_VAL4, timestamp4)
         return cell1, cell2, cell3, cell4
 
+    def test_mutate_rows(self):
+        row1 = self._table.row(ROW_KEY)
+        row1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+        row1.commit()
+        self.rows_to_delete.append(row1)
+        row2 = self._table.row(ROW_KEY_ALT)
+        row2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2)
+        row2.commit()
+        self.rows_to_delete.append(row2)
+
+        # Change the contents
+        row1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL3)
+        row2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL4)
+        rows = [row1, row2]
+        statuses = self._table.mutate_rows(rows)
+        result = [status.code for status in statuses]
+        expected_result = [0, 0]
+        self.assertEqual(result, expected_result)
+
+        # Check the contents
+        row1_data = self._table.read_row(ROW_KEY)
+        self.assertEqual(
+            row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL3)
+        row2_data = self._table.read_row(ROW_KEY_ALT)
+        self.assertEqual(
+            row2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL4)
+
     def test_read_large_cell_limit(self):
         row = self._table.row(ROW_KEY)
         self.rows_to_delete.append(row)
diff --git a/bigtable/tests/unit/test_row.py b/bigtable/tests/unit/test_row.py
index 046934ca1f27..156a517b351a 100644
--- a/bigtable/tests/unit/test_row.py
+++ b/bigtable/tests/unit/test_row.py
@@ -21,6 +21,7 @@ class TestRow(unittest.TestCase):
     @staticmethod
     def _get_target_class():
         from google.cloud.bigtable.row import Row
+
         return Row
 
     def _make_one(self, *args, **kwargs):
diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py
index 63844f5d48b7..5867e76aff73 100644
--- a/bigtable/tests/unit/test_table.py
+++ b/bigtable/tests/unit/test_table.py
@@ -15,6 +15,109 @@
 
 import unittest
 
+import mock
+
+
+class Test___mutate_rows_request(unittest.TestCase):
+
+    def _call_fut(self, table_name, rows):
+        from google.cloud.bigtable.table import _mutate_rows_request
+
+        return _mutate_rows_request(table_name, rows)
+
+    @mock.patch('google.cloud.bigtable.table._MAX_BULK_MUTATIONS', new=3)
+    def test__mutate_rows_too_many_mutations(self):
+        from google.cloud.bigtable.row import DirectRow
+        from google.cloud.bigtable.table import TooManyMutationsError
+
+        table = mock.Mock(name='table', spec=['name'])
+        table.name = 'table'
+        rows = [DirectRow(row_key=b'row_key', table=table),
+                DirectRow(row_key=b'row_key_2', table=table)]
+        rows[0].set_cell('cf1', b'c1', 1)
+        rows[0].set_cell('cf1', b'c1', 2)
+        rows[1].set_cell('cf1', b'c1', 3)
+        rows[1].set_cell('cf1', b'c1', 4)
+        with self.assertRaises(TooManyMutationsError):
+            self._call_fut('table', rows)
+
+    def test__mutate_rows_request(self):
+        from google.cloud.bigtable.row import DirectRow
+
+        table = mock.Mock(name='table', spec=['name'])
+        table.name = 'table'
+        rows = [DirectRow(row_key=b'row_key', table=table),
+                DirectRow(row_key=b'row_key_2', table=table)]
+        rows[0].set_cell('cf1', b'c1', b'1')
+        rows[1].set_cell('cf1', b'c1', b'2')
+        result = self._call_fut('table', rows)
+
+        expected_result = _mutate_rows_request_pb(table_name='table')
+        entry1 = expected_result.entries.add()
+        entry1.row_key = b'row_key'
+        mutations1 = entry1.mutations.add()
+        mutations1.set_cell.family_name = 'cf1'
+        mutations1.set_cell.column_qualifier = b'c1'
+        mutations1.set_cell.timestamp_micros = -1
+        mutations1.set_cell.value = b'1'
+        entry2 = expected_result.entries.add()
+        entry2.row_key = b'row_key_2'
+        mutations2 = entry2.mutations.add()
+        mutations2.set_cell.family_name = 'cf1'
+        mutations2.set_cell.column_qualifier = b'c1'
+        mutations2.set_cell.timestamp_micros = -1
+        mutations2.set_cell.value = b'2'
+
+        self.assertEqual(result, expected_result)
+
+
+class Test__check_row_table_name(unittest.TestCase):
+
+    def _call_fut(self, table_name, row):
+        from google.cloud.bigtable.table import _check_row_table_name
+
+        return _check_row_table_name(table_name, row)
+
+    def test_wrong_table_name(self):
+        from google.cloud.bigtable.table import TableMismatchError
+        from google.cloud.bigtable.row import DirectRow
+
+        table = mock.Mock(name='table', spec=['name'])
+        table.name = 'table'
+        row = DirectRow(row_key=b'row_key', table=table)
+        with self.assertRaises(TableMismatchError):
+            self._call_fut('other_table', row)
+
+    def test_right_table_name(self):
+        from google.cloud.bigtable.row import DirectRow
+
+        table = mock.Mock(name='table', spec=['name'])
+        table.name = 'table'
+        row = DirectRow(row_key=b'row_key', table=table)
+        result = self._call_fut('table', row)
+        self.assertFalse(result)
+
+
+class Test__check_row_type(unittest.TestCase):
+    def _call_fut(self, row):
+        from google.cloud.bigtable.table import _check_row_type
+
+        return _check_row_type(row)
+
+    def test_wrong_row_type(self):
+        from google.cloud.bigtable.row import ConditionalRow
+
+        row = ConditionalRow(row_key=b'row_key', table='table', filter_=None)
+        with self.assertRaises(TypeError):
+            self._call_fut(row)
+
+    def test_right_row_type(self):
+        from google.cloud.bigtable.row import DirectRow
+
+        row = DirectRow(row_key=b'row_key', table='table')
+        result = self._call_fut(row)
+        self.assertFalse(result)
+
 
 class TestTable(unittest.TestCase):
 
@@ -348,6 +451,44 @@ def test_read_row_still_partial(self):
         with self.assertRaises(ValueError):
             self._read_row_helper(chunks, None)
 
+    def test_mutate_rows(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_one(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=1),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+        statuses = table.mutate_rows([row_1, row_2])
+        result = [status.code for status in statuses]
+        expected_result = [0, 1]
+
+        self.assertEqual(result, expected_result)
+
+
     def test_read_rows(self):
         from google.cloud._testing import _Monkey
         from tests.unit._testing import _FakeStub
@@ -570,6 +711,13 @@ def _SampleRowKeysRequestPB(*args, **kw):
     return messages_v2_pb2.SampleRowKeysRequest(*args, **kw)
 
 
+def _mutate_rows_request_pb(*args, **kw):
+    from google.cloud.bigtable._generated import (
+        bigtable_pb2 as data_messages_v2_pb2)
+
+    return data_messages_v2_pb2.MutateRowsRequest(*args, **kw)
+
+
 def _TablePB(*args, **kw):
     from google.cloud.bigtable._generated import (
         table_pb2 as table_v2_pb2)
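
Usage sketch (reviewer note, not part of the patch): a minimal example of how the new Table.mutate_rows() method added in table.py might be called end to end. The project, instance, and table IDs, the column family 'cf1', the qualifier, and the row keys are placeholders; the retry step simply re-commits any row whose returned status code is non-zero, which works because failed rows keep their mutations (see the mutate_rows docstring above).

    from google.cloud import bigtable

    # Placeholder identifiers; assumes the table and column family already exist.
    client = bigtable.Client(project='my-project')
    instance = client.instance('my-instance')
    table = instance.table('my-table')

    # Build a batch of DirectRow mutations and send them in a single MutateRows call.
    rows = []
    for key in (b'row-key-1', b'row-key-2'):
        row = table.row(key)  # no filter_/append args, so this is a DirectRow
        row.set_cell('cf1', b'greeting', b'hello world')
        rows.append(row)

    statuses = table.mutate_rows(rows)
    for row, status in zip(rows, statuses):
        if status.code != 0:
            # Failed rows keep their mutations, so each one can be retried on its own.
            row.commit()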