diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 0cbf044672..2194cb9c0d 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -26,6 +26,7 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, _merge_Transaction_Options, + AtomicCounter, ) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1 import RequestOptions @@ -248,7 +249,7 @@ def commit( trace_attributes, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): def wrapped_method(*args, **kwargs): method = functools.partial( @@ -261,6 +262,7 @@ def wrapped_method(*args, **kwargs): getattr(database, "_next_nth_request", 0), 1, metadata, + span, ), ) return method(*args, **kwargs) @@ -384,14 +386,25 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals trace_attributes, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): - method = functools.partial( - api.batch_write, - request=request, - metadata=metadata, - ) + ) as span, MetricsCapture(): + attempt = AtomicCounter(0) + nth_request = getattr(database, "_next_nth_request", 0) + + def wrapped_method(*args, **kwargs): + method = functools.partial( + api.batch_write, + request=request, + metadata=database.metadata_with_request_id( + nth_request, + attempt.increment(), + metadata, + span, + ), + ) + return method(*args, **kwargs) + response = _retry( - method, + wrapped_method, allowed_exceptions={ InternalServerError: _check_rst_stream_error, }, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index f2d570feb9..38d1cdd9ff 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -462,13 +462,19 @@ def spanner_api(self): return self._spanner_api - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): + if span is None: + span = get_current_span() + return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) def __eq__(self, other): @@ -762,6 +768,7 @@ def execute_pdml(): self._next_nth_request, 1, metadata, + span, ), ) diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 0bc0135ba0..b8b6e11da7 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -257,7 +257,10 @@ def bind(self, database): resp = api.batch_create_sessions( request=request, metadata=database.metadata_with_request_id( - database._next_nth_request, 1, metadata + database._next_nth_request, + 1, + metadata, + span, ), ) @@ -564,7 +567,10 @@ def bind(self, database): resp = api.batch_create_sessions( request=request, metadata=database.metadata_with_request_id( - database._next_nth_request, 1, metadata + database._next_nth_request, + 1, + metadata, + span, ), ) diff --git a/google/cloud/spanner_v1/request_id_header.py b/google/cloud/spanner_v1/request_id_header.py index 74a5bb1253..c095bc88e2 100644 --- a/google/cloud/spanner_v1/request_id_header.py +++ b/google/cloud/spanner_v1/request_id_header.py @@ -33,10 +33,32 @@ def generate_rand_uint64(): REQ_RAND_PROCESS_ID = generate_rand_uint64() +X_GOOG_SPANNER_REQUEST_ID_SPAN_ATTR = "x_goog_spanner_request_id" -def with_request_id(client_id, channel_id, nth_request, attempt, other_metadata=[]): +def with_request_id( + client_id, channel_id, nth_request, attempt, other_metadata=[], span=None +): req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}" all_metadata = (other_metadata or []).copy() all_metadata.append((REQ_ID_HEADER_KEY, req_id)) + + if span is not None: + span.set_attribute(X_GOOG_SPANNER_REQUEST_ID_SPAN_ATTR, req_id) + return all_metadata + + +def parse_request_id(request_id_str): + splits = request_id_str.split(".") + version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list( + map(lambda v: int(v), splits) + ) + return ( + version, + rand_process_id, + client_id, + channel_id, + nth_request, + nth_attempt, + ) diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index e3ece505c6..a2e494fb33 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -167,11 +167,14 @@ def create(self): self._labels, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): session_pb = api.create_session( request=request, metadata=self._database.metadata_with_request_id( - self._database._next_nth_request, 1, metadata + self._database._next_nth_request, + 1, + metadata, + span, ), ) self._session_id = session_pb.name.split("/")[-1] @@ -218,7 +221,10 @@ def exists(self): api.get_session( name=self.name, metadata=database.metadata_with_request_id( - database._next_nth_request, 1, metadata + database._next_nth_request, + 1, + metadata, + span, ), ) if span: @@ -263,11 +269,14 @@ def delete(self): }, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): api.delete_session( name=self.name, metadata=database.metadata_with_request_id( - database._next_nth_request, 1, metadata + database._next_nth_request, + 1, + metadata, + span, ), ) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index badc23026e..b8131db18a 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -104,11 +104,14 @@ def _restart_on_unavailable( attributes, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): iterator = method( request=request, metadata=request_id_manager.metadata_with_request_id( - nth_request, attempt, metadata + nth_request, + attempt, + metadata, + span, ), ) for item in iterator: @@ -133,7 +136,7 @@ def _restart_on_unavailable( attributes, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -142,7 +145,10 @@ def _restart_on_unavailable( iterator = method( request=request, metadata=request_id_manager.metadata_with_request_id( - nth_request, attempt, metadata + nth_request, + attempt, + metadata, + span, ), ) continue @@ -160,7 +166,7 @@ def _restart_on_unavailable( attributes, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -169,7 +175,10 @@ def _restart_on_unavailable( iterator = method( request=request, metadata=request_id_manager.metadata_with_request_id( - nth_request, attempt, metadata + nth_request, + attempt, + metadata, + span, ), ) continue @@ -745,13 +754,16 @@ def partition_read( extra_attributes=trace_attributes, observability_options=getattr(database, "observability_options", None), metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): nth_request = getattr(database, "_next_nth_request", 0) attempt = AtomicCounter() def attempt_tracking_method(): all_metadata = database.metadata_with_request_id( - nth_request, attempt.increment(), metadata + nth_request, + attempt.increment(), + metadata, + span, ) method = functools.partial( api.partition_read, @@ -858,13 +870,16 @@ def partition_query( trace_attributes, observability_options=getattr(database, "observability_options", None), metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): nth_request = getattr(database, "_next_nth_request", 0) attempt = AtomicCounter() def attempt_tracking_method(): all_metadata = database.metadata_with_request_id( - nth_request, attempt.increment(), metadata + nth_request, + attempt.increment(), + metadata, + span, ) method = functools.partial( api.partition_query, @@ -1014,13 +1029,16 @@ def begin(self): self._session, observability_options=getattr(database, "observability_options", None), metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): nth_request = getattr(database, "_next_nth_request", 0) attempt = AtomicCounter() def attempt_tracking_method(): all_metadata = database.metadata_with_request_id( - nth_request, attempt.increment(), metadata + nth_request, + attempt.increment(), + metadata, + span, ) method = functools.partial( api.begin_transaction, diff --git a/google/cloud/spanner_v1/testing/interceptors.py b/google/cloud/spanner_v1/testing/interceptors.py index 71b77e4d16..e1745f0921 100644 --- a/google/cloud/spanner_v1/testing/interceptors.py +++ b/google/cloud/spanner_v1/testing/interceptors.py @@ -17,6 +17,7 @@ from grpc_interceptor import ClientInterceptor from google.api_core.exceptions import Aborted +from google.cloud.spanner_v1.request_id_header import parse_request_id class MethodCountInterceptor(ClientInterceptor): @@ -119,18 +120,3 @@ def stream_request_ids(self): def reset(self): self._stream_req_segments.clear() self._unary_req_segments.clear() - - -def parse_request_id(request_id_str): - splits = request_id_str.split(".") - version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list( - map(lambda v: int(v), splits) - ) - return ( - version, - rand_process_id, - client_id, - channel_id, - nth_request, - nth_attempt, - ) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index e16912dcf1..795e158f6a 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -191,7 +191,10 @@ def wrapped_method(*args, **kwargs): session=self._session.name, options=txn_options, metadata=database.metadata_with_request_id( - nth_request, attempt.increment(), metadata + nth_request, + attempt.increment(), + metadata, + span, ), ) return method(*args, **kwargs) @@ -232,7 +235,7 @@ def rollback(self): self._session, observability_options=observability_options, metadata=metadata, - ), MetricsCapture(): + ) as span, MetricsCapture(): attempt = AtomicCounter(0) nth_request = database._next_nth_request @@ -243,7 +246,10 @@ def wrapped_method(*args, **kwargs): session=self._session.name, transaction_id=self._transaction_id, metadata=database.metadata_with_request_id( - nth_request, attempt.value, metadata + nth_request, + attempt.value, + metadata, + span, ), ) return method(*args, **kwargs) @@ -334,7 +340,10 @@ def wrapped_method(*args, **kwargs): api.commit, request=request, metadata=database.metadata_with_request_id( - nth_request, attempt.value, metadata + nth_request, + attempt.value, + metadata, + span, ), ) return method(*args, **kwargs) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 21d7bccd44..743ff2f958 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -33,6 +33,10 @@ from tests import _helpers as ot_helpers from . import _helpers from . import _sample_data +from google.cloud.spanner_v1.request_id_header import ( + REQ_RAND_PROCESS_ID, + parse_request_id, +) SOME_DATE = datetime.date(2011, 1, 17) @@ -441,28 +445,51 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): if ot_exporter is not None: span_list = ot_exporter.get_finished_spans() + sampling_req_id = parse_request_id( + span_list[0].attributes["x_goog_spanner_request_id"] + ) + nth_req0 = sampling_req_id[-2] + + db = sessions_database assert_span_attributes( ot_exporter, "CloudSpanner.GetSession", - attributes=_make_attributes(db_name, session_found=True), + attributes=_make_attributes( + db_name, + session_found=True, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+0}.1", + ), span=span_list[0], ) assert_span_attributes( ot_exporter, "CloudSpanner.Batch.commit", - attributes=_make_attributes(db_name, num_mutations=2), + attributes=_make_attributes( + db_name, + num_mutations=2, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+1}.1", + ), span=span_list[1], ) assert_span_attributes( ot_exporter, "CloudSpanner.GetSession", - attributes=_make_attributes(db_name, session_found=True), + attributes=_make_attributes( + db_name, + session_found=True, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+2}.1", + ), span=span_list[2], ) assert_span_attributes( ot_exporter, "CloudSpanner.Snapshot.read", - attributes=_make_attributes(db_name, columns=sd.COLUMNS, table_id=sd.TABLE), + attributes=_make_attributes( + db_name, + columns=sd.COLUMNS, + table_id=sd.TABLE, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+3}.1", + ), span=span_list[3], ) @@ -625,28 +652,50 @@ def test_transaction_read_and_insert_then_rollback( ] assert got_span_names == want_span_names + sampling_req_id = parse_request_id( + span_list[0].attributes["x_goog_spanner_request_id"] + ) + nth_req0 = sampling_req_id[-2] + + db = sessions_database assert_span_attributes( ot_exporter, "CloudSpanner.CreateSession", - attributes=_make_attributes(db_name), + attributes=dict( + _make_attributes( + db_name, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+0}.1", + ), + ), span=span_list[0], ) assert_span_attributes( ot_exporter, "CloudSpanner.GetSession", - attributes=_make_attributes(db_name, session_found=True), + attributes=_make_attributes( + db_name, + session_found=True, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+1}.1", + ), span=span_list[1], ) assert_span_attributes( ot_exporter, "CloudSpanner.Batch.commit", - attributes=_make_attributes(db_name, num_mutations=1), + attributes=_make_attributes( + db_name, + num_mutations=1, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+2}.1", + ), span=span_list[2], ) assert_span_attributes( ot_exporter, "CloudSpanner.Transaction.begin", - attributes=_make_attributes(db_name), + attributes=_make_attributes( + db_name, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+3}.1", + ), span=span_list[3], ) assert_span_attributes( @@ -656,6 +705,7 @@ def test_transaction_read_and_insert_then_rollback( db_name, table_id=sd.TABLE, columns=sd.COLUMNS, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+4}.1", ), span=span_list[4], ) @@ -666,13 +716,17 @@ def test_transaction_read_and_insert_then_rollback( db_name, table_id=sd.TABLE, columns=sd.COLUMNS, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+5}.1", ), span=span_list[5], ) assert_span_attributes( ot_exporter, "CloudSpanner.Transaction.rollback", - attributes=_make_attributes(db_name), + attributes=_make_attributes( + db_name, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+6}.1", + ), span=span_list[6], ) assert_span_attributes( @@ -682,6 +736,7 @@ def test_transaction_read_and_insert_then_rollback( db_name, table_id=sd.TABLE, columns=sd.COLUMNS, + x_goog_spanner_request_id=f"1.{REQ_RAND_PROCESS_ID}.{db._nth_client_id}.{db._channel_id}.{nth_req0+7}.1", ), span=span_list[7], ) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 2014b60eb9..cb3dc7e2cd 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -216,10 +216,13 @@ def test_commit_grpc_error(self): with self.assertRaises(Unknown): batch.commit() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.Batch.commit", status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, num_mutations=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutations=1, x_goog_spanner_request_id=req_id + ), ) def test_commit_ok(self): @@ -249,6 +252,7 @@ def test_commit_ok(self): self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertEqual( metadata, [ @@ -256,7 +260,7 @@ def test_commit_ok(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -265,7 +269,9 @@ def test_commit_ok(self): self.assertSpanAttributes( "CloudSpanner.Batch.commit", - attributes=dict(BASE_ATTRIBUTES, num_mutations=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutations=1, x_goog_spanner_request_id=req_id + ), ) def test_aborted_exception_on_commit_with_retries(self): @@ -347,6 +353,7 @@ def _test_commit_with_options( single_use_txn.isolation_level, isolation_level, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertEqual( metadata, [ @@ -354,7 +361,7 @@ def _test_commit_with_options( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -362,7 +369,9 @@ def _test_commit_with_options( self.assertSpanAttributes( "CloudSpanner.Batch.commit", - attributes=dict(BASE_ATTRIBUTES, num_mutations=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutations=1, x_goog_spanner_request_id=req_id + ), ) self.assertEqual(max_commit_delay_in, max_commit_delay) @@ -461,6 +470,7 @@ def test_context_mgr_success(self): self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertEqual( metadata, [ @@ -468,7 +478,7 @@ def test_context_mgr_success(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -476,7 +486,9 @@ def test_context_mgr_success(self): self.assertSpanAttributes( "CloudSpanner.Batch.commit", - attributes=dict(BASE_ATTRIBUTES, num_mutations=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutations=1, x_goog_spanner_request_id=req_id + ), ) def test_context_mgr_failure(self): @@ -520,10 +532,13 @@ def test_batch_write_already_committed(self): group = groups.group() group.delete(TABLE_NAME, keyset=keyset) groups.batch_write() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.batch_write", status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, num_mutation_groups=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutation_groups=1, x_goog_spanner_request_id=req_id + ), ) assert groups.committed # The second call to batch_write should raise an error. @@ -543,10 +558,13 @@ def test_batch_write_grpc_error(self): with self.assertRaises(Unknown): groups.batch_write() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.batch_write", status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, num_mutation_groups=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutation_groups=1, x_goog_spanner_request_id=req_id + ), ) def _test_batch_write_with_request_options( @@ -596,6 +614,11 @@ def _test_batch_write_with_request_options( "traceparent is missing in metadata", ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" + expected_metadata.append( + ("x-goog-spanner-request-id", req_id), + ) + # Remove traceparent from actual metadata for comparison filtered_metadata = [item for item in metadata if item[0] != "traceparent"] @@ -615,7 +638,9 @@ def _test_batch_write_with_request_options( self.assertSpanAttributes( "CloudSpanner.batch_write", status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, num_mutation_groups=1), + attributes=dict( + BASE_ATTRIBUTES, num_mutation_groups=1, x_goog_spanner_request_id=req_id + ), ) def test_batch_write_no_request_options(self): @@ -671,13 +696,16 @@ def _next_nth_request(self): self._nth_request += 1 return self._nth_request - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 56ac22eab0..44ef402daa 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -3241,6 +3241,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), ], ) @@ -3405,13 +3409,16 @@ def __init__(self, name, instance=None): def _next_nth_request(self): return self._nth_request.increment() - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 8069f806d8..d33c891838 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -23,6 +23,7 @@ _metadata_with_request_id, AtomicCounter, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from tests._helpers import ( @@ -260,7 +261,10 @@ def test_spans_bind_get(self): want_span_names = ["CloudSpanner.FixedPool.BatchCreateSessions", "pool.Get"] assert got_span_names == want_span_names - attrs = TestFixedSizePool.BASE_ATTRIBUTES.copy() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id-1}.{database._channel_id}.{_Database.NTH_REQUEST.value}.1" + attrs = dict( + TestFixedSizePool.BASE_ATTRIBUTES.copy(), x_goog_spanner_request_id=req_id + ) # Check for the overall spans. self.assertSpanAttributes( @@ -927,7 +931,10 @@ def test_spans_put_full(self): want_span_names = ["CloudSpanner.PingingPool.BatchCreateSessions"] assert got_span_names == want_span_names - attrs = TestPingingPool.BASE_ATTRIBUTES.copy() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id-1}.{database._channel_id}.{_Database.NTH_REQUEST.value}.1" + attrs = dict( + TestPingingPool.BASE_ATTRIBUTES.copy(), x_goog_spanner_request_id=req_id + ) self.assertSpanAttributes( "CloudSpanner.PingingPool.BatchCreateSessions", attributes=attrs, @@ -1263,13 +1270,16 @@ def _next_nth_request(self): def _nth_client_id(self): return self.NTH_CLIENT_ID.increment() - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index b80d6bd18a..010d59e198 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -72,7 +72,9 @@ def inject_into_mock_database(mockdb): channel_id = 1 setattr(mockdb, "_channel_id", channel_id) - def metadata_with_request_id(nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + nth_request, nth_attempt, prior_metadata=[], span=None + ): nth_req = nth_request.fget(mockdb) return _metadata_with_request_id( nth_client_id, @@ -80,6 +82,7 @@ def metadata_with_request_id(nth_request, nth_attempt, prior_metadata=[]): nth_req, nth_attempt, prior_metadata, + span, ) setattr(mockdb, "metadata_with_request_id", metadata_with_request_id) @@ -223,6 +226,7 @@ def test_create_w_database_role(self): session=session_template, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.create_session.assert_called_once_with( request=request, metadata=[ @@ -230,13 +234,16 @@ def test_create_w_database_role(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) self.assertSpanAttributes( - "CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES + "CloudSpanner.CreateSession", + attributes=dict( + TestSession.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_create_session_span_annotations(self): @@ -293,6 +300,7 @@ def test_create_wo_database_role(self): database=database.name, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.create_session.assert_called_once_with( request=request, metadata=[ @@ -306,7 +314,10 @@ def test_create_wo_database_role(self): ) self.assertSpanAttributes( - "CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES + "CloudSpanner.CreateSession", + attributes=dict( + TestSession.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_create_ok(self): @@ -325,6 +336,7 @@ def test_create_ok(self): database=database.name, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.create_session.assert_called_once_with( request=request, metadata=[ @@ -332,13 +344,16 @@ def test_create_ok(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) self.assertSpanAttributes( - "CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES + "CloudSpanner.CreateSession", + attributes=dict( + TestSession.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_create_w_labels(self): @@ -359,6 +374,7 @@ def test_create_w_labels(self): session=SessionRequestProto(labels=labels), ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.create_session.assert_called_once_with( request=request, metadata=[ @@ -366,14 +382,16 @@ def test_create_w_labels(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) self.assertSpanAttributes( "CloudSpanner.CreateSession", - attributes=dict(TestSession.BASE_ATTRIBUTES, foo="bar"), + attributes=dict( + TestSession.BASE_ATTRIBUTES, foo="bar", x_goog_spanner_request_id=req_id + ), ) def test_create_error(self): @@ -386,10 +404,13 @@ def test_create_error(self): with self.assertRaises(Unknown): session.create() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.CreateSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=dict( + TestSession.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_exists_wo_session_id(self): @@ -410,6 +431,7 @@ def test_exists_hit(self): self.assertTrue(session.exists()) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ @@ -417,14 +439,18 @@ def test_exists_hit(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) self.assertSpanAttributes( "CloudSpanner.GetSession", - attributes=dict(TestSession.BASE_ATTRIBUTES, session_found=True), + attributes=dict( + TestSession.BASE_ATTRIBUTES, + session_found=True, + x_goog_spanner_request_id=req_id, + ), ) @mock.patch( @@ -466,6 +492,7 @@ def test_exists_miss(self): self.assertFalse(session.exists()) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ @@ -473,14 +500,18 @@ def test_exists_miss(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) self.assertSpanAttributes( "CloudSpanner.GetSession", - attributes=dict(TestSession.BASE_ATTRIBUTES, session_found=False), + attributes=dict( + TestSession.BASE_ATTRIBUTES, + session_found=False, + x_goog_spanner_request_id=req_id, + ), ) @mock.patch( @@ -522,6 +553,7 @@ def test_exists_error(self): with self.assertRaises(Unknown): session.exists() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ @@ -529,7 +561,7 @@ def test_exists_error(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -537,7 +569,9 @@ def test_exists_error(self): self.assertSpanAttributes( "CloudSpanner.GetSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=dict( + TestSession.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_ping_wo_session_id(self): @@ -645,13 +679,14 @@ def test_delete_hit(self): session.delete() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -660,7 +695,7 @@ def test_delete_hit(self): attrs.update(TestSession.BASE_ATTRIBUTES) self.assertSpanAttributes( "CloudSpanner.DeleteSession", - attributes=attrs, + attributes=dict(attrs, x_goog_spanner_request_id=req_id), ) def test_delete_miss(self): @@ -674,18 +709,23 @@ def test_delete_miss(self): with self.assertRaises(NotFound): session.delete() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) - attrs = {"session.id": session._session_id, "session.name": session.name} + attrs = { + "session.id": session._session_id, + "session.name": session.name, + "x_goog_spanner_request_id": req_id, + } attrs.update(TestSession.BASE_ATTRIBUTES) self.assertSpanAttributes( @@ -705,18 +745,23 @@ def test_delete_error(self): with self.assertRaises(Unknown): session.delete() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, metadata=[ ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) - attrs = {"session.id": session._session_id, "session.name": session.name} + attrs = { + "session.id": session._session_id, + "session.name": session.name, + "x_goog_spanner_request_id": req_id, + } attrs.update(TestSession.BASE_ATTRIBUTES) self.assertSpanAttributes( diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 7b3ad679a9..1d5a367341 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -570,7 +570,13 @@ def test_iteration_w_span_creation(self): derived, restart, request, name, _Session(_Database()), extra_atts ) self.assertEqual(list(resumable), []) - self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1)) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" + self.assertSpanAttributes( + name, + attributes=dict( + BASE_ATTRIBUTES, test_att=1, x_goog_spanner_request_id=req_id + ), + ) def test_iteration_w_multiple_span_creation(self): from google.api_core.exceptions import ServiceUnavailable @@ -599,11 +605,15 @@ def test_iteration_w_multiple_span_creation(self): span_list = self.ot_exporter.get_finished_spans() self.assertEqual(len(span_list), 2) - for span in span_list: + for i, span in enumerate(span_list): self.assertEqual(span.name, name) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.{i+1}" self.assertEqual( dict(span.attributes), - enrich_with_otel_scope(BASE_ATTRIBUTES), + dict( + enrich_with_otel_scope(BASE_ATTRIBUTES), + x_goog_spanner_request_id=req_id, + ), ) @@ -678,11 +688,15 @@ def test_read_other_error(self): with self.assertRaises(RuntimeError): list(derived.read(TABLE_NAME, COLUMNS, keyset)) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner._Derived.read", status=StatusCode.ERROR, attributes=dict( - BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) + BASE_ATTRIBUTES, + table_id=TABLE_NAME, + columns=tuple(COLUMNS), + x_goog_spanner_request_id=req_id, ), ) @@ -828,13 +842,14 @@ def _read_helper( request_options=expected_request_options, directed_read_options=expected_directed_read_options, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.streaming_read.assert_called_once_with( request=expected_request, metadata=[ ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], retry=retry, @@ -844,7 +859,10 @@ def _read_helper( self.assertSpanAttributes( "CloudSpanner._Derived.read", attributes=dict( - BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) + BASE_ATTRIBUTES, + table_id=TABLE_NAME, + columns=tuple(COLUMNS), + x_goog_spanner_request_id=req_id, ), ) @@ -936,10 +954,14 @@ def test_execute_sql_other_error(self): self.assertEqual(derived._execute_sql_count, 1) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner._Derived.execute_sql", status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), + attributes=dict( + BASE_ATTRIBUTES, + **{"db.statement": SQL_QUERY, "x_goog_spanner_request_id": req_id}, + ), ) def _execute_sql_helper( @@ -1083,13 +1105,14 @@ def _execute_sql_helper( seqno=sql_count, directed_read_options=expected_directed_read_options, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.execute_streaming_sql.assert_called_once_with( request=expected_request, metadata=[ ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + req_id, ), ], timeout=timeout, @@ -1101,7 +1124,13 @@ def _execute_sql_helper( self.assertSpanAttributes( "CloudSpanner._Derived.execute_sql", status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.statement": SQL_QUERY_WITH_PARAM, + "x_goog_spanner_request_id": req_id, + }, + ), ) def test_execute_sql_wo_multi_use(self): @@ -1259,6 +1288,7 @@ def _partition_read_helper( index=index, partition_options=expected_partition_options, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.partition_read.assert_called_once_with( request=expected_request, metadata=[ @@ -1266,7 +1296,7 @@ def _partition_read_helper( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + req_id, ), ], retry=retry, @@ -1277,6 +1307,7 @@ def _partition_read_helper( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS), + x_goog_spanner_request_id=req_id, ) if index: want_span_attributes["index"] = index @@ -1309,11 +1340,15 @@ def test_partition_read_other_error(self): with self.assertRaises(RuntimeError): list(derived.partition_read(TABLE_NAME, COLUMNS, keyset)) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner._Derived.partition_read", status=StatusCode.ERROR, attributes=dict( - BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) + BASE_ATTRIBUTES, + table_id=TABLE_NAME, + columns=tuple(COLUMNS), + x_goog_spanner_request_id=req_id, ), ) @@ -1442,6 +1477,7 @@ def _partition_query_helper( param_types=PARAM_TYPES, partition_options=expected_partition_options, ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.partition_query.assert_called_once_with( request=expected_request, metadata=[ @@ -1449,7 +1485,7 @@ def _partition_query_helper( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + req_id, ), ], retry=retry, @@ -1459,7 +1495,13 @@ def _partition_query_helper( self.assertSpanAttributes( "CloudSpanner._Derived.partition_query", status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.statement": SQL_QUERY_WITH_PARAM, + "x_goog_spanner_request_id": req_id, + }, + ), ) def test_partition_query_other_error(self): @@ -1474,10 +1516,14 @@ def test_partition_query_other_error(self): with self.assertRaises(RuntimeError): list(derived.partition_query(SQL_QUERY)) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner._Derived.partition_query", status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), + attributes=dict( + BASE_ATTRIBUTES, + **{"db.statement": SQL_QUERY, "x_goog_spanner_request_id": req_id}, + ), ) def test_partition_query_single_use_raises(self): @@ -1792,10 +1838,11 @@ def test_begin_w_other_error(self): want_span_names = ["CloudSpanner.Snapshot.begin"] assert got_span_names == want_span_names + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.Snapshot.begin", status=StatusCode.ERROR, - attributes=BASE_ATTRIBUTES, + attributes=dict(BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id), ) def test_begin_w_retry(self): @@ -1844,6 +1891,7 @@ def test_begin_ok_exact_staleness(self): ) ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, @@ -1851,7 +1899,7 @@ def test_begin_ok_exact_staleness(self): ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -1859,7 +1907,7 @@ def test_begin_ok_exact_staleness(self): self.assertSpanAttributes( "CloudSpanner.Snapshot.begin", status=StatusCode.OK, - attributes=BASE_ATTRIBUTES, + attributes=dict(BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id), ) def test_begin_ok_exact_strong(self): @@ -1886,6 +1934,7 @@ def test_begin_ok_exact_strong(self): ) ) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, @@ -1893,7 +1942,7 @@ def test_begin_ok_exact_strong(self): ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + req_id, ), ], ) @@ -1901,7 +1950,7 @@ def test_begin_ok_exact_strong(self): self.assertSpanAttributes( "CloudSpanner.Snapshot.begin", status=StatusCode.OK, - attributes=BASE_ATTRIBUTES, + attributes=dict(BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id), ) @@ -1938,13 +1987,16 @@ def _next_nth_request(self): def _nth_client_id(self): return 1 - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index b3b24ad6c8..85892e47ec 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -1264,13 +1264,16 @@ def _next_nth_request(self): def _nth_client_id(self): return self._instance._client._nth_client_id - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 64fafcae46..e477ef27c6 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -172,10 +172,13 @@ def test_begin_w_other_error(self): with self.assertRaises(RuntimeError): transaction.begin() + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.Transaction.begin", status=StatusCode.ERROR, - attributes=TestTransaction.BASE_ATTRIBUTES, + attributes=dict( + TestTransaction.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_begin_ok(self): @@ -197,6 +200,7 @@ def test_begin_ok(self): session_id, txn_options, metadata = api._begun self.assertEqual(session_id, session.name) self.assertTrue(type(txn_options).pb(txn_options).HasField("read_write")) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1" self.assertEqual( metadata, [ @@ -204,13 +208,16 @@ def test_begin_ok(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + req_id, ), ], ) self.assertSpanAttributes( - "CloudSpanner.Transaction.begin", attributes=TestTransaction.BASE_ATTRIBUTES + "CloudSpanner.Transaction.begin", + attributes=dict( + TestTransaction.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_begin_w_retry(self): @@ -280,10 +287,13 @@ def test_rollback_w_other_error(self): self.assertFalse(transaction.rolled_back) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertSpanAttributes( "CloudSpanner.Transaction.rollback", status=StatusCode.ERROR, - attributes=TestTransaction.BASE_ATTRIBUTES, + attributes=dict( + TestTransaction.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_rollback_ok(self): @@ -305,6 +315,7 @@ def test_rollback_ok(self): session_id, txn_id, metadata = api._rolled_back self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertEqual( metadata, [ @@ -312,14 +323,16 @@ def test_rollback_ok(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + req_id, ), ], ) self.assertSpanAttributes( "CloudSpanner.Transaction.rollback", - attributes=TestTransaction.BASE_ATTRIBUTES, + attributes=dict( + TestTransaction.BASE_ATTRIBUTES, x_goog_spanner_request_id=req_id + ), ) def test_commit_not_begun(self): @@ -430,10 +443,15 @@ def test_commit_w_other_error(self): self.assertIsNone(transaction.committed) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1" self.assertSpanAttributes( "CloudSpanner.Transaction.commit", status=StatusCode.ERROR, - attributes=dict(TestTransaction.BASE_ATTRIBUTES, num_mutations=1), + attributes=dict( + TestTransaction.BASE_ATTRIBUTES, + num_mutations=1, + x_goog_spanner_request_id=req_id, + ), ) def _commit_helper( @@ -500,6 +518,7 @@ def _commit_helper( self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) + req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1" self.assertEqual( metadata, [ @@ -507,7 +526,7 @@ def _commit_helper( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + req_id, ), ], ) @@ -521,6 +540,7 @@ def _commit_helper( attributes=dict( TestTransaction.BASE_ATTRIBUTES, num_mutations=len(transaction._mutations), + x_goog_spanner_request_id=req_id, ), ) @@ -1069,13 +1089,16 @@ def _next_nth_request(self): def _nth_client_id(self): return self._instance._client._nth_client_id - def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + def metadata_with_request_id( + self, nth_request, nth_attempt, prior_metadata=[], span=None + ): return _metadata_with_request_id( self._nth_client_id, self._channel_id, nth_request, nth_attempt, prior_metadata, + span, ) @property