From bd023fe7fae373934ab66e562da7cb4a4ce7c1ea Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Mon, 4 Dec 2017 10:27:54 -0800 Subject: [PATCH] Revert "Add RPC retries to Bigtable (#3811)" This reverts commit 5a0e5495946004c8c2b9717e2252b260c6f16836 / PR #3811. --- .circleci/config.yml | 2 - bigtable/google/cloud/bigtable/retry.py | 205 ------------- bigtable/google/cloud/bigtable/row_data.py | 13 - bigtable/google/cloud/bigtable/table.py | 107 +++++-- bigtable/tests/retry_test_script.txt | 38 --- bigtable/tests/system.py | 83 ------ bigtable/tests/unit/_testing.py | 26 +- bigtable/tests/unit/test_table.py | 277 +----------------- .../scripts/circleci/prepare_bigtable.sh | 24 -- 9 files changed, 88 insertions(+), 687 deletions(-) delete mode 100644 bigtable/google/cloud/bigtable/retry.py delete mode 100644 bigtable/tests/retry_test_script.txt delete mode 100755 test_utils/scripts/circleci/prepare_bigtable.sh diff --git a/.circleci/config.yml b/.circleci/config.yml index 4de4c3d9f146..7eaffb6115a7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,8 +62,6 @@ jobs: name: Run tests - google.cloud.bigtable command: | if [[ -n $(grep bigtable ~/target_packages) ]]; then - test_utils/scripts/circleci/prepare_bigtable.sh - export DOWNLOAD_BIGTABLE_TEST_SERVER=0 nox -f bigtable/nox.py fi - run: diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py deleted file mode 100644 index 687da4bc65cb..000000000000 --- a/bigtable/google/cloud/bigtable/retry.py +++ /dev/null @@ -1,205 +0,0 @@ -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Provides function wrappers that implement retrying.""" - -import random -import time -import six -import sys - -from google.cloud._helpers import _to_bytes -from google.cloud.bigtable._generated import ( - bigtable_pb2 as data_messages_v2_pb2) -from google.gax import config, errors -from grpc import RpcError - - -_MILLIS_PER_SECOND = 1000 - - -class ReadRowsIterator(object): - """Creates an iterator equivalent to a_iter, but that retries on certain - exceptions. - """ - - def __init__(self, client, name, start_key, end_key, filter_, limit, - end_inclusive, retry_options, **kwargs): - self.client = client - self.retry_options = retry_options - self.name = name - self.start_key = start_key - self.start_key_closed = True - self.end_key = end_key - self.filter_ = filter_ - self.limit = limit - self.end_inclusive = end_inclusive - self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier - self.max_delay_millis = \ - retry_options.backoff_settings.max_retry_delay_millis - self.timeout_mult = \ - retry_options.backoff_settings.rpc_timeout_multiplier - self.max_timeout = \ - (retry_options.backoff_settings.max_rpc_timeout_millis / - _MILLIS_PER_SECOND) - self.total_timeout = \ - (retry_options.backoff_settings.total_timeout_millis / - _MILLIS_PER_SECOND) - self._responses_for_row = 0 - self.set_stream() - - def set_start_key(self, start_key): - """ - Sets the row key at which this iterator will begin reading. - """ - self.start_key = start_key - self.start_key_closed = False - - def set_stream(self): - """ - Resets the read stream by making an RPC on the 'ReadRows' endpoint. - """ - req_pb = _create_row_request(self.name, start_key=self.start_key, - start_key_closed=self.start_key_closed, - end_key=self.end_key, - filter_=self.filter_, limit=self.limit, - end_inclusive=self.end_inclusive) - self.stream = self.client._data_stub.ReadRows(req_pb) - - @property - def responses_for_row(self): - """ Property that gives the number of calls made so far for the current - row. If 1, then either this row is being read for the first time, - or the most recent response required a retry, causing the row to be - read again - - :rtype: int - :returns: Int that gives the number of calls made so far for the - current row. - """ - return self._responses_for_row - - def clear_responses_for_row(self): - """ - Signals that a new row has been started. - """ - self._responses_for_row = 0 - - def next(self, *args, **kwargs): - """ - Read and return the next chunk from the stream. - Retry on idempotent failure. - """ - delay = self.retry_options.backoff_settings.initial_retry_delay_millis - exc = errors.RetryError('Retry total timeout exceeded before any' - 'response was received') - - now = time.time() - deadline = now + self.total_timeout - while deadline is None or now < deadline: - self._responses_for_row += 1 - try: - return(six.next(self.stream)) - except StopIteration as stop: - raise stop - except RpcError as error: # pylint: disable=broad-except - code = config.exc_to_code(error) - if code not in self.retry_options.retry_codes: - six.reraise(type(error), error) - - # pylint: disable=redefined-variable-type - exc = errors.RetryError( - 'Retry total timeout exceeded with exception', error) - - # Sleep a random number which will, on average, equal the - # expected delay. - to_sleep = random.uniform(0, delay * 2) - time.sleep(to_sleep / _MILLIS_PER_SECOND) - delay = min(delay * self.delay_mult, self.max_delay_millis) - now = time.time() - self._responses_for_row = 0 - self.set_stream() - - six.reraise(errors.RetryError, exc, sys.exc_info()[2]) - - def __next__(self, *args, **kwargs): - return self.next(*args, **kwargs) - - -def _create_row_request(table_name, row_key=None, start_key=None, - start_key_closed=True, end_key=None, filter_=None, - limit=None, end_inclusive=False): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. - :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - if start_key_closed: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - else: - range_kwargs['start_key_open'] = _to_bytes(start_key) - if end_key is not None: - end_key_key = 'end_key_open' - if end_inclusive: - end_key_key = 'end_key_closed' - range_kwargs[end_key_key] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index f45097f1d8e5..9bde1c0cb5a3 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -267,10 +267,6 @@ def consume_next(self): self._last_scanned_row_key = response.last_scanned_row_key - if hasattr(self._response_iterator, 'responses_for_row'): - if (self._response_iterator.responses_for_row == 1): - self._clear_accumulated_row() - row = self._row cell = self._cell @@ -304,10 +300,6 @@ def consume_next(self): if chunk.commit_row: self._save_current_row() - if hasattr(self._response_iterator, 'set_start_key'): - self._response_iterator.set_start_key(chunk.row_key) - if hasattr(self._response_iterator, 'clear_responses_for_row'): - self._response_iterator.clear_responses_for_row() row = cell = None continue @@ -353,11 +345,6 @@ def _validate_chunk_status(chunk): # No negative value_size (inferred as a general constraint). _raise_if(chunk.value_size < 0) - def _clear_accumulated_row(self): - self._row = None - self._cell = None - self._previous_cell = None - def _validate_chunk_new_row(self, chunk): """Helper for :meth:`_validate_chunk`.""" assert self.state == self.NEW_ROW diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index fbebb58c968b..100409e5e81c 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -18,6 +18,7 @@ from google.api_core.exceptions import RetryError from google.api_core.retry import if_exception_type from google.api_core.retry import Retry +from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -30,27 +31,8 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData -from google.gax import RetryOptions, BackoffSettings -from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request from grpc import StatusCode -BACKOFF_SETTINGS = BackoffSettings( - initial_retry_delay_millis=10, - retry_delay_multiplier=1.3, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=25 * 60 * 1000, - rpc_timeout_multiplier=1.0, - max_rpc_timeout_millis=25 * 60 * 1000, - total_timeout_millis=30 * 60 * 1000 -) - -RETRY_CODES = [ - StatusCode.DEADLINE_EXCEEDED, - StatusCode.ABORTED, - StatusCode.INTERNAL, - StatusCode.UNAVAILABLE -] - # Maximum number of mutations in bulk (MutateRowsRequest message): # (https://cloud.google.com/bigtable/docs/reference/data/rpc/ @@ -295,7 +277,7 @@ def read_row(self, row_key, filter_=None): return rows_data.rows[row_key] def read_rows(self, start_key=None, end_key=None, limit=None, - filter_=None, end_inclusive=False, backoff_settings=None): + filter_=None, end_inclusive=False): """Read rows from this table. :type start_key: bytes @@ -326,18 +308,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None, :returns: A :class:`.PartialRowsData` convenience wrapper for consuming the streamed results. """ + request_pb = _create_row_request( + self.name, start_key=start_key, end_key=end_key, filter_=filter_, + limit=limit, end_inclusive=end_inclusive) client = self._instance._client - if backoff_settings is None: - backoff_settings = BACKOFF_SETTINGS - RETRY_OPTIONS = RetryOptions( - retry_codes=RETRY_CODES, - backoff_settings=backoff_settings - ) - - retrying_iterator = ReadRowsIterator(client, self.name, start_key, - end_key, filter_, limit, - end_inclusive, RETRY_OPTIONS) - return PartialRowsData(retrying_iterator) + response_iterator = client._data_stub.ReadRows(request_pb) + # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` + return PartialRowsData(response_iterator) def mutate_rows(self, rows, retry=DEFAULT_RETRY): """Mutates multiple rows in bulk. @@ -518,6 +495,74 @@ def _do_mutate_retryable_rows(self): return self.responses_statuses +def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, + filter_=None, limit=None, end_inclusive=False): + """Creates a request to read rows in a table. + + :type table_name: str + :param table_name: The name of the table to read from. + + :type row_key: bytes + :param row_key: (Optional) The key of a specific row to read from. + + :type start_key: bytes + :param start_key: (Optional) The beginning of a range of row keys to + read from. The range will include ``start_key``. If + left empty, will be interpreted as the empty string. + + :type end_key: bytes + :param end_key: (Optional) The end of a range of row keys to read from. + The range will not include ``end_key``. If left empty, + will be interpreted as an infinite string. + + :type filter_: :class:`.RowFilter` + :param filter_: (Optional) The filter to apply to the contents of the + specified row(s). If unset, reads the entire table. + + :type limit: int + :param limit: (Optional) The read will terminate after committing to N + rows' worth of results. The default (zero) is to return + all results. + + :type end_inclusive: bool + :param end_inclusive: (Optional) Whether the ``end_key`` should be + considered inclusive. The default is False (exclusive). + + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :raises: :class:`ValueError ` if both + ``row_key`` and one of ``start_key`` and ``end_key`` are set + """ + request_kwargs = {'table_name': table_name} + if (row_key is not None and + (start_key is not None or end_key is not None)): + raise ValueError('Row key and row range cannot be ' + 'set simultaneously') + range_kwargs = {} + if start_key is not None or end_key is not None: + if start_key is not None: + range_kwargs['start_key_closed'] = _to_bytes(start_key) + if end_key is not None: + end_key_key = 'end_key_open' + if end_inclusive: + end_key_key = 'end_key_closed' + range_kwargs[end_key_key] = _to_bytes(end_key) + if filter_ is not None: + request_kwargs['filter'] = filter_.to_pb() + if limit is not None: + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) + + return message + + def _mutate_rows_request(table_name, rows): """Creates a request to mutate rows in a table. diff --git a/bigtable/tests/retry_test_script.txt b/bigtable/tests/retry_test_script.txt deleted file mode 100644 index 863662e897ba..000000000000 --- a/bigtable/tests/retry_test_script.txt +++ /dev/null @@ -1,38 +0,0 @@ -# This retry script is processed by the retry server and the client under test. -# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC -# to the retry server and expect a valid response. -# "EXPECT" commands indicate the call the server is expecting the client to send. -# -# The retry server has one table named "table" that should be used for testing. -# There are three types of commands supported: -# READ -# Expect the corresponding rows to be returned with arbitrary values. -# SCAN ... -# Ranges are expressed as an interval with either open or closed start and end, -# such as [1,3) for "1,2" or (1, 3] for "2,3". -# WRITE -# All writes should succeed eventually. Value payload is ignored. -# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test. -# All other server output should be ignored. - -# Echo same scan back after immediate error -CLIENT: SCAN [r1,r3) r1,r2 -EXPECT: SCAN [r1,r3) -SERVER: ERROR Unavailable -EXPECT: SCAN [r1,r3) -SERVER: READ_RESPONSE r1,r2 - -# Retry scans with open interval starting at the least read row key. -# Instead of using open intervals for retry ranges, '\x00' can be -# appended to the last received row key and sent in a closed interval. -CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8 -EXPECT: SCAN [r1,r9) -SERVER: READ_RESPONSE r1,r2,r3,r4 -SERVER: ERROR Unavailable -EXPECT: SCAN (r4,r9) -SERVER: ERROR Unavailable -EXPECT: SCAN (r4,r9) -SERVER: READ_RESPONSE r5,r6,r7 -SERVER: ERROR Unavailable -EXPECT: SCAN (r7,r9) -SERVER: READ_RESPONSE r8 diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 4cbe4ef67152..c889b181673e 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -272,89 +272,6 @@ def test_delete_column_family(self): # Make sure we have successfully deleted it. self.assertEqual(temp_table.list_column_families(), {}) - def test_retry(self): - import subprocess, os, stat, platform, ssl - from google.cloud.bigtable.client import Client - from google.cloud.bigtable.instance import Instance - from google.cloud.bigtable.table import Table - - # import for urlopen based on version - try: - # python 3 - from urllib.request import urlopen - except ImportError: - # python 2 - from urllib2 import urlopen - - TEST_SCRIPT = 'tests/retry_test_script.txt' - SERVER_NAME = 'retry_server' - SERVER_ZIP = SERVER_NAME + ".tar.gz" - - def download_server(): - MOCK_SERVER_URLS = { - 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', - 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', - } - - test_platform = platform.system() - if test_platform not in MOCK_SERVER_URLS: - self.skip('Retry server not available for platform {0}.'.format(test_platform)) - - context = ssl._create_unverified_context() - mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform], context=context).read() - mock_server_file = open(SERVER_ZIP, 'wb') - mock_server_file.write(mock_server_download) - - # Extract server binary from archive - subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) - os.remove(SERVER_ZIP) - - def process_scan(table, range, ids): - range_chunks = range.split(',') - range_open = range_chunks[0].lstrip('[]') - range_close = range_chunks[1].rstrip(')') - rows = table.read_rows(range_open, range_close) - rows.consume_all() - - should_download = os.environ.get('DOWNLOAD_BIGTABLE_TEST_SERVER') - if should_download is None or should_download == '1': - if not os.path.isfile(SERVER_NAME): - download_server() - - # Connect to server - server = subprocess.Popen( - ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - (endpoint, port) = server.stdout.readline().decode("utf-8").rstrip("\n").split(":") - os.environ["BIGTABLE_EMULATOR_HOST"] = endpoint + ":" + port - client = Client(project="client", admin=True) - instance = Instance("instance", client) - table = instance.table("table") - - # Run test, line by line - with open(TEST_SCRIPT, 'r') as script: - for line in script.readlines(): - if line.startswith("CLIENT:"): - chunks = line.split(" ") - op = chunks[1] - process_scan(table, chunks[2], chunks[3]) - - # Check that the test passed - server.kill() - server_stdout_lines = [] - while True: - line = server.stdout.readline().decode("utf-8") - if line != '': - server_stdout_lines.append(line) - else: - break - self.assertEqual(server_stdout_lines[-1], "PASS\n") - - # Clean up - os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index 9192a134854e..cfa24c062660 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -14,6 +14,7 @@ """Mocks used to emulate gRPC generated objects.""" + class _FakeStub(object): """Acts as a gPRC stub.""" @@ -26,17 +27,6 @@ def __getattr__(self, name): # since __getattribute__ will handle them. return _MethodMock(name, self) -class _CustomFakeStub(object): - """Acts as a gRPC stub. Generates a result from a given iterator - """ - def __init__(self, result): - self.result = result - self.method_calls = [] - - def __getattr__(self, name): - # We need not worry about attributes set in constructor - # since __getattribute__ will handle them. - return _CustomMethodMock(name, self) class _MethodMock(object): """Mock for API method attached to a gRPC stub. @@ -54,17 +44,3 @@ def __call__(self, *args, **kwargs): curr_result, self._stub.results = (self._stub.results[0], self._stub.results[1:]) return curr_result - -class _CustomMethodMock(object): - """ - Same as _MethodMock, but backed by an injected callable. - """ - - def __init__(self, name, stub): - self._name = name - self._stub = stub - - def __call__(self, *args, **kwargs): - """Sync method meant to mock a gRPC stub request.""" - self._stub.method_calls.append((self._name, args, kwargs)) - return self._stub.result diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 943cceb6f6a8..5b904f091c15 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -473,8 +473,7 @@ def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator + from google.cloud.bigtable import table as MUT client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -494,18 +493,20 @@ def mock_create_row_request(table_name, **kwargs): # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) + # Create expected_result. + expected_result = PartialRowsData(response_iterator) + + # Perform the method and check the result. start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Perform the method and check the result. result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - self.assertIsInstance(result._response_iterator, ReadRowsIterator) - self.assertEqual(result._response_iterator.client, client) + self.assertEqual(result, expected_result) self.assertEqual(stub.method_calls, [( 'ReadRows', (request_pb,), @@ -513,258 +514,13 @@ def mock_create_row_request(table_name, **kwargs): )]) created_kwargs = { 'start_key': start_key, - 'end_inclusive': False, 'end_key': end_key, 'filter_': filter_obj, 'limit': limit, - 'start_key_closed': True, + 'end_inclusive': False, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) - def test_read_rows_one_chunk(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _FakeStub - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.cloud.bigtable.row_data import Cell - from google.cloud.bigtable.row_data import PartialRowsData - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create response_iterator - chunk = _ReadRowsResponseCellChunkPB( - row_key=self.ROW_KEY, - family_name=self.FAMILY_NAME, - qualifier=self.QUALIFIER, - timestamp_micros=self.TIMESTAMP_MICROS, - value=self.VALUE, - commit_row=True, - ) - response_pb = _ReadRowsResponsePB(chunks=[chunk]) - response_iterator = iter([response_pb]) - - # Patch the stub used by the API method. - client._data_stub = stub = _FakeStub(response_iterator) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Perform the method and check the result. - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit) - result.consume_all() - - def test_read_rows_retry_timeout(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _CustomFakeStub - from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.gax import BackoffSettings - from google.gax.errors import RetryError - from grpc import StatusCode, RpcError - import time - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create a slow response iterator to cause a timeout - class MockTimeoutError(RpcError): - def code(self): - return StatusCode.DEADLINE_EXCEEDED - - class MockTimeoutIterator(object): - def next(self): - return self.__next__() - def __next__(self): - raise MockTimeoutError() - - client._data_stub = stub = _CustomFakeStub(MockTimeoutIterator()) - - # Set to timeout before RPC completes - test_backoff_settings = BackoffSettings( - initial_retry_delay_millis=10, - retry_delay_multiplier=0.3, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=1000, - rpc_timeout_multiplier=1.0, - max_rpc_timeout_millis=25 * 60 * 1000, - total_timeout_millis=1000 - ) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Verify that a RetryError is thrown on read. - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit, backoff_settings=test_backoff_settings) - with self.assertRaises(RetryError): - result.consume_next() - - def test_read_rows_mid_row_timeout_retry(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _CustomFakeStub - from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.gax import BackoffSettings - from google.gax.errors import RetryError - from grpc import StatusCode, RpcError - import time - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create an iterator that throws an idempotent exception - class MockTimeoutError(RpcError): - def code(self): - return StatusCode.DEADLINE_EXCEEDED - - first_chunk = _ReadRowsResponseCellChunkPB( - row_key=self.ROW_KEY, - family_name=self.FAMILY_NAME, - qualifier=self.QUALIFIER, - timestamp_micros=self.TIMESTAMP_MICROS, - value=self.VALUE, - ) - first_response = _ReadRowsResponsePB(chunks = [first_chunk]) - - second_chunk = _ReadRowsResponseCellChunkPB( - row_key=self.ROW_KEY, - family_name=self.FAMILY_NAME, - qualifier=self.QUALIFIER, - timestamp_micros=self.TIMESTAMP_MICROS, - value=self.VALUE, - commit_row=True, - ) - second_response = _ReadRowsResponsePB(chunks = [second_chunk]) - - class MidRowTimeoutIterator(object): - def __init__(self): - self.invocation_count = 0 - def next(self): - return self.__next__() - def __next__(self): - self.invocation_count += 1 - if (self.invocation_count == 1): - return first_response - elif (self.invocation_count == 2): - raise MockTimeoutError() - elif (self.invocation_count == 3): - return first_response - elif (self.invocation_count == 4): - return second_response - else: - raise StopIteration - - client._data_stub = stub = _CustomFakeStub(MidRowTimeoutIterator()) - - # Set to timeout before RPC completes - test_backoff_settings = BackoffSettings( - initial_retry_delay_millis=10, - retry_delay_multiplier=1, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=1000, - rpc_timeout_multiplier=1.0, - max_rpc_timeout_millis=25 * 60 * 1000, - total_timeout_millis=1000 - ) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Verify that a RetryError is thrown on read. - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit, backoff_settings=test_backoff_settings) - result.consume_all() - - cells = result.rows[self.ROW_KEY].cells[self.FAMILY_NAME][self.QUALIFIER] - self.assertEquals(len(cells), 2) - - def test_read_rows_non_idempotent_error_throws(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _CustomFakeStub - from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.gax import BackoffSettings - from google.gax.errors import RetryError - from grpc import StatusCode, RpcError - import time - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create response iterator that raises a non-idempotent exception - class MockNonIdempotentError(RpcError): - def code(self): - return StatusCode.RESOURCE_EXHAUSTED - - class MockNonIdempotentIterator(object): - def next(self): - return self.__next__() - def __next__(self): - raise MockNonIdempotentError() - - client._data_stub = stub = _CustomFakeStub(MockNonIdempotentIterator()) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit) - with self.assertRaises(MockNonIdempotentError): - result.consume_next() - def test_sample_row_keys(self): from tests.unit._testing import _FakeStub @@ -1244,14 +1000,12 @@ def test_do_mutate_retryable_rows_mismatch_num_responses(self): class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, - filter_=None, limit=None, start_key_closed=True, - end_inclusive=False): - from google.cloud.bigtable.retry import _create_row_request + filter_=None, limit=None, end_inclusive=False): + from google.cloud.bigtable.table import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - start_key_closed=start_key_closed, filter_=filter_, - limit=limit, end_inclusive=end_inclusive) + filter_=filter_, limit=limit, end_inclusive=end_inclusive) def test_table_name_only(self): table_name = 'table_name' @@ -1274,7 +1028,7 @@ def test_row_key(self): expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) - def test_row_range_start_key_closed(self): + def test_row_range_start_key(self): table_name = 'table_name' start_key = b'start_key' result = self._call_fut(table_name, start_key=start_key) @@ -1282,15 +1036,6 @@ def test_row_range_start_key_closed(self): expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) - def test_row_range_start_key_open(self): - table_name = 'table_name' - start_key = b'start_key' - result = self._call_fut(table_name, start_key=start_key, - start_key_closed=False) - expected_result = _ReadRowsRequestPB(table_name=table_name) - expected_result.rows.row_ranges.add(start_key_open=start_key) - self.assertEqual(result, expected_result) - def test_row_range_end_key(self): table_name = 'table_name' end_key = b'end_key' diff --git a/test_utils/scripts/circleci/prepare_bigtable.sh b/test_utils/scripts/circleci/prepare_bigtable.sh deleted file mode 100755 index 88f85c00425c..000000000000 --- a/test_utils/scripts/circleci/prepare_bigtable.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -ev - -mkdir -p ~/bigtable -if [ ! -f ~/bigtable/retry_server ]; then - wget https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz - tar -zxvf retry_server_linux.tar.gz --no-same-owner -C /var/code/gcp/bigtable/ - rm retry_server_linux.tar.gz -fi