From 75bf1993a8db1e36281fb5d052728136c797d9ed Mon Sep 17 00:00:00 2001 From: ankiaga Date: Wed, 22 Nov 2023 11:38:28 +0530 Subject: [PATCH 1/9] fix: Refactoring tests to use fixtures properly --- tests/system/test_dbapi.py | 1273 +++++++++++++++++------------------- 1 file changed, 588 insertions(+), 685 deletions(-) diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index bd49e478ba..73356fb324 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -21,9 +21,7 @@ from google.cloud import spanner_v1 from google.cloud._helpers import UTC -from google.cloud.spanner_dbapi import Cursor -from google.cloud.spanner_dbapi.connection import connect -from google.cloud.spanner_dbapi.connection import Connection +from google.cloud.spanner_dbapi.connection import Connection, connect from google.cloud.spanner_dbapi.exceptions import ProgrammingError from google.cloud.spanner_v1 import JsonObject from google.cloud.spanner_v1 import gapic_version as package_version @@ -44,10 +42,10 @@ @pytest.fixture(scope="session") def raw_database(shared_instance, database_operation_timeout, not_postgres): - databse_id = _helpers.unique_id("dbapi-txn") + database_id = _helpers.unique_id("dbapi-txn") pool = spanner_v1.BurstyPool(labels={"testcase": "database_api"}) database = shared_instance.database( - databse_id, + database_id, ddl_statements=DDL_STATEMENTS, pool=pool, ) @@ -59,779 +57,684 @@ def raw_database(shared_instance, database_operation_timeout, not_postgres): database.drop() -def clear_table(transaction): - transaction.execute_update("DELETE FROM contacts WHERE true") +class TestDbApi: + @staticmethod + def clear_table(transaction): + transaction.execute_update("DELETE FROM contacts WHERE true") + @pytest.fixture(scope="function") + def dbapi_database(self, raw_database): + raw_database.run_in_transaction(self.clear_table) -@pytest.fixture(scope="function") -def dbapi_database(raw_database): - raw_database.run_in_transaction(clear_table) + yield raw_database - yield raw_database + raw_database.run_in_transaction(self.clear_table) - raw_database.run_in_transaction(clear_table) + @pytest.fixture(autouse=True) + def init_connection(self, shared_instance, dbapi_database): + self._conn = Connection(shared_instance, dbapi_database) + self._cursor = self._conn.cursor() + yield + self._cursor.close() + self._conn.close() + @pytest.fixture + def execute_common_statements(self): + # execute several DML statements within one transaction + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + self._cursor.execute( + """ + UPDATE contacts + SET email = 'test.email_updated@domen.ru' + WHERE email = 'test.email@domen.ru' + """ + ) -def test_commit(shared_instance, dbapi_database): - """Test committing a transaction with several statements.""" - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - want_row = _execute_common_precommit_statements(cursor) - conn.commit() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - assert got_rows == [want_row] - - cursor.close() - conn.close() - + @pytest.fixture + def updated_row(self, execute_common_statements): + return ( + 1, + "updated-first-name", + "last-name", + "test.email_updated@domen.ru", + ) -def test_commit_client_side(shared_instance, dbapi_database): - """Test committing a transaction with several statements.""" - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + def test_commit(self, updated_row): + """Test committing a transaction with several statements.""" + self._conn.commit() - want_row = _execute_common_precommit_statements(cursor) - cursor.execute("""COMMIT""") + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - cursor.close() - conn.close() + assert got_rows == [updated_row] - assert got_rows == [want_row] + def test_commit_client_side(self, updated_row): + """Test committing a transaction with several statements.""" + self._cursor.execute("""COMMIT""") + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() -def test_rollback(shared_instance, dbapi_database): - """Test rollbacking a transaction with several statements.""" - want_row = (2, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + assert got_rows == [updated_row] - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() + def test_rollback(self): + """Test rollbacking a transaction with several statements.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") - # execute several DMLs with one transaction - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - cursor.execute( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') """ -UPDATE contacts -SET email = 'test.email_updated@domen.ru' -WHERE email = 'test.email@domen.ru' -""" - ) - conn.rollback() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - assert got_rows == [want_row] - - cursor.close() - conn.close() - + ) + self._conn.commit() -def test_autocommit_mode_change(shared_instance, dbapi_database): - """Test auto committing a transaction on `autocommit` mode change.""" - want_row = ( - 2, - "updated-first-name", - "last-name", - "test.email@domen.ru", - ) - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + # execute several DMLs with one transaction + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + self._cursor.execute( + """ + UPDATE contacts + SET email = 'test.email_updated@domen.ru' + WHERE email = 'test.email@domen.ru' + """ + ) + self._conn.rollback() + + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() + + assert got_rows == [want_row] + + def test_autocommit_mode_change(self): + """Test auto committing a transaction on `autocommit` mode change.""" + want_row = ( + 2, + "updated-first-name", + "last-name", + "test.email@domen.ru", + ) - cursor.execute( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + ) + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' """ - ) - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.autocommit = True - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - - assert got_rows == [want_row] + ) + self._conn.autocommit = True - cursor.close() - conn.close() + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + assert got_rows == [want_row] -def test_rollback_on_connection_closing(shared_instance, dbapi_database): - """ - When closing a connection all the pending transactions - must be rollbacked. Testing if it's working this way. - """ - want_row = (1, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - cursor.execute( + def test_rollback_on_connection_closing(self, shared_instance, dbapi_database): """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() + When closing a connection all the pending transactions + must be rollbacked. Testing if it's working this way. + """ + want_row = (1, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() - cursor.execute( + cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.close() + ) + conn.commit() - # connect again, as the previous connection is no-op after closing - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + conn.close() - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() + # connect again, as the previous connection is no-op after closing + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() - assert got_rows == [want_row] + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() - cursor.close() - conn.close() + assert got_rows == [want_row] + cursor.close() + conn.close() -def test_results_checksum(shared_instance, dbapi_database): - """Test that results checksum is calculated properly.""" - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + def test_results_checksum(self): + """Test that results checksum is calculated properly.""" - cursor.execute( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES + (1, 'first-name', 'last-name', 'test.email@domen.ru'), + (2, 'first-name2', 'last-name2', 'test.email2@domen.ru') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES -(1, 'first-name', 'last-name', 'test.email@domen.ru'), -(2, 'first-name2', 'last-name2', 'test.email2@domen.ru') - """ - ) - assert len(conn._statements) == 1 - conn.commit() - - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - - assert len(conn._statements) == 1 - conn.commit() + ) + assert len(self._conn._statements) == 1 + self._conn.commit() - checksum = hashlib.sha256() - checksum.update(pickle.dumps(got_rows[0])) - checksum.update(pickle.dumps(got_rows[1])) + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() - assert cursor._checksum.checksum.digest() == checksum.digest() + assert len(self._conn._statements) == 1 + self._conn.commit() + checksum = hashlib.sha256() + checksum.update(pickle.dumps(got_rows[0])) + checksum.update(pickle.dumps(got_rows[1])) -def test_execute_many(shared_instance, dbapi_database): - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + assert self._cursor._checksum.checksum.digest() == checksum.digest() - row_data = [ - (1, "first-name", "last-name", "test.email@example.com"), - (2, "first-name2", "last-name2", "test.email2@example.com"), - ] - cursor.executemany( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (%s, %s, %s, %s) - """, - row_data, - ) - conn.commit() - - cursor.executemany( - """SELECT * FROM contacts WHERE contact_id = %s""", - ((1,), (2,)), - ) - res = cursor.fetchall() - conn.commit() + def test_execute_many(self): + row_data = [ + (1, "first-name", "last-name", "test.email@example.com"), + (2, "first-name2", "last-name2", "test.email2@example.com"), + ] + self._cursor.executemany( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (%s, %s, %s, %s) + """, + row_data, + ) + self._conn.commit() - assert len(res) == len(row_data) - for found, expected in zip(res, row_data): - assert found[0] == expected[0] + self._cursor.executemany( + """SELECT * FROM contacts WHERE contact_id = %s""", + ((1,), (2,)), + ) + res = self._cursor.fetchall() + self._conn.commit() - # checking that execute() and executemany() - # results are not mixed together - cursor.execute( - """ -SELECT * FROM contacts WHERE contact_id = 1 -""", - ) - res = cursor.fetchone() - conn.commit() + assert len(res) == len(row_data) + for found, expected in zip(res, row_data): + assert found[0] == expected[0] - assert res[0] == 1 - conn.close() + # checking that execute() and executemany() + # results are not mixed together + self._cursor.execute( + """ + SELECT * FROM contacts WHERE contact_id = 1 + """, + ) + res = self._cursor.fetchone() + self._conn.commit() + assert res[0] == 1 -def test_DDL_autocommit(shared_instance, dbapi_database): - """Check that DDLs in autocommit mode are immediately executed.""" + def test_DDL_autocommit(self, shared_instance, dbapi_database): + """Check that DDLs in autocommit mode are immediately executed.""" - try: - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True + try: + conn = Connection(shared_instance, dbapi_database) + conn.autocommit = True - cur = conn.cursor() - cur.execute( + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) """ - CREATE TABLE Singers ( + ) + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + def test_ddl_execute_autocommit_true(self, dbapi_database): + """Check that DDL statement in autocommit mode results in successful + DDL statement execution for execute method.""" + + self._conn.autocommit = True + self._cursor.execute( + """ + CREATE TABLE DdlExecuteAutocommit ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """ + """ ) - conn.close() - - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() - - -def test_ddl_execute_autocommit_true(shared_instance, dbapi_database): - """Check that DDL statement in autocommit mode results in successful - DDL statement execution for execute method.""" - - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE DdlExecuteAutocommit ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecuteAutocommit") - assert table.exists() is True - - cur.close() - conn.close() - - -def test_ddl_executemany_autocommit_true(shared_instance, dbapi_database): - """Check that DDL statement in autocommit mode results in exception for - executemany method .""" - - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - cur = conn.cursor() - with pytest.raises(ProgrammingError): - cur.executemany( + table = dbapi_database.table("DdlExecuteAutocommit") + assert table.exists() is True + + def test_ddl_executemany_autocommit_true(self, dbapi_database): + """Check that DDL statement in autocommit mode results in exception for + executemany method .""" + + self._conn.autocommit = True + with pytest.raises(ProgrammingError): + self._cursor.executemany( + """ + CREATE TABLE DdlExecuteManyAutocommit ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """, + [], + ) + table = dbapi_database.table("DdlExecuteManyAutocommit") + assert table.exists() is False + + def test_ddl_executemany_autocommit_false(self, dbapi_database): + """Check that DDL statement in non-autocommit mode results in exception for + executemany method .""" + with pytest.raises(ProgrammingError): + self._cursor.executemany( + """ + CREATE TABLE DdlExecuteManyAutocommit ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """, + [], + ) + table = dbapi_database.table("DdlExecuteManyAutocommit") + assert table.exists() is False + + def test_ddl_execute(self, dbapi_database): + """Check that DDL statement followed by non-DDL execute statement in + non autocommit mode results in successful DDL statement execution.""" + + want_row = ( + 1, + "first-name", + ) + self._cursor.execute( """ - CREATE TABLE DdlExecuteManyAutocommit ( + CREATE TABLE DdlExecute ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """, - [], + """ ) - table = dbapi_database.table("DdlExecuteManyAutocommit") - assert table.exists() is False + table = dbapi_database.table("DdlExecute") + assert table.exists() is False - cur.close() - conn.close() + self._cursor.execute( + """ + INSERT INTO DdlExecute (SingerId, Name) + VALUES (1, "first-name") + """ + ) + assert table.exists() is True + self._conn.commit() + + # read the resulting data from the database + self._cursor.execute("SELECT * FROM DdlExecute") + got_rows = self._cursor.fetchall() + assert got_rows == [want_row] -def test_ddl_executemany_autocommit_false(shared_instance, dbapi_database): - """Check that DDL statement in non-autocommit mode results in exception for - executemany method .""" + def test_ddl_executemany(self, dbapi_database): + """Check that DDL statement followed by non-DDL executemany statement in + non autocommit mode results in successful DDL statement execution.""" - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - with pytest.raises(ProgrammingError): - cur.executemany( + want_row = ( + 1, + "first-name", + ) + self._cursor.execute( """ - CREATE TABLE DdlExecuteManyAutocommit ( + CREATE TABLE DdlExecuteMany ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """, - [], + """ ) - table = dbapi_database.table("DdlExecuteManyAutocommit") - assert table.exists() is False - - cur.close() - conn.close() + table = dbapi_database.table("DdlExecuteMany") + assert table.exists() is False + self._cursor.executemany( + """ + INSERT INTO DdlExecuteMany (SingerId, Name) + VALUES (%s, %s) + """, + [want_row], + ) + assert table.exists() is True + self._conn.commit() -def test_ddl_execute(shared_instance, dbapi_database): - """Check that DDL statement followed by non-DDL execute statement in - non autocommit mode results in successful DDL statement execution.""" + # read the resulting data from the database + self._cursor.execute("SELECT * FROM DdlExecuteMany") + got_rows = self._cursor.fetchall() - conn = Connection(shared_instance, dbapi_database) - want_row = ( - 1, - "first-name", - ) - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE DdlExecute ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecute") - assert table.exists() is False + assert got_rows == [want_row] - cur.execute( + @pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") + def test_autocommit_with_json_data(self, dbapi_database): """ - INSERT INTO DdlExecute (SingerId, Name) - VALUES (1, "first-name") + Check that DDLs in autocommit mode are immediately + executed for json fields. """ - ) - assert table.exists() is True - conn.commit() - - # read the resulting data from the database - cur.execute("SELECT * FROM DdlExecute") - got_rows = cur.fetchall() - - assert got_rows == [want_row] - - cur.close() - conn.close() - - -def test_ddl_executemany(shared_instance, dbapi_database): - """Check that DDL statement followed by non-DDL executemany statement in - non autocommit mode results in successful DDL statement execution.""" + try: + self._conn.autocommit = True + self._cursor.execute( + """ + CREATE TABLE JsonDetails ( + DataId INT64 NOT NULL, + Details JSON, + ) PRIMARY KEY (DataId) + """ + ) + + # Insert data to table + self._cursor.execute( + sql="INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", + args=(123, JsonObject({"name": "Jakob", "age": "26"})), + ) + + # Read back the data. + self._cursor.execute("""select * from JsonDetails;""") + got_rows = self._cursor.fetchall() + + # Assert the response + assert len(got_rows) == 1 + assert got_rows[0][0] == 123 + assert got_rows[0][1] == {"age": "26", "name": "Jakob"} + + # Drop the table + self._cursor.execute("DROP TABLE JsonDetails") + self._conn.commit() + finally: + # Delete table + table = dbapi_database.table("JsonDetails") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) + op.result() + + @pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") + def test_json_array(self, dbapi_database): + try: + # Create table + self._conn.autocommit = True + + self._cursor.execute( + """ + CREATE TABLE JsonDetails ( + DataId INT64 NOT NULL, + Details JSON, + ) PRIMARY KEY (DataId) + """ + ) + self._cursor.execute( + "INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", + [1, JsonObject([1, 2, 3])], + ) + + self._cursor.execute("SELECT * FROM JsonDetails WHERE DataId = 1") + row = self._cursor.fetchone() + assert isinstance(row[1], JsonObject) + assert row[1].serialize() == "[1,2,3]" + + self._cursor.execute("DROP TABLE JsonDetails") + finally: + # Delete table + table = dbapi_database.table("JsonDetails") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) + op.result() + + def test_DDL_commit(self, shared_instance, dbapi_database): + """Check that DDLs in commit mode are executed on calling `commit()`.""" + try: + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + conn.commit() + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + def test_ping(self): + """Check connection validation method.""" + self._conn.validate() + + def test_user_agent(self, shared_instance, dbapi_database): + """Check that DB API uses an appropriate user agent.""" + conn = connect(shared_instance.name, dbapi_database.name) + assert ( + conn.instance._client._client_info.user_agent + == "gl-dbapi/" + package_version.__version__ + ) + assert ( + conn.instance._client._client_info.client_library_version + == package_version.__version__ + ) - conn = Connection(shared_instance, dbapi_database) - want_row = ( - 1, - "first-name", - ) - cur = conn.cursor() - cur.execute( + def test_read_only(self): """ - CREATE TABLE DdlExecuteMany ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecuteMany") - assert table.exists() is False - - cur.executemany( + Check that connection set to `read_only=True` uses + ReadOnly transactions. """ - INSERT INTO DdlExecuteMany (SingerId, Name) - VALUES (%s, %s) - """, - [want_row], - ) - assert table.exists() is True - conn.commit() - - # read the resulting data from the database - cur.execute("SELECT * FROM DdlExecuteMany") - got_rows = cur.fetchall() - - assert got_rows == [want_row] - cur.close() - conn.close() - - -@pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") -def test_autocommit_with_json_data(shared_instance, dbapi_database): + self._conn.read_only = True + with pytest.raises(ProgrammingError): + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' """ - Check that DDLs in autocommit mode are immediately - executed for json fields. - """ - try: - # Create table - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True + ) - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE JsonDetails ( - DataId INT64 NOT NULL, - Details JSON, - ) PRIMARY KEY (DataId) - """ - ) + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() - # Insert data to table - cur.execute( - sql="INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", - args=(123, JsonObject({"name": "Jakob", "age": "26"})), - ) + def test_staleness(self): + """Check the DB API `staleness` option.""" - # Read back the data. - cur.execute("""select * from JsonDetails;""") - got_rows = cur.fetchall() + before_insert = datetime.datetime.utcnow().replace(tzinfo=UTC) + time.sleep(0.25) - # Assert the response - assert len(got_rows) == 1 - assert got_rows[0][0] == 123 - assert got_rows[0][1] == {"age": "26", "name": "Jakob"} - - # Drop the table - cur.execute("DROP TABLE JsonDetails") - conn.commit() - conn.close() - finally: - # Delete table - table = dbapi_database.table("JsonDetails") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) - op.result() - - -@pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") -def test_json_array(shared_instance, dbapi_database): - try: - # Create table - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - - cur = conn.cursor() - cur.execute( + self._cursor.execute( """ - CREATE TABLE JsonDetails ( - DataId INT64 NOT NULL, - Details JSON, - ) PRIMARY KEY (DataId) + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ ) - cur.execute( - "INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", - [1, JsonObject([1, 2, 3])], - ) - - cur.execute("SELECT * FROM JsonDetails WHERE DataId = 1") - row = cur.fetchone() - assert isinstance(row[1], JsonObject) - assert row[1].serialize() == "[1,2,3]" - - cur.execute("DROP TABLE JsonDetails") - conn.close() - finally: - # Delete table - table = dbapi_database.table("JsonDetails") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) - op.result() - - -def test_DDL_commit(shared_instance, dbapi_database): - """Check that DDLs in commit mode are executed on calling `commit()`.""" - try: - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute( + self._conn.commit() + + self._conn.read_only = True + self._conn.staleness = {"read_timestamp": before_insert} + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() + assert len(self._cursor.fetchall()) == 0 + + self._conn.staleness = None + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() + assert len(self._cursor.fetchall()) == 1 + + @pytest.mark.parametrize("autocommit", [False, True]) + def test_rowcount(self, dbapi_database, autocommit): + try: + self._conn.autocommit = autocommit + + self._cursor.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) + ) + self._conn.commit() + + # executemany sets rowcount to the total modified rows + rows = [(i, f"Singer {i}") for i in range(100)] + self._cursor.executemany( + "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s)", rows[:98] + ) + assert self._cursor.rowcount == 98 + + # execute with INSERT + self._cursor.execute( + "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", + [x for row in rows[98:] for x in row], + ) + assert self._cursor.rowcount == 2 + + # execute with UPDATE + self._cursor.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") + assert self._cursor.rowcount == 25 + + # execute with SELECT + self._cursor.execute("SELECT Name FROM Singers WHERE SingerId < 75") + assert len(self._cursor.fetchall()) == 75 + # rowcount is not available for SELECT + assert self._cursor.rowcount == -1 + + # execute with DELETE + self._cursor.execute("DELETE FROM Singers") + assert self._cursor.rowcount == 100 + + # execute with UPDATE matching 0 rows + self._cursor.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") + assert self._cursor.rowcount == 0 + + self._conn.commit() + self._cursor.execute("DROP TABLE Singers") + self._conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." + ) + def test_dml_returning_insert(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') + THEN RETURN contact_id, first_name """ ) - conn.commit() - conn.close() + assert self._cursor.fetchone() == (1, "first-name") + assert self._cursor.rowcount == 1 + self._conn.commit() - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() - - -def test_ping(shared_instance, dbapi_database): - """Check connection validation method.""" - conn = Connection(shared_instance, dbapi_database) - conn.validate() - conn.close() - - -def test_user_agent(shared_instance, dbapi_database): - """Check that DB API uses an appropriate user agent.""" - conn = connect(shared_instance.name, dbapi_database.name) - assert ( - conn.instance._client._client_info.user_agent - == "gl-dbapi/" + package_version.__version__ - ) - assert ( - conn.instance._client._client_info.client_library_version - == package_version.__version__ + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." ) - - -def test_read_only(shared_instance, dbapi_database): - """ - Check that connection set to `read_only=True` uses - ReadOnly transactions. - """ - conn = Connection(shared_instance, dbapi_database, read_only=True) - cur = conn.cursor() - - with pytest.raises(ProgrammingError): - cur.execute( + def test_dml_returning_update(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - - cur.execute("SELECT * FROM contacts") - conn.commit() - - -def test_staleness(shared_instance, dbapi_database): - """Check the DB API `staleness` option.""" - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - before_insert = datetime.datetime.utcnow().replace(tzinfo=UTC) - time.sleep(0.25) - - cursor.execute( + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ - ) - conn.commit() - - conn.read_only = True - conn.staleness = {"read_timestamp": before_insert} - cursor.execute("SELECT * FROM contacts") - conn.commit() - assert len(cursor.fetchall()) == 0 - - conn.staleness = None - cursor.execute("SELECT * FROM contacts") - conn.commit() - assert len(cursor.fetchall()) == 1 - - conn.close() - - -@pytest.mark.parametrize("autocommit", [False, True]) -def test_rowcount(shared_instance, dbapi_database, autocommit): - try: - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - - cur.execute( + ) + assert self._cursor.rowcount == 1 + self._cursor.execute( """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) + UPDATE contacts SET first_name = 'new-name' WHERE contact_id = 1 + THEN RETURN contact_id, first_name """ ) - conn.commit() - - # executemany sets rowcount to the total modified rows - rows = [(i, f"Singer {i}") for i in range(100)] - cur.executemany( - "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s)", rows[:98] - ) - assert cur.rowcount == 98 - - # execute with INSERT - cur.execute( - "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", - [x for row in rows[98:] for x in row], - ) - assert cur.rowcount == 2 - - # execute with UPDATE - cur.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") - assert cur.rowcount == 25 - - # execute with SELECT - cur.execute("SELECT Name FROM Singers WHERE SingerId < 75") - assert len(cur.fetchall()) == 75 - # rowcount is not available for SELECT - assert cur.rowcount == -1 - - # execute with DELETE - cur.execute("DELETE FROM Singers") - assert cur.rowcount == 100 - - # execute with UPDATE matching 0 rows - cur.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") - assert cur.rowcount == 0 - - conn.commit() - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() + assert self._cursor.fetchone() == (1, "new-name") + assert self._cursor.rowcount == 1 + self._conn.commit() - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_insert(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') -THEN RETURN contact_id, first_name - """ + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." ) - assert cur.fetchone() == (1, "first-name") - assert cur.rowcount == 1 - conn.commit() - - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_update(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ - ) - assert cur.rowcount == 1 - cur.execute( - """ -UPDATE contacts SET first_name = 'new-name' WHERE contact_id = 1 -THEN RETURN contact_id, first_name - """ - ) - assert cur.fetchone() == (1, "new-name") - assert cur.rowcount == 1 - conn.commit() - - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_delete(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ - ) - assert cur.rowcount == 1 - cur.execute( - """ -DELETE FROM contacts WHERE contact_id = 1 -THEN RETURN contact_id, first_name - """ - ) - assert cur.fetchone() == (1, "first-name") - assert cur.rowcount == 1 - conn.commit() - - -def _execute_common_precommit_statements(cursor: Cursor): - # execute several DML statements within one transaction - cursor.execute( - """ - INSERT INTO contacts (contact_id, first_name, last_name, email) - VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - cursor.execute( - """ - UPDATE contacts - SET first_name = 'updated-first-name' - WHERE first_name = 'first-name' - """ - ) - cursor.execute( + def test_dml_returning_delete(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ - UPDATE contacts - SET email = 'test.email_updated@domen.ru' - WHERE email = 'test.email@domen.ru' + ) + assert self._cursor.rowcount == 1 + self._cursor.execute( + """ + DELETE FROM contacts WHERE contact_id = 1 + THEN RETURN contact_id, first_name """ - ) - return ( - 1, - "updated-first-name", - "last-name", - "test.email_updated@domen.ru", - ) + ) + assert self._cursor.fetchone() == (1, "first-name") + assert self._cursor.rowcount == 1 + self._conn.commit() From 5fb56105ddd83b6394c49cd55af66ad1db554491 Mon Sep 17 00:00:00 2001 From: ankiaga Date: Fri, 24 Nov 2023 11:04:00 +0530 Subject: [PATCH 2/9] Not using autouse fixtures for few tests where not needed --- tests/system/test_dbapi.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index 73356fb324..ada21fef2c 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -71,12 +71,14 @@ def dbapi_database(self, raw_database): raw_database.run_in_transaction(self.clear_table) @pytest.fixture(autouse=True) - def init_connection(self, shared_instance, dbapi_database): - self._conn = Connection(shared_instance, dbapi_database) - self._cursor = self._conn.cursor() + def init_connection(self, request, shared_instance, dbapi_database): + if "noautofixt" not in request.keywords: + self._conn = Connection(shared_instance, dbapi_database) + self._cursor = self._conn.cursor() yield - self._cursor.close() - self._conn.close() + if "noautofixt" not in request.keywords: + self._cursor.close() + self._conn.close() @pytest.fixture def execute_common_statements(self): @@ -199,6 +201,7 @@ def test_autocommit_mode_change(self): assert got_rows == [want_row] + @pytest.mark.noautofixt def test_rollback_on_connection_closing(self, shared_instance, dbapi_database): """ When closing a connection all the pending transactions @@ -303,6 +306,7 @@ def test_execute_many(self): assert res[0] == 1 + @pytest.mark.noautofixt def test_DDL_autocommit(self, shared_instance, dbapi_database): """Check that DDLs in autocommit mode are immediately executed.""" @@ -528,6 +532,7 @@ def test_json_array(self, dbapi_database): op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) op.result() + @pytest.mark.noautofixt def test_DDL_commit(self, shared_instance, dbapi_database): """Check that DDLs in commit mode are executed on calling `commit()`.""" try: @@ -563,6 +568,7 @@ def test_ping(self): """Check connection validation method.""" self._conn.validate() + @pytest.mark.noautofixt def test_user_agent(self, shared_instance, dbapi_database): """Check that DB API uses an appropriate user agent.""" conn = connect(shared_instance.name, dbapi_database.name) From 3e80473f5af46f8fd839e90948d6cc7bb8c34449 Mon Sep 17 00:00:00 2001 From: ankiaga Date: Thu, 23 Nov 2023 18:13:52 +0530 Subject: [PATCH 3/9] feat: Implementation for Begin and Rollback clientside statements --- .../client_side_statement_executor.py | 7 ++ .../client_side_statement_parser.py | 10 ++ google/cloud/spanner_dbapi/connection.py | 48 ++++++-- google/cloud/spanner_dbapi/cursor.py | 14 +-- .../cloud/spanner_dbapi/parsed_statement.py | 1 + tests/system/test_dbapi.py | 103 +++++++++++++++--- tests/unit/spanner_dbapi/test_connection.py | 48 +++++++- tests/unit/spanner_dbapi/test_parse_utils.py | 6 + 8 files changed, 198 insertions(+), 39 deletions(-) diff --git a/google/cloud/spanner_dbapi/client_side_statement_executor.py b/google/cloud/spanner_dbapi/client_side_statement_executor.py index f65e8ada1a..e75e3a611f 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_executor.py +++ b/google/cloud/spanner_dbapi/client_side_statement_executor.py @@ -22,8 +22,15 @@ def execute(connection, parsed_statement: ParsedStatement): It is an internal method that can make backwards-incompatible changes. + :type connection: Connection + :param connection: Connection object of the dbApi + :type parsed_statement: ParsedStatement :param parsed_statement: parsed_statement based on the sql query """ if parsed_statement.client_side_statement_type == ClientSideStatementType.COMMIT: return connection.commit() + if parsed_statement.client_side_statement_type == ClientSideStatementType.BEGIN: + return connection.begin() + if parsed_statement.client_side_statement_type == ClientSideStatementType.ROLLBACK: + return connection.rollback() diff --git a/google/cloud/spanner_dbapi/client_side_statement_parser.py b/google/cloud/spanner_dbapi/client_side_statement_parser.py index e93b71f3e1..ce1474e809 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_parser.py +++ b/google/cloud/spanner_dbapi/client_side_statement_parser.py @@ -20,7 +20,9 @@ ClientSideStatementType, ) +RE_BEGIN = re.compile(r"^\s*(BEGIN|START)(TRANSACTION)?", re.IGNORECASE) RE_COMMIT = re.compile(r"^\s*(COMMIT)(TRANSACTION)?", re.IGNORECASE) +RE_ROLLBACK = re.compile(r"^\s*(ROLLBACK)(TRANSACTION)?", re.IGNORECASE) def parse_stmt(query): @@ -39,4 +41,12 @@ def parse_stmt(query): return ParsedStatement( StatementType.CLIENT_SIDE, query, ClientSideStatementType.COMMIT ) + if RE_BEGIN.match(query): + return ParsedStatement( + StatementType.CLIENT_SIDE, query, ClientSideStatementType.BEGIN + ) + if RE_ROLLBACK.match(query): + return ParsedStatement( + StatementType.CLIENT_SIDE, query, ClientSideStatementType.ROLLBACK + ) return None diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index efbdc80f3f..2ebb7d4eab 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -34,7 +34,9 @@ from google.rpc.code_pb2 import ABORTED -AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" +TRANSACTION_NOT_BEGUN_WARNING = ( + "This method is non-operational as transaction has not begun" +) MAX_INTERNAL_RETRIES = 50 @@ -104,6 +106,7 @@ def __init__(self, instance, database=None, read_only=False): self._read_only = read_only self._staleness = None self.request_priority = None + self._transaction_begin_marked = False @property def autocommit(self): @@ -141,7 +144,7 @@ def inside_transaction(self): """Flag: transaction is started. Returns: - bool: True if transaction begun, False otherwise. + bool: True if transaction started, False otherwise. """ return ( self._transaction @@ -149,6 +152,15 @@ def inside_transaction(self): and not self._transaction.rolled_back ) + @property + def transaction_begun(self): + """Flag: transaction has begun + + Returns: + bool: True if transaction begun, False otherwise. + """ + return (not self._autocommit) or self._transaction_begin_marked + @property def instance(self): """Instance to which this connection relates. @@ -333,12 +345,10 @@ def transaction_checkout(self): Begin a new transaction, if there is no transaction in this connection yet. Return the begun one otherwise. - The method is non operational in autocommit mode. - :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if not self.autocommit: + if self.transaction_begun: if not self.inside_transaction: self._transaction = self._session_checkout().transaction() self._transaction.begin() @@ -354,7 +364,7 @@ def snapshot_checkout(self): :rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot` :returns: A Cloud Spanner snapshot object, ready to use. """ - if self.read_only and not self.autocommit: + if self.read_only and self.transaction_begun: if not self._snapshot: self._snapshot = Snapshot( self._session_checkout(), multi_use=True, **self.staleness @@ -377,6 +387,22 @@ def close(self): self.is_closed = True + @check_not_closed + def begin(self): + """ + Marks the transaction as started. + + :raises: :class:`InterfaceError`: if this connection is closed. + :raises: :class:`OperationalError`: if there is an existing transaction that has begin or is running + """ + if self._transaction_begin_marked: + raise OperationalError("A transaction has already begun") + if self.inside_transaction: + raise OperationalError( + "Beginning a new transaction is not allowed when a transaction is already running" + ) + self._transaction_begin_marked = True + def commit(self): """Commits any pending transaction to the database. @@ -386,8 +412,8 @@ def commit(self): raise ValueError("Database needs to be passed for this operation") self._snapshot = None - if self._autocommit: - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + if not self.transaction_begun: + warnings.warn(TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2) return self.run_prior_DDL_statements() @@ -398,6 +424,7 @@ def commit(self): self._release_session() self._statements = [] + self._transaction_begin_marked = False except Aborted: self.retry_transaction() self.commit() @@ -410,14 +437,15 @@ def rollback(self): """ self._snapshot = None - if self._autocommit: - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + if not self.transaction_begun: + warnings.warn(TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2) elif self._transaction: if not self.read_only: self._transaction.rollback() self._release_session() self._statements = [] + self._transaction_begin_marked = False @check_not_closed def cursor(self): diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 95d20f5730..8b1a02b06a 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -250,7 +250,7 @@ def execute(self, sql, args=None): ) if parsed_statement.statement_type == StatementType.DDL: self._batch_DDLs(sql) - if self.connection.autocommit: + if not self.connection.transaction_begun: self.connection.run_prior_DDL_statements() return @@ -264,7 +264,7 @@ def execute(self, sql, args=None): sql, args = sql_pyformat_args_to_spanner(sql, args or None) - if not self.connection.autocommit: + if self.connection.transaction_begun: statement = Statement( sql, args, @@ -348,7 +348,7 @@ def executemany(self, operation, seq_of_params): ) statements.append((sql, params, get_param_types(params))) - if self.connection.autocommit: + if not self.connection.transaction_begun: self.connection.database.run_in_transaction( self._do_batch_update, statements, many_result_set ) @@ -396,7 +396,7 @@ def fetchone(self): sequence, or None when no more data is available.""" try: res = next(self) - if not self.connection.autocommit and not self.connection.read_only: + if self.connection.transaction_begun and not self.connection.read_only: self._checksum.consume_result(res) return res except StopIteration: @@ -414,7 +414,7 @@ def fetchall(self): res = [] try: for row in self: - if not self.connection.autocommit and not self.connection.read_only: + if self.connection.transaction_begun and not self.connection.read_only: self._checksum.consume_result(row) res.append(row) except Aborted: @@ -443,7 +443,7 @@ def fetchmany(self, size=None): for _ in range(size): try: res = next(self) - if not self.connection.autocommit and not self.connection.read_only: + if self.connection.transaction_begun and not self.connection.read_only: self._checksum.consume_result(res) items.append(res) except StopIteration: @@ -473,7 +473,7 @@ def _handle_DQL(self, sql, params): if self.connection.database is None: raise ValueError("Database needs to be passed for this operation") sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params) - if self.connection.read_only and not self.connection.autocommit: + if self.connection.read_only and self.connection.transaction_begun: # initiate or use the existing multi-use snapshot self._handle_DQL_with_snapshot( self.connection.snapshot_checkout(), sql, params diff --git a/google/cloud/spanner_dbapi/parsed_statement.py b/google/cloud/spanner_dbapi/parsed_statement.py index c36bc1d81c..28705b69ed 100644 --- a/google/cloud/spanner_dbapi/parsed_statement.py +++ b/google/cloud/spanner_dbapi/parsed_statement.py @@ -27,6 +27,7 @@ class StatementType(Enum): class ClientSideStatementType(Enum): COMMIT = 1 BEGIN = 2 + ROLLBACK = 3 @dataclass diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index ada21fef2c..a114166696 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -22,7 +22,7 @@ from google.cloud._helpers import UTC from google.cloud.spanner_dbapi.connection import Connection, connect -from google.cloud.spanner_dbapi.exceptions import ProgrammingError +from google.cloud.spanner_dbapi.exceptions import ProgrammingError, OperationalError from google.cloud.spanner_v1 import JsonObject from google.cloud.spanner_v1 import gapic_version as package_version from . import _helpers @@ -80,32 +80,28 @@ def init_connection(self, request, shared_instance, dbapi_database): self._cursor.close() self._conn.close() - @pytest.fixture - def execute_common_statements(self): + def _execute_common_statements(self, cursor): # execute several DML statements within one transaction - self._cursor.execute( + cursor.execute( """ INSERT INTO contacts (contact_id, first_name, last_name, email) VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') """ ) - self._cursor.execute( + cursor.execute( """ UPDATE contacts SET first_name = 'updated-first-name' WHERE first_name = 'first-name' """ ) - self._cursor.execute( + cursor.execute( """ UPDATE contacts SET email = 'test.email_updated@domen.ru' WHERE email = 'test.email@domen.ru' """ ) - - @pytest.fixture - def updated_row(self, execute_common_statements): return ( 1, "updated-first-name", @@ -113,9 +109,14 @@ def updated_row(self, execute_common_statements): "test.email_updated@domen.ru", ) - def test_commit(self, updated_row): + @pytest.mark.parametrize("client_side", [False, True]) + def test_commit(self, client_side): """Test committing a transaction with several statements.""" - self._conn.commit() + updated_row = self._execute_common_statements(self._cursor) + if client_side: + self._cursor.execute("""COMMIT""") + else: + self._conn.commit() # read the resulting data from the database self._cursor.execute("SELECT * FROM contacts") @@ -124,18 +125,80 @@ def test_commit(self, updated_row): assert got_rows == [updated_row] - def test_commit_client_side(self, updated_row): - """Test committing a transaction with several statements.""" - self._cursor.execute("""COMMIT""") + @pytest.mark.noautofixt + def test_begin_client_side(self, shared_instance, dbapi_database): + """Test beginning a transaction using client side statement, + where connection is in autocommit mode.""" + + conn1 = Connection(shared_instance, dbapi_database) + conn1.autocommit = True + cursor1 = conn1.cursor() + cursor1.execute("begin transaction") + updated_row = self._execute_common_statements(cursor1) + + # As the connection conn1 is not committed a new connection wont see its results + conn2 = Connection(shared_instance, dbapi_database) + cursor2 = conn2.cursor() + cursor2.execute("SELECT * FROM contacts") + conn2.commit() + got_rows = cursor2.fetchall() + assert got_rows != [updated_row] + + assert conn1._transaction_begin_marked is True + conn1.commit() + assert conn1._transaction_begin_marked is False + + # As the connection conn1 is committed a new connection should see its results + conn3 = Connection(shared_instance, dbapi_database) + cursor3 = conn3.cursor() + cursor3.execute("SELECT * FROM contacts") + conn3.commit() + got_rows = cursor3.fetchall() + assert got_rows == [updated_row] - # read the resulting data from the database + conn1.close() + conn2.close() + conn3.close() + cursor1.close() + cursor2.close() + cursor3.close() + + def test_begin_success_post_commit(self): + """Test beginning a new transaction post commiting an existing transaction + is possible on a connection, when connection is in autocommit mode.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") + self._conn.autocommit = True + self._cursor.execute("begin transaction") + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + self._conn.commit() + + self._cursor.execute("begin transaction") self._cursor.execute("SELECT * FROM contacts") got_rows = self._cursor.fetchall() self._conn.commit() + assert got_rows == [want_row] - assert got_rows == [updated_row] + def test_begin_error_before_commit(self): + """Test beginning a new transaction before commiting an existing transaction is not possible on a connection, when connection is in autocommit mode.""" + self._conn.autocommit = True + self._cursor.execute("begin transaction") + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + + with pytest.raises(OperationalError): + self._cursor.execute("begin transaction") - def test_rollback(self): + @pytest.mark.parametrize("client_side", [False, True]) + def test_rollback(self, client_side): """Test rollbacking a transaction with several statements.""" want_row = (2, "first-name", "last-name", "test.email@domen.ru") @@ -162,7 +225,11 @@ def test_rollback(self): WHERE email = 'test.email@domen.ru' """ ) - self._conn.rollback() + + if client_side: + self._cursor.execute("ROLLBACK") + else: + self._conn.rollback() # read the resulting data from the database self._cursor.execute("SELECT * FROM contacts") diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 1628f84062..ca25752a3b 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -280,7 +280,7 @@ def test_close(self, mock_client): @mock.patch.object(warnings, "warn") def test_commit(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING + from google.cloud.spanner_dbapi.connection import TRANSACTION_NOT_BEGUN_WARNING connection = Connection(INSTANCE, DATABASE) @@ -307,7 +307,7 @@ def test_commit(self, mock_warn): connection._autocommit = True connection.commit() mock_warn.assert_called_once_with( - AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2 + TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2 ) def test_commit_database_error(self): @@ -321,7 +321,7 @@ def test_commit_database_error(self): @mock.patch.object(warnings, "warn") def test_rollback(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING + from google.cloud.spanner_dbapi.connection import TRANSACTION_NOT_BEGUN_WARNING connection = Connection(INSTANCE, DATABASE) @@ -348,7 +348,7 @@ def test_rollback(self, mock_warn): connection._autocommit = True connection.rollback() mock_warn.assert_called_once_with( - AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2 + TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2 ) @mock.patch("google.cloud.spanner_v1.database.Database", autospec=True) @@ -385,6 +385,46 @@ def test_as_context_manager(self): self.assertTrue(connection.is_closed) + def test_begin_cursor_closed(self): + from google.cloud.spanner_dbapi.exceptions import InterfaceError + + connection = self._make_connection() + connection.close() + + with self.assertRaises(InterfaceError): + connection.begin() + + self.assertEqual(connection._transaction_begin_marked, False) + + def test_begin_transaction_begin_marked(self): + from google.cloud.spanner_dbapi.exceptions import OperationalError + + connection = self._make_connection() + connection._transaction_begin_marked = True + + with self.assertRaises(OperationalError): + connection.begin() + + def test_begin_inside_transaction(self): + from google.cloud.spanner_dbapi.exceptions import OperationalError + + connection = self._make_connection() + mock_transaction = mock.MagicMock() + mock_transaction.committed = mock_transaction.rolled_back = False + connection._transaction = mock_transaction + + with self.assertRaises(OperationalError): + connection.begin() + + self.assertEqual(connection._transaction_begin_marked, False) + + def test_begin(self): + connection = self._make_connection() + + connection.begin() + + self.assertEqual(connection._transaction_begin_marked, True) + def test_run_statement_wo_retried(self): """Check that Connection remembers executed statements.""" from google.cloud.spanner_dbapi.checksum import ResultsChecksum diff --git a/tests/unit/spanner_dbapi/test_parse_utils.py b/tests/unit/spanner_dbapi/test_parse_utils.py index 162535349f..06819c3a3d 100644 --- a/tests/unit/spanner_dbapi/test_parse_utils.py +++ b/tests/unit/spanner_dbapi/test_parse_utils.py @@ -53,6 +53,12 @@ def test_classify_stmt(self): ("CREATE ROLE parent", StatementType.DDL), ("commit", StatementType.CLIENT_SIDE), (" commit TRANSACTION ", StatementType.CLIENT_SIDE), + ("begin", StatementType.CLIENT_SIDE), + ("start", StatementType.CLIENT_SIDE), + ("begin transaction", StatementType.CLIENT_SIDE), + ("start transaction", StatementType.CLIENT_SIDE), + ("rollback", StatementType.CLIENT_SIDE), + (" rollback TRANSACTION ", StatementType.CLIENT_SIDE), ("GRANT SELECT ON TABLE Singers TO ROLE parent", StatementType.DDL), ("REVOKE SELECT ON TABLE Singers TO ROLE parent", StatementType.DDL), ("GRANT ROLE parent TO ROLE child", StatementType.DDL), From 6318ce9445f2097edd7534e0b88ac845004fcb48 Mon Sep 17 00:00:00 2001 From: ankiaga Date: Wed, 29 Nov 2023 17:36:07 +0530 Subject: [PATCH 4/9] Incorporating comments --- google/cloud/spanner_dbapi/connection.py | 41 +++++++++++---------- google/cloud/spanner_dbapi/cursor.py | 14 +++---- tests/system/test_dbapi.py | 13 +++---- tests/unit/spanner_dbapi/test_connection.py | 36 ++++++++---------- 4 files changed, 51 insertions(+), 53 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 2ebb7d4eab..526a480c04 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -34,7 +34,7 @@ from google.rpc.code_pb2 import ABORTED -TRANSACTION_NOT_BEGUN_WARNING = ( +CLIENT_TRANSACTION_NOT_STARTED_WARNING = ( "This method is non-operational as transaction has not begun" ) MAX_INTERNAL_RETRIES = 50 @@ -125,7 +125,7 @@ def autocommit(self, value): :type value: bool :param value: New autocommit mode state. """ - if value and not self._autocommit and self.inside_transaction: + if value and not self._autocommit and self.spanner_transaction_started: self.commit() self._autocommit = value @@ -140,11 +140,14 @@ def database(self): return self._database @property - def inside_transaction(self): - """Flag: transaction is started. + def spanner_transaction_started(self): + """Flag: whether transaction started at SpanFE. This means that we had + made atleast one call to SpanFE. Property client_transaction_started + would always be true if this is true as transaction has to start first + at clientside than at Spanner (SpanFE) Returns: - bool: True if transaction started, False otherwise. + bool: True if SpanFE transaction started, False otherwise. """ return ( self._transaction @@ -153,8 +156,8 @@ def inside_transaction(self): ) @property - def transaction_begun(self): - """Flag: transaction has begun + def client_transaction_started(self): + """Flag: whether transaction started at client side. Returns: bool: True if transaction begun, False otherwise. @@ -187,7 +190,7 @@ def read_only(self, value): Args: value (bool): True for ReadOnly mode, False for ReadWrite. """ - if self.inside_transaction: + if self.spanner_transaction_started: raise ValueError( "Connection read/write mode can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -225,7 +228,7 @@ def staleness(self, value): Args: value (dict): Staleness type and value. """ - if self.inside_transaction: + if self.spanner_transaction_started: raise ValueError( "`staleness` option can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -348,8 +351,8 @@ def transaction_checkout(self): :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if self.transaction_begun: - if not self.inside_transaction: + if self.client_transaction_started: + if not self.spanner_transaction_started: self._transaction = self._session_checkout().transaction() self._transaction.begin() @@ -364,7 +367,7 @@ def snapshot_checkout(self): :rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot` :returns: A Cloud Spanner snapshot object, ready to use. """ - if self.read_only and self.transaction_begun: + if self.read_only and self.client_transaction_started: if not self._snapshot: self._snapshot = Snapshot( self._session_checkout(), multi_use=True, **self.staleness @@ -379,7 +382,7 @@ def close(self): The connection will be unusable from this point forward. If the connection has an active transaction, it will be rolled back. """ - if self.inside_transaction: + if self.spanner_transaction_started: self._transaction.rollback() if self._own_pool and self.database: @@ -397,7 +400,7 @@ def begin(self): """ if self._transaction_begin_marked: raise OperationalError("A transaction has already begun") - if self.inside_transaction: + if self.spanner_transaction_started: raise OperationalError( "Beginning a new transaction is not allowed when a transaction is already running" ) @@ -412,12 +415,12 @@ def commit(self): raise ValueError("Database needs to be passed for this operation") self._snapshot = None - if not self.transaction_begun: - warnings.warn(TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2) + if not self.client_transaction_started: + warnings.warn(CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2) return self.run_prior_DDL_statements() - if self.inside_transaction: + if self.spanner_transaction_started: try: if not self.read_only: self._transaction.commit() @@ -437,8 +440,8 @@ def rollback(self): """ self._snapshot = None - if not self.transaction_begun: - warnings.warn(TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2) + if not self.client_transaction_started: + warnings.warn(CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2) elif self._transaction: if not self.read_only: self._transaction.rollback() diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 8b1a02b06a..4ca7601a88 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -250,7 +250,7 @@ def execute(self, sql, args=None): ) if parsed_statement.statement_type == StatementType.DDL: self._batch_DDLs(sql) - if not self.connection.transaction_begun: + if not self.connection.client_transaction_started: self.connection.run_prior_DDL_statements() return @@ -264,7 +264,7 @@ def execute(self, sql, args=None): sql, args = sql_pyformat_args_to_spanner(sql, args or None) - if self.connection.transaction_begun: + if self.connection.client_transaction_started: statement = Statement( sql, args, @@ -348,7 +348,7 @@ def executemany(self, operation, seq_of_params): ) statements.append((sql, params, get_param_types(params))) - if not self.connection.transaction_begun: + if not self.connection.client_transaction_started: self.connection.database.run_in_transaction( self._do_batch_update, statements, many_result_set ) @@ -396,7 +396,7 @@ def fetchone(self): sequence, or None when no more data is available.""" try: res = next(self) - if self.connection.transaction_begun and not self.connection.read_only: + if self.connection.client_transaction_started and not self.connection.read_only: self._checksum.consume_result(res) return res except StopIteration: @@ -414,7 +414,7 @@ def fetchall(self): res = [] try: for row in self: - if self.connection.transaction_begun and not self.connection.read_only: + if self.connection.client_transaction_started and not self.connection.read_only: self._checksum.consume_result(row) res.append(row) except Aborted: @@ -443,7 +443,7 @@ def fetchmany(self, size=None): for _ in range(size): try: res = next(self) - if self.connection.transaction_begun and not self.connection.read_only: + if self.connection.client_transaction_started and not self.connection.read_only: self._checksum.consume_result(res) items.append(res) except StopIteration: @@ -473,7 +473,7 @@ def _handle_DQL(self, sql, params): if self.connection.database is None: raise ValueError("Database needs to be passed for this operation") sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params) - if self.connection.read_only and self.connection.transaction_begun: + if self.connection.read_only and self.connection.client_transaction_started: # initiate or use the existing multi-use snapshot self._handle_DQL_with_snapshot( self.connection.snapshot_checkout(), sql, params diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index a114166696..637ea79535 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -142,11 +142,15 @@ def test_begin_client_side(self, shared_instance, dbapi_database): cursor2.execute("SELECT * FROM contacts") conn2.commit() got_rows = cursor2.fetchall() + cursor2.close() + conn2.close() assert got_rows != [updated_row] assert conn1._transaction_begin_marked is True conn1.commit() assert conn1._transaction_begin_marked is False + cursor1.close() + conn1.close() # As the connection conn1 is committed a new connection should see its results conn3 = Connection(shared_instance, dbapi_database) @@ -154,14 +158,9 @@ def test_begin_client_side(self, shared_instance, dbapi_database): cursor3.execute("SELECT * FROM contacts") conn3.commit() got_rows = cursor3.fetchall() - assert got_rows == [updated_row] - - conn1.close() - conn2.close() - conn3.close() - cursor1.close() - cursor2.close() cursor3.close() + conn3.close() + assert got_rows == [updated_row] def test_begin_success_post_commit(self): """Test beginning a new transaction post commiting an existing transaction diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index ca25752a3b..f4756bc27f 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -19,6 +19,7 @@ import unittest import warnings import pytest +from google.cloud.spanner_dbapi.exceptions import InterfaceError, OperationalError PROJECT = "test-project" INSTANCE = "test-instance" @@ -36,6 +37,10 @@ class _CredentialsWithScopes(credentials.Credentials, credentials.Scoped): class TestConnection(unittest.TestCase): + + def setUp(self): + self._under_test = self._make_connection() + def _get_client_info(self): from google.api_core.gapic_v1.client_info import ClientInfo @@ -280,7 +285,7 @@ def test_close(self, mock_client): @mock.patch.object(warnings, "warn") def test_commit(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import TRANSACTION_NOT_BEGUN_WARNING + from google.cloud.spanner_dbapi.connection import CLIENT_TRANSACTION_NOT_STARTED_WARNING connection = Connection(INSTANCE, DATABASE) @@ -307,7 +312,7 @@ def test_commit(self, mock_warn): connection._autocommit = True connection.commit() mock_warn.assert_called_once_with( - TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2 + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) def test_commit_database_error(self): @@ -321,7 +326,7 @@ def test_commit_database_error(self): @mock.patch.object(warnings, "warn") def test_rollback(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import TRANSACTION_NOT_BEGUN_WARNING + from google.cloud.spanner_dbapi.connection import CLIENT_TRANSACTION_NOT_STARTED_WARNING connection = Connection(INSTANCE, DATABASE) @@ -348,7 +353,7 @@ def test_rollback(self, mock_warn): connection._autocommit = True connection.rollback() mock_warn.assert_called_once_with( - TRANSACTION_NOT_BEGUN_WARNING, UserWarning, stacklevel=2 + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) @mock.patch("google.cloud.spanner_v1.database.Database", autospec=True) @@ -386,7 +391,6 @@ def test_as_context_manager(self): self.assertTrue(connection.is_closed) def test_begin_cursor_closed(self): - from google.cloud.spanner_dbapi.exceptions import InterfaceError connection = self._make_connection() connection.close() @@ -397,33 +401,25 @@ def test_begin_cursor_closed(self): self.assertEqual(connection._transaction_begin_marked, False) def test_begin_transaction_begin_marked(self): - from google.cloud.spanner_dbapi.exceptions import OperationalError - - connection = self._make_connection() - connection._transaction_begin_marked = True + self._under_test._transaction_begin_marked = True with self.assertRaises(OperationalError): - connection.begin() + self._under_test.begin() def test_begin_inside_transaction(self): - from google.cloud.spanner_dbapi.exceptions import OperationalError - - connection = self._make_connection() mock_transaction = mock.MagicMock() mock_transaction.committed = mock_transaction.rolled_back = False - connection._transaction = mock_transaction + self._under_test._transaction = mock_transaction with self.assertRaises(OperationalError): - connection.begin() + self._under_test.begin() - self.assertEqual(connection._transaction_begin_marked, False) + self.assertEqual(self._under_test._transaction_begin_marked, False) def test_begin(self): - connection = self._make_connection() - - connection.begin() + self._under_test.begin() - self.assertEqual(connection._transaction_begin_marked, True) + self.assertEqual(self._under_test._transaction_begin_marked, True) def test_run_statement_wo_retried(self): """Check that Connection remembers executed statements.""" From df707d7d5980edeb49229328c7e9498bf035416a Mon Sep 17 00:00:00 2001 From: ankiaga Date: Wed, 29 Nov 2023 18:16:25 +0530 Subject: [PATCH 5/9] Formatting --- google/cloud/spanner_dbapi/connection.py | 8 ++++++-- google/cloud/spanner_dbapi/cursor.py | 15 ++++++++++++--- tests/unit/spanner_dbapi/test_connection.py | 10 ++++++---- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 526a480c04..4f08c47c80 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -416,7 +416,9 @@ def commit(self): self._snapshot = None if not self.client_transaction_started: - warnings.warn(CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2) + warnings.warn( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 + ) return self.run_prior_DDL_statements() @@ -441,7 +443,9 @@ def rollback(self): self._snapshot = None if not self.client_transaction_started: - warnings.warn(CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2) + warnings.warn( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 + ) elif self._transaction: if not self.read_only: self._transaction.rollback() diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 4ca7601a88..1e53ebeeb0 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -396,7 +396,10 @@ def fetchone(self): sequence, or None when no more data is available.""" try: res = next(self) - if self.connection.client_transaction_started and not self.connection.read_only: + if ( + self.connection.client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(res) return res except StopIteration: @@ -414,7 +417,10 @@ def fetchall(self): res = [] try: for row in self: - if self.connection.client_transaction_started and not self.connection.read_only: + if ( + self.connection.client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(row) res.append(row) except Aborted: @@ -443,7 +449,10 @@ def fetchmany(self, size=None): for _ in range(size): try: res = next(self) - if self.connection.client_transaction_started and not self.connection.read_only: + if ( + self.connection.client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(res) items.append(res) except StopIteration: diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index f4756bc27f..586f58d9fb 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -37,7 +37,6 @@ class _CredentialsWithScopes(credentials.Credentials, credentials.Scoped): class TestConnection(unittest.TestCase): - def setUp(self): self._under_test = self._make_connection() @@ -285,7 +284,9 @@ def test_close(self, mock_client): @mock.patch.object(warnings, "warn") def test_commit(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import CLIENT_TRANSACTION_NOT_STARTED_WARNING + from google.cloud.spanner_dbapi.connection import ( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, + ) connection = Connection(INSTANCE, DATABASE) @@ -326,7 +327,9 @@ def test_commit_database_error(self): @mock.patch.object(warnings, "warn") def test_rollback(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import CLIENT_TRANSACTION_NOT_STARTED_WARNING + from google.cloud.spanner_dbapi.connection import ( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, + ) connection = Connection(INSTANCE, DATABASE) @@ -391,7 +394,6 @@ def test_as_context_manager(self): self.assertTrue(connection.is_closed) def test_begin_cursor_closed(self): - connection = self._make_connection() connection.close() From 792c0dc11d8954e615585f59979d7edf9e816461 Mon Sep 17 00:00:00 2001 From: ankiaga Date: Thu, 30 Nov 2023 22:56:33 +0530 Subject: [PATCH 6/9] Comments incorporated --- .../client_side_statement_executor.py | 6 +- google/cloud/spanner_dbapi/connection.py | 72 +++++++++++-------- google/cloud/spanner_dbapi/cursor.py | 14 ++-- tests/system/test_dbapi.py | 2 +- tests/unit/spanner_dbapi/test_connection.py | 9 ++- 5 files changed, 59 insertions(+), 44 deletions(-) diff --git a/google/cloud/spanner_dbapi/client_side_statement_executor.py b/google/cloud/spanner_dbapi/client_side_statement_executor.py index e75e3a611f..4ef43e9d74 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_executor.py +++ b/google/cloud/spanner_dbapi/client_side_statement_executor.py @@ -11,13 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from google.cloud.spanner_dbapi import Connection from google.cloud.spanner_dbapi.parsed_statement import ( ParsedStatement, ClientSideStatementType, ) -def execute(connection, parsed_statement: ParsedStatement): +def execute(connection: "Connection", parsed_statement: ParsedStatement): """Executes the client side statements by calling the relevant method. It is an internal method that can make backwards-incompatible changes. diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 4f08c47c80..a3306b316c 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -35,7 +35,7 @@ CLIENT_TRANSACTION_NOT_STARTED_WARNING = ( - "This method is non-operational as transaction has not begun" + "This method is non-operational as transaction has not started" ) MAX_INTERNAL_RETRIES = 50 @@ -125,7 +125,7 @@ def autocommit(self, value): :type value: bool :param value: New autocommit mode state. """ - if value and not self._autocommit and self.spanner_transaction_started: + if value and not self._autocommit and self._spanner_transaction_started: self.commit() self._autocommit = value @@ -140,27 +140,33 @@ def database(self): return self._database @property - def spanner_transaction_started(self): - """Flag: whether transaction started at SpanFE. This means that we had - made atleast one call to SpanFE. Property client_transaction_started + def _spanner_transaction_started(self): + """Flag: whether transaction started at Spanner. This means that we had + made atleast one call to Spanner. Property client_transaction_started would always be true if this is true as transaction has to start first - at clientside than at Spanner (SpanFE) + at clientside than at Spanner Returns: - bool: True if SpanFE transaction started, False otherwise. + bool: True if Spanner transaction started, False otherwise. """ return ( self._transaction and not self._transaction.committed and not self._transaction.rolled_back - ) + ) or (self._snapshot is not None) + + @property + def inside_transaction(self): + """Deprecated property which won't be supported in future versions. + Please use spanner_transaction_started property instead.""" + return self._spanner_transaction_started @property - def client_transaction_started(self): + def _client_transaction_started(self): """Flag: whether transaction started at client side. Returns: - bool: True if transaction begun, False otherwise. + bool: True if transaction started, False otherwise. """ return (not self._autocommit) or self._transaction_begin_marked @@ -190,7 +196,7 @@ def read_only(self, value): Args: value (bool): True for ReadOnly mode, False for ReadWrite. """ - if self.spanner_transaction_started: + if self._spanner_transaction_started: raise ValueError( "Connection read/write mode can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -228,7 +234,7 @@ def staleness(self, value): Args: value (dict): Staleness type and value. """ - if self.spanner_transaction_started: + if self._spanner_transaction_started: raise ValueError( "`staleness` option can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -346,13 +352,16 @@ def transaction_checkout(self): """Get a Cloud Spanner transaction. Begin a new transaction, if there is no transaction in - this connection yet. Return the begun one otherwise. + this connection yet. Return the started one otherwise. + + This method is a no-op if the connection is in autocommit mode and no + explicit transaction has been started :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if self.client_transaction_started: - if not self.spanner_transaction_started: + if not self.read_only and self._client_transaction_started: + if not self._spanner_transaction_started: self._transaction = self._session_checkout().transaction() self._transaction.begin() @@ -367,7 +376,7 @@ def snapshot_checkout(self): :rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot` :returns: A Cloud Spanner snapshot object, ready to use. """ - if self.read_only and self.client_transaction_started: + if self.read_only and self._client_transaction_started: if not self._snapshot: self._snapshot = Snapshot( self._session_checkout(), multi_use=True, **self.staleness @@ -382,7 +391,7 @@ def close(self): The connection will be unusable from this point forward. If the connection has an active transaction, it will be rolled back. """ - if self.spanner_transaction_started: + if self._spanner_transaction_started and not self.read_only: self._transaction.rollback() if self._own_pool and self.database: @@ -399,8 +408,8 @@ def begin(self): :raises: :class:`OperationalError`: if there is an existing transaction that has begin or is running """ if self._transaction_begin_marked: - raise OperationalError("A transaction has already begun") - if self.spanner_transaction_started: + raise OperationalError("A transaction has already started") + if self._spanner_transaction_started: raise OperationalError( "Beginning a new transaction is not allowed when a transaction is already running" ) @@ -409,22 +418,23 @@ def begin(self): def commit(self): """Commits any pending transaction to the database. - This method is non-operational in autocommit mode. + This is a no-op if there is no active client transaction. """ if self.database is None: raise ValueError("Database needs to be passed for this operation") - self._snapshot = None - if not self.client_transaction_started: + if not self._client_transaction_started: warnings.warn( CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) return self.run_prior_DDL_statements() - if self.spanner_transaction_started: + if self._spanner_transaction_started: try: - if not self.read_only: + if self.read_only: + self._snapshot = None + else: self._transaction.commit() self._release_session() @@ -437,17 +447,19 @@ def commit(self): def rollback(self): """Rolls back any pending transaction. - This is a no-op if there is no active transaction or if the connection - is in autocommit mode. + This is a no-op if there is no active client transaction. """ - self._snapshot = None - if not self.client_transaction_started: + if not self._client_transaction_started: warnings.warn( CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) - elif self._transaction: - if not self.read_only: + return + + if self._spanner_transaction_started: + if self.read_only: + self._snapshot = None + else: self._transaction.rollback() self._release_session() diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 1e53ebeeb0..023149eeb0 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -250,7 +250,7 @@ def execute(self, sql, args=None): ) if parsed_statement.statement_type == StatementType.DDL: self._batch_DDLs(sql) - if not self.connection.client_transaction_started: + if not self.connection._client_transaction_started: self.connection.run_prior_DDL_statements() return @@ -264,7 +264,7 @@ def execute(self, sql, args=None): sql, args = sql_pyformat_args_to_spanner(sql, args or None) - if self.connection.client_transaction_started: + if self.connection._client_transaction_started: statement = Statement( sql, args, @@ -348,7 +348,7 @@ def executemany(self, operation, seq_of_params): ) statements.append((sql, params, get_param_types(params))) - if not self.connection.client_transaction_started: + if not self.connection._client_transaction_started: self.connection.database.run_in_transaction( self._do_batch_update, statements, many_result_set ) @@ -397,7 +397,7 @@ def fetchone(self): try: res = next(self) if ( - self.connection.client_transaction_started + self.connection._client_transaction_started and not self.connection.read_only ): self._checksum.consume_result(res) @@ -418,7 +418,7 @@ def fetchall(self): try: for row in self: if ( - self.connection.client_transaction_started + self.connection._client_transaction_started and not self.connection.read_only ): self._checksum.consume_result(row) @@ -450,7 +450,7 @@ def fetchmany(self, size=None): try: res = next(self) if ( - self.connection.client_transaction_started + self.connection._client_transaction_started and not self.connection.read_only ): self._checksum.consume_result(res) @@ -482,7 +482,7 @@ def _handle_DQL(self, sql, params): if self.connection.database is None: raise ValueError("Database needs to be passed for this operation") sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params) - if self.connection.read_only and self.connection.client_transaction_started: + if self.connection.read_only and self.connection._client_transaction_started: # initiate or use the existing multi-use snapshot self._handle_DQL_with_snapshot( self.connection.snapshot_checkout(), sql, params diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index 637ea79535..ed816f9d9c 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -716,7 +716,7 @@ def test_rowcount(self, dbapi_database, autocommit): # execute with INSERT self._cursor.execute( "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", - [x for row in rows[98:] for x in row], + (x for row in rows[98:] for x in row), ) assert self._cursor.rowcount == 2 diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 586f58d9fb..d565f62823 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -394,13 +394,12 @@ def test_as_context_manager(self): self.assertTrue(connection.is_closed) def test_begin_cursor_closed(self): - connection = self._make_connection() - connection.close() + self._under_test.close() with self.assertRaises(InterfaceError): - connection.begin() + self._under_test.begin() - self.assertEqual(connection._transaction_begin_marked, False) + self.assertEqual(self._under_test._transaction_begin_marked, False) def test_begin_transaction_begin_marked(self): self._under_test._transaction_begin_marked = True @@ -408,7 +407,7 @@ def test_begin_transaction_begin_marked(self): with self.assertRaises(OperationalError): self._under_test.begin() - def test_begin_inside_transaction(self): + def test_begin_transaction_started(self): mock_transaction = mock.MagicMock() mock_transaction.committed = mock_transaction.rolled_back = False self._under_test._transaction = mock_transaction From 6d5f419b23f02abdd7579d46d3aa460f4b7387ab Mon Sep 17 00:00:00 2001 From: ankiaga Date: Fri, 1 Dec 2023 10:57:21 +0530 Subject: [PATCH 7/9] Fixing tests --- tests/unit/spanner_dbapi/test_connection.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index d565f62823..91b2e3d5e8 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -230,6 +230,8 @@ def test_snapshot_checkout(self): session_checkout = mock.MagicMock(autospec=True) connection._session_checkout = session_checkout + release_session = mock.MagicMock() + connection._release_session = release_session snapshot = connection.snapshot_checkout() session_checkout.assert_called_once() @@ -238,6 +240,7 @@ def test_snapshot_checkout(self): connection.commit() self.assertIsNone(connection._snapshot) + release_session.assert_called_once() connection.snapshot_checkout() self.assertIsNotNone(connection._snapshot) @@ -341,6 +344,7 @@ def test_rollback(self, mock_warn): mock_release.assert_not_called() mock_transaction = mock.MagicMock() + mock_transaction.committed = mock_transaction.rolled_back = False connection._transaction = mock_transaction mock_rollback = mock.MagicMock() mock_transaction.rollback = mock_rollback @@ -522,7 +526,8 @@ def test_rollback_clears_statements(self, mock_transaction): cleared, when the transaction is roll backed. """ connection = self._make_connection() - connection._transaction = mock.Mock() + mock_transaction.committed = mock_transaction.rolled_back = False + connection._transaction = mock_transaction connection._statements = [{}, {}] self.assertEqual(len(connection._statements), 2) From c59e46897aa5e022108d03401ee17edc42b5ce96 Mon Sep 17 00:00:00 2001 From: ankiaga Date: Fri, 1 Dec 2023 11:59:08 +0530 Subject: [PATCH 8/9] Small fix --- tests/system/test_dbapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index ed816f9d9c..637ea79535 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -716,7 +716,7 @@ def test_rowcount(self, dbapi_database, autocommit): # execute with INSERT self._cursor.execute( "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", - (x for row in rows[98:] for x in row), + [x for row in rows[98:] for x in row], ) assert self._cursor.rowcount == 2 From 70ac65ea128506bfa584676a1f7be956574a4c8e Mon Sep 17 00:00:00 2001 From: ankiaga Date: Sat, 2 Dec 2023 16:24:26 +0530 Subject: [PATCH 9/9] Test fix as emulator was going OOM --- tests/system/test_dbapi.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index 637ea79535..26af9e5e0f 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -136,16 +136,6 @@ def test_begin_client_side(self, shared_instance, dbapi_database): cursor1.execute("begin transaction") updated_row = self._execute_common_statements(cursor1) - # As the connection conn1 is not committed a new connection wont see its results - conn2 = Connection(shared_instance, dbapi_database) - cursor2 = conn2.cursor() - cursor2.execute("SELECT * FROM contacts") - conn2.commit() - got_rows = cursor2.fetchall() - cursor2.close() - conn2.close() - assert got_rows != [updated_row] - assert conn1._transaction_begin_marked is True conn1.commit() assert conn1._transaction_begin_marked is False