From 168dfb4f548c492932bb6606b3dd239ce84ba0c2 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 14 Jul 2021 12:31:16 +0300 Subject: [PATCH 01/21] feat: use mutations for executemany() inserts --- google/cloud/spanner_dbapi/cursor.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index c5de13b370..30babf4f68 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -38,6 +38,7 @@ from google.cloud.spanner_dbapi import parse_utils from google.cloud.spanner_dbapi.parse_utils import get_param_types from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner +from google.cloud.spanner_dbapi.parse_utils import RE_INSERT from google.cloud.spanner_dbapi.utils import PeekIterator from google.cloud.spanner_dbapi.utils import StreamedManyResultSets @@ -258,9 +259,21 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - for params in seq_of_params: - self.execute(operation, params) - many_result_set.add_iter(self._itr) + if classification == parse_utils.STMT_INSERT: + match = RE_INSERT.search(operation) + + table_name = match["table_name"].strip("`") + + transaction = self.connection.transaction_checkout() + transaction.insert( + table=table_name, + columns=[col.strip('" ') for col in match["columns"].split(",")], + values=seq_of_params, + ) + else: + for params in seq_of_params: + self.execute(operation, params) + many_result_set.add_iter(self._itr) self._result_set = many_result_set self._itr = many_result_set From 35bce436614bb959f4f336c8f23c3b307b2ddbcc Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 15 Jul 2021 12:16:33 +0300 Subject: [PATCH 02/21] add unit test and fix parsing --- google/cloud/spanner_dbapi/cursor.py | 14 +++++++++--- tests/unit/spanner_dbapi/test_cursor.py | 29 +++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 30babf4f68..a4a38f4f27 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -264,11 +264,19 @@ def executemany(self, operation, seq_of_params): table_name = match["table_name"].strip("`") + cols = [] + for col in match["columns"].split(","): + col = col.strip() + + if col[0] == '"' and col[-1] == '"': + col = col[1:-1] + + col = col.strip("`") + cols.append(col) + transaction = self.connection.transaction_checkout() transaction.insert( - table=table_name, - columns=[col.strip('" ') for col in match["columns"].split(",")], - values=seq_of_params, + table=table_name, columns=cols, values=seq_of_params, ) else: for params in seq_of_params: diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index 5b1cf12138..f49e4a029f 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,6 +337,35 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) + def test_executemany_insert(self): + from google.cloud.spanner_dbapi import connect + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True + ): + connection = connect("test-instance", "test-database") + + cursor = connection.cursor() + transact_mock = mock.Mock() + transact_mock.insert = mock.Mock() + + with mock.patch( + "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", + return_value=transact_mock, + ): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + + transact_mock.insert.assert_called_once_with( + table="table", + columns=["col1", "col2", "col3", '"col4"'], + values=[(1, 2, 3, 4), (5, 6, 7, 8)], + ) + @unittest.skipIf( sys.version_info[0] < 3, "Python 2 has an outdated iterator definition" ) From 2540ff71a50db810af7f435a891d59c6501e4496 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 16 Jul 2021 11:32:21 +0300 Subject: [PATCH 03/21] add use_mutations flag into Connection class --- google/cloud/spanner_dbapi/connection.py | 27 ++++++++++++++++++ google/cloud/spanner_dbapi/cursor.py | 29 ++++++++++---------- tests/unit/spanner_dbapi/test_cursor.py | 35 +++++++++++++++++++++++- 3 files changed, 76 insertions(+), 15 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 926408c928..d25982b8ee 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -63,6 +63,7 @@ def __init__(self, instance, database): self.is_closed = False self._autocommit = False + self._use_mutations = False # indicator to know if the session pool used by # this connection should be cleared on the # connection close @@ -121,6 +122,32 @@ def instance(self): """ return self._instance + @property + def use_mutations(self): + """ + Flag of the connection mode in which mutations + are used for massive DML statements. + + Returns: + bool: True if mutations mode is enable, False otherwise. + """ + return self._use_mutations + + @use_mutations.setter + def use_mutations(self, value): + """Change mutations use mode. + + Mutations are used by default in autocommit + mode and they can be used in transactions + mode in case of this flag set to True. + + Args: + value (bool): New flag value. + """ + self._use_mutations = value + if value: + self.autocommit = False + def _session_checkout(self): """Get a Cloud Spanner session from the pool. diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index a4a38f4f27..cded81e4b2 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -259,25 +259,26 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - if classification == parse_utils.STMT_INSERT: - match = RE_INSERT.search(operation) + if self.connection.autocommit or self.connection.use_mutations: + if classification == parse_utils.STMT_INSERT: + match = RE_INSERT.search(operation) - table_name = match["table_name"].strip("`") + table_name = match["table_name"].strip("`") - cols = [] - for col in match["columns"].split(","): - col = col.strip() + cols = [] + for col in match["columns"].split(","): + col = col.strip() - if col[0] == '"' and col[-1] == '"': - col = col[1:-1] + if col[0] == '"' and col[-1] == '"': + col = col[1:-1] - col = col.strip("`") - cols.append(col) + col = col.strip("`") + cols.append(col) - transaction = self.connection.transaction_checkout() - transaction.insert( - table=table_name, columns=cols, values=seq_of_params, - ) + transaction = self.connection.transaction_checkout() + transaction.insert( + table=table_name, columns=cols, values=seq_of_params, + ) else: for params in seq_of_params: self.execute(operation, params) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index f49e4a029f..b8cd52e1ae 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,7 +337,7 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) - def test_executemany_insert(self): + def test_executemany_insert_autocommit(self): from google.cloud.spanner_dbapi import connect sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -350,10 +350,43 @@ def test_executemany_insert(self): ): connection = connect("test-instance", "test-database") + connection.autocommit = True + cursor = connection.cursor() + transact_mock = mock.Mock() + transact_mock.insert = mock.Mock() + + with mock.patch( + "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", + return_value=transact_mock, + ): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + + transact_mock.insert.assert_called_once_with( + table="table", + columns=["col1", "col2", "col3", '"col4"'], + values=[(1, 2, 3, 4), (5, 6, 7, 8)], + ) + + def test_executemany_insert_use_mutations(self): + from google.cloud.spanner_dbapi import connect + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.use_mutations = True cursor = connection.cursor() transact_mock = mock.Mock() transact_mock.insert = mock.Mock() + self.assertFalse(connection.autocommit) + with mock.patch( "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", return_value=transact_mock, From 0f8a8ff3098e5a0f37712a8a0a37bcb18584fbd7 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 19 Jul 2021 11:12:17 +0300 Subject: [PATCH 04/21] use three-values flag for use_mutations --- google/cloud/spanner_dbapi/connection.py | 13 ++- google/cloud/spanner_dbapi/cursor.py | 5 +- tests/unit/spanner_dbapi/test_cursor.py | 127 ++++++++++++++++++++--- 3 files changed, 127 insertions(+), 18 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index d25982b8ee..856159f18d 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -50,6 +50,11 @@ class Connection: :param database: The database to which the connection is linked. """ + # use_mutations flag possible values + ALWAYS = "ALWAYS" + AUTOCOMMIT_ONLY = "AUTOCOMMIT_ONLY" + NEVER = "NEVER" + def __init__(self, instance, database): self._instance = instance self._database = database @@ -63,7 +68,7 @@ def __init__(self, instance, database): self.is_closed = False self._autocommit = False - self._use_mutations = False + self._use_mutations = self.NEVER # indicator to know if the session pool used by # this connection should be cleared on the # connection close @@ -144,9 +149,11 @@ def use_mutations(self, value): Args: value (bool): New flag value. """ + if value not in (self.NEVER, self.AUTOCOMMIT_ONLY, self.ALWAYS): + raise ValueError( + """`use_mutations` value should be one of: "NEVER", "AUTOCOMMIT_ONLY", "ALWAYS" """ + ) self._use_mutations = value - if value: - self.autocommit = False def _session_checkout(self): """Get a Cloud Spanner session from the pool. diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index cded81e4b2..c8cabe05e1 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -259,7 +259,10 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - if self.connection.autocommit or self.connection.use_mutations: + if ( + self.connection.use_mutations == self.connection.AUTOCOMMIT_ONLY + and self.connection.autocommit + ) or self.connection.use_mutations == self.connection.ALWAYS: if classification == parse_utils.STMT_INSERT: match = RE_INSERT.search(operation) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index b8cd52e1ae..e43056904f 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,7 +337,36 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) - def test_executemany_insert_autocommit(self): + def test_executemany_mutations_wrong_value(self): + """ + Check that `use_mutations` accepts three possible + values and fails if other value used. + """ + from google.cloud.spanner_dbapi import connect + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + with self.assertRaises(ValueError): + connection.use_mutations = "SOMETIMES" + + for use_mutations in ( + connection.NEVER, + connection.AUTOCOMMIT_ONLY, + connection.ALWAYS, + ): + connection.use_mutations = use_mutations + + def test_executemany_insert_use_mutations_never(self): + """ + Check that simple `execute()` used in autocommit and + !autocommit modes, when `use_mutations` is set to "NEVER". + """ from google.cloud.spanner_dbapi import connect sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -346,28 +375,79 @@ def test_executemany_insert_autocommit(self): "google.cloud.spanner_v1.instance.Instance.exists", return_value=True ): with mock.patch( - "google.cloud.spanner_v1.database.Database.exists", return_value=True + "google.cloud.spanner_v1.database.Database.exists", return_value=True, ): connection = connect("test-instance", "test-database") + cursor = connection.cursor() + cursor.execute = mock.Mock() + + # !autocommit mode + self.assertFalse(connection.autocommit) + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + cursor.execute.assert_called() + + cursor.execute.reset_mock() + + # autocommit mode connection.autocommit = True + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + cursor.execute.assert_called() + + def test_executemany_insert_use_mutations_autocommit_only(self): + """ + Check that simple `execute()` used in !autocommit mode + and mutations are used in autocommit mode, when + `use_mutations` is set to "AUTOCOMMIT_ONLY". + """ + from google.cloud.spanner_dbapi import connect + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.use_mutations = connection.AUTOCOMMIT_ONLY cursor = connection.cursor() + + cursor.execute = mock.Mock() transact_mock = mock.Mock() transact_mock.insert = mock.Mock() + # !autocommit mode + self.assertFalse(connection.autocommit) + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + cursor.execute.assert_called() + transact_mock.insert.assert_not_called() + + cursor.execute.reset_mock() + + # autocommit mode + connection.autocommit = True + with mock.patch( "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", return_value=transact_mock, ): cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - transact_mock.insert.assert_called_once_with( - table="table", - columns=["col1", "col2", "col3", '"col4"'], - values=[(1, 2, 3, 4), (5, 6, 7, 8)], - ) + transact_mock.insert.assert_called_once_with( + table="table", + columns=["col1", "col2", "col3", '"col4"'], + values=[(1, 2, 3, 4), (5, 6, 7, 8)], + ) + cursor.execute.assert_not_called() - def test_executemany_insert_use_mutations(self): + def test_executemany_insert_use_mutations_always(self): + """ + Check that mutations are used in autocommit and !autocommit + modes, when `use_mutations` is set to "ALWAYS". + """ from google.cloud.spanner_dbapi import connect sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -380,24 +460,43 @@ def test_executemany_insert_use_mutations(self): ): connection = connect("test-instance", "test-database") - connection.use_mutations = True + connection.use_mutations = connection.ALWAYS cursor = connection.cursor() + + cursor.execute = mock.Mock() transact_mock = mock.Mock() transact_mock.insert = mock.Mock() + # !autocommit mode self.assertFalse(connection.autocommit) + with mock.patch( + "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", + return_value=transact_mock, + ): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + + cursor.execute.assert_not_called() + transact_mock.insert.assert_called_once_with( + table="table", + columns=["col1", "col2", "col3", '"col4"'], + values=[(1, 2, 3, 4), (5, 6, 7, 8)], + ) + transact_mock.insert.reset_mock() + # autocommit mode + connection.autocommit = True with mock.patch( "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", return_value=transact_mock, ): cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - transact_mock.insert.assert_called_once_with( - table="table", - columns=["col1", "col2", "col3", '"col4"'], - values=[(1, 2, 3, 4), (5, 6, 7, 8)], - ) + transact_mock.insert.assert_called_once_with( + table="table", + columns=["col1", "col2", "col3", '"col4"'], + values=[(1, 2, 3, 4), (5, 6, 7, 8)], + ) + cursor.execute.assert_not_called() @unittest.skipIf( sys.version_info[0] < 3, "Python 2 has an outdated iterator definition" From 1aa1359ac66b29cb634fc59e0d2cb39ea2b125cc Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 19 Jul 2021 11:15:45 +0300 Subject: [PATCH 05/21] update docstrings --- google/cloud/spanner_dbapi/connection.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 856159f18d..5825102418 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -129,12 +129,12 @@ def instance(self): @property def use_mutations(self): - """ - Flag of the connection mode in which mutations - are used for massive DML statements. + """Mutations use mode of this connection. Returns: - bool: True if mutations mode is enable, False otherwise. + str: + One of three possible values: + "NEVER", "AUTOCOMMIT_ONLY", "ALWAYS". """ return self._use_mutations @@ -142,10 +142,6 @@ def use_mutations(self): def use_mutations(self, value): """Change mutations use mode. - Mutations are used by default in autocommit - mode and they can be used in transactions - mode in case of this flag set to True. - Args: value (bool): New flag value. """ From dcc1a94789013b5feb6f7a7d9c8f5798e6a97d95 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 20 Jul 2021 13:55:19 +0300 Subject: [PATCH 06/21] use batch DMLs for executemany() method --- google/cloud/spanner_dbapi/cursor.py | 28 +---- tests/unit/spanner_dbapi/test_cursor.py | 144 ++---------------------- 2 files changed, 13 insertions(+), 159 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index c8cabe05e1..35f89543d9 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -259,29 +259,13 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - if ( - self.connection.use_mutations == self.connection.AUTOCOMMIT_ONLY - and self.connection.autocommit - ) or self.connection.use_mutations == self.connection.ALWAYS: - if classification == parse_utils.STMT_INSERT: - match = RE_INSERT.search(operation) - - table_name = match["table_name"].strip("`") - - cols = [] - for col in match["columns"].split(","): - col = col.strip() - - if col[0] == '"' and col[-1] == '"': - col = col[1:-1] - - col = col.strip("`") - cols.append(col) + if classification in (parse_utils.STMT_INSERT, parse_utils.STMT_UPDATING): + statements = [] + for params in seq_of_params: + statements.append(operation % tuple(params)) - transaction = self.connection.transaction_checkout() - transaction.insert( - table=table_name, columns=cols, values=seq_of_params, - ) + transaction = self.connection.transaction_checkout() + transaction.batch_update(statements) else: for params in seq_of_params: self.execute(operation, params) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index e43056904f..217489e36f 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,117 +337,7 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) - def test_executemany_mutations_wrong_value(self): - """ - Check that `use_mutations` accepts three possible - values and fails if other value used. - """ - from google.cloud.spanner_dbapi import connect - - with mock.patch( - "google.cloud.spanner_v1.instance.Instance.exists", return_value=True - ): - with mock.patch( - "google.cloud.spanner_v1.database.Database.exists", return_value=True, - ): - connection = connect("test-instance", "test-database") - - with self.assertRaises(ValueError): - connection.use_mutations = "SOMETIMES" - - for use_mutations in ( - connection.NEVER, - connection.AUTOCOMMIT_ONLY, - connection.ALWAYS, - ): - connection.use_mutations = use_mutations - - def test_executemany_insert_use_mutations_never(self): - """ - Check that simple `execute()` used in autocommit and - !autocommit modes, when `use_mutations` is set to "NEVER". - """ - from google.cloud.spanner_dbapi import connect - - sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" - - with mock.patch( - "google.cloud.spanner_v1.instance.Instance.exists", return_value=True - ): - with mock.patch( - "google.cloud.spanner_v1.database.Database.exists", return_value=True, - ): - connection = connect("test-instance", "test-database") - - cursor = connection.cursor() - cursor.execute = mock.Mock() - - # !autocommit mode - self.assertFalse(connection.autocommit) - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - cursor.execute.assert_called() - - cursor.execute.reset_mock() - - # autocommit mode - connection.autocommit = True - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - cursor.execute.assert_called() - - def test_executemany_insert_use_mutations_autocommit_only(self): - """ - Check that simple `execute()` used in !autocommit mode - and mutations are used in autocommit mode, when - `use_mutations` is set to "AUTOCOMMIT_ONLY". - """ - from google.cloud.spanner_dbapi import connect - - sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" - - with mock.patch( - "google.cloud.spanner_v1.instance.Instance.exists", return_value=True - ): - with mock.patch( - "google.cloud.spanner_v1.database.Database.exists", return_value=True, - ): - connection = connect("test-instance", "test-database") - - connection.use_mutations = connection.AUTOCOMMIT_ONLY - cursor = connection.cursor() - - cursor.execute = mock.Mock() - transact_mock = mock.Mock() - transact_mock.insert = mock.Mock() - - # !autocommit mode - self.assertFalse(connection.autocommit) - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - cursor.execute.assert_called() - transact_mock.insert.assert_not_called() - - cursor.execute.reset_mock() - - # autocommit mode - connection.autocommit = True - - with mock.patch( - "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", - return_value=transact_mock, - ): - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - - transact_mock.insert.assert_called_once_with( - table="table", - columns=["col1", "col2", "col3", '"col4"'], - values=[(1, 2, 3, 4), (5, 6, 7, 8)], - ) - cursor.execute.assert_not_called() - - def test_executemany_insert_use_mutations_always(self): - """ - Check that mutations are used in autocommit and !autocommit - modes, when `use_mutations` is set to "ALWAYS". - """ + def test_executemany_insert_batch(self): from google.cloud.spanner_dbapi import connect sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -460,43 +350,23 @@ def test_executemany_insert_use_mutations_always(self): ): connection = connect("test-instance", "test-database") - connection.use_mutations = connection.ALWAYS cursor = connection.cursor() - cursor.execute = mock.Mock() transact_mock = mock.Mock() - transact_mock.insert = mock.Mock() + transact_mock.batch_update = mock.Mock() - # !autocommit mode - self.assertFalse(connection.autocommit) - with mock.patch( - "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", - return_value=transact_mock, - ): - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - - cursor.execute.assert_not_called() - transact_mock.insert.assert_called_once_with( - table="table", - columns=["col1", "col2", "col3", '"col4"'], - values=[(1, 2, 3, 4), (5, 6, 7, 8)], - ) - transact_mock.insert.reset_mock() - - # autocommit mode - connection.autocommit = True with mock.patch( "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", return_value=transact_mock, ): cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - transact_mock.insert.assert_called_once_with( - table="table", - columns=["col1", "col2", "col3", '"col4"'], - values=[(1, 2, 3, 4), (5, 6, 7, 8)], + transact_mock.batch_update.assert_called_once_with( + [ + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (1, 2, 3, 4)""", + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (5, 6, 7, 8)""", + ] ) - cursor.execute.assert_not_called() @unittest.skipIf( sys.version_info[0] < 3, "Python 2 has an outdated iterator definition" From 60d2d1add61181f22d6510141fbb8474cfd04abb Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 21 Jul 2021 12:21:14 +0300 Subject: [PATCH 07/21] prepare args before inserting into SQL statement --- google/cloud/spanner_dbapi/connection.py | 24 ------------------------ google/cloud/spanner_dbapi/cursor.py | 19 +++++++++++++++---- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 5825102418..ef88fd2ce9 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -127,30 +127,6 @@ def instance(self): """ return self._instance - @property - def use_mutations(self): - """Mutations use mode of this connection. - - Returns: - str: - One of three possible values: - "NEVER", "AUTOCOMMIT_ONLY", "ALWAYS". - """ - return self._use_mutations - - @use_mutations.setter - def use_mutations(self, value): - """Change mutations use mode. - - Args: - value (bool): New flag value. - """ - if value not in (self.NEVER, self.AUTOCOMMIT_ONLY, self.ALWAYS): - raise ValueError( - """`use_mutations` value should be one of: "NEVER", "AUTOCOMMIT_ONLY", "ALWAYS" """ - ) - self._use_mutations = value - def _session_checkout(self): """Get a Cloud Spanner session from the pool. diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 35f89543d9..71f33054d8 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -38,7 +38,6 @@ from google.cloud.spanner_dbapi import parse_utils from google.cloud.spanner_dbapi.parse_utils import get_param_types from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner -from google.cloud.spanner_dbapi.parse_utils import RE_INSERT from google.cloud.spanner_dbapi.utils import PeekIterator from google.cloud.spanner_dbapi.utils import StreamedManyResultSets @@ -259,13 +258,25 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - if classification in (parse_utils.STMT_INSERT, parse_utils.STMT_UPDATING): + if ( + classification in (parse_utils.STMT_INSERT, parse_utils.STMT_UPDATING) + and not self.connection.autocommit + ): statements = [] for params in seq_of_params: - statements.append(operation % tuple(params)) + pars = [] + for par in params: + if isinstance(par, str): + par = "'" + par + "'" + elif par is None: + par = "NULL" + pars.append(par) + + statements.append(operation % tuple(pars)) transaction = self.connection.transaction_checkout() - transaction.batch_update(statements) + _, res = transaction.batch_update(statements) + many_result_set.add_iter(res) else: for params in seq_of_params: self.execute(operation, params) From b9194439b5f06023205afc8aa832aa9716f0b67d Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 21 Jul 2021 12:22:02 +0300 Subject: [PATCH 08/21] erase mutation mentions --- google/cloud/spanner_dbapi/connection.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index ef88fd2ce9..926408c928 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -50,11 +50,6 @@ class Connection: :param database: The database to which the connection is linked. """ - # use_mutations flag possible values - ALWAYS = "ALWAYS" - AUTOCOMMIT_ONLY = "AUTOCOMMIT_ONLY" - NEVER = "NEVER" - def __init__(self, instance, database): self._instance = instance self._database = database @@ -68,7 +63,6 @@ def __init__(self, instance, database): self.is_closed = False self._autocommit = False - self._use_mutations = self.NEVER # indicator to know if the session pool used by # this connection should be cleared on the # connection close From e70ace8ffae6f75933dd97eb7fa0a60aa1fab2bd Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 22 Jul 2021 14:52:54 +0300 Subject: [PATCH 09/21] next step --- google/cloud/spanner_dbapi/cursor.py | 23 ++++++++++------------- tests/unit/spanner_dbapi/test_cursor.py | 25 +++++++++++++++---------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 71f33054d8..6023713e6e 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -258,24 +258,21 @@ def executemany(self, operation, seq_of_params): many_result_set = StreamedManyResultSets() - if ( - classification in (parse_utils.STMT_INSERT, parse_utils.STMT_UPDATING) - and not self.connection.autocommit - ): + if classification in (parse_utils.STMT_INSERT, parse_utils.STMT_UPDATING): statements = [] - for params in seq_of_params: - pars = [] - for par in params: - if isinstance(par, str): - par = "'" + par + "'" - elif par is None: - par = "NULL" - pars.append(par) - statements.append(operation % tuple(pars)) + for params in seq_of_params: + sql, params = parse_utils.sql_pyformat_args_to_spanner( + operation, params + ) + statements.append((sql, params, get_param_types(params))) transaction = self.connection.transaction_checkout() _, res = transaction.batch_update(statements) + + if self.connection.autocommit: + transaction.commit() + many_result_set.add_iter(res) else: for params in seq_of_params: diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index 217489e36f..3477b455b7 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -338,6 +338,7 @@ def test_executemany(self): ) def test_executemany_insert_batch(self): + from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -352,19 +353,23 @@ def test_executemany_insert_batch(self): cursor = connection.cursor() - transact_mock = mock.Mock() - transact_mock.batch_update = mock.Mock() + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock(return_value=[None, []]) - with mock.patch( - "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout", - return_value=transact_mock, - ): - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - transact_mock.batch_update.assert_called_once_with( + connection._transaction.batch_update.assert_called_once_with( [ - """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (1, 2, 3, 4)""", - """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (5, 6, 7, 8)""", + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 1, "a1": 2, "a2": 3, "a3": 4}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 5, "a1": 6, "a2": 7, "a3": 8}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), ] ) From 62c1b2b802e0dbc60721f5eec9c5bcc47c2525df Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 26 Jul 2021 12:46:46 +0300 Subject: [PATCH 10/21] next step --- google/cloud/spanner_dbapi/connection.py | 11 +-- google/cloud/spanner_dbapi/cursor.py | 12 ++- tests/unit/spanner_dbapi/test_connection.py | 3 - tests/unit/spanner_dbapi/test_cursor.py | 102 +++++++++++++++++++- 4 files changed, 113 insertions(+), 15 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 926408c928..7a3278027b 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -201,17 +201,14 @@ def transaction_checkout(self): Begin a new transaction, if there is no transaction in this connection yet. Return the begun one otherwise. - The method is non operational in autocommit mode. - :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if not self.autocommit: - if not self.inside_transaction: - self._transaction = self._session_checkout().transaction() - self._transaction.begin() + if not self.inside_transaction: + self._transaction = self._session_checkout().transaction() + self._transaction.begin() - return self._transaction + return self._transaction def _raise_if_closed(self): """Helper to check the connection state before running a query. diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 6023713e6e..e0b69d167b 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -41,6 +41,8 @@ from google.cloud.spanner_dbapi.utils import PeekIterator from google.cloud.spanner_dbapi.utils import StreamedManyResultSets +from google.rpc.code_pb2 import ABORTED, OK + _UNSET_COUNT = -1 ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"]) @@ -268,12 +270,16 @@ def executemany(self, operation, seq_of_params): statements.append((sql, params, get_param_types(params))) transaction = self.connection.transaction_checkout() - _, res = transaction.batch_update(statements) + status, res = transaction.batch_update(statements) + many_result_set.add_iter(res) + + if status.code == ABORTED: + raise Aborted(status.details) + elif status.code != OK: + raise OperationalError(status.details) if self.connection.autocommit: transaction.commit() - - many_result_set.add_iter(res) else: for params in seq_of_params: self.execute(operation, params) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 772ac35032..15d57704e0 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -157,9 +157,6 @@ def test_transaction_checkout(self): mock_transaction.committed = mock_transaction.rolled_back = False self.assertEqual(connection.transaction_checkout(), mock_transaction) - connection._autocommit = True - self.assertIsNone(connection.transaction_checkout()) - def test_close(self): from google.cloud.spanner_dbapi import connect, InterfaceError diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index 3477b455b7..8dafa7828b 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,9 +337,10 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) - def test_executemany_insert_batch(self): + def test_executemany_insert_batch_non_autocommit(self): from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect + from google.rpc.code_pb2 import OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -351,11 +352,53 @@ def test_executemany_insert_batch(self): ): connection = connect("test-instance", "test-database") + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock( + return_value=[mock.Mock(code=OK), []] + ) + cursor = connection.cursor() + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + + connection._transaction.batch_update.assert_called_once_with( + [ + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 1, "a1": 2, "a2": 3, "a3": 4}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 5, "a1": 6, "a2": 7, "a3": 8}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ] + ) + + def test_executemany_insert_batch_autocommit(self): + from google.cloud.spanner_v1.param_types import INT64 + from google.cloud.spanner_dbapi import connect + from google.rpc.code_pb2 import OK + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.autocommit = True connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock(return_value=[None, []]) + connection._transaction.batch_update = mock.Mock( + return_value=[mock.Mock(code=OK), []] + ) + connection._transaction.commit = mock.Mock() + cursor = connection.cursor() cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) connection._transaction.batch_update.assert_called_once_with( @@ -372,6 +415,61 @@ def test_executemany_insert_batch(self): ), ] ) + connection._transaction.commit.assert_called_once() + + def test_executemany_insert_batch_failed(self): + from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_dbapi.exceptions import OperationalError + from google.rpc.code_pb2 import UNKNOWN + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + err_details = "Details here" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.autocommit = True + cursor = connection.cursor() + + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock( + return_value=(mock.Mock(code=UNKNOWN, details=err_details), []) + ) + + with self.assertRaisesRegex(OperationalError, err_details): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + + def test_executemany_insert_batch_aborted(self): + from google.api_core.exceptions import Aborted + from google.cloud.spanner_dbapi import connect + from google.rpc.code_pb2 import ABORTED + + sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" + err_details = "Aborted details here" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.autocommit = True + cursor = connection.cursor() + + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock( + return_value=(mock.Mock(code=ABORTED, details=err_details), []) + ) + + with self.assertRaisesRegex(Aborted, err_details): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) @unittest.skipIf( sys.version_info[0] < 3, "Python 2 has an outdated iterator definition" From 62b937d6889fc56a64e5073985bab8ccd5d8c24d Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 26 Jul 2021 17:24:06 +0300 Subject: [PATCH 11/21] next step --- google/cloud/spanner_dbapi/connection.py | 53 ++++++++++++------ google/cloud/spanner_dbapi/cursor.py | 42 ++++++++++++--- tests/unit/spanner_dbapi/test_cursor.py | 69 +++++++++++++++++++++--- 3 files changed, 132 insertions(+), 32 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 7a3278027b..b675eac7c7 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -32,6 +32,9 @@ from google.cloud.spanner_dbapi.version import DEFAULT_USER_AGENT from google.cloud.spanner_dbapi.version import PY_VERSION +from google.cloud.spanner_dbapi.exceptions import OperationalError +from google.rpc.code_pb2 import ABORTED, OK + AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" MAX_INTERNAL_RETRIES = 50 @@ -175,25 +178,43 @@ def _rerun_previous_statements(self): from the last transaction. """ for statement in self._statements: - res_iter, retried_checksum = self.run_statement(statement, retried=True) - # executing all the completed statements - if statement != self._statements[-1]: - for res in res_iter: - retried_checksum.consume_result(res) - - _compare_checksums(statement.checksum, retried_checksum) - # executing the failed statement + if isinstance(statement, list): + statements, checksum = statement + + transaction = self.transaction_checkout() + status, res = transaction.batch_update(statements) + + if status.code == ABORTED: + self.connection._transaction = None + raise Aborted(status.details) + elif status.code != OK: + self.connection._transaction = None + raise OperationalError(status.details) + + retried_checksum = ResultsChecksum() + retried_checksum.consume_result(res) + + _compare_checksums(checksum, retried_checksum) else: - # streaming up to the failed result or - # to the end of the streaming iterator - while len(retried_checksum) < len(statement.checksum): - try: - res = next(iter(res_iter)) + res_iter, retried_checksum = self.run_statement(statement, retried=True) + # executing all the completed statements + if statement != self._statements[-1]: + for res in res_iter: retried_checksum.consume_result(res) - except StopIteration: - break - _compare_checksums(statement.checksum, retried_checksum) + _compare_checksums(statement.checksum, retried_checksum) + # executing the failed statement + else: + # streaming up to the failed result or + # to the end of the streaming iterator + while len(retried_checksum) < len(statement.checksum): + try: + res = next(iter(res_iter)) + retried_checksum.consume_result(res) + except StopIteration: + break + + _compare_checksums(statement.checksum, retried_checksum) def transaction_checkout(self): """Get a Cloud Spanner transaction. diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index e0b69d167b..7c5ade9967 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -269,17 +269,43 @@ def executemany(self, operation, seq_of_params): ) statements.append((sql, params, get_param_types(params))) - transaction = self.connection.transaction_checkout() - status, res = transaction.batch_update(statements) - many_result_set.add_iter(res) + if self.connection.autocommit: + transaction = self.connection.transaction_checkout() + status, res = transaction.batch_update(statements) + many_result_set.add_iter(res) - if status.code == ABORTED: - raise Aborted(status.details) - elif status.code != OK: - raise OperationalError(status.details) + if status.code != OK: + self.connection._transaction = None + raise OperationalError(status.details) - if self.connection.autocommit: transaction.commit() + else: + retried = False + while True: + try: + transaction = self.connection.transaction_checkout() + + res_checksum = ResultsChecksum() + if not retried: + self.connection._statements.append( + (statements, res_checksum) + ) + + status, res = transaction.batch_update(statements) + many_result_set.add_iter(res) + res_checksum.consume_result(res) + + if status.code == ABORTED: + self.connection._transaction = None + raise Aborted(status.details) + elif status.code != OK: + self.connection._transaction = None + raise OperationalError(status.details) + break + except Aborted: + self.connection.retry_transaction() + retried = True + else: for params in seq_of_params: self.execute(operation, params) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index 8dafa7828b..b8e18cdf4c 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -445,9 +445,10 @@ def test_executemany_insert_batch_failed(self): cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) def test_executemany_insert_batch_aborted(self): - from google.api_core.exceptions import Aborted from google.cloud.spanner_dbapi import connect - from google.rpc.code_pb2 import ABORTED + from google.cloud.spanner_dbapi.checksum import ResultsChecksum + from google.cloud.spanner_v1.param_types import INT64 + from google.rpc.code_pb2 import ABORTED, OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" err_details = "Aborted details here" @@ -460,16 +461,68 @@ def test_executemany_insert_batch_aborted(self): ): connection = connect("test-instance", "test-database") - connection.autocommit = True + transaction1 = mock.Mock(committed=False, rolled_back=False) + transaction1.batch_update = mock.Mock( + side_effect=[(mock.Mock(code=ABORTED, details=err_details), [])] + ) + + transaction2 = mock.Mock(committed=False, rolled_back=False) + transaction2.batch_update = mock.Mock(side_effect=[(mock.Mock(code=OK), [])]) + + connection.transaction_checkout = mock.Mock( + side_effect=[transaction1, transaction2] + ) + connection.retry_transaction = mock.Mock() + cursor = connection.cursor() + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( - return_value=(mock.Mock(code=ABORTED, details=err_details), []) + transaction1.batch_update.assert_called_with( + [ + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 1, "a1": 2, "a2": 3, "a3": 4}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 5, "a1": 6, "a2": 7, "a3": 8}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ] + ) + transaction2.batch_update.assert_called_with( + [ + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 1, "a1": 2, "a2": 3, "a3": 4}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 5, "a1": 6, "a2": 7, "a3": 8}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ] ) + connection.retry_transaction.assert_called_once() - with self.assertRaisesRegex(Aborted, err_details): - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + self.assertEqual( + connection._statements[0][0], + [ + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 1, "a1": 2, "a2": 3, "a3": 4}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ( + """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", + {"a0": 5, "a1": 6, "a2": 7, "a3": 8}, + {"a0": INT64, "a1": INT64, "a2": INT64, "a3": INT64}, + ), + ], + ) + self.assertIsInstance(connection._statements[0][1], ResultsChecksum) @unittest.skipIf( sys.version_info[0] < 3, "Python 2 has an outdated iterator definition" From d34ae20867f75a50228bb0cc769ef1de82b0c9d1 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 27 Jul 2021 11:38:20 +0300 Subject: [PATCH 12/21] fixes --- google/cloud/spanner_dbapi/connection.py | 2 +- google/cloud/spanner_dbapi/cursor.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index b675eac7c7..8e71c10664 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -188,11 +188,11 @@ def _rerun_previous_statements(self): self.connection._transaction = None raise Aborted(status.details) elif status.code != OK: - self.connection._transaction = None raise OperationalError(status.details) retried_checksum = ResultsChecksum() retried_checksum.consume_result(res) + retried_checksum.consume_result(status.code) _compare_checksums(checksum, retried_checksum) else: diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 7c5ade9967..5bf79d1ec6 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -275,6 +275,7 @@ def executemany(self, operation, seq_of_params): many_result_set.add_iter(res) if status.code != OK: + self.connection._transaction.rollback() self.connection._transaction = None raise OperationalError(status.details) @@ -294,12 +295,12 @@ def executemany(self, operation, seq_of_params): status, res = transaction.batch_update(statements) many_result_set.add_iter(res) res_checksum.consume_result(res) + res_checksum.consume_result(status.code) if status.code == ABORTED: self.connection._transaction = None raise Aborted(status.details) elif status.code != OK: - self.connection._transaction = None raise OperationalError(status.details) break except Aborted: From cdc2faf8142573dd8f6ddb576f0806e029decf49 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 27 Jul 2021 12:06:17 +0300 Subject: [PATCH 13/21] add unit tests for UPDATE and DELETE statements --- tests/unit/spanner_dbapi/test_cursor.py | 82 +++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index b8e18cdf4c..cb91732496 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -337,6 +337,88 @@ def test_executemany(self): (mock.call(operation, (1,)), mock.call(operation, (2,))) ) + def test_executemany_delete_batch_autocommit(self): + from google.cloud.spanner_v1.param_types import INT64 + from google.cloud.spanner_dbapi import connect + from google.rpc.code_pb2 import OK + + sql = """DELETE FROM table WHERE col1 = %s""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.autocommit = True + + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock( + return_value=[mock.Mock(code=OK), []] + ) + connection._transaction.commit = mock.Mock() + + cursor = connection.cursor() + cursor.executemany(sql, [(1,), (2,), (3,)]) + + connection._transaction.batch_update.assert_called_once_with( + [ + ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 1}, {"a0": INT64},), + ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 2}, {"a0": INT64},), + ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 3}, {"a0": INT64},), + ] + ) + connection._transaction.commit.assert_called_once() + + def test_executemany_update_batch_autocommit(self): + from google.cloud.spanner_v1.param_types import INT64, STRING + from google.cloud.spanner_dbapi import connect + from google.rpc.code_pb2 import OK + + sql = """UPDATE table SET col1 = %s WHERE col2 = %s""" + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", return_value=True + ): + with mock.patch( + "google.cloud.spanner_v1.database.Database.exists", return_value=True, + ): + connection = connect("test-instance", "test-database") + + connection.autocommit = True + + connection._transaction = mock.Mock(committed=False, rolled_back=False) + connection._transaction.batch_update = mock.Mock( + return_value=[mock.Mock(code=OK), []] + ) + connection._transaction.commit = mock.Mock() + + cursor = connection.cursor() + cursor.executemany(sql, [(1, "a"), (2, "b"), (3, "c")]) + + connection._transaction.batch_update.assert_called_once_with( + [ + ( + """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + {"a0": 1, "a1": "a"}, + {"a0": INT64, "a1": STRING}, + ), + ( + """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + {"a0": 2, "a1": "b"}, + {"a0": INT64, "a1": STRING}, + ), + ( + """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + {"a0": 3, "a1": "c"}, + {"a0": INT64, "a1": STRING}, + ), + ] + ) + connection._transaction.commit.assert_called_once() + def test_executemany_insert_batch_non_autocommit(self): from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect From a0f60d2c0f67ecf9357a817b768cec624447bc69 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 27 Jul 2021 14:27:01 +0300 Subject: [PATCH 14/21] don't propagate errors to users on retry --- google/cloud/spanner_dbapi/connection.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 8e71c10664..8555d5f7c0 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -187,8 +187,6 @@ def _rerun_previous_statements(self): if status.code == ABORTED: self.connection._transaction = None raise Aborted(status.details) - elif status.code != OK: - raise OperationalError(status.details) retried_checksum = ResultsChecksum() retried_checksum.consume_result(res) From aec86e3db0c03729f3487e578954a41a2d7939f0 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 27 Jul 2021 14:58:15 +0300 Subject: [PATCH 15/21] lint fixes --- google/cloud/spanner_dbapi/connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 8555d5f7c0..2ca7edbea2 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -32,8 +32,7 @@ from google.cloud.spanner_dbapi.version import DEFAULT_USER_AGENT from google.cloud.spanner_dbapi.version import PY_VERSION -from google.cloud.spanner_dbapi.exceptions import OperationalError -from google.rpc.code_pb2 import ABORTED, OK +from google.rpc.code_pb2 import ABORTED AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" From 924430a5338c59efa6a30b039d3ef945f2b0e5fe Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 28 Jul 2021 12:51:53 +0300 Subject: [PATCH 16/21] use run_in_transaction --- google/cloud/spanner_dbapi/cursor.py | 22 ++--- tests/unit/spanner_dbapi/test_cursor.py | 113 ++++++++++++++++-------- 2 files changed, 87 insertions(+), 48 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 5bf79d1ec6..dccbf04dc8 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -158,6 +158,15 @@ def _do_execute_update(self, transaction, sql, params): return result + def _do_batch_update(self, transaction, statements, many_result_set): + status, res = transaction.batch_update(statements) + many_result_set.add_iter(res) + + if status.code == ABORTED: + raise Aborted(status.details) + elif status.code != OK: + raise OperationalError(status.details) + def execute(self, sql, args=None): """Prepares and executes a Spanner database operation. @@ -270,16 +279,9 @@ def executemany(self, operation, seq_of_params): statements.append((sql, params, get_param_types(params))) if self.connection.autocommit: - transaction = self.connection.transaction_checkout() - status, res = transaction.batch_update(statements) - many_result_set.add_iter(res) - - if status.code != OK: - self.connection._transaction.rollback() - self.connection._transaction = None - raise OperationalError(status.details) - - transaction.commit() + self.connection.database.run_in_transaction( + self._do_batch_update, statements, many_result_set + ) else: retried = False while True: diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index cb91732496..8f00a18fc7 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -338,8 +338,9 @@ def test_executemany(self): ) def test_executemany_delete_batch_autocommit(self): - from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_v1.param_types import INT64 + from google.cloud.spanner_v1.types.spanner import Session from google.rpc.code_pb2 import OK sql = """DELETE FROM table WHERE col1 = %s""" @@ -354,27 +355,34 @@ def test_executemany_delete_batch_autocommit(self): connection.autocommit = True - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( - return_value=[mock.Mock(code=OK), []] - ) - connection._transaction.commit = mock.Mock() + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + transaction.commit = mock.Mock() cursor = connection.cursor() - cursor.executemany(sql, [(1,), (2,), (3,)]) - connection._transaction.batch_update.assert_called_once_with( + with mock.patch( + "google.cloud.spanner_v1.services.spanner.client.SpannerClient.create_session", + return_value=Session(), + ): + with mock.patch( + "google.cloud.spanner_v1.session.Session.transaction", + return_value=transaction, + ): + cursor.executemany(sql, [(1,), (2,), (3,)]) + + transaction.batch_update.assert_called_once_with( [ ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 1}, {"a0": INT64},), ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 2}, {"a0": INT64},), ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 3}, {"a0": INT64},), ] ) - connection._transaction.commit.assert_called_once() def test_executemany_update_batch_autocommit(self): - from google.cloud.spanner_v1.param_types import INT64, STRING from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_v1.param_types import INT64, STRING + from google.cloud.spanner_v1.types.spanner import Session from google.rpc.code_pb2 import OK sql = """UPDATE table SET col1 = %s WHERE col2 = %s""" @@ -389,16 +397,23 @@ def test_executemany_update_batch_autocommit(self): connection.autocommit = True - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( - return_value=[mock.Mock(code=OK), []] - ) - connection._transaction.commit = mock.Mock() + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + transaction.commit = mock.Mock() cursor = connection.cursor() - cursor.executemany(sql, [(1, "a"), (2, "b"), (3, "c")]) - connection._transaction.batch_update.assert_called_once_with( + with mock.patch( + "google.cloud.spanner_v1.services.spanner.client.SpannerClient.create_session", + return_value=Session(), + ): + with mock.patch( + "google.cloud.spanner_v1.session.Session.transaction", + return_value=transaction, + ): + cursor.executemany(sql, [(1, "a"), (2, "b"), (3, "c")]) + + transaction.batch_update.assert_called_once_with( [ ( """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", @@ -417,11 +432,11 @@ def test_executemany_update_batch_autocommit(self): ), ] ) - connection._transaction.commit.assert_called_once() def test_executemany_insert_batch_non_autocommit(self): - from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_v1.param_types import INT64 + from google.cloud.spanner_v1.types.spanner import Session from google.rpc.code_pb2 import OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -434,15 +449,21 @@ def test_executemany_insert_batch_non_autocommit(self): ): connection = connect("test-instance", "test-database") - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( - return_value=[mock.Mock(code=OK), []] - ) + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) cursor = connection.cursor() - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + with mock.patch( + "google.cloud.spanner_v1.services.spanner.client.SpannerClient.create_session", + return_value=Session(), + ): + with mock.patch( + "google.cloud.spanner_v1.session.Session.transaction", + return_value=transaction, + ): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - connection._transaction.batch_update.assert_called_once_with( + transaction.batch_update.assert_called_once_with( [ ( """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", @@ -458,8 +479,9 @@ def test_executemany_insert_batch_non_autocommit(self): ) def test_executemany_insert_batch_autocommit(self): - from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_v1.param_types import INT64 + from google.cloud.spanner_v1.types.spanner import Session from google.rpc.code_pb2 import OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -474,16 +496,22 @@ def test_executemany_insert_batch_autocommit(self): connection.autocommit = True - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( - return_value=[mock.Mock(code=OK), []] - ) - connection._transaction.commit = mock.Mock() + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + transaction.commit = mock.Mock() cursor = connection.cursor() - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + with mock.patch( + "google.cloud.spanner_v1.services.spanner.client.SpannerClient.create_session", + return_value=Session(), + ): + with mock.patch( + "google.cloud.spanner_v1.session.Session.transaction", + return_value=transaction, + ): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) - connection._transaction.batch_update.assert_called_once_with( + transaction.batch_update.assert_called_once_with( [ ( """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (@a0, @a1, @a2, @a3)""", @@ -497,11 +525,12 @@ def test_executemany_insert_batch_autocommit(self): ), ] ) - connection._transaction.commit.assert_called_once() + transaction.commit.assert_called_once() def test_executemany_insert_batch_failed(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_dbapi.exceptions import OperationalError + from google.cloud.spanner_v1.types.spanner import Session from google.rpc.code_pb2 import UNKNOWN sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -518,13 +547,21 @@ def test_executemany_insert_batch_failed(self): connection.autocommit = True cursor = connection.cursor() - connection._transaction = mock.Mock(committed=False, rolled_back=False) - connection._transaction.batch_update = mock.Mock( + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock( return_value=(mock.Mock(code=UNKNOWN, details=err_details), []) ) - with self.assertRaisesRegex(OperationalError, err_details): - cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) + with mock.patch( + "google.cloud.spanner_v1.services.spanner.client.SpannerClient.create_session", + return_value=Session(), + ): + with mock.patch( + "google.cloud.spanner_v1.session.Session.transaction", + return_value=transaction, + ): + with self.assertRaisesRegex(OperationalError, err_details): + cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)]) def test_executemany_insert_batch_aborted(self): from google.cloud.spanner_dbapi import connect From d66d401465d53f9c3ba7661b634160006cc18ad5 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 29 Jul 2021 10:54:54 +0300 Subject: [PATCH 17/21] refactor the tests code --- tests/unit/spanner_dbapi/test_cursor.py | 50 +++++++++++-------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/tests/unit/spanner_dbapi/test_cursor.py b/tests/unit/spanner_dbapi/test_cursor.py index 8f00a18fc7..3599cec623 100644 --- a/tests/unit/spanner_dbapi/test_cursor.py +++ b/tests/unit/spanner_dbapi/test_cursor.py @@ -37,6 +37,13 @@ def _make_connection(self, *args, **kwargs): return Connection(*args, **kwargs) + def _transaction_mock(self): + from google.rpc.code_pb2 import OK + + transaction = mock.Mock(committed=False, rolled_back=False) + transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + return transaction + def test_property_connection(self): connection = self._make_connection(self.INSTANCE, self.DATABASE) cursor = self._make_one(connection) @@ -341,9 +348,8 @@ def test_executemany_delete_batch_autocommit(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_v1.types.spanner import Session - from google.rpc.code_pb2 import OK - sql = """DELETE FROM table WHERE col1 = %s""" + sql = "DELETE FROM table WHERE col1 = %s" with mock.patch( "google.cloud.spanner_v1.instance.Instance.exists", return_value=True @@ -354,11 +360,7 @@ def test_executemany_delete_batch_autocommit(self): connection = connect("test-instance", "test-database") connection.autocommit = True - - transaction = mock.Mock(committed=False, rolled_back=False) - transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) - transaction.commit = mock.Mock() - + transaction = self._transaction_mock() cursor = connection.cursor() with mock.patch( @@ -373,9 +375,9 @@ def test_executemany_delete_batch_autocommit(self): transaction.batch_update.assert_called_once_with( [ - ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 1}, {"a0": INT64},), - ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 2}, {"a0": INT64},), - ("""DELETE FROM table WHERE col1 = @a0""", {"a0": 3}, {"a0": INT64},), + ("DELETE FROM table WHERE col1 = @a0", {"a0": 1}, {"a0": INT64}), + ("DELETE FROM table WHERE col1 = @a0", {"a0": 2}, {"a0": INT64}), + ("DELETE FROM table WHERE col1 = @a0", {"a0": 3}, {"a0": INT64}), ] ) @@ -383,9 +385,8 @@ def test_executemany_update_batch_autocommit(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_v1.param_types import INT64, STRING from google.cloud.spanner_v1.types.spanner import Session - from google.rpc.code_pb2 import OK - sql = """UPDATE table SET col1 = %s WHERE col2 = %s""" + sql = "UPDATE table SET col1 = %s WHERE col2 = %s" with mock.patch( "google.cloud.spanner_v1.instance.Instance.exists", return_value=True @@ -396,11 +397,7 @@ def test_executemany_update_batch_autocommit(self): connection = connect("test-instance", "test-database") connection.autocommit = True - - transaction = mock.Mock(committed=False, rolled_back=False) - transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) - transaction.commit = mock.Mock() - + transaction = self._transaction_mock() cursor = connection.cursor() with mock.patch( @@ -416,17 +413,17 @@ def test_executemany_update_batch_autocommit(self): transaction.batch_update.assert_called_once_with( [ ( - """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + "UPDATE table SET col1 = @a0 WHERE col2 = @a1", {"a0": 1, "a1": "a"}, {"a0": INT64, "a1": STRING}, ), ( - """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + "UPDATE table SET col1 = @a0 WHERE col2 = @a1", {"a0": 2, "a1": "b"}, {"a0": INT64, "a1": STRING}, ), ( - """UPDATE table SET col1 = @a0 WHERE col2 = @a1""", + "UPDATE table SET col1 = @a0 WHERE col2 = @a1", {"a0": 3, "a1": "c"}, {"a0": INT64, "a1": STRING}, ), @@ -437,7 +434,6 @@ def test_executemany_insert_batch_non_autocommit(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_v1.types.spanner import Session - from google.rpc.code_pb2 import OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -449,8 +445,7 @@ def test_executemany_insert_batch_non_autocommit(self): ): connection = connect("test-instance", "test-database") - transaction = mock.Mock(committed=False, rolled_back=False) - transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + transaction = self._transaction_mock() cursor = connection.cursor() with mock.patch( @@ -482,7 +477,6 @@ def test_executemany_insert_batch_autocommit(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_v1.param_types import INT64 from google.cloud.spanner_v1.types.spanner import Session - from google.rpc.code_pb2 import OK sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" @@ -496,8 +490,7 @@ def test_executemany_insert_batch_autocommit(self): connection.autocommit = True - transaction = mock.Mock(committed=False, rolled_back=False) - transaction.batch_update = mock.Mock(return_value=[mock.Mock(code=OK), []]) + transaction = self._transaction_mock() transaction.commit = mock.Mock() cursor = connection.cursor() @@ -567,7 +560,7 @@ def test_executemany_insert_batch_aborted(self): from google.cloud.spanner_dbapi import connect from google.cloud.spanner_dbapi.checksum import ResultsChecksum from google.cloud.spanner_v1.param_types import INT64 - from google.rpc.code_pb2 import ABORTED, OK + from google.rpc.code_pb2 import ABORTED sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)""" err_details = "Aborted details here" @@ -585,8 +578,7 @@ def test_executemany_insert_batch_aborted(self): side_effect=[(mock.Mock(code=ABORTED, details=err_details), [])] ) - transaction2 = mock.Mock(committed=False, rolled_back=False) - transaction2.batch_update = mock.Mock(side_effect=[(mock.Mock(code=OK), [])]) + transaction2 = self._transaction_mock() connection.transaction_checkout = mock.Mock( side_effect=[transaction1, transaction2] From 404e5211f8ce074f6fbec333405c7b0996cd7c27 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 29 Jul 2021 10:58:32 +0300 Subject: [PATCH 18/21] fix merge conflict --- tests/unit/spanner_dbapi/test_connection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 8ebd41e4db..48129dcc2f 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -157,8 +157,13 @@ def test_transaction_checkout(self): self.assertEqual(connection.transaction_checkout(), mock_transaction) - def test_close(self): - from google.cloud.spanner_dbapi import connect, InterfaceError + connection._autocommit = True + self.assertIsNone(connection.transaction_checkout()) + + @mock.patch("google.cloud.spanner_v1.Client") + def test_close(self, mock_client): + from google.cloud.spanner_dbapi import connect + from google.cloud.spanner_dbapi import InterfaceError connection = connect("test-instance", "test-database") From 916ec7ac91945b50cc3dcb12792f3912cce07482 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 30 Jul 2021 11:49:54 +0300 Subject: [PATCH 19/21] fix the unit test --- tests/unit/spanner_dbapi/test_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 48129dcc2f..b30cbe15af 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -158,7 +158,7 @@ def test_transaction_checkout(self): self.assertEqual(connection.transaction_checkout(), mock_transaction) connection._autocommit = True - self.assertIsNone(connection.transaction_checkout()) + self.assertIsNotNone(connection.transaction_checkout()) @mock.patch("google.cloud.spanner_v1.Client") def test_close(self, mock_client): From 6125727186ca7834c0a6874a332f3b9f4f4e8cc1 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 4 Aug 2021 11:21:33 +0300 Subject: [PATCH 20/21] revert some changes --- google/cloud/spanner_dbapi/connection.py | 11 +++++++---- tests/unit/spanner_dbapi/test_connection.py | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 2ca7edbea2..110e0f9b9b 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -219,14 +219,17 @@ def transaction_checkout(self): Begin a new transaction, if there is no transaction in this connection yet. Return the begun one otherwise. + The method is non operational in autocommit mode. + :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if not self.inside_transaction: - self._transaction = self._session_checkout().transaction() - self._transaction.begin() + if not self.autocommit: + if not self.inside_transaction: + self._transaction = self._session_checkout().transaction() + self._transaction.begin() - return self._transaction + return self._transaction def _raise_if_closed(self): """Helper to check the connection state before running a query. diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index b30cbe15af..48129dcc2f 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -158,7 +158,7 @@ def test_transaction_checkout(self): self.assertEqual(connection.transaction_checkout(), mock_transaction) connection._autocommit = True - self.assertIsNotNone(connection.transaction_checkout()) + self.assertIsNone(connection.transaction_checkout()) @mock.patch("google.cloud.spanner_v1.Client") def test_close(self, mock_client): From b22be9843370ce6575173d20a33fa698401f9df6 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 9 Aug 2021 11:33:44 +0300 Subject: [PATCH 21/21] use executemany for test data insert --- tests/system/test_system_dbapi.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/system/test_system_dbapi.py b/tests/system/test_system_dbapi.py index 6ca1029ae1..28636a561c 100644 --- a/tests/system/test_system_dbapi.py +++ b/tests/system/test_system_dbapi.py @@ -343,20 +343,20 @@ def test_execute_many(self): conn = Connection(Config.INSTANCE, self._db) cursor = conn.cursor() - cursor.execute( + cursor.executemany( """ INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com'), - (2, 'first-name2', 'last-name2', 'test.email2@example.com') - """ +VALUES (%s, %s, %s, %s) + """, + [ + (1, "first-name", "last-name", "test.email@example.com"), + (2, "first-name2", "last-name2", "test.email2@example.com"), + ], ) conn.commit() cursor.executemany( - """ -SELECT * FROM contacts WHERE contact_id = @a1 -""", - ({"a1": 1}, {"a1": 2}), + """SELECT * FROM contacts WHERE contact_id = @a1""", ({"a1": 1}, {"a1": 2}), ) res = cursor.fetchall() conn.commit()