From 454dd9c5bf5a8ceb816fc1c637242086d27cf5bd Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Fri, 8 Sep 2017 14:25:32 -0400 Subject: [PATCH 1/6] Adding RPC retries to Bigtable. --- bigtable/google/cloud/bigtable/retry.py | 189 +++++++++++++++++++++ bigtable/google/cloud/bigtable/row_data.py | 3 + bigtable/google/cloud/bigtable/table.py | 108 ++++-------- bigtable/tests/retry_test_script.txt | 38 +++++ bigtable/tests/system.py | 79 +++++++++ bigtable/tests/unit/_testing.py | 27 ++- bigtable/tests/unit/test_table.py | 189 +++++++++++++++++++-- 7 files changed, 544 insertions(+), 89 deletions(-) create mode 100644 bigtable/google/cloud/bigtable/retry.py create mode 100644 bigtable/tests/retry_test_script.txt diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py new file mode 100644 index 000000000000..0ca846475feb --- /dev/null +++ b/bigtable/google/cloud/bigtable/retry.py @@ -0,0 +1,189 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Provides function wrappers that implement retrying.""" + +import random +import time +import six +import sys + +from google.cloud._helpers import _to_bytes +from google.cloud.bigtable._generated import ( + bigtable_pb2 as data_messages_v2_pb2) +from google.gax import config, errors +from grpc import RpcError + + +_MILLIS_PER_SECOND = 1000 + + +class ReadRowsIterator(object): + """Creates an iterator equivalent to a_iter, but that retries on certain + exceptions. + """ + + def __init__(self, client, name, start_key, end_key, filter_, limit, + end_inclusive, retry_options, **kwargs): + self.client = client + self.retry_options = retry_options + self.name = name + self.start_key = start_key + self.start_key_closed = True + self.end_key = end_key + self.filter_ = filter_ + self.limit = limit + self.end_inclusive = end_inclusive + self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier + self.max_delay_millis = \ + retry_options.backoff_settings.max_retry_delay_millis + self.timeout_mult = \ + retry_options.backoff_settings.rpc_timeout_multiplier + self.max_timeout = \ + (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + self.total_timeout = \ + (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) + self.set_stream() + + def set_start_key(self, start_key): + """ + Sets the row key at which this iterator will begin reading. + """ + self.start_key = start_key + self.start_key_closed = False + + def set_stream(self): + """ + Resets the read stream by making an RPC on the 'ReadRows' endpoint. + """ + req_pb = _create_row_request(self.name, start_key=self.start_key, + start_key_closed=self.start_key_closed, + end_key=self.end_key, + filter_=self.filter_, limit=self.limit, + end_inclusive=self.end_inclusive) + self.stream = self.client._data_stub.ReadRows(req_pb) + + def next(self, *args, **kwargs): + """ + Read and return the next row from the stream. + Retry on idempotent failure. + """ + delay = self.retry_options.backoff_settings.initial_retry_delay_millis + exc = errors.RetryError('Retry total timeout exceeded before any' + 'response was received') + timeout = (self.retry_options.backoff_settings + .initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + + now = time.time() + deadline = now + self.total_timeout + while deadline is None or now < deadline: + try: + return six.next(self.stream) + except StopIteration as stop: + raise stop + except RpcError as error: # pylint: disable=broad-except + code = config.exc_to_code(error) + if code not in self.retry_options.retry_codes: + six.reraise(type(error), error) + + # pylint: disable=redefined-variable-type + exc = errors.RetryError( + 'Retry total timeout exceeded with exception', error) + + # Sleep a random number which will, on average, equal the + # expected delay. + to_sleep = random.uniform(0, delay * 2) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay = min(delay * self.delay_mult, self.max_delay_millis) + now = time.time() + timeout = min( + timeout * self.timeout_mult, self.max_timeout, + deadline - now) + self.set_stream() + + six.reraise(errors.RetryError, exc, sys.exc_info()[2]) + + def __next__(self, *args, **kwargs): + return self.next(*args, **kwargs) + + +def _create_row_request(table_name, row_key=None, start_key=None, + start_key_closed=True, end_key=None, filter_=None, + limit=None, end_inclusive=False): + """Creates a request to read rows in a table. + + :type table_name: str + :param table_name: The name of the table to read from. + + :type row_key: bytes + :param row_key: (Optional) The key of a specific row to read from. + + :type start_key: bytes + :param start_key: (Optional) The beginning of a range of row keys to + read from. The range will include ``start_key``. If + left empty, will be interpreted as the empty string. + + :type end_key: bytes + :param end_key: (Optional) The end of a range of row keys to read from. + The range will not include ``end_key``. If left empty, + will be interpreted as an infinite string. + + :type filter_: :class:`.RowFilter` + :param filter_: (Optional) The filter to apply to the contents of the + specified row(s). If unset, reads the entire table. + + :type limit: int + :param limit: (Optional) The read will terminate after committing to N + rows' worth of results. The default (zero) is to return + all results. + + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :raises: :class:`ValueError ` if both + ``row_key`` and one of ``start_key`` and ``end_key`` are set + """ + request_kwargs = {'table_name': table_name} + if (row_key is not None and + (start_key is not None or end_key is not None)): + raise ValueError('Row key and row range cannot be ' + 'set simultaneously') + range_kwargs = {} + if start_key is not None or end_key is not None: + if start_key is not None: + if start_key_closed: + range_kwargs['start_key_closed'] = _to_bytes(start_key) + else: + range_kwargs['start_key_open'] = _to_bytes(start_key) + if end_key is not None: + end_key_key = 'end_key_open' + if end_inclusive: + end_key_key = 'end_key_closed' + range_kwargs[end_key_key] = _to_bytes(end_key) + if filter_ is not None: + request_kwargs['filter'] = filter_.to_pb() + if limit is not None: + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) + + return message diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 56129f6342b8..2a7f4dbfa2ac 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,6 +274,9 @@ def consume_next(self): self._validate_chunk(chunk) + if hasattr(self._response_iterator, 'set_start_key'): + self._response_iterator.set_start_key(chunk.row_key) + if chunk.reset_row: row = self._row = None cell = self._cell = self._previous_cell = None diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index aaec98b6265b..fe10360ebcbe 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -17,7 +17,6 @@ import six -from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -30,6 +29,26 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData +from google.gax import RetryOptions, BackoffSettings +from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request +from grpc import StatusCode + +BACKOFF_SETTINGS = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=1.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=25 * 60 * 1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=30 * 60 * 1000 +) + +RETRY_CODES = [ + StatusCode.DEADLINE_EXCEEDED, + StatusCode.ABORTED, + StatusCode.INTERNAL, + StatusCode.UNAVAILABLE +] # Maximum number of mutations in bulk (MutateRowsRequest message): @@ -257,7 +276,7 @@ def read_row(self, row_key, filter_=None): return rows_data.rows[row_key] def read_rows(self, start_key=None, end_key=None, limit=None, - filter_=None, end_inclusive=False): + filter_=None, end_inclusive=False, backoff_settings=None): """Read rows from this table. :type start_key: bytes @@ -288,13 +307,18 @@ def read_rows(self, start_key=None, end_key=None, limit=None, :returns: A :class:`.PartialRowsData` convenience wrapper for consuming the streamed results. """ - request_pb = _create_row_request( - self.name, start_key=start_key, end_key=end_key, filter_=filter_, - limit=limit, end_inclusive=end_inclusive) client = self._instance._client - response_iterator = client._data_stub.ReadRows(request_pb) - # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` - return PartialRowsData(response_iterator) + if backoff_settings is None: + backoff_settings = BACKOFF_SETTINGS + RETRY_OPTIONS = RetryOptions( + retry_codes=RETRY_CODES, + backoff_settings=backoff_settings + ) + + retrying_iterator = ReadRowsIterator(client, self.name, start_key, + end_key, filter_, limit, + end_inclusive, RETRY_OPTIONS) + return PartialRowsData(retrying_iterator) def mutate_rows(self, rows): """Mutates multiple rows in bulk. @@ -363,74 +387,6 @@ def sample_row_keys(self): return response_iterator -def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None, end_inclusive=False): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :type end_inclusive: bool - :param end_inclusive: (Optional) Whether the ``end_key`` should be - considered inclusive. The default is False (exclusive). - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. - :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - if end_key is not None: - end_key_key = 'end_key_open' - if end_inclusive: - end_key_key = 'end_key_closed' - range_kwargs[end_key_key] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message - - def _mutate_rows_request(table_name, rows): """Creates a request to mutate rows in a table. diff --git a/bigtable/tests/retry_test_script.txt b/bigtable/tests/retry_test_script.txt new file mode 100644 index 000000000000..863662e897ba --- /dev/null +++ b/bigtable/tests/retry_test_script.txt @@ -0,0 +1,38 @@ +# This retry script is processed by the retry server and the client under test. +# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC +# to the retry server and expect a valid response. +# "EXPECT" commands indicate the call the server is expecting the client to send. +# +# The retry server has one table named "table" that should be used for testing. +# There are three types of commands supported: +# READ +# Expect the corresponding rows to be returned with arbitrary values. +# SCAN ... +# Ranges are expressed as an interval with either open or closed start and end, +# such as [1,3) for "1,2" or (1, 3] for "2,3". +# WRITE +# All writes should succeed eventually. Value payload is ignored. +# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test. +# All other server output should be ignored. + +# Echo same scan back after immediate error +CLIENT: SCAN [r1,r3) r1,r2 +EXPECT: SCAN [r1,r3) +SERVER: ERROR Unavailable +EXPECT: SCAN [r1,r3) +SERVER: READ_RESPONSE r1,r2 + +# Retry scans with open interval starting at the least read row key. +# Instead of using open intervals for retry ranges, '\x00' can be +# appended to the last received row key and sent in a closed interval. +CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8 +EXPECT: SCAN [r1,r9) +SERVER: READ_RESPONSE r1,r2,r3,r4 +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: ERROR Unavailable +EXPECT: SCAN (r4,r9) +SERVER: READ_RESPONSE r5,r6,r7 +SERVER: ERROR Unavailable +EXPECT: SCAN (r7,r9) +SERVER: READ_RESPONSE r8 diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index cfc2cb17f805..68633b59fc35 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -272,6 +272,85 @@ def test_delete_column_family(self): # Make sure we have successfully deleted it. self.assertEqual(temp_table.list_column_families(), {}) + def test_retry(self): + import subprocess, os, stat, platform, ssl + from google.cloud.bigtable.client import Client + from google.cloud.bigtable.instance import Instance + from google.cloud.bigtable.table import Table + + # import for urlopen based on version + try: + # python 3 + from urllib.request import urlopen + except ImportError: + # python 2 + from urllib2 import urlopen + + + TEST_SCRIPT = 'tests/retry_test_script.txt' + SERVER_NAME = 'retry_server' + SERVER_ZIP = SERVER_NAME + ".tar.gz" + + def process_scan(table, range, ids): + range_chunks = range.split(",") + range_open = range_chunks[0].lstrip("[") + range_close = range_chunks[1].rstrip(")") + rows = table.read_rows(range_open, range_close) + rows.consume_all() + + # Download server + MOCK_SERVER_URLS = { + 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', + 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', + } + + test_platform = platform.system() + if test_platform not in MOCK_SERVER_URLS: + self.skip('Retry server not available for platform {0}.'.format(test_platform)) + + context = ssl._create_unverified_context() + mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform], context=context).read() + mock_server_file = open(SERVER_ZIP, 'wb') + mock_server_file.write(mock_server_download) + + # Unzip server + subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) + + # Connect to server + server = subprocess.Popen( + ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + (endpoint, port) = server.stdout.readline().decode("utf-8").rstrip("\n").split(":") + os.environ["BIGTABLE_EMULATOR_HOST"] = endpoint + ":" + port + client = Client(project="client", admin=True) + instance = Instance("instance", client) + table = instance.table("table") + + # Run test, line by line + with open(TEST_SCRIPT, 'r') as script: + for line in script.readlines(): + if line.startswith("CLIENT:"): + chunks = line.split(" ") + op = chunks[1] + process_scan(table, chunks[2], chunks[3]) + + # Check that the test passed + server.kill() + server_stdout_lines = [] + while True: + line = server.stdout.readline().decode("utf-8") + if line != '': + server_stdout_lines.append(line) + else: + break + self.assertEqual(server_stdout_lines[-1], "PASS\n") + + # Clean up + os.remove(SERVER_ZIP) + os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index e67af6a1498c..7587c66c133b 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -14,7 +14,6 @@ """Mocks used to emulate gRPC generated objects.""" - class _FakeStub(object): """Acts as a gPRC stub.""" @@ -27,6 +26,16 @@ def __getattr__(self, name): # since __getattribute__ will handle them. return _MethodMock(name, self) +class _CustomFakeStub(object): + """Acts as a gRPC stub. Generates a result using an injected callable.""" + def __init__(self, result_callable): + self.result_callable = result_callable + self.method_calls = [] + + def __getattr__(self, name): + # We need not worry about attributes set in constructor + # since __getattribute__ will handle them. + return _CustomMethodMock(name, self) class _MethodMock(object): """Mock for API method attached to a gRPC stub. @@ -42,5 +51,19 @@ def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) curr_result, self._stub.results = (self._stub.results[0], - self._stub.results[1:]) + self._stub.results[1:]) return curr_result + +class _CustomMethodMock(object): + """ + Same as _MethodMock, but backed by an injected callable. + """ + + def __init__(self, name, stub): + self._name = name + self._stub = stub + + def __call__(self, *args, **kwargs): + """Sync method meant to mock a gRPC stub request.""" + self._stub.method_calls.append((self._name, args, kwargs)) + return self._stub.result_callable() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 3890d097f572..1b83f197086a 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -493,7 +493,8 @@ def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import table as MUT + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -513,20 +514,18 @@ def mock_create_row_request(table_name, **kwargs): # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) - # Create expected_result. - expected_result = PartialRowsData(response_iterator) - - # Perform the method and check the result. start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - self.assertEqual(result, expected_result) + self.assertIsInstance(result._response_iterator, ReadRowsIterator) + self.assertEqual(result._response_iterator.client, client) self.assertEqual(stub.method_calls, [( 'ReadRows', (request_pb,), @@ -534,13 +533,170 @@ def mock_create_row_request(table_name, **kwargs): )]) created_kwargs = { 'start_key': start_key, + 'end_inclusive': False, 'end_key': end_key, 'filter_': filter_obj, 'limit': limit, - 'end_inclusive': False, + 'start_key_closed': True, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) + def test_read_rows_one_chunk(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _FakeStub + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.cloud.bigtable.row_data import Cell + from google.cloud.bigtable.row_data import PartialRowsData + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response_iterator + chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + commit_row=True, + ) + response_pb = _ReadRowsResponsePB(chunks=[chunk]) + response_iterator = iter([response_pb]) + + # Patch the stub used by the API method. + client._data_stub = stub = _FakeStub(response_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Perform the method and check the result. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + result.consume_all() + + def test_read_rows_retry_timeout(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create a slow response iterator to cause a timeout + class MockTimeoutError(RpcError): + def code(self): + return StatusCode.DEADLINE_EXCEEDED + + def _wait_then_raise(): + time.sleep(0.1) + raise MockTimeoutError() + + # Patch the stub used by the API method. The stub should create a new + # slow_iterator every time its queried. + def make_slow_iterator(): + return (_wait_then_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_slow_iterator) + + # Set to timeout before RPC completes + test_backoff_settings = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=0.3, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=1000 + ) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit, backoff_settings=test_backoff_settings) + with self.assertRaises(RetryError): + result.consume_next() + + def test_read_rows_non_idempotent_error_throws(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create response iterator that raises a non-idempotent exception + class MockNonIdempotentError(RpcError): + def code(self): + return StatusCode.RESOURCE_EXHAUSTED + + def _raise(): + raise MockNonIdempotentError() + + # Patch the stub used by the API method. The stub should create a new + # slow_iterator every time its queried. + def make_raising_iterator(): + return (_raise() for i in range(10)) + client._data_stub = stub = _CustomFakeStub(make_raising_iterator) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit) + with self.assertRaises(MockNonIdempotentError): + result.consume_next() + def test_sample_row_keys(self): from tests.unit._testing import _FakeStub @@ -573,12 +729,14 @@ def test_sample_row_keys(self): class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None, end_inclusive=False): - from google.cloud.bigtable.table import _create_row_request + filter_=None, limit=None, start_key_closed=True, + end_inclusive=False): + from google.cloud.bigtable.retry import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - filter_=filter_, limit=limit, end_inclusive=end_inclusive) + start_key_closed=start_key_closed, filter_=filter_, + limit=limit, end_inclusive=end_inclusive) def test_table_name_only(self): table_name = 'table_name' @@ -601,7 +759,7 @@ def test_row_key(self): expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) - def test_row_range_start_key(self): + def test_row_range_start_key_closed(self): table_name = 'table_name' start_key = b'start_key' result = self._call_fut(table_name, start_key=start_key) @@ -609,6 +767,15 @@ def test_row_range_start_key(self): expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) + def test_row_range_start_key_open(self): + table_name = 'table_name' + start_key = b'start_key' + result = self._call_fut(table_name, start_key=start_key, + start_key_closed=False) + expected_result = _ReadRowsRequestPB(table_name=table_name) + expected_result.rows.row_ranges.add(start_key_open=start_key) + self.assertEqual(result, expected_result) + def test_row_range_end_key(self): table_name = 'table_name' end_key = b'end_key' From eb0c9e6bcf5df5c48b117537e02a6641746c453b Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Tue, 12 Sep 2017 21:21:33 -0400 Subject: [PATCH 2/6] for CI builds, download bigtable test server before system tests run --- .circleci/config.yml | 2 + bigtable/tests/system.py | 43 +++++++++++-------- bigtable/tests/unit/_testing.py | 2 +- .../scripts/circleci/prepare_bigtable.sh | 24 +++++++++++ 4 files changed, 51 insertions(+), 20 deletions(-) create mode 100755 test_utils/scripts/circleci/prepare_bigtable.sh diff --git a/.circleci/config.yml b/.circleci/config.yml index 2352d187edc1..6149cc052ca3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -46,6 +46,8 @@ jobs: name: Run tests - google.cloud.bigtable command: | if [[ -n $(grep bigtable ~/target_packages) ]]; then + test_utils/scripts/circleci/prepare_bigtable.sh + export DOWNLOAD_BIGTABLE_SERVER=0 nox -f bigtable/nox.py fi - run: diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 68633b59fc35..c586ed9b6058 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -286,11 +286,30 @@ def test_retry(self): # python 2 from urllib2 import urlopen - TEST_SCRIPT = 'tests/retry_test_script.txt' SERVER_NAME = 'retry_server' SERVER_ZIP = SERVER_NAME + ".tar.gz" + def download_server(): + # Download server + MOCK_SERVER_URLS = { + 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', + 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', + } + + test_platform = platform.system() + if test_platform not in MOCK_SERVER_URLS: + self.skip('Retry server not available for platform {0}.'.format(test_platform)) + + context = ssl._create_unverified_context() + mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform], context=context).read() + mock_server_file = open(SERVER_ZIP, 'wb') + mock_server_file.write(mock_server_download) + + # Unzip server + subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) + os.remove(SERVER_ZIP) + def process_scan(table, range, ids): range_chunks = range.split(",") range_open = range_chunks[0].lstrip("[") @@ -298,23 +317,10 @@ def process_scan(table, range, ids): rows = table.read_rows(range_open, range_close) rows.consume_all() - # Download server - MOCK_SERVER_URLS = { - 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', - 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', - } - - test_platform = platform.system() - if test_platform not in MOCK_SERVER_URLS: - self.skip('Retry server not available for platform {0}.'.format(test_platform)) - - context = ssl._create_unverified_context() - mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform], context=context).read() - mock_server_file = open(SERVER_ZIP, 'wb') - mock_server_file.write(mock_server_download) - - # Unzip server - subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) + should_download = os.environ.get("DOWNLOAD_BIGTABLE_SERVER") + if should_download is None or should_download == '1': + if not os.path.isfile(SERVER_NAME): + download_server() # Connect to server server = subprocess.Popen( @@ -349,7 +355,6 @@ def process_scan(table, range, ids): self.assertEqual(server_stdout_lines[-1], "PASS\n") # Clean up - os.remove(SERVER_ZIP) os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index 7587c66c133b..4c0463e53825 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -51,7 +51,7 @@ def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) curr_result, self._stub.results = (self._stub.results[0], - self._stub.results[1:]) + self._stub.results[1:]) return curr_result class _CustomMethodMock(object): diff --git a/test_utils/scripts/circleci/prepare_bigtable.sh b/test_utils/scripts/circleci/prepare_bigtable.sh new file mode 100755 index 000000000000..88f85c00425c --- /dev/null +++ b/test_utils/scripts/circleci/prepare_bigtable.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ev + +mkdir -p ~/bigtable +if [ ! -f ~/bigtable/retry_server ]; then + wget https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz + tar -zxvf retry_server_linux.tar.gz --no-same-owner -C /var/code/gcp/bigtable/ + rm retry_server_linux.tar.gz +fi From 7bfe919e8168fc10f69fe86a54667056825d2181 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 27 Sep 2017 16:59:35 -0400 Subject: [PATCH 3/6] Support retries in the middle of rows --- bigtable/google/cloud/bigtable/retry.py | 20 +++- bigtable/google/cloud/bigtable/row_data.py | 11 +- bigtable/tests/unit/_testing.py | 9 +- bigtable/tests/unit/test_table.py | 120 ++++++++++++++++++--- 4 files changed, 137 insertions(+), 23 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 0ca846475feb..7f0fafbbab37 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -56,6 +56,7 @@ def __init__(self, client, name, start_key, end_key, filter_, limit, self.total_timeout = \ (retry_options.backoff_settings.total_timeout_millis / _MILLIS_PER_SECOND) + self._responses_for_row = 0 self.set_stream() def set_start_key(self, start_key): @@ -76,9 +77,22 @@ def set_stream(self): end_inclusive=self.end_inclusive) self.stream = self.client._data_stub.ReadRows(req_pb) + @property + def responses_for_row(self): + """ Property that gives the number of calls made so far for the current + row. If 1, then either this row is being read for the first time, + or the most recent response requied a retry, causing the row to be + read again + + :rtype: int + :returns: Int that gives the number of calls make so far for the + current row. + """ + return self._responses_for_row + def next(self, *args, **kwargs): """ - Read and return the next row from the stream. + Read and return the next chunk from the stream. Retry on idempotent failure. """ delay = self.retry_options.backoff_settings.initial_retry_delay_millis @@ -91,8 +105,9 @@ def next(self, *args, **kwargs): now = time.time() deadline = now + self.total_timeout while deadline is None or now < deadline: + self._responses_for_row += 1 try: - return six.next(self.stream) + return(six.next(self.stream)) except StopIteration as stop: raise stop except RpcError as error: # pylint: disable=broad-except @@ -113,6 +128,7 @@ def next(self, *args, **kwargs): timeout = min( timeout * self.timeout_mult, self.max_timeout, deadline - now) + self._responses_for_row = 0 self.set_stream() six.reraise(errors.RetryError, exc, sys.exc_info()[2]) diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 2a7f4dbfa2ac..da89a561eb6d 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -267,6 +267,10 @@ def consume_next(self): self._last_scanned_row_key = response.last_scanned_row_key + if hasattr(self._response_iterator, 'responses_for_row'): + if (self._response_iterator.responses_for_row == 1): + self._clear_accumulated_row() + row = self._row cell = self._cell @@ -299,7 +303,7 @@ def consume_next(self): if chunk.commit_row: self._save_current_row() - row = cell = None + row = cell = None continue if chunk.value_size == 0: @@ -344,6 +348,11 @@ def _validate_chunk_status(chunk): # No negative value_size (inferred as a general constraint). _raise_if(chunk.value_size < 0) + def _clear_accumulated_row(self): + self._row = None + self._cell = None + self._previous_cell = None + def _validate_chunk_new_row(self, chunk): """Helper for :meth:`_validate_chunk`.""" assert self.state == self.NEW_ROW diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index 4c0463e53825..895b504627a8 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -27,9 +27,10 @@ def __getattr__(self, name): return _MethodMock(name, self) class _CustomFakeStub(object): - """Acts as a gRPC stub. Generates a result using an injected callable.""" - def __init__(self, result_callable): - self.result_callable = result_callable + """Acts as a gRPC stub. Generates a result from a given iterator + """ + def __init__(self, result): + self.result = result self.method_calls = [] def __getattr__(self, name): @@ -66,4 +67,4 @@ def __init__(self, name, stub): def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) - return self._stub.result_callable() + return self._stub.result diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 1b83f197086a..62ce4f0c706a 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -615,15 +615,13 @@ class MockTimeoutError(RpcError): def code(self): return StatusCode.DEADLINE_EXCEEDED - def _wait_then_raise(): - time.sleep(0.1) - raise MockTimeoutError() + class MockTimeoutIterator(object): + def next(self): + return self.__next__() + def __next__(self): + raise MockTimeoutError() - # Patch the stub used by the API method. The stub should create a new - # slow_iterator every time its queried. - def make_slow_iterator(): - return (_wait_then_raise() for i in range(10)) - client._data_stub = stub = _CustomFakeStub(make_slow_iterator) + client._data_stub = stub = _CustomFakeStub(MockTimeoutIterator()) # Set to timeout before RPC completes test_backoff_settings = BackoffSettings( @@ -648,6 +646,98 @@ def make_slow_iterator(): with self.assertRaises(RetryError): result.consume_next() + def test_read_rows_mid_row_timeout_retry(self): + from google.cloud._testing import _Monkey + from tests.unit._testing import _CustomFakeStub + from google.cloud.bigtable.row_data import PartialRowsData + from google.cloud.bigtable import retry as MUT + from google.cloud.bigtable.retry import ReadRowsIterator + from google.gax import BackoffSettings + from google.gax.errors import RetryError + from grpc import StatusCode, RpcError + import time + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_one(self.TABLE_ID, instance) + + # Create request_pb + request_pb = object() # Returned by our mock. + mock_created = [] + + def mock_create_row_request(table_name, **kwargs): + mock_created.append((table_name, kwargs)) + return request_pb + + # Create an iterator that throws an idempotent exception + class MockTimeoutError(RpcError): + def code(self): + return StatusCode.DEADLINE_EXCEEDED + + first_chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + ) + first_response = _ReadRowsResponsePB(chunks = [first_chunk]) + + second_chunk = _ReadRowsResponseCellChunkPB( + row_key=self.ROW_KEY, + family_name=self.FAMILY_NAME, + qualifier=self.QUALIFIER, + timestamp_micros=self.TIMESTAMP_MICROS, + value=self.VALUE, + commit_row=True, + ) + second_response = _ReadRowsResponsePB(chunks = [second_chunk]) + + class MidRowTimeoutIterator(object): + def __init__(self): + self.invocation_count = 0 + def next(self): + return self.__next__() + def __next__(self): + self.invocation_count += 1 + if (self.invocation_count == 1): + return first_response + elif (self.invocation_count == 2): + raise MockTimeoutError() + elif (self.invocation_count == 3): + return first_response + elif (self.invocation_count == 4): + return second_response + else: + raise StopIteration + + client._data_stub = stub = _CustomFakeStub(MidRowTimeoutIterator()) + + # Set to timeout before RPC completes + test_backoff_settings = BackoffSettings( + initial_retry_delay_millis=10, + retry_delay_multiplier=1, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=1000, + rpc_timeout_multiplier=1.0, + max_rpc_timeout_millis=25 * 60 * 1000, + total_timeout_millis=1000 + ) + + start_key = b'start-key' + end_key = b'end-key' + filter_obj = object() + limit = 22 + with _Monkey(MUT, _create_row_request=mock_create_row_request): + # Verify that a RetryError is thrown on read. + result = table.read_rows( + start_key=start_key, end_key=end_key, filter_=filter_obj, + limit=limit, backoff_settings=test_backoff_settings) + result.consume_all() + + cells = result.rows[self.ROW_KEY].cells[self.FAMILY_NAME][self.QUALIFIER] + self.assertEquals(len(cells), 2) + def test_read_rows_non_idempotent_error_throws(self): from google.cloud._testing import _Monkey from tests.unit._testing import _CustomFakeStub @@ -676,21 +766,19 @@ class MockNonIdempotentError(RpcError): def code(self): return StatusCode.RESOURCE_EXHAUSTED - def _raise(): - raise MockNonIdempotentError() + class MockNonIdempotentIterator(object): + def next(self): + return self.__next__() + def __next__(self): + raise MockNonIdempotentError() - # Patch the stub used by the API method. The stub should create a new - # slow_iterator every time its queried. - def make_raising_iterator(): - return (_raise() for i in range(10)) - client._data_stub = stub = _CustomFakeStub(make_raising_iterator) + client._data_stub = stub = _CustomFakeStub(MockNonIdempotentIterator()) start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Verify that a RetryError is thrown on read. result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) From ff5cd92886e00134c637cf8b424c841d531d027d Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Wed, 27 Sep 2017 16:59:59 -0400 Subject: [PATCH 4/6] cleanup bigtable system test --- .circleci/config.yml | 2 +- bigtable/google/cloud/bigtable/row_data.py | 2 +- bigtable/tests/system.py | 11 +++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6149cc052ca3..851b2d1a384b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -47,7 +47,7 @@ jobs: command: | if [[ -n $(grep bigtable ~/target_packages) ]]; then test_utils/scripts/circleci/prepare_bigtable.sh - export DOWNLOAD_BIGTABLE_SERVER=0 + export DOWNLOAD_BIGTABLE_TEST_SERVER=0 nox -f bigtable/nox.py fi - run: diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index da89a561eb6d..220bf3f229d2 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -303,7 +303,7 @@ def consume_next(self): if chunk.commit_row: self._save_current_row() - row = cell = None + row = cell = None continue if chunk.value_size == 0: diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index c586ed9b6058..4963a3b69cad 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -291,7 +291,6 @@ def test_retry(self): SERVER_ZIP = SERVER_NAME + ".tar.gz" def download_server(): - # Download server MOCK_SERVER_URLS = { 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', @@ -306,18 +305,18 @@ def download_server(): mock_server_file = open(SERVER_ZIP, 'wb') mock_server_file.write(mock_server_download) - # Unzip server + # Extract server binary from archive subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) os.remove(SERVER_ZIP) def process_scan(table, range, ids): - range_chunks = range.split(",") - range_open = range_chunks[0].lstrip("[") - range_close = range_chunks[1].rstrip(")") + range_chunks = range.split(',') + range_open = range_chunks[0].lstrip('[]') + range_close = range_chunks[1].rstrip(')') rows = table.read_rows(range_open, range_close) rows.consume_all() - should_download = os.environ.get("DOWNLOAD_BIGTABLE_SERVER") + should_download = os.environ.get('DOWNLOAD_BIGTABLE_TEST_SERVER') if should_download is None or should_download == '1': if not os.path.isfile(SERVER_NAME): download_server() From 3d2daeb3f2ee1da414373540c62e35412f42a621 Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Mon, 16 Oct 2017 18:05:28 -0400 Subject: [PATCH 5/6] fixes for middle-row failures --- bigtable/google/cloud/bigtable/retry.py | 16 ++++++++-------- bigtable/google/cloud/bigtable/row_data.py | 7 ++++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py index 7f0fafbbab37..687da4bc65cb 100644 --- a/bigtable/google/cloud/bigtable/retry.py +++ b/bigtable/google/cloud/bigtable/retry.py @@ -81,15 +81,21 @@ def set_stream(self): def responses_for_row(self): """ Property that gives the number of calls made so far for the current row. If 1, then either this row is being read for the first time, - or the most recent response requied a retry, causing the row to be + or the most recent response required a retry, causing the row to be read again :rtype: int - :returns: Int that gives the number of calls make so far for the + :returns: Int that gives the number of calls made so far for the current row. """ return self._responses_for_row + def clear_responses_for_row(self): + """ + Signals that a new row has been started. + """ + self._responses_for_row = 0 + def next(self, *args, **kwargs): """ Read and return the next chunk from the stream. @@ -98,9 +104,6 @@ def next(self, *args, **kwargs): delay = self.retry_options.backoff_settings.initial_retry_delay_millis exc = errors.RetryError('Retry total timeout exceeded before any' 'response was received') - timeout = (self.retry_options.backoff_settings - .initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) now = time.time() deadline = now + self.total_timeout @@ -125,9 +128,6 @@ def next(self, *args, **kwargs): time.sleep(to_sleep / _MILLIS_PER_SECOND) delay = min(delay * self.delay_mult, self.max_delay_millis) now = time.time() - timeout = min( - timeout * self.timeout_mult, self.max_timeout, - deadline - now) self._responses_for_row = 0 self.set_stream() diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 220bf3f229d2..a01a24b8d036 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -278,9 +278,6 @@ def consume_next(self): self._validate_chunk(chunk) - if hasattr(self._response_iterator, 'set_start_key'): - self._response_iterator.set_start_key(chunk.row_key) - if chunk.reset_row: row = self._row = None cell = self._cell = self._previous_cell = None @@ -303,6 +300,10 @@ def consume_next(self): if chunk.commit_row: self._save_current_row() + if hasattr(self._response_iterator, 'set_start_key'): + self._response_iterator.set_start_key(chunk.row_key) + if hasattr(self._response_iterator, 'clear_responses_for_row'): + self._response_iterator.clear_responses_for_row() row = cell = None continue From f73b80119b38cd8ea895c0630d634b669ec5c6ca Mon Sep 17 00:00:00 2001 From: Cal Peyser Date: Tue, 14 Nov 2017 09:36:40 -0500 Subject: [PATCH 6/6] remove redundant _create_row_request implementation from bigtable --- bigtable/google/cloud/bigtable/table.py | 72 ------------------------- 1 file changed, 72 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 7bb30ceaddbf..fbebb58c968b 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -15,12 +15,9 @@ """User-friendly container for Google Cloud Bigtable Table.""" -import six - from google.api_core.exceptions import RetryError from google.api_core.retry import if_exception_type from google.api_core.retry import Retry -from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -55,7 +52,6 @@ ] - # Maximum number of mutations in bulk (MutateRowsRequest message): # (https://cloud.google.com/bigtable/docs/reference/data/rpc/ # google.bigtable.v2#google.bigtable.v2.MutateRowRequest) @@ -522,74 +518,6 @@ def _do_mutate_retryable_rows(self): return self.responses_statuses -def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None, end_inclusive=False): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :type end_inclusive: bool - :param end_inclusive: (Optional) Whether the ``end_key`` should be - considered inclusive. The default is False (exclusive). - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. - :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - if end_key is not None: - end_key_key = 'end_key_open' - if end_inclusive: - end_key_key = 'end_key_closed' - range_kwargs[end_key_key] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message - - def _mutate_rows_request(table_name, rows): """Creates a request to mutate rows in a table.