From 712be46794d69760aaa73ca81fac1be2fc175aac Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Thu, 10 Aug 2017 20:42:56 -0500 Subject: [PATCH 01/11] Update sqlite3worker.py --- sqlite3worker.py | 497 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 335 insertions(+), 162 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index 23eec87..0fedbec 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -26,172 +26,345 @@ import logging try: - import queue as Queue # module re-named in Python 3 -except ImportError: - import Queue + import queue as Queue # module re-named in Python 3 +except ImportError: # pragma: no cover + import Queue +import pathlib # pip install pathlib +import platform import sqlite3 import threading import time -import uuid LOGGER = logging.getLogger('sqlite3worker') +workers = {} -class Sqlite3Worker(threading.Thread): - """Sqlite thread safe object. - - Example: - from sqlite3worker import Sqlite3Worker - sql_worker = Sqlite3Worker("/tmp/test.sqlite") - sql_worker.execute( - "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") - sql_worker.execute( - "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) - sql_worker.execute( - "INSERT into tester values (?, ?)", ("2011-02-02 14:14:14", "dog")) - sql_worker.execute("SELECT * from tester") - sql_worker.close() - """ - def __init__(self, file_name, max_queue_size=100): - """Automatically starts the thread. - - Args: - file_name: The name of the file. - max_queue_size: The max queries that will be queued. - """ - threading.Thread.__init__(self) - self.daemon = True - self.sqlite3_conn = sqlite3.connect( - file_name, check_same_thread=False, - detect_types=sqlite3.PARSE_DECLTYPES) - self.sqlite3_cursor = self.sqlite3_conn.cursor() - self.sql_queue = Queue.Queue(maxsize=max_queue_size) - self.results = {} - self.max_queue_size = max_queue_size - self.exit_set = False - # Token that is put into queue when close() is called. - self.exit_token = str(uuid.uuid4()) - self.start() - self.thread_running = True - - def run(self): - """Thread loop. - - This is an infinite loop. The iter method calls self.sql_queue.get() - which blocks if there are not values in the queue. As soon as values - are placed into the queue the process will continue. - - If many executes happen at once it will churn through them all before - calling commit() to speed things up by reducing the number of times - commit is called. - """ - LOGGER.debug("run: Thread started") - execute_count = 0 - for token, query, values in iter(self.sql_queue.get, None): - LOGGER.debug("sql_queue: %s", self.sql_queue.qsize()) - if token != self.exit_token: - LOGGER.debug("run: %s", query) - self.run_query(token, query, values) - execute_count += 1 - # Let the executes build up a little before committing to disk - # to speed things up. - if ( - self.sql_queue.empty() or - execute_count == self.max_queue_size): - LOGGER.debug("run: commit") - self.sqlite3_conn.commit() - execute_count = 0 - # Only exit if the queue is empty. Otherwise keep getting - # through the queue until it's empty. - if self.exit_set and self.sql_queue.empty(): - self.sqlite3_conn.commit() - self.sqlite3_conn.close() - self.thread_running = False - return - - def run_query(self, token, query, values): - """Run a query. - - Args: - token: A uuid object of the query you want returned. - query: A sql query with ? placeholders for values. - values: A tuple of values to replace "?" in query. - """ - if query.lower().strip().startswith("select"): - try: - self.sqlite3_cursor.execute(query, values) - self.results[token] = self.sqlite3_cursor.fetchall() - except sqlite3.Error as err: - # Put the error into the output queue since a response - # is required. - self.results[token] = ( - "Query returned error: %s: %s: %s" % (query, values, err)) - LOGGER.error( - "Query returned error: %s: %s: %s", query, values, err) - else: - try: - self.sqlite3_cursor.execute(query, values) - except sqlite3.Error as err: - LOGGER.error( - "Query returned error: %s: %s: %s", query, values, err) - - def close(self): - """Close down the thread and close the sqlite3 database file.""" - self.exit_set = True - self.sql_queue.put((self.exit_token, "", ""), timeout=5) - # Sleep and check that the thread is done before returning. - while self.thread_running: - time.sleep(.01) # Don't kill the CPU waiting. - - @property - def queue_size(self): - """Return the queue size.""" - return self.sql_queue.qsize() - - def query_results(self, token): - """Get the query results for a specific token. - - Args: - token: A uuid object of the query you want returned. - - Returns: - Return the results of the query when it's executed by the thread. - """ - delay = .001 - while True: - if token in self.results: - return_val = self.results[token] - del self.results[token] - return return_val - # Double back on the delay to a max of 8 seconds. This prevents - # a long lived select statement from trashing the CPU with this - # infinite loop as it's waiting for the query results. - LOGGER.debug("Sleeping: %s %s", delay, token) - time.sleep(delay) - if delay < 8: - delay += delay - - def execute(self, query, values=None): - """Execute a query. - - Args: - query: The sql string using ? for placeholders of dynamic values. - values: A tuple of values to be replaced into the ? of the query. - - Returns: - If it's a select query it will return the results of the query. - """ - if self.exit_set: - LOGGER.debug("Exit set, not running: %s", query) - return "Exit Called" - LOGGER.debug("execute: %s", query) - values = values or [] - # A token to track this query with. - token = str(uuid.uuid4()) - # If it's a select we queue it up with a token to mark the results - # into the output queue so we know what results are ours. - if query.lower().strip().startswith("select"): - self.sql_queue.put((token, query, values), timeout=5) - return self.query_results(token) - else: - self.sql_queue.put((token, query, values), timeout=5) +OperationalError = sqlite3.OperationalError +Row = sqlite3.Row + +class Frozen_object ( object ): + def __setattr__ ( self, key, value ): + if key not in dir ( self ): # prevent from accidentally creating new attributes + raise AttributeError ( '{!r} object has no attribute {!r}'.format ( type ( self ).__name__, key ) ) + super ( Frozen_object, self ).__setattr__ ( key, value ) + +class Sqlite3WorkerRequest ( Frozen_object ): + def execute ( self ): # pragma: no cover + raise NotImplementedError ( type ( self ).__name__ + '.execute()' ) + +class Sqlite3WorkerSetRowFactory ( Sqlite3WorkerRequest ): + worker = None + row_factory = None + + def __init__ ( self, worker, row_factory ): + self.worker = worker + self.row_factory = row_factory + + def execute ( self ): + self.worker.sqlite3_cursor.row_factory = self.row_factory + +class Sqlite3WorkerSetTextFactory ( Sqlite3WorkerRequest ): + worker = None + text_factory = None + + def __init__ ( self, worker, text_factory ): + self.worker = worker + self.text_factory = text_factory + + def execute ( self ): + self.worker.sqlite3_conn.text_factory = self.text_factory + +class Sqlite3WorkerExecute ( Sqlite3WorkerRequest ): + worker = None + query = None + values = None + results = None + + def __init__ ( self, worker, query, values ): + self.worker = worker + self.query = query + self.values = values + self.results = Queue.Queue() + + def execute ( self ): + LOGGER.debug ( "run execute: %s", self.query ) + worker = self.worker + cur = worker.sqlite3_cursor + try: + cur.execute ( self.query, self.values ) + result = ( cur.fetchall(), cur.description, cur.lastrowid ) + success = True + except Exception as err: + LOGGER.error ( + "Unhandled Exception in Sqlite3WorkerExecute.execute: {!r}".format ( err ) ) + result = err + success = False + self.results.put ( ( success, result ) ) + +class Sqlite3WorkerExecuteScript ( Sqlite3WorkerRequest ): + worker = None + query = None + results = None + + def __init__ ( self, worker, query ): + self.worker = worker + self.query = query + self.results = Queue.Queue() + + def execute ( self ): + LOGGER.debug ( "run executescript: %s", self.query ) + worker = self.worker + cur = worker.sqlite3_cursor + try: + cur.executescript ( self.query ) + result = ( cur.fetchall(), cur.description, cur.lastrowid ) + success = True + except Exception as err: + LOGGER.error ( + "Unhandled Exception in Sqlite3WorkerExecuteScript.execute: {!r}".format ( err ) ) + result = err + success = False + self.results.put ( ( success, result ) ) + +class Sqlite3WorkerCommit ( Sqlite3WorkerRequest ): + worker = None + + def __init__ ( self, worker ): + self.worker = worker + + def execute ( self ): + LOGGER.debug("run commit") + worker = self.worker + worker.sqlite3_conn.commit() + +class Sqlite3WorkerExit ( Exception, Sqlite3WorkerRequest ): + def execute ( self ): + raise self + +def normalize_file_name ( file_name ): + if file_name.lower() == ':memory:': + return ':memory:' + # lookup absolute path of file_name + file_name = str ( pathlib.Path ( file_name ).absolute() ) + if platform.system() == 'Windows': + file_name = file_name.lower() # Windows filenames are not case-sensitive + return file_name + +class Sqlite3Worker ( Frozen_object ): + """Sqlite thread safe object. + Example: + from sqlite3worker import Sqlite3Worker + sql_worker = Sqlite3Worker("/tmp/test.sqlite") + sql_worker.execute( + "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") + sql_worker.execute( + "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) + sql_worker.execute( + "INSERT into tester values (?, ?)", ("2011-02-02 14:14:14", "dog")) + sql_worker.execute("SELECT * from tester") + sql_worker.close() + """ + file_name = None + sqlite3_conn = None + sqlite3_cursor = None + sql_queue = None + max_queue_size = None + exit_set = False + _thread = None + + def __init__ ( self, file_name, max_queue_size=100 ): + """Automatically starts the thread. + Args: + file_name: The name of the file. + max_queue_size: The max queries that will be queued. + """ + self._thread = threading.Thread ( target=self.run ) + self._thread.daemon = True + + self.file_name = normalize_file_name ( file_name ) + if self.file_name != ':memory:': + global workers + assert file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' + workers[file_name] = self + + self.sqlite3_conn = sqlite3.connect ( + file_name, check_same_thread=False, + #detect_types=sqlite3.PARSE_DECLTYPES + ) + self.sqlite3_cursor = self.sqlite3_conn.cursor() + self.sql_queue = Queue.Queue ( maxsize=max_queue_size ) + self.max_queue_size = max_queue_size + self._thread.name = self._thread.name.replace ( 'Thread-', 'sqlite3worker-' ) + self._thread.start() + + def run ( self ): + """Thread loop. + This is an infinite loop. The iter method calls self.sql_queue.get() + which blocks if there are not values in the queue. As soon as values + are placed into the queue the process will continue. + If many executes happen at once it will churn through them all before + calling commit() to speed things up by reducing the number of times + commit is called. + """ + LOGGER.debug("run: Thread started") + while True: + try: + self.sql_queue.get().execute() + except Sqlite3WorkerExit as e: + if not self.sql_queue.empty(): # pragma: no cover ( TODO FIXME: come back to this ) + self.sql_queue.put ( e ) # push the exit event to the end of the queue + continue + self.sqlite3_conn.commit() + self.sqlite3_conn.close() + if self.file_name != ':memory:': + global workers + del workers[self.file_name] + return + + def close ( self ): + """Close down the thread and close the sqlite3 database file.""" + if self.exit_set: # pragma: no cover + LOGGER.debug ( "Exit set, not running: %s", query ) + raise OperationalError ( 'sqlite worker thread already shutting down' ) + self.exit_set = True + self.sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) + # Sleep and check that the thread is done before returning. + self._thread.join() + + @property + def queue_size ( self ): # pragma: no cover + """Return the queue size.""" + return self.sql_queue.qsize() + + def set_row_factory ( self, row_factory ): + self.sql_queue.put ( Sqlite3WorkerSetRowFactory ( self, row_factory ), timeout=5 ) + + def set_text_factory ( self, text_factory ): + self.sql_queue.put ( Sqlite3WorkerSetTextFactory ( self, text_factory ), timeout=5 ) + + def execute_ex ( self, query, values=None ): + """Execute a query. + Args: + query: The sql string using ? for placeholders of dynamic values. + values: A tuple of values to be replaced into the ? of the query. + Returns: + a tuple of ( rows, description, lastrowid ): + rows is a list of row results returned by fetchall() or [] if no rows + description is the results of cursor.description after executing the query + lastrowid is the result of calling cursor.lastrowid after executing the query + """ + if self.exit_set: # pragma: no cover + LOGGER.debug ( "Exit set, not running: %s", query ) + raise OperationalError ( 'sqlite worker thread already shutting down' ) + LOGGER.debug ( "request execute: %s", query ) + r = Sqlite3WorkerExecute ( self, query, values or [] ) + self.sql_queue.put ( r, timeout=5 ) + success, result = r.results.get() + if not success: + raise result + else: + return result + + def execute ( self, query, values=None ): + return self.execute_ex ( query, values )[0] + + def executescript_ex ( self, query ): + if self.exit_set: # pragma: no cover + LOGGER.debug ( "Exit set, not running: %s", query ) + raise OperationalError ( 'sqlite worker thread already shutting down' ) + LOGGER.debug ( "request executescript: %s", query ) + r = Sqlite3WorkerExecuteScript ( self, query ) + self.sql_queue.put ( r, timeout=5 ) + success, result = r.results.get() + if not success: + raise result + else: + return result + + def executescript ( self, sql ): + return self.executescript_ex ( sql )[0] + + def commit ( self ): + if self.exit_set: # pragma: no cover + LOGGER.debug ( "Exit set, not running: %s", query ) + raise OperationalError ( 'sqlite worker thread already shutting down' ) + LOGGER.debug ( "request commit" ) + self.sql_queue.put ( Sqlite3WorkerCommit ( self ), timeout=5 ) + +class Sqlite3worker_dbapi_cursor ( Frozen_object ): + con = None + rows = None + description = None + lastrowid = None + + def __init__ ( self, con ): + self.con = con + + def close ( self ): + pass + + def execute ( self, sql, values=None ): + self.rows, self.description, self.lastrowid = self.con.worker.execute_ex ( sql, values ) + + def executescript ( self, sql_script ): + self.rows, self.description, self.lastrowid = self.con.worker.executescript_ex ( sql_script ) + + def fetchone ( self ): + try: + return self.rows.pop ( 0 ) + except IndexError: + return None + + def __iter__ ( self ): + while self.rows: + yield self.fetchone() + +class Sqlite3worker_dbapi_connection ( Frozen_object ): + worker = None + + def __init__ ( self, worker ): + self.worker = worker + + def commit ( self ): + self.worker.commit() + + def cursor ( self ): + return Sqlite3worker_dbapi_cursor ( self ) + + def execute ( self, sql, values=None ): + cur = self.cursor() + cur.execute ( sql, values ) + return cur + + def executescript ( self, sql_script ): + cur = self.cursor() + cur.executescript ( sql_script ) + return cur + + def close ( self ): + self.worker.close() + + @property + def row_factory ( self ): + raise NotImplementedError ( type ( self ).__name__ + '.row_factory' ) + + @row_factory.setter + def row_factory ( self, row_factory ): + self.worker.set_row_factory ( row_factory ) + + @property + def text_factory ( self ): + raise NotImplementedError ( type ( self ).__name__ + '.text_factory' ) + + @text_factory.setter + def text_factory ( self, text_factory ): + self.worker.set_text_factory ( text_factory ) + +def connect ( file_name ): + file_name = normalize_file_name ( file_name ) + global workers + try: + worker = workers[file_name] + except KeyError: + worker = Sqlite3Worker ( file_name ) + return Sqlite3worker_dbapi_connection ( worker ) From e01fd4eb2e0e0980e3d987b02b8d3a2ac93c019f Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 00:28:19 -0500 Subject: [PATCH 02/11] fix tests to match changes in sqlite3worker --- sqlite3worker_test.py | 80 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 70 insertions(+), 10 deletions(-) diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index 9fc90d3..da28e5b 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2014 Palantir Technologies # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -22,12 +23,15 @@ """sqlite3worker test routines.""" __author__ = "Shawn Lee" -__email__ = "shawnl@palantir.com" +__email__ = "dashawn@gmail.com" __license__ = "MIT" import os import tempfile +import threading import time +import uuid + import unittest import sqlite3worker @@ -35,7 +39,8 @@ class Sqlite3WorkerTests(unittest.TestCase): # pylint:disable=R0904 """Test out the sqlite3worker library.""" - def setUp(self): # pylint:disable=C0103 + + def setUp(self): # pylint:disable=D0102 self.tmp_file = tempfile.NamedTemporaryFile( suffix="pytest", prefix="sqlite").name self.sqlite3worker = sqlite3worker.Sqlite3Worker(self.tmp_file) @@ -43,23 +48,24 @@ def setUp(self): # pylint:disable=C0103 self.sqlite3worker.execute( "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") - def tearDown(self): # pylint:disable=C0103 - self.sqlite3worker.close() + def tearDown(self): # pylint:disable=D0102 + try: + self.sqlite3worker.close() + except sqlite3worker.OperationalError: + pass # the test may have already closed the database os.unlink(self.tmp_file) def test_bad_select(self): """Test a bad select query.""" query = "select THIS IS BAD SQL" - self.assertEqual( - self.sqlite3worker.execute(query), - ( - "Query returned error: select THIS IS BAD SQL: " - "[]: no such column: THIS")) + with self.assertRaises ( sqlite3worker.OperationalError ): + self.sqlite3worker.execute(query) def test_bad_insert(self): """Test a bad insert query.""" query = "insert THIS IS BAD SQL" - self.sqlite3worker.execute(query) + with self.assertRaises ( sqlite3worker.OperationalError ): + self.sqlite3worker.execute(query) # Give it one second to clear the queue. if self.sqlite3worker.queue_size != 0: time.sleep(1) @@ -83,6 +89,60 @@ def test_valid_insert(self): self.sqlite3worker.execute("SELECT * from tester"), [("2010-01-01 13:00:00", "bow"), ("2011-02-02 14:14:14", "dog")]) + def test_run_after_close(self): + """Test to make sure all events are cleared after object closed.""" + self.sqlite3worker.close() + with self.assertRaises ( sqlite3worker.OperationalError ): + self.sqlite3worker.execute( + "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) + + def test_double_close(self): + """Make sure double closeing messages properly.""" + self.sqlite3worker.close() + with self.assertRaises ( sqlite3worker.OperationalError ): + self.sqlite3worker.close() + + def test_db_closed_properly(self): + """Make sure sqlite object is properly closed out.""" + self.sqlite3worker.close() + with self.assertRaises( + self.sqlite3worker._sqlite3_conn.ProgrammingError): + self.sqlite3worker._sqlite3_conn.total_changes + + def test_many_threads(self): + """Make sure lots of threads work together.""" + class threaded(threading.Thread): + def __init__(self, sqlite_obj): + threading.Thread.__init__(self, name=__name__) + self.sqlite_obj = sqlite_obj + self.daemon = True + self.failed = False + self.completed = False + self.start() + + def run(self): + for _ in range(5): + token = str(uuid.uuid4()) + self.sqlite_obj.execute( + "INSERT into tester values (?, ?)", + ("2010-01-01 13:00:00", token)) + resp = self.sqlite_obj.execute( + "SELECT * from tester where uuid = ?", (token,)) + if resp != [("2010-01-01 13:00:00", token)]: + self.failed = True + break + self.completed = True + + threads = [] + for _ in range(5): + threads.append(threaded(self.sqlite3worker)) + + for i in range(5): + while not threads[i].completed: + time.sleep(.1) + self.assertEqual(threads[i].failed, False) + threads[i].join() + if __name__ == "__main__": unittest.main() From ba5fb0262e643eaa9db8359b08018d3d3ba94677 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 00:30:36 -0500 Subject: [PATCH 03/11] fixes from unit testing renamed sqlite3_conn to _sqlite3_conn to match tests, fix bad LOGGER.debug() call, --- sqlite3worker.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index 0fedbec..36d2006 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -72,7 +72,7 @@ def __init__ ( self, worker, text_factory ): self.text_factory = text_factory def execute ( self ): - self.worker.sqlite3_conn.text_factory = self.text_factory + self.worker._sqlite3_conn.text_factory = self.text_factory class Sqlite3WorkerExecute ( Sqlite3WorkerRequest ): worker = None @@ -135,7 +135,7 @@ def __init__ ( self, worker ): def execute ( self ): LOGGER.debug("run commit") worker = self.worker - worker.sqlite3_conn.commit() + worker._sqlite3_conn.commit() class Sqlite3WorkerExit ( Exception, Sqlite3WorkerRequest ): def execute ( self ): @@ -165,7 +165,7 @@ class Sqlite3Worker ( Frozen_object ): sql_worker.close() """ file_name = None - sqlite3_conn = None + _sqlite3_conn = None sqlite3_cursor = None sql_queue = None max_queue_size = None @@ -187,11 +187,11 @@ def __init__ ( self, file_name, max_queue_size=100 ): assert file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' workers[file_name] = self - self.sqlite3_conn = sqlite3.connect ( + self._sqlite3_conn = sqlite3.connect ( file_name, check_same_thread=False, #detect_types=sqlite3.PARSE_DECLTYPES ) - self.sqlite3_cursor = self.sqlite3_conn.cursor() + self.sqlite3_cursor = self._sqlite3_conn.cursor() self.sql_queue = Queue.Queue ( maxsize=max_queue_size ) self.max_queue_size = max_queue_size self._thread.name = self._thread.name.replace ( 'Thread-', 'sqlite3worker-' ) @@ -214,8 +214,8 @@ def run ( self ): if not self.sql_queue.empty(): # pragma: no cover ( TODO FIXME: come back to this ) self.sql_queue.put ( e ) # push the exit event to the end of the queue continue - self.sqlite3_conn.commit() - self.sqlite3_conn.close() + self._sqlite3_conn.commit() + self._sqlite3_conn.close() if self.file_name != ':memory:': global workers del workers[self.file_name] @@ -224,7 +224,7 @@ def run ( self ): def close ( self ): """Close down the thread and close the sqlite3 database file.""" if self.exit_set: # pragma: no cover - LOGGER.debug ( "Exit set, not running: %s", query ) + LOGGER.debug ( "sqlite worker thread already shutting down" ) raise OperationalError ( 'sqlite worker thread already shutting down' ) self.exit_set = True self.sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) From 58c7cd276e06f74b9e9ecaf33dedd60b58771322 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 01:09:13 -0500 Subject: [PATCH 04/11] py3 compatibility --- sqlite3worker.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index 36d2006..f26ce4d 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -95,8 +95,8 @@ def execute ( self ): result = ( cur.fetchall(), cur.description, cur.lastrowid ) success = True except Exception as err: - LOGGER.error ( - "Unhandled Exception in Sqlite3WorkerExecute.execute: {!r}".format ( err ) ) + LOGGER.debug ( + "Sqlite3WorkerExecute.execute sending exception back to calling thread: {!r}".format ( err ) ) result = err success = False self.results.put ( ( success, result ) ) @@ -120,8 +120,8 @@ def execute ( self ): result = ( cur.fetchall(), cur.description, cur.lastrowid ) success = True except Exception as err: - LOGGER.error ( - "Unhandled Exception in Sqlite3WorkerExecuteScript.execute: {!r}".format ( err ) ) + LOGGER.debug ( + "Sqlite3WorkerExecuteScript.execute sending exception back to calling thread: {!r}".format ( err ) ) result = err success = False self.results.put ( ( success, result ) ) @@ -184,8 +184,8 @@ def __init__ ( self, file_name, max_queue_size=100 ): self.file_name = normalize_file_name ( file_name ) if self.file_name != ':memory:': global workers - assert file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' - workers[file_name] = self + assert self.file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' + workers[self.file_name] = self self._sqlite3_conn = sqlite3.connect ( file_name, check_same_thread=False, @@ -218,7 +218,10 @@ def run ( self ): self._sqlite3_conn.close() if self.file_name != ':memory:': global workers - del workers[self.file_name] + try: + del workers[self.file_name] + except KeyError: + print ( 'file_name {!r} not found in workers {!r}'.format ( self.file_name, workers ) ) return def close ( self ): From 539bc01484954d67506ce7fe3258b97a6b7b69e7 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 01:09:35 -0500 Subject: [PATCH 05/11] py3 compatibility --- sqlite3worker_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index da28e5b..78ab807 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -41,8 +41,8 @@ class Sqlite3WorkerTests(unittest.TestCase): # pylint:disable=R0904 """Test out the sqlite3worker library.""" def setUp(self): # pylint:disable=D0102 - self.tmp_file = tempfile.NamedTemporaryFile( - suffix="pytest", prefix="sqlite").name + self.tmp_file = tempfile.mktemp( + suffix="pytest", prefix="sqlite") self.sqlite3worker = sqlite3worker.Sqlite3Worker(self.tmp_file) # Create sql db. self.sqlite3worker.execute( From 5fbd4f95e8b52b2fe92832f5314cb14f38bf673c Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 01:19:38 -0500 Subject: [PATCH 06/11] Update sqlite3worker.py --- sqlite3worker.py | 68 ++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index f26ce4d..82375a4 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -61,7 +61,7 @@ def __init__ ( self, worker, row_factory ): self.row_factory = row_factory def execute ( self ): - self.worker.sqlite3_cursor.row_factory = self.row_factory + self.worker._sqlite3_cursor.row_factory = self.row_factory class Sqlite3WorkerSetTextFactory ( Sqlite3WorkerRequest ): worker = None @@ -89,7 +89,7 @@ def __init__ ( self, worker, query, values ): def execute ( self ): LOGGER.debug ( "run execute: %s", self.query ) worker = self.worker - cur = worker.sqlite3_cursor + cur = worker._sqlite3_cursor try: cur.execute ( self.query, self.values ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -114,7 +114,7 @@ def __init__ ( self, worker, query ): def execute ( self ): LOGGER.debug ( "run executescript: %s", self.query ) worker = self.worker - cur = worker.sqlite3_cursor + cur = worker._sqlite3_cursor try: cur.executescript ( self.query ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -164,12 +164,12 @@ class Sqlite3Worker ( Frozen_object ): sql_worker.execute("SELECT * from tester") sql_worker.close() """ - file_name = None + _file_name = None _sqlite3_conn = None - sqlite3_cursor = None - sql_queue = None - max_queue_size = None - exit_set = False + _sqlite3_cursor = None + _sql_queue = None + _max_queue_size = None + _exit_set = False _thread = None def __init__ ( self, file_name, max_queue_size=100 ): @@ -181,25 +181,25 @@ def __init__ ( self, file_name, max_queue_size=100 ): self._thread = threading.Thread ( target=self.run ) self._thread.daemon = True - self.file_name = normalize_file_name ( file_name ) - if self.file_name != ':memory:': + self._file_name = normalize_file_name ( file_name ) + if self._file_name != ':memory:': global workers - assert self.file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' - workers[self.file_name] = self + assert self._file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' + workers[self._file_name] = self self._sqlite3_conn = sqlite3.connect ( file_name, check_same_thread=False, #detect_types=sqlite3.PARSE_DECLTYPES ) - self.sqlite3_cursor = self._sqlite3_conn.cursor() - self.sql_queue = Queue.Queue ( maxsize=max_queue_size ) - self.max_queue_size = max_queue_size + self._sqlite3_cursor = self._sqlite3_conn.cursor() + self._sql_queue = Queue.Queue ( maxsize=max_queue_size ) + self._max_queue_size = max_queue_size self._thread.name = self._thread.name.replace ( 'Thread-', 'sqlite3worker-' ) self._thread.start() def run ( self ): """Thread loop. - This is an infinite loop. The iter method calls self.sql_queue.get() + This is an infinite loop. The iter method calls self._sql_queue.get() which blocks if there are not values in the queue. As soon as values are placed into the queue the process will continue. If many executes happen at once it will churn through them all before @@ -209,41 +209,41 @@ def run ( self ): LOGGER.debug("run: Thread started") while True: try: - self.sql_queue.get().execute() + self._sql_queue.get().execute() except Sqlite3WorkerExit as e: - if not self.sql_queue.empty(): # pragma: no cover ( TODO FIXME: come back to this ) - self.sql_queue.put ( e ) # push the exit event to the end of the queue + if not self._sql_queue.empty(): # pragma: no cover ( TODO FIXME: come back to this ) + self._sql_queue.put ( e ) # push the exit event to the end of the queue continue self._sqlite3_conn.commit() self._sqlite3_conn.close() - if self.file_name != ':memory:': + if self._file_name != ':memory:': global workers try: - del workers[self.file_name] + del workers[self._file_name] except KeyError: - print ( 'file_name {!r} not found in workers {!r}'.format ( self.file_name, workers ) ) + LOGGER.error ( 'file_name {!r} not found in workers {!r}'.format ( self._file_name, workers ) ) return def close ( self ): """Close down the thread and close the sqlite3 database file.""" - if self.exit_set: # pragma: no cover + if self._exit_set: # pragma: no cover LOGGER.debug ( "sqlite worker thread already shutting down" ) raise OperationalError ( 'sqlite worker thread already shutting down' ) - self.exit_set = True - self.sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) + self._exit_set = True + self._sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) # Sleep and check that the thread is done before returning. self._thread.join() @property def queue_size ( self ): # pragma: no cover """Return the queue size.""" - return self.sql_queue.qsize() + return self._sql_queue.qsize() def set_row_factory ( self, row_factory ): - self.sql_queue.put ( Sqlite3WorkerSetRowFactory ( self, row_factory ), timeout=5 ) + self._sql_queue.put ( Sqlite3WorkerSetRowFactory ( self, row_factory ), timeout=5 ) def set_text_factory ( self, text_factory ): - self.sql_queue.put ( Sqlite3WorkerSetTextFactory ( self, text_factory ), timeout=5 ) + self._sql_queue.put ( Sqlite3WorkerSetTextFactory ( self, text_factory ), timeout=5 ) def execute_ex ( self, query, values=None ): """Execute a query. @@ -256,12 +256,12 @@ def execute_ex ( self, query, values=None ): description is the results of cursor.description after executing the query lastrowid is the result of calling cursor.lastrowid after executing the query """ - if self.exit_set: # pragma: no cover + if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) raise OperationalError ( 'sqlite worker thread already shutting down' ) LOGGER.debug ( "request execute: %s", query ) r = Sqlite3WorkerExecute ( self, query, values or [] ) - self.sql_queue.put ( r, timeout=5 ) + self._sql_queue.put ( r, timeout=5 ) success, result = r.results.get() if not success: raise result @@ -272,12 +272,12 @@ def execute ( self, query, values=None ): return self.execute_ex ( query, values )[0] def executescript_ex ( self, query ): - if self.exit_set: # pragma: no cover + if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) raise OperationalError ( 'sqlite worker thread already shutting down' ) LOGGER.debug ( "request executescript: %s", query ) r = Sqlite3WorkerExecuteScript ( self, query ) - self.sql_queue.put ( r, timeout=5 ) + self._sql_queue.put ( r, timeout=5 ) success, result = r.results.get() if not success: raise result @@ -288,11 +288,11 @@ def executescript ( self, sql ): return self.executescript_ex ( sql )[0] def commit ( self ): - if self.exit_set: # pragma: no cover + if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) raise OperationalError ( 'sqlite worker thread already shutting down' ) LOGGER.debug ( "request commit" ) - self.sql_queue.put ( Sqlite3WorkerCommit ( self ), timeout=5 ) + self._sql_queue.put ( Sqlite3WorkerCommit ( self ), timeout=5 ) class Sqlite3worker_dbapi_cursor ( Frozen_object ): con = None From d9d53e668d71ef2a88fb55f976e380ae5239ef07 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 11 Aug 2017 21:13:31 -0500 Subject: [PATCH 07/11] pathlib is overkill here --- sqlite3worker.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index 82375a4..a618f44 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -25,15 +25,15 @@ __license__ = "MIT" import logging -try: - import queue as Queue # module re-named in Python 3 -except ImportError: # pragma: no cover - import Queue -import pathlib # pip install pathlib import platform +import os import sqlite3 import threading import time +try: + import queue as Queue # module re-named in Python 3 +except ImportError: # pragma: no cover + import Queue LOGGER = logging.getLogger('sqlite3worker') @@ -145,7 +145,7 @@ def normalize_file_name ( file_name ): if file_name.lower() == ':memory:': return ':memory:' # lookup absolute path of file_name - file_name = str ( pathlib.Path ( file_name ).absolute() ) + file_name = os.path.abspath ( file_name ) if platform.system() == 'Windows': file_name = file_name.lower() # Windows filenames are not case-sensitive return file_name From 3cf2e0bed85a998ced433eb61919b1d5ffd81204 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Thu, 17 Aug 2017 22:56:09 -0500 Subject: [PATCH 08/11] sqlite3worker.py: refactor Sqlite3Worker class to share thread objects so that we can have multiple instances of a Sqlite3Worker. This fixes bug with multiple dbapi connections not shutting down the thread on each other. Replaced OperationalError with ProgrammingError where it seemed more appropriate. sqlite3worker_test.py: wrote test to reproduce an error that occurred in production when opening multiple dbapi connections to the same database from different threads --- sqlite3worker.py | 201 +++++++++++++++++++++++------------------- sqlite3worker_test.py | 15 ++-- 2 files changed, 117 insertions(+), 99 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index a618f44..6cd5160 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -37,9 +37,8 @@ LOGGER = logging.getLogger('sqlite3worker') -workers = {} - OperationalError = sqlite3.OperationalError +ProgrammingError = sqlite3.ProgrammingError Row = sqlite3.Row class Frozen_object ( object ): @@ -53,43 +52,43 @@ def execute ( self ): # pragma: no cover raise NotImplementedError ( type ( self ).__name__ + '.execute()' ) class Sqlite3WorkerSetRowFactory ( Sqlite3WorkerRequest ): - worker = None + thread = None row_factory = None - def __init__ ( self, worker, row_factory ): - self.worker = worker + def __init__ ( self, thread, row_factory ): + self.thread = thread self.row_factory = row_factory def execute ( self ): - self.worker._sqlite3_cursor.row_factory = self.row_factory + self.thread._sqlite3_cursor.row_factory = self.row_factory class Sqlite3WorkerSetTextFactory ( Sqlite3WorkerRequest ): - worker = None + thread = None text_factory = None - def __init__ ( self, worker, text_factory ): - self.worker = worker + def __init__ ( self, thread, text_factory ): + self.thread = thread self.text_factory = text_factory def execute ( self ): - self.worker._sqlite3_conn.text_factory = self.text_factory + self.thread._sqlite3_conn.text_factory = self.text_factory class Sqlite3WorkerExecute ( Sqlite3WorkerRequest ): - worker = None + thread = None query = None values = None results = None - def __init__ ( self, worker, query, values ): - self.worker = worker + def __init__ ( self, thread, query, values ): + self.thread = thread self.query = query self.values = values self.results = Queue.Queue() def execute ( self ): LOGGER.debug ( "run execute: %s", self.query ) - worker = self.worker - cur = worker._sqlite3_cursor + thread = self.thread + cur = thread._sqlite3_cursor try: cur.execute ( self.query, self.values ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -102,19 +101,18 @@ def execute ( self ): self.results.put ( ( success, result ) ) class Sqlite3WorkerExecuteScript ( Sqlite3WorkerRequest ): - worker = None + thread = None query = None results = None - def __init__ ( self, worker, query ): - self.worker = worker + def __init__ ( self, thread, query ): + self.thread = thread self.query = query self.results = Queue.Queue() def execute ( self ): LOGGER.debug ( "run executescript: %s", self.query ) - worker = self.worker - cur = worker._sqlite3_cursor + cur = self._sqlite3_cursor try: cur.executescript ( self.query ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -127,15 +125,15 @@ def execute ( self ): self.results.put ( ( success, result ) ) class Sqlite3WorkerCommit ( Sqlite3WorkerRequest ): - worker = None + thread = None - def __init__ ( self, worker ): - self.worker = worker + def __init__ ( self, thread ): + self.thread = thread def execute ( self ): LOGGER.debug("run commit") - worker = self.worker - worker._sqlite3_conn.commit() + thread = self.thread + thread._sqlite3_conn.commit() class Sqlite3WorkerExit ( Exception, Sqlite3WorkerRequest ): def execute ( self ): @@ -150,43 +148,17 @@ def normalize_file_name ( file_name ): file_name = file_name.lower() # Windows filenames are not case-sensitive return file_name -class Sqlite3Worker ( Frozen_object ): - """Sqlite thread safe object. - Example: - from sqlite3worker import Sqlite3Worker - sql_worker = Sqlite3Worker("/tmp/test.sqlite") - sql_worker.execute( - "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") - sql_worker.execute( - "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) - sql_worker.execute( - "INSERT into tester values (?, ?)", ("2011-02-02 14:14:14", "dog")) - sql_worker.execute("SELECT * from tester") - sql_worker.close() - """ - _file_name = None +class Sqlite3WorkerThread ( threading.Thread ): + _workers = None _sqlite3_conn = None _sqlite3_cursor = None _sql_queue = None _max_queue_size = None - _exit_set = False - _thread = None - def __init__ ( self, file_name, max_queue_size=100 ): - """Automatically starts the thread. - Args: - file_name: The name of the file. - max_queue_size: The max queries that will be queued. - """ - self._thread = threading.Thread ( target=self.run ) - self._thread.daemon = True - - self._file_name = normalize_file_name ( file_name ) - if self._file_name != ':memory:': - global workers - assert self._file_name not in workers, 'attempted to create two different Sqlite3Worker objects that reference the same database' - workers[self._file_name] = self - + def __init__ ( self, file_name, max_queue_size, *args, **kwargs ): + super ( Sqlite3WorkerThread, self ).__init__ ( *args, **kwargs ) + self.daemon = True + self._workers = set() self._sqlite3_conn = sqlite3.connect ( file_name, check_same_thread=False, #detect_types=sqlite3.PARSE_DECLTYPES @@ -194,8 +166,8 @@ def __init__ ( self, file_name, max_queue_size=100 ): self._sqlite3_cursor = self._sqlite3_conn.cursor() self._sql_queue = Queue.Queue ( maxsize=max_queue_size ) self._max_queue_size = max_queue_size - self._thread.name = self._thread.name.replace ( 'Thread-', 'sqlite3worker-' ) - self._thread.start() + self.name = self.name.replace ( 'Thread-', 'Sqlite3WorkerThread-' ) + self.start() def run ( self ): """Thread loop. @@ -209,41 +181,86 @@ def run ( self ): LOGGER.debug("run: Thread started") while True: try: - self._sql_queue.get().execute() + x = self._sql_queue.get() + x.execute() except Sqlite3WorkerExit as e: if not self._sql_queue.empty(): # pragma: no cover ( TODO FIXME: come back to this ) + LOGGER.debug ( 'requeueing the exit event because there are unfinished actions' ) self._sql_queue.put ( e ) # push the exit event to the end of the queue continue + LOGGER.debug ( 'closing database connection' ) + self._sqlite3_cursor.close() self._sqlite3_conn.commit() self._sqlite3_conn.close() - if self._file_name != ':memory:': - global workers - try: - del workers[self._file_name] - except KeyError: - LOGGER.error ( 'file_name {!r} not found in workers {!r}'.format ( self._file_name, workers ) ) - return + LOGGER.debug ( 'exiting thread' ) + break + +class Sqlite3Worker ( Frozen_object ): + """Sqlite thread safe object. + Example: + from sqlite3worker import Sqlite3Worker + sql_worker = Sqlite3Worker("/tmp/test.sqlite") + sql_worker.execute( + "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") + sql_worker.execute( + "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) + sql_worker.execute( + "INSERT into tester values (?, ?)", ("2011-02-02 14:14:14", "dog")) + sql_worker.execute("SELECT * from tester") + sql_worker.close() + """ + _file_name = None + _exit_set = False + _thread = None + + # class shared attributes + _threads = {} + _threads_lock = threading.Lock() + + def __init__ ( self, file_name, max_queue_size=100 ): + """Automatically starts the thread. + Args: + file_name: The name of the file. + max_queue_size: The max queries that will be queued. + """ + + self._file_name = normalize_file_name ( file_name ) + with self._threads_lock: + self._thread = self._threads.get ( self._file_name ) + if self._thread is None: + self._thread = Sqlite3WorkerThread ( self._file_name, max_queue_size ) + self._threads[self._file_name] = self._thread + if self._file_name != ':memory:': + self._threads[self._file_name] = self._thread + self._thread._workers.add ( self ) def close ( self ): - """Close down the thread and close the sqlite3 database file.""" + """If we're the last worker, close down the thread which closes the sqlite3 database file.""" if self._exit_set: # pragma: no cover - LOGGER.debug ( "sqlite worker thread already shutting down" ) - raise OperationalError ( 'sqlite worker thread already shutting down' ) + LOGGER.debug ( "sqlite worker already closed" ) + raise ProgrammingError ( 'sqlite worker already closed' ) self._exit_set = True - self._sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) - # Sleep and check that the thread is done before returning. - self._thread.join() + with self._threads_lock: + self._thread._workers.remove ( self ) + if not self._thread._workers: + self._thread._sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) + # wait for the thread to finish what it's doing and shut down + self._thread.join() + try: + del self._threads[self._file_name] + except KeyError: + assert self._file_name == ':memory:' @property def queue_size ( self ): # pragma: no cover """Return the queue size.""" - return self._sql_queue.qsize() + return self._thread._sql_queue.qsize() def set_row_factory ( self, row_factory ): - self._sql_queue.put ( Sqlite3WorkerSetRowFactory ( self, row_factory ), timeout=5 ) + self._thread._sql_queue.put ( Sqlite3WorkerSetRowFactory ( self._thread, row_factory ), timeout=5 ) def set_text_factory ( self, text_factory ): - self._sql_queue.put ( Sqlite3WorkerSetTextFactory ( self, text_factory ), timeout=5 ) + self._thread._sql_queue.put ( Sqlite3WorkerSetTextFactory ( self._thread, text_factory ), timeout=5 ) def execute_ex ( self, query, values=None ): """Execute a query. @@ -258,10 +275,10 @@ def execute_ex ( self, query, values=None ): """ if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) - raise OperationalError ( 'sqlite worker thread already shutting down' ) + raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request execute: %s", query ) - r = Sqlite3WorkerExecute ( self, query, values or [] ) - self._sql_queue.put ( r, timeout=5 ) + r = Sqlite3WorkerExecute ( self._thread, query, values or [] ) + self._thread._sql_queue.put ( r, timeout=5 ) success, result = r.results.get() if not success: raise result @@ -274,10 +291,10 @@ def execute ( self, query, values=None ): def executescript_ex ( self, query ): if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) - raise OperationalError ( 'sqlite worker thread already shutting down' ) + raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request executescript: %s", query ) - r = Sqlite3WorkerExecuteScript ( self, query ) - self._sql_queue.put ( r, timeout=5 ) + r = Sqlite3WorkerExecuteScript ( self._thread, query ) + self._thread._sql_queue.put ( r, timeout=5 ) success, result = r.results.get() if not success: raise result @@ -290,9 +307,16 @@ def executescript ( self, sql ): def commit ( self ): if self._exit_set: # pragma: no cover LOGGER.debug ( "Exit set, not running: %s", query ) - raise OperationalError ( 'sqlite worker thread already shutting down' ) + raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request commit" ) - self._sql_queue.put ( Sqlite3WorkerCommit ( self ), timeout=5 ) + self._thread._sql_queue.put ( Sqlite3WorkerCommit ( self._thread ), timeout=5 ) + + @property + def total_changes ( self ): + if self._exit_set: # pragma: no cover + LOGGER.debug ( "Exit set, not querying total_changes" ) + raise ProgrammingError ( 'sqlite worker already closed' ) + return self._thread._sqlite3_conn.total_changes class Sqlite3worker_dbapi_cursor ( Frozen_object ): con = None @@ -346,6 +370,7 @@ def executescript ( self, sql_script ): def close ( self ): self.worker.close() + self.worker = None @property def row_factory ( self ): @@ -364,10 +389,4 @@ def text_factory ( self, text_factory ): self.worker.set_text_factory ( text_factory ) def connect ( file_name ): - file_name = normalize_file_name ( file_name ) - global workers - try: - worker = workers[file_name] - except KeyError: - worker = Sqlite3Worker ( file_name ) - return Sqlite3worker_dbapi_connection ( worker ) + return Sqlite3worker_dbapi_connection ( Sqlite3Worker ( file_name ) ) diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index 78ab807..d39463c 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -41,8 +41,8 @@ class Sqlite3WorkerTests(unittest.TestCase): # pylint:disable=R0904 """Test out the sqlite3worker library.""" def setUp(self): # pylint:disable=D0102 - self.tmp_file = tempfile.mktemp( - suffix="pytest", prefix="sqlite") + self.tmp_file = tempfile.NamedTemporaryFile( + suffix="pytest", prefix="sqlite").name self.sqlite3worker = sqlite3worker.Sqlite3Worker(self.tmp_file) # Create sql db. self.sqlite3worker.execute( @@ -51,7 +51,7 @@ def setUp(self): # pylint:disable=D0102 def tearDown(self): # pylint:disable=D0102 try: self.sqlite3worker.close() - except sqlite3worker.OperationalError: + except sqlite3worker.ProgrammingError: pass # the test may have already closed the database os.unlink(self.tmp_file) @@ -92,22 +92,21 @@ def test_valid_insert(self): def test_run_after_close(self): """Test to make sure all events are cleared after object closed.""" self.sqlite3worker.close() - with self.assertRaises ( sqlite3worker.OperationalError ): + with self.assertRaises ( sqlite3worker.ProgrammingError ): self.sqlite3worker.execute( "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) def test_double_close(self): """Make sure double closeing messages properly.""" self.sqlite3worker.close() - with self.assertRaises ( sqlite3worker.OperationalError ): + with self.assertRaises ( sqlite3worker.ProgrammingError ): self.sqlite3worker.close() def test_db_closed_properly(self): """Make sure sqlite object is properly closed out.""" self.sqlite3worker.close() - with self.assertRaises( - self.sqlite3worker._sqlite3_conn.ProgrammingError): - self.sqlite3worker._sqlite3_conn.total_changes + with self.assertRaises ( sqlite3worker.ProgrammingError): + self.sqlite3worker.total_changes def test_many_threads(self): """Make sure lots of threads work together.""" From 190a257c82814674e2883396b7334f294b504320 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Fri, 18 Aug 2017 07:39:24 -0500 Subject: [PATCH 09/11] sqlite3worker.py: fix bug in Sqlite3Worker.close() sqlite3worker_test.py: it helps when you actually commit the new tests you write --- sqlite3worker.py | 8 +++--- sqlite3worker_test.py | 62 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index 6cd5160..e425907 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -246,10 +246,10 @@ def close ( self ): self._thread._sql_queue.put ( Sqlite3WorkerExit(), timeout=5 ) # wait for the thread to finish what it's doing and shut down self._thread.join() - try: - del self._threads[self._file_name] - except KeyError: - assert self._file_name == ':memory:' + try: + del self._threads[self._file_name] + except KeyError: + assert self._file_name == ':memory:' @property def queue_size ( self ): # pragma: no cover diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index d39463c..57b378c 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -26,6 +26,7 @@ __email__ = "dashawn@gmail.com" __license__ = "MIT" +import logging import os import tempfile import threading @@ -41,8 +42,8 @@ class Sqlite3WorkerTests(unittest.TestCase): # pylint:disable=R0904 """Test out the sqlite3worker library.""" def setUp(self): # pylint:disable=D0102 - self.tmp_file = tempfile.NamedTemporaryFile( - suffix="pytest", prefix="sqlite").name + self.tmp_file = tempfile.mktemp( + suffix="pytest", prefix="sqlite") self.sqlite3worker = sqlite3worker.Sqlite3Worker(self.tmp_file) # Create sql db. self.sqlite3worker.execute( @@ -97,7 +98,7 @@ def test_run_after_close(self): "INSERT into tester values (?, ?)", ("2010-01-01 13:00:00", "bow")) def test_double_close(self): - """Make sure double closeing messages properly.""" + """Make sure double closing messages properly.""" self.sqlite3worker.close() with self.assertRaises ( sqlite3worker.ProgrammingError ): self.sqlite3worker.close() @@ -105,7 +106,7 @@ def test_double_close(self): def test_db_closed_properly(self): """Make sure sqlite object is properly closed out.""" self.sqlite3worker.close() - with self.assertRaises ( sqlite3worker.ProgrammingError): + with self.assertRaises ( sqlite3worker.ProgrammingError ): self.sqlite3worker.total_changes def test_many_threads(self): @@ -142,6 +143,59 @@ def run(self): self.assertEqual(threads[i].failed, False) threads[i].join() + def test_many_dbapi_threads ( self ): + """Make sure lots of threads work together with dbapi interface.""" + class threaded ( threading.Thread ): + def __init__ ( self, id, tmp_file ): + threading.Thread.__init__ ( self, name='test {}'.format ( id ) ) + self.tmp_file = tmp_file + self.daemon = True + self.failed = False + self.completed = False + self.start() + + def run ( self ): + logging.debug ( 'connecting' ) + con = sqlite3worker.connect ( self.tmp_file ) + for i in range ( 5 ): + logging.debug ( 'creating cursor #{}'.format ( i ) ) + c = con.cursor() + token = str ( uuid.uuid4() ) + logging.debug ( 'cursor #{} inserting token {!r}'.format ( i, token ) ) + c.execute ( + "INSERT into tester values (?, ?)", + ( "2010-01-01 13:00:00", token ) + ) + logging.debug ( 'cursor #{} querying token {!r}'.format ( i, token ) ) + c.execute ( + "SELECT * from tester where uuid = ?", (token,) + ) + resp = c.fetchone() + logging.debug ( 'cursor #{} closing'.format ( i ) ) + c.close() + if resp != ( "2010-01-01 13:00:00", token ): + logging.debug ( 'cursor #{} invalid resp {!r}'.format ( i, resp ) ) + logging.debug ( repr ( resp ) ) + self.failed = True + break + else: + logging.debug ( 'cursor #{} success'.format ( i ) ) + logging.debug ( 'closing connection' ) + con.close() + self.completed = True + + threads = [] + for id in range ( 5 ): + threads.append ( threaded ( id, self.tmp_file ) ) + + for i in range ( 5 ): + while not threads[i].completed: + time.sleep ( 0.1 ) + self.assertEqual ( threads[i].failed, False ) + threads[i].join() if __name__ == "__main__": + if False: + import sys + logging.basicConfig ( stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s [%(threadName)s %(levelname)s] %(message)s' ) unittest.main() From b665c6a24e47b1dac5c743ac5733034a99bc6fe2 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Mon, 4 Sep 2017 16:34:05 -0500 Subject: [PATCH 10/11] sqlite3worker.py: replace sqlite3.Row with dict_factory because sqlite3.Row doesn't like our fake cursor, fix bug in Sqlite3WorkerExecuteScript, remove "no cover" pragmas that are now covered in testing, fix typo in Sqlite3Worker.commit(), dbapi interface now keeps and applies row_factory itself instead of relying on Sqlite3Worker to do it - this fixes bug where different connections have different row factories sqlite3worker_test.py: add a bunch more tests to get code coverage to 100% and prevent regression of bugs fixed in sqlite3worker.py --- sqlite3worker.py | 47 +++++++++++++++++++------------------ sqlite3worker_test.py | 54 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 30 deletions(-) diff --git a/sqlite3worker.py b/sqlite3worker.py index e425907..a959ce4 100644 --- a/sqlite3worker.py +++ b/sqlite3worker.py @@ -39,7 +39,15 @@ OperationalError = sqlite3.OperationalError ProgrammingError = sqlite3.ProgrammingError -Row = sqlite3.Row + +def dict_factory ( cursor, row ): + d = {} + for idx, col in enumerate ( cursor.description ): + d[col[0]] = row[idx] + return d + +# native sqlite3.Row doesn't like our proxy cursor class so we're going to substitute dict_factory instead which is almost the same thing +Row = dict_factory # sqlite3.Row class Frozen_object ( object ): def __setattr__ ( self, key, value ): @@ -87,8 +95,7 @@ def __init__ ( self, thread, query, values ): def execute ( self ): LOGGER.debug ( "run execute: %s", self.query ) - thread = self.thread - cur = thread._sqlite3_cursor + cur = self.thread._sqlite3_cursor try: cur.execute ( self.query, self.values ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -112,7 +119,7 @@ def __init__ ( self, thread, query ): def execute ( self ): LOGGER.debug ( "run executescript: %s", self.query ) - cur = self._sqlite3_cursor + cur = self.thread._sqlite3_cursor try: cur.executescript ( self.query ) result = ( cur.fetchall(), cur.description, cur.lastrowid ) @@ -132,8 +139,7 @@ def __init__ ( self, thread ): def execute ( self ): LOGGER.debug("run commit") - thread = self.thread - thread._sqlite3_conn.commit() + self.thread._sqlite3_conn.commit() class Sqlite3WorkerExit ( Exception, Sqlite3WorkerRequest ): def execute ( self ): @@ -236,8 +242,7 @@ def __init__ ( self, file_name, max_queue_size=100 ): def close ( self ): """If we're the last worker, close down the thread which closes the sqlite3 database file.""" - if self._exit_set: # pragma: no cover - LOGGER.debug ( "sqlite worker already closed" ) + if self._exit_set: raise ProgrammingError ( 'sqlite worker already closed' ) self._exit_set = True with self._threads_lock: @@ -252,7 +257,7 @@ def close ( self ): assert self._file_name == ':memory:' @property - def queue_size ( self ): # pragma: no cover + def queue_size ( self ): """Return the queue size.""" return self._thread._sql_queue.qsize() @@ -273,7 +278,7 @@ def execute_ex ( self, query, values=None ): description is the results of cursor.description after executing the query lastrowid is the result of calling cursor.lastrowid after executing the query """ - if self._exit_set: # pragma: no cover + if self._exit_set: LOGGER.debug ( "Exit set, not running: %s", query ) raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request execute: %s", query ) @@ -289,7 +294,7 @@ def execute ( self, query, values=None ): return self.execute_ex ( query, values )[0] def executescript_ex ( self, query ): - if self._exit_set: # pragma: no cover + if self._exit_set: LOGGER.debug ( "Exit set, not running: %s", query ) raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request executescript: %s", query ) @@ -305,15 +310,15 @@ def executescript ( self, sql ): return self.executescript_ex ( sql )[0] def commit ( self ): - if self._exit_set: # pragma: no cover - LOGGER.debug ( "Exit set, not running: %s", query ) + if self._exit_set: + LOGGER.debug ( "Exit set, not commiting" ) raise ProgrammingError ( 'sqlite worker already closed' ) LOGGER.debug ( "request commit" ) self._thread._sql_queue.put ( Sqlite3WorkerCommit ( self._thread ), timeout=5 ) @property def total_changes ( self ): - if self._exit_set: # pragma: no cover + if self._exit_set: LOGGER.debug ( "Exit set, not querying total_changes" ) raise ProgrammingError ( 'sqlite worker already closed' ) return self._thread._sqlite3_conn.total_changes @@ -338,7 +343,7 @@ def executescript ( self, sql_script ): def fetchone ( self ): try: - return self.rows.pop ( 0 ) + return self.con.row_factory ( self, self.rows.pop ( 0 ) ) except IndexError: return None @@ -372,16 +377,12 @@ def close ( self ): self.worker.close() self.worker = None - @property - def row_factory ( self ): - raise NotImplementedError ( type ( self ).__name__ + '.row_factory' ) - - @row_factory.setter - def row_factory ( self, row_factory ): - self.worker.set_row_factory ( row_factory ) + @staticmethod + def row_factory ( cursor, row ): + return row @property - def text_factory ( self ): + def text_factory ( self ): # pragma: no cover raise NotImplementedError ( type ( self ).__name__ + '.text_factory' ) @text_factory.setter diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index 57b378c..7ac3c10 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -46,7 +46,7 @@ def setUp(self): # pylint:disable=D0102 suffix="pytest", prefix="sqlite") self.sqlite3worker = sqlite3worker.Sqlite3Worker(self.tmp_file) # Create sql db. - self.sqlite3worker.execute( + self.sqlite3worker.executescript( # using executescript here for code coverage reasons "CREATE TABLE tester (timestamp DATETIME, uuid TEXT)") def tearDown(self): # pylint:disable=D0102 @@ -68,7 +68,7 @@ def test_bad_insert(self): with self.assertRaises ( sqlite3worker.OperationalError ): self.sqlite3worker.execute(query) # Give it one second to clear the queue. - if self.sqlite3worker.queue_size != 0: + if self.sqlite3worker.queue_size != 0: # pragma: no cover - this never happens any more time.sleep(1) self.assertEqual(self.sqlite3worker.queue_size, 0) self.assertEqual( @@ -84,7 +84,7 @@ def test_valid_insert(self): self.sqlite3worker.execute( "INSERT into tester values (?, ?)", ("2011-02-02 14:14:14", "dog")) # Give it one second to clear the queue. - if self.sqlite3worker.queue_size != 0: + if self.sqlite3worker.queue_size != 0: # pragma: no cover - this never happens any more time.sleep(1) self.assertEqual( self.sqlite3worker.execute("SELECT * from tester"), @@ -128,7 +128,7 @@ def run(self): ("2010-01-01 13:00:00", token)) resp = self.sqlite_obj.execute( "SELECT * from tester where uuid = ?", (token,)) - if resp != [("2010-01-01 13:00:00", token)]: + if resp != [("2010-01-01 13:00:00", token)]: # pragma: no cover ( we don't expect tests to fail ) self.failed = True break self.completed = True @@ -173,7 +173,7 @@ def run ( self ): resp = c.fetchone() logging.debug ( 'cursor #{} closing'.format ( i ) ) c.close() - if resp != ( "2010-01-01 13:00:00", token ): + if resp != ( "2010-01-01 13:00:00", token ): # pragma: no cover ( we don't expect tests to fail ) logging.debug ( 'cursor #{} invalid resp {!r}'.format ( i, resp ) ) logging.debug ( repr ( resp ) ) self.failed = True @@ -188,13 +188,53 @@ def run ( self ): for id in range ( 5 ): threads.append ( threaded ( id, self.tmp_file ) ) + con = sqlite3worker.connect ( self.tmp_file ) + con.executescript ( 'pragma foreign_keys=on;' ) # not using this, put here for code coverage reasons + con.row_factory = sqlite3worker.Row + con.text_factory = unicode + for i in range ( 5 ): while not threads[i].completed: time.sleep ( 0.1 ) self.assertEqual ( threads[i].failed, False ) threads[i].join() - -if __name__ == "__main__": + + logging.debug ( 'counting results' ) # yes I could do a count(*) here but I'm doing it this way for code coverage reasons + con.commit() + cur = con.execute ( 'select * from tester' ) + count = 0 + for row in cur: + self.assertEqual ( len ( row['uuid'] ), 36 ) + count += 1 + self.assertEquals ( cur.fetchone(), None ) # make sure all rows retrieved + con.close() + self.assertEquals ( count, 25 ) + + def test_coverage ( self ): + """ a bunch of miscellaneous things to get code coverage to 100% """ + class Foo ( sqlite3worker.Frozen_object ): + pass + foo = Foo() + with self.assertRaises ( AttributeError ): + foo.bar = 'bar' + self.sqlite3worker.set_row_factory ( sqlite3worker.Row ) + self.assertEquals ( self.sqlite3worker.total_changes, 0 ) + self.sqlite3worker.set_text_factory ( unicode ) + with self.assertRaises ( sqlite3worker.OperationalError ): + self.sqlite3worker.executescript ( 'THIS IS INTENTIONALLY BAD SQL' ) + + # try to force and catch an assert in the close logic... + del self.sqlite3worker._threads[self.sqlite3worker._file_name] + with self.assertRaises ( AssertionError ): + self.sqlite3worker.close() + + self.assertEquals ( sqlite3worker.normalize_file_name ( ':MEMORY:' ), ':memory:' ) + with self.assertRaises ( sqlite3worker.ProgrammingError ): + self.sqlite3worker.executescript ( 'drop table tester' ) + with self.assertRaises ( sqlite3worker.ProgrammingError ): + self.sqlite3worker.commit() + +if __name__ == "__main__": # pragma: no cover ( only executed when running test directly ) if False: import sys logging.basicConfig ( stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s [%(threadName)s %(levelname)s] %(message)s' ) From 6a9f7e15918ae72a8edc7b6475f456442588fb23 Mon Sep 17 00:00:00 2001 From: Royce Mitchell <3742423+remdragon@users.noreply.github.com> Date: Mon, 4 Sep 2017 16:38:22 -0500 Subject: [PATCH 11/11] sqlite3worker_test.py: py3 compatibility --- sqlite3worker_test.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sqlite3worker_test.py b/sqlite3worker_test.py index 7ac3c10..4a85aff 100755 --- a/sqlite3worker_test.py +++ b/sqlite3worker_test.py @@ -28,6 +28,7 @@ import logging import os +import sys import tempfile import threading import time @@ -37,6 +38,8 @@ import sqlite3worker +if sys.version_info[0] >= 3: + unicode = str class Sqlite3WorkerTests(unittest.TestCase): # pylint:disable=R0904 """Test out the sqlite3worker library.""" @@ -206,9 +209,9 @@ def run ( self ): for row in cur: self.assertEqual ( len ( row['uuid'] ), 36 ) count += 1 - self.assertEquals ( cur.fetchone(), None ) # make sure all rows retrieved + self.assertEqual ( cur.fetchone(), None ) # make sure all rows retrieved con.close() - self.assertEquals ( count, 25 ) + self.assertEqual ( count, 25 ) def test_coverage ( self ): """ a bunch of miscellaneous things to get code coverage to 100% """ @@ -218,7 +221,7 @@ class Foo ( sqlite3worker.Frozen_object ): with self.assertRaises ( AttributeError ): foo.bar = 'bar' self.sqlite3worker.set_row_factory ( sqlite3worker.Row ) - self.assertEquals ( self.sqlite3worker.total_changes, 0 ) + self.assertEqual ( self.sqlite3worker.total_changes, 0 ) self.sqlite3worker.set_text_factory ( unicode ) with self.assertRaises ( sqlite3worker.OperationalError ): self.sqlite3worker.executescript ( 'THIS IS INTENTIONALLY BAD SQL' ) @@ -228,7 +231,7 @@ class Foo ( sqlite3worker.Frozen_object ): with self.assertRaises ( AssertionError ): self.sqlite3worker.close() - self.assertEquals ( sqlite3worker.normalize_file_name ( ':MEMORY:' ), ':memory:' ) + self.assertEqual ( sqlite3worker.normalize_file_name ( ':MEMORY:' ), ':memory:' ) with self.assertRaises ( sqlite3worker.ProgrammingError ): self.sqlite3worker.executescript ( 'drop table tester' ) with self.assertRaises ( sqlite3worker.ProgrammingError ):