From 0a60bda7a32dd7bbd8930a8326070c54a978ba4e Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Jul 2017 17:09:48 -0400 Subject: [PATCH 1/5] Add systets for read/query w/ concurrent updates. --- spanner/tests/_fixtures.py | 4 ++ spanner/tests/system/test_system.py | 85 +++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/spanner/tests/_fixtures.py b/spanner/tests/_fixtures.py index 1123d03c3f2d..ace9b981b6ec 100644 --- a/spanner/tests/_fixtures.py +++ b/spanner/tests/_fixtures.py @@ -38,6 +38,10 @@ description STRING(16), exactly_hwhen TIMESTAMP) PRIMARY KEY (eye_d); +CREATE TABLE counters ( + name STRING(1024), + value INT64 ) + PRIMARY KEY (name); """ DDL_STATEMENTS = [stmt.strip() for stmt in DDL.split(';') if stmt.strip()] diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index b4ac62194bb1..1a4b333e5527 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -358,6 +358,11 @@ class TestSessionAPI(unittest.TestCase, _TestData): 'description', 'exactly_hwhen', ) + COUNTERS_TABLE = 'counters' + COUNTERS_COLUMNS = ( + 'name', + 'value', + ) SOME_DATE = datetime.date(2011, 1, 17) SOME_TIME = datetime.datetime(1989, 1, 17, 17, 59, 12, 345612) NANO_TIME = TimestampWithNanoseconds(1995, 8, 31, nanosecond=987654321) @@ -508,6 +513,86 @@ def test_transaction_read_and_insert_or_update_then_commit(self): rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) self._check_row_data(rows) + def _transaction_concurrency_helper(self, unit_of_work, pkey): + import threading + INITIAL_VALUE = 123 + + retry = RetryInstanceState(_has_all_ddl) + retry(self._db.reload)() + + session = self._db.session() + session.create() + self.to_delete.append(session) + + with session.batch() as batch: + batch.insert_or_update( + self.COUNTERS_TABLE, + self.COUNTERS_COLUMNS, + [[pkey, INITIAL_VALUE]]) + + # We don't want to run the threads' transactions in the current + # session, which would fail. + txn_sessions = [ + self._db.session(), self._db.session(), self._db.session()] + + for txn_session in txn_sessions: + txn_session.create() + self.to_delete.append(txn_session) + + threads = [ + threading.Thread( + target=txn_session.run_in_transaction, + args=(unit_of_work, pkey)) + for txn_session in txn_sessions] + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + keyset = KeySet(keys=[(pkey,)]) + rows = list(session.read( + self.COUNTERS_TABLE, self.COUNTERS_COLUMNS, keyset)) + self.assertEqual(len(rows), 1) + _, value = rows[0] + self.assertEqual(value, INITIAL_VALUE + len(threads)) + + def _read_w_concurrent_update(self, transaction, pkey): + keyset = KeySet(keys=[(pkey,)]) + rows = list(transaction.read( + self.COUNTERS_TABLE, self.COUNTERS_COLUMNS, keyset)) + self.assertEqual(len(rows), 1) + pkey, value = rows[0] + transaction.update( + self.COUNTERS_TABLE, + self.COUNTERS_COLUMNS, + [[pkey, value + 1]]) + + def test_transaction_read_w_concurrent_updates(self): + PKEY = 'read_w_concurrent_updates' + self._transaction_concurrency_helper( + self._read_w_concurrent_update, PKEY) + + def _query_w_concurrent_update(self, transaction, pkey): + SQL = 'SELECT * FROM counters WHERE name = @name' + rows = list(transaction.execute_sql( + SQL, + params={'name': pkey}, + param_types={'name': Type(code=STRING)}, + )) + self.assertEqual(len(rows), 1) + pkey, value = rows[0] + transaction.update( + self.COUNTERS_TABLE, + self.COUNTERS_COLUMNS, + [[pkey, value + 1]]) + + def test_transaction_query_w_concurrent_updates(self): + PKEY = 'query_w_concurrent_updates' + self._transaction_concurrency_helper( + self._query_w_concurrent_update, PKEY) + @staticmethod def _row_data(max_index): for index in range(max_index): From e2c294658e4ea497ab079c8b7793022e637f4f30 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Jul 2017 17:19:51 -0400 Subject: [PATCH 2/5] Add systest for user exception aborting transaction. --- spanner/tests/system/test_system.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index 1a4b333e5527..56f81cfeb0e7 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -487,6 +487,35 @@ def test_transaction_read_and_insert_then_rollback(self): rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) self.assertEqual(rows, []) + @RetryErrors(exception=GrpcRendezvous) + def test_transaction_read_and_insert_then_execption(self): + retry = RetryInstanceState(_has_all_ddl) + retry(self._db.reload)() + + class TestException(Exception): + pass + + session = self._db.session() + session.create() + self.to_delete.append(session) + + with session.batch() as batch: + batch.delete(self.TABLE, self.ALL) + + def _unit_of_work(transaction): + rows = list(transaction.read( + self.TABLE, self.COLUMNS, self.ALL)) + assert len(rows) == 0 + transaction.insert(self.TABLE, self.COLUMNS, self.ROW_DATA) + raise TestException() + + with self.assertRaises(TestException): + session.run_in_transaction(_unit_of_work) + + # Transaction was rolled back. + rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) + self.assertEqual(rows, []) + @RetryErrors(exception=GrpcRendezvous) def test_transaction_read_and_insert_or_update_then_commit(self): retry = RetryInstanceState(_has_all_ddl) From db86cca4551ac448d47f93c05739ac160aa8ef3a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Jul 2017 17:31:08 -0400 Subject: [PATCH 3/5] Address review comments. --- spanner/tests/system/test_system.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index 56f81cfeb0e7..24a71c1345ef 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -17,6 +17,7 @@ import operator import os import struct +import threading import unittest from google.cloud.proto.spanner.v1.type_pb2 import ARRAY @@ -543,8 +544,8 @@ def test_transaction_read_and_insert_or_update_then_commit(self): self._check_row_data(rows) def _transaction_concurrency_helper(self, unit_of_work, pkey): - import threading INITIAL_VALUE = 123 + NUM_THREADS = 3 # conforms to equivalent Java systest. retry = RetryInstanceState(_has_all_ddl) retry(self._db.reload)() @@ -561,10 +562,11 @@ def _transaction_concurrency_helper(self, unit_of_work, pkey): # We don't want to run the threads' transactions in the current # session, which would fail. - txn_sessions = [ - self._db.session(), self._db.session(), self._db.session()] + txn_sessions = [] - for txn_session in txn_sessions: + for _ in range(NUM_THREADS): + txn_session = self._db.session() + txn_sessions.append(txn_session) txn_session.create() self.to_delete.append(txn_session) @@ -594,9 +596,9 @@ def _read_w_concurrent_update(self, transaction, pkey): self.assertEqual(len(rows), 1) pkey, value = rows[0] transaction.update( - self.COUNTERS_TABLE, - self.COUNTERS_COLUMNS, - [[pkey, value + 1]]) + self.COUNTERS_TABLE, + self.COUNTERS_COLUMNS, + [[pkey, value + 1]]) def test_transaction_read_w_concurrent_updates(self): PKEY = 'read_w_concurrent_updates' From 561e8f580bfeb863752cae33293aaab4555d4e11 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Jul 2017 17:49:27 -0400 Subject: [PATCH 4/5] Address more review feedback. --- spanner/tests/system/test_system.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index 24a71c1345ef..82f7b2f26d49 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -488,14 +488,17 @@ def test_transaction_read_and_insert_then_rollback(self): rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) self.assertEqual(rows, []) + def _transaction_read_then_raise(self, transaction): + rows = list(transaction.read(self.TABLE, self.COLUMNS, self.ALL)) + self.assertEqual(len(rows), 0) + transaction.insert(self.TABLE, self.COLUMNS, self.ROW_DATA) + raise TestException() + @RetryErrors(exception=GrpcRendezvous) def test_transaction_read_and_insert_then_execption(self): retry = RetryInstanceState(_has_all_ddl) retry(self._db.reload)() - class TestException(Exception): - pass - session = self._db.session() session.create() self.to_delete.append(session) @@ -503,15 +506,8 @@ class TestException(Exception): with session.batch() as batch: batch.delete(self.TABLE, self.ALL) - def _unit_of_work(transaction): - rows = list(transaction.read( - self.TABLE, self.COLUMNS, self.ALL)) - assert len(rows) == 0 - transaction.insert(self.TABLE, self.COLUMNS, self.ROW_DATA) - raise TestException() - with self.assertRaises(TestException): - session.run_in_transaction(_unit_of_work) + session.run_in_transaction(self._transaction_read_then_raise) # Transaction was rolled back. rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) @@ -1026,6 +1022,10 @@ def test_four_meg(self): self._verify_two_columns(FOUR_MEG) +class TestException(Exception): + """Placeholder for any user-defined exception.""" + + class _DatabaseDropper(object): """Helper for cleaning up databases created on-the-fly.""" From e50e31a6f993893b1b8dca50b5f96b00b49e4e6a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Jul 2017 18:16:47 -0400 Subject: [PATCH 5/5] Mollify py.test. --- spanner/tests/system/test_system.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index 82f7b2f26d49..e6d73f977e94 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -492,7 +492,7 @@ def _transaction_read_then_raise(self, transaction): rows = list(transaction.read(self.TABLE, self.COLUMNS, self.ALL)) self.assertEqual(len(rows), 0) transaction.insert(self.TABLE, self.COLUMNS, self.ROW_DATA) - raise TestException() + raise CustomException() @RetryErrors(exception=GrpcRendezvous) def test_transaction_read_and_insert_then_execption(self): @@ -506,7 +506,7 @@ def test_transaction_read_and_insert_then_execption(self): with session.batch() as batch: batch.delete(self.TABLE, self.ALL) - with self.assertRaises(TestException): + with self.assertRaises(CustomException): session.run_in_transaction(self._transaction_read_then_raise) # Transaction was rolled back. @@ -1022,7 +1022,7 @@ def test_four_meg(self): self._verify_two_columns(FOUR_MEG) -class TestException(Exception): +class CustomException(Exception): """Placeholder for any user-defined exception."""