From 0b8003e6312ba152cccb4d521643a3ccb578366b Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 30 Apr 2025 21:13:56 +0300 Subject: [PATCH] chore(x-goog-spanner-request-id): implement request_id generation and propagation 1/3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Generates a request_id that is then injected inside metadata that's sent over to the Cloud Spanner backend. Officially inject the first set of x-goog-spanner-request-id values into header metadata Add request-id interceptor to use in asserting tests Wrap Snapshot methods with x-goog-request-id metadata injector Updates #1261 --- google/cloud/spanner_v1/batch.py | 12 +- google/cloud/spanner_v1/database.py | 99 ++++- google/cloud/spanner_v1/pool.py | 8 +- google/cloud/spanner_v1/session.py | 39 +- google/cloud/spanner_v1/snapshot.py | 132 +++++-- .../cloud/spanner_v1/testing/interceptors.py | 24 +- .../cloud/spanner_v1/testing/mock_spanner.py | 1 + google/cloud/spanner_v1/transaction.py | 120 ++++-- .../mockserver_tests/mock_server_test_base.py | 22 ++ .../test_request_id_header.py | 363 +++++++++++++++++ tests/unit/test_atomic_counter.py | 1 + tests/unit/test_batch.py | 50 +++ tests/unit/test_database.py | 342 ++++++++++++++-- tests/unit/test_pool.py | 29 ++ tests/unit/test_session.py | 366 ++++++++++++++++-- tests/unit/test_snapshot.py | 170 ++++++-- tests/unit/test_spanner.py | 168 +++++++- tests/unit/test_transaction.py | 55 ++- 18 files changed, 1764 insertions(+), 237 deletions(-) create mode 100644 tests/mockserver_tests/test_request_id_header.py diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 3d632c7568..afa0a4e71d 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -252,7 +252,11 @@ def commit( method = functools.partial( api.commit, request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + metadata, + ), ) deadline = time.time() + kwargs.get( "timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS @@ -377,7 +381,11 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals method = functools.partial( api.batch_write, request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + metadata, + ), ) response = _retry( method, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 03c6e5119f..004754960b 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -51,8 +51,10 @@ from google.cloud.spanner_v1 import SpannerClient from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _metadata_with_request_id, ) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.batch import MutationGroups @@ -151,6 +153,9 @@ class Database(object): _spanner_api: SpannerClient = None + __transport_lock = threading.Lock() + __transports_to_channel_id = dict() + def __init__( self, database_id, @@ -188,6 +193,7 @@ def __init__( self._instance._client.default_transaction_options ) self._proto_descriptors = proto_descriptors + self._channel_id = 0 # It'll be created when _spanner_api is created. if pool is None: pool = BurstyPool(database_role=database_role) @@ -446,8 +452,26 @@ def spanner_api(self): client_info=client_info, client_options=client_options, ) + + with self.__transport_lock: + transport = self._spanner_api._transport + channel_id = self.__transports_to_channel_id.get(transport, None) + if channel_id is None: + channel_id = len(self.__transports_to_channel_id) + 1 + self.__transports_to_channel_id[transport] = channel_id + self._channel_id = channel_id + return self._spanner_api + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented @@ -490,7 +514,10 @@ def create(self): database_dialect=self._database_dialect, proto_descriptors=self._proto_descriptors, ) - future = api.create_database(request=request, metadata=metadata) + future = api.create_database( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def exists(self): @@ -506,7 +533,12 @@ def exists(self): metadata = _metadata_with_prefix(self.name) try: - api.get_database_ddl(database=self.name, metadata=metadata) + api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id( + self._next_nth_request, 1, metadata + ), + ) except NotFound: return False return True @@ -523,10 +555,16 @@ def reload(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - response = api.get_database_ddl(database=self.name, metadata=metadata) + response = api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._ddl_statements = tuple(response.statements) self._proto_descriptors = response.proto_descriptors - response = api.get_database(name=self.name, metadata=metadata) + response = api.get_database( + name=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._state = DatabasePB.State(response.state) self._create_time = response.create_time self._restore_info = response.restore_info @@ -571,7 +609,10 @@ def update_ddl(self, ddl_statements, operation_id="", proto_descriptors=None): proto_descriptors=proto_descriptors, ) - future = api.update_database_ddl(request=request, metadata=metadata) + future = api.update_database_ddl( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def update(self, fields): @@ -609,7 +650,9 @@ def update(self, fields): metadata = _metadata_with_prefix(self.name) future = api.update_database( - database=database_pb, update_mask=field_mask, metadata=metadata + database=database_pb, + update_mask=field_mask, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -622,7 +665,10 @@ def drop(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - api.drop_database(database=self.name, metadata=metadata) + api.drop_database( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def execute_partitioned_dml( self, @@ -711,7 +757,13 @@ def execute_pdml(): with SessionCheckout(self._pool) as session: add_span_event(span, "Starting BeginTransaction") txn = api.begin_transaction( - session=session.name, options=txn_options, metadata=metadata + session=session.name, + options=txn_options, + metadata=self.metadata_with_request_id( + self._next_nth_request, + 1, + metadata, + ), ) txn_selector = TransactionSelector(id=txn.id) @@ -724,6 +776,7 @@ def execute_pdml(): query_options=query_options, request_options=request_options, ) + method = functools.partial( api.execute_streaming_sql, metadata=metadata, @@ -736,6 +789,7 @@ def execute_pdml(): metadata=metadata, transaction_selector=txn_selector, observability_options=self.observability_options, + request_id_manager=self, ) result_set = StreamedResultSet(iterator) @@ -745,6 +799,17 @@ def execute_pdml(): return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)() + @property + def _next_nth_request(self): + if self._instance and self._instance._client: + return self._instance._client._next_nth_request + raise Exception("returning 1 for next_nth_request") + return 1 + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + def session(self, labels=None, database_role=None): """Factory to create a session for this database. @@ -965,7 +1030,8 @@ def restore(self, source): ) future = api.restore_database( request=request, - metadata=metadata, + # TODO: Infer the channel_id being used. + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -1034,7 +1100,10 @@ def list_database_roles(self, page_size=None): parent=self.name, page_size=page_size, ) - return api.list_database_roles(request=request, metadata=metadata) + return api.list_database_roles( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def table(self, table_id): """Factory to create a table object within this database. @@ -1118,7 +1187,10 @@ def get_iam_policy(self, policy_version=None): requested_policy_version=policy_version ), ) - response = api.get_iam_policy(request=request, metadata=metadata) + response = api.get_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response def set_iam_policy(self, policy): @@ -1140,7 +1212,10 @@ def set_iam_policy(self, policy): resource=self.name, policy=policy, ) - response = api.set_iam_policy(request=request, metadata=metadata) + response = api.set_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response @property diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 0c4dd5a63b..0bc0135ba0 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -256,7 +256,9 @@ def bind(self, database): ) resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), ) add_span_event( @@ -561,7 +563,9 @@ def bind(self, database): while returned_session_count < self.size: resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), ) add_span_event( diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index d5feb2ef1a..2e3492892b 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -170,7 +170,9 @@ def create(self): ), MetricsCapture(): session_pb = api.create_session( request=request, - metadata=metadata, + metadata=self._database.metadata_with_request_id( + self._database._next_nth_request, 1, metadata + ), ) self._session_id = session_pb.name.split("/")[-1] @@ -195,7 +197,8 @@ def exists(self): current_span, "Checking if Session exists", {"session.id": self._session_id} ) - api = self._database.spanner_api + database = self._database + api = database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: metadata.append( @@ -212,7 +215,12 @@ def exists(self): metadata=metadata, ) as span, MetricsCapture(): try: - api.get_session(name=self.name, metadata=metadata) + api.get_session( + name=self.name, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), + ) if span: span.set_attribute("session_found", True) except NotFound: @@ -242,8 +250,11 @@ def delete(self): current_span, "Deleting Session", {"session.id": self._session_id} ) - api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + database = self._database + api = database.spanner_api + metadata = database.metadata_with_request_id( + database._next_nth_request, 1, _metadata_with_prefix(database.name) + ) observability_options = getattr(self._database, "observability_options", None) with trace_call( "CloudSpanner.DeleteSession", @@ -255,7 +266,10 @@ def delete(self): observability_options=observability_options, metadata=metadata, ), MetricsCapture(): - api.delete_session(name=self.name, metadata=metadata) + api.delete_session( + name=self.name, + metadata=metadata, + ) def ping(self): """Ping the session to keep it alive by executing "SELECT 1". @@ -264,10 +278,17 @@ def ping(self): """ if self._session_id is None: raise ValueError("Session ID not set by back-end") - api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + database = self._database + api = database.spanner_api request = ExecuteSqlRequest(session=self.name, sql="SELECT 1") - api.execute_sql(request=request, metadata=metadata) + api.execute_sql( + request=request, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + _metadata_with_prefix(database.name), + ), + ) self._last_use_time = datetime.now() def snapshot(self, **kw): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 3b18d2c855..6d4b8916f0 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -38,6 +38,7 @@ _retry, _check_rst_stream_error, _SessionWrapper, + AtomicCounter, ) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet @@ -61,6 +62,7 @@ def _restart_on_unavailable( transaction=None, transaction_selector=None, observability_options=None, + request_id_manager=None, ): """Restart iteration after :exc:`.ServiceUnavailable`. @@ -80,6 +82,8 @@ def _restart_on_unavailable( resume_token = b"" item_buffer = [] + next_nth_request = lambda: getattr(request_id_manager, "_next_nth_request", 0) + nth_request = next_nth_request() if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -90,6 +94,7 @@ def _restart_on_unavailable( request.transaction = transaction_selector iterator = None + attempt = 0 while True: try: @@ -101,7 +106,13 @@ def _restart_on_unavailable( observability_options=observability_options, metadata=metadata, ), MetricsCapture(): - iterator = method(request=request, metadata=metadata) + attempt += 1 + iterator = method( + request=request, + metadata=request_id_manager.metadata_with_request_id( + nth_request, attempt, metadata + ), + ) for item in iterator: item_buffer.append(item) # Setting the transaction id because the transaction begin was inlined for first rpc. @@ -129,7 +140,14 @@ def _restart_on_unavailable( if transaction is not None: transaction_selector = transaction._make_txn_selector() request.transaction = transaction_selector - iterator = method(request=request) + nth_request = next_nth_request() + attempt = 1 + iterator = method( + request=request, + metadata=request_id_manager.metadata_with_request_id( + nth_request, attempt, metadata + ), + ) continue except InternalServerError as exc: resumable_error = any( @@ -149,8 +167,15 @@ def _restart_on_unavailable( request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() + nth_request = next_nth_request() + attempt = 1 request.transaction = transaction_selector - iterator = method(request=request) + iterator = method( + request=request, + metadata=request_id_manager.metadata_with_request_id( + nth_request, attempt, metadata + ), + ) continue if len(item_buffer) == 0: @@ -329,6 +354,7 @@ def read( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) + restart = functools.partial( api.streaming_read, request=request, @@ -352,6 +378,7 @@ def read( trace_attributes, transaction=self, observability_options=observability_options, + request_id_manager=self._session._database, ) self._read_request_count += 1 if self._multi_use: @@ -375,6 +402,7 @@ def read( trace_attributes, transaction=self, observability_options=observability_options, + request_id_manager=self._session._database, ) self._read_request_count += 1 @@ -562,13 +590,16 @@ def execute_sql( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) - restart = functools.partial( - api.execute_streaming_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + + def wrapped_restart(*args, **kwargs): + restart = functools.partial( + api.execute_streaming_sql, + request=request, + metadata=kwargs.get("metadata", metadata), + retry=retry, + timeout=timeout, + ) + return restart(*args, **kwargs) trace_attributes = {"db.statement": sql} observability_options = getattr(database, "observability_options", None) @@ -577,7 +608,7 @@ def execute_sql( # lock is added to handle the inline begin for first rpc with self._lock: return self._get_streamed_result_set( - restart, + wrapped_restart, request, metadata, trace_attributes, @@ -587,7 +618,7 @@ def execute_sql( ) else: return self._get_streamed_result_set( - restart, + wrapped_restart, request, metadata, trace_attributes, @@ -615,6 +646,7 @@ def _get_streamed_result_set( trace_attributes, transaction=self, observability_options=observability_options, + request_id_manager=self._session._database, ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -718,15 +750,25 @@ def partition_read( observability_options=getattr(database, "observability_options", None), metadata=metadata, ), MetricsCapture(): - method = functools.partial( - api.partition_read, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = getattr(database, "_next_nth_request", 0) + counters = dict(attempt=0) + + def attempt_tracking_method(): + counters["attempt"] += 1 + all_metadata = database.metadata_with_request_id( + nth_request, counters["attempt"], metadata + ) + method = functools.partial( + api.partition_read, + request=request, + metadata=all_metadata, + retry=retry, + timeout=timeout, + ) + return method() + response = _retry( - method, + attempt_tracking_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) @@ -822,15 +864,25 @@ def partition_query( observability_options=getattr(database, "observability_options", None), metadata=metadata, ), MetricsCapture(): - method = functools.partial( - api.partition_query, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = getattr(database, "_next_nth_request", 0) + counters = dict(attempt=0) + + def attempt_tracking_method(): + counters["attempt"] += 1 + all_metadata = database.metadata_with_request_id( + nth_request, counters["attempt"], metadata + ) + method = functools.partial( + api.partition_query, + request=request, + metadata=all_metadata, + retry=retry, + timeout=timeout, + ) + return method() + response = _retry( - method, + attempt_tracking_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) @@ -969,14 +1021,24 @@ def begin(self): observability_options=getattr(database, "observability_options", None), metadata=metadata, ), MetricsCapture(): - method = functools.partial( - api.begin_transaction, - session=self._session.name, - options=txn_selector.begin, - metadata=metadata, - ) + nth_request = getattr(database, "_next_nth_request", 0) + counters = dict(attempt=0) + + def attempt_tracking_method(): + counters["attempt"] += 1 + all_metadata = database.metadata_with_request_id( + nth_request, counters["attempt"], metadata + ) + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_selector.begin, + metadata=all_metadata, + ) + return method() + response = _retry( - method, + attempt_tracking_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) self._transaction_id = response.id diff --git a/google/cloud/spanner_v1/testing/interceptors.py b/google/cloud/spanner_v1/testing/interceptors.py index bf5e271e26..bb36b5b294 100644 --- a/google/cloud/spanner_v1/testing/interceptors.py +++ b/google/cloud/spanner_v1/testing/interceptors.py @@ -71,9 +71,6 @@ def reset(self): class XGoogRequestIDHeaderInterceptor(ClientInterceptor): - # TODO:(@odeke-em): delete this guard when PR #1367 is merged. - X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = False - def __init__(self): self._unary_req_segments = [] self._stream_req_segments = [] @@ -87,7 +84,7 @@ def intercept(self, method, request_or_iterator, call_details): x_goog_request_id = value break - if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id: + if not x_goog_request_id: raise Exception( f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}" ) @@ -95,16 +92,15 @@ def intercept(self, method, request_or_iterator, call_details): response_or_iterator = method(request_or_iterator, call_details) streaming = getattr(response_or_iterator, "__iter__", None) is not None - if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED: - with self.__lock: - if streaming: - self._stream_req_segments.append( - (call_details.method, parse_request_id(x_goog_request_id)) - ) - else: - self._unary_req_segments.append( - (call_details.method, parse_request_id(x_goog_request_id)) - ) + with self.__lock: + if streaming: + self._stream_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + else: + self._unary_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) return response_or_iterator diff --git a/google/cloud/spanner_v1/testing/mock_spanner.py b/google/cloud/spanner_v1/testing/mock_spanner.py index f8971a6098..e2ac14e976 100644 --- a/google/cloud/spanner_v1/testing/mock_spanner.py +++ b/google/cloud/spanner_v1/testing/mock_spanner.py @@ -53,6 +53,7 @@ def pop_error(self, context): name = inspect.currentframe().f_back.f_code.co_name error: _Status | None = self.errors.pop(name, None) if error: + print("context.abort_with_status", error) context.abort_with_status(error) def get_result_as_partial_result_sets( diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 2f52aaa144..8adbf8e108 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -32,6 +32,7 @@ from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import TransactionSelector from google.cloud.spanner_v1 import TransactionOptions +from google.cloud.spanner_v1._helpers import AtomicCounter from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call @@ -181,12 +182,20 @@ def begin(self): observability_options=observability_options, metadata=metadata, ) as span, MetricsCapture(): - method = functools.partial( - api.begin_transaction, - session=self._session.name, - options=txn_options, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_options, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -196,7 +205,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -217,6 +226,7 @@ def rollback(self): database._route_to_leader_enabled ) ) + observability_options = getattr(database, "observability_options", None) with trace_call( f"CloudSpanner.{type(self).__name__}.rollback", @@ -224,16 +234,26 @@ def rollback(self): observability_options=observability_options, metadata=metadata, ), MetricsCapture(): - method = functools.partial( - api.rollback, - session=self._session.name, - transaction_id=self._transaction_id, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.rollback, + session=self._session.name, + transaction_id=self._transaction_id, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) + _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) + self.rolled_back = True del self._session._transaction @@ -306,11 +326,19 @@ def commit( add_span_event(span, "Starting Commit") - method = functools.partial( - api.commit, - request=request, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.commit, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -320,7 +348,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -469,19 +497,27 @@ def execute_update( last_statement=last_statement, ) - method = functools.partial( - api.execute_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_sql, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, metadata, f"CloudSpanner.{type(self).__name__}.execute_update", @@ -499,7 +535,7 @@ def execute_update( self._transaction_id = response.metadata.transaction.id else: response = self._execute_request( - method, + wrapped_method, request, metadata, f"CloudSpanner.{type(self).__name__}.execute_update", @@ -611,19 +647,27 @@ def batch_update( last_statements=last_statement, ) - method = functools.partial( - api.execute_batch_dml, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_batch_dml, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, metadata, "CloudSpanner.DMLTransaction", @@ -642,7 +686,7 @@ def batch_update( break else: response = self._execute_request( - method, + wrapped_method, request, metadata, "CloudSpanner.DMLTransaction", diff --git a/tests/mockserver_tests/mock_server_test_base.py b/tests/mockserver_tests/mock_server_test_base.py index 7b4538d601..2f89415b55 100644 --- a/tests/mockserver_tests/mock_server_test_base.py +++ b/tests/mockserver_tests/mock_server_test_base.py @@ -20,6 +20,7 @@ start_mock_server, SpannerServicer, ) +from google.cloud.spanner_v1.client import Client import google.cloud.spanner_v1.types.type as spanner_type import google.cloud.spanner_v1.types.result_set as result_set from google.api_core.client_options import ClientOptions @@ -78,6 +79,27 @@ def unavailable_status() -> _Status: return status +# Creates an UNAVAILABLE status with the smallest possible retry delay. +def unavailable_status() -> _Status: + error = status_pb2.Status( + code=code_pb2.UNAVAILABLE, + message="Service unavailable.", + ) + retry_info = RetryInfo(retry_delay=Duration(seconds=0, nanos=1)) + status = _Status( + code=code_to_grpc_status_code(error.code), + details=error.message, + trailing_metadata=( + ("grpc-status-details-bin", error.SerializeToString()), + ( + "google.rpc.retryinfo-bin", + retry_info.SerializeToString(), + ), + ), + ) + return status + + def add_error(method: str, error: status_pb2.Status): MockServerTestBase.spanner_service.mock_spanner.add_error(method, error) diff --git a/tests/mockserver_tests/test_request_id_header.py b/tests/mockserver_tests/test_request_id_header.py new file mode 100644 index 0000000000..24af837728 --- /dev/null +++ b/tests/mockserver_tests/test_request_id_header.py @@ -0,0 +1,363 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import threading + +from google.cloud.spanner_v1 import ( + BatchCreateSessionsRequest, + BeginTransactionRequest, + ExecuteSqlRequest, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1.testing.mock_spanner import SpannerServicer +from tests.mockserver_tests.mock_server_test_base import ( + MockServerTestBase, + add_select1_result, + aborted_status, + add_error, + unavailable_status, +) + + +class TestRequestIDHeader(MockServerTestBase): + def tearDown(self): + self.database._x_goog_request_id_interceptor.reset() + + def test_snapshot_execute_sql(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(2, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ) + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_snapshot_read_concurrent(self): + db = self.database + # Trigger BatchCreateSessions firstly. + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + for row in rows: + _ = row + + # The other requests can then proceed. + def select1(): + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + res_list = [] + for row in rows: + self.assertEqual(1, row[0]) + res_list.append(row) + self.assertEqual(1, len(res_list)) + + n = 10 + threads = [] + for i in range(n): + th = threading.Thread(target=select1, name=f"snapshot-select1-{i}") + th.run() + threads.append(th) + + random.shuffle(threads) + + while True: + n_finished = 0 + for thread in threads: + if thread.is_alive(): + thread.join() + else: + n_finished += 1 + + if n_finished == len(threads): + break + + requests = self.spanner_service.requests + self.assertEqual(2 + n * 2, len(requests), msg=requests) + + client_id = db._nth_client_id + channel_id = db._channel_id + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 7, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 9, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 13, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 15, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 17, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 19, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 21, 1), + ), + ] + assert got_unary_segments == want_unary_segments + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 14, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 16, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 18, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 20, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 22, 1), + ), + ] + assert got_stream_segments == want_stream_segments + + def test_database_run_in_transaction_retries_on_abort(self): + counters = dict(aborted=0) + want_failed_attempts = 2 + + def select_in_txn(txn): + results = txn.execute_sql("select 1") + for row in results: + _ = row + + if counters["aborted"] < want_failed_attempts: + counters["aborted"] += 1 + add_error(SpannerServicer.Commit.__name__, aborted_status()) + + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + + self.database.run_in_transaction(select_in_txn) + + def test_database_execute_partitioned_dml_request_id(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + _ = self.database.execute_partitioned_dml("select 1") + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BeginTransactionRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BeginTransaction", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 3, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_unary_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.BatchCreateSessions.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 2), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + + print("got_unaries", got_unary_segments) + print("got_stream", got_stream_segments) + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_streaming_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 2), + ), + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def canonicalize_request_id_headers(self): + src = self.database._x_goog_request_id_interceptor + return src._stream_req_segments, src._unary_req_segments diff --git a/tests/unit/test_atomic_counter.py b/tests/unit/test_atomic_counter.py index 92d10cac79..e8d8b6b7ce 100644 --- a/tests/unit/test_atomic_counter.py +++ b/tests/unit/test_atomic_counter.py @@ -15,6 +15,7 @@ import random import threading import unittest + from google.cloud.spanner_v1._helpers import AtomicCounter diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 355ce20520..4d059b17cc 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -37,6 +37,10 @@ from google.cloud.spanner_v1.keyset import KeySet from google.rpc.status_pb2 import Status +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -249,6 +253,10 @@ def test_commit_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) self.assertEqual(request_options, RequestOptions()) @@ -343,6 +351,10 @@ def _test_commit_with_options( [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), ], ) self.assertEqual(actual_request_options, expected_request_options) @@ -453,6 +465,10 @@ def test_context_mgr_success(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Database.NTH_CLIENT}.1.1.1", + ), ], ) self.assertEqual(request_options, RequestOptions()) @@ -579,10 +595,17 @@ def _test_batch_write_with_request_options( "traceparent is missing in metadata", ) + expected_metadata.append( + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ) + ) # Remove traceparent from actual metadata for comparison filtered_metadata = [item for item in metadata if item[0] != "traceparent"] self.assertEqual(filtered_metadata, expected_metadata) + if request_options is None: expected_request_options = RequestOptions() elif type(request_options) is dict: @@ -635,12 +658,39 @@ def session_id(self): class _Database(object): + name = "testing" + _route_to_leader_enabled = True + NTH_CLIENT = 1 + def __init__(self, enable_end_to_end_tracing=False): self.name = "testing" self._route_to_leader_enabled = True if enable_end_to_end_tracing: self.observability_options = dict(enable_end_to_end_tracing=True) self.default_transaction_options = DefaultTransactionOptions() + self._nth_request = 0 + + @property + def _next_nth_request(self): + self._nth_request += 1 + return self._nth_request + + @property + def _nth_client_id(self): + return 1 + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 class _FauxSpannerAPI: diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index c7ed5a0e3d..c9f2e05e43 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -30,6 +30,11 @@ DirectedReadOptions, DefaultTransactionOptions, ) +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID DML_WO_PARAM = """ DELETE FROM citizens @@ -115,7 +120,9 @@ def _make_database_admin_api(): def _make_spanner_api(): from google.cloud.spanner_v1 import SpannerClient - return mock.create_autospec(SpannerClient, instance=True) + api = mock.create_autospec(SpannerClient, instance=True) + api._transport = "transport" + return api def test_ctor_defaults(self): from google.cloud.spanner_v1.pool import BurstyPool @@ -549,7 +556,13 @@ def test_create_grpc_error(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_already_exists(self): @@ -576,7 +589,13 @@ def test_create_already_exists(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_instance_not_found(self): @@ -602,7 +621,13 @@ def test_create_instance_not_found(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success(self): @@ -638,7 +663,13 @@ def test_create_success(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success_w_encryption_config_dict(self): @@ -675,7 +706,13 @@ def test_create_success_w_encryption_config_dict(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success_w_proto_descriptors(self): @@ -710,7 +747,13 @@ def test_create_success_w_proto_descriptors(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_grpc_error(self): @@ -728,7 +771,13 @@ def test_exists_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_not_found(self): @@ -745,7 +794,13 @@ def test_exists_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_success(self): @@ -764,7 +819,13 @@ def test_exists_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_grpc_error(self): @@ -782,7 +843,13 @@ def test_reload_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_not_found(self): @@ -800,7 +867,13 @@ def test_reload_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_success(self): @@ -859,11 +932,23 @@ def test_reload_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) api.get_database.assert_called_once_with( name=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", + ), + ], ) def test_update_ddl_grpc_error(self): @@ -889,7 +974,13 @@ def test_update_ddl_grpc_error(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl_not_found(self): @@ -915,7 +1006,13 @@ def test_update_ddl_not_found(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl(self): @@ -942,7 +1039,13 @@ def test_update_ddl(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl_w_operation_id(self): @@ -969,7 +1072,13 @@ def test_update_ddl_w_operation_id(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_success(self): @@ -995,7 +1104,13 @@ def test_update_success(self): api.update_database.assert_called_once_with( database=expected_database, update_mask=field_mask, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl_w_proto_descriptors(self): @@ -1023,7 +1138,13 @@ def test_update_ddl_w_proto_descriptors(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_grpc_error(self): @@ -1041,7 +1162,13 @@ def test_drop_grpc_error(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_not_found(self): @@ -1059,7 +1186,13 @@ def test_drop_not_found(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_success(self): @@ -1076,7 +1209,13 @@ def test_drop_success(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def _execute_partitioned_dml_helper( @@ -1149,18 +1288,35 @@ def _execute_partitioned_dml_helper( exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) - api.begin_transaction.assert_called_with( - session=session.name, - options=txn_options, - metadata=[ - ("google-cloud-resource-prefix", database.name), - ("x-goog-spanner-route-to-leader", "true"), - ], - ) if retried: self.assertEqual(api.begin_transaction.call_count, 2) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + # Please note that this try was by an abort and not from service unavailable. + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1", + ), + ], + ) else: self.assertEqual(api.begin_transaction.call_count, 1) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], + ) if params: expected_params = Struct( @@ -1196,6 +1352,10 @@ def _execute_partitioned_dml_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.{database._channel_id}.2.1", + ), ], ) if retried: @@ -1211,11 +1371,33 @@ def _execute_partitioned_dml_helper( query_options=expected_query_options, request_options=expected_request_options, ) + + api.begin_transaction.assert_called_with( + session=self.SESSION_NAME, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + # Retrying on an aborted response involves creating the transaction afresh + # and also re-invoking execute_streaming_sql, hence the fresh request 4.1. + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1", + ), + ], + ) + api.execute_streaming_sql.assert_called_with( request=expected_request, metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + # Retrying on an aborted response involves creating the transaction afresh + # and also re-invoking execute_streaming_sql, hence the fresh request 4.1. + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.4.1", + ), ], ) self.assertEqual(api.execute_streaming_sql.call_count, 2) @@ -1490,7 +1672,13 @@ def test_restore_grpc_error(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_not_found(self): @@ -1516,7 +1704,13 @@ def test_restore_not_found(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_success(self): @@ -1553,7 +1747,13 @@ def test_restore_success(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_success_w_encryption_config_dict(self): @@ -1594,7 +1794,13 @@ def test_restore_success_w_encryption_config_dict(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_w_invalid_encryption_config_dict(self): @@ -1741,7 +1947,13 @@ def test_list_database_roles_grpc_error(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_list_database_roles_defaults(self): @@ -1762,7 +1974,13 @@ def test_list_database_roles_defaults(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) self.assertIsNotNone(resp) @@ -1849,6 +2067,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -1896,6 +2118,10 @@ def test_context_mgr_w_commit_stats_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -1941,6 +2167,10 @@ def test_context_mgr_w_aborted_commit_status(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -3015,6 +3245,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -3113,6 +3347,8 @@ def _make_database_admin_api(): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__( self, project=TestDatabase.PROJECT_ID, @@ -3135,6 +3371,13 @@ def __init__( self.directed_read_options = directed_read_options self.default_transaction_options = default_transaction_options self.observability_options = observability_options + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + self.credentials = {} + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -3164,6 +3407,31 @@ def __init__(self, name, instance=None): self._directed_read_options = None self.default_transaction_options = DefaultTransactionOptions() + @property + def _next_nth_request(self): + if self._instance and self._instance._client: + return self._instance._client._next_nth_request + return 1 + + @property + def _nth_client_id(self): + if self._instance and self._instance._client: + return self._instance._client._nth_client_id + return 1 + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Pool(object): _bound = None diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 768f8482f3..8069f806d8 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -19,6 +19,11 @@ from datetime import datetime, timedelta import mock +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) + from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from tests._helpers import ( OpenTelemetryBase, @@ -1193,6 +1198,9 @@ def session_id(self): class _Database(object): + NTH_REQUEST = AtomicCounter() + NTH_CLIENT_ID = AtomicCounter() + def __init__(self, name): self.name = name self._sessions = [] @@ -1247,6 +1255,27 @@ def session(self, **kwargs): def observability_options(self): return dict(db_name=self.name) + @property + def _next_nth_request(self): + return self.NTH_REQUEST.increment() + + @property + def _nth_client_id(self): + return self.NTH_CLIENT_ID.increment() + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Queue(object): _size = 1 diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index d72c01f5ab..d97c545256 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -49,6 +49,11 @@ from google.protobuf.struct_pb2 import Struct, Value from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1 import DefaultTransactionOptions +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) def _make_rpc_error(error_cls, trailing_metadata=None): @@ -95,7 +100,20 @@ def _make_database( database.database_role = database_role database._route_to_leader_enabled = True database.default_transaction_options = default_transaction_options + nth_client_id = AtomicCounter(1) + database.NTH_CLIENT = nth_client_id + next_nth_request = AtomicCounter(0) + + def metadata_with_request_id(nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + nth_client_id.value, + 1, + next_nth_request.increment(), + nth_attempt, + prior_metadata, + ) + database.metadata_with_request_id = metadata_with_request_id return database @staticmethod @@ -191,6 +209,10 @@ def test_create_w_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -226,6 +248,10 @@ def test_create_session_span_annotations(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -253,6 +279,10 @@ def test_create_wo_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -281,6 +311,10 @@ def test_create_ok(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -311,6 +345,10 @@ def test_create_w_labels(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -358,6 +396,10 @@ def test_exists_hit(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -386,6 +428,10 @@ def test_exists_hit_wo_span(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -406,6 +452,10 @@ def test_exists_miss(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -433,6 +483,10 @@ def test_exists_miss_wo_span(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -454,6 +508,10 @@ def test_exists_error(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -486,7 +544,13 @@ def test_ping_hit(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_ping_miss(self): @@ -507,7 +571,13 @@ def test_ping_miss(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_ping_error(self): @@ -528,7 +598,13 @@ def test_ping_error(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_delete_wo_session_id(self): @@ -552,7 +628,13 @@ def test_delete_hit(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -575,7 +657,13 @@ def test_delete_miss(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -600,7 +688,13 @@ def test_delete_error(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -936,6 +1030,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -949,6 +1047,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1000,6 +1102,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1052,10 +1158,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1071,10 +1192,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), ], - ) - ] - * 2, + ), + ], ) def test_run_in_transaction_w_abort_w_retry_metadata(self): @@ -1137,10 +1272,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1156,10 +1306,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], - ) - ] - * 2, + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), + ], + ), + ], ) def test_run_in_transaction_w_callback_raises_abort_wo_metadata(self): @@ -1221,6 +1385,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1234,6 +1402,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1297,6 +1469,10 @@ def _time(_results=[1, 1.5]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1310,6 +1486,10 @@ def _time(_results=[1, 1.5]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1360,6 +1540,7 @@ def _time(_results=[1, 2, 4, 8]): self.assertEqual(kw, {}) expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + print("gax_api", gax_api.begin_transaction.call_args_list[2]) self.assertEqual( gax_api.begin_transaction.call_args_list, [ @@ -1369,10 +1550,37 @@ def _time(_results=[1, 2, 4, 8]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), ], - ) - ] - * 3, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.5.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1388,10 +1596,35 @@ def _time(_results=[1, 2, 4, 8]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.6.1", + ), ], - ) - ] - * 3, + ), + ], ) def test_run_in_transaction_w_commit_stats_success(self): @@ -1440,6 +1673,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1454,6 +1691,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) database.logger.info.assert_called_once_with( @@ -1502,6 +1743,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1516,6 +1761,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) database.logger.info.assert_not_called() @@ -1568,6 +1817,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1581,6 +1834,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1633,6 +1890,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1646,6 +1907,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1719,10 +1984,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1738,10 +2018,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], - ) - ] - * 2, + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), + ], + ), + ], ) def test_run_in_transaction_w_isolation_level_at_request(self): @@ -1773,6 +2067,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1807,6 +2105,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1845,6 +2147,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 11fc0135d1..c1fe8ec1b5 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -26,6 +26,11 @@ ) from google.cloud.spanner_v1.param_types import INT64 from google.api_core.retry import Retry +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -135,6 +140,7 @@ def _call_fut( session, attributes, transaction=derived, + request_id_manager=session._database, ) def _make_item(self, value, resume_token=b"", metadata=None): @@ -153,9 +159,18 @@ def test_iteration_w_empty_raw(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), []) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) + self.assertNoSpans() def test_iteration_w_non_empty_raw(self): @@ -167,9 +182,17 @@ def test_iteration_w_non_empty_raw(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(ITEMS)) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) self.assertNoSpans() def test_iteration_w_raw_w_resume_tken(self): @@ -186,9 +209,17 @@ def test_iteration_w_raw_w_resume_tken(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(ITEMS)) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) self.assertNoSpans() def test_iteration_w_raw_raising_unavailable_no_token(self): @@ -207,7 +238,7 @@ def test_iteration_w_raw_raising_unavailable_no_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(ITEMS)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, b"") @@ -234,7 +265,7 @@ def test_iteration_w_raw_raising_retryable_internal_error_no_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(ITEMS)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, b"") @@ -256,10 +287,18 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) with self.assertRaises(InternalServerError): list(resumable) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) self.assertNoSpans() def test_iteration_w_raw_raising_unavailable(self): @@ -278,7 +317,7 @@ def test_iteration_w_raw_raising_unavailable(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST + LAST)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, RESUME_TOKEN) @@ -295,7 +334,7 @@ def test_iteration_w_raw_raising_retryable_internal_error(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*LAST) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -304,7 +343,7 @@ def test_iteration_w_raw_raising_retryable_internal_error(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST + LAST)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, RESUME_TOKEN) @@ -326,10 +365,18 @@ def test_iteration_w_raw_raising_non_retryable_internal_error(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) with self.assertRaises(InternalServerError): list(resumable) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) self.assertNoSpans() def test_iteration_w_raw_raising_unavailable_after_token(self): @@ -347,7 +394,7 @@ def test_iteration_w_raw_raising_unavailable_after_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST + SECOND)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, RESUME_TOKEN) @@ -370,7 +417,7 @@ def test_iteration_w_raw_w_multiuse(self): session = _Session(database) derived = self._makeDerived(session) derived._multi_use = True - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST)) self.assertEqual(len(restart.mock_calls), 1) begin_count = sum( @@ -401,7 +448,7 @@ def test_iteration_w_raw_raising_unavailable_w_multiuse(self): session = _Session(database) derived = self._makeDerived(session) derived._multi_use = True - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(SECOND)) self.assertEqual(len(restart.mock_calls), 2) begin_count = sum( @@ -440,7 +487,7 @@ def test_iteration_w_raw_raising_unavailable_after_token_w_multiuse(self): derived = self._makeDerived(session) derived._multi_use = True - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST + SECOND)) self.assertEqual(len(restart.mock_calls), 2) @@ -467,7 +514,7 @@ def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*SECOND) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -476,7 +523,7 @@ def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) self.assertEqual(list(resumable), list(FIRST + SECOND)) self.assertEqual(len(restart.mock_calls), 2) self.assertEqual(request.resume_token, RESUME_TOKEN) @@ -497,10 +544,18 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self): database.spanner_api = self._make_spanner_api() session = _Session(database) derived = self._makeDerived(session) - resumable = self._call_fut(derived, restart, request) + resumable = self._call_fut(derived, restart, request, session=session) with self.assertRaises(InternalServerError): list(resumable) - restart.assert_called_once_with(request=request, metadata=None) + restart.assert_called_once_with( + request=request, + metadata=[ + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ) + ], + ) self.assertNoSpans() def test_iteration_w_span_creation(self): @@ -777,7 +832,13 @@ def _read_helper( ) api.streaming_read.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], retry=retry, timeout=timeout, ) @@ -1026,7 +1087,13 @@ def _execute_sql_helper( ) api.execute_streaming_sql.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], timeout=timeout, retry=retry, ) @@ -1199,6 +1266,10 @@ def _partition_read_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1378,6 +1449,10 @@ def _partition_query_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1774,7 +1849,13 @@ def test_begin_ok_exact_staleness(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1810,7 +1891,13 @@ def test_begin_ok_exact_strong(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1821,10 +1908,18 @@ def test_begin_ok_exact_strong(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1843,6 +1938,27 @@ def __init__(self, directed_read_options=None): def observability_options(self): return dict(db_name=self.name) + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): def __init__(self, database=None, name=TestSnapshot.SESSION_NAME): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 8bd95c7228..3a31d9b7d8 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -13,6 +13,8 @@ # limitations under the License. +import sys +import traceback import threading from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import ( @@ -38,9 +40,12 @@ from google.cloud.spanner_v1.keyset import KeySet from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _make_value_pb, _merge_query_options, + _metadata_with_request_id, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID import mock @@ -522,6 +527,10 @@ def test_transaction_should_include_begin_with_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -537,6 +546,10 @@ def test_transaction_should_include_begin_with_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], timeout=TIMEOUT, retry=RETRY, @@ -554,6 +567,10 @@ def test_transaction_should_include_begin_with_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -570,6 +587,10 @@ def test_transaction_should_include_begin_with_first_batch_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -595,6 +616,10 @@ def test_transaction_should_include_begin_w_exclude_txn_from_change_streams_with metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -621,6 +646,10 @@ def test_transaction_should_include_begin_w_isolation_level_with_first_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -639,6 +668,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -653,6 +686,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -669,6 +706,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -682,6 +723,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -698,6 +743,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -711,6 +760,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -732,6 +785,10 @@ def test_transaction_execute_sql_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -755,6 +812,10 @@ def test_transaction_streaming_read_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -771,6 +832,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -782,6 +847,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -798,6 +867,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -810,6 +883,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -850,6 +927,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -860,6 +941,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -868,6 +953,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -911,6 +1000,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) @@ -919,6 +1012,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -929,6 +1026,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -977,6 +1078,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) @@ -985,6 +1090,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -995,6 +1104,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1043,6 +1156,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) req = self._execute_sql_expected_request(database) @@ -1051,6 +1168,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1061,6 +1182,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1079,19 +1204,39 @@ def test_transaction_should_execute_sql_with_route_to_leader_disabled(self): api.execute_streaming_sql.assert_called_once_with( request=self._execute_sql_expected_request(database=database), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], timeout=TIMEOUT, retry=RETRY, ) class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.directed_read_options = None self.default_transaction_options = DefaultTransactionOptions() + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() + + def get_next_request(self): + # This method exists because somehow Python isn't able to + # call the property method "_next_nth_request" and that's + # needlessly stalled progress. + return self._nth_request.increment() class _Instance(object): @@ -1107,6 +1252,27 @@ def __init__(self): self._directed_read_options = None self.default_transaction_options = DefaultTransactionOptions() + @property + def _next_nth_request(self): + return self._instance._client.get_next_request() + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): _transaction = None diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index ff4743f1f6..64fafcae46 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -25,6 +25,7 @@ AtomicCounter, _metadata_with_request_id, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID from tests._helpers import ( HAS_OPENTELEMETRY_INSTALLED, @@ -201,11 +202,10 @@ def test_begin_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -310,11 +310,10 @@ def test_rollback_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -506,11 +505,10 @@ def _commit_helper( [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) self.assertEqual(actual_request_options, expected_request_options) @@ -685,11 +683,10 @@ def _execute_update_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -883,11 +880,10 @@ def _batch_update_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1003,11 +999,10 @@ def test_context_mgr_success(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), - # TODO(@odeke-em): enable with PR #1367. - # ( - # "x-goog-spanner-request-id", - # f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", - # ), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], )