diff --git a/google/cloud/spanner_v1/testing/interceptors.py b/google/cloud/spanner_v1/testing/interceptors.py index e1745f0921..fd05a6d4b3 100644 --- a/google/cloud/spanner_v1/testing/interceptors.py +++ b/google/cloud/spanner_v1/testing/interceptors.py @@ -72,9 +72,6 @@ def reset(self): class XGoogRequestIDHeaderInterceptor(ClientInterceptor): - # TODO:(@odeke-em): delete this guard when PR #1367 is merged. - X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = True - def __init__(self): self._unary_req_segments = [] self._stream_req_segments = [] @@ -88,7 +85,7 @@ def intercept(self, method, request_or_iterator, call_details): x_goog_request_id = value break - if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id: + if not x_goog_request_id: raise Exception( f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}" ) @@ -96,16 +93,15 @@ def intercept(self, method, request_or_iterator, call_details): response_or_iterator = method(request_or_iterator, call_details) streaming = getattr(response_or_iterator, "__iter__", None) is not None - if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED: - with self.__lock: - if streaming: - self._stream_req_segments.append( - (call_details.method, parse_request_id(x_goog_request_id)) - ) - else: - self._unary_req_segments.append( - (call_details.method, parse_request_id(x_goog_request_id)) - ) + with self.__lock: + if streaming: + self._stream_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + else: + self._unary_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) return response_or_iterator diff --git a/tests/mockserver_tests/test_request_id_header.py b/tests/mockserver_tests/test_request_id_header.py new file mode 100644 index 0000000000..6503d179d5 --- /dev/null +++ b/tests/mockserver_tests/test_request_id_header.py @@ -0,0 +1,325 @@ +# Copyright 2025 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import threading + +from google.cloud.spanner_v1 import ( + BatchCreateSessionsRequest, + BeginTransactionRequest, + ExecuteSqlRequest, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1.testing.mock_spanner import SpannerServicer +from tests.mockserver_tests.mock_server_test_base import ( + MockServerTestBase, + add_select1_result, + aborted_status, + add_error, + unavailable_status, +) + + +class TestRequestIDHeader(MockServerTestBase): + def tearDown(self): + self.database._x_goog_request_id_interceptor.reset() + + def test_snapshot_execute_sql(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(2, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ) + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_snapshot_read_concurrent(self): + add_select1_result() + db = self.database + # Trigger BatchCreateSessions first. + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + for row in rows: + _ = row + + # The other requests can then proceed. + def select1(): + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + res_list = [] + for row in rows: + self.assertEqual(1, row[0]) + res_list.append(row) + self.assertEqual(1, len(res_list)) + + n = 10 + threads = [] + for i in range(n): + th = threading.Thread(target=select1, name=f"snapshot-select1-{i}") + threads.append(th) + th.start() + + random.shuffle(threads) + for thread in threads: + thread.join() + + requests = self.spanner_service.requests + # We expect 2 + n requests, because: + # 1. The initial query triggers one BatchCreateSessions call + one ExecuteStreamingSql call. + # 2. Each following query triggers one ExecuteStreamingSql call. + self.assertEqual(2 + n, len(requests), msg=requests) + + client_id = db._nth_client_id + channel_id = db._channel_id + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1), + ), + ] + assert got_unary_segments == want_unary_segments + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 7, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 9, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1), + ), + ] + assert got_stream_segments == want_stream_segments + + def test_database_run_in_transaction_retries_on_abort(self): + counters = dict(aborted=0) + want_failed_attempts = 2 + + def select_in_txn(txn): + results = txn.execute_sql("select 1") + for row in results: + _ = row + + if counters["aborted"] < want_failed_attempts: + counters["aborted"] += 1 + add_error(SpannerServicer.Commit.__name__, aborted_status()) + + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + + self.database.run_in_transaction(select_in_txn) + + def test_database_execute_partitioned_dml_request_id(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + _ = self.database.execute_partitioned_dml("select 1") + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BeginTransactionRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BeginTransaction", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 3, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_unary_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.BatchCreateSessions.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + assert got_stream_segments == want_stream_segments + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 2), + ), + ] + # TODO(@odeke-em): enable this test in the next iteration + # when we've figured out unary retries with UNAVAILABLE. + # See https://github.com/googleapis/python-spanner/issues/1379. + if True: + print( + "TODO(@odeke-em): enable request_id checking when we figure out propagation for unary requests" + ) + else: + assert got_unary_segments == want_unary_segments + + def test_streaming_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 2), + ), + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def canonicalize_request_id_headers(self): + src = self.database._x_goog_request_id_interceptor + return src._stream_req_segments, src._unary_req_segments diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 44ef402daa..9f66127e72 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -120,7 +120,9 @@ def _make_database_admin_api(): def _make_spanner_api(): from google.cloud.spanner_v1 import SpannerClient - return mock.create_autospec(SpannerClient, instance=True) + api = mock.create_autospec(SpannerClient, instance=True) + api._transport = "transport" + return api def test_ctor_defaults(self): from google.cloud.spanner_v1.pool import BurstyPool @@ -1300,6 +1302,19 @@ def _execute_partitioned_dml_helper( ], ) self.assertEqual(api.begin_transaction.call_count, 2) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + # Please note that this try was by an abort and not from service unavailable. + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1", + ), + ], + ) else: api.begin_transaction.assert_called_with( session=session.name, @@ -1314,6 +1329,18 @@ def _execute_partitioned_dml_helper( ], ) self.assertEqual(api.begin_transaction.call_count, 1) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], + ) if params: expected_params = Struct( diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 1d5a367341..2eefb04ba0 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -26,6 +26,7 @@ ) from google.cloud.spanner_v1._helpers import ( _metadata_with_request_id, + AtomicCounter, ) from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID @@ -165,7 +166,7 @@ def test_iteration_w_empty_raw(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -187,7 +188,7 @@ def test_iteration_w_non_empty_raw(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -214,7 +215,7 @@ def test_iteration_w_raw_w_resume_tken(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -293,7 +294,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -371,7 +372,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -550,7 +551,7 @@ def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self): metadata=[ ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ) ], ) @@ -1955,10 +1956,18 @@ def test_begin_ok_exact_strong(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 85892e47ec..4acd7d3798 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -38,11 +38,9 @@ from google.cloud.spanner_v1.keyset import KeySet from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _make_value_pb, _merge_query_options, -) -from google.cloud.spanner_v1._helpers import ( - AtomicCounter, _metadata_with_request_id, ) from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID @@ -547,7 +545,7 @@ def test_transaction_should_include_begin_with_first_query(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], timeout=TIMEOUT, @@ -568,7 +566,7 @@ def test_transaction_should_include_begin_with_first_read(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -588,7 +586,7 @@ def test_transaction_should_include_begin_with_first_batch_update(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -617,7 +615,7 @@ def test_transaction_should_include_begin_w_exclude_txn_from_change_streams_with ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], ) @@ -647,7 +645,7 @@ def test_transaction_should_include_begin_w_isolation_level_with_first_update( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], ) @@ -669,7 +667,7 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -687,7 +685,7 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], ) @@ -707,7 +705,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], ) @@ -724,7 +722,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], ) @@ -744,7 +742,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], ) @@ -761,7 +759,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], ) @@ -786,7 +784,7 @@ def test_transaction_execute_sql_w_directed_read_options(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=gapic_v1.method.DEFAULT, @@ -813,7 +811,7 @@ def test_transaction_streaming_read_w_directed_read_options(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -833,7 +831,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -848,7 +846,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], retry=RETRY, @@ -868,7 +866,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], retry=RETRY, @@ -884,7 +882,7 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], retry=RETRY, @@ -928,7 +926,7 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], ) @@ -942,7 +940,7 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.2.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", ), ], ) @@ -954,7 +952,7 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.3.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1", ), ], retry=RETRY, @@ -1002,7 +1000,7 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("x-goog-spanner-route-to-leader", "true"), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.3.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.3.1", ), ], ) @@ -1218,7 +1216,7 @@ def test_transaction_should_execute_sql_with_route_to_leader_disabled(self): ("google-cloud-resource-prefix", database.name), ( "x-goog-spanner-request-id", - f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", ), ], timeout=TIMEOUT,