From 7a961488b28fb1751be0a2791cd543bd8e542a04 Mon Sep 17 00:00:00 2001 From: larkee Date: Mon, 9 Sep 2019 11:17:30 +1000 Subject: [PATCH 1/7] Update session pools to use batch_create_sessions --- spanner/google/cloud/spanner_v1/pool.py | 39 ++++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/spanner/google/cloud/spanner_v1/pool.py b/spanner/google/cloud/spanner_v1/pool.py index 823681fbc864..93f991b1fb4d 100644 --- a/spanner/google/cloud/spanner_v1/pool.py +++ b/spanner/google/cloud/spanner_v1/pool.py @@ -20,6 +20,7 @@ from six.moves import xrange from google.cloud.exceptions import NotFound +from google.cloud.spanner_v1._helpers import _metadata_with_prefix _NOW = datetime.datetime.utcnow # unit tests may replace @@ -166,11 +167,22 @@ def bind(self, database): when needed. """ self._database = database + api = database.spanner_api + created_session_count = 0 + metadata = _metadata_with_prefix(database.name) while not self._sessions.full(): - session = self._new_session() - session.create() - self._sessions.put(session) + resp = api.batch_create_sessions( + database.name, + session_count=self.size - created_session_count, + timeout=self.default_timeout, + metadata=metadata, + ) + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self._sessions.put(session) + created_session_count += len(resp.session) def get(self, timeout=None): # pylint: disable=arguments-differ """Check a session out from the pool. @@ -350,11 +362,22 @@ def bind(self, database): when needed. """ self._database = database - - for _ in xrange(self.size): - session = self._new_session() - session.create() - self.put(session) + api = database.spanner_api + metadata = _metadata_with_prefix(database.name) + created_session_count = 0 + + while created_session_count < self.size: + resp = api.batch_create_sessions( + database.name, + session_count=self.size - created_session_count, + timeout=self.default_timeout, + metadata=metadata, + ) + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self._sessions.put(session) + created_session_count += len(resp.session) def get(self, timeout=None): # pylint: disable=arguments-differ """Check a session out from the pool. From 46c10b855c732e5134ebc94a86767da233f4c90c Mon Sep 17 00:00:00 2001 From: larkee Date: Wed, 11 Sep 2019 17:43:44 +1000 Subject: [PATCH 2/7] Update session pool unit tests to handle batch_create_session calls --- spanner/tests/unit/test_pool.py | 58 +++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/spanner/tests/unit/test_pool.py b/spanner/tests/unit/test_pool.py index 549044b1f423..9c2f8a9af199 100644 --- a/spanner/tests/unit/test_pool.py +++ b/spanner/tests/unit/test_pool.py @@ -156,8 +156,10 @@ def test_bind(self): self.assertEqual(pool.default_timeout, 10) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() def test_get_non_expired(self): pool = self._make_one(size=4) @@ -183,7 +185,7 @@ def test_get_expired(self): session = pool.get() self.assertIs(session, SESSIONS[4]) - self.assertTrue(session._created) + session.create.assert_called() self.assertTrue(SESSIONS[0]._exists_checked) self.assertFalse(pool._sessions.full()) @@ -243,8 +245,10 @@ def test_clear(self): pool.bind(database) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() pool.clear() @@ -286,7 +290,7 @@ def test_get_empty(self): self.assertIsInstance(session, _Session) self.assertIs(session._database, database) - self.assertTrue(session._created) + session.create.assert_called() self.assertTrue(pool._sessions.empty()) def test_get_non_empty_session_exists(self): @@ -299,7 +303,7 @@ def test_get_non_empty_session_exists(self): session = pool.get() self.assertIs(session, previous) - self.assertFalse(session._created) + session.create.assert_not_called() self.assertTrue(session._exists_checked) self.assertTrue(pool._sessions.empty()) @@ -316,7 +320,7 @@ def test_get_non_empty_session_expired(self): self.assertTrue(previous._exists_checked) self.assertIs(session, newborn) - self.assertTrue(session._created) + session.create.assert_called() self.assertFalse(session._exists_checked) self.assertTrue(pool._sessions.empty()) @@ -405,7 +409,6 @@ def test_bind(self): database = _Database("name") SESSIONS = [_Session(database)] * 10 database._sessions.extend(SESSIONS) - pool.bind(database) self.assertIs(pool._database, database) @@ -414,8 +417,10 @@ def test_bind(self): self.assertEqual(pool._delta.seconds, 3000) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() def test_get_hit_no_ping(self): pool = self._make_one(size=4) @@ -470,7 +475,7 @@ def test_get_hit_w_ping_expired(self): session = pool.get() self.assertIs(session, SESSIONS[4]) - self.assertTrue(session._created) + session.create.assert_called() self.assertTrue(SESSIONS[0]._exists_checked) self.assertFalse(pool._sessions.full()) @@ -522,6 +527,7 @@ def test_put_non_full(self): database = _Database("name") session = _Session(database) + with _Monkey(MUT, _NOW=lambda: now): pool.put(session) @@ -538,8 +544,10 @@ def test_clear(self): pool.bind(database) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() pool.clear() @@ -595,7 +603,7 @@ def test_ping_oldest_stale_and_not_exists(self): pool.ping() self.assertTrue(SESSIONS[0]._exists_checked) - self.assertTrue(SESSIONS[1]._created) + SESSIONS[1].create.assert_called() class TestTransactionPingingPool(unittest.TestCase): @@ -635,7 +643,6 @@ def test_bind(self): database = _Database("name") SESSIONS = [_Session(database) for _ in range(10)] database._sessions.extend(SESSIONS) - pool.bind(database) self.assertIs(pool._database, database) @@ -644,8 +651,10 @@ def test_bind(self): self.assertEqual(pool._delta.seconds, 3000) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() txn = session._transaction self.assertTrue(txn._begun) @@ -671,8 +680,10 @@ def test_bind_w_timestamp_race(self): self.assertEqual(pool._delta.seconds, 3000) self.assertTrue(pool._sessions.full()) + api = database.spanner_api + self.assertEqual(api.batch_create_sessions.call_count, 5) for session in SESSIONS: - self.assertTrue(session._created) + session.create.assert_not_called() txn = session._transaction self.assertTrue(txn._begun) @@ -843,16 +854,13 @@ def __init__(self, database, exists=True, transaction=None): self._database = database self._exists = exists self._exists_checked = False - self._created = False + self.create = mock.Mock() self._deleted = False self._transaction = transaction def __lt__(self, other): return id(self) < id(other) - def create(self): - self._created = True - def exists(self): self._exists_checked = True return self._exists @@ -874,6 +882,20 @@ def __init__(self, name): self.name = name self._sessions = [] + def mock_batch_create_sessions(db, session_count=10, timeout=10, metadata=[]): + from google.cloud.spanner_v1.proto import spanner_pb2 + response = spanner_pb2.BatchCreateSessionsResponse() + if session_count < 2: + response.session.add() + else: + response.session.add() + response.session.add() + return response + + from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient + self.spanner_api = mock.create_autospec(SpannerClient, instance=True) + self.spanner_api.batch_create_sessions.side_effect = mock_batch_create_sessions + def session(self): return self._sessions.pop() From 2c015f8dea422669536321c6461994e3e9505316 Mon Sep 17 00:00:00 2001 From: larkee Date: Wed, 11 Sep 2019 17:45:16 +1000 Subject: [PATCH 3/7] Fix where PingingPool sessions are added to in batch_create_session --- spanner/google/cloud/spanner_v1/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spanner/google/cloud/spanner_v1/pool.py b/spanner/google/cloud/spanner_v1/pool.py index 93f991b1fb4d..6f4b4c0fb132 100644 --- a/spanner/google/cloud/spanner_v1/pool.py +++ b/spanner/google/cloud/spanner_v1/pool.py @@ -376,7 +376,7 @@ def bind(self, database): for session_pb in resp.session: session = self._new_session() session._session_id = session_pb.name.split("/")[-1] - self._sessions.put(session) + self.put(session) created_session_count += len(resp.session) def get(self, timeout=None): # pylint: disable=arguments-differ From 21f45668af402f746e7bc88a3d96b9540026a35f Mon Sep 17 00:00:00 2001 From: larkee Date: Wed, 11 Sep 2019 17:54:26 +1000 Subject: [PATCH 4/7] Remove unnecessary variable from FixedSizePool bind() --- spanner/google/cloud/spanner_v1/pool.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spanner/google/cloud/spanner_v1/pool.py b/spanner/google/cloud/spanner_v1/pool.py index 6f4b4c0fb132..57d6de46a27f 100644 --- a/spanner/google/cloud/spanner_v1/pool.py +++ b/spanner/google/cloud/spanner_v1/pool.py @@ -168,13 +168,12 @@ def bind(self, database): """ self._database = database api = database.spanner_api - created_session_count = 0 metadata = _metadata_with_prefix(database.name) while not self._sessions.full(): resp = api.batch_create_sessions( database.name, - session_count=self.size - created_session_count, + session_count=self.size - self._sessions.qsize(), timeout=self.default_timeout, metadata=metadata, ) @@ -182,7 +181,6 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self._sessions.put(session) - created_session_count += len(resp.session) def get(self, timeout=None): # pylint: disable=arguments-differ """Check a session out from the pool. From 1ee1eb598104771559100345e313cf0502552927 Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 13 Sep 2019 15:13:55 +1000 Subject: [PATCH 5/7] Remove unused import --- spanner/google/cloud/spanner_v1/pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/spanner/google/cloud/spanner_v1/pool.py b/spanner/google/cloud/spanner_v1/pool.py index 57d6de46a27f..3b2a8d06bc99 100644 --- a/spanner/google/cloud/spanner_v1/pool.py +++ b/spanner/google/cloud/spanner_v1/pool.py @@ -17,7 +17,6 @@ import datetime from six.moves import queue -from six.moves import xrange from google.cloud.exceptions import NotFound from google.cloud.spanner_v1._helpers import _metadata_with_prefix From f43ac005d83e78b2de1a93b28689f9e8706d929c Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 13 Sep 2019 15:15:06 +1000 Subject: [PATCH 6/7] Apply lint formatting to test_pool.py --- spanner/tests/unit/test_pool.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/spanner/tests/unit/test_pool.py b/spanner/tests/unit/test_pool.py index 9c2f8a9af199..eded02ea4e6d 100644 --- a/spanner/tests/unit/test_pool.py +++ b/spanner/tests/unit/test_pool.py @@ -527,7 +527,6 @@ def test_put_non_full(self): database = _Database("name") session = _Session(database) - with _Monkey(MUT, _NOW=lambda: now): pool.put(session) @@ -883,16 +882,18 @@ def __init__(self, name): self._sessions = [] def mock_batch_create_sessions(db, session_count=10, timeout=10, metadata=[]): - from google.cloud.spanner_v1.proto import spanner_pb2 - response = spanner_pb2.BatchCreateSessionsResponse() - if session_count < 2: - response.session.add() - else: - response.session.add() - response.session.add() - return response + from google.cloud.spanner_v1.proto import spanner_pb2 + + response = spanner_pb2.BatchCreateSessionsResponse() + if session_count < 2: + response.session.add() + else: + response.session.add() + response.session.add() + return response from google.cloud.spanner_v1.gapic.spanner_client import SpannerClient + self.spanner_api = mock.create_autospec(SpannerClient, instance=True) self.spanner_api.batch_create_sessions.side_effect = mock_batch_create_sessions From 991366f0f888c6fa49e03efef4c729c131aabc1c Mon Sep 17 00:00:00 2001 From: larkee Date: Thu, 26 Sep 2019 15:07:42 +1000 Subject: [PATCH 7/7] Update 'batch_create_sessions' to remove session_count keyword --- spanner/google/cloud/spanner_v1/pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spanner/google/cloud/spanner_v1/pool.py b/spanner/google/cloud/spanner_v1/pool.py index 3b2a8d06bc99..4ef5aee9baab 100644 --- a/spanner/google/cloud/spanner_v1/pool.py +++ b/spanner/google/cloud/spanner_v1/pool.py @@ -172,7 +172,7 @@ def bind(self, database): while not self._sessions.full(): resp = api.batch_create_sessions( database.name, - session_count=self.size - self._sessions.qsize(), + self.size - self._sessions.qsize(), timeout=self.default_timeout, metadata=metadata, ) @@ -366,7 +366,7 @@ def bind(self, database): while created_session_count < self.size: resp = api.batch_create_sessions( database.name, - session_count=self.size - created_session_count, + self.size - created_session_count, timeout=self.default_timeout, metadata=metadata, )